Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 11 additions & 11 deletions crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,35 +7,35 @@ edition.workspace = true
workspace = true

[dependencies]
p2p = { workspace = true}
shared = { workspace = true }

actix-web = { workspace = true }
actix-web-prometheus = "0.1.2"
alloy = { workspace = true }
anyhow = { workspace = true }
async-trait = "0.1.88"
base64 = "0.22.1"
chrono = { workspace = true, features = ["serde"] }
clap = { workspace = true }
env_logger = { workspace = true }
futures = { workspace = true }
google-cloud-auth = "0.18.0"
google-cloud-storage = "0.24.0"
hex = { workspace = true }
log = { workspace = true }
prometheus = "0.14.0"
rand = "0.9.0"
redis = { workspace = true, features = ["tokio-comp"] }
redis-test = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
shared = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }

actix-web-prometheus = "0.1.2"
google-cloud-auth = "0.18.0"
google-cloud-storage = "0.24.0"
prometheus = "0.14.0"
rand = "0.9.0"
utoipa = { version = "5.3.0", features = ["actix_extras", "chrono", "uuid"] }
utoipa-swagger-ui = { version = "9.0.2", features = ["actix-web", "debug-embed", "reqwest", "vendored"] }
uuid = { workspace = true }
iroh = { workspace = true }
rand_v8 = { workspace = true }

[dev-dependencies]
mockito = { workspace = true }
24 changes: 16 additions & 8 deletions crates/orchestrator/src/api/routes/groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,6 @@ async fn fetch_node_logs_p2p(

match node {
Some(node) => {
// Check if P2P client is available
let p2p_client = app_state.p2p_client.clone();

// Check if node has P2P information
let (worker_p2p_id, worker_p2p_addresses) =
match (&node.worker_p2p_id, &node.worker_p2p_addresses) {
Expand All @@ -254,11 +251,22 @@ async fn fetch_node_logs_p2p(
};

// Send P2P request for task logs
match tokio::time::timeout(
Duration::from_secs(NODE_REQUEST_TIMEOUT),
p2p_client.get_task_logs(node_address, worker_p2p_id, worker_p2p_addresses),
)
.await
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let get_task_logs_request = crate::p2p::GetTaskLogsRequest {
worker_wallet_address: node_address,
worker_p2p_id: worker_p2p_id.clone(),
worker_addresses: worker_p2p_addresses.clone(),
response_tx,
};
if let Err(e) = app_state.get_task_logs_tx.send(get_task_logs_request).await {
error!("Failed to send GetTaskLogsRequest for node {node_address}: {e}");
return json!({
"success": false,
"error": format!("Failed to send request: {}", e),
"status": node.status.to_string()
});
};
match tokio::time::timeout(Duration::from_secs(NODE_REQUEST_TIMEOUT), response_rx).await
{
Ok(Ok(log_lines)) => {
json!({
Expand Down
42 changes: 32 additions & 10 deletions crates/orchestrator/src/api/routes/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,22 @@ async fn restart_node_task(node_id: web::Path<String>, app_state: Data<AppState>
.as_ref()
.expect("worker_p2p_addresses should be present");

match app_state
.p2p_client
.restart_task(node_address, p2p_id, p2p_addresses)
.await
{
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let restart_task_request = crate::p2p::RestartTaskRequest {
worker_wallet_address: node.address,
worker_p2p_id: p2p_id.clone(),
worker_addresses: p2p_addresses.clone(),
response_tx,
};
if let Err(e) = app_state.restart_task_tx.send(restart_task_request).await {
error!("Failed to send restart task request: {e}");
return HttpResponse::InternalServerError().json(json!({
"success": false,
"error": "Failed to send restart task request"
}));
}

match response_rx.await {
Ok(_) => HttpResponse::Ok().json(json!({
"success": true,
"message": "Task restarted successfully"
Expand Down Expand Up @@ -240,11 +251,22 @@ async fn get_node_logs(node_id: web::Path<String>, app_state: Data<AppState>) ->
}));
};

match app_state
.p2p_client
.get_task_logs(node_address, p2p_id, p2p_addresses)
.await
{
let (response_tx, response_rx) = tokio::sync::oneshot::channel();
let get_task_logs_request = crate::p2p::GetTaskLogsRequest {
worker_wallet_address: node.address,
worker_p2p_id: p2p_id.clone(),
worker_addresses: p2p_addresses.clone(),
response_tx,
};
if let Err(e) = app_state.get_task_logs_tx.send(get_task_logs_request).await {
error!("Failed to send get task logs request: {e}");
return HttpResponse::InternalServerError().json(json!({
"success": false,
"error": "Failed to send get task logs request"
}));
}

match response_rx.await {
Ok(logs) => HttpResponse::Ok().json(json!({
"success": true,
"logs": logs
Expand Down
32 changes: 18 additions & 14 deletions crates/orchestrator/src/api/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::api::routes::task::tasks_routes;
use crate::api::routes::{heartbeat::heartbeat_routes, metrics::metrics_routes};
use crate::metrics::MetricsContext;
use crate::models::node::NodeStatus;
use crate::p2p::client::P2PClient;
use crate::p2p::{GetTaskLogsRequest, RestartTaskRequest};
use crate::plugins::node_groups::NodeGroupsPlugin;
use crate::scheduler::Scheduler;
use crate::store::core::{RedisStore, StoreContext};
Expand All @@ -23,6 +23,7 @@ use shared::utils::StorageProvider;
use shared::web3::contracts::core::builder::Contracts;
use shared::web3::wallet::WalletProvider;
use std::sync::Arc;
use tokio::sync::mpsc::Sender;
use utoipa::{
openapi::security::{ApiKey, ApiKeyValue, SecurityScheme},
Modify, OpenApi,
Expand Down Expand Up @@ -116,17 +117,18 @@ async fn health_check(data: web::Data<AppState>) -> HttpResponse {
}

/// Shared application state: constructed in `start_server`, wrapped in `Data`,
/// and received by the route handlers as `Data<AppState>`.
///
/// NOTE(review): this listing is taken from a rendered diff, so it appears to
/// show the field list twice — the `pub` fields from before the change and the
/// `pub(crate)` fields from after it. Confirm against the real file before
/// relying on the exact field set.
pub(crate) struct AppState {
// Field list that appears to be the pre-change version.
pub store_context: Arc<StoreContext>,
pub storage_provider: Option<Arc<dyn StorageProvider>>,
pub heartbeats: Arc<LoopHeartbeats>,
pub redis_store: Arc<RedisStore>,
pub hourly_upload_limit: i64,
pub contracts: Option<Contracts<WalletProvider>>,
pub pool_id: u32,
pub scheduler: Scheduler,
pub node_groups_plugin: Option<Arc<NodeGroupsPlugin>>,
pub metrics: Arc<MetricsContext>,
/// P2P client handle; handlers previously called `get_task_logs` and
/// `restart_task` on it directly.
pub p2p_client: Arc<P2PClient>,
// Field list that appears to be the post-change version: same leading fields
// with `pub(crate)` visibility, and two request senders instead of `p2p_client`.
pub(crate) store_context: Arc<StoreContext>,
pub(crate) storage_provider: Option<Arc<dyn StorageProvider>>,
pub(crate) heartbeats: Arc<LoopHeartbeats>,
pub(crate) redis_store: Arc<RedisStore>,
pub(crate) hourly_upload_limit: i64,
pub(crate) contracts: Option<Contracts<WalletProvider>>,
pub(crate) pool_id: u32,
pub(crate) scheduler: Scheduler,
pub(crate) node_groups_plugin: Option<Arc<NodeGroupsPlugin>>,
pub(crate) metrics: Arc<MetricsContext>,
/// Sender on which handlers submit a `GetTaskLogsRequest`; each request
/// carries a oneshot `response_tx`, and the handler awaits the paired receiver.
pub(crate) get_task_logs_tx: Sender<GetTaskLogsRequest>,
/// Sender on which handlers submit a `RestartTaskRequest`, using the same
/// oneshot request/response shape as the task-logs sender.
pub(crate) restart_task_tx: Sender<RestartTaskRequest>,
}

#[allow(clippy::too_many_arguments)]
Expand All @@ -145,7 +147,8 @@ pub async fn start_server(
scheduler: Scheduler,
node_groups_plugin: Option<Arc<NodeGroupsPlugin>>,
metrics: Arc<MetricsContext>,
p2p_client: Arc<P2PClient>,
get_task_logs_tx: Sender<GetTaskLogsRequest>,
restart_task_tx: Sender<RestartTaskRequest>,
) -> Result<(), Error> {
info!("Starting server at http://{host}:{port}");
let app_state = Data::new(AppState {
Expand All @@ -159,7 +162,8 @@ pub async fn start_server(
scheduler,
node_groups_plugin,
metrics,
p2p_client,
get_task_logs_tx,
restart_task_tx,
});
let node_store = app_state.store_context.node_store.clone();
let node_store_clone = node_store.clone();
Expand Down
Loading