Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion hazelcast/internal/asyncio_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,18 @@
MULTI_MAP_SERVICE,
ProxyManager,
REPLICATED_MAP_SERVICE,
TOPIC_SERVICE,
VECTOR_SERVICE,
)
from hazelcast.internal.asyncio_proxy.list import List
from hazelcast.internal.asyncio_proxy.map import Map
from hazelcast.internal.asyncio_proxy.multi_map import MultiMap
from hazelcast.internal.asyncio_proxy.replicated_map import ReplicatedMap
from hazelcast.internal.asyncio_proxy.topic import Topic
from hazelcast.internal.asyncio_reactor import AsyncioReactor
from hazelcast.serialization import SerializationServiceV1
from hazelcast.internal.asyncio_statistics import Statistics
from hazelcast.types import KeyType, ValueType
from hazelcast.types import KeyType, MessageType, ValueType
from hazelcast.util import AtomicInteger, RoundRobinLB

__all__ = ("HazelcastClient",)
Expand Down Expand Up @@ -297,6 +299,17 @@ async def get_replicated_map(self, name: str) -> ReplicatedMap[KeyType, ValueTyp
"""
return await self._proxy_manager.get_or_create(REPLICATED_MAP_SERVICE, name)

async def get_topic(self, name: str) -> Topic[MessageType]:
"""Returns the distributed topic instance with the specified name.

Args:
name: Name of the distributed topic.

Returns:
Distributed topic instance with the specified name.
"""
return await self._proxy_manager.get_or_create(TOPIC_SERVICE, name)

async def create_vector_collection_config(
self,
name: str,
Expand Down
3 changes: 3 additions & 0 deletions hazelcast/internal/asyncio_proxy/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from hazelcast.internal.asyncio_proxy.list import create_list_proxy
from hazelcast.internal.asyncio_proxy.multi_map import create_multi_map_proxy
from hazelcast.internal.asyncio_proxy.topic import create_topic_proxy
from hazelcast.internal.asyncio_proxy.vector_collection import (
VectorCollection,
create_vector_collection_proxy,
Expand All @@ -18,6 +19,7 @@
MAP_SERVICE = "hz:impl:mapService"
MULTI_MAP_SERVICE = "hz:impl:multiMapService"
REPLICATED_MAP_SERVICE = "hz:impl:replicatedMapService"
TOPIC_SERVICE = "hz:impl:topicService"
VECTOR_SERVICE = "hz:service:vector"

_proxy_init: typing.Dict[
Expand All @@ -28,6 +30,7 @@
MAP_SERVICE: create_map_proxy,
MULTI_MAP_SERVICE: create_multi_map_proxy,
REPLICATED_MAP_SERVICE: create_replicated_map_proxy,
TOPIC_SERVICE: create_topic_proxy,
VECTOR_SERVICE: create_vector_collection_proxy,
}

Expand Down
116 changes: 116 additions & 0 deletions hazelcast/internal/asyncio_proxy/topic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
import typing

from hazelcast.protocol.codec import (
topic_add_message_listener_codec,
topic_publish_codec,
topic_publish_all_codec,
topic_remove_message_listener_codec,
)
from hazelcast.internal.asyncio_proxy.base import PartitionSpecificProxy
from hazelcast.proxy.base import TopicMessage
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.types import MessageType
from hazelcast.util import check_not_none


class Topic(PartitionSpecificProxy, typing.Generic[MessageType]):
"""Hazelcast provides distribution mechanism for publishing messages that
are delivered to multiple subscribers, which is also known as a
publish/subscribe (pub/sub) messaging model.

Publish and subscriptions are cluster-wide. When a member subscribes to
a topic, it is actually registering for messages published by any member
in the cluster, including the new members joined after you added the
listener.

Messages are ordered, meaning that listeners(subscribers) will process the
messages in the order they are actually published.

Example:
>>> my_topic = await client.get_topic("my_topic")
>>> await my_topic.publish("hello")

Warning:
Asyncio client topic proxy is not thread-safe, do not access it from other threads.
"""

async def add_listener(
self, on_message: typing.Callable[[TopicMessage[MessageType]], None] = None
) -> str:
"""Subscribes to this topic.

When someone publishes a message on this topic, ``on_message`` function
is called if provided.

Args:
on_message: Function to be called when a message is published.

Returns:
A registration id which is used as a key to remove the listener.
"""
codec = topic_add_message_listener_codec
request = codec.encode_request(self.name, self._is_smart)

def handle(item_data, publish_time, uuid):
member = self._context.cluster_service.get_member(uuid)
item_event = TopicMessage(
self.name, self._to_object(item_data), publish_time / 1000.0, member
)
on_message(item_event)

return await self._register_listener(
request,
lambda r: codec.decode_response(r),
lambda reg_id: topic_remove_message_listener_codec.encode_request(self.name, reg_id),
lambda m: codec.handle(m, handle),
)

async def publish(self, message: MessageType) -> None:
"""Publishes the message to all subscribers of this topic.

Args:
message: The message to be published.
"""
try:
message_data = self._to_data(message)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.publish, message)

request = topic_publish_codec.encode_request(self.name, message_data)
return await self._invoke(request)

async def publish_all(self, messages: typing.Sequence[MessageType]) -> None:
"""Publishes the messages to all subscribers of this topic.

Args:
messages: The messages to be published.
"""
check_not_none(messages, "Messages cannot be None")
try:
topic_messages = []
for m in messages:
check_not_none(m, "Message cannot be None")
data = self._to_data(m)
topic_messages.append(data)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.publish_all, messages)

request = topic_publish_all_codec.encode_request(self.name, topic_messages)
return await self._invoke(request)

async def remove_listener(self, registration_id: str) -> bool:
"""Stops receiving messages for the given message listener.

If the given listener already removed, this method does nothing.

Args:
registration_id: Registration id of the listener to be removed.

Returns:
``True`` if the listener is removed, ``False`` otherwise.
"""
return await self._deregister_listener(registration_id)


async def create_topic_proxy(service_name, name, context):
return Topic(service_name, name, context)
2 changes: 1 addition & 1 deletion hazelcast/proxy/topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Topic(PartitionSpecificProxy["BlockingTopic"], typing.Generic[MessageType]
are delivered to multiple subscribers, which is also known as a
publish/subscribe (pub/sub) messaging model.

Publish and subscriptions are cluster-wide. When a member subscribes for
Publish and subscriptions are cluster-wide. When a member subscribes to
a topic, it is actually registering for messages published by any member
in the cluster, including the new members joined after you added the
listener.
Expand Down
83 changes: 83 additions & 0 deletions tests/integration/asyncio/proxy/topic_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from tests.integration.asyncio.base import SingleMemberTestCase
from tests.util import (
random_string,
event_collector,
skip_if_client_version_older_than,
skip_if_server_version_older_than,
)


class TopicTest(SingleMemberTestCase):
@classmethod
def configure_client(cls, config):
config["cluster_name"] = cls.cluster.id
return config

async def asyncSetUp(self):
await super().asyncSetUp()
self.topic = await self.client.get_topic(random_string())

async def asyncTearDown(self):
await self.topic.destroy()
await super().asyncTearDown()

async def test_add_listener(self):
collector = event_collector()
await self.topic.add_listener(on_message=collector)
await self.topic.publish("item-value")

def assert_event():
self.assertEqual(len(collector.events), 1)
event = collector.events[0]
self.assertEqual(event.message, "item-value")
self.assertGreater(event.publish_time, 0)

await self.assertTrueEventually(assert_event, 5)

async def test_remove_listener(self):
collector = event_collector()
reg_id = await self.topic.add_listener(on_message=collector)
await self.topic.remove_listener(reg_id)
await self.topic.publish("item-value")

def assert_event():
self.assertEqual(len(collector.events), 0)
if len(collector.events) > 0:
event = collector.events[0]
self.assertEqual(event.message, "item-value")
self.assertGreater(event.publish_time, 0)

await self.assertTrueEventually(assert_event, 5)

async def test_str(self):
self.assertTrue(str(self.topic).startswith("Topic"))

async def test_publish_all(self):
skip_if_client_version_older_than(self, "5.2")
skip_if_server_version_older_than(self, self.client, "4.1")

collector = event_collector()
await self.topic.add_listener(on_message=collector)

messages = ["message1", "message2", "message3"]
await self.topic.publish_all(messages)

def assert_event():
self.assertEqual(len(collector.events), 3)

await self.assertTrueEventually(assert_event, 5)

async def test_publish_all_none_messages(self):
skip_if_client_version_older_than(self, "5.2")
skip_if_server_version_older_than(self, self.client, "4.1")

with self.assertRaises(AssertionError):
await self.topic.publish_all(None)

async def test_publish_all_none_message(self):
skip_if_client_version_older_than(self, "5.2")
skip_if_server_version_older_than(self, self.client, "4.1")

messages = ["message1", None, "message3"]
with self.assertRaises(AssertionError):
await self.topic.publish_all(messages)
Loading