Skip to content

Commit 574c673

Browse files
authored
feat: Add AsyncSSEClient with aiohttp-based async/await support (#58)
Adds AsyncSSEClient as a purely additive new public API alongside the existing SSEClient. Async users install with the [async] extra to get aiohttp; sync users have no new dependencies. All existing tests pass unchanged.
1 parent c460708 commit 574c673

22 files changed

+1704
-22
lines changed

.github/workflows/ci.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,12 @@ jobs:
4646
- name: run SSE contract tests
4747
run: make run-contract-tests
4848

49+
- name: start async SSE contract test service
50+
run: make start-async-contract-test-service-bg
51+
52+
- name: run async SSE contract tests
53+
run: make run-async-contract-tests
54+
4955
windows:
5056
runs-on: windows-latest
5157

CONTRIBUTING.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ poetry install
2323
eval $(poetry env activate)
2424
```
2525

26+
To also install the optional async dependencies (required to use `AsyncSSEClient`):
27+
28+
```
29+
poetry install --extras async
30+
```
31+
2632
### Testing
2733

2834
To run all unit tests:
@@ -36,6 +42,11 @@ To run the standardized contract tests that are run against all LaunchDarkly SSE
3642
make contract-tests
3743
```
3844

45+
To run the same contract tests against the async implementation:
46+
```
47+
make async-contract-tests
48+
```
49+
3950
### Linting
4051

4152
To run the linter and check type hints:

Makefile

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
PYTEST_FLAGS=-W error::SyntaxWarning
22

33
TEMP_TEST_OUTPUT=/tmp/sse-contract-test-service.log
4+
TEMP_ASYNC_TEST_OUTPUT=/tmp/sse-async-contract-test-service.log
45

56
SPHINXOPTS = -W --keep-going
67
SPHINXBUILD = sphinx-build
@@ -70,3 +71,21 @@ run-contract-tests:
7071
.PHONY: contract-tests
7172
contract-tests: #! Run the SSE contract test harness
7273
contract-tests: install-contract-tests-deps start-contract-test-service-bg run-contract-tests
74+
75+
.PHONY: start-async-contract-test-service
76+
start-async-contract-test-service:
77+
@cd contract-tests && poetry run python async_service.py 8001
78+
79+
.PHONY: start-async-contract-test-service-bg
80+
start-async-contract-test-service-bg:
81+
@echo "Async test service output will be captured in $(TEMP_ASYNC_TEST_OUTPUT)"
82+
@make start-async-contract-test-service >$(TEMP_ASYNC_TEST_OUTPUT) 2>&1 &
83+
84+
.PHONY: run-async-contract-tests
85+
run-async-contract-tests:
86+
@curl -s https://raw.githubusercontent.com/launchdarkly/sse-contract-tests/main/downloader/run.sh \
87+
| VERSION=v2 PARAMS="-url http://localhost:8001 -debug -stop-service-at-end" sh
88+
89+
.PHONY: async-contract-tests
90+
async-contract-tests: #! Run the SSE async contract test harness
91+
async-contract-tests: install-contract-tests-deps start-async-contract-test-service-bg run-async-contract-tests

README.md

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,13 @@ This package's primary purpose is to support the [LaunchDarkly SDK for Python](h
1414
* Setting read timeouts, custom headers, and other HTTP request properties.
1515
* Specifying that connections should be retried under circumstances where the standard EventSource behavior would not retry them, such as if the server returns an HTTP error status.
1616

17-
This is a synchronous implementation which blocks the caller's thread when reading events or reconnecting. By default, it uses `urllib3` to make HTTP requests, but it can be configured to read any input stream.
17+
The default `SSEClient` is a synchronous implementation which blocks the caller's thread when reading events or reconnecting. By default, it uses `urllib3` to make HTTP requests, but it can be configured to read any input stream.
18+
19+
An async implementation, `AsyncSSEClient`, is also available for use with `asyncio`-based applications. It uses `aiohttp` for HTTP and requires installing the optional `async` extra:
20+
21+
```
22+
pip install launchdarkly-eventsource[async]
23+
```
1824

1925
## Supported Python versions
2026

contract-tests/async_service.py

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
import json
2+
import logging
3+
import os
4+
import sys
5+
from logging.config import dictConfig
6+
7+
import aiohttp.web
8+
from async_stream_entity import AsyncStreamEntity
9+
10+
default_port = 8000
11+
12+
dictConfig({
13+
'version': 1,
14+
'formatters': {
15+
'default': {
16+
'format': '[%(asctime)s] [%(name)s] %(levelname)s: %(message)s',
17+
}
18+
},
19+
'handlers': {
20+
'console': {
21+
'class': 'logging.StreamHandler',
22+
'formatter': 'default'
23+
}
24+
},
25+
'root': {
26+
'level': 'INFO',
27+
'handlers': ['console']
28+
},
29+
})
30+
31+
global_log = logging.getLogger('testservice')
32+
33+
stream_counter = 0
34+
streams = {}
35+
36+
37+
async def handle_get_status(request):
38+
body = {
39+
'capabilities': [
40+
'comments',
41+
'headers',
42+
'last-event-id',
43+
'read-timeout',
44+
]
45+
}
46+
return aiohttp.web.Response(
47+
body=json.dumps(body),
48+
content_type='application/json',
49+
)
50+
51+
52+
async def handle_delete_stop(request):
53+
global_log.info("Test service has told us to exit")
54+
os._exit(0)
55+
56+
57+
async def handle_post_create_stream(request):
58+
global stream_counter, streams
59+
60+
options = json.loads(await request.read())
61+
62+
stream_counter += 1
63+
stream_id = str(stream_counter)
64+
resource_url = '/streams/%s' % stream_id
65+
66+
stream = AsyncStreamEntity(options, request.app['http_session'])
67+
streams[stream_id] = stream
68+
69+
return aiohttp.web.Response(status=201, headers={'Location': resource_url})
70+
71+
72+
async def handle_post_stream_command(request):
73+
stream_id = request.match_info['id']
74+
params = json.loads(await request.read())
75+
76+
stream = streams.get(stream_id)
77+
if stream is None:
78+
return aiohttp.web.Response(status=404)
79+
if not await stream.do_command(params.get('command')):
80+
return aiohttp.web.Response(status=400)
81+
return aiohttp.web.Response(status=204)
82+
83+
84+
async def handle_delete_stream(request):
85+
stream_id = request.match_info['id']
86+
87+
stream = streams.get(stream_id)
88+
if stream is None:
89+
return aiohttp.web.Response(status=404)
90+
await stream.close()
91+
return aiohttp.web.Response(status=204)
92+
93+
94+
async def on_startup(app):
95+
app['http_session'] = aiohttp.ClientSession()
96+
97+
98+
async def on_cleanup(app):
99+
await app['http_session'].close()
100+
101+
102+
def make_app():
103+
app = aiohttp.web.Application()
104+
app.router.add_get('/', handle_get_status)
105+
app.router.add_delete('/', handle_delete_stop)
106+
app.router.add_post('/', handle_post_create_stream)
107+
app.router.add_post('/streams/{id}', handle_post_stream_command)
108+
app.router.add_delete('/streams/{id}', handle_delete_stream)
109+
app.on_startup.append(on_startup)
110+
app.on_cleanup.append(on_cleanup)
111+
return app
112+
113+
114+
if __name__ == "__main__":
115+
port = default_port
116+
if sys.argv[len(sys.argv) - 1] != 'async_service.py':
117+
port = int(sys.argv[len(sys.argv) - 1])
118+
global_log.info('Listening on port %d', port)
119+
aiohttp.web.run_app(make_app(), host='0.0.0.0', port=port)
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
import asyncio
2+
import json
3+
import logging
4+
import os
5+
import sys
6+
import traceback
7+
8+
import aiohttp
9+
10+
# Import ld_eventsource from parent directory
11+
sys.path.insert(1, os.path.join(sys.path[0], '..'))
12+
from ld_eventsource.actions import Comment, Event, Fault # noqa: E402
13+
from ld_eventsource.async_client import AsyncSSEClient # noqa: E402
14+
from ld_eventsource.config.async_connect_strategy import \
15+
AsyncConnectStrategy # noqa: E402
16+
from ld_eventsource.config.error_strategy import ErrorStrategy # noqa: E402
17+
18+
19+
def millis_to_seconds(t):
20+
return None if t is None else t / 1000
21+
22+
23+
class AsyncStreamEntity:
24+
def __init__(self, options, http_session: aiohttp.ClientSession):
25+
self.options = options
26+
self.callback_url = options["callbackUrl"]
27+
self.log = logging.getLogger(options["tag"])
28+
self.closed = False
29+
self.callback_counter = 0
30+
self.sse = None
31+
self._http_session = http_session
32+
asyncio.create_task(self.run())
33+
34+
async def run(self):
35+
stream_url = self.options["streamUrl"]
36+
try:
37+
self.log.info('Opening stream from %s', stream_url)
38+
39+
request_options = {}
40+
if self.options.get("readTimeoutMs") is not None:
41+
request_options["timeout"] = aiohttp.ClientTimeout(
42+
sock_read=millis_to_seconds(self.options.get("readTimeoutMs"))
43+
)
44+
45+
connect = AsyncConnectStrategy.http(
46+
url=stream_url,
47+
headers=self.options.get("headers"),
48+
aiohttp_request_options=request_options if request_options else None,
49+
)
50+
sse = AsyncSSEClient(
51+
connect,
52+
initial_retry_delay=millis_to_seconds(self.options.get("initialDelayMs")),
53+
last_event_id=self.options.get("lastEventId"),
54+
error_strategy=ErrorStrategy.from_lambda(
55+
lambda _: (
56+
ErrorStrategy.FAIL if self.closed else ErrorStrategy.CONTINUE,
57+
None,
58+
)
59+
),
60+
logger=self.log,
61+
)
62+
self.sse = sse
63+
async for item in sse.all:
64+
if isinstance(item, Event):
65+
self.log.info('Received event from stream (%s)', item.event)
66+
await self.send_message(
67+
{
68+
'kind': 'event',
69+
'event': {
70+
'type': item.event,
71+
'data': item.data,
72+
'id': item.last_event_id,
73+
},
74+
}
75+
)
76+
elif isinstance(item, Comment):
77+
self.log.info('Received comment from stream: %s', item.comment)
78+
await self.send_message({'kind': 'comment', 'comment': item.comment})
79+
elif isinstance(item, Fault):
80+
if self.closed:
81+
break
82+
if item.error:
83+
self.log.info('Received error from stream: %s', item.error)
84+
await self.send_message({'kind': 'error', 'error': str(item.error)})
85+
except Exception as e:
86+
self.log.info('Received error from stream: %s', e)
87+
self.log.info(traceback.format_exc())
88+
await self.send_message({'kind': 'error', 'error': str(e)})
89+
90+
async def do_command(self, command: str) -> bool:
91+
self.log.info('Test service sent command: %s' % command)
92+
# currently we support no special commands
93+
return False
94+
95+
async def send_message(self, message):
96+
if self.closed:
97+
return
98+
self.callback_counter += 1
99+
callback_url = "%s/%d" % (self.callback_url, self.callback_counter)
100+
try:
101+
async with self._http_session.post(
102+
callback_url,
103+
data=json.dumps(message),
104+
headers={'Content-Type': 'application/json'},
105+
) as resp:
106+
if resp.status >= 300 and not self.closed:
107+
self.log.error('Callback request returned HTTP error %d', resp.status)
108+
except Exception as e:
109+
if not self.closed:
110+
self.log.error('Callback request failed: %s', e)
111+
112+
async def close(self):
113+
self.closed = True
114+
if self.sse is not None:
115+
await self.sse.close()
116+
self.log.info('Test ended')

docs/conf.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,3 +170,6 @@
170170
autodoc_default_options = {
171171
'undoc-members': False
172172
}
173+
174+
# aiohttp is an optional dependency not installed during doc builds
175+
autodoc_mock_imports = ['aiohttp']

docs/index.rst

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ LaunchDarkly Python SSE Client
33

44
This is the API reference for the `launchdarkly-eventsource <https://github.com/launchdarkly/python-eventsource/>`_ package, a `Server-Sent Events <https://html.spec.whatwg.org/multipage/server-sent-events.html>`_ client for Python. This package is used internally by the `LaunchDarkly Python SDK <https://github.com/launchdarkly/python-server-sdk>`_, but may also be useful for other purposes.
55

6-
76
ld_eventsource module
87
---------------------
98

@@ -37,3 +36,21 @@ ld_eventsource.errors module
3736
:members:
3837
:special-members: __init__
3938
:show-inheritance:
39+
40+
41+
ld_eventsource.async_client module
42+
-----------------------------------
43+
44+
.. automodule:: ld_eventsource.async_client
45+
:members:
46+
:special-members: __init__
47+
:show-inheritance:
48+
49+
50+
ld_eventsource.config.async_connect_strategy module
51+
----------------------------------------------------
52+
53+
.. automodule:: ld_eventsource.config.async_connect_strategy
54+
:members:
55+
:special-members: __init__
56+
:show-inheritance:

ld_eventsource/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,10 @@
11
from ld_eventsource.sse_client import *
2+
3+
4+
def __getattr__(name):
5+
# Lazily import AsyncSSEClient so that aiohttp (an optional dependency)
6+
# is never imported for sync-only users who don't have it installed.
7+
if name == 'AsyncSSEClient':
8+
from ld_eventsource.async_client import AsyncSSEClient
9+
return AsyncSSEClient
10+
raise AttributeError(f"module 'ld_eventsource' has no attribute {name!r}")

0 commit comments

Comments
 (0)