Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions engine/packages/pegboard-envoy/src/actor_lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ use rivet_envoy_protocol as protocol;
use crate::conn::Conn;

pub async fn stop_actor(conn: &Conn, checkpoint: &protocol::ActorCheckpoint) -> Result<()> {
// Depot owns SQLite correctness in FDB. The connection only holds a perf cache, so
// lifecycle stop evicts stale local state without touching storage.
conn.actor_dbs.remove_async(&checkpoint.actor_id).await;
Ok(())
}

pub async fn shutdown_conn_actors(conn: &Conn) {
// See `stop_actor`. This drops only per-connection cache entries.
conn.actor_dbs.clear_sync();
}
63 changes: 53 additions & 10 deletions engine/packages/pegboard-envoy/tests/actor_lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,23 @@ fn sqlite_keys(actor_id: &str) -> Vec<Vec<u8>> {
]
}

fn new_actor_db(
db: Arc<universaldb::Database>,
namespace_label: u16,
actor_id: &str,
) -> Arc<Db> {
Arc::new(Db::new(
db,
Id::new_v1(namespace_label),
actor_id.to_string(),
NodeId::new(),
))
}

#[tokio::test]
async fn stop_actor_evicts_cached_actor_db() -> Result<()> {
let db = Arc::new(test_db().await?);
let actor_db = Arc::new(Db::new(
db,
Id::new_v1(TEST_NAMESPACE_LABEL),
TEST_ACTOR.to_string(),
NodeId::new()));
let actor_db = new_actor_db(db, TEST_NAMESPACE_LABEL, TEST_ACTOR);
let conn = conn::Conn {
actor_dbs: HashMap::new(),
};
Expand All @@ -116,11 +125,7 @@ async fn stop_actor_evicts_cached_actor_db() -> Result<()> {
#[tokio::test]
async fn stop_actor_does_not_touch_udb() -> Result<()> {
let db = Arc::new(test_db().await?);
let actor_db = Arc::new(Db::new(
Arc::clone(&db),
Id::new_v1(TEST_NAMESPACE_LABEL),
TEST_ACTOR.to_string(),
NodeId::new()));
let actor_db = new_actor_db(Arc::clone(&db), TEST_NAMESPACE_LABEL, TEST_ACTOR);
let conn = conn::Conn {
actor_dbs: HashMap::new(),
};
Expand All @@ -141,3 +146,41 @@ async fn stop_actor_does_not_touch_udb() -> Result<()> {

Ok(())
}

#[tokio::test]
async fn stop_actor_allows_missing_cache_entry() -> Result<()> {
let conn = conn::Conn {
actor_dbs: HashMap::new(),
};

actor_lifecycle::stop_actor(&conn, &checkpoint(TEST_ACTOR)).await?;

assert!(!conn.actor_dbs.contains_async(TEST_ACTOR).await);
Ok(())
}

#[tokio::test]
async fn shutdown_conn_actors_evicts_all_cached_actor_dbs() -> Result<()> {
let db = Arc::new(test_db().await?);
let conn = conn::Conn {
actor_dbs: HashMap::new(),
};

for (idx, actor_id) in ["shutdown-actor-a", "shutdown-actor-b"].into_iter().enumerate() {
let actor_db = new_actor_db(
Arc::clone(&db),
TEST_NAMESPACE_LABEL + idx as u16,
actor_id,
);
assert!(conn
.actor_dbs
.insert_async(actor_id.to_string(), actor_db)
.await
.is_ok());
}

actor_lifecycle::shutdown_conn_actors(&conn).await;

assert!(conn.actor_dbs.is_empty());
Ok(())
}
Loading