|
11 | 11 | from paho.mqtt.client import ( |
12 | 12 | LOGGING_LEVEL, |
13 | 13 | MQTT_ERR_SUCCESS, |
| 14 | + CallbackAPIVersion, |
14 | 15 | Client, |
15 | 16 | MQTTv5, |
16 | 17 | MQTTv31, |
@@ -58,7 +59,8 @@ def get_instance(cls) -> "MQTTConnection": |
58 | 59 |
|
59 | 60 | def __init__(self): |
60 | 61 | self.client = Client( |
61 | | - "pai" + os.urandom(8).hex(), |
| 62 | + CallbackAPIVersion.VERSION2, |
| 63 | + client_id="pai" + os.urandom(8).hex(), |
62 | 64 | protocol=protocol_map.get(str(cfg.MQTT_PROTOCOL), MQTTv311), |
63 | 65 | transport=cfg.MQTT_TRANSPORT, |
64 | 66 | ) |
@@ -197,30 +199,27 @@ def _report_pai_status(self, status): |
197 | 199 | retain=True, |
198 | 200 | ) |
199 | 201 |
|
200 | | - def _on_connect_cb(self, client, userdata, flags, result, properties=None): |
| 202 | + def _on_connect_cb(self, client, userdata, connect_flags, reason_code, properties=None): |
201 | 203 | # called on Thread-6 |
202 | | - if result == MQTT_ERR_SUCCESS: |
| 204 | + if not reason_code.is_failure: |
203 | 205 | logger.info("MQTT Broker Connected") |
204 | 206 | self.state = ConnectionState.CONNECTED |
205 | 207 | self._report_pai_status(self._last_pai_status) |
206 | | - self._call_registars("on_connect", client, userdata, flags, result) |
| 208 | + self._call_registars("on_connect", client, userdata, connect_flags, reason_code, properties) |
207 | 209 | else: |
208 | 210 | logger.error( |
209 | | - f"Failed to connect to MQTT: {connack_string(result)} ({result})" |
| 211 | + f"Failed to connect to MQTT: {connack_string(reason_code)} ({reason_code})" |
210 | 212 | ) |
211 | 213 |
|
212 | | - def _on_disconnect_cb(self, client, userdata, *args, **kwargs): |
| 214 | + def _on_disconnect_cb(self, client, userdata, disconnect_flags, reason_code, properties=None): |
213 | 215 | # called on Thread-6 |
214 | | - # Handle different MQTT version signatures by using the first argument as rc |
215 | | - rc = args[0] if args else MQTT_ERR_SUCCESS |
216 | | - |
217 | | - if rc == MQTT_ERR_SUCCESS: |
| 216 | + if not reason_code.is_failure: |
218 | 217 | logger.info("MQTT Broker Disconnected") |
219 | 218 | else: |
220 | | - logger.error(f"MQTT Broker unexpectedly disconnected. Code: {rc}") |
| 219 | + logger.error(f"MQTT Broker unexpectedly disconnected. Code: {reason_code}") |
221 | 220 |
|
222 | 221 | self.state = ConnectionState.NEW |
223 | | - self._call_registars("on_disconnect", self.client, userdata, rc) |
| 222 | + self._call_registars("on_disconnect", self.client, userdata, disconnect_flags, reason_code, properties) |
224 | 223 |
|
225 | 224 | def disconnect(self, reasoncode=None, properties=None): |
226 | 225 | self.state = ConnectionState.DISCONNECTING |
@@ -305,10 +304,10 @@ def subscribe_callback(self, sub, callback: typing.Callable): |
305 | 304 | self.mqtt.message_callback_add(sub, callback) |
306 | 305 | self.mqtt.subscribe(sub) |
307 | 306 |
|
308 | | - def on_disconnect(self, client, userdata, rc): |
| 307 | + def on_disconnect(self, client, userdata, disconnect_flags, reason_code, properties=None): |
309 | 308 | """Called from MQTT connection""" |
310 | 309 | pass |
311 | 310 |
|
312 | | - def on_connect(self, client, userdata, flags, result): |
| 311 | + def on_connect(self, client, userdata, connect_flags, reason_code, properties=None): |
313 | 312 | """Called from MQTT connection""" |
314 | 313 | pass |
0 commit comments