Skip to content

Commit a2ca83e

Browse files
committed
have ControlPlaneManager take in queue name generator from SDK/core-service
Signed-off-by: Lance-Drane <Lance-Drane@users.noreply.github.com>
1 parent 18be89b commit a2ca83e

File tree

3 files changed

+17
-18
lines changed

3 files changed

+17
-18
lines changed

src/intersect_sdk_common/control_plane/brokers/amqp_client.py

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
import functools
1313
import threading
14-
from hashlib import sha384
1514
from typing import TYPE_CHECKING
1615

1716
import pika
@@ -63,19 +62,6 @@ def __repr__(self) -> str:
6362
return f'{self.consumer_tag} -- OBTAINED: {self.consumer_tag_obtained()}'
6463

6564

66-
def _get_queue_name(routing_key: str) -> str:
67-
"""Generate a valid queue name from the routing key.
68-
69-
We want to always be able to generate the same queue name from the routing key every time,
70-
so we don't use UUIDs or want the broker to generate a key name.
71-
72-
We must also keep the length under 128 characters.
73-
74-
See https://www.rabbitmq.com/docs/queues#names for a complete reference.
75-
"""
76-
return sha384(routing_key.encode()).hexdigest()
77-
78-
7965
# TODO we should be handling hierarchy parts as a list of strings until they get to the client
8066
# this will be a breaking change, so only add it when ready to break
8167
def _hierarchy_2_amqp(hierarchy: str) -> str:
@@ -418,10 +404,11 @@ def _on_exchange_declareok(self, _unused_frame: Frame, channel: Channel) -> None
418404
channel=channel,
419405
topic=amqp_topic,
420406
persist=topic_handler.topic_persist,
407+
queue_name=topic_handler.queue_name_generator(topic),
421408
)
422409
self._connection.ioloop.add_callback_threadsafe(cb)
423410

424-
def _create_queue(self, channel: Channel, topic: str, persist: bool) -> None:
411+
def _create_queue(self, channel: Channel, topic: str, persist: bool, queue_name: str) -> None:
425412
"""Create a queue on the broker.
426413
427414
This can be called directly from the AMQP Client if the subscribed connection already has a Channel it's listening to.
@@ -432,12 +419,13 @@ def _create_queue(self, channel: Channel, topic: str, persist: bool) -> None:
432419
persist: boolean value to determine how we manage the queue.
433420
If True, this queue will persist forever, even on application or broker shutdown, and we need a persistent name.
434421
If False, we will generate a temporary queue using the broker's naming scheme.
422+
queue_name: The name of the queue to create, if persist is True.
435423
"""
436424
cb = functools.partial(
437425
self._on_queue_declareok, channel=channel, topic=topic, persist=persist
438426
)
439427
channel.queue_declare(
440-
queue=_get_queue_name(topic)
428+
queue=queue_name
441429
if persist
442430
else '', # if we're transient, let the broker generate a name for us
443431
durable=persist,

src/intersect_sdk_common/control_plane/control_plane_manager.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,10 +44,13 @@ class ControlPlaneManager:
4444
def __init__(
4545
self,
4646
control_configs: list[ControlPlaneConfig],
47+
queue_name_generator: Callable[[str], str],
4748
) -> None:
4849
"""Basic constructor.
4950
5051
Some interaction with message brokers can change based on whether or not a Service or a Client is calling it.
52+
53+
queue_name_generator should be a hardcoded value for Core Services, the SDK should provide its own function to generate queue names.
5154
"""
5255
self._control_providers = [
5356
_create_control_provider(config, self.get_subscription_channels)
@@ -58,6 +61,7 @@ def __init__(
5861
self._ready = False
5962
# topics_to_handlers are managed here and transcend connections/disconnections to the broker
6063
self._topics_to_handlers: dict[str, TopicHandler] = {}
64+
self._queue_name_generator = queue_name_generator
6165

6266
def add_subscription_channel(
6367
self, channel: str, callbacks: set[MessageCallback], persist: bool
@@ -78,7 +82,7 @@ def add_subscription_channel(
7882
"""
7983
topic_handler = self._topics_to_handlers.get(channel)
8084
if topic_handler is None:
81-
topic_handler = TopicHandler(persist)
85+
topic_handler = TopicHandler(persist, self._queue_name_generator)
8286
topic_handler.callbacks |= callbacks
8387
self._topics_to_handlers[channel] = topic_handler
8488
else:

src/intersect_sdk_common/control_plane/topic_handler.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
"""Attributes associated with a specific pub/sub topic."""
22

3+
from collections.abc import Callable
4+
35
from .definitions import MessageCallback
46

57

@@ -14,11 +16,16 @@ class TopicHandler:
1416
topic_persist: bool
1517
"""Whether or not a topic queue is expected to persist on the message broker."""
1618

17-
def __init__(self, topic_persist: bool) -> None:
19+
queue_name_generator: Callable[[str], str]
20+
"""A pointer to the function which generates the queue name.."""
21+
22+
def __init__(self, topic_persist: bool, queue_name_generator: Callable[[str], str]) -> None:
1823
"""Initialize a TopicHandler instance.
1924
2025
Args:
2126
topic_persist: Whether the topic queue is expected to persist on the message broker.
27+
queue_name_generator: A pointer to the function which generates the queue name.
2228
"""
2329
self.callbacks = set()
2430
self.topic_persist = topic_persist
31+
self.queue_name_generator = queue_name_generator

0 commit comments

Comments
 (0)