Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions rivetkit-rust/packages/rivetkit-core/src/actor/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1164,16 +1164,10 @@ impl ActorTask {
self.ctx.warn_work_sent_to_stopping_instance("dispatch");
return Some(ActorLifecycleError::Destroying.build());
}
if self.ctx.sleep_requested() {
self.ctx.warn_work_sent_to_stopping_instance("dispatch");
return Some(ActorLifecycleError::Stopping.build());
}

match self.lifecycle {
LifecycleState::Started => None,
LifecycleState::SleepGrace
| LifecycleState::SleepFinalize
| LifecycleState::DestroyGrace => {
LifecycleState::Started | LifecycleState::SleepGrace => None,
LifecycleState::SleepFinalize | LifecycleState::DestroyGrace => {
self.ctx.warn_work_sent_to_stopping_instance("dispatch");
Some(ActorLifecycleError::Stopping.build())
}
Expand Down
2 changes: 2 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/tests/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,7 @@ mod moved_tests {
envoy_key: "test-envoy".to_string(),
envoy_tx,
actors: Arc::new(std::sync::Mutex::new(HashMap::new())),
actors_notify: Arc::new(tokio::sync::Notify::new()),
live_tunnel_requests,
pending_hibernation_restores: Arc::new(std::sync::Mutex::new(HashMap::from([(
actor_id.to_owned(),
Expand Down Expand Up @@ -360,6 +361,7 @@ mod moved_tests {
envoy_key: "test-envoy".to_string(),
envoy_tx,
actors: Arc::new(std::sync::Mutex::new(HashMap::new())),
actors_notify: Arc::new(tokio::sync::Notify::new()),
live_tunnel_requests: Arc::new(std::sync::Mutex::new(HashMap::new())),
pending_hibernation_restores: Arc::new(std::sync::Mutex::new(HashMap::new())),
ws_tx: Arc::new(tokio::sync::Mutex::new(
Expand Down
1 change: 1 addition & 0 deletions rivetkit-rust/packages/rivetkit-core/tests/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ mod moved_tests {
envoy_key: "test-envoy".to_string(),
envoy_tx,
actors: Arc::new(EnvoySharedMutex::new(HashMap::new())),
actors_notify: Arc::new(tokio::sync::Notify::new()),
live_tunnel_requests: Arc::new(EnvoySharedMutex::new(HashMap::new())),
pending_hibernation_restores: Arc::new(EnvoySharedMutex::new(HashMap::new())),
ws_tx: Arc::new(tokio::sync::Mutex::new(
Expand Down
45 changes: 40 additions & 5 deletions rivetkit-rust/packages/rivetkit-core/tests/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,7 @@ mod moved_tests {
envoy_key: "test-envoy".to_string(),
envoy_tx,
actors: Arc::new(Mutex::new(HashMap::new())),
actors_notify: Arc::new(tokio::sync::Notify::new()),
live_tunnel_requests: Arc::new(Mutex::new(HashMap::new())),
pending_hibernation_restores: Arc::new(Mutex::new(HashMap::new())),
ws_tx: Arc::new(tokio::sync::Mutex::new(
Expand Down Expand Up @@ -2812,11 +2813,14 @@ mod moved_tests {
})
.await
.expect("action should send during sleep grace");
let _error = action_rx
.await
.expect("action reply should send")
.expect_err("sleep grace should reject new dispatch");
assert_eq!(action_count.load(Ordering::SeqCst), 0);
assert_eq!(
action_rx
.await
.expect("action reply should send")
.expect("sleep grace should accept new dispatch"),
vec![7, 7, 7]
);
assert_eq!(action_count.load(Ordering::SeqCst), 1);
assert_eq!(destroy_count.load(Ordering::SeqCst), 0);

release_tx.send(()).expect("keep-awake release should send");
Expand All @@ -2833,6 +2837,37 @@ mod moved_tests {
.expect("task run should succeed");
}

#[tokio::test]
async fn sleep_finalize_rejects_new_dispatch() {
let ctx = new_with_kv(
"actor-sleep-finalize-dispatch",
"task-sleep-finalize-dispatch",
Vec::new(),
"local",
new_in_memory(),
);
let mut task = new_task(ctx);
task.lifecycle = LifecycleState::SleepFinalize;

let (reply_tx, reply_rx) = oneshot::channel();
task.handle_dispatch(DispatchCommand::Action {
name: "ping".to_owned(),
args: Vec::new(),
conn: ConnHandle::new("conn-finalize", Vec::new(), Vec::new(), false),
reply: reply_tx,
})
.await;

let error = reply_rx
.await
.expect("action reply should send")
.expect_err("sleep finalize should reject new dispatch");
assert!(
error.to_string().contains("Actor is stopping"),
"expected actor stopping error, got {error:#}"
);
}

#[cfg(not(debug_assertions))]
#[tokio::test]
async fn duplicate_destroy_during_sleep_grace_is_acked_and_ignored_in_release() {
Expand Down
Loading