Skip to content

feat: SSE broadcaster for agent events#435

Open
kentbull wants to merge 1 commit into
WebOfTrust:mainfrom
kentbull:feat/agent-signals-sse
Open

feat: SSE broadcaster for agent events#435
kentbull wants to merge 1 commit into
WebOfTrust:mainfrom
kentbull:feat/agent-signals-sse

Conversation

@kentbull
Copy link
Copy Markdown
Collaborator

@kentbull kentbull commented May 6, 2026

This adds AgentSignals, SseBroadcaster, SseBroadcasterDoer, SignalsStreamEnd , and SseEventIterable to support Signify clients connecting to a server sent events stream from a given Agent to any connected clients for that agent.

This is in preparation for future event driven features including the did:webs integration, among others. The SSE event stream is also a convenient way for clients to switch from long-polling to observing events sent from an agent. Long-polling can still be supported as a fallback, though SSE can be the default. This will significantly help with both bandwidth and performance on highly used Agency servers.

The SseEventIterable uses the same pattern as KERIpy's QryRpyMailboxIterable to leverage Falcon's rep.stream to open an SSE event stream to the client. Wrapping the SseBroadcaster.subscribe around the SseEventIterable allows HIO to asynchronously service the WSGI's synchronous SSE connection client.

@codecov
Copy link
Copy Markdown

codecov Bot commented May 6, 2026

Codecov Report

❌ Patch coverage is 96.70330% with 3 lines in your changes missing coverage. Please review.
✅ Project coverage is 87.87%. Comparing base (93f6fb2) to head (8b12ea3).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
src/keria/app/streaming.py 96.51% 3 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main     #435      +/-   ##
==========================================
+ Coverage   87.74%   87.87%   +0.13%     
==========================================
  Files          26       27       +1     
  Lines        5816     5906      +90     
==========================================
+ Hits         5103     5190      +87     
- Misses        713      716       +3     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@kentbull
Copy link
Copy Markdown
Collaborator Author

kentbull commented May 6, 2026

See WebOfTrust/signifypy#145 for the SignifyPy client implementation of this.

@kentbull
Copy link
Copy Markdown
Collaborator Author

kentbull commented May 7, 2026

And see the SignifyTS PR WebOfTrust/signify-ts#376 for the TypeScript client-side implementation of this.

@kentbull kentbull force-pushed the feat/agent-signals-sse branch from 5b703bd to 9810ff8 Compare May 11, 2026 20:49
@kentbull kentbull force-pushed the feat/agent-signals-sse branch from 9810ff8 to a21c913 Compare May 26, 2026 22:39
rep.status = falcon.HTTP_200
rep.content_type = "text/event-stream"
rep.set_header("Cache-Control", "no-cache")
rep.set_header("connection", "close")
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this is meant to be long lived, we shouldn't have this header

app.add_route("/signals/stream", SignalsStreamEnd())


class SignalsStreamEnd:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I presume the 2 use cases are notifications and long running operation completions. So will this endpoint be phased out in favour of those later? (OK by me)

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really, we need to be able to replay using the "Last-Event-ID" header when a client reconnects, so I think it might be pragmatic to build this against notifications, as something that is DB backed

)


def signedReplyEnvelope(agent, route: str, payload: dict) -> dict:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably requires a discussion on a dev call, but does it need to be a reply envelope? Maybe a SPAC message or something makes the most sense, even if unencrypted for now.

try:
self.broadcaster.publish(
event=cue["event"],
data=signedReplyEnvelope(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly could sign the whole payload rather than the data

class SseEventIterable:
"""SSE iterable modeled after KERIpy signaling without shared draining."""

TimeoutSSE = 300 # seconds
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not an SSE expert but this might be short, and we may want to consider heartbeats to keep the connection open

Signed-off-by: Kent Bull <kent@kentbull.com>
@kentbull kentbull force-pushed the feat/agent-signals-sse branch from a21c913 to 8b12ea3 Compare May 27, 2026 20:46
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants