Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions rivetkit-rust/packages/rivetkit-core/src/actor/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2156,14 +2156,17 @@
self.lifecycle = lifecycle;
if matches!(lifecycle, LifecycleState::Started) {
// A restarted actor is a new generation. Clear shutdown state that was
// only meant to stop the previous generation.

Check warning on line 2159 in rivetkit-rust/packages/rivetkit-core/src/actor/task.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/rivetkit-rust/packages/rivetkit-core/src/actor/task.rs
self.ctx.reset_abort_signal_for_start();
self.ctx.clear_sleep_requested();
}
self.ctx
.set_started(matches!(
lifecycle,
LifecycleState::Started | LifecycleState::SleepGrace
));
}
self.ctx
.set_started(matches!(lifecycle, LifecycleState::Started));
}
}

fn shutdown_reason_label(reason: ShutdownKind) -> &'static str {
match reason {
Expand Down
152 changes: 87 additions & 65 deletions rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,20 +103,23 @@

enum ActorInstanceState {
Active(ActiveActorInstance),
Stopping(ActiveActorInstance),
Stopping {
instance: ActiveActorInstance,
reason: ShutdownKind,
},
}

impl ActorInstanceState {
fn instance(&self) -> ActiveActorInstance {
match self {
Self::Active(instance) | Self::Stopping(instance) => instance.clone(),
Self::Active(instance) | Self::Stopping { instance, .. } => instance.clone(),
}
}

fn active_instance(&self) -> Option<ActiveActorInstance> {
match self {
Self::Active(instance) => Some(instance.clone()),
Self::Stopping(_) => None,
Self::Stopping { .. } => None,
}
}
}
Expand Down Expand Up @@ -672,22 +675,23 @@
Ok(instance) => {
let pending_stop = self
.pending_stops
.remove_async(&request.actor_id.clone())

Check warning on line 678 in rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs
.await
.map(|(_, pending_stop)| pending_stop);
if let Some(pending_stop) = pending_stop {
let actor_id = request.actor_id.clone();
if matches!(
map_envoy_stop_reason(&pending_stop.reason),
ShutdownKind::Destroy
) {
instance.ctx.mark_destroy_requested();
}
self.set_actor_instance_state(
actor_id.clone(),
ActorInstanceState::Stopping(instance.clone()),
)
.await;
if let Some(pending_stop) = pending_stop {
let actor_id = request.actor_id.clone();
let stop_reason = map_envoy_stop_reason(&pending_stop.reason);
if matches!(stop_reason, ShutdownKind::Destroy) {
instance.ctx.mark_destroy_requested();
}
self.set_actor_instance_state(
actor_id.clone(),
ActorInstanceState::Stopping {
instance: instance.clone(),
reason: stop_reason,
},
)
.await;
let _ = self
.starting_instances
.remove_async(&request.actor_id.clone())
Expand Down Expand Up @@ -750,18 +754,25 @@
SccEntry::Vacant(entry) => {
entry.insert_entry(state);
}
}

Check warning on line 757 in rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs
}

async fn transition_actor_to_stopping(&self, actor_id: &str) -> Option<ActiveActorInstance> {
match self.actor_instances.entry_async(actor_id.to_owned()).await {
SccEntry::Occupied(mut entry) => {
let instance = entry.get().instance();
if matches!(entry.get(), ActorInstanceState::Active(_)) {
entry.insert(ActorInstanceState::Stopping(instance.clone()));
} else {
instance
.ctx
async fn transition_actor_to_stopping(
&self,
actor_id: &str,
reason: ShutdownKind,
) -> Option<ActiveActorInstance> {
match self.actor_instances.entry_async(actor_id.to_owned()).await {
SccEntry::Occupied(mut entry) => {
let instance = entry.get().instance();
if matches!(entry.get(), ActorInstanceState::Active(_)) {
entry.insert(ActorInstanceState::Stopping {
instance: instance.clone(),
reason,
});
} else {
instance
.ctx
.warn_work_sent_to_stopping_instance("stop_actor");
}
Some(instance)
Expand All @@ -773,13 +784,15 @@
}
}

async fn remove_stopping_actor_instance(&self, actor_id: &str, expected: &ActiveActorInstance) {

Check warning on line 787 in rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs
match self.actor_instances.entry_async(actor_id.to_owned()).await {
SccEntry::Occupied(entry) => {
let should_remove = match entry.get() {
ActorInstanceState::Stopping(instance) => Arc::ptr_eq(instance, expected),
ActorInstanceState::Active(_) => false,
};
let should_remove = match entry.get() {
ActorInstanceState::Stopping { instance, .. } => {
Arc::ptr_eq(instance, expected)
}
ActorInstanceState::Active(_) => false,
};
if should_remove {
let _ = entry.remove_entry();
}
Expand All @@ -790,48 +803,53 @@
}
}

async fn active_actor(&self, actor_id: &str) -> Result<Arc<ActorTaskHandle>> {

Check warning on line 806 in rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs
if let Some(instance) = self.actor_instances.get_async(&actor_id.to_owned()).await {
match instance.get() {
ActorInstanceState::Active(instance) => {
let instance = instance.clone();
if instance.ctx.started() {
if instance.ctx.destroy_requested() || instance.ctx.sleep_requested() {
instance
.ctx
.warn_work_sent_to_stopping_instance("active_actor");
return Err(if instance.ctx.destroy_requested() {
ActorLifecycleError::Destroying.build()
} else {
ActorLifecycleError::Stopping.build()
});
ActorInstanceState::Active(instance) => {
let instance = instance.clone();
if instance.ctx.started() {
if instance.ctx.destroy_requested() {
instance
.ctx
.warn_work_sent_to_stopping_instance("active_actor");
return Err(ActorLifecycleError::Destroying.build());
}
return Ok(instance);
}
return Ok(instance);
}

instance
.ctx
.warn_work_sent_to_stopping_instance("active_actor");

Check warning on line 823 in rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs
return Err(if instance.ctx.destroy_requested() {
ActorLifecycleError::Destroying.build()
} else {
ActorLifecycleError::Starting.build()
});
}
ActorInstanceState::Stopping(instance) => {
let instance = instance.clone();
instance
.ctx
.warn_work_sent_to_stopping_instance("active_actor");
return Err(if instance.ctx.destroy_requested() {
ActorLifecycleError::Destroying.build()
} else {
ActorLifecycleError::Stopping.build()
});
return Err(if instance.ctx.destroy_requested() {
ActorLifecycleError::Destroying.build()
} else if instance.ctx.sleep_requested() {
ActorLifecycleError::Stopping.build()
} else {
ActorLifecycleError::Starting.build()
});
}
ActorInstanceState::Stopping { instance, reason } => {
let instance = instance.clone();
match reason {
ShutdownKind::Sleep if instance.ctx.started() => return Ok(instance),
ShutdownKind::Sleep => {
instance
.ctx
.warn_work_sent_to_stopping_instance("active_actor");
return Err(ActorLifecycleError::Stopping.build());
}
ShutdownKind::Destroy => {
instance
.ctx
.warn_work_sent_to_stopping_instance("active_actor");
return Err(ActorLifecycleError::Destroying.build());
}
}
}
}
}
}

Check warning on line 852 in rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/rivetkit-rust/packages/rivetkit-core/src/registry/mod.rs
tracing::warn!(actor_id, "actor instance not found");
Err(ActorRuntime::NotFound {
resource: "instance".to_owned(),
Expand Down Expand Up @@ -865,9 +883,13 @@
return Ok(());
}

let instance = match self.transition_actor_to_stopping(actor_id).await {
Some(instance) => instance,
None => {
let task_stop_reason = map_envoy_stop_reason(&reason);
let instance = match self
.transition_actor_to_stopping(actor_id, task_stop_reason)
.await
{
Some(instance) => instance,
None => {
let _ = self
.pending_stops
.insert_async(
Expand All @@ -881,8 +903,8 @@
return Ok(());
}
};
let result = self
.shutdown_started_instance(actor_id, instance.clone(), reason, stop_handle)
let result = self
.shutdown_started_instance(actor_id, instance.clone(), reason, stop_handle)
.await;
self.remove_stopping_actor_instance(actor_id, &instance)
.await;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,7 @@ describeDriverMatrix("Actor Sleep Db", (driverTestConfig) => {
expect(events).toContain("sleep-end");
});

// TODO(#4705): Root-cause handle action dispatch ordering during sleep shutdown and re-enable this coverage.
test.skip("action via handle during sleep shutdown is not queued", async (c) => {
test("action via handle during sleep shutdown is not queued", async (c) => {
const { client } = await setupDriverTest(c, driverTestConfig);

const handle = client.sleepWithDbAction.getOrCreate([
Expand Down Expand Up @@ -647,8 +646,7 @@ describeDriverMatrix("Actor Sleep Db", (driverTestConfig) => {
);
});

// TODO(#4705): Root-cause connection action dispatch ordering during sleep shutdown and re-enable this coverage.
test.skip("action via WebSocket connection during sleep shutdown is not queued", async (c) => {
test("action via WebSocket connection during sleep shutdown is not queued", async (c) => {
const { client } = await setupDriverTest(c, driverTestConfig);

const handle = client.sleepWithDbAction.getOrCreate([
Expand Down Expand Up @@ -698,8 +696,7 @@ describeDriverMatrix("Actor Sleep Db", (driverTestConfig) => {
expect(events).not.toContain("ws-during-sleep");
}
});
// TODO(#4705): Root-cause new connection behavior during sleep shutdown and re-enable this coverage.
test.skip("new connections rejected during sleep shutdown", async (c) => {
test("new connections rejected during sleep shutdown", async (c) => {
const { client } = await setupDriverTest(c, driverTestConfig);

// The sleepWithDbAction actor has a 500ms delay in
Expand Down Expand Up @@ -744,8 +741,7 @@ describeDriverMatrix("Actor Sleep Db", (driverTestConfig) => {
await secondConn.dispose();
});

// TODO(#4705): Root-cause raw WebSocket admission during sleep shutdown and re-enable this coverage.
test.skip("new raw WebSocket during sleep shutdown is rejected or queued", async (c) => {
test("new raw WebSocket during sleep shutdown is rejected or queued", async (c) => {
const { client } = await setupDriverTest(c, driverTestConfig);

// The sleepWithRawWs actor has a 500ms delay in onSleep.
Expand Down
Loading