Skip to content

Commit c6c001e

Browse files
committed
allow queue name to be specified by libraries, add is_root option to configuration
also add nonroot handling for AMQP protocol Signed-off-by: Lance-Drane <Lance-Drane@users.noreply.github.com>
1 parent 7d1f553 commit c6c001e

7 files changed

Lines changed: 75 additions & 32 deletions

File tree

CHANGELOG.md

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,13 @@
22

33
We follow [Common Changelog](https://common-changelog.org/) formatting for this document.
44

5-
## [0.9.0] - 2026-02-11
5+
## [0.9.1] - 2026-02-25
66

77
Initial reorganization of SDK packaging.
88

9-
[0.9.0]: https://github.com/INTERSECT-SDK/python-sdk-common/releases/tag/0.9.0
9+
### Changed
10+
11+
- Added new argument to `ControlPlaneManager.add_subscription_channel()` which specifies a queue name to use.
12+
- Add `is_root` option to `ControlPlaneConfig` in preparation for Registry Service.
13+
14+
[0.9.1]: https://github.com/INTERSECT-SDK/python-sdk-common/releases/tag/0.9.1

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ readme = "README.md"
1818
license = { text = "BSD-3-Clause" }
1919
requires-python = ">=3.10,<4.0"
2020
keywords = ["intersect"]
21-
version = "0.9.0"
21+
version = "0.9.1a"
2222
classifiers = [
2323
"Programming Language :: Python :: 3",
2424
"Programming Language :: Python :: 3.10",

src/intersect_sdk_common/config.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,21 @@ class ControlPlaneConfig:
122122
NOTE: INTERSECT currently only supports AMQP and MQTT.
123123
"""
124124

125+
# TODO default this to False once the registry service is in place
126+
is_root: bool = True
127+
"""
128+
Whether or not the broker credentials are for connecting as a root user.
129+
130+
This should be True IF:
131+
- You are a Core Service
132+
- You are an SDK Client or Service, but your message broker is hosted locally.
133+
134+
This should be False IF:
135+
- You are an SDK Client or Service, and the broker you're connected to is remote.
136+
137+
This is important for specific implementations; the Registry Service configures queues for microservices, but Core Services configure their own queues themselves.
138+
"""
139+
125140

126141
@dataclass
127142
class DataStoreConfig:

src/intersect_sdk_common/control_plane/brokers/amqp_client.py

Lines changed: 40 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ def __init__(
9696
username: str,
9797
password: str,
9898
topics_to_handlers: Callable[[], dict[str, TopicHandler]],
99+
is_root: bool,
99100
) -> None:
100101
"""The default constructor.
101102
@@ -105,6 +106,7 @@ def __init__(
105106
username: username credentials for AMQP broker
106107
password: password credentials for AMQP broker
107108
topics_to_handlers: callback function which gets the topic to handler map from the channel manager
109+
is_root: Whether or not the client can configure exchanges and queues themselves (core services), or if this must be delegated to a Core Service (SDK Clients/Services)
108110
"""
109111
self._connection_params = pika.ConnectionParameters(
110112
host=host,
@@ -118,6 +120,8 @@ def __init__(
118120
retry_delay=0.5,
119121
)
120122

123+
self._is_root = is_root
124+
121125
# The pika connection to the broker
122126
self._connection: pika.adapters.SelectConnection = None
123127
self._channel_in: Channel = None
@@ -225,7 +229,6 @@ def subscribe(self, topic: str, persist: bool) -> None:
225229
persist: If True, we will create an idempotent queue name which should persist
226230
even on broker or application shutdown. If False, we will allow the server to create a unique
227231
queue name, and the queue will be destroyed once the associated channel is closed.
228-
229232
"""
230233
topic = _hierarchy_2_amqp(topic)
231234
cb = functools.partial(
@@ -365,13 +368,17 @@ def _on_output_channel_open(self, channel: Channel) -> None:
365368
self._channel_out = channel
366369
cb = functools.partial(self._on_channel_closed, channel_num=channel_num)
367370
self._channel_out.add_on_close_callback(cb)
368-
# producer flag should first make sure the exchange exists before publishing
369-
channel.exchange_declare(
370-
exchange=_INTERSECT_MESSAGE_EXCHANGE,
371-
exchange_type='topic',
372-
durable=True,
373-
callback=lambda _frame: self._channel_flags.set_nth_flag(channel_num),
374-
)
371+
if self._is_root:
372+
# if root, producer flag should first make sure the exchange exists before publishing
373+
channel.exchange_declare(
374+
exchange=_INTERSECT_MESSAGE_EXCHANGE,
375+
exchange_type='topic',
376+
durable=True,
377+
callback=lambda _frame: self._channel_flags.set_nth_flag(channel_num),
378+
)
379+
else:
380+
# nonroot producers can assume they may publish immediately
381+
self._channel_flags.set_nth_flag(channel_num)
375382
logger.info('AMQP: output channel ready')
376383

377384
# CONSUMER #
@@ -382,10 +389,26 @@ def _on_input_channel_open(self, channel: Channel) -> None:
382389
self._channel_flags.set_nth_flag(channel_num)
383390
cb_1 = functools.partial(self._on_channel_closed, channel_num=channel_num)
384391
self._channel_in.add_on_close_callback(cb_1)
385-
cb_2 = functools.partial(self._on_exchange_declareok, channel=channel)
386-
channel.exchange_declare(
387-
exchange=_INTERSECT_MESSAGE_EXCHANGE, exchange_type='topic', durable=True, callback=cb_2
388-
)
392+
if self._is_root:
393+
cb_2 = functools.partial(self._on_exchange_declareok, channel=channel)
394+
channel.exchange_declare(
395+
exchange=_INTERSECT_MESSAGE_EXCHANGE,
396+
exchange_type='topic',
397+
durable=True,
398+
callback=cb_2,
399+
)
400+
else:
401+
for topic, topic_handler in self._topics_to_handlers().items():
402+
amqp_topic = _hierarchy_2_amqp(topic)
403+
cb = functools.partial(
404+
self._setup_basic_qos,
405+
None,
406+
channel=channel,
407+
topic=amqp_topic,
408+
persist=topic_handler.topic_persist,
409+
queue_name=topic_handler.queue_name,
410+
)
411+
self._connection.ioloop.add_callback_threadsafe(cb)
389412

390413
def _on_exchange_declareok(self, _unused_frame: Frame, channel: Channel) -> None:
391414
"""Create a queue on the broker (called from AMQP).
@@ -404,7 +427,7 @@ def _on_exchange_declareok(self, _unused_frame: Frame, channel: Channel) -> None
404427
channel=channel,
405428
topic=amqp_topic,
406429
persist=topic_handler.topic_persist,
407-
queue_name=topic_handler.queue_name_generator(topic),
430+
queue_name=topic_handler.queue_name,
408431
)
409432
self._connection.ioloop.add_callback_threadsafe(cb)
410433

@@ -448,7 +471,7 @@ def _on_queue_declareok(
448471
"""
449472
queue_name = frame.method.queue
450473
cb = functools.partial(
451-
self._on_queue_bindok,
474+
self._setup_basic_qos,
452475
channel=channel,
453476
topic=topic,
454477
queue_name=queue_name,
@@ -461,9 +484,9 @@ def _on_queue_declareok(
461484
callback=cb,
462485
)
463486

464-
def _on_queue_bindok(
487+
def _setup_basic_qos(
465488
self,
466-
_unused_frame: Frame,
489+
_unused_frame: Frame | None,
467490
channel: Channel,
468491
topic: str,
469492
queue_name: str,
@@ -472,7 +495,7 @@ def _on_queue_bindok(
472495
"""Sets up basic QOS on the current channel.
473496
474497
Args:
475-
_unused_frame: AMQP response from binding to the queue. Ignored.
498+
_unused_frame: AMQP response from binding to the queue. Ignored. Also explicitly set to "None" if called from nonroot configuration.
476499
channel: The Channel being instantiated.
477500
topic: Name of the topic on the broker.
478501
queue_name: The name of the queue on the AMQP broker.

src/intersect_sdk_common/control_plane/control_plane_manager.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ def _create_control_provider(
2424
username=config.username,
2525
password=config.password,
2626
topics_to_handlers=topic_handler_callback,
27+
is_root=config.is_root,
2728
)
2829

2930
# MQTT
@@ -44,7 +45,6 @@ class ControlPlaneManager:
4445
def __init__(
4546
self,
4647
control_configs: list[ControlPlaneConfig],
47-
queue_name_generator: Callable[[str], str],
4848
) -> None:
4949
"""Basic constructor.
5050
@@ -61,10 +61,9 @@ def __init__(
6161
self._ready = False
6262
# topics_to_handlers are managed here and transcend connections/disconnections to the broker
6363
self._topics_to_handlers: dict[str, TopicHandler] = {}
64-
self._queue_name_generator = queue_name_generator
6564

6665
def add_subscription_channel(
67-
self, channel: str, callbacks: set[MessageCallback], persist: bool
66+
self, channel: str, callbacks: set[MessageCallback], persist: bool, queue_name: str
6867
) -> None:
6968
"""Start listening for messages on a channel on all configured brokers.
7069
@@ -79,14 +78,17 @@ def add_subscription_channel(
7978
callbacks: functions to call on subscribing to a message
8079
persist: if True, expect the associated message queue to live long; if False, it will only live the duration of the application.
8180
Any queue associated with a Service should always set this to True. Clients will need to subscribe to their own, temporary queues, and should set this to False.
81+
queue_name: the name of the queue to subscribe to. This is generally hardcoded for Core Services, but autogenerated for SDK Services and Clients.
8282
"""
8383
topic_handler = self._topics_to_handlers.get(channel)
8484
if topic_handler is None:
85-
topic_handler = TopicHandler(persist, self._queue_name_generator)
85+
topic_handler = TopicHandler(persist, queue_name)
8686
topic_handler.callbacks |= callbacks
8787
self._topics_to_handlers[channel] = topic_handler
8888
else:
8989
topic_handler.callbacks |= callbacks
90+
topic_handler.topic_persist = persist
91+
topic_handler.queue_name = queue_name
9092
if self.is_connected():
9193
for provider in self._control_providers:
9294
provider.subscribe(channel, persist)
Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
"""Attributes associated with a specific pub/sub topic."""
22

3-
from collections.abc import Callable
4-
53
from .definitions import MessageCallback
64

75

@@ -16,16 +14,16 @@ class TopicHandler:
1614
topic_persist: bool
1715
"""Whether or not a topic queue is expected to persist on the message broker."""
1816

19-
queue_name_generator: Callable[[str], str]
20-
"""A pointer to the function which generates the queue name.."""
17+
queue_name: str
18+
"""The name of the queue to subscribe to for this topic."""
2119

22-
def __init__(self, topic_persist: bool, queue_name_generator: Callable[[str], str]) -> None:
20+
def __init__(self, topic_persist: bool, queue_name: str) -> None:
2321
"""Initialize a TopicHandler instance.
2422
2523
Args:
2624
topic_persist: Whether the topic queue is expected to persist on the message broker.
27-
queue_name_generator: A pointer to the function which generates the queue name.
25+
queue_name: The name of the queue to subscribe to for this topic.
2826
"""
2927
self.callbacks = set()
3028
self.topic_persist = topic_persist
31-
self.queue_name_generator = queue_name_generator
29+
self.queue_name = queue_name

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)