91 changes: 82 additions & 9 deletions Cargo.lock

Some generated files are not rendered by default.

22 changes: 17 additions & 5 deletions Cargo.toml
@@ -11,30 +11,33 @@ keywords = ["apalis", "background-jobs", "task-queue", "sqlite", "async"]
categories = ["asynchronous", "database"]

[features]
default = ["tokio-comp", "migrate", "json"]
default = ["tokio-comp", "migrate", "json", "chrono"]
migrate = ["sqlx/migrate", "sqlx/macros"]
async-std-comp = ["async-std", "sqlx/runtime-async-std-rustls"]
async-std-comp-native-tls = ["async-std", "sqlx/runtime-async-std-native-tls"]
tokio-comp = ["tokio", "sqlx/runtime-tokio-rustls"]
tokio-comp-native-tls = ["tokio", "sqlx/runtime-tokio-native-tls"]
json = ["apalis-codec/json", "sqlx/json"]
chrono = ["dep:chrono", "sqlx/chrono"]
time = ["dep:time", "sqlx/time"]

[dependencies.sqlx]
version = "0.8.6"
default-features = false
features = ["chrono", "sqlite"]
features = ["sqlite"]

[dependencies]
serde = { version = "1", features = ["derive"] }
serde_json = { version = "1" }
apalis-codec = { version = "0.1.0-rc.1", default-features = false }
apalis-core = { version = "1.0.0-rc.1", features = ["sleep"] }
apalis-sql = { version = "1.0.0-rc.1" }
apalis-core = { path = "../apalis/apalis-core", features = ["sleep"] }
apalis-sql = { path = "../apalis/apalis-sql", default-features = false }
log = "0.4.21"
futures = "0.3.30"
tokio = { version = "1", features = ["rt", "net"], optional = true }
async-std = { version = "1.13.0", optional = true }
chrono = { version = "0.4", features = ["serde"] }
chrono = { version = "0.4", features = ["serde"], optional = true }
time = { version = "0.3", features = ["serde"], optional = true }
thiserror = "2.0.0"
pin-project = "1.1.10"
ulid = { version = "1", features = ["serde"] }
@@ -69,3 +72,12 @@ needless_pass_by_ref_mut = "warn"
needless_pass_by_value = "warn"
option_option = "warn"
redundant_clone = "warn"

[package.metadata.cargo-udeps.ignore]
# When both chrono and time features are enabled, time takes precedence
# so chrono appears unused. This is expected behavior for --all-features.
normal = ["chrono"]

[patch.crates-io]
apalis-core = { path = "../apalis/apalis-core" }
apalis-sql = { path = "../apalis/apalis-sql" }
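Since the `chrono` and `time` features are independently selectable, the cargo-udeps note above implies the overlap is resolved at compile time, with `time` winning. A minimal sketch of how that precedence could be wired up with `cfg` gates — an illustration only, since the actual `timestamp` module is not part of this diff:

```rust
// Hypothetical sketch, not the crate's actual code: when both features
// are enabled, only the `time` branch compiles, so the chrono dependency
// goes unused -- which is exactly why cargo-udeps is told to ignore it.
#[cfg(feature = "time")]
type InnerDateTime = time::OffsetDateTime;

#[cfg(all(feature = "chrono", not(feature = "time")))]
type InnerDateTime = chrono::DateTime<chrono::Utc>;
```

Judging from `from_row.rs` below, the stored columns stay plain UNIX timestamps either way; the feature choice only affects the user-facing datetime type.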
4 changes: 3 additions & 1 deletion src/fetcher.rs
@@ -12,6 +12,8 @@ use apalis_core::{
worker::context::WorkerContext,
};
use apalis_sql::from_row::TaskRow;

use crate::SqliteDateTime;
use futures::{FutureExt, future::BoxFuture, stream::Stream};
use pin_project::pin_project;
use sqlx::{Pool, Sqlite, SqlitePool};
@@ -41,7 +43,7 @@
.await?
.into_iter()
.map(|r| {
let row: TaskRow = r.try_into()?;
let row: TaskRow<SqliteDateTime> = r.try_into()?;
row.try_into_task_compact()
.map_err(|e| sqlx::Error::Protocol(e.to_string()))
})
33 changes: 12 additions & 21 deletions src/from_row.rs
@@ -1,4 +1,4 @@
use chrono::{TimeZone, Utc};
use crate::SqliteDateTime;

#[derive(Debug)]
pub(crate) struct SqliteTaskRow {
@@ -17,10 +17,16 @@ pub(crate) struct SqliteTaskRow {
pub(crate) metadata: Option<String>,
}

impl TryInto<apalis_sql::from_row::TaskRow> for SqliteTaskRow {
/// Convert a UNIX timestamp to SqliteDateTime
fn unix_to_datetime(ts: i64) -> Result<SqliteDateTime, sqlx::Error> {
SqliteDateTime::from_unix_timestamp(ts)
.ok_or_else(|| sqlx::Error::Protocol("Invalid timestamp".into()))
}

impl TryInto<apalis_sql::from_row::TaskRow<SqliteDateTime>> for SqliteTaskRow {
type Error = sqlx::Error;

fn try_into(self) -> Result<apalis_sql::from_row::TaskRow, Self::Error> {
fn try_into(self) -> Result<apalis_sql::from_row::TaskRow<SqliteDateTime>, Self::Error> {
Ok(apalis_sql::from_row::TaskRow {
job: self.job,
id: self
@@ -37,28 +43,13 @@ impl TryInto<apalis_sql::from_row::TaskRow> for SqliteTaskRow {
.ok_or_else(|| sqlx::Error::Protocol("Missing attempts".into()))?
as usize,
max_attempts: self.max_attempts.map(|v| v as usize),
run_at: self.run_at.map(|ts| {
Utc.timestamp_opt(ts, 0)
.single()
.ok_or_else(|| sqlx::Error::Protocol("Invalid run_at timestamp".into()))
.unwrap()
}),
run_at: self.run_at.map(unix_to_datetime).transpose()?,
last_result: self
.last_result
.map(|res| serde_json::from_str(&res).unwrap_or(serde_json::Value::Null)),
lock_at: self.lock_at.map(|ts| {
Utc.timestamp_opt(ts, 0)
.single()
.ok_or_else(|| sqlx::Error::Protocol("Invalid run_at timestamp".into()))
.unwrap()
}),
lock_at: self.lock_at.map(unix_to_datetime).transpose()?,
lock_by: self.lock_by,
done_at: self.done_at.map(|ts| {
Utc.timestamp_opt(ts, 0)
.single()
.ok_or_else(|| sqlx::Error::Protocol("Invalid run_at timestamp".into()))
.unwrap()
}),
done_at: self.done_at.map(unix_to_datetime).transpose()?,
priority: self.priority.map(|v| v as usize),
metadata: self
.metadata
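The net effect of this hunk: three copies of a panicking `Utc.timestamp_opt(...).unwrap()` chain collapse into one fallible helper, and `.map(unix_to_datetime).transpose()?` now propagates a bad timestamp as an error instead of crashing the worker. A self-contained illustration of the `transpose` idiom, using a stand-in conversion since `SqliteDateTime` is defined elsewhere:

```rust
// Stand-in for SqliteDateTime::from_unix_timestamp + ok_or_else.
fn to_datetime(ts: i64) -> Result<i64, String> {
    if ts >= 0 { Ok(ts) } else { Err("Invalid timestamp".into()) }
}

// Option<i64> --map--> Option<Result<i64, _>> --transpose--> Result<Option<i64>, _>,
// letting `?` surface the error while a NULL column passes through as None.
fn convert(run_at: Option<i64>) -> Result<Option<i64>, String> {
    run_at.map(to_datetime).transpose()
}

fn main() {
    assert_eq!(convert(None), Ok(None));          // NULL column stays None
    assert_eq!(convert(Some(42)), Ok(Some(42)));  // valid timestamp converts
    assert!(convert(Some(-1)).is_err());          // bad value errors, no panic
}
```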
19 changes: 6 additions & 13 deletions src/lib.rs
@@ -5,7 +5,7 @@ use std::{fmt, marker::PhantomData};

use apalis_codec::json::JsonCodec;
use apalis_core::{
backend::{Backend, BackendExt, TaskStream, codec::Codec, queue::Queue},
backend::{Backend, BackendExt, TaskStream, codec::Codec},
features_table,
layers::Stack,
task::Task,
@@ -47,14 +47,16 @@ pub mod queries;
mod shared;
/// Sink module for pushing tasks to sqlite backend
pub mod sink;
mod timestamp;

pub use timestamp::SqliteDateTime;

/// Type alias for sqlite context
pub type SqliteContext = SqlContext<SqlitePool>;
pub type SqliteContext = SqlContext;

/// Type alias for a task stored in sqlite backend
pub type SqliteTask<Args> = Task<Args, SqliteContext, Ulid>;
pub use apalis_sql::config::Config;
pub use apalis_sql::ext::TaskBuilderExt;
pub use callback::{DbEvent, HookCallbackListener};
pub use shared::{SharedSqliteError, SharedSqliteStorage};

@@ -323,10 +325,6 @@
type Compact = CompactType;
type CompactStream = TaskStream<SqliteTask<Self::Compact>, sqlx::Error>;

fn get_queue(&self) -> Queue {
self.config.queue().to_owned()
}

fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream {
self.poll_default(worker).boxed()
}
@@ -396,10 +394,6 @@
type Compact = CompactType;
type CompactStream = TaskStream<SqliteTask<Self::Compact>, sqlx::Error>;

fn get_queue(&self) -> Queue {
self.config.queue().to_owned()
}

fn poll_compact(self, worker: &WorkerContext) -> Self::CompactStream {
self.poll_with_listener(worker).boxed()
}
@@ -473,7 +467,6 @@ mod tests {

use apalis::prelude::*;
use apalis_workflow::*;
use chrono::Local;
use serde::{Deserialize, Serialize};
use sqlx::SqlitePool;

@@ -498,7 +491,7 @@
.take(ITEMS);
backend.push_stream(&mut items).await.unwrap();

println!("Starting worker at {}", Local::now());
println!("Starting worker");

async fn send_reminder(item: usize, wrk: WorkerContext) -> Result<(), BoxDynError> {
if ITEMS == item {
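The new `timestamp` module (`mod timestamp;` / `pub use timestamp::SqliteDateTime;`) is not included in this diff. From its two visible call sites — `from_unix_timestamp(ts)` returning an `Option` in `from_row.rs`, and the `TaskRow<SqliteDateTime>` parameter in `fetcher.rs` — a plausible minimal shape would be the following; this is a hypothetical reconstruction, and the real module presumably converts to chrono or time types according to the feature flags:

```rust
// Hypothetical reconstruction from the call sites above; the field
// layout, range check, and method set are assumptions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SqliteDateTime(i64);

impl SqliteDateTime {
    /// Returns None when the raw value can't represent a valid instant,
    /// matching the Option that the `unix_to_datetime` helper expects.
    pub fn from_unix_timestamp(ts: i64) -> Option<Self> {
        // Assumed ceiling of 9999-12-31T23:59:59Z; a real check would
        // delegate to whichever of chrono/time is active.
        const MAX: i64 = 253_402_300_799;
        (-MAX..=MAX).contains(&ts).then_some(Self(ts))
    }

    /// Raw seconds since the UNIX epoch, as stored in the SQLite column.
    pub fn unix_timestamp(self) -> i64 {
        self.0
    }
}
```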