Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
jwt_authorization_middleware,
jwt_authorization_decorator,
)
from .app.streaming import (

# Import streaming utilities from core for backward compatibility
from microsoft_agents.hosting.core.app.streaming import (
Citation,
CitationUtil,
StreamingResponse,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@

import asyncio
import logging
from typing import List, Optional, Callable, Literal, TYPE_CHECKING
from dataclasses import dataclass
from typing import List, Optional, Callable, Literal

from microsoft_agents.activity import (
Activity,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,194 +1,143 @@
# Copyright (c) Microsoft Corporation. All rights reserved.
# Licensed under the MIT License.
import json
from typing import List, Union, Type

from aiohttp.web import RouteTableDef, Request, Response

from microsoft_agents.activity import (
AgentsModel,
Activity,
AttachmentData,
ConversationParameters,
Transcript,
)
from microsoft_agents.hosting.core import ChannelApiHandlerProtocol
from microsoft_agents.hosting.core.http import ChannelServiceRoutes


async def deserialize_from_body(
request: Request, target_model: Type[AgentsModel]
) -> Activity:
if "application/json" in request.headers["Content-Type"]:
body = await request.json()
else:
return Response(status=415)
class AiohttpRequestAdapter:
"""Adapter for aiohttp requests to use with ChannelServiceRoutes."""

return target_model.model_validate(body)
def __init__(self, request: Request):
self._request = request

@property
def method(self) -> str:
return self._request.method

def get_serialized_response(
model_or_list: Union[AgentsModel, List[AgentsModel]],
) -> Response:
if isinstance(model_or_list, AgentsModel):
json_obj = model_or_list.model_dump(
mode="json", exclude_unset=True, by_alias=True
)
else:
json_obj = [
model.model_dump(mode="json", exclude_unset=True, by_alias=True)
for model in model_or_list
]
@property
def headers(self):
return self._request.headers

async def json(self):
return await self._request.json()

def get_claims_identity(self):
return self._request.get("claims_identity")

return Response(body=json.dumps(json_obj), content_type="application/json")
def get_path_param(self, name: str) -> str:
return self._request.match_info[name]


def channel_service_route_table(
handler: ChannelApiHandlerProtocol, base_url: str = ""
) -> RouteTableDef:
# pylint: disable=unused-variable
"""Create aiohttp route table for Channel Service API.

Args:
handler: The handler that implements the Channel API protocol.
base_url: Optional base URL prefix for all routes.

Returns:
RouteTableDef with all channel service routes.
"""
routes = RouteTableDef()
service_routes = ChannelServiceRoutes(handler, base_url)

def json_response(data: dict) -> Response:
return Response(body=json.dumps(data), content_type="application/json")

@routes.post(base_url + "/v3/conversations/{conversation_id}/activities")
async def send_to_conversation(request: Request):
activity = await deserialize_from_body(request, Activity)
result = await handler.on_send_to_conversation(
request.get("claims_identity"),
request.match_info["conversation_id"],
activity,
result = await service_routes.send_to_conversation(
AiohttpRequestAdapter(request)
)

return get_serialized_response(result)
return json_response(result)

@routes.post(
base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}"
)
async def reply_to_activity(request: Request):
activity = await deserialize_from_body(request, Activity)
result = await handler.on_reply_to_activity(
request.get("claims_identity"),
request.match_info["conversation_id"],
request.match_info["activity_id"],
activity,
)

return get_serialized_response(result)
result = await service_routes.reply_to_activity(AiohttpRequestAdapter(request))
return json_response(result)

@routes.put(
base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}"
)
async def update_activity(request: Request):
activity = await deserialize_from_body(request, Activity)
result = await handler.on_update_activity(
request.get("claims_identity"),
request.match_info["conversation_id"],
request.match_info["activity_id"],
activity,
)

return get_serialized_response(result)
result = await service_routes.update_activity(AiohttpRequestAdapter(request))
return json_response(result)

@routes.delete(
base_url + "/v3/conversations/{conversation_id}/activities/{activity_id}"
)
async def delete_activity(request: Request):
await handler.on_delete_activity(
request.get("claims_identity"),
request.match_info["conversation_id"],
request.match_info["activity_id"],
)

await service_routes.delete_activity(AiohttpRequestAdapter(request))
return Response()

@routes.get(
base_url
+ "/v3/conversations/{conversation_id}/activities/{activity_id}/members"
)
async def get_activity_members(request: Request):
result = await handler.on_get_activity_members(
request.get("claims_identity"),
request.match_info["conversation_id"],
request.match_info["activity_id"],
result = await service_routes.get_activity_members(
AiohttpRequestAdapter(request)
)

return get_serialized_response(result)
return json_response(result)

@routes.post(base_url + "/")
async def create_conversation(request: Request):
conversation_parameters = deserialize_from_body(request, ConversationParameters)
result = await handler.on_create_conversation(
request.get("claims_identity"), conversation_parameters
result = await service_routes.create_conversation(
AiohttpRequestAdapter(request)
)

return get_serialized_response(result)
return json_response(result)

@routes.get(base_url + "/")
async def get_conversation(request: Request):
# TODO: continuation token? conversation_id?
result = await handler.on_get_conversations(
request.get("claims_identity"), None
)

return get_serialized_response(result)
result = await service_routes.get_conversations(AiohttpRequestAdapter(request))
return json_response(result)

@routes.get(base_url + "/v3/conversations/{conversation_id}/members")
async def get_conversation_members(request: Request):
result = await handler.on_get_conversation_members(
request.get("claims_identity"),
request.match_info["conversation_id"],
result = await service_routes.get_conversation_members(
AiohttpRequestAdapter(request)
)

return get_serialized_response(result)
return json_response(result)

@routes.get(base_url + "/v3/conversations/{conversation_id}/members/{member_id}")
async def get_conversation_member(request: Request):
result = await handler.on_get_conversation_member(
request.get("claims_identity"),
request.match_info["member_id"],
request.match_info["conversation_id"],
result = await service_routes.get_conversation_member(
AiohttpRequestAdapter(request)
)

return get_serialized_response(result)
return json_response(result)

@routes.get(base_url + "/v3/conversations/{conversation_id}/pagedmembers")
async def get_conversation_paged_members(request: Request):
# TODO: continuation token? page size?
result = await handler.on_get_conversation_paged_members(
request.get("claims_identity"),
request.match_info["conversation_id"],
result = await service_routes.get_conversation_paged_members(
AiohttpRequestAdapter(request)
)

return get_serialized_response(result)
return json_response(result)

@routes.delete(base_url + "/v3/conversations/{conversation_id}/members/{member_id}")
async def delete_conversation_member(request: Request):
result = await handler.on_delete_conversation_member(
request.get("claims_identity"),
request.match_info["conversation_id"],
request.match_info["member_id"],
result = await service_routes.delete_conversation_member(
AiohttpRequestAdapter(request)
)

return get_serialized_response(result)
return json_response(result)

@routes.post(base_url + "/v3/conversations/{conversation_id}/activities/history")
async def send_conversation_history(request: Request):
transcript = deserialize_from_body(request, Transcript)
result = await handler.on_send_conversation_history(
request.get("claims_identity"),
request.match_info["conversation_id"],
transcript,
result = await service_routes.send_conversation_history(
AiohttpRequestAdapter(request)
)

return get_serialized_response(result)
return json_response(result)

@routes.post(base_url + "/v3/conversations/{conversation_id}/attachments")
async def upload_attachment(request: Request):
attachment_data = deserialize_from_body(request, AttachmentData)
result = await handler.on_upload_attachment(
request.get("claims_identity"),
request.match_info["conversation_id"],
attachment_data,
)

return get_serialized_response(result)
result = await service_routes.upload_attachment(AiohttpRequestAdapter(request))
return json_response(result)

return routes
Loading
Loading