Skip to content

Commit

Permalink
Make RUM work on 10devel
Browse files Browse the repository at this point in the history
PostgreSQL 10devel was feature freezed.  Thus, we should start thinking about
providing RUM for it.  This commit adopts RUM to API changes yet introduced
in 10devel.  Macros are used to provide build on both 9.6 and 10.
  • Loading branch information
Alexander Korotkov committed Apr 11, 2017
1 parent eeb0a98 commit a1da826
Show file tree
Hide file tree
Showing 5 changed files with 68 additions and 12 deletions.
6 changes: 5 additions & 1 deletion src/rum.h
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,11 @@ extern IndexBuildResult *rumbuild(Relation heap, Relation index,
extern void rumbuildempty(Relation index);
extern bool ruminsert(Relation index, Datum *values, bool *isnull,
ItemPointer ht_ctid, Relation heapRel,
IndexUniqueCheck checkUnique);
IndexUniqueCheck checkUnique
#if PG_VERSION_NUM >= 100000
, struct IndexInfo *indexInfo
#endif
);
extern void rumEntryInsert(RumState * rumstate,
OffsetNumber attnum, Datum key, RumNullCategory category,
RumKey * items, uint32 nitem, GinStatsData *buildStats);
Expand Down
9 changes: 7 additions & 2 deletions src/rum_ts_utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@

#include <math.h>

/* Use TS_EXEC_PHRASE_AS_AND when TS_EXEC_PHRASE_NO_POS is not defined */
#ifndef TS_EXEC_PHRASE_NO_POS
#define TS_EXEC_PHRASE_NO_POS TS_EXEC_PHRASE_AS_AND
#endif

PG_FUNCTION_INFO_V1(rum_extract_tsvector);
PG_FUNCTION_INFO_V1(rum_extract_tsvector_hash);
PG_FUNCTION_INFO_V1(rum_extract_tsquery);
Expand Down Expand Up @@ -177,7 +182,7 @@ rum_tsquery_pre_consistent(PG_FUNCTION_ARGS)

res = TS_execute(GETQUERY(query),
&gcv,
TS_EXEC_PHRASE_AS_AND,
TS_EXEC_PHRASE_NO_POS,
pre_checkcondition_rum);
}

Expand Down Expand Up @@ -326,7 +331,7 @@ rum_tsquery_timestamp_consistent(PG_FUNCTION_ARGS)
gcv.recheckPhrase = true;

res = TS_execute(GETQUERY(query), &gcv,
TS_EXEC_CALC_NOT | TS_EXEC_PHRASE_AS_AND,
TS_EXEC_CALC_NOT | TS_EXEC_PHRASE_NO_POS,
checkcondition_rum);
}

Expand Down
6 changes: 5 additions & 1 deletion src/ruminsert.c
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,11 @@ rumHeapTupleInsert(RumState * rumstate, OffsetNumber attnum,
bool
ruminsert(Relation index, Datum *values, bool *isnull,
ItemPointer ht_ctid, Relation heapRel,
IndexUniqueCheck checkUnique)
IndexUniqueCheck checkUnique
#if PG_VERSION_NUM >= 100000
, struct IndexInfo *indexInfo
#endif
)
{
RumState rumstate;
MemoryContext oldCtx;
Expand Down
56 changes: 48 additions & 8 deletions src/rumsort.c
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,11 @@ bool trace_sort = false;
bool optimize_bounded_sort = true;
#endif

#if PG_VERSION_NUM < 100000
/* Provide fallback for old version of tape interface for 9.6 */
#define LogicalTapeRewindForRead(x, y, z) LogicalTapeRewind((x), (y), false)
#define LogicalTapeRewindForWrite(x, y) LogicalTapeRewind((x), (y), true)
#endif

/*
* The objects we actually sort are SortTuple structs. These contain
Expand Down Expand Up @@ -300,6 +305,9 @@ struct Tuplesortstate
int memtupsize; /* allocated length of memtuples array */
bool growmemtuples; /* memtuples' growth still underway? */

/* Buffer size to use for reading input tapes, during merge. */
size_t read_buffer_size;

/*
* While building initial runs, this is the current output run number
* (starting at 0). Afterwards, it is the number of initial runs we made.
Expand Down Expand Up @@ -2364,6 +2372,8 @@ mergeruns(Tuplesortstate *state)
svTape,
svRuns,
svDummy;
int numTapes;
int numInputTapes;

Assert(state->status == TSS_BUILDRUNS);
Assert(state->memtupcount == 0);
Expand All @@ -2383,9 +2393,33 @@ mergeruns(Tuplesortstate *state)
return;
}

/*
* If we had fewer runs than tapes, refund the memory that we imagined we
* would need for the tape buffers of the unused tapes.
*
* numTapes and numInputTapes reflect the actual number of tapes we will
* use. Note that the output tape's tape number is maxTapes - 1, so the
* tape numbers of the used tapes are not consecutive, and you cannot just
* loop from 0 to numTapes to visit all used tapes!
*/
if (state->Level == 1)
{
numInputTapes = state->currentRun;
numTapes = numInputTapes + 1;
FREEMEM(state, (state->maxTapes - numTapes) * TAPE_BUFFER_OVERHEAD);
}
else
{
numInputTapes = state->tapeRange;
numTapes = state->maxTapes;
}

state->read_buffer_size = Max(state->availMem / numInputTapes, 0);
USEMEM(state, state->read_buffer_size * numInputTapes);

/* End of step D2: rewind all output tapes to prepare for merging */
for (tapenum = 0; tapenum < state->tapeRange; tapenum++)
LogicalTapeRewind(state->tapeset, tapenum, false);
LogicalTapeRewindForRead(state->tapeset, tapenum, state->read_buffer_size);

for (;;)
{
Expand Down Expand Up @@ -2448,11 +2482,10 @@ mergeruns(Tuplesortstate *state)
if (--state->Level == 0)
break;
/* rewind output tape T to use as new input */
LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange],
false);
LogicalTapeRewindForRead(state->tapeset, state->tp_tapenum[state->tapeRange],
state->read_buffer_size);
/* rewind used-up input tape P, and prepare it for write pass */
LogicalTapeRewind(state->tapeset, state->tp_tapenum[state->tapeRange - 1],
true);
LogicalTapeRewindForWrite(state->tapeset, state->tp_tapenum[state->tapeRange - 1]);
state->tp_runs[state->tapeRange - 1] = 0;

/*
Expand Down Expand Up @@ -2818,9 +2851,9 @@ rum_tuplesort_rescan(Tuplesortstate *state)
state->markpos_eof = false;
break;
case TSS_SORTEDONTAPE:
LogicalTapeRewind(state->tapeset,
state->result_tape,
false);
LogicalTapeRewindForRead(state->tapeset,
state->result_tape,
state->read_buffer_size);
state->eof_reached = false;
state->markpos_block = 0L;
state->markpos_offset = 0;
Expand Down Expand Up @@ -2883,11 +2916,18 @@ rum_tuplesort_restorepos(Tuplesortstate *state)
state->eof_reached = state->markpos_eof;
break;
case TSS_SORTEDONTAPE:
#if PG_VERSION_NUM < 100000
if (!LogicalTapeSeek(state->tapeset,
state->result_tape,
state->markpos_block,
state->markpos_offset))
elog(ERROR, "rum_tuplesort_restorepos failed");
#else
LogicalTapeSeek(state->tapeset,
state->result_tape,
state->markpos_block,
state->markpos_offset);
#endif
state->eof_reached = state->markpos_eof;
break;
default:
Expand Down
3 changes: 3 additions & 0 deletions src/rumvalidate.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
#include "catalog/pg_type.h"
#include "utils/builtins.h"
#include "utils/catcache.h"
#if PG_VERSION_NUM >= 100000
#include "utils/regproc.h"
#endif
#include "utils/syscache.h"

#include "rum.h"
Expand Down

0 comments on commit a1da826

Please sign in to comment.