Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdk/servicebus/azure-servicebus/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Release History

## 7.14.4 (Unreleased)

### Bugs Fixed

- Read `com.microsoft:max-message-batch-size` vendor property from the AMQP sender link to correctly limit batch size on Premium large-message entities, where `max-message-size` can be up to 100 MB but the batch limit is 1 MB.

## 7.14.3 (2025-11-11)

### Bugs Fixed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ def session_id(self) -> Optional[str]:
return self._raw_amqp_message.properties.group_id

@session_id.setter
def session_id(self, value: str) -> None:
def session_id(self, value: Optional[str]) -> None:
if value and len(value) > MESSAGE_PROPERTY_MAX_LENGTH:
raise ValueError("session_id cannot be longer than {} characters.".format(MESSAGE_PROPERTY_MAX_LENGTH))

Expand All @@ -294,7 +294,7 @@ def application_properties(self) -> Optional[Dict[Union[str, bytes], PrimitiveTy
return self._raw_amqp_message.application_properties

@application_properties.setter
def application_properties(self, value: Dict[Union[str, bytes], Any]) -> None:
def application_properties(self, value: Optional[Dict[Union[str, bytes], Any]]) -> None:
self._raw_amqp_message.application_properties = value

@property
Expand All @@ -320,7 +320,7 @@ def partition_key(self) -> Optional[str]:
return None

@partition_key.setter
def partition_key(self, value: str) -> None:
def partition_key(self, value: Optional[str]) -> None:
if value and len(value) > MESSAGE_PROPERTY_MAX_LENGTH:
raise ValueError("partition_key cannot be longer than {} characters.".format(MESSAGE_PROPERTY_MAX_LENGTH))

Expand Down Expand Up @@ -349,7 +349,7 @@ def time_to_live(self) -> Optional[datetime.timedelta]:
return None

@time_to_live.setter
def time_to_live(self, value: Union[datetime.timedelta, int]) -> None:
def time_to_live(self, value: Optional[Union[datetime.timedelta, int]]) -> None:
if not self._raw_amqp_message.header:
self._raw_amqp_message.header = AmqpMessageHeader()
if value is None:
Expand Down Expand Up @@ -395,7 +395,7 @@ def scheduled_enqueue_time_utc(self) -> Optional[datetime.datetime]:
return None

@scheduled_enqueue_time_utc.setter
def scheduled_enqueue_time_utc(self, value: datetime.datetime) -> None:
def scheduled_enqueue_time_utc(self, value: Optional[datetime.datetime]) -> None:
if not self._raw_amqp_message.properties:
self._raw_amqp_message.properties = AmqpMessageProperties()
if not self._raw_amqp_message.properties.message_id:
Expand Down Expand Up @@ -442,7 +442,7 @@ def content_type(self) -> Optional[str]:
return self._raw_amqp_message.properties.content_type

@content_type.setter
def content_type(self, value: str) -> None:
def content_type(self, value: Optional[str]) -> None:
if not self._raw_amqp_message.properties:
self._raw_amqp_message.properties = AmqpMessageProperties()
self._raw_amqp_message.properties.content_type = value
Expand All @@ -468,7 +468,7 @@ def correlation_id(self) -> Optional[str]:
return self._raw_amqp_message.properties.correlation_id

@correlation_id.setter
def correlation_id(self, value: str) -> None:
def correlation_id(self, value: Optional[str]) -> None:
if not self._raw_amqp_message.properties:
self._raw_amqp_message.properties = AmqpMessageProperties()
self._raw_amqp_message.properties.correlation_id = value
Expand All @@ -490,7 +490,7 @@ def subject(self) -> Optional[str]:
return self._raw_amqp_message.properties.subject

@subject.setter
def subject(self, value: str) -> None:
def subject(self, value: Optional[str]) -> None:
if not self._raw_amqp_message.properties:
self._raw_amqp_message.properties = AmqpMessageProperties()
self._raw_amqp_message.properties.subject = value
Expand All @@ -515,7 +515,7 @@ def message_id(self) -> Optional[str]:
return self._raw_amqp_message.properties.message_id

@message_id.setter
def message_id(self, value: str) -> None:
def message_id(self, value: Optional[str]) -> None:
if value and len(str(value)) > MESSAGE_PROPERTY_MAX_LENGTH:
raise ValueError("message_id cannot be longer than {} characters.".format(MESSAGE_PROPERTY_MAX_LENGTH))
if not self._raw_amqp_message.properties:
Expand Down Expand Up @@ -544,7 +544,7 @@ def reply_to(self) -> Optional[str]:
return self._raw_amqp_message.properties.reply_to

@reply_to.setter
def reply_to(self, value: str) -> None:
def reply_to(self, value: Optional[str]) -> None:
if not self._raw_amqp_message.properties:
self._raw_amqp_message.properties = AmqpMessageProperties()
self._raw_amqp_message.properties.reply_to = value
Expand All @@ -570,7 +570,7 @@ def reply_to_session_id(self) -> Optional[str]:
return self._raw_amqp_message.properties.reply_to_group_id

@reply_to_session_id.setter
def reply_to_session_id(self, value: str) -> None:
def reply_to_session_id(self, value: Optional[str]) -> None:
if value and len(value) > MESSAGE_PROPERTY_MAX_LENGTH:
raise ValueError(
"reply_to_session_id cannot be longer than {} characters.".format(MESSAGE_PROPERTY_MAX_LENGTH)
Expand Down Expand Up @@ -600,7 +600,7 @@ def to(self) -> Optional[str]:
return self._raw_amqp_message.properties.to

@to.setter
def to(self, value: str) -> None:
def to(self, value: Optional[str]) -> None:
if not self._raw_amqp_message.properties:
self._raw_amqp_message.properties = AmqpMessageProperties()
self._raw_amqp_message.properties.to = value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,14 @@ def _open(self):
self._max_message_size_on_link = (
self._amqp_transport.get_remote_max_message_size(self._handler) or MAX_MESSAGE_LENGTH_BYTES
)
if self._max_message_size_on_link >= MAX_BATCH_SIZE_PREMIUM:
# Prefer the vendor property 'com.microsoft:max-message-batch-size'
# which correctly reports the batch size limit independent of the
# per-entity max-message-size (which can be up to 100 MB on Premium
# large-message entities).
vendor_batch_size = self._amqp_transport.get_remote_max_message_batch_size(self._handler)
if vendor_batch_size is not None:
self._max_batch_size_on_link = vendor_batch_size
elif self._max_message_size_on_link >= MAX_BATCH_SIZE_PREMIUM:
self._max_batch_size_on_link = MAX_BATCH_SIZE_PREMIUM
else:
self._max_batch_size_on_link = MAX_BATCH_SIZE_STANDARD
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,16 @@ def get_remote_max_message_size(handler):
:rtype: int
"""

@staticmethod
@abstractmethod
def get_remote_max_message_batch_size(handler):
"""
Returns the max batch size from the vendor link property
'com.microsoft:max-message-batch-size', or None if unavailable.
:param ~uamqp.AMQPClient or ~pyamqp.AMQPClient handler: Client to read link properties from.
:rtype: int or None
"""

@staticmethod
@abstractmethod
def get_handler_link_name(handler):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,53 +249,55 @@ def to_outgoing_amqp_message(annotated_message: "AmqpAnnotatedMessage") -> "Mess
header_vals = annotated_message.header.values() if annotated_message.header else None
# If header and non-None header values, create outgoing header.
if header_vals and header_vals.count(None) != len(header_vals):
annotated_message.header = cast("AmqpMessageHeader", annotated_message.header)
# Bind the cast result to a local so mypy can narrow the Optional type.
header = cast("AmqpMessageHeader", annotated_message.header)
message_header = Header(
delivery_count=annotated_message.header.delivery_count,
ttl=annotated_message.header.time_to_live,
first_acquirer=annotated_message.header.first_acquirer,
durable=annotated_message.header.durable,
priority=annotated_message.header.priority,
delivery_count=header.delivery_count,
ttl=header.time_to_live,
first_acquirer=header.first_acquirer,
durable=header.durable,
priority=header.priority,
)
if annotated_message.header.time_to_live and annotated_message.header.time_to_live != MAX_DURATION_VALUE:
if header.time_to_live and header.time_to_live != MAX_DURATION_VALUE:
ttl_set = True
creation_time_from_ttl = int(
time.mktime(datetime.datetime.now(timezone.utc).timetuple()) * 1000 # TODO: should this be * 1?
)
absolute_expiry_time_from_ttl = int(
min(MAX_ABSOLUTE_EXPIRY_TIME, creation_time_from_ttl + annotated_message.header.time_to_live)
min(MAX_ABSOLUTE_EXPIRY_TIME, creation_time_from_ttl + header.time_to_live)
)

message_properties = None
properties_vals = annotated_message.properties.values() if annotated_message.properties else None
# If properties and non-None properties values, create outgoing properties.
if properties_vals and properties_vals.count(None) != len(properties_vals):
annotated_message.properties = cast("AmqpMessageProperties", annotated_message.properties)
# Bind the cast result to a local so mypy can narrow the Optional type.
props = cast("AmqpMessageProperties", annotated_message.properties)
creation_time = None
absolute_expiry_time = None
if ttl_set:
creation_time = creation_time_from_ttl
absolute_expiry_time = absolute_expiry_time_from_ttl
else:
if annotated_message.properties.creation_time:
creation_time = int(annotated_message.properties.creation_time)
if annotated_message.properties.absolute_expiry_time:
absolute_expiry_time = int(annotated_message.properties.absolute_expiry_time)
if props.creation_time:
creation_time = int(props.creation_time)
if props.absolute_expiry_time:
absolute_expiry_time = int(props.absolute_expiry_time)

message_properties = Properties(
message_id=annotated_message.properties.message_id,
user_id=annotated_message.properties.user_id,
to=annotated_message.properties.to,
subject=annotated_message.properties.subject,
reply_to=annotated_message.properties.reply_to,
correlation_id=annotated_message.properties.correlation_id,
content_type=annotated_message.properties.content_type,
content_encoding=annotated_message.properties.content_encoding,
message_id=props.message_id,
user_id=props.user_id,
to=props.to,
subject=props.subject,
reply_to=props.reply_to,
correlation_id=props.correlation_id,
content_type=props.content_type,
content_encoding=props.content_encoding,
creation_time=creation_time,
absolute_expiry_time=absolute_expiry_time,
group_id=annotated_message.properties.group_id,
group_sequence=annotated_message.properties.group_sequence,
reply_to_group_id=annotated_message.properties.reply_to_group_id,
group_id=props.group_id,
group_sequence=props.group_sequence,
reply_to_group_id=props.reply_to_group_id,
)
elif ttl_set:
message_properties = Properties( # type: ignore[call-arg]
Expand Down Expand Up @@ -389,6 +391,26 @@ def get_remote_max_message_size(handler: "AMQPClient") -> int:
"""
return handler._link.remote_max_message_size # pylint: disable=protected-access

@staticmethod
def get_remote_max_message_batch_size(handler: "AMQPClient") -> Optional[int]:
"""
Returns the max batch size from the vendor link property
'com.microsoft:max-message-batch-size', or None if unavailable.

:param ~pyamqp.AMQPClient handler: Client to read link properties from.
:return: Remote max message batch size, or None.
:rtype: Optional[int]
"""
props = getattr(handler._link, "remote_properties", None) # pylint: disable=protected-access
if props:
# pyamqp decodes AMQP symbols as bytes; check both forms for safety.
for key in (b"com.microsoft:max-message-batch-size", "com.microsoft:max-message-batch-size"):
if key in props:
value = props[key]
if isinstance(value, int) and value > 0:
return value
return None

@staticmethod
def get_handler_link_name(handler: "AMQPClient") -> str:
"""
Expand Down
Loading