Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions engine/packages/gasoline/src/ctx/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,17 @@ impl ActivityCtx {
.await
}

/// Finds the first incomplete workflow for each (name, tags) pair in a single batch transaction.
#[tracing::instrument(skip_all)]
pub async fn find_workflows(
&self,
queries: &[(&str, serde_json::Value)],
) -> Result<Vec<Option<Id>>> {
common::find_workflows(&self.db, queries)
.in_current_span()
.await
}

/// Finds the first incomplete workflow with the given tags.
#[tracing::instrument(skip_all)]
pub async fn get_workflows(&self, workflow_ids: Vec<Id>) -> Result<Vec<WorkflowData>> {
Expand Down
8 changes: 8 additions & 0 deletions engine/packages/gasoline/src/ctx/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ pub async fn find_workflow<W: Workflow>(
.map_err(Into::into)
}

/// Finds the first incomplete workflow for each (name, tags) pair in a single batch transaction.
pub async fn find_workflows(
db: &DatabaseHandle,
queries: &[(&str, serde_json::Value)],
) -> Result<Vec<Option<Id>>> {
db.find_workflows(queries).await.map_err(Into::into)
}

/// Finds the first incomplete workflow with the given tags.
pub async fn get_workflows(
db: &DatabaseHandle,
Expand Down
11 changes: 11 additions & 0 deletions engine/packages/gasoline/src/ctx/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ impl OperationCtx {
.await
}

/// Finds the first incomplete workflow for each (name, tags) pair in a single batch transaction.
#[tracing::instrument(skip_all)]
pub async fn find_workflows(
&self,
queries: &[(&str, serde_json::Value)],
) -> Result<Vec<Option<Id>>> {
common::find_workflows(&self.db, queries)
.in_current_span()
.await
}

/// Finds the first incomplete workflow with the given tags.
#[tracing::instrument(skip_all)]
pub async fn get_workflows(&self, workflow_ids: Vec<Id>) -> Result<Vec<WorkflowData>> {
Expand Down
11 changes: 11 additions & 0 deletions engine/packages/gasoline/src/ctx/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,17 @@ impl StandaloneCtx {
.await
}

/// Finds the first incomplete workflow for each (name, tags) pair in a single batch transaction.
#[tracing::instrument(skip_all)]
pub async fn find_workflows(
&self,
queries: &[(&str, serde_json::Value)],
) -> Result<Vec<Option<Id>>> {
common::find_workflows(&self.db, queries)
.in_current_span()
.await
}

/// Finds the first incomplete workflow with the given tags.
#[tracing::instrument(skip_all)]
pub async fn get_workflows(&self, workflow_ids: Vec<Id>) -> Result<Vec<WorkflowData>> {
Expand Down
11 changes: 11 additions & 0 deletions engine/packages/gasoline/src/ctx/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,17 @@ impl TestCtx {
.await
}

/// Finds the first incomplete workflow for each (name, tags) pair in a single batch transaction.
#[tracing::instrument(skip_all)]
pub async fn find_workflows(
&self,
queries: &[(&str, serde_json::Value)],
) -> Result<Vec<Option<Id>>> {
common::find_workflows(&self.db, queries)
.in_current_span()
.await
}

/// Finds the first incomplete workflow with the given tags.
#[tracing::instrument(skip_all)]
pub async fn get_workflows(&self, workflow_ids: Vec<Id>) -> Result<Vec<WorkflowData>> {
Expand Down
30 changes: 29 additions & 1 deletion engine/packages/gasoline/src/db/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
};

use anyhow::{Context, Result, ensure};
use futures_util::{StreamExt, TryStreamExt, stream::BoxStream};
use futures_util::{StreamExt, TryStreamExt, future::try_join_all, stream::BoxStream};
use rivet_util::Id;
use rivet_util::future::CustomInstrumentExt;
use serde_json::json;
Expand Down Expand Up @@ -996,6 +996,34 @@
Ok(workflow_id)
}

#[tracing::instrument(skip_all)]
async fn find_workflows(
&self,
queries: &[(&str, serde_json::Value)],
) -> WorkflowResult<Vec<Option<Id>>> {
let start_instant = Instant::now();

let workflow_ids = self
.pools
.udb()
.map_err(WorkflowError::PoolsGeneric)?
.run(|tx| async move {
let futures = queries.iter().map(|(workflow_name, tags)| {
self.find_workflow_inner(workflow_name, tags, &tx)
});
try_join_all(futures).await
})
.custom_instrument(tracing::info_span!("find_workflows_batch_tx"))
.await
.context("failed to find workflows")
.map_err(WorkflowError::Udb)?;

let dt = start_instant.elapsed().as_secs_f64();
metrics::FIND_WORKFLOWS_DURATION.record(dt, &[KeyValue::new("workflow_name", "batch")]);

Check failure on line 1022 in engine/packages/gasoline/src/db/kv/mod.rs

View workflow job for this annotation

GitHub Actions / Check

no method named `record` found for struct `FIND_WORKFLOWS_DURATION` in the current scope

Check failure on line 1022 in engine/packages/gasoline/src/db/kv/mod.rs

View workflow job for this annotation

GitHub Actions / Check

failed to resolve: use of undeclared type `KeyValue`
Comment on lines +1021 to +1022
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: should be its own metric


Ok(workflow_ids)
}

#[tracing::instrument(skip_all)]
async fn pull_workflows(
&self,
Expand Down
6 changes: 6 additions & 0 deletions engine/packages/gasoline/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ pub trait Database: Send {
tags: &serde_json::Value,
) -> WorkflowResult<Option<Id>>;

/// Retrieves the first incomplete workflow for each (name, tags) pair in a single batch transaction.
async fn find_workflows(
&self,
queries: &[(&str, serde_json::Value)],
) -> WorkflowResult<Vec<Option<Id>>>;

/// Pulls workflows for processing by the worker. Will only pull workflows with names matching the filter.
/// Should also update the ping of this worker.
async fn pull_workflows(
Expand Down
Loading