Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions sql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1753,6 +1753,37 @@ begin
end;
$$;

-- Returns the current status and result of a task.
-- For completed tasks, includes the output payload.
-- For failed tasks, includes the error from the latest run via last_attempt_run.
-- Returns NULL (empty result set) if the task doesn't exist.
create function durable.get_task_result (
p_queue_name text,
p_task_id uuid
)
returns table (
task_id uuid,
state text,
completed_payload jsonb,
failure_reason jsonb
)
language plpgsql
as $$
begin
return query execute format(
'select t.task_id,
t.state,
t.completed_payload,
r.failure_reason
from durable.%I t
left join durable.%I r on r.run_id = t.last_attempt_run
where t.task_id = $1',
't_' || p_queue_name,
'r_' || p_queue_name
) using p_task_id;
end;
$$;

-- utility function to generate a uuidv7 even for older postgres versions.
create function durable.portable_uuidv7 ()
returns uuid
Expand Down
68 changes: 67 additions & 1 deletion src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ use crate::error::{DurableError, DurableResult};
use crate::task::{Task, TaskRegistry, TaskWrapper};
use crate::types::{
CancellationPolicy, DurableEventPayload, RetryStrategy, SpawnDefaults, SpawnOptions,
SpawnResult, SpawnResultRow, WorkerOptions,
SpawnResult, SpawnResultRow, TaskErrorInfo, TaskPollResult, TaskPollResultRow, TaskStatus,
WorkerOptions,
};

/// Internal struct for serializing spawn options to the database.
Expand Down Expand Up @@ -742,10 +743,75 @@ where
Ok(Worker::start(self.clone_inner(), options).await)
}

/// Get the current status and result of a task.
///
/// Returns `None` if the task doesn't exist in this queue.
/// For completed tasks, includes the output payload.
/// For failed tasks, includes the error from the latest failed run.
pub async fn get_task_result(&self, task_id: Uuid) -> DurableResult<Option<TaskPollResult>> {
let query = "SELECT task_id, state, completed_payload, failure_reason
FROM durable.get_task_result($1, $2)";

let row: Option<TaskPollResultRow> = sqlx::query_as(query)
.bind(self.queue_name())
.bind(task_id)
.fetch_optional(self.pool())
.await?;

let Some(row) = row else {
return Ok(None);
};

let status = parse_task_status(&row.state)?;

let error = if status == TaskStatus::Failed {
row.failure_reason.as_ref().map(|reason| {
let name = reason
.get("name")
.and_then(|v| v.as_str())
.unwrap_or("Unknown")
.to_string();
let message = reason
.get("message")
.and_then(|v| v.as_str())
.unwrap_or("Unknown error")
.to_string();
TaskErrorInfo {
name,
message,
raw: Some(reason.clone()),
}
})
} else {
None
};

Ok(Some(TaskPollResult {
task_id: row.task_id,
status,
output: row.completed_payload,
error,
}))
}

/// Close the client. Closes the pool if owned.
pub async fn close(self) {
if self.owns_pool.load(Ordering::Relaxed) {
self.pool.close().await;
}
}
}

fn parse_task_status(state: &str) -> DurableResult<TaskStatus> {
match state {
"pending" => Ok(TaskStatus::Pending),
"running" => Ok(TaskStatus::Running),
"sleeping" => Ok(TaskStatus::Sleeping),
"completed" => Ok(TaskStatus::Completed),
"failed" => Ok(TaskStatus::Failed),
"cancelled" => Ok(TaskStatus::Cancelled),
other => Err(DurableError::InvalidState {
state: other.to_string(),
}),
}
}
7 changes: 7 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,13 @@ pub enum DurableError {
/// The reason the configuration was invalid.
reason: String,
},

/// Database returned an unrecognized task state.
#[error("invalid task state: {state}")]
InvalidState {
/// The unrecognized state string.
state: String,
},
}

/// Result type alias for Client API operations.
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ pub use error::{ControlFlow, DurableError, DurableResult, TaskError, TaskResult}
pub use task::{ErasedTask, Task, TaskWrapper};
pub use types::{
CancellationPolicy, ClaimedTask, DurableEventPayload, RetryStrategy, SpawnDefaults,
SpawnOptions, SpawnResult, TaskHandle, WorkerOptions,
SpawnOptions, SpawnResult, TaskErrorInfo, TaskHandle, TaskPollResult, TaskStatus,
WorkerOptions,
};
pub use worker::Worker;

Expand Down
30 changes: 30 additions & 0 deletions src/postgres/migrations/20260221000000_add_get_task_result.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-- Returns the current status and result of a task.
-- For completed tasks, includes the output payload.
-- For failed tasks, includes the error from the latest run via last_attempt_run.
-- Returns NULL (empty result set) if the task doesn't exist.
create function durable.get_task_result (
p_queue_name text,
p_task_id uuid
)
returns table (
task_id uuid,
state text,
completed_payload jsonb,
failure_reason jsonb
)
language plpgsql
as $$
begin
return query execute format(
'select t.task_id,
t.state,
t.completed_payload,
r.failure_reason
from durable.%I t
left join durable.%I r on r.run_id = t.last_attempt_run
where t.task_id = $1',
't_' || p_queue_name,
'r_' || p_queue_name
) using p_task_id;
end;
$$;
41 changes: 41 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,47 @@ impl<T> TaskHandle<T> {
}
}

/// Status of a durable task.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TaskStatus {
Pending,
Running,
Sleeping,
Completed,
Failed,
Cancelled,
}

/// Error info from a failed task.
#[derive(Debug, Clone)]
pub struct TaskErrorInfo {
pub name: String,
pub message: String,
/// The raw error data from the failure_reason column, if available.
pub raw: Option<serde_json::Value>,
}

/// Result of querying a task's status and output.
#[derive(Debug, Clone)]
pub struct TaskPollResult {
pub task_id: Uuid,
pub status: TaskStatus,
/// Task output (JSON), present when status is Completed.
pub output: Option<serde_json::Value>,
/// Error info, present when status is Failed.
pub error: Option<TaskErrorInfo>,
}

/// Internal: Row returned from get_task_result stored procedure
#[derive(Debug, Clone, sqlx::FromRow)]
pub(crate) struct TaskPollResultRow {
pub task_id: Uuid,
pub state: String,
pub completed_payload: Option<JsonValue>,
pub failure_reason: Option<JsonValue>,
}

/// Default settings for spawned tasks.
///
/// Groups the default `max_attempts`, `retry_strategy`, and `cancellation`
Expand Down
Loading