4 changes: 4 additions & 0 deletions .vscode/cspell.json
@@ -2211,6 +2211,10 @@
{
"filename": "sdk/agentserver/azure-ai-agentserver-githubcopilot/**",
"words": ["RAPI", "BYOK", "byok", "NCUS", "ncusacr", "fstring", "ename", "valeriepham", "coreai", "Vnext", "PYTHONIOENCODING"]
},
{
"filename": "sdk/agentserver/azure-ai-agentserver-invocations/**",
"words": ["Segoe", "Roboto", "unconfigure"]
}
],
"allowCompoundWords": true
@@ -4,6 +4,15 @@

### Features Added

- Added WebSocket invocation protocol (`invocations_ws`) — merged from the standalone `azure-ai-agentserver-websocket` package.
- New `InvocationWSAgentServerHost` class exposing a single persistent WebSocket endpoint at `/invocations_ws/ws` for invoke / get_invocation / cancel_invocation actions, with built-in streaming support via async generators.
- New `InvocationWSContext` and `InvocationWSError` types passed to handler functions.
- Decorator-based handler registration: `@app.ws_invoke_handler`, `@app.ws_get_invocation_handler`, `@app.ws_cancel_invocation_handler`.
- Built-in WebSocket keep-alive with configurable `ws_ping_interval` (default 30 s) to survive Azure APIM / Load Balancer idle timeouts.
- OpenAPI spec discovery endpoint at `GET /invocations_ws/docs/openapi.json`.
- Distributed tracing with GenAI semantic-convention spans (`invoke_agent`, `get_invocation`, `cancel_invocation`) and span attributes under the `azure.ai.agentserver.invocations_ws.*` namespace.
- Cooperative multiple inheritance with `InvocationAgentServerHost` so a single host can serve both HTTP (`invocations`) and WebSocket (`invocations_ws`) protocols.

### Breaking Changes

### Bugs Fixed
257 changes: 254 additions & 3 deletions sdk/agentserver/azure-ai-agentserver-invocations/README.md
@@ -1,6 +1,9 @@
# Azure AI Agent Server Invocations client library for Python

The `azure-ai-agentserver-invocations` package provides the invocation protocol endpoints for Azure AI Hosted Agent containers. It plugs into the [`azure-ai-agentserver-core`](https://pypi.org/project/azure-ai-agentserver-core/) host framework and adds the full invocation lifecycle: `POST /invocations`, `GET /invocations/{id}`, `POST /invocations/{id}/cancel`, and `GET /invocations/docs/openapi.json`.
The `azure-ai-agentserver-invocations` package provides the invocation protocol endpoints for Azure AI Hosted Agent containers. It plugs into the [`azure-ai-agentserver-core`](https://pypi.org/project/azure-ai-agentserver-core/) host framework and supports two transport modes:

- **HTTP** (`invocations` protocol) — `POST /invocations`, `GET /invocations/{id}`, `POST /invocations/{id}/cancel`, `GET /invocations/docs/openapi.json`
- **WebSocket** (`invocations_ws` protocol) — persistent WebSocket at `/invocations_ws/ws` with invoke, get, cancel, and streaming over a single connection

## Getting started

@@ -182,6 +185,253 @@ app = InvocationAgentServerHost(openapi_spec={
})
```

---

## WebSocket Protocol (`invocations_ws`)

The package also ships an alternative transport that runs the same invocation lifecycle over a single persistent **WebSocket** connection. Use this when you want lower streaming latency, full-duplex agent interactions, or to avoid per-turn HTTP request overhead.

### InvocationWSAgentServerHost

`InvocationWSAgentServerHost` is an `AgentServerHost` subclass that adds a WebSocket endpoint for the `invocations_ws` protocol. It exposes decorator methods for registering handler functions:

- `@app.ws_invoke_handler` — **Required.** Handles `invoke` actions. Supports both async functions (non-streaming) and async generators (streaming).
- `@app.ws_get_invocation_handler` — Optional. Handles `get_invocation` actions.
- `@app.ws_cancel_invocation_handler` — Optional. Handles `cancel_invocation` actions.

### InvocationWSContext

WebSocket handler functions receive an `InvocationWSContext` object containing:

- `context.invocation_id` — The invocation ID (echoed from the client, or an auto-generated UUID).
- `context.session_id` — The resolved session ID.

### InvocationWSError

Handlers can raise `InvocationWSError(code, message)` to return a domain-specific error to the client without exposing internal details.

### WebSocket endpoint

All operations use a single persistent WebSocket connection:

| Route | Description |
|---|---|
| `ws://host:port/invocations_ws/ws` | WebSocket endpoint for all `invocations_ws` operations |
| `GET /invocations_ws/docs/openapi.json` | Serve the agent's OpenAPI 3.x spec (HTTP) |
| `GET /readiness` | Health check (HTTP) |

### Client → Server messages

All messages are JSON text frames with an `action` field:

```text
{"action": "invoke", "payload": {...}, "invocation_id": "optional", "session_id": "optional"}
{"action": "get_invocation", "invocation_id": "required"}
{"action": "cancel_invocation", "invocation_id": "required"}
{"action": "ping"}
{"action": "pong"}
```

### Server → Client messages

```text
{"type": "result", "invocation_id": "...", "session_id": "...", "payload": {...}}
{"type": "stream_chunk", "invocation_id": "...", "session_id": "...", "payload": {...}}
{"type": "stream_end", "invocation_id": "...", "session_id": "..."}
{"type": "error", "invocation_id": "...", "error": {"code": "...", "message": "..."}}
{"type": "ping"}
{"type": "pong"}
```
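A client must branch on the `type` field of each incoming frame. The dispatch helper below is a minimal sketch of that logic; `handle_server_message` is illustrative, not part of the package:

```python
import json


def handle_server_message(raw: str) -> dict:
    """Parse one server frame and decide how the client should react."""
    msg = json.loads(raw)
    kind = msg["type"]
    if kind == "ping":
        # Keep-alive probe from the server: answer with a pong action.
        return {"reply": json.dumps({"action": "pong"})}
    if kind == "result":
        # Terminal frame for a non-streaming invocation.
        return {"done": True, "payload": msg["payload"]}
    if kind == "stream_chunk":
        return {"done": False, "payload": msg["payload"]}
    if kind == "stream_end":
        return {"done": True}
    if kind == "error":
        raise RuntimeError(f'{msg["error"]["code"]}: {msg["error"]["message"]}')
    return {}  # ignore anything else (e.g. a "pong" reply to our own ping)
```

Feeding each `ws.recv()` result through a helper like this keeps the ping/pong bookkeeping out of your main receive loop.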

### WebSocket keep-alive (ping/pong)

Azure APIM and Azure Load Balancer silently drop idle WebSocket connections after approximately 4 minutes. To prevent this, the server sends periodic `{"type": "ping"}` messages to each connected client.

- **Default interval**: 30 seconds.
- **Disable**: `ws_ping_interval=0`.
- **Custom**: any positive integer, e.g. `ws_ping_interval=15`.

```python
app = InvocationWSAgentServerHost(ws_ping_interval=20) # ping every 20 seconds
```

Clients should respond with `{"action": "pong"}` when they receive a `{"type": "ping"}` message. Clients may also send `{"action": "ping"}` at any time; the server replies with `{"type": "pong"}`.

### Session ID resolution (WebSocket)

Session IDs group related invocations. Resolution order:

1. `session_id` field in the WebSocket message
2. `FOUNDRY_AGENT_SESSION_ID` environment variable
3. Auto-generated UUID
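The resolution order above can be expressed as a small function. This is a sketch mirroring the documented behavior; `resolve_session_id` is a hypothetical helper, not part of the package:

```python
import os
import uuid


def resolve_session_id(message: dict) -> str:
    # 1. An explicit session_id on the WebSocket message wins.
    if message.get("session_id"):
        return message["session_id"]
    # 2. Fall back to the FOUNDRY_AGENT_SESSION_ID environment variable.
    env_session = os.environ.get("FOUNDRY_AGENT_SESSION_ID")
    if env_session:
        return env_session
    # 3. Otherwise generate a fresh UUID.
    return str(uuid.uuid4())
```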

### Distributed tracing (WebSocket)

When tracing is enabled on the `AgentServerHost`, `invocations_ws` spans are automatically created with GenAI semantic conventions:

- **Span name**: `invoke_agent {FOUNDRY_AGENT_NAME}:{FOUNDRY_AGENT_VERSION}`
- **Span attributes**: `gen_ai.system`, `gen_ai.operation.name`, `gen_ai.response.id`, `gen_ai.conversation.id`, `gen_ai.agent.id`, `gen_ai.agent.name`, `gen_ai.agent.version`
- **Error tags**: `azure.ai.agentserver.invocations_ws.error.code`, `.error.message`

### WebSocket examples

#### Simple agent

```python
from azure.ai.agentserver.invocations import InvocationWSAgentServerHost, InvocationWSContext

app = InvocationWSAgentServerHost()


@app.ws_invoke_handler
async def handle(payload: dict, context: InvocationWSContext) -> dict:
return {"greeting": f"Hello, {payload['name']}!"}

app.run()
```

**Client** (using the `websockets` library):

```python
import asyncio, json, websockets

async def main():
async with websockets.connect("ws://localhost:8088/invocations_ws/ws") as ws:
await ws.send(json.dumps({
"action": "invoke",
"payload": {"name": "Alice"}
}))
while True:
msg = json.loads(await ws.recv())
if msg["type"] == "ping":
await ws.send(json.dumps({"action": "pong"}))
elif msg["type"] == "result":
print(msg["payload"]["greeting"]) # Hello, Alice!
break

asyncio.run(main())
```

#### Long-running operations with get/cancel

```python
import asyncio

from azure.ai.agentserver.invocations import (
InvocationWSAgentServerHost,
InvocationWSContext,
InvocationWSError,
)

_tasks: dict[str, asyncio.Task] = {}
_results: dict[str, dict] = {}

app = InvocationWSAgentServerHost()


async def do_work(invocation_id: str, payload: dict) -> None:
    # Simulate a long-running operation, then record its result.
    await asyncio.sleep(5)
    _results[invocation_id] = {"invocation_id": invocation_id, "status": "completed"}
    _tasks.pop(invocation_id, None)


@app.ws_invoke_handler
async def handle(payload: dict, context: InvocationWSContext) -> dict:
task = asyncio.create_task(do_work(context.invocation_id, payload))
_tasks[context.invocation_id] = task
return {"invocation_id": context.invocation_id, "status": "running"}


@app.ws_get_invocation_handler
async def get_invocation(context: InvocationWSContext) -> dict:
if context.invocation_id in _results:
return _results[context.invocation_id]
if context.invocation_id in _tasks:
return {"invocation_id": context.invocation_id, "status": "running"}
raise InvocationWSError("not_found", "Invocation not found")


@app.ws_cancel_invocation_handler
async def cancel_invocation(context: InvocationWSContext) -> dict:
if context.invocation_id in _tasks:
_tasks[context.invocation_id].cancel()
del _tasks[context.invocation_id]
return {"invocation_id": context.invocation_id, "status": "cancelled"}
raise InvocationWSError("not_found", "Invocation not found")
```

#### Streaming

Use an async generator to stream chunks back to the client. Each yielded dict is sent as a `stream_chunk` message, followed by a `stream_end` when the generator completes.

```python
from azure.ai.agentserver.invocations import InvocationWSAgentServerHost, InvocationWSContext

app = InvocationWSAgentServerHost()


@app.ws_invoke_handler
async def handle(payload: dict, context: InvocationWSContext):
for word in ["Hello", " ", "world", "!"]:
yield {"delta": word}
```
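On the client side, the yielded chunks arrive as `stream_chunk` frames followed by a `stream_end` frame, so reassembling the text is a matter of concatenating the deltas. A minimal sketch, assuming the `{"delta": ...}` payload shape used by the handler above (`accumulate` itself is illustrative, not part of the package):

```python
def accumulate(frames: list[dict]) -> str:
    """Concatenate streamed deltas until the stream_end frame arrives."""
    parts: list[str] = []
    for msg in frames:
        if msg["type"] == "stream_chunk":
            parts.append(msg["payload"]["delta"])
        elif msg["type"] == "stream_end":
            break
    return "".join(parts)
```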

#### Multi-turn conversation

Use the `session_id` field to group invocations over the same WebSocket connection:

```python
import asyncio, json, websockets

async def main():
async with websockets.connect("ws://localhost:8088/invocations_ws/ws") as ws:
# First turn
await ws.send(json.dumps({
"action": "invoke",
"session_id": "session-abc",
"payload": {"message": "My name is Alice"},
}))
print(json.loads(await ws.recv()))

# Second turn (same session, same connection)
await ws.send(json.dumps({
"action": "invoke",
"session_id": "session-abc",
"payload": {"message": "What is my name?"},
}))
print(json.loads(await ws.recv()))

asyncio.run(main())
```

#### Combined HTTP + WebSocket host

Use cooperative multiple inheritance to serve both `invocations` (HTTP) and `invocations_ws` (WebSocket) protocols on the same server:

```python
from azure.ai.agentserver.invocations import (
InvocationAgentServerHost,
InvocationWSAgentServerHost,
InvocationWSContext,
)
from starlette.requests import Request
from starlette.responses import JSONResponse, Response


class MyAgentHost(InvocationWSAgentServerHost, InvocationAgentServerHost):
pass


app = MyAgentHost()


@app.invoke_handler # HTTP — POST /invocations
async def handle_http(request: Request) -> Response:
data = await request.json()
return JSONResponse({"greeting": f"Hello, {data['name']}!"})


@app.ws_invoke_handler # WebSocket — /invocations_ws/ws
async def handle_ws(payload: dict, context: InvocationWSContext) -> dict:
return {"greeting": f"Hello, {payload['name']}!"}

app.run()
```

## Troubleshooting

### Reporting issues
@@ -194,8 +444,9 @@ Visit the [Samples](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/

| Sample | Description |
|---|---|
| [simple_invoke_agent](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/agentserver/azure-ai-agentserver-invocations/samples/simple_invoke_agent/) | Minimal synchronous request-response |
| [async_invoke_agent](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/agentserver/azure-ai-agentserver-invocations/samples/async_invoke_agent/) | Long-running operations with polling and cancellation |
| [simple_invoke_agent](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/agentserver/azure-ai-agentserver-invocations/samples/simple_invoke_agent/) | Minimal synchronous request-response (HTTP) |
| [async_invoke_agent](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/agentserver/azure-ai-agentserver-invocations/samples/async_invoke_agent/) | Long-running operations with polling and cancellation (HTTP) |
| [streaming_ws_invoke_agent](https://github.com/Azure/azure-sdk-for-python/tree/main/sdk/agentserver/azure-ai-agentserver-invocations/samples/streaming_ws_invoke_agent/) | Streaming token-by-token echo over both WebSocket and HTTP (SSE) |

## Contributing

@@ -3,10 +3,10 @@
# ---------------------------------------------------------
"""Invocations protocol for Azure AI Hosted Agents.

This package provides an invocation protocol host as a subclass of
This package provides invocation protocol hosts as subclasses of
:class:`~azure.ai.agentserver.core.AgentServerHost`.

Quick start::
**HTTP protocol** (``invocations``)::

from azure.ai.agentserver.invocations import InvocationAgentServerHost
from starlette.responses import JSONResponse
@@ -18,11 +18,29 @@ async def handle(request):
return JSONResponse({"ok": True})

app.run()

**WebSocket protocol** (``invocations_ws``)::

from azure.ai.agentserver.invocations import InvocationWSAgentServerHost, InvocationWSContext

app = InvocationWSAgentServerHost()

@app.ws_invoke_handler
async def handle(payload, context):
return {"reply": "hello"}

app.run()
"""
__path__ = __import__("pkgutil").extend_path(__path__, __name__)

from ._invocation import InvocationAgentServerHost
from ._invocation_ws import InvocationWSAgentServerHost, InvocationWSContext, InvocationWSError
from ._version import VERSION

__all__ = ["InvocationAgentServerHost"]
__all__ = [
"InvocationAgentServerHost",
"InvocationWSAgentServerHost",
"InvocationWSContext",
"InvocationWSError",
]
__version__ = VERSION