Skip to content

Commit c681856

Browse files
committed
fix wildcard issue, add integration test and CI/CD
Signed-off-by: Lance-Drane <Lance-Drane@users.noreply.github.com>
1 parent 44eea21 commit c681856

11 files changed

Lines changed: 340 additions & 62 deletions

File tree

.github/workflows/ci.yml

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,15 +36,54 @@ jobs:
3636
3737
# Run unit tests only on Windows/MacOS, we can run the full test suite on Linux
3838
test-unit:
39-
name: Unit tests
39+
name: Unit tests for Windows, MacOS
4040
strategy:
4141
matrix:
4242
python-version: ["3.10", "3.11", "3.12", "3.13"]
4343
os:
44-
- ubuntu-latest
4544
- macos-latest
4645
- windows-latest
4746
runs-on: ${{ matrix.os }}
47+
steps:
48+
- uses: actions/checkout@v4
49+
- name: Setup UV
50+
uses: astral-sh/setup-uv@v7
51+
with:
52+
python-version: ${{ matrix.python-version }}
53+
- name: Install dependencies
54+
run: |
55+
uv sync --locked --all-extras --all-groups
56+
- name: Run tests
57+
run: uv run pytest tests/unit/ --cov=src/intersect_sdk_common/ --cov-report=html:reports/htmlcov/ --cov-report=xml:reports/coverage_report.xml --junitxml=reports/junit.xml
58+
59+
test-suite:
60+
name: Full test suite with RabbitMQ and MinIO on Linux
61+
strategy:
62+
matrix:
63+
python-version: ["3.10", "3.11", "3.12", "3.13"]
64+
runs-on: ubuntu-latest
65+
services:
66+
rabbitmq:
67+
image: "bitnamilegacy/rabbitmq:4.1"
68+
env:
69+
# space-delimited list of plugins
70+
RABBITMQ_PLUGINS: "rabbitmq_mqtt"
71+
RABBITMQ_USERNAME: "intersect_username"
72+
RABBITMQ_PASSWORD: "intersect_password"
73+
# misleading env name, this is needed to set "loopback_users.$USERNAME = false"
74+
RABBITMQ_MANAGEMENT_ALLOW_WEB_ACCESS: "yes"
75+
ports:
76+
- "1883:1883" # MQTT port
77+
- "5672:5672" # AMQP port
78+
minio:
79+
image: "bitnamilegacy/minio:2024.6.4"
80+
env:
81+
MINIO_ROOT_USER: AKIAIOSFODNN7EXAMPLE
82+
MINIO_ROOT_PASSWORD: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
83+
MINIO_SKIP_CLIENT: "yes"
84+
ports:
85+
- "9000:9000" # main MINIO API port
86+
- "9001:9001" # web UI
4887
steps:
4988
- uses: actions/checkout@v4
5089
- name: Setup UV

src/intersect_sdk_common/control_plane/brokers/amqp_client.py

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,12 @@
2323
from .broker_client import BrokerClient
2424

2525
if TYPE_CHECKING:
26-
from collections.abc import Callable
27-
2826
from pika.channel import Channel
2927
from pika.frame import Frame
3028
from pika.spec import Basic, BasicProperties
3129

30+
from ..control_plane_manager import ControlPlaneManager
3231
from ..definitions import MessageCallback
33-
from ..topic_handler import TopicHandler
3432

3533

3634
_AMQP_MAX_RETRIES = 10
@@ -95,8 +93,7 @@ def __init__(
9593
port: int,
9694
username: str,
9795
password: str,
98-
topics_to_handlers: Callable[[], dict[str, TopicHandler]],
99-
find_wildcard_handler: Callable[[str], TopicHandler | None],
96+
control_plane_manager: ControlPlaneManager,
10097
is_root: bool,
10198
) -> None:
10299
"""The default constructor.
@@ -106,8 +103,7 @@ def __init__(
106103
port: port number of AMQP broker
107104
username: username credentials for AMQP broker
108105
password: password credentials for AMQP broker
109-
topics_to_handlers: callback function which gets the topic to handler map from the channel manager
110-
find_wildcard_handler: callback function which gets the wildcard topic handler from the channel manager
106+
control_plane_manager: reference to the ControlPlaneManager instance, remember to ONLY use functions which do not mutate state
111107
is_root: Whether or not the client can configure exchanges and queues themselves (core services), or if this must be delegated to a Core Service (SDK Clients/Services)
112108
"""
113109
self._connection_params = pika.ConnectionParameters(
@@ -132,11 +128,11 @@ def __init__(
132128
self._thread: threading.Thread | None = None
133129

134130
# Callback to the topics_to_handler list inside of
135-
self._topics_to_handlers = topics_to_handlers
136-
self._find_wildcard_handler = find_wildcard_handler
131+
self._control_plane_manager = control_plane_manager
137132

138133
# mapping of topics to callables which can unsubscribe from the topic
139134
self._topics_to_consumer_tags: dict[str, _ConsumerTagInfo] = {}
135+
"""NOTE: separators for topics in this object is '.' , not '/' """
140136
self._consumer_tags_to_threads: dict[str, threading.Thread] = {}
141137

142138
self._should_disconnect = False
@@ -390,7 +386,7 @@ def _on_output_channel_open(self, channel: Channel) -> None:
390386
def _on_input_channel_open(self, channel: Channel) -> None:
391387
channel_num = 1
392388
self._channel_in = channel
393-
# consumer channel flag can be set immediately
389+
# consumer channel flag can be set immediately (NOTE: this means that code which simultaneously creates Clients and Services will need to manually wait, to ensure that each application has made the basic_consume call to the broker before messages are published)
394390
self._channel_flags.set_nth_flag(channel_num)
395391
cb_1 = functools.partial(self._on_channel_closed, channel_num=channel_num)
396392
self._channel_in.add_on_close_callback(cb_1)
@@ -403,17 +399,7 @@ def _on_input_channel_open(self, channel: Channel) -> None:
403399
callback=cb_2,
404400
)
405401
else:
406-
for topic, topic_handler in self._topics_to_handlers().items():
407-
amqp_topic = _hierarchy_2_amqp(topic)
408-
cb = functools.partial(
409-
self._setup_basic_qos,
410-
None,
411-
channel=channel,
412-
topic=amqp_topic,
413-
persist=topic_handler.topic_persist,
414-
queue_name=topic_handler.queue_name,
415-
)
416-
self._connection.ioloop.add_callback_threadsafe(cb)
402+
self._create_all_queues(channel)
417403

418404
def _on_exchange_declareok(self, _unused_frame: Frame, channel: Channel) -> None:
419405
"""Create a queue on the broker (called from AMQP).
@@ -425,7 +411,17 @@ def _on_exchange_declareok(self, _unused_frame: Frame, channel: Channel) -> None
425411
_unused_frame: response from declaring the exchange on the broker (irrelevant).
426412
channel: The Channel being instantiated.
427413
"""
428-
for topic, topic_handler in self._topics_to_handlers().items():
414+
self._create_all_queues(channel)
415+
416+
def _create_all_queues(self, channel: Channel) -> None:
417+
"""Create queues for all subscription channels on the broker.
418+
419+
This should only be called once we've verified that the exchange exists.
420+
421+
Args:
422+
channel: The Channel being instantiated.
423+
"""
424+
for topic, topic_handler in self._control_plane_manager.get_all_subscription_channels():
429425
amqp_topic = _hierarchy_2_amqp(topic)
430426
cb = functools.partial(
431427
self._create_queue,
@@ -456,6 +452,7 @@ def _create_queue(self, channel: Channel, topic: str, persist: bool, queue_name:
456452
queue=queue_name
457453
if persist
458454
else '', # if we're transient, let the broker generate a name for us
455+
passive=not self._is_root,
459456
durable=persist,
460457
exclusive=not persist, # transient queues can be exclusive
461458
callback=cb,
@@ -597,17 +594,25 @@ def _consume_message(
597594
channel.basic_ack(basic_deliver.delivery_tag)
598595
return
599596

600-
tth_key = _amqp_2_hierarchy(basic_deliver.routing_key)
601-
topic_handler = self._topics_to_handlers().get(tth_key)
597+
amqp_routing_key_rep = basic_deliver.routing_key
598+
manager_routing_key_rep = _amqp_2_hierarchy(basic_deliver.routing_key)
599+
topic_handler = self._control_plane_manager.get_non_wildcard_subscription_channels().get(
600+
manager_routing_key_rep
601+
)
602602
if not topic_handler:
603603
# we may have included the topic in one of our wildcard handlers
604-
topic_handler = self._find_wildcard_handler(tth_key)
604+
wildcard_result = self._control_plane_manager.get_wildcard_topic_and_topic_handler(
605+
manager_routing_key_rep
606+
)
607+
if wildcard_result:
608+
manager_routing_key_rep, topic_handler = wildcard_result
609+
amqp_routing_key_rep = _hierarchy_2_amqp(manager_routing_key_rep)
605610
if topic_handler:
606-
consumer_tag_info = self._topics_to_consumer_tags.get(basic_deliver.routing_key)
611+
consumer_tag_info = self._topics_to_consumer_tags.get(amqp_routing_key_rep)
607612
if not consumer_tag_info:
608613
logger.error(
609614
'Could not fetch consumer tag for topic %s, please inform an INTERSECT-SDK developer that you saw this message',
610-
tth_key,
615+
amqp_routing_key_rep,
611616
)
612617
return
613618
while not consumer_tag_info.consumer_tag_obtained():

src/intersect_sdk_common/control_plane/brokers/mqtt_client.py

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,10 @@
2222
from .broker_client import BrokerClient
2323

2424
if TYPE_CHECKING:
25-
from collections.abc import Callable
26-
2725
from paho.mqtt.client import DisconnectFlags
2826
from paho.mqtt.reasoncodes import ReasonCode
2927

30-
from ..topic_handler import TopicHandler
28+
from ..control_plane_manager import ControlPlaneManager
3129

3230

3331
_MQTT_MAX_RETRIES = 10
@@ -37,7 +35,8 @@
3735
# this will be a breaking change, so only add it when ready to break
3836
def _hierarchy_2_mqtt(hierarchy: str) -> str:
3937
"""Take the hierarchy string format saved in the Service and map it to the MQTT topic format. Currently just covers wildcards."""
40-
return hierarchy.replace('#', '+')
38+
# return hierarchy.replace('#', '+')
39+
return hierarchy.replace('*', '+')
4140

4241

4342
class MQTTClient(BrokerClient):
@@ -61,8 +60,7 @@ def __init__(
6160
port: int,
6261
username: str,
6362
password: str,
64-
topics_to_handlers: Callable[[], dict[str, TopicHandler]],
65-
find_wildcard_handler: Callable[[str], TopicHandler | None],
63+
control_plane_manager: ControlPlaneManager,
6664
uid: str | None = None,
6765
) -> None:
6866
"""The default constructor.
@@ -72,8 +70,7 @@ def __init__(
7270
port: port number of MQTT broker
7371
username: username credentials for MQTT broker
7472
password: password credentials for MQTT broker
75-
topics_to_handlers: callback function which gets the topic to handler map from the channel manager
76-
find_wildcard_handler: callback function which gets the wildcard topic handler from the channel manager
73+
control_plane_manager: reference to the ControlPlaneManager instance, remember to ONLY use functions which do not mutate state
7774
uid: A string representing the unique id to identify the client.
7875
"""
7976
# Unique id for the MQTT broker to associate this client with
@@ -97,8 +94,7 @@ def __init__(
9794
self._connected_flag = threading.Event()
9895

9996
# ConnectionManager callable state
100-
self._topics_to_handlers = topics_to_handlers
101-
self._find_wildcard_handler = find_wildcard_handler
97+
self._control_plane_manager = control_plane_manager
10298

10399
# MQTT v3.1.1 automatically downgrades a QOS which is too high (good), but MQTT v5 will terminate the connection (bad)
104100
# see https://github.com/rabbitmq/rabbitmq-server/discussions/11842
@@ -204,10 +200,17 @@ def _on_message(
204200
message: MQTT message
205201
"""
206202
# NOTE: should not need to convert the MQTT topic to the protocol agnostic representation, as we only change the wildcard format (which won't show up as the message topic)
207-
topic_handler = self._topics_to_handlers().get(message.topic)
203+
inmem_topic = message.topic
204+
topic_handler = self._control_plane_manager.get_non_wildcard_subscription_channels().get(
205+
inmem_topic
206+
)
208207
if not topic_handler:
209208
# we may have included the topic in one of our wildcard handlers
210-
topic_handler = self._find_wildcard_handler(message.topic)
209+
wildcard_result = self._control_plane_manager.get_wildcard_topic_and_topic_handler(
210+
inmem_topic
211+
)
212+
if wildcard_result:
213+
_, topic_handler = wildcard_result
211214
# Note that if we return prior to the callback, there will be no reply message
212215
if not topic_handler:
213216
logger.warning('Incompatible message topic %s, rejecting message', message.topic)
@@ -298,7 +301,7 @@ def _handle_connect(
298301
self._max_supported_qos = properties.MaximumQoS
299302

300303
self._connected_flag.set()
301-
for topic, topic_handler in self._topics_to_handlers().items():
304+
for topic, topic_handler in self._control_plane_manager.get_all_subscription_channels():
302305
self.subscribe(topic, topic_handler.topic_persist)
303306
else:
304307
# This will generally suggest a misconfiguration

src/intersect_sdk_common/control_plane/control_plane_manager.py

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""Wrapper around interacting with the control plane, which may be multiple brokers."""
22

3+
import itertools
34
import re
4-
from collections.abc import Callable
55

66
from ..config import ControlPlaneConfig
77
from ..exceptions import IntersectSetupError
@@ -23,8 +23,7 @@
2323

2424
def _create_control_provider(
2525
config: ControlPlaneConfig,
26-
topic_handler_callback: Callable[[], dict[str, TopicHandler]],
27-
find_wildcard_handler: Callable[[str], TopicHandler | None],
26+
control_plane_manager: 'ControlPlaneManager',
2827
) -> BrokerClient:
2928
if config.protocol == 'amqp0.9.1':
3029
from .brokers.amqp_client import ( # noqa: PLC0415 (lazy load all AMQP modules)
@@ -36,8 +35,7 @@ def _create_control_provider(
3635
port=config.port or 5672,
3736
username=config.username,
3837
password=config.password,
39-
topics_to_handlers=topic_handler_callback,
40-
find_wildcard_handler=find_wildcard_handler,
38+
control_plane_manager=control_plane_manager,
4139
is_root=config.is_root,
4240
)
4341

@@ -49,8 +47,7 @@ def _create_control_provider(
4947
port=config.port or 1883,
5048
username=config.username,
5149
password=config.password,
52-
topics_to_handlers=topic_handler_callback,
53-
find_wildcard_handler=find_wildcard_handler,
50+
control_plane_manager=control_plane_manager,
5451
)
5552

5653

@@ -68,10 +65,7 @@ def __init__(
6865
queue_name_generator should be a hardcoded value for Core Services, the SDK should provide its own function to generate queue names.
6966
"""
7067
self._control_providers = [
71-
_create_control_provider(
72-
config, self.get_subscription_channels, self.get_wildcard_topic_handler
73-
)
74-
for config in control_configs
68+
_create_control_provider(config, self) for config in control_configs
7569
]
7670

7771
# flag which indicates if we SHOULD be connected.
@@ -142,29 +136,47 @@ def remove_subscription_channel(self, channel: str) -> bool:
142136
provider.unsubscribe(channel)
143137
return True
144138

145-
def get_subscription_channels(self) -> dict[str, TopicHandler]:
146-
"""Get the subscription channels.
139+
def get_non_wildcard_subscription_channels(self) -> dict[str, TopicHandler]:
140+
"""Get primary subscription channels.
147141
148-
These channels cannot be wildcards, and incoming topics must match the channel topics exactly. Note that this function gets accessed as a callback from the direct broker implementations.
142+
These channels cannot be wildcards, and incoming topics must match the channel topics exactly.
143+
144+
This function is safe to call from the broker clients, as it does not mutate state.
149145
150146
Returns:
151147
the dictionary of topics to topic information
152148
"""
153149
return self._topics_to_handlers
154150

155-
def get_wildcard_topic_handler(self, topic: str) -> TopicHandler | None:
151+
def get_wildcard_topic_and_topic_handler(self, topic: str) -> tuple[str, TopicHandler] | None:
156152
"""Get the wildcard topic handler for a given topic, if it exists.
157153
158154
This is an inefficient lookup, so should only be used if we fail to find a non-wildcard match for an incoming topic. Note that this function gets accessed as a callback from the direct broker implementations.
159155
156+
This function is safe to call from the broker clients, as it does not mutate state.
157+
158+
Params:
159+
topic: the topic for an incoming message (this will not have any wildcards in it)
160+
160161
Returns:
161-
the topic handler associated with the topic, if it exists; None otherwise
162+
if a match found: the wildcard string configured application-side to match the topic parameter, and the topic handler associated with the topic
163+
if no match found: None
162164
"""
163-
for topic_handler in self._wildcards.values():
165+
for wildcard, topic_handler in self._wildcards.items():
164166
if topic_handler.does_topic_match(topic):
165-
return topic_handler
167+
return wildcard, topic_handler
166168
return None
167169

170+
def get_all_subscription_channels(self) -> itertools.chain[tuple[str, TopicHandler]]:
171+
"""Get all subscription channels, including wildcard channels.
172+
173+
This function is safe to call from the broker clients, as it does not mutate state.
174+
175+
Returns:
176+
an iterator which yields tuples of (channel, topic handler) for all channels, including wildcards. Note that wildcard channels will be returned in no particular order, and may be interspersed with non-wildcard channels.
177+
"""
178+
return itertools.chain(self._topics_to_handlers.items(), self._wildcards.items())
179+
168180
def connect(self) -> None:
169181
"""Connect to all configured brokers.
170182

src/intersect_sdk_common/control_plane/topic_handler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ def __init__(
4646
# match any sequence of 0 or more words (make sure to allow for the separator)
4747
regex_builder.append('[a-zA-Z0-9/-]*')
4848
elif char == '/':
49-
# in-memory topics use '/' as the separator, but on the broker '.' is used
50-
regex_builder.append('\\.')
49+
# in-memory topics use '/' as the separator, it is the protocol handler's responsibility to convert topics to use '/' as the separator
50+
regex_builder.append('/')
5151
else:
5252
# ordinary character
5353
regex_builder.append(char)

tests/e2e/__init__.py

Whitespace-only changes.

tests/fixtures/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)