feat: SSE broadcaster for agent events#435
Conversation
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #435 +/- ##
==========================================
+ Coverage 87.74% 87.87% +0.13%
==========================================
Files 26 27 +1
Lines 5816 5906 +90
==========================================
+ Hits 5103 5190 +87
- Misses 713 716 +3 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
|
See WebOfTrust/signifypy#145 for the SignifyPy client implementation of this. |
|
And see the SignifyTS PR WebOfTrust/signify-ts#376 for the TypeScript client-side implementation of this. |
5b703bd to
9810ff8
Compare
9810ff8 to
a21c913
Compare
| rep.status = falcon.HTTP_200 | ||
| rep.content_type = "text/event-stream" | ||
| rep.set_header("Cache-Control", "no-cache") | ||
| rep.set_header("connection", "close") |
There was a problem hiding this comment.
If this is meant to be long lived, we shouldn't have this header
| app.add_route("/signals/stream", SignalsStreamEnd()) | ||
|
|
||
|
|
||
| class SignalsStreamEnd: |
There was a problem hiding this comment.
I presume the 2 use cases are notifications and long running operation completions. So will this endpoint be phased out in favour of those later? (OK by me)
There was a problem hiding this comment.
Really, we need to be able to replay using the "Last-Event-ID" header when a client reconnects, so I think it might be pragmatic to build this against notifications, as something that is DB backed
| ) | ||
|
|
||
|
|
||
| def signedReplyEnvelope(agent, route: str, payload: dict) -> dict: |
There was a problem hiding this comment.
This probably requires a discussion on a dev call, but does it need to be a reply envelope? Maybe a SPAC message or something makes the most sense, even if unencrypted for now.
| try: | ||
| self.broadcaster.publish( | ||
| event=cue["event"], | ||
| data=signedReplyEnvelope( |
There was a problem hiding this comment.
Possibly could sign the whole payload rather than the data
| class SseEventIterable: | ||
| """SSE iterable modeled after KERIpy signaling without shared draining.""" | ||
|
|
||
| TimeoutSSE = 300 # seconds |
There was a problem hiding this comment.
Not an SSE expert but this might be short, and we may want to consider heartbeats to keep the connection open
Signed-off-by: Kent Bull <kent@kentbull.com>
a21c913 to
8b12ea3
Compare
This adds AgentSignals, SseBroadcaster, SseBroadcasterDoer, SignalsStreamEnd , and SseEventIterable to support Signify clients connecting to a server sent events stream from a given Agent to any connected clients for that agent.
This is in preparation for future event driven features including the did:webs integration, among others. The SSE event stream is also a convenient way for clients to switch from long-polling to observing events sent from an agent. Long-polling can still be supported as a fallback, though SSE can be the default. This will significantly help with both bandwidth and performance on highly used Agency servers.
The
SseEventIterableuses the same pattern as KERIpy'sQryRpyMailboxIterableto leverage Falcon'srep.streamto open an SSE event stream to the client. Wrapping theSseBroadcaster.subscribearound theSseEventIterableallows HIO to asynchronously service the WSGI's synchronous SSE connection client.