Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions engine/packages/gasoline/src/db/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1494,6 +1494,8 @@ impl Database for DatabaseKv {
let input_key = keys::workflow::InputKey::new(wf.workflow_id);
let state_key = keys::workflow::StateKey::new(wf.workflow_id);
let output_key = keys::workflow::OutputKey::new(wf.workflow_id);
let silence_ts_key =
keys::workflow::SilenceTsKey::new(wf.workflow_id);
let input_subspace = self.subspace.subspace(&input_key);
let state_subspace = self.subspace.subspace(&state_key);
let output_subspace = self.subspace.subspace(&output_key);
Expand All @@ -1510,6 +1512,7 @@ impl Database for DatabaseKv {
input_chunks,
state_chunks,
has_output,
silence_ts_entry,
events,
) = tokio::try_join!(
tx.get(&self.subspace.pack(&create_ts_key), Serializable),
Expand Down Expand Up @@ -1543,6 +1546,7 @@ impl Database for DatabaseKv {
.await
.map(|entry| entry.is_some())
},
tx.get(&self.subspace.pack(&silence_ts_key), Serializable),
async {
let mut events_by_location: HashMap<Location, Vec<Event>> =
HashMap::new();
Expand Down Expand Up @@ -1776,10 +1780,15 @@ impl Database for DatabaseKv {
Ok(Some(events_by_location))
}
)?;
let is_silenced = silence_ts_entry.is_some();

if has_output {
tracing::warn!(workflow_id=?wf.workflow_id, "workflow already completed, ignoring");
} else if is_silenced {
tracing::warn!(workflow_id=?wf.workflow_id, "workflow silenced, ignoring");
}

if has_output || is_silenced {
// Clear lease
let lease_key = keys::workflow::LeaseKey::new(wf.workflow_id);
tx.clear(&self.subspace.pack(&lease_key));
Expand Down
6 changes: 0 additions & 6 deletions engine/packages/guard/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,6 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> R
Id::new_v1(config.dc_label()),
)?;

// Initialize rustls with the default ring CryptoProvider.
let provider = rustls::crypto::ring::default_provider();
if provider.install_default().is_err() {
tracing::debug!("crypto provider already installed in this process");
}

// Share shared context
let shared_state = shared_state::SharedState::new(&config, ctx.ups()?);
shared_state.start().await?;
Expand Down
1 change: 1 addition & 0 deletions engine/packages/pegboard/src/workflows/actor2/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ pub async fn allocate(ctx: &ActivityCtx, input: &AllocateInput) -> Result<Alloca

if allocation.is_some() {
state.sleep_ts = None;
state.reschedule_ts = None;
}

Ok(AllocateOutput {
Expand Down
9 changes: 8 additions & 1 deletion engine/packages/pegboard/src/workflows/runner_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ pub async fn pegboard_runner_pool2(ctx: &mut WorkflowCtx, input: &Input) -> Resu
return Ok(());
}

ctx.v(2)
let error_tracker_wf_id = ctx
.v(2)
.workflow(runner_pool_error_tracker::Input {
namespace_id: input.namespace_id,
runner_name: input.runner_name.clone(),
Expand Down Expand Up @@ -180,6 +181,12 @@ pub async fn pegboard_runner_pool2(ctx: &mut WorkflowCtx, input: &Input) -> Resu
})
.await?;

ctx.v(2)
.signal(crate::workflows::runner_pool_error_tracker::Shutdown {})
.to_workflow_id(error_tracker_wf_id)
.send()
.await?;

Ok(())
}

Expand Down
Loading