Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions engine/packages/pegboard-envoy/src/actor_lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,23 @@ pub async fn stop_actor(conn: &Conn, checkpoint: &protocol::ActorCheckpoint) ->

pub async fn actor_stopped(conn: &Conn, checkpoint: &protocol::ActorCheckpoint) -> Result<()> {
let actor_id = checkpoint.actor_id.clone();
let active = conn
let active = match conn
.active_actors
.get_async(&actor_id)
.await
.map(|entry| entry.get().clone())
.context("actor stopped without active sqlite state")?;
{
Some(active) => active,
None if conn.is_serverless => {
conn.sqlite_engine.force_close(&actor_id).await;
conn.serverless_sqlite_actors.remove_async(&actor_id).await;
return Ok(());
}
None => {
ensure!(false, "actor stopped without active sqlite state");
unreachable!();
}
};
ensure!(
active.actor_generation == checkpoint.generation,
"stopped actor generation {} did not match active generation {}",
Expand Down Expand Up @@ -288,6 +299,22 @@ pub async fn shutdown_conn_actors(conn: &Conn) {
.buffer_unordered(SHUTDOWN_CLOSE_PARALLELISM)
.for_each(|_| async {})
.await;

let mut serverless_sqlite_actors = Vec::new();
conn.serverless_sqlite_actors
.retain_sync(|actor_id, _generation| {
serverless_sqlite_actors.push(actor_id.clone());
false
});
stream::iter(serverless_sqlite_actors.into_iter().map(|actor_id| {
let sqlite_engine = conn.sqlite_engine.clone();
async move {
sqlite_engine.force_close(&actor_id).await;
}
}))
.buffer_unordered(SHUTDOWN_CLOSE_PARALLELISM)
.for_each(|_| async {})
.await;
}

async fn close_actor_on_shutdown(
Expand Down
2 changes: 2 additions & 0 deletions engine/packages/pegboard-envoy/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub struct Conn {
pub authorized_tunnel_routes: HashMap<(protocol::GatewayId, protocol::RequestId), ()>,
pub sqlite_engine: Arc<SqliteEngine>,
pub active_actors: HashMap<String, actor_lifecycle::ActiveActor>,
pub serverless_sqlite_actors: HashMap<String, u64>,
pub is_serverless: bool,
pub last_rtt: AtomicU32,
/// Timestamp (epoch ms) of the last pong received from the envoy.
Expand Down Expand Up @@ -306,6 +307,7 @@ pub async fn init_conn(
authorized_tunnel_routes: HashMap::new(),
sqlite_engine,
active_actors: HashMap::new(),
serverless_sqlite_actors: HashMap::new(),
is_serverless,
last_rtt: AtomicU32::new(0),
last_ping_ts: AtomicI64::new(util::timestamp::now()),
Expand Down
21 changes: 21 additions & 0 deletions engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -703,6 +703,7 @@ async fn handle_sqlite_get_pages(
) -> Result<protocol::SqliteGetPagesResponse> {
validate_sqlite_get_pages_request(&request)?;
validate_sqlite_actor(ctx, conn, &request.actor_id).await?;
ensure_serverless_sqlite_open(conn, &request.actor_id, request.generation).await?;

match conn
.sqlite_engine
Expand Down Expand Up @@ -747,6 +748,7 @@ async fn handle_sqlite_commit(
let decode_request_start = Instant::now();
validate_sqlite_dirty_pages("sqlite commit", &request.dirty_pages)?;
validate_sqlite_actor(ctx, conn, &request.actor_id).await?;
ensure_serverless_sqlite_open(conn, &request.actor_id, request.generation).await?;
let decode_request_duration = decode_request_start.elapsed();
conn.sqlite_engine.metrics().observe_commit_phase(
"fast",
Expand Down Expand Up @@ -813,6 +815,7 @@ async fn handle_sqlite_commit_stage(
request: protocol::SqliteCommitStageRequest,
) -> Result<protocol::SqliteCommitStageResponse> {
validate_sqlite_actor(ctx, conn, &request.actor_id).await?;
ensure_serverless_sqlite_open(conn, &request.actor_id, request.generation).await?;

match conn
.sqlite_engine
Expand Down Expand Up @@ -850,6 +853,7 @@ async fn handle_sqlite_commit_stage_begin(
request: protocol::SqliteCommitStageBeginRequest,
) -> Result<protocol::SqliteCommitStageBeginResponse> {
validate_sqlite_actor(ctx, conn, &request.actor_id).await?;
ensure_serverless_sqlite_open(conn, &request.actor_id, request.generation).await?;

match conn
.sqlite_engine
Expand Down Expand Up @@ -884,6 +888,7 @@ async fn handle_sqlite_commit_finalize(
) -> Result<protocol::SqliteCommitFinalizeResponse> {
let decode_request_start = Instant::now();
validate_sqlite_actor(ctx, conn, &request.actor_id).await?;
ensure_serverless_sqlite_open(conn, &request.actor_id, request.generation).await?;
conn.sqlite_engine.metrics().observe_commit_phase(
"slow",
"decode_request",
Expand Down Expand Up @@ -952,6 +957,22 @@ async fn validate_sqlite_actor(ctx: &StandaloneCtx, conn: &Conn, actor_id: &str)
Ok(())
}

async fn ensure_serverless_sqlite_open(conn: &Conn, actor_id: &str, generation: u64) -> Result<()> {
if !conn.is_serverless {
return Ok(());
}

conn.sqlite_engine
.ensure_local_open(actor_id, generation)
.await?;

conn.serverless_sqlite_actors
.upsert_async(actor_id.to_string(), generation)
.await;

Ok(())
}

async fn sqlite_fence_mismatch(
conn: &Conn,
actor_id: &str,
Expand Down
33 changes: 33 additions & 0 deletions engine/packages/sqlite-storage/src/open.rs
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,39 @@ impl SqliteEngine {
Ok(())
}

pub async fn ensure_local_open(&self, actor_id: &str, generation: u64) -> Result<()> {
let head = self.load_head(actor_id).await?;
ensure!(
head.generation == generation,
SqliteStorageError::FenceMismatch {
reason: format!(
"ensure_local_open generation {} did not match current generation {}",
generation, head.generation
),
},
);

match self.open_dbs.entry_async(actor_id.to_string()).await {
scc::hash_map::Entry::Occupied(entry) => {
ensure!(
entry.get().generation == generation,
SqliteStorageError::FenceMismatch {
reason: format!(
"ensure_local_open generation {} did not match open generation {}",
generation,
entry.get().generation
),
},
);
}
scc::hash_map::Entry::Vacant(entry) => {
entry.insert_entry(OpenDb { generation });
}
}

Ok(())
}

// Unconditionally evict the actor's open-db / page-index / pending-stage caches without
// generation fencing. Use only on shutdown paths where keeping a stale entry would block
// future opens of the same actor on this process-wide engine.
Expand Down
Loading