-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmqttlistener.py
More file actions
executable file
·86 lines (63 loc) · 2.39 KB
/
mqttlistener.py
File metadata and controls
executable file
·86 lines (63 loc) · 2.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
#!/usr/bin/env python3
# Subscribe to an MQTT message, which should be containing JSON, and pretty print it.
# pylint: disable=missing-module-docstring,missing-class-docstring,missing-function-docstring
# Justification: Don't care about docstrings in this little utility.
import argparse
import json
import signal
import threading
import paho.mqtt.client as mqtt
# pylint: disable=consider-using-with
# Justification: Cannot use with in this context
waiter = threading.Lock()
waiter.acquire()
def _signal_handler(_sig, _frame):
waiter.release()
class MQTTListener:
# pylint: disable=too-few-public-methods
# Justification: Really only need the one
def __init__(self, host, port, topic):
self._client = self._init_client()
self._topic = topic
self._client.connect_async(host, port)
self._client.loop_start()
def stop(self):
self._client.loop_stop()
def _init_client(self):
client = mqtt.Client()
client.on_connect = self._on_connect
client.on_message = self._on_message
return client
def _on_connect(self, client, _userdata, _flags, _rc):
client.subscribe(self._topic)
@staticmethod
def _on_message(_client, _userdata, msg):
try:
payload = {}
if len(msg.payload) > 0:
payload = json.loads(msg.payload)
print(json.dumps(payload, indent=2, sort_keys=True))
except json.decoder.JSONDecodeError:
payload = msg.payload.decode("utf-8")
print(f"{payload}")
except UnicodeDecodeError:
print("Could not decode the message")
def _parse_command_line(args: list):
parser = argparse.ArgumentParser()
parser.add_argument("--host", default="localhost", type=str, help="MQTT host")
parser.add_argument("--port", default=1883, type=int, help="MQTT port")
parser.add_argument("topic", type=str, help="Topic to subscribe")
return parser.parse_args(args)
def run(args: list = None):
options = _parse_command_line(args)
print(f"Listening for '{options.topic}'...")
listener = MQTTListener(options.host, options.port, options.topic)
# Wait for a cntr-c
try:
signal.signal(signal.SIGTERM, _signal_handler)
signal.signal(signal.SIGINT, _signal_handler)
waiter.acquire()
except KeyboardInterrupt:
waiter.release()
listener.stop()
run()