[STREAM-640] Push-Based Broker POC #524
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Changes from all commits: 11d8651, bb19867, 8a5accb, 420508b, 1b51f14, 6466619, 46f8cec, c7be82e, 5934f64, e627a9d
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,9 @@ | ||
| kafka_topic: "test-topic" | ||
| push: true | ||
| default_metrics_tags: | ||
| host: "127.0.0.1" | ||
| log_filter: "debug,sqlx=debug,librdkafka=warn,h2=off" | ||
| # workers: | ||
| # - "http://127.0.0.1:50052" | ||
| # - "http://127.0.0.1:50053" | ||
| # - "http://127.0.0.1:50054" |
```diff
@@ -2,27 +2,57 @@ use chrono::Utc;
 use prost::Message;
 use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerService;
 use sentry_protos::taskbroker::v1::{
-    FetchNextTask, GetTaskRequest, GetTaskResponse, SetTaskStatusRequest, SetTaskStatusResponse,
+    AddWorkerRequest, AddWorkerResponse, FetchNextTask, GetTaskRequest, GetTaskResponse,
+    RemoveWorkerRequest, RemoveWorkerResponse, SetTaskStatusRequest, SetTaskStatusResponse,
     TaskActivation, TaskActivationStatus,
 };
 use std::sync::Arc;
 use std::time::Instant;
 use tonic::{Request, Response, Status};
 
+use crate::pool::WorkerPool;
 use crate::store::inflight_activation::{InflightActivationStatus, InflightActivationStore};
-use tracing::{error, instrument};
+use tracing::{debug, error, instrument};
 
 pub struct TaskbrokerServer {
     pub store: Arc<InflightActivationStore>,
+    pub pool: Arc<WorkerPool>,
+    pub push: bool,
 }
 
 #[tonic::async_trait]
 impl ConsumerService for TaskbrokerServer {
+    #[instrument(skip_all)]
+    async fn add_worker(
+        &self,
+        request: Request<AddWorkerRequest>,
+    ) -> Result<Response<AddWorkerResponse>, Status> {
+        let address = &request.get_ref().address;
+        self.pool.add_worker(address).await;
+        Ok(Response::new(AddWorkerResponse {}))
+    }
+
+    #[instrument(skip_all)]
+    async fn remove_worker(
+        &self,
+        request: Request<RemoveWorkerRequest>,
+    ) -> Result<Response<RemoveWorkerResponse>, Status> {
+        let address = &request.get_ref().address;
+        self.pool.remove_worker(address);
+        Ok(Response::new(RemoveWorkerResponse {}))
+    }
+
```
Comment on lines 25 to 43

Since the interface of the taskbroker service differs considerably between push and poll mode, I wonder whether it would be better to simply have two separate gRPC server implementations (and even two gRPC service definitions). That would probably produce a more composable system with less coupling between the two modes. Alternatively, we can keep a single service, but it must reject calls that are not valid in its mode of operation: add_worker/remove_worker must immediately reject calls when in poll mode, just as get_task should when in push mode. @evanh what do you think?
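If we keep the single-service option, the per-RPC rejection logic could be centralized in a small guard helper rather than repeated inline in every handler. A minimal sketch, assuming a hypothetical `Mode` enum and `require_mode` helper that are not part of this PR (the PR currently models this as a bare `push: bool` field):

```rust
use tonic::Status;

// Hypothetical mode enum; illustrative only.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    Push,
    Poll,
}

// Reject an RPC up front when the broker is not running in the mode
// that the RPC is valid for.
fn require_mode(current: Mode, required: Mode, rpc: &str) -> Result<(), Status> {
    if current != required {
        return Err(Status::failed_precondition(format!(
            "{rpc} is not available in {current:?} mode"
        )));
    }
    Ok(())
}
```

Each handler would then start with something like `require_mode(self.mode, Mode::Poll, "get_task")?`, which mirrors the failed_precondition check that get_task already performs in this PR.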
Member

I'm also not sure that having workers register with brokers is going to work out well. It will be hard to keep the worker lists in each broker replica current, and there are edge cases, such as worker OOM kills, that will need to be handled. Having the brokers send requests to a load balancer that forwards to worker replicas seems simpler; we can use response codes to signal that the worker that was reached is over capacity, and have the broker retry.
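For the load-balancer alternative, the broker-side push could look roughly like the sketch below. This is an assumed shape, not code from the PR: `push_to_worker` is a hypothetical stand-in for the actual gRPC client call, and the backoff constants are illustrative.

```rust
use std::time::Duration;
use tonic::{Code, Status};

// Push one activation through the load balancer, retrying when the
// replica that was reached reports it is over capacity.
async fn push_with_retry(
    endpoint: &str,
    payload: &[u8],
    max_attempts: u32,
) -> Result<(), Status> {
    for attempt in 0..max_attempts {
        match push_to_worker(endpoint, payload).await {
            Ok(()) => return Ok(()),
            // RESOURCE_EXHAUSTED signals an over-capacity worker: back off
            // and retry, letting the load balancer pick another replica.
            Err(status) if status.code() == Code::ResourceExhausted => {
                tokio::time::sleep(Duration::from_millis(50 * (attempt as u64 + 1))).await;
            }
            Err(other) => return Err(other),
        }
    }
    Err(Status::unavailable("all push attempts exhausted"))
}

// Hypothetical client call; in practice this would be the generated
// worker-service gRPC client.
async fn push_to_worker(_endpoint: &str, _payload: &[u8]) -> Result<(), Status> {
    Ok(())
}
```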
```diff
     #[instrument(skip_all)]
     async fn get_task(
         &self,
         request: Request<GetTaskRequest>,
     ) -> Result<Response<GetTaskResponse>, Status> {
+        if self.push {
+            return Err(Status::failed_precondition(
+                "get_task is not available in push mode",
+            ));
+        }
+
         let start_time = Instant::now();
         let namespace = &request.get_ref().namespace;
         let inflight = self
@@ -67,6 +97,8 @@ impl ConsumerService for TaskbrokerServer {
         let start_time = Instant::now();
         let id = request.get_ref().id.clone();
 
+        debug!("Received task status {} for {id}", request.get_ref().status);
+
         let status: InflightActivationStatus =
             TaskActivationStatus::try_from(request.get_ref().status)
                 .map_err(|e| {
@@ -83,6 +115,8 @@ impl ConsumerService for TaskbrokerServer {
             metrics::counter!("grpc_server.set_status.failure").increment(1);
         }
 
+        debug!("Status of task {id} set to {:?}", status);
+
         let update_result = self.store.set_status(&id, status).await;
         if let Err(e) = update_result {
             error!(
@@ -101,6 +135,10 @@ impl ConsumerService for TaskbrokerServer {
             return Ok(Response::new(SetTaskStatusResponse { task: None }));
         };
 
+        if self.push {
+            return Ok(Response::new(SetTaskStatusResponse { task: None }));
+        }
+
         let start_time = Instant::now();
         let res = match self
             .store
```
Would we have a static list of workers? With the workers generally running behind a horizontal pod autoscaler, we may not know how many workers will be online, and the number and names of those workers will not be fixed.

No, workers can use the AddWorker and RemoveWorker RPC endpoints to add themselves on startup and remove themselves on shutdown. This field is just something I've been using for testing, or, if it's useful this way, as a list of initial workers we definitely want to connect to.
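As a sketch of that registration lifecycle, a worker could register and deregister itself along these lines. This assumes the tonic-generated client module (`consumer_service_client::ConsumerServiceClient`) for the service in this PR, that the request messages carry only the `address` field the handlers above read, and illustrative addresses; the actual shutdown wiring is elided.

```rust
use sentry_protos::taskbroker::v1::consumer_service_client::ConsumerServiceClient;
use sentry_protos::taskbroker::v1::{AddWorkerRequest, RemoveWorkerRequest};

async fn run_worker() -> Result<(), Box<dyn std::error::Error>> {
    // Illustrative broker and worker addresses.
    let mut broker = ConsumerServiceClient::connect("http://127.0.0.1:50051").await?;
    let address = "http://127.0.0.1:50052".to_string();

    // Register on startup so the broker starts pushing tasks to this worker.
    broker
        .add_worker(AddWorkerRequest {
            address: address.clone(),
        })
        .await?;

    // ... serve pushed tasks until shutdown is requested ...

    // Deregister on shutdown so the broker stops pushing to a dead address.
    broker.remove_worker(RemoveWorkerRequest { address }).await?;
    Ok(())
}
```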