Skip to content

Commit 2704dd7

Browse files
authored
feat: add async redeliver supprot (#285)
1 parent 36d6fd6 commit 2704dd7

File tree

2 files changed

+49
-0
lines changed

2 files changed

+49
-0
lines changed

pulsar/asyncio.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -399,6 +399,17 @@ async def close(self) -> None:
399399
self._consumer.close_async(functools.partial(_set_future, future, value=None))
400400
await future
401401

402+
def redeliver_unacknowledged_messages(self):
403+
"""
404+
Redelivers all the unacknowledged messages. In failover mode, the
405+
request is ignored if the consumer is not active for the given topic. In
406+
shared mode, the consumer's messages to be redelivered are distributed
407+
across all the connected consumers. This is a non-blocking call and
408+
doesn't throw an exception. In case the connection breaks, the messages
409+
are redelivered after reconnect.
410+
"""
411+
self._consumer.redeliver_unacknowledged_messages()
412+
402413
def topic(self) -> str:
403414
"""
404415
Return the topic this consumer is subscribed to.

tests/asyncio_test.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,44 @@ async def verify_receive(consumer: Consumer):
267267
await verify_receive(consumer)
268268
await consumer.close()
269269

270+
async def test_async_dead_letter_policy(self):
271+
topic = f'asyncio-test-dlq-{time.time()}'
272+
dlq_topic = 'dlq-' + topic
273+
max_redeliver_count = 5
274+
275+
dlq_consumer = await self._client.subscribe(dlq_topic, "my-sub", consumer_type=pulsar.ConsumerType.Shared)
276+
consumer = await self._client.subscribe(topic, "my-sub", consumer_type=pulsar.ConsumerType.Shared,
277+
dead_letter_policy=pulsar.ConsumerDeadLetterPolicy(max_redeliver_count, dlq_topic, 'init-sub'))
278+
producer = await self._client.create_producer(topic)
279+
280+
# Sen num msgs.
281+
num = 10
282+
for i in range(num):
283+
await producer.send(b"hello-%d" % i)
284+
await producer.flush()
285+
286+
# Redelivery all messages maxRedeliverCountNum time.
287+
for i in range(1, num * max_redeliver_count + num + 1):
288+
msg = await consumer.receive()
289+
if i % num == 0:
290+
consumer.redeliver_unacknowledged_messages()
291+
print(f"Start redeliver msgs '{i}'")
292+
293+
with self.assertRaises(asyncio.TimeoutError):
294+
await asyncio.wait_for(consumer.receive(), 0.1)
295+
296+
for i in range(num):
297+
msg = await dlq_consumer.receive()
298+
self.assertTrue(msg)
299+
self.assertEqual(msg.data(), b"hello-%d" % i)
300+
dlq_consumer.acknowledge(msg)
301+
302+
with self.assertRaises(asyncio.TimeoutError):
303+
await asyncio.wait_for(dlq_consumer.receive(), 0.1)
304+
305+
await consumer.close()
306+
await dlq_consumer.close()
307+
270308
async def test_unsubscribe(self):
271309
topic = f'asyncio-test-unsubscribe-{time.time()}'
272310
sub = 'sub'

0 commit comments

Comments
 (0)