Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/spock_apply.c
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ static ApplyReplayEntry * apply_replay_tail = NULL;
static ApplyReplayEntry * apply_replay_next = NULL;

/* Total bytes of libpq-allocated data held by in-memory queue entries */
static int apply_replay_bytes = 0;
static uint64 apply_replay_bytes = 0;

static bool apply_replay_mode = false; /* true when replaying */
static BufFile *apply_replay_spill_file = NULL;
Expand Down Expand Up @@ -4488,7 +4488,7 @@ apply_replay_queue_append_entry(ApplyReplayEntry **entry_p, StringInfo *msg_p)
/* XXX: keep DEBUG1 logging until spill-to-disk code is proven stable */
elog(DEBUG1,
"SPOCK %s: replay queue spill activated: "
"in-memory %d bytes exceeds %d MB limit",
"in-memory " UINT64_FORMAT " bytes exceeds %d MB limit",
MySubscription->name, apply_replay_bytes,
spock_replay_queue_size);

Expand Down
23 changes: 22 additions & 1 deletion src/spock_apply_heap.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
#include "commands/sequence.h"
#include "commands/tablecmds.h"

#include "access/tableam.h"
#include "commands/trigger.h"
#include "executor/executor.h"
#include "executor/nodeModifyTable.h"

#include "libpq/pqformat.h"

Expand Down Expand Up @@ -705,17 +708,31 @@ spock_handle_conflict_and_apply(SpockRelation *rel, EState *estate,
*/
if (apply)
{
UserContext ucxt;
UserContext ucxt;
ResourceOwner save_resowner = NULL;

/* Make sure that any user-supplied code runs as the table owner. */
SwitchToUntrustedUser(rel->rel->rd_rel->relowner, &ucxt);

/*
* If this is a forced delta-apply we execute it in a subtransaction
* and record the local_ts & local_origin for CommitTsData override.
*
* Inside the subtransaction we redirect CurrentResourceOwner to
* TopTransactionResourceOwner for the executor call. Without this,
* TupleDesc refs acquired by the executor are registered under the
* SubTxn and force-released at ReleaseCurrentSubTransaction(),
* causing "tupdesc reference not owned by TopTransaction" during
* ExecResetTupleTable. Restored before ReleaseCurrentSubTransaction
* so the assertion holds, and inside PG_CATCH before re-throw so any
* outer error handler sees a sane state.
*/
if (is_delta_apply)
{
BeginInternalSubTransaction("SpockDeltaApply");
save_resowner = CurrentResourceOwner;
CurrentResourceOwner = TopTransactionResourceOwner;
}

EvalPlanQualSetSlot(epqstate, remoteslot);

Expand All @@ -726,6 +743,9 @@ spock_handle_conflict_and_apply(SpockRelation *rel, EState *estate,
}
PG_CATCH();
{
if (is_delta_apply)
CurrentResourceOwner = save_resowner;

/*
* If the UPDATE's new values violated a unique constraint,
* report it as an update_exists conflict before re-throwing.
Expand Down Expand Up @@ -763,6 +783,7 @@ spock_handle_conflict_and_apply(SpockRelation *rel, EState *estate,

if (is_delta_apply)
{
CurrentResourceOwner = save_resowner;
SubTransactionIdSetCommitTsData(GetCurrentTransactionId(),
local_ts, local_origin);
ReleaseCurrentSubTransaction();
Expand Down
12 changes: 11 additions & 1 deletion src/spock_node.c
Original file line number Diff line number Diff line change
Expand Up @@ -1109,7 +1109,17 @@ subscription_fromtuple(HeapTuple tuple, TupleDesc desc)
if (isnull)
sub->apply_delay = NULL;
else
sub->apply_delay = DatumGetIntervalP(d);
{
/*
* DatumGetIntervalP returns a pointer into the heap tuple (or a
* detoasted copy in the current memory context). Both go away when
* the calling transaction is committed, so deep-copy into the same
* context that owns this SpockSubscription.
*/
Interval *src = DatumGetIntervalP(d);
sub->apply_delay = (Interval *) palloc(sizeof(Interval));
memcpy(sub->apply_delay, src, sizeof(Interval));
}

/* Get force_text_transfer. */
d = heap_getattr(tuple, Anum_sub_force_text_transfer, desc, &isnull);
Expand Down
Loading