Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions rivetkit-rust/packages/rivetkit-core/tests/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1208,6 +1208,76 @@ mod moved_tests {
assert_eq!(remaining_conns[0].id(), hibernating_conn.id());
}

#[tokio::test]
async fn wake_start_clears_previous_sleep_request() {
let kv = new_in_memory();
let ctx = new_with_kv("actor-wake", "task-wake", Vec::new(), "local", kv.clone());
let (_lifecycle_tx, lifecycle_rx) = mpsc::channel(4);
let (_dispatch_tx, dispatch_rx) = mpsc::channel(4);
let (events_tx, events_rx) = mpsc::channel(4);
ctx.configure_lifecycle_events(Some(events_tx));

let factory = Arc::new(ActorFactory::new(
ActorConfig {
sleep_grace_period: Duration::from_millis(200),
sleep_grace_period_overridden: true,
..ActorConfig::default()
},
|start| {
Box::pin(async move {
let mut events = start.events;
while let Some(event) = events.recv().await {
match event {
ActorEvent::SerializeState { reply, .. } => {
reply.send(Ok(Vec::new()));
}
ActorEvent::RunGracefulCleanup { reply, .. } => {
reply.send(Ok(()));
}
_ => {}
}
}
Ok(())
})
},
));

let mut task = ActorTask::new(
"actor-wake".into(),
0,
lifecycle_rx,
dispatch_rx,
events_rx,
factory,
ctx.clone(),
None,
None,
);
let (start_tx, start_rx) = oneshot::channel();
task.handle_lifecycle(LifecycleCommand::Start { reply: start_tx })
.await;
start_rx
.await
.expect("start reply should send")
.expect("start should succeed");

ctx.sleep().expect("sleep request should succeed");
assert!(ctx.sleep_requested());

task.handle_stop(ShutdownKind::Sleep)
.await
.expect("sleep stop should succeed");
let (wake_tx, wake_rx) = oneshot::channel();
task.handle_lifecycle(LifecycleCommand::Start { reply: wake_tx })
.await;
wake_rx
.await
.expect("wake reply should send")
.expect("wake should succeed");

assert!(!ctx.sleep_requested());
}

#[tokio::test]
async fn sleep_shutdown_waits_for_on_state_change_before_final_save() {
let kv = new_in_memory();
Expand Down
Loading