Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 98 additions & 1 deletion dapr/src/appcallback.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::dapr::proto::runtime::v1::app_callback_alpha_server::AppCallbackAlpha;
use crate::dapr::proto::runtime::v1::app_callback_server::AppCallback;
use crate::dapr::proto::{common, runtime};
use std::collections::HashMap;
Expand Down Expand Up @@ -40,6 +41,12 @@ pub type TopicEventBulkRequest = runtime::v1::TopicEventBulkRequest;
/// It includes the result for each event in the request.
pub type TopicEventBulkResponse = runtime::v1::TopicEventBulkResponse;

/// JobEventRequest is the request message for a job event callback.
pub type JobEventRequest = runtime::v1::JobEventRequest;

/// JobEventResponse is the response from the app when a job is triggered.
pub type JobEventResponse = runtime::v1::JobEventResponse;

impl ListTopicSubscriptionsResponse {
/// Create `ListTopicSubscriptionsResponse` with a topic.
pub fn topic(pubsub_name: String, topic: String) -> Self {
Expand Down Expand Up @@ -82,6 +89,7 @@ impl ListInputBindingsResponse {

pub struct AppCallbackService {
handlers: Vec<Handler>,
job_handlers: HashMap<String, Box<dyn JobHandlerMethod + Send + Sync + 'static>>,
}

pub struct Handler {
Expand Down Expand Up @@ -156,6 +164,52 @@ impl AppCallback for AppCallbackService {
) -> Result<Response<TopicEventBulkResponse>, Status> {
todo!("on_bulk_topic_event is not implemented yet")
}

async fn on_job_event(
&self,
request: Request<runtime::v1::JobEventRequest>,
) -> Result<Response<runtime::v1::JobEventResponse>, Status> {
let request_inner = request.into_inner();
let job_name = if !request_inner.name.is_empty() {
request_inner.name.clone()
} else if let Some(stripped) = request_inner.method.strip_prefix("job/") {
stripped.to_string()
} else {
return Err(Status::invalid_argument(format!(
"cannot determine job name from request (method={:?})",
request_inner.method,
)));
};

if let Some(handler) = self.job_handlers.get(&job_name) {
let handle_response = handler.handler(request_inner).await;
handle_response.map(Response::new)
} else {
Err(Status::not_found(format!(
"no handler registered for job {:?}",
job_name,
)))
}
}
}

// Also implement AppCallbackAlpha so the same service handles
// Dapr ≤ 1.17 runtimes that call OnJobEventAlpha1 / OnBulkTopicEventAlpha1.
#[tonic::async_trait]
impl AppCallbackAlpha for AppCallbackService {
async fn on_bulk_topic_event_alpha1(
&self,
request: Request<runtime::v1::TopicEventBulkRequest>,
) -> Result<Response<runtime::v1::TopicEventBulkResponse>, Status> {
self.on_bulk_topic_event(request).await
}

async fn on_job_event_alpha1(
&self,
request: Request<runtime::v1::JobEventRequest>,
) -> Result<Response<runtime::v1::JobEventResponse>, Status> {
self.on_job_event(request).await
}
}

impl Default for AppCallbackService {
Expand Down Expand Up @@ -192,12 +246,19 @@ impl AppCallbackService {
/// The actor HTTP server ([`crate::server::DaprHttpServer`]) installs
/// the layer automatically.
pub fn new() -> AppCallbackService {
AppCallbackService { handlers: vec![] }
AppCallbackService {
handlers: vec![],
job_handlers: HashMap::new(),
}
}

pub fn add_handler(&mut self, handler: Handler) {
self.handlers.push(handler)
}

pub fn add_job_handler(&mut self, job_name: String, handler: Box<dyn JobHandlerMethod>) {
self.job_handlers.insert(job_name, handler);
}
}

#[tonic::async_trait]
Expand All @@ -207,3 +268,39 @@ pub trait HandlerMethod: Send + Sync + 'static {
request: runtime::v1::TopicEventRequest,
) -> Result<Response<runtime::v1::TopicEventResponse>, Status>;
}

#[tonic::async_trait]
pub trait JobHandlerMethod: Send + Sync + 'static {
async fn handler(
&self,
request: runtime::v1::JobEventRequest,
) -> Result<runtime::v1::JobEventResponse, Status>;
}

#[macro_export]
macro_rules! add_job_handler {
($app_callback_service:expr, $handler_name:ident, $handler_fn:expr) => {
pub struct $handler_name {}

#[$crate::reexport::async_trait]
impl $crate::appcallback::JobHandlerMethod for $handler_name {
async fn handler(
&self,
request: $crate::appcallback::JobEventRequest,
) -> ::std::result::Result<$crate::appcallback::JobEventResponse, ::tonic::Status>
{
$handler_fn(request).await
}
}

impl $handler_name {
pub fn new() -> Self {
$handler_name {}
}
}

let handler_name = $handler_name.to_string();

$app_callback_service.add_job_handler(handler_name, Box::new($handler_name::new()));
};
}
Loading
Loading