Skip to content

Commit cbd62cc

Browse files
committed
make unsubscribe & unregister public methods
1 parent b6e0ef4 commit cbd62cc

File tree

2 files changed

+56
-48
lines changed

2 files changed

+56
-48
lines changed

xconn/async_session.py

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,7 @@ def __init__(self, registration_id: int, session: AsyncSession):
2525
self._session = session
2626

2727
async def unregister(self) -> None:
28-
if not await self._session._base_session.transport.is_connected():
29-
raise Exception("cannot unregister procedure: session not established")
30-
31-
unregister = messages.Unregister(messages.UnregisterFields(self._session._idgen.next(), self.registration_id))
32-
data = self._session._session.send_message(unregister)
33-
34-
f: Future = Future()
35-
self._session._unregister_requests[unregister.request_id] = types.UnregisterRequest(f, self.registration_id)
36-
await self._session._base_session.send(data)
37-
38-
return await f
28+
return await self._session.unregister(self)
3929

4030

4131
@dataclass
@@ -50,19 +40,7 @@ def __init__(self, subscription_id: int, session: AsyncSession):
5040
self._session = session
5141

5242
async def unsubscribe(self) -> None:
53-
if not await self._session._base_session.transport.is_connected():
54-
raise Exception("cannot unsubscribe topic: session not established")
55-
56-
unsubscribe = messages.Unsubscribe(
57-
messages.UnsubscribeFields(self._session._idgen.next(), self.subscription_id)
58-
)
59-
data = self._session._session.send_message(unsubscribe)
60-
61-
f: Future = Future()
62-
self._session._unsubscribe_requests[unsubscribe.request_id] = types.UnsubscribeRequest(f, self.subscription_id)
63-
await self._session._base_session.send(data)
64-
65-
return await f
43+
return await self._session.unsubscribe(self)
6644

6745

6846
class AsyncSession:
@@ -237,6 +215,19 @@ async def call(
237215

238216
return await f
239217

218+
async def unregister(self, reg: Registration) -> None:
219+
if not await self._base_session.transport.is_connected():
220+
raise Exception("cannot unregister procedure: session not established")
221+
222+
unregister = messages.Unregister(messages.UnregisterFields(self._idgen.next(), reg.registration_id))
223+
data = self._session.send_message(unregister)
224+
225+
f: Future = Future()
226+
self._unregister_requests[unregister.request_id] = types.UnregisterRequest(f, reg.registration_id)
227+
await self._base_session.send(data)
228+
229+
return await f
230+
240231
async def subscribe(
241232
self, topic: str, event_handler: Callable[[types.Event], Awaitable[None]], options: dict | None = None
242233
) -> Subscription:
@@ -266,6 +257,19 @@ async def publish(
266257

267258
await self._base_session.send(data)
268259

260+
async def unsubscribe(self, sub: Subscription) -> None:
261+
if not await self._base_session.transport.is_connected():
262+
raise Exception("cannot unsubscribe topic: session not established")
263+
264+
unsubscribe = messages.Unsubscribe(messages.UnsubscribeFields(self._idgen.next(), sub.subscription_id))
265+
data = self._session.send_message(unsubscribe)
266+
267+
f: Future = Future()
268+
self._unsubscribe_requests[unsubscribe.request_id] = types.UnsubscribeRequest(f, sub.subscription_id)
269+
await self._base_session.send(data)
270+
271+
return await f
272+
269273
async def leave(self) -> None:
270274
self._goodbye_request = Future()
271275

xconn/session.py

Lines changed: 28 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -24,17 +24,7 @@ def __init__(self, registration_id: int, session: Session):
2424
self._session = session
2525

2626
def unregister(self) -> None:
27-
if not self._session._base_session.transport.is_connected():
28-
raise Exception("cannot unregister procedure: session not established")
29-
30-
unregister = messages.Unregister(messages.UnregisterFields(self._session._idgen.next(), self.registration_id))
31-
data = self._session._session.send_message(unregister)
32-
33-
f: Future = Future()
34-
self._session._unregister_requests[unregister.request_id] = types.UnregisterRequest(f, self.registration_id)
35-
self._session._base_session.send(data)
36-
37-
f.result()
27+
self._session.unregister(self)
3828

3929

4030
@dataclass
@@ -49,19 +39,7 @@ def __init__(self, subscription_id: int, session: Session):
4939
self._session = session
5040

5141
def unsubscribe(self) -> None:
52-
if not self._session._base_session.transport.is_connected():
53-
raise Exception("cannot unsubscribe topic: session not established")
54-
55-
unsubscribe = messages.Unsubscribe(
56-
messages.UnsubscribeFields(self._session._idgen.next(), self.subscription_id)
57-
)
58-
data = self._session._session.send_message(unsubscribe)
59-
60-
f: Future = Future()
61-
self._session._unsubscribe_requests[unsubscribe.request_id] = types.UnsubscribeRequest(f, self.subscription_id)
62-
self._session._base_session.send(data)
63-
64-
f.result()
42+
self._session.unsubscribe(self)
6543

6644

6745
class Session:
@@ -252,6 +230,19 @@ def register(
252230

253231
return f.result()
254232

233+
def unregister(self, reg: Registration) -> None:
234+
if not self._base_session.transport.is_connected():
235+
raise Exception("cannot unregister procedure: session not established")
236+
237+
unregister = messages.Unregister(messages.UnregisterFields(self._idgen.next(), reg.registration_id))
238+
data = self._session.send_message(unregister)
239+
240+
f: Future = Future()
241+
self._unregister_requests[unregister.request_id] = types.UnregisterRequest(f, reg.registration_id)
242+
self._base_session.send(data)
243+
244+
f.result()
245+
255246
def subscribe(self, topic: str, event_handler: Callable[[types.Event], None], options: dict = None) -> Subscription:
256247
subscribe = messages.Subscribe(messages.SubscribeFields(self._idgen.next(), topic, options=options))
257248
data = self._session.send_message(subscribe)
@@ -274,6 +265,19 @@ def publish(self, topic: str, args: list[Any] = None, kwargs: dict = None, optio
274265

275266
self._base_session.send(data)
276267

268+
def unsubscribe(self, sub: Subscription) -> None:
269+
if not self._base_session.transport.is_connected():
270+
raise Exception("cannot unsubscribe topic: session not established")
271+
272+
unsubscribe = messages.Unsubscribe(messages.UnsubscribeFields(self._idgen.next(), sub.subscription_id))
273+
data = self._session.send_message(unsubscribe)
274+
275+
f: Future = Future()
276+
self._unsubscribe_requests[unsubscribe.request_id] = types.UnsubscribeRequest(f, sub.subscription_id)
277+
self._base_session.send(data)
278+
279+
f.result()
280+
277281
def leave(self):
278282
self._goodbye_request = Future()
279283

0 commit comments

Comments
 (0)