Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion ably/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
from ably.types.capability import Capability
from ably.types.channelsubscription import PushChannelSubscription
from ably.types.device import DeviceDetails
from ably.types.message import MessageAction, MessageVersion
from ably.types.operations import MessageOperation, PublishResult, UpdateDeleteResult
from ably.types.options import Options, VCDiffDecoder
from ably.util.crypto import CipherParams
from ably.util.exceptions import AblyAuthException, AblyException, IncompatibleClientIdException
Expand All @@ -15,5 +17,5 @@
logger = logging.getLogger(__name__)
logger.addHandler(logging.NullHandler())

api_version = '4'
api_version = '5'
lib_version = '2.1.3'
45 changes: 32 additions & 13 deletions ably/realtime/connectionmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
from collections import deque
from datetime import datetime
from itertools import zip_longest
from typing import TYPE_CHECKING

import httpx
Expand All @@ -13,6 +14,7 @@
from ably.types.connectiondetails import ConnectionDetails
from ably.types.connectionerrors import ConnectionErrors
from ably.types.connectionstate import ConnectionEvent, ConnectionState, ConnectionStateChange
from ably.types.operations import PublishResult
from ably.types.tokendetails import TokenDetails
from ably.util.eventemitter import EventEmitter
from ably.util.exceptions import AblyException, IncompatibleClientIdException
Expand All @@ -29,7 +31,7 @@ class PendingMessage:

def __init__(self, message: dict):
self.message = message
self.future: asyncio.Future | None = None
self.future: asyncio.Future[PublishResult] | None = None
action = message.get('action')

# Messages that require acknowledgment: MESSAGE, PRESENCE, ANNOTATION, OBJECT
Expand Down Expand Up @@ -58,15 +60,22 @@ def count(self) -> int:
"""Return the number of pending messages"""
return len(self.messages)

def complete_messages(self, serial: int, count: int, err: AblyException | None = None) -> None:
def complete_messages(
self,
serial: int,
count: int,
res: list[PublishResult] | None,
err: AblyException | None = None
) -> None:
"""Complete messages based on serial and count from ACK/NACK

Args:
serial: The msgSerial of the first message being acknowledged
count: The number of messages being acknowledged
res: List of PublishResult objects for each message acknowledged, or None if not available
err: Error from NACK, or None for successful ACK
"""
log.debug(f'MessageQueue.complete_messages(): serial={serial}, count={count}, err={err}')
log.debug(f'MessageQueue.complete_messages(): serial={serial}, count={count}, res={res}, err={err}')

if not self.messages:
log.warning('MessageQueue.complete_messages(): called on empty queue')
Expand All @@ -87,12 +96,17 @@ def complete_messages(self, serial: int, count: int, err: AblyException | None =
completed_messages = self.messages[:num_to_complete]
self.messages = self.messages[num_to_complete:]

for msg in completed_messages:
# Default res to empty list if None
res_list = res if res is not None else []
for (msg, publish_result) in zip_longest(completed_messages, res_list):
if msg.future and not msg.future.done():
if err:
msg.future.set_exception(err)
else:
msg.future.set_result(None)
# If publish_result is None, return empty PublishResult
if publish_result is None:
publish_result = PublishResult()
msg.future.set_result(publish_result)

def complete_all_messages(self, err: AblyException) -> None:
"""Complete all pending messages with an error"""
Expand Down Expand Up @@ -199,7 +213,7 @@ async def close_impl(self) -> None:

self.notify_state(ConnectionState.CLOSED)

async def send_protocol_message(self, protocol_message: dict) -> None:
async def send_protocol_message(self, protocol_message: dict) -> PublishResult | None:
"""Send a protocol message and optionally track it for acknowledgment

Args:
Expand Down Expand Up @@ -233,12 +247,14 @@ async def send_protocol_message(self, protocol_message: dict) -> None:
if state_should_queue:
self.queued_messages.appendleft(pending_message)
if pending_message.ack_required:
await pending_message.future
return await pending_message.future
return None

return await self._send_protocol_message_on_connected_state(pending_message)

async def _send_protocol_message_on_connected_state(self, pending_message: PendingMessage) -> None:
async def _send_protocol_message_on_connected_state(
self, pending_message: PendingMessage
) -> PublishResult | None:
if self.state == ConnectionState.CONNECTED and self.transport:
# Add to pending queue before sending (for messages being resent from queue)
if pending_message.ack_required and pending_message not in self.pending_message_queue.messages:
Expand All @@ -253,7 +269,7 @@ async def _send_protocol_message_on_connected_state(self, pending_message: Pendi
AblyException("No active transport", 500, 50000)
)
if pending_message.ack_required:
await pending_message.future
return await pending_message.future
return None

def send_queued_messages(self) -> None:
Expand Down Expand Up @@ -449,15 +465,18 @@ def on_heartbeat(self, id: str | None) -> None:
self.__ping_future.set_result(None)
self.__ping_future = None

def on_ack(self, serial: int, count: int) -> None:
def on_ack(
self, serial: int, count: int, res: list[PublishResult] | None
) -> None:
"""Handle ACK protocol message from server

Args:
serial: The msgSerial of the first message being acknowledged
count: The number of messages being acknowledged
res: List of PublishResult objects for each message acknowledged, or None if not available
"""
log.debug(f'ConnectionManager.on_ack(): serial={serial}, count={count}')
self.pending_message_queue.complete_messages(serial, count)
log.debug(f'ConnectionManager.on_ack(): serial={serial}, count={count}, res={res}')
self.pending_message_queue.complete_messages(serial, count, res)

def on_nack(self, serial: int, count: int, err: AblyException | None) -> None:
"""Handle NACK protocol message from server
Expand All @@ -471,7 +490,7 @@ def on_nack(self, serial: int, count: int, err: AblyException | None) -> None:
err = AblyException('Unable to send message; channel not responding', 50001, 500)

log.error(f'ConnectionManager.on_nack(): serial={serial}, count={count}, err={err}')
self.pending_message_queue.complete_messages(serial, count, err)
self.pending_message_queue.complete_messages(serial, count, None, err)

def deactivate_transport(self, reason: AblyException | None = None):
# RTN19a: Before disconnecting, requeue any pending messages
Expand Down
Loading
Loading