Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion src/a2a/compat/v0_3/jsonrpc_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@
from a2a.server.jsonrpc_models import (
JSONRPCError as CoreJSONRPCError,
)
from a2a.utils import constants
from a2a.utils.errors import ExtendedAgentCardNotConfiguredError
from a2a.utils.helpers import maybe_await
from a2a.utils.helpers import maybe_await, validate_version


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -152,6 +153,7 @@ async def handle_request(
request_id, CoreInternalError(message=str(e))
)

@validate_version(constants.PROTOCOL_VERSION_0_3)
async def _process_non_streaming_request(
self,
request_id: 'str | int | None',
Expand Down Expand Up @@ -266,6 +268,7 @@ async def get_authenticated_extended_card(

return conversions.to_compat_agent_card(card_to_serve)

@validate_version(constants.PROTOCOL_VERSION_0_3)
async def _process_streaming_request(
self,
request_id: 'str | int | None',
Expand Down
16 changes: 15 additions & 1 deletion src/a2a/compat/v0_3/rest_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@
from a2a.compat.v0_3 import types as types_v03
from a2a.compat.v0_3.request_handler import RequestHandler03
from a2a.server.context import ServerCallContext
from a2a.utils.helpers import validate, validate_async_generator
from a2a.utils import constants
from a2a.utils.helpers import (
validate,
validate_async_generator,
validate_version,
)
from a2a.utils.telemetry import SpanKind, trace_class


Expand All @@ -53,6 +58,7 @@ def __init__(
self.agent_card = agent_card
self.handler03 = RequestHandler03(request_handler=request_handler)

@validate_version(constants.PROTOCOL_VERSION_0_3)
async def on_message_send(
self,
request: Request,
Expand All @@ -78,6 +84,7 @@ async def on_message_send(
pb2_v03_resp = proto_utils.ToProto.task_or_message(v03_resp)
return MessageToDict(pb2_v03_resp)

@validate_version(constants.PROTOCOL_VERSION_0_3)
@validate_async_generator(
lambda self: self.agent_card.capabilities.streaming,
'Streaming is not supported by the agent',
Expand Down Expand Up @@ -110,6 +117,7 @@ async def on_message_send_stream(
)
yield MessageToDict(v03_pb_resp)

@validate_version(constants.PROTOCOL_VERSION_0_3)
async def on_cancel_task(
self,
request: Request,
Expand All @@ -134,6 +142,7 @@ async def on_cancel_task(
pb2_v03_task = proto_utils.ToProto.task(v03_resp)
return MessageToDict(pb2_v03_task)

@validate_version(constants.PROTOCOL_VERSION_0_3)
@validate_async_generator(
lambda self: self.agent_card.capabilities.streaming,
'Streaming is not supported by the agent',
Expand Down Expand Up @@ -166,6 +175,7 @@ async def on_subscribe_to_task(
)
yield MessageToDict(v03_pb_resp)

@validate_version(constants.PROTOCOL_VERSION_0_3)
async def get_push_notification(
self,
request: Request,
Expand Down Expand Up @@ -198,6 +208,7 @@ async def get_push_notification(
)
return MessageToDict(pb2_v03_config)

@validate_version(constants.PROTOCOL_VERSION_0_3)
@validate(
lambda self: self.agent_card.capabilities.push_notifications,
'Push notifications are not supported by the agent',
Expand Down Expand Up @@ -242,6 +253,7 @@ async def set_push_notification(
)
return MessageToDict(pb2_v03_config)

@validate_version(constants.PROTOCOL_VERSION_0_3)
async def on_get_task(
self,
request: Request,
Expand Down Expand Up @@ -271,6 +283,7 @@ async def on_get_task(
pb2_v03_task = proto_utils.ToProto.task(v03_resp)
return MessageToDict(pb2_v03_task)

@validate_version(constants.PROTOCOL_VERSION_0_3)
async def list_push_notifications(
self,
request: Request,
Expand All @@ -297,6 +310,7 @@ async def list_push_notifications(

return MessageToDict(pb2_v03_resp)

@validate_version(constants.PROTOCOL_VERSION_0_3)
async def list_tasks(
self,
request: Request,
Expand Down
4 changes: 3 additions & 1 deletion src/a2a/server/apps/jsonrpc/jsonrpc_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ def __init__( # noqa: PLR0913
agent_card=agent_card,
http_handler=http_handler,
extended_agent_card=extended_agent_card,
context_builder=context_builder,
context_builder=self._context_builder,
card_modifier=card_modifier,
extended_card_modifier=extended_card_modifier,
)
Expand Down Expand Up @@ -444,6 +444,8 @@ async def _handle_requests(self, request: Request) -> Response: # noqa: PLR0911
InvalidRequestError(message='Payload too large'),
)
raise e
except A2AError as e:
return self._generate_error_response(request_id, e)
except Exception as e:
logger.exception('Unhandled exception')
return self._generate_error_response(
Expand Down
19 changes: 17 additions & 2 deletions src/a2a/server/request_handlers/jsonrpc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
Task,
TaskPushNotificationConfig,
)
from a2a.utils import proto_utils
from a2a.utils import constants, proto_utils
from a2a.utils.errors import (
JSON_RPC_ERROR_CODE_MAP,
A2AError,
Expand All @@ -49,7 +49,12 @@
UnsupportedOperationError,
VersionNotSupportedError,
)
from a2a.utils.helpers import maybe_await, validate, validate_async_generator
from a2a.utils.helpers import (
maybe_await,
validate,
validate_async_generator,
validate_version,
)
from a2a.utils.telemetry import SpanKind, trace_class


Expand Down Expand Up @@ -142,6 +147,7 @@
return None
return context.state.get('request_id')

@validate_version(constants.PROTOCOL_VERSION_1_0)
async def on_message_send(
self,
request: SendMessageRequest,
Expand Down Expand Up @@ -171,6 +177,7 @@
except A2AError as e:
return _build_error_response(request_id, e)

@validate_version(constants.PROTOCOL_VERSION_1_0)
@validate_async_generator(
lambda self: self.agent_card.capabilities.streaming,
'Streaming is not supported by the agent',
Expand All @@ -192,24 +199,25 @@
Dict representations of JSON-RPC responses containing streaming events.
"""
try:
async for event in self.request_handler.on_message_send_stream(
request, context
):
# Wrap the event in StreamResponse for consistent client parsing
stream_response = proto_utils.to_stream_response(event)
result = MessageToDict(
stream_response, preserving_proto_field_name=False
)
yield _build_success_response(
self._get_request_id(context), result
)
except A2AError as e:
yield _build_error_response(
self._get_request_id(context),
e,
)

@validate_version(constants.PROTOCOL_VERSION_1_0)
async def on_cancel_task(

Check notice on line 220 in src/a2a/server/request_handlers/jsonrpc_handler.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/server/request_handlers/jsonrpc_handler.py (268-286)
self,
request: CancelTaskRequest,
context: ServerCallContext,
Expand All @@ -225,17 +233,18 @@
"""
request_id = self._get_request_id(context)
try:
task = await self.request_handler.on_cancel_task(request, context)
except A2AError as e:
return _build_error_response(request_id, e)

if task:
result = MessageToDict(task, preserving_proto_field_name=False)
return _build_success_response(request_id, result)

return _build_error_response(request_id, TaskNotFoundError())

@validate_version(constants.PROTOCOL_VERSION_1_0)
@validate_async_generator(

Check notice on line 247 in src/a2a/server/request_handlers/jsonrpc_handler.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/server/request_handlers/jsonrpc_handler.py (367-378)
lambda self: self.agent_card.capabilities.streaming,
'Streaming is not supported by the agent',
)
Expand All @@ -256,24 +265,25 @@
Dict representations of JSON-RPC responses containing streaming events.
"""
try:
async for event in self.request_handler.on_subscribe_to_task(
request, context
):
# Wrap the event in StreamResponse for consistent client parsing
stream_response = proto_utils.to_stream_response(event)
result = MessageToDict(
stream_response, preserving_proto_field_name=False
)
yield _build_success_response(
self._get_request_id(context), result
)
except A2AError as e:
yield _build_error_response(
self._get_request_id(context),
e,
)

@validate_version(constants.PROTOCOL_VERSION_1_0)
async def get_push_notification_config(

Check notice on line 286 in src/a2a/server/request_handlers/jsonrpc_handler.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/server/request_handlers/jsonrpc_handler.py (202-220)
self,
request: GetTaskPushNotificationConfigRequest,
context: ServerCallContext,
Expand All @@ -299,6 +309,7 @@
except A2AError as e:
return _build_error_response(request_id, e)

@validate_version(constants.PROTOCOL_VERSION_1_0)
@validate(
lambda self: self.agent_card.capabilities.push_notifications,
'Push notifications are not supported by the agent',
Expand Down Expand Up @@ -336,6 +347,7 @@
except A2AError as e:
return _build_error_response(request_id, e)

@validate_version(constants.PROTOCOL_VERSION_1_0)
async def on_get_task(
self,
request: GetTaskRequest,
Expand All @@ -352,17 +364,18 @@
"""
request_id = self._get_request_id(context)
try:
task = await self.request_handler.on_get_task(request, context)
except A2AError as e:
return _build_error_response(request_id, e)

if task:
result = MessageToDict(task, preserving_proto_field_name=False)
return _build_success_response(request_id, result)

return _build_error_response(request_id, TaskNotFoundError())

@validate_version(constants.PROTOCOL_VERSION_1_0)
async def list_tasks(

Check notice on line 378 in src/a2a/server/request_handlers/jsonrpc_handler.py

View workflow job for this annotation

GitHub Actions / Lint Code Base

Copy/pasted code

see src/a2a/server/request_handlers/jsonrpc_handler.py (236-247)
self,
request: ListTasksRequest,
context: ServerCallContext,
Expand Down Expand Up @@ -390,6 +403,7 @@
except A2AError as e:
return _build_error_response(request_id, e)

@validate_version(constants.PROTOCOL_VERSION_1_0)
async def list_push_notification_configs(
self,
request: ListTaskPushNotificationConfigsRequest,
Expand All @@ -415,6 +429,7 @@
except A2AError as e:
return _build_error_response(request_id, e)

@validate_version(constants.PROTOCOL_VERSION_1_0)
async def delete_push_notification_config(
self,
request: DeleteTaskPushNotificationConfigRequest,
Expand Down
18 changes: 16 additions & 2 deletions src/a2a/server/request_handlers/rest_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@
GetTaskPushNotificationConfigRequest,
SubscribeToTaskRequest,
)
from a2a.utils import proto_utils
from a2a.utils import constants, proto_utils
from a2a.utils.errors import TaskNotFoundError
from a2a.utils.helpers import validate, validate_async_generator
from a2a.utils.helpers import (
validate,
validate_async_generator,
validate_version,
)
from a2a.utils.telemetry import SpanKind, trace_class


Expand Down Expand Up @@ -61,6 +65,7 @@ def __init__(
self.agent_card = agent_card
self.request_handler = request_handler

@validate_version(constants.PROTOCOL_VERSION_1_0)
async def on_message_send(
self,
request: Request,
Expand All @@ -87,6 +92,7 @@ async def on_message_send(
response = a2a_pb2.SendMessageResponse(message=task_or_message)
return MessageToDict(response)

@validate_version(constants.PROTOCOL_VERSION_1_0)
@validate_async_generator(
lambda self: self.agent_card.capabilities.streaming,
'Streaming is not supported by the agent',
Expand Down Expand Up @@ -117,6 +123,7 @@ async def on_message_send_stream(
response = proto_utils.to_stream_response(event)
yield MessageToDict(response)

@validate_version(constants.PROTOCOL_VERSION_1_0)
async def on_cancel_task(
self,
request: Request,
Expand All @@ -139,6 +146,7 @@ async def on_cancel_task(
return MessageToDict(task)
raise TaskNotFoundError

@validate_version(constants.PROTOCOL_VERSION_1_0)
@validate_async_generator(
lambda self: self.agent_card.capabilities.streaming,
'Streaming is not supported by the agent',
Expand All @@ -165,6 +173,7 @@ async def on_subscribe_to_task(
):
yield MessageToDict(proto_utils.to_stream_response(event))

@validate_version(constants.PROTOCOL_VERSION_1_0)
async def get_push_notification(
self,
request: Request,
Expand Down Expand Up @@ -192,6 +201,7 @@ async def get_push_notification(
)
return MessageToDict(config)

@validate_version(constants.PROTOCOL_VERSION_1_0)
@validate(
lambda self: self.agent_card.capabilities.push_notifications,
'Push notifications are not supported by the agent',
Expand Down Expand Up @@ -229,6 +239,7 @@ async def set_push_notification(
)
return MessageToDict(config)

@validate_version(constants.PROTOCOL_VERSION_1_0)
async def on_get_task(
self,
request: Request,
Expand All @@ -251,6 +262,7 @@ async def on_get_task(
return MessageToDict(task)
raise TaskNotFoundError

@validate_version(constants.PROTOCOL_VERSION_1_0)
async def delete_push_notification(
self,
request: Request,
Expand All @@ -275,6 +287,7 @@ async def delete_push_notification(
)
return {}

@validate_version(constants.PROTOCOL_VERSION_1_0)
async def list_tasks(
self,
request: Request,
Expand All @@ -295,6 +308,7 @@ async def list_tasks(
result = await self.request_handler.on_list_tasks(params, context)
return MessageToDict(result, always_print_fields_with_no_presence=True)

@validate_version(constants.PROTOCOL_VERSION_1_0)
async def list_push_notifications(
self,
request: Request,
Expand Down
Loading
Loading