Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 23 additions & 12 deletions src/adapter/src/coord/appends.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,18 +131,29 @@ pub(crate) enum BuiltinTableUpdateSource {
Background(oneshot::Sender<()>),
}

/// Where to deliver the result of a [`PendingWriteTxn::User`] write.
#[derive(Debug)]
pub(crate) enum UserWriteResponder {
/// Session-bound write. The coordinator retires the session's
/// `ExecuteContext` once the write commits.
Session(PendingTxn),
}

/// A pending write transaction that will be committing during the next group commit.
#[derive(Debug)]
pub(crate) enum PendingWriteTxn {
/// Write to a user table.
/// Write to a user table. The write timestamp is picked by the oracle
/// during group commit. The write lock is either handed off from the
/// submitting session (via `write_locks: Some(..)`) or acquired during
/// group commit (`write_locks: None`).
User {
span: Span,
/// List of all write operations within the transaction.
writes: BTreeMap<CatalogItemId, SmallVec<[TableData; 1]>>,
/// If they exist, should contain locks for each [`CatalogItemId`] in `writes`.
write_locks: Option<WriteLocks>,
/// Inner transaction.
pending_txn: PendingTxn,
/// Where to deliver the result once the write commits.
responder: UserWriteResponder,
},
/// Write to a system table.
System {
Expand Down Expand Up @@ -264,7 +275,7 @@ impl Coordinator {
span,
writes,
write_locks,
pending_txn,
responder: UserWriteResponder::Session(pending_txn),
});
}
}
Expand Down Expand Up @@ -349,7 +360,7 @@ impl Coordinator {
span,
write_locks: Some(write_locks),
writes,
pending_txn,
responder: UserWriteResponder::Session(pending_txn),
} => match write_locks.validate(writes.keys().copied()) {
Ok(validated_locks) => {
// Merge all of our write locks together since we can allow concurrent
Expand All @@ -360,7 +371,7 @@ impl Coordinator {
span,
writes,
write_locks: None,
pending_txn,
responder: UserWriteResponder::Session(pending_txn),
};
validated_writes.push(validated_write);
}
Expand All @@ -381,7 +392,7 @@ impl Coordinator {
span,
writes,
write_locks: None,
pending_txn,
responder: UserWriteResponder::Session(pending_txn),
} => {
let missing = group_write_locks.missing_locks(writes.keys().copied());

Expand All @@ -391,7 +402,7 @@ impl Coordinator {
span,
writes,
write_locks: None,
pending_txn,
responder: UserWriteResponder::Session(pending_txn),
};
validated_writes.push(validated_write);
} else {
Expand All @@ -412,7 +423,7 @@ impl Coordinator {
span,
writes,
write_locks: None,
pending_txn,
responder: UserWriteResponder::Session(pending_txn),
};
validated_writes.push(validated_write);
}
Expand Down Expand Up @@ -472,12 +483,12 @@ impl Coordinator {
span: _,
writes,
write_locks,
pending_txn:
PendingTxn {
responder:
UserWriteResponder::Session(PendingTxn {
ctx,
response,
action,
},
}),
} => {
assert_none!(write_locks, "should have merged together all locks above");
for (id, table_data) in writes {
Expand Down
6 changes: 3 additions & 3 deletions src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ use crate::command::{
CatalogSnapshot, Command, ExecuteResponse, Response, SASLChallengeResponse,
SASLVerifyProofResponse, StartupResponse, SuperuserAttribute,
};
use crate::coord::appends::PendingWriteTxn;
use crate::coord::appends::{PendingWriteTxn, UserWriteResponder};
use crate::coord::peek::PendingPeek;
use crate::coord::{
ConnMeta, Coordinator, DeferredPlanStatement, Message, PendingTxn, PlanStatement, PlanValidity,
Expand Down Expand Up @@ -1829,13 +1829,13 @@ impl Coordinator {
// Cancel pending writes. There is at most one pending write per session.
let pending_write_idx = self.pending_writes.iter().position(|pending_write_txn| {
matches!(pending_write_txn, PendingWriteTxn::User {
pending_txn: PendingTxn { ctx, .. },
responder: UserWriteResponder::Session(PendingTxn { ctx, .. }),
..
} if *ctx.session().conn_id() == conn_id)
});
if let Some(idx) = pending_write_idx {
if let PendingWriteTxn::User {
pending_txn: PendingTxn { ctx, .. },
responder: UserWriteResponder::Session(PendingTxn { ctx, .. }),
..
} = self.pending_writes.remove(idx)
{
Expand Down
8 changes: 5 additions & 3 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,9 @@ use tracing::{Instrument, Span, info, warn};

use crate::catalog::{self, Catalog, ConnCatalog, DropObjectInfo, UpdatePrivilegeVariant};
use crate::command::{ExecuteResponse, Response};
use crate::coord::appends::{BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn};
use crate::coord::appends::{
BuiltinTableAppendNotify, DeferredOp, DeferredPlan, PendingWriteTxn, UserWriteResponder,
};
use crate::coord::read_then_write::validate_read_then_write_dependencies;
use crate::coord::sequencer::emit_optimizer_notices;
use crate::coord::{
Expand Down Expand Up @@ -2036,11 +2038,11 @@ impl Coordinator {
span: Span::current(),
writes: collected_writes,
write_locks: validated_locks,
pending_txn: PendingTxn {
responder: UserWriteResponder::Session(PendingTxn {
ctx,
response,
action,
},
}),
});
return;
}
Expand Down
Loading