Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ python:
- '3.7'
before_install:
- docker pull eclipse-mosquitto
- docker run -d -p 11883:1883 -p 9001:9001 -v $(pwd)/mosquitto:/mosquitto/config eclipse-mosquitto
- docker run -d -p 1883:1883 eclipse-mosquitto
- docker run -d -p 11883:1883 -p 9001:9001 -v $(pwd)/mosquitto/private:/mosquitto/config eclipse-mosquitto
- docker run -d -p 1883:1883 -v $(pwd)/mosquitto/anonymous:/mosquitto/config eclipse-mosquitto
install:
- pip install -r requirements.txt
script:
Expand Down
4 changes: 2 additions & 2 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ The keywords in this library are based on some of the methods available in eclip
The tests are in ``tests`` folder and make use of Robot Framework itself. They are run automatically through travis when code is pushed to a branch. When run locally, these tests rely on locally running mqtt brokers. We need 2 running brokers, one without auth that is used by most of the tests, and the other one with auth (configuration file is provided). You'll need to start them before running the tests. You can then run the tests locally::

docker pull eclipse-mosquitto
docker run -d -p 1883:1883 eclipse-mosquitto
docker run -d -p 11883:1883 -p 9001:9001 -v $(pwd)/mosquitto:/mosquitto/config eclipse-mosquitto
docker run -d -p 1883:1883 -v $(pwd)/mosquitto/anonymous:/mosquitto/config eclipse-mosquitto
docker run -d -p 11883:1883 -p 9001:9001 -v $(pwd)/mosquitto/private:/mosquitto/config eclipse-mosquitto
robot -P src tests


Expand Down
2 changes: 2 additions & 0 deletions mosquitto/anonymous/mosquitto.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
listener 1883
allow_anonymous true
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
allow_anonymous false
listener 1883
password_file ./mosquitto/config/passwd_file
File renamed without changes.
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
paho-mqtt~=1.1
robotframework~=3.0
paho-mqtt~=2.0
robotframework~=7.0
108 changes: 69 additions & 39 deletions src/MQTTLibrary/MQTTKeywords.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from paho.mqtt.matcher import MQTTMatcher
import paho.mqtt.client as mqtt
import paho.mqtt.publish as publish
import robot
import time
import re

from robot.libraries.DateTime import convert_time
from robot.api import logger


# https://github.com/eclipse/paho.mqtt.python/blob/1eec03edf39128e461e6729694cf5d7c1959e5e4/src/paho/mqtt/client.py#L250
def topic_matches_sub(sub, topic):
"""Check whether a topic matches a subscription.
Expand All @@ -23,6 +23,7 @@ def topic_matches_sub(sub, topic):
except StopIteration:
return False


class MQTTKeywords(object):

# Timeout used for all blocking loop* functions. This serves as a
Expand All @@ -35,7 +36,7 @@ def __init__(self, loop_timeout=LOOP_TIMEOUT):
self._messages = {}
self._username = None
self._password = None
#self._mqttc = mqtt.Client()
# self._mqttc = mqtt.Client()

def set_username_and_password(self, username, password=None):
self._username = username
Expand Down Expand Up @@ -68,7 +69,11 @@ def connect(self, broker, port=1883, client_id="", clean_session=True):
logger.info('Connecting to %s at port %s' % (broker, port))
self._connected = False
self._unexpected_disconnect = False
self._mqttc = mqtt.Client(client_id, clean_session)
self._mqttc = mqtt.Client(
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
client_id=client_id,
clean_session=clean_session
)

# set callbacks
self._mqttc.on_connect = self._on_connect
Expand All @@ -82,7 +87,7 @@ def connect(self, broker, port=1883, client_id="", clean_session=True):
timer_start = time.time()
while time.time() < timer_start + self._loop_timeout:
if self._connected or self._unexpected_disconnect:
break;
break
self._mqttc.loop()

if self._unexpected_disconnect:
Expand All @@ -108,8 +113,10 @@ def publish(self, topic, message=None, qos=0, retain=False):
| Publish | test/test | test message | 1 | ${false} |

"""
logger.info('Publish topic: %s, message: %s, qos: %s, retain: %s'
% (topic, message, qos, retain))
logger.info(
'Publish topic: %s, message: %s, qos: %s, retain: %s'
% (topic, message, qos, retain)
)
self._mid = -1
self._mqttc.on_publish = self._on_publish
result, mid = self._mqttc.publish(topic, message, int(qos), retain)
Expand All @@ -119,7 +126,7 @@ def publish(self, topic, message=None, qos=0, retain=False):
timer_start = time.time()
while time.time() < timer_start + self._loop_timeout:
if mid == self._mid:
break;
break
self._mqttc.loop()

if mid != self._mid:
Expand All @@ -133,7 +140,8 @@ def subscribe(self, topic, qos, timeout=1, limit=1):

`qos` quality of service for the subscription

`timeout` duration of subscription. Specify 0 to enable background looping (async)
`timeout` duration of subscription. Specify 0 to enable background
looping (async)

`limit` the max number of payloads that will be returned. Specify 0
for no limit
Expand Down Expand Up @@ -180,7 +188,8 @@ def subscribe(self, topic, qos, timeout=1, limit=1):

def listen(self, topic, timeout=1, limit=1):
""" Listen to a topic and return a list of message payloads received
within the specified time. Requires an async Subscribe to have been called previously.
within the specified time. Requires an async Subscribe to have
been called previously.

`topic` topic to listen to

Expand All @@ -202,14 +211,17 @@ def listen(self, topic, timeout=1, limit=1):
timer_start = time.time()
while time.time() < timer_start + self._loop_timeout:
if self._subscribed:
break;
break
time.sleep(1)
if not self._subscribed:
logger.warn('Cannot listen when not subscribed to a topic')
return []

if topic not in self._messages:
logger.warn('Cannot listen when not subscribed to topic: %s' % topic)
logger.warn(
'Cannot listen when not subscribed to topic: %s'
% topic
)
return []

# If enough messages have already been gathered, return them
Expand Down Expand Up @@ -279,7 +291,9 @@ def subscribe_and_validate(self, topic, qos, payload, timeout=1):
self._mqttc.loop()

if not self._verified:
raise AssertionError("The expected payload didn't arrive in the topic")
raise AssertionError(
"The expected payload didn't arrive in the topic"
)

def unsubscribe(self, topic):
""" Unsubscribe the client from the specified topic.
Expand All @@ -291,9 +305,11 @@ def unsubscribe(self, topic):

"""
try:
tmp = self._mqttc
self._mqttc
except AttributeError:
logger.info('No MQTT Client instance found so nothing to unsubscribe from.')
logger.info(
'No MQTT Client instance found so nothing to unsubscribe from.'
)
return

if self._background_mqttc:
Expand Down Expand Up @@ -325,9 +341,11 @@ def disconnect(self):

"""
try:
tmp = self._mqttc
self._mqttc
except AttributeError:
logger.info('No MQTT Client instance found so nothing to disconnect from.')
logger.info(
'No MQTT Client instance found so nothing to disconnect from.'
)
return

self._disconnected = False
Expand All @@ -338,14 +356,16 @@ def disconnect(self):
timer_start = time.time()
while time.time() < timer_start + self._loop_timeout:
if self._disconnected or self._unexpected_disconnect:
break;
break
self._mqttc.loop()
if self._unexpected_disconnect:
raise RuntimeError("The client disconnected unexpectedly")

def publish_single(self, topic, payload=None, qos=0, retain=False,
hostname="localhost", port=1883, client_id="", keepalive=60,
will=None, auth=None, tls=None, protocol=mqtt.MQTTv31):
def publish_single(
self, topic, payload=None, qos=0, retain=False,
hostname="localhost", port=1883, client_id="", keepalive=60,
will=None, auth=None, tls=None, protocol=mqtt.MQTTv5
):

""" Publish a single message and disconnect. This keyword uses the
[http://eclipse.org/paho/clients/python/docs/#single|single]
Expand Down Expand Up @@ -379,7 +399,7 @@ def publish_single(self, topic, payload=None, qos=0, retain=False,
'keyfile':"<keyfile>", 'tls_version':"<tls_version>",
'ciphers':"<ciphers">}

`protocol` MQTT protocol version (MQTTv31 or MQTTv311)
`protocol` MQTT protocol version (MQTTv31, MQTTv311 or MQTTv5)

Example:

Expand All @@ -389,12 +409,16 @@ def publish_single(self, topic, payload=None, qos=0, retain=False,
"""
logger.info('Publishing to: %s:%s, topic: %s, payload: %s, qos: %s' %
(hostname, port, topic, payload, qos))
publish.single(topic, payload, qos, retain, hostname, port,
client_id, keepalive, will, auth, tls, protocol)
publish.single(
topic, payload, qos, retain, hostname, port,
client_id, keepalive, will, auth, tls, protocol
)

def publish_multiple(self, msgs, hostname="localhost", port=1883,
client_id="", keepalive=60, will=None, auth=None,
tls=None, protocol=mqtt.MQTTv31):
def publish_multiple(
self, msgs, hostname="localhost", port=1883,
client_id="", keepalive=60, will=None, auth=None,
tls=None, protocol=mqtt.MQTTv5
):

""" Publish multiple messages and disconnect. This keyword uses the
[http://eclipse.org/paho/clients/python/docs/#multiple|multiple]
Expand Down Expand Up @@ -423,40 +447,46 @@ def publish_multiple(self, msgs, hostname="localhost", port=1883,
"""
logger.info('Publishing to: %s:%s, msgs: %s' %
(hostname, port, msgs))
publish.multiple(msgs, hostname, port, client_id, keepalive,
will, auth, tls, protocol)
publish.multiple(
msgs, hostname, port, client_id, keepalive,
will, auth, tls, protocol
)

def _on_message(self, client, userdata, message):
payload = message.payload.decode('utf-8')
logger.debug('Received message: %s on topic: %s with QoS: %s'
% (payload, message.topic, str(message.qos)))
logger.debug(
'Received message: %s on topic: %s with QoS: %s'
% (payload, message.topic, str(message.qos))
)
self._verified = re.match(self._payload, payload)

def _on_message_list(self, client, userdata, message):
payload = message.payload.decode('utf-8')
logger.debug('Received message: %s on topic: %s with QoS: %s'
% (payload, message.topic, str(message.qos)))
logger.debug(
'Received message: %s on topic: %s with QoS: %s'
% (payload, message.topic, str(message.qos))
)
if message.topic not in self._messages:
self._messages[message.topic] = []
for sub in self._messages:
if topic_matches_sub(sub, message.topic):
self._messages[sub].append(payload)

def _on_connect(self, client, userdata, flags, rc):
self._connected = True if rc == 0 else False
def _on_connect(self, client, userdata, flags, reason_code, properties):
self._connected = True if reason_code == 0 else False

def _on_disconnect(self, client, userdata, rc):
if rc == 0:
def _on_disconnect(self, client, userdata, flags, reason_code, properties):
if reason_code == 0:
self._disconnected = True
self._unexpected_disconnect = False
else:
self._unexpected_disconnect = True

def _on_subscribe(self, client, userdata, mid, granted_qos):
def _on_subscribe(self, client, userdata, mid, reason_codes, properties):
self._subscribed = True

def _on_unsubscribe(self, client, userdata, mid):
def _on_unsubscribe(self, client, userdata, mid, reason_codes, properties):
self._unsubscribed = True

def _on_publish(self, client, userdata, mid):
def _on_publish(self, client, userdata, mid, reason_codes, properties):
self._mid = mid
2 changes: 1 addition & 1 deletion src/MQTTLibrary/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
VERSION = '0.7.1.post3'
VERSION = '0.8.0'
2 changes: 1 addition & 1 deletion tests/connect.robot
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
*** Settings ***
| Library | String
| Library | MQTTLibrary
| Library | ../src/MQTTLibrary/MQTTKeywords.py
| Test Timeout | 30 seconds


Expand Down
9 changes: 4 additions & 5 deletions tests/keywords.robot
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
| *Settings* | *Value*
| Library | MQTTLibrary
| Library | BuiltIn
| Library | ../src/MQTTLibrary/MQTTKeywords.py

| *Variables* | *Value*
#| ${broker.uri} | mqtt.eclipse.org
Expand Down Expand Up @@ -51,7 +50,7 @@
| | Connect | ${broker.uri} | ${port} | ${client.id} | ${false}
| | @{messages} | Subscribe | ${topic} | ${qos} | ${timeout} | ${limit}
| | [Teardown] | Disconnect
| | [Return] | @{messages}
| | RETURN | @{messages}

| Subscribe Async
| | [Arguments] | ${broker.uri}=${broker.uri} | ${port}=${broker.port}
Expand Down Expand Up @@ -82,10 +81,10 @@
| | @{messages} | Subscribe | ${topic} | ${qos} | ${timeout} | ${limit}
| | Unsubscribe | ${topic}
| | [Teardown] | Disconnect
| | [Return] | @{messages}
| | RETURN | @{messages}

| Listen and Get Messages
| | [Arguments] | ${topic}=${topic} | ${timeout}=1s
| | ... | ${limit}=1
| | @{messages} | Listen | ${topic} | ${timeout} | ${limit}
| | [Return] | @{messages}
| | RETURN | @{messages}
2 changes: 1 addition & 1 deletion tests/publish.robot
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
*** Settings ***
| Library | MQTTLibrary
| Library | ../src/MQTTLibrary/MQTTKeywords.py
| Library | Collections
| Test Timeout | 30 seconds

Expand Down