Skip to content
Open
16 changes: 13 additions & 3 deletions awscrt/mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
from awscrt.http import HttpProxyOptions, HttpRequest
from awscrt.io import ClientBootstrap, ClientTlsContext, SocketOptions
from dataclasses import dataclass
from awscrt.mqtt5 import Client as Mqtt5Client
from awscrt.mqtt5 import Client as Mqtt5Client, SdkMetrics


class QoS(IntEnum):
Expand Down Expand Up @@ -330,6 +330,10 @@ class Connection(NativeResource):

proxy_options (Optional[awscrt.http.HttpProxyOptions]):
Optional proxy options for all connections.

enable_metrics (bool): If true, enable IoT SDK metrics in CONNECT packet username field, otherwise, disabled.
Default to True. You may set it to false if you are not using AWS IoT services, and
using a custom authentication mechanism.
"""

def __init__(self,
Expand All @@ -355,7 +359,8 @@ def __init__(self,
proxy_options=None,
on_connection_success=None,
on_connection_failure=None,
on_connection_closed=None
on_connection_closed=None,
enable_metrics=True,
):

assert isinstance(client, Client) or isinstance(client, Mqtt5Client)
Expand Down Expand Up @@ -408,6 +413,10 @@ def __init__(self,
self.password = password
self.socket_options = socket_options if socket_options else SocketOptions()
self.proxy_options = proxy_options if proxy_options else websocket_proxy_options
if enable_metrics:
self.metrics = SdkMetrics()
else:
self.metrics = None

self._binding = _awscrt.mqtt_client_connection_new(
self,
Expand Down Expand Up @@ -524,7 +533,8 @@ def on_connect(error_code, return_code, session_present):
self.password,
self.clean_session,
on_connect,
self.proxy_options
self.proxy_options,
self.metrics
)

except Exception as e:
Expand Down
36 changes: 32 additions & 4 deletions awscrt/mqtt5.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@
from inspect import signature


@dataclass
class SdkMetrics:
"""
Configuration for IoT SDK metrics that are embedded in MQTT username field.

Args:
library_name (str): The SDK library name (e.g., "IoTDeviceSDK/Python")

"""
library_name: str = "IoTDeviceSDK/Python"


class QoS(IntEnum):
"""MQTT message delivery quality of service.

Expand Down Expand Up @@ -1158,6 +1170,7 @@ class ConnectPacket:
will_delay_interval_sec (int): A time interval, in seconds, that the server should wait (for a session reconnection) before sending the will message associated with the connection's session. If omitted or None, the server will send the will when the associated session is destroyed. If the session is destroyed before a will delay interval has elapsed, then the will must be sent at the time of session destruction.
will (PublishPacket): The definition of a message to be published when the connection's session is destroyed by the server or when the will delay interval has elapsed, whichever comes first. If None, then nothing will be sent.
user_properties (Sequence[UserProperty]): List of MQTT5 user properties included with the packet.

"""
keep_alive_interval_sec: int = None
client_id: str = None
Expand Down Expand Up @@ -1338,6 +1351,8 @@ class ClientOptions:
on_lifecycle_event_connection_success_fn (Callable[[LifecycleConnectSuccessData],]): Callback for Lifecycle Event Connection Success.
on_lifecycle_event_connection_failure_fn (Callable[[LifecycleConnectFailureData],]): Callback for Lifecycle Event Connection Failure.
on_lifecycle_event_disconnection_fn (Callable[[LifecycleDisconnectData],]): Callback for Lifecycle Event Disconnection.
enable_metrics (bool): If true, enable IoT SDK metrics in CONNECT packet username field, otherwise, disabled. Default to True. You may set it to false if you are not using AWS IoT services, and using a custom authentication mechanism.

"""
host_name: str
port: int = None
Expand All @@ -1364,6 +1379,7 @@ class ClientOptions:
on_lifecycle_event_connection_success_fn: Callable[[LifecycleConnectSuccessData], None] = None
on_lifecycle_event_connection_failure_fn: Callable[[LifecycleConnectFailureData], None] = None
on_lifecycle_event_disconnection_fn: Callable[[LifecycleDisconnectData], None] = None
enable_metrics: bool = True


def _check_callback(callback):
Expand Down Expand Up @@ -1392,6 +1408,7 @@ def __init__(self, client_options: ClientOptions):
self._on_lifecycle_connection_failure_cb = _check_callback(
client_options.on_lifecycle_event_connection_failure_fn)
self._on_lifecycle_disconnection_cb = _check_callback(client_options.on_lifecycle_event_disconnection_fn)
self._enable_metrics = client_options.enable_metrics

def _ws_handshake_transform(self, http_request_binding, http_headers_binding, native_userdata):
if self._ws_handshake_transform_cb is None:
Expand Down Expand Up @@ -1704,7 +1721,8 @@ def __init__(
ping_timeout_ms: int,
keep_alive_secs: int,
ack_timeout_secs: int,
clean_session: int):
clean_session: int,
enable_metrics: bool):
self.host_name = host_name
self.port = port
self.client_id = "" if client_id is None else client_id
Expand All @@ -1715,6 +1733,7 @@ def __init__(
self.keep_alive_secs: int = 1200 if keep_alive_secs is None else keep_alive_secs
self.ack_timeout_secs: int = 0 if ack_timeout_secs is None else ack_timeout_secs
self.clean_session: bool = True if clean_session is None else clean_session
self.enable_metrics: bool = True if enable_metrics is None else enable_metrics


class Client(NativeResource):
Expand All @@ -1728,7 +1747,6 @@ class Client(NativeResource):
"""

def __init__(self, client_options: ClientOptions):

super().__init__()

core = _ClientCore(client_options)
Expand All @@ -1746,6 +1764,12 @@ def __init__(self, client_options: ClientOptions):
if not socket_options:
socket_options = SocketOptions()

# Handle metrics configuration
if client_options.enable_metrics:
self.metrics = SdkMetrics()
else:
self.metrics = None

if not connect_options.will:
is_will_none = True
will = PublishPacket()
Expand Down Expand Up @@ -1785,6 +1809,8 @@ def __init__(self, client_options: ClientOptions):
will.correlation_data_bytes or will.correlation_data,
will.content_type,
will.user_properties,
client_options.enable_metrics,
self.metrics.library_name if self.metrics else None,
client_options.session_behavior,
client_options.extended_validation_and_flow_control_options,
client_options.offline_queue_behavior,
Expand All @@ -1811,7 +1837,8 @@ def __init__(self, client_options: ClientOptions):
keep_alive_secs=connect_options.keep_alive_interval_sec,
ack_timeout_secs=client_options.ack_timeout_sec,
clean_session=(
client_options.session_behavior < ClientSessionBehaviorType.REJOIN_ALWAYS if client_options.session_behavior else True))
client_options.session_behavior < ClientSessionBehaviorType.REJOIN_ALWAYS if client_options.session_behavior else True),
enable_metrics=client_options.enable_metrics)

def start(self):
"""Notifies the MQTT5 client that you want it maintain connectivity to the configured endpoint.
Expand Down Expand Up @@ -2043,5 +2070,6 @@ def new_connection(self, on_connection_interrupted=None, on_connection_resumed=N
use_websockets=False,
websocket_proxy_options=None,
websocket_handshake_transform=None,
proxy_options=None
proxy_options=None,
enable_metrics=self.adapter_options.enable_metrics
)
19 changes: 18 additions & 1 deletion source/mqtt5_client.c
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,10 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) {
PyObject *will_delay_interval_sec_py; /* optional uint32_t */
PyObject *user_properties_py; /* optional */

/* Metrics */
PyObject *is_metrics_enabled_py; /* optional enable metrics */
struct aws_byte_cursor metrics_library_name; /* optional IoT SDK metrics username */

/* Will */
PyObject *is_will_none_py; /* optional PublishPacket */
PyObject *will_qos_val_py;
Expand Down Expand Up @@ -862,7 +866,7 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) {

if (!PyArg_ParseTuple(
args,
"Os#IOOOOz#Oz#z#OOOOOOOOOz*Oz#OOOz#z*z#OOOOOOOOOOOOOO",
"Os#IOOOOz#Oz#z#OOOOOOOOOz*Oz#OOOz#z*z#OOz#OOOOOOOOOOOOO",
/* O */ &self_py,
/* s */ &host_name.ptr,
/* # */ &host_name.len,
Expand Down Expand Up @@ -904,6 +908,11 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) {
/* # */ &will_content_type.len,
/* O */ &will_user_properties_py,

/* Metrics */
/* O */ &is_metrics_enabled_py,
/* z */ &metrics_library_name.ptr,
/* # */ &metrics_library_name.len,

/* O */ &session_behavior_py,
/* O */ &extended_validation_and_flow_control_options_py,
/* O */ &offline_queue_behavior_py,
Expand Down Expand Up @@ -1279,6 +1288,14 @@ PyObject *aws_py_mqtt5_client_new(PyObject *self, PyObject *args) {
connect_options.will = &will;
}

/* METRICS */
struct aws_mqtt_iot_sdk_metrics metrics_tmp;
AWS_ZERO_STRUCT(metrics_tmp);
if (PyObject_IsTrue(is_metrics_enabled_py)) {
metrics_tmp.library_name = metrics_library_name;
client_options.metrics = &metrics_tmp;
}

/* CALLBACKS */

Py_INCREF(client_core_py);
Expand Down
46 changes: 44 additions & 2 deletions source/mqtt_client_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,39 @@ static void s_on_connect(
PyGILState_Release(state);
}

/* If unsuccessful, false is returned and a Python error has been set */
bool s_set_metrics(struct aws_mqtt_client_connection *connection, PyObject *metrics) {
assert(metrics && (metrics != Py_None));

if (connection == NULL) {
return false;
}

bool success = false;

PyObject *library_name_py = PyObject_GetAttrString(metrics, "library_name");
struct aws_byte_cursor library_name = aws_byte_cursor_from_pyunicode(library_name_py);
if (!library_name.ptr) {
PyErr_SetString(PyExc_TypeError, "metrics.library_name must be str type");
goto done;
}

struct aws_mqtt_iot_sdk_metrics metrics_struct = {
.library_name = library_name,
};

if (aws_mqtt_client_connection_set_metrics(connection, &metrics_struct)) {
PyErr_SetAwsLastError();
success = false;
}

success = true;

done:
Py_DECREF(library_name_py);
return success;
}

/* If unsuccessful, false is returned and a Python error has been set */
bool s_set_will(struct aws_mqtt_client_connection *connection, PyObject *will) {
assert(will && (will != Py_None));
Expand Down Expand Up @@ -668,9 +701,10 @@ PyObject *aws_py_mqtt_client_connection_connect(PyObject *self, PyObject *args)
PyObject *is_clean_session;
PyObject *on_connect;
PyObject *proxy_options_py;
PyObject *metrics_py;
if (!PyArg_ParseTuple(
args,
"Os#s#IOOKKHIIOz#z#OOO",
"Os#s#IOOKKHIIOz#z#OOOO",
&impl_capsule,
&client_id,
&client_id_len,
Expand All @@ -691,7 +725,8 @@ PyObject *aws_py_mqtt_client_connection_connect(PyObject *self, PyObject *args)
&password_len,
&is_clean_session,
&on_connect,
&proxy_options_py)) {
&proxy_options_py,
&metrics_py)) {
return NULL;
}

Expand Down Expand Up @@ -773,6 +808,13 @@ PyObject *aws_py_mqtt_client_connection_connect(PyObject *self, PyObject *args)
}
}

// If metrics is None, we do not set metrics at all.
if (metrics_py != Py_None) {
if (!s_set_metrics(py_connection->native, metrics_py)) {
goto done;
}
}

if (on_connect != Py_None) {
Py_INCREF(on_connect);
py_connection->on_connect = on_connect;
Expand Down
6 changes: 4 additions & 2 deletions test/test_mqtt.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,7 +629,8 @@ def _test_mqtt311_direct_connect_basic_auth(self):
host_name=input_host_name,
port=input_port,
username=input_username,
password=input_password)
password=input_password,
enable_metrics=False)
connection.connect().result(TIMEOUT)
connection.disconnect().result(TIMEOUT)

Expand Down Expand Up @@ -760,7 +761,8 @@ def sign_function(transform_args, **kwargs):
username=input_username,
password=input_password,
use_websockets=True,
websocket_handshake_transform=sign_function)
websocket_handshake_transform=sign_function,
enable_metrics=False)
connection.connect().result(TIMEOUT)
connection.disconnect().result(TIMEOUT)

Expand Down
Loading
Loading