Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 18 additions & 15 deletions packages/windmill/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ name = "windmill"
version = "0.1.0"
edition = "2021"
authors = [
"Felix Robles <felix@sequentech.io>",
"Eduardo Robles <edu@sequentech.io>"
"Felix Robles <felix@sequentech.io>",
"Eduardo Robles <edu@sequentech.io>"
]
license = "AGPL-3.0-only"

Expand All @@ -25,7 +25,7 @@ rocket = { version = "0.5", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
serde_path_to_error = "0.1"
reqwest = { version="0.12", features = ["multipart", "blocking", "json"] }
reqwest = { version = "0.12", features = ["multipart", "blocking", "json"] }
handlebars = "6.1"
anyhow = "1.0"
tempfile = "3.15"
Expand Down Expand Up @@ -70,22 +70,22 @@ time = "0.3"

# for sending emails and sms
lettre = { version = "0.11", features = ["rustls-tls", "smtp-transport", "builder"] }
aws-config = { version = "1.6", features = ["behavior-version-latest"]}
aws-sdk-sesv2 = { version = "1.70"}
aws-sdk-sns = { version = "1.63"}
aws-config = { version = "1.6", features = ["behavior-version-latest"] }
aws-sdk-sesv2 = { version = "1.70" }
aws-sdk-sns = { version = "1.63" }
# for s3
aws-sdk-s3 = { version = "1.121.0"}
aws-smithy-types = { version = "1.3"}
aws-sdk-s3 = { version = "1.121.0" }
aws-smithy-types = { version = "1.3" }
# for secrets
aws-sdk-secretsmanager = "1.66"

wrap-map-err = { path = "../wrap-map-err" }
immudb-rs = { path="../immudb-rs" }
b3 = { path="../b3", features = ["client"] }
electoral-log = { path="../electoral-log" }
strand = { path="../strand" }
sequent-core = { path="../sequent-core", features = ["reports", "s3", "keycloak", "log", "probe", "areas", "default_features","plugins_wit", "sqlite"] }
velvet = { path="../velvet" }
immudb-rs = { path = "../immudb-rs" }
b3 = { path = "../b3", features = ["client"] }
electoral-log = { path = "../electoral-log" }
strand = { path = "../strand" }
sequent-core = { path = "../sequent-core", features = ["reports", "s3", "keycloak", "log", "probe", "areas", "default_features", "plugins_wit", "sqlite"] }
velvet = { path = "../velvet" }


# To connect to the Database
Expand All @@ -94,7 +94,7 @@ deadpool-postgres = { version = "0.14", features = ["rt_tokio_1", "serde"] }
tokio-postgres = { version = "0.7", features = ["with-uuid-1", "with-chrono-0_4", "with-serde_json-1"] }
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["io"] }
rust_decimal= { version = "1.36", features = ["db-tokio-postgres"] }
rust_decimal = { version = "1.36", features = ["db-tokio-postgres"] }
rust_decimal_macros = "1.36"
config = "0.15"
cfg-if = "1.0"
Expand Down Expand Up @@ -142,3 +142,6 @@ dhat = "0.3"

[package.metadata.component.target.dependencies]
"docs:plugin" = { path = "packages/sequent-core/src/wit/plugin_interface.wit" }

[lints.rust]
unsafe_code = "forbid"
35 changes: 19 additions & 16 deletions packages/windmill/src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use sequent_core::util::init_log::init_log;
use std::collections::HashMap;
use tokio::runtime::Builder;
use tracing::{event, Level};
use windmill::services::celery_app::*;
use windmill::services::celery_app::{self as celery_cfg, Queue};
use windmill::services::probe::{setup_probe, AppName};
use windmill::services::tasks_semaphore::init_semaphore;

Expand Down Expand Up @@ -88,13 +88,13 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {

let opt = CeleryOpt::parse();

let cpus = read_worker_threads(&opt);
set_worker_threads(cpus);
let worker_threads = read_worker_threads(&opt);
celery_cfg::set_worker_threads(worker_threads);

// 1) Build a custom runtime
let rt = Builder::new_multi_thread()
.enable_all()
.worker_threads(cpus)
.worker_threads(worker_threads)
.thread_stack_size(8 * 1024 * 1024)
.build()?;

Expand All @@ -108,8 +108,8 @@ async fn async_main(opt: CeleryOpt) -> Result<()> {
init_log(true);
setup_probe(AppName::WINDMILL).await;

let cpus = get_worker_threads();
init_semaphore(cpus);
let cpus = celery_cfg::get_worker_threads();
init_semaphore(cpus).with_context(|| "failed to init semaphore")?;
let slug = std::env::var("ENV_SLUG").with_context(|| "missing env var ENV_SLUG")?;

match opt.clone() {
Expand All @@ -122,12 +122,15 @@ async fn async_main(opt: CeleryOpt) -> Result<()> {
heartbeat,
..
} => {
set_prefetch_count(prefetch_count);
set_acks_late(acks_late);
set_task_max_retries(task_max_retries);
set_broker_connection_max_retries(broker_connection_max_retries);
set_heartbeat(heartbeat);
let celery_app = get_celery_app().await;
celery_cfg::set_config(celery_cfg::CeleryConfig {
prefetch_count,
acks_late,
task_max_retries,
broker_connection_max_retries,
heartbeat_secs: heartbeat,
});

let celery_app = celery_cfg::get_celery_app().await;
celery_app.display_pretty().await;
let queues: Vec<String> = queues_input
.iter()
Expand All @@ -145,14 +148,14 @@ async fn async_main(opt: CeleryOpt) -> Result<()> {
if !duplicates.is_empty() {
return Err(anyhow!("Found duplicate queues: {:?}", duplicates));
}
set_queues(queues.clone());
set_is_app_active(true);
celery_cfg::set_queues(queues.clone());
celery_cfg::set_is_app_active(true);
celery_app.consume_from(&vec_str[..]).await?;
set_is_app_active(false);
celery_cfg::set_is_app_active(false);
celery_app.close().await?;
}
CeleryOpt::Produce => {
let celery_app = get_celery_app().await;
let celery_app = celery_cfg::get_celery_app().await;
event!(Level::INFO, "No new tasks to produce");
celery_app.close().await?;
}
Expand Down
145 changes: 78 additions & 67 deletions packages/windmill/src/services/celery_app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ use celery::Celery;
use lapin::{Connection, ConnectionProperties};
use std;
use std::convert::AsRef;
use std::sync::Arc;
use std::sync::{Arc, LazyLock, Mutex, RwLock};
use strum_macros::AsRefStr;
use tokio::sync::{Mutex, RwLock};
use tracing::{event, info, instrument, Level};

use crate::services::plugins_manager::plugin_manager::init_plugin_manager;
Expand Down Expand Up @@ -93,74 +92,86 @@ impl Queue {
}
}

static mut PREFETCH_COUNT_S: u16 = 100;
static mut ACKS_LATE_S: bool = true;
static mut TASK_MAX_RETRIES: u32 = 4;
static mut IS_APP_ACTIVE: bool = true;
static mut BROKER_CONNECTION_MAX_RETRIES: u32 = 5;
static mut HEARTBEAT_SECS: u16 = 10;
static mut WORKER_THREADS: usize = 1;
static mut QUEUES: Vec<String> = vec![];

pub fn set_prefetch_count(new_val: u16) {
unsafe {
PREFETCH_COUNT_S = new_val;
}
/// The main struct for global Celery configuration.
/// Set at-most once; either by command line options during startup or falls back to defaults.
pub struct CeleryConfig {
pub prefetch_count: u16,
pub acks_late: bool,
pub task_max_retries: u32,
pub broker_connection_max_retries: u32,
pub heartbeat_secs: u16,
}

pub fn set_worker_threads(new_val: usize) {
unsafe {
WORKER_THREADS = new_val;
}
}
/// Global Celery configuration.
/// Expected to be either set once during startup or used with defaults.
static CELERY_CONFIG: LazyLock<RwLock<CeleryConfig>> = LazyLock::new(|| {
RwLock::new(CeleryConfig {
prefetch_count: 100,
acks_late: true,
task_max_retries: 4,
broker_connection_max_retries: 5,
heartbeat_secs: 10,
})
});

pub fn get_worker_threads() -> usize {
unsafe { WORKER_THREADS }
}
/// Global Celery queues configured.
/// Expected to be either set once during startup or kept empty.
static QUEUES: LazyLock<RwLock<Vec<String>>> = LazyLock::new(|| RwLock::new(Vec::new()));

pub fn set_acks_late(new_val: bool) {
unsafe {
ACKS_LATE_S = new_val;
}
}
/// Globally configured worker threads.
static WORKER_THREADS: LazyLock<RwLock<usize>> = LazyLock::new(|| RwLock::new(1));

pub fn set_task_max_retries(new_val: u32) {
unsafe {
TASK_MAX_RETRIES = new_val;
}
/// Global app execution status.
static IS_APP_ACTIVE: LazyLock<RwLock<bool>> = LazyLock::new(|| RwLock::new(true));

/// Update global Celery config.
/// Expected to be called at-most once, during startup.
pub fn set_config(new_config: CeleryConfig) {
let mut config = CELERY_CONFIG
.write()
.expect("failed to write-lock CeleryConfig");
*config = new_config;
}

pub fn set_queues(new_val: Vec<String>) {
unsafe {
QUEUES = new_val;
}
/// Update global Celery queues.
/// Expected to be called at-most once, during startup.
pub fn set_queues(new_queues: Vec<String>) {
*QUEUES.write().expect("failed to write-lock queues") = new_queues;
}

#[instrument]
pub fn set_is_app_active(new_val: bool) {
unsafe {
IS_APP_ACTIVE = new_val;
}
/// Get globally configured Celery queues.
pub fn get_queues() -> Vec<String> {
QUEUES.read().expect("failed to read-lock queues").clone()
}

pub fn set_broker_connection_max_retries(new_val: u32) {
unsafe {
BROKER_CONNECTION_MAX_RETRIES = new_val;
}
/// Update global worker threads.
/// Expected to be called at-most once, during startup.
pub fn set_worker_threads(new_val: usize) {
*WORKER_THREADS
.write()
.expect("failed to write-lock worker_threads") = new_val;
}

pub fn set_heartbeat(new_val: u16) {
unsafe {
HEARTBEAT_SECS = new_val;
}
/// Get global worker threads.
pub fn get_worker_threads() -> usize {
*WORKER_THREADS
.read()
.expect("failed to read-lock worker_threads")
}

pub fn get_is_app_active() -> bool {
unsafe { IS_APP_ACTIVE }
/// Update global app execution status.
#[instrument]
pub fn set_is_app_active(new_val: bool) {
*IS_APP_ACTIVE
.write()
.expect("failed to write-lock is_app_active") = new_val;
}

pub fn get_queues() -> Vec<String> {
unsafe { QUEUES.clone() }
/// Get global app execution status.
pub fn get_is_app_active() -> bool {
*IS_APP_ACTIVE
.read()
.expect("failed to read-lock is_app_active")
}

lazy_static! {
Expand Down Expand Up @@ -215,18 +226,17 @@ async fn create_connection() -> Result<(Arc<Connection>, String)> {

#[instrument]
pub async fn generate_celery_app() -> Result<Arc<Celery>> {
let prefetch_count: u16;
let acks_late: bool;
let task_max_retries: u32;
let broker_connection_max_retries: u32;
let heartbeat: u16;
unsafe {
prefetch_count = PREFETCH_COUNT_S;
acks_late = ACKS_LATE_S;
task_max_retries = TASK_MAX_RETRIES;
broker_connection_max_retries = BROKER_CONNECTION_MAX_RETRIES;
heartbeat = HEARTBEAT_SECS;
}
let CeleryConfig {
prefetch_count,
acks_late,
task_max_retries,
broker_connection_max_retries,
heartbeat_secs,
..
} = *CELERY_CONFIG
.read()
.map_err(|_| anyhow!("failed to read-lock CeleryConfig"))?;
Comment on lines +229 to +238

event!(
Level::INFO,
"prefetch_count: {}, acks_late: {}",
Expand Down Expand Up @@ -348,14 +358,15 @@ pub async fn generate_celery_app() -> Result<Arc<Celery>> {
prefetch_count = prefetch_count,
acks_late = acks_late,
task_max_retries = task_max_retries,
heartbeat = Some(heartbeat),
heartbeat = Some(heartbeat_secs),
broker_connection_max_retries = broker_connection_max_retries,
)
.await
.map_err(|err| anyhow!("{:?}", err))
}

static CELERY_CONNECTION: RwLock<Option<Arc<Connection>>> = RwLock::const_new(None);
static CELERY_CONNECTION: tokio::sync::RwLock<Option<Arc<Connection>>> =
tokio::sync::RwLock::const_new(None);

/// Returns a reused AMQP connection wrapped in an Arc.
/// If no connection exists (or if it’s disconnected), a new connection is created and stored.
Expand Down
Loading