Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,11 +293,23 @@ pub struct Config {
/// Maximum time in milliseconds for a single push RPC to the worker service. This should be greater than the worker's internal timeout.
pub push_timeout_ms: u64,

/// The size of a batch of status updates. Only active in push mode.
pub status_flush_batch_size: usize,
/// Whether status updates initiated by the server are batched (`true`) or not (`false`).
pub batch_server_updates: bool,

/// Maximum milliseconds to wait before flushing a batch of status updates.
pub status_flush_interval_ms: u64,
/// The size of a batch of status updates from the server.
pub server_update_batch_size: usize,

/// Maximum milliseconds to wait before flushing a batch of status updates from the server.
pub server_update_interval_ms: u64,

/// Should status updates initiated by the push pool be batched?
pub batch_push_updates: bool,

/// The size of a batch of status updates from the push pool.
pub push_update_batch_size: usize,

/// Maximum milliseconds to wait before flushing a batch of status updates from the push pool.
pub push_update_interval_ms: u64,

/// The hostname used to construct `callback_url` for task push requests.
pub callback_addr: String,
Expand Down Expand Up @@ -389,8 +401,12 @@ impl Default for Config {
push_queue_size: 1,
push_queue_timeout_ms: 5000,
push_timeout_ms: 30000,
status_flush_batch_size: 1,
status_flush_interval_ms: 100,
batch_server_updates: false,
server_update_batch_size: 1,
server_update_interval_ms: 100,
batch_push_updates: false,
push_update_batch_size: 1,
push_update_interval_ms: 100,
callback_addr: "0.0.0.0".into(),
callback_port: 50051,
worker_map: [("sentry".into(), "http://127.0.0.1:50052".into())].into(),
Expand Down
8 changes: 6 additions & 2 deletions src/fetch/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ impl InflightActivationStore for MockStore {
Ok(())
}

/// Mock stub: ignores the supplied IDs and always returns `Ok(0)`.
async fn mark_activation_processing_batch(&self, _id: &[String]) -> Result<u64, Error> {
// No state is touched; the returned count is a fixed zero.
Ok(0)
}

/// Left unimplemented in this mock: `_now` is ignored and any call panics
/// via `unimplemented!()`.
async fn pending_activation_max_lag(&self, _now: &DateTime<Utc>) -> f64 {
unimplemented!()
}
Expand All @@ -126,8 +130,8 @@ impl InflightActivationStore for MockStore {
&self,
_ids: &[String],
_status: InflightActivationStatus,
) -> Result<(), Error> {
Ok(())
) -> Result<u64, Error> {
Ok(0)
}

async fn delete_activation_batch(&self, _ids: &[String]) -> Result<u64, Error> {
Expand Down
1 change: 0 additions & 1 deletion src/grpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
pub mod auth_middleware;
pub mod metrics_middleware;
pub mod server;
pub mod status_flusher;

#[cfg(test)]
mod server_tests;
67 changes: 45 additions & 22 deletions src/grpc/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use sentry_protos::taskbroker::v1::{
FetchNextTask, GetTaskRequest, GetTaskResponse, SetTaskStatusRequest, SetTaskStatusResponse,
TaskActivation, TaskActivationStatus,
};
use tokio::sync::mpsc;
use tokio::sync::mpsc::Sender;
use tonic::{Request, Response, Status};
use tracing::{debug, error, instrument, warn};

Expand All @@ -21,7 +21,7 @@ use crate::store::traits::InflightActivationStore;
pub struct TaskbrokerServer {
pub store: Arc<dyn InflightActivationStore>,
pub config: Arc<Config>,
pub status_tx: Option<mpsc::Sender<StatusUpdate>>,
pub update_tx: Option<Sender<StatusUpdate>>,
}

#[tonic::async_trait]
Expand Down Expand Up @@ -106,7 +106,7 @@ impl ConsumerService for TaskbrokerServer {
metrics::counter!("grpc_server.set_status.failure").increment(1);
}

if let Some(ref tx) = self.status_tx {
if let Some(ref tx) = self.update_tx {
tx.send((id, status))
.await
.map_err(|_| Status::internal("Status update channel closed"))?;
Expand Down Expand Up @@ -211,7 +211,7 @@ impl ConsumerService for TaskbrokerServer {

/// A single queued status change: the ID (`String`) paired with the
/// `InflightActivationStatus` to apply to it. Values of this type are sent
/// over the server's update channel and grouped by status when flushed.
pub type StatusUpdate = (String, InflightActivationStatus);

pub async fn flush_status_updates(
pub async fn flush_updates(
store: Arc<dyn InflightActivationStore>,
buffer: &mut Vec<StatusUpdate>,
) {
Expand All @@ -226,30 +226,56 @@ pub async fn flush_status_updates(
by_status.entry(status).or_default().push(id);
}

let mut success = 0;
let mut fail = 0;

for (status, ids) in by_status {
let count = ids.len() as u64;
let requested = ids.len() as u64;
let st = status.to_string();

match store
.set_status_batch(&ids, status)
.await
.map(|()| ids.len() as u64)
{
Ok(count) => {
success += count;
debug!(?status, ?count, "Flushed status batch");
metrics::histogram!("grpc_server.flush_updates.requested", "status" => st.clone())
.record(requested as f64);

match store.set_status_batch(&ids, status).await {
Ok(affected) => {
metrics::histogram!(
"grpc_server.flush_updates.affected",
"status" => st.clone()
)
.record(affected as f64);

metrics::counter!(
"grpc_server.flush_updates.updated",
"status" => st.clone()
)
.increment(affected);

metrics::counter!("grpc_server.flush_updates", "result" => "ok").increment(1);

if affected < requested {
metrics::counter!(
"grpc_server.flush_updates.partial",
"status" => st.clone()
)
.increment(1);

warn!(
?status,
requested, affected, "Updated fewer rows than IDs requested from server"
);
}

debug!(
?status,
affected, requested, "Flushed status batch from server"
);
}

Err(e) => {
fail += count;
metrics::counter!("grpc_server.flush_updates", "result" => "error").increment(1);

error!(
?status,
?count,
requested,
error = ?e,
"Failed to flush status batch"
"Failed to flush status batch from server"
);

// Push failed updates back into the buffer so they can be retried on next flush
Expand All @@ -259,7 +285,4 @@ pub async fn flush_status_updates(
}
}
}

metrics::gauge!(format!("grpc_server.flush_status_updates.success")).set(success as f64);
metrics::gauge!(format!("grpc_server.flush_status_updates.fail")).set(fail as f64);
}
22 changes: 11 additions & 11 deletions src/grpc/server_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ async fn test_get_task_push_mode_returns_permission_denied() {
let service = TaskbrokerServer {
store,
config,
status_tx: None,
update_tx: None,
};

let request = GetTaskRequest {
Expand All @@ -51,7 +51,7 @@ async fn test_get_task(#[case] adapter: &str) {
let service = TaskbrokerServer {
store,
config,
status_tx: None,
update_tx: None,
};

let request = GetTaskRequest {
Expand All @@ -78,7 +78,7 @@ async fn test_set_task_status(#[case] adapter: &str) {
let service = TaskbrokerServer {
store,
config,
status_tx: None,
update_tx: None,
};

let request = SetTaskStatusRequest {
Expand All @@ -105,7 +105,7 @@ async fn test_set_task_status_invalid(#[case] adapter: &str) {
let service = TaskbrokerServer {
store,
config,
status_tx: None,
update_tx: None,
};

let request = SetTaskStatusRequest {
Expand Down Expand Up @@ -139,7 +139,7 @@ async fn test_get_task_success(#[case] adapter: &str) {
let service = TaskbrokerServer {
store: store.clone(),
config,
status_tx: None,
update_tx: None,
};

let request = GetTaskRequest {
Expand Down Expand Up @@ -179,7 +179,7 @@ async fn test_get_task_with_application_success(#[case] adapter: &str) {
let service = TaskbrokerServer {
store,
config,
status_tx: None,
update_tx: None,
};

let request = GetTaskRequest {
Expand Down Expand Up @@ -213,7 +213,7 @@ async fn test_get_task_with_namespace_requires_application(#[case] adapter: &str
let service = TaskbrokerServer {
store,
config,
status_tx: None,
update_tx: None,
};

let request = GetTaskRequest {
Expand Down Expand Up @@ -243,7 +243,7 @@ async fn test_set_task_status_success(#[case] adapter: &str) {
let service = TaskbrokerServer {
store,
config,
status_tx: None,
update_tx: None,
};

let request = GetTaskRequest {
Expand Down Expand Up @@ -295,7 +295,7 @@ async fn test_set_task_status_with_application(#[case] adapter: &str) {
let service = TaskbrokerServer {
store,
config,
status_tx: None,
update_tx: None,
};

let request = SetTaskStatusRequest {
Expand Down Expand Up @@ -340,7 +340,7 @@ async fn test_set_task_status_with_application_no_match(#[case] adapter: &str) {
let service = TaskbrokerServer {
store,
config,
status_tx: None,
update_tx: None,
};

// Request a task from an application without any activations.
Expand Down Expand Up @@ -375,7 +375,7 @@ async fn test_set_task_status_with_namespace_requires_application(#[case] adapte
let service = TaskbrokerServer {
store,
config,
status_tx: None,
update_tx: None,
};

let request = SetTaskStatusRequest {
Expand Down
Loading
Loading