1
- # Copyright (c) 2014 Adafruit Industries
2
- # Author: Tony DiCola
1
+ # Copyright (c) 2020 Adafruit Industries
2
+ # Author: Tony DiCola, Brent Rubell
3
3
4
4
# Permission is hereby granted, free of charge, to any person obtaining a copy
5
5
# of this software and associated documentation files (the "Software"), to deal
@@ -43,9 +43,10 @@ class MQTTClient(object):
43
43
def __init__ (self , username , key , service_host = 'io.adafruit.com' , secure = True ):
44
44
"""Create instance of MQTT client.
45
45
46
- :param username: Adafruit.IO Username for your account.
47
- :param key: Adafruit IO access key (AIO Key) for your account.
48
- :param secure: (optional, boolean) Switches secure/insecure connections
46
+ :param username: Adafruit.IO Username for your account.
47
+ :param key: Adafruit IO access key (AIO Key) for your account.
48
+ :param secure: (optional, boolean) Switches secure/insecure connections
49
+
49
50
"""
50
51
self ._username = username
51
52
self ._service_host = service_host
@@ -70,6 +71,7 @@ def __init__(self, username, key, service_host='io.adafruit.com', secure=True):
70
71
self ._client .on_connect = self ._mqtt_connect
71
72
self ._client .on_disconnect = self ._mqtt_disconnect
72
73
self ._client .on_message = self ._mqtt_message
74
+ self ._client .on_subscribe = self ._mqtt_subscribe
73
75
self ._connected = False
74
76
75
77
@@ -95,7 +97,7 @@ def _mqtt_disconnect(self, client, userdata, rc):
95
97
# log the RC as an error. Continue on to call any disconnect handler
96
98
# so clients can potentially recover gracefully.
97
99
if rc != 0 :
98
- print (" Unexpected disconnection." )
100
+ print (' Unexpected disconnection.' )
99
101
raise MQTTError (rc )
100
102
print ('Disconnected from Adafruit IO!' )
101
103
# Call the on_disconnect callback if available.
@@ -105,6 +107,7 @@ def _mqtt_disconnect(self, client, userdata, rc):
105
107
def _mqtt_message (self , client , userdata , msg ):
106
108
"""Parse out the topic and call on_message callback
107
109
assume topic looks like `username/topic/id`
110
+
108
111
"""
109
112
logger .debug ('Client on_message called.' )
110
113
parsed_topic = msg .topic .split ('/' )
@@ -124,15 +127,19 @@ def _mqtt_message(self, client, userdata, msg):
124
127
else :
125
128
raise ValueError ('on_message not defined' )
126
129
self .on_message (self , topic , payload )
127
-
128
- def _mqtt_subscribe (client , userdata , mid , granted_qos ):
130
+
131
+ def _mqtt_subscribe (self , client , userdata , mid , granted_qos ):
129
132
"""Called when broker responds to a subscribe request."""
133
+ logger .debug ('Client called on_subscribe' )
134
+ if self .on_subscribe is not None :
135
+ self .on_subscribe (self , userdata , mid , granted_qos )
130
136
131
137
def connect (self , ** kwargs ):
132
138
"""Connect to the Adafruit.IO service. Must be called before any loop
133
139
or publish operations are called. Will raise an exception if a
134
140
connection cannot be made. Optional keyword arguments will be passed
135
141
to paho-mqtt client connect function.
142
+
136
143
"""
137
144
# Skip calling connect if already connected.
138
145
if self ._connected :
@@ -145,6 +152,7 @@ def connect(self, **kwargs):
145
152
146
153
def is_connected (self ):
147
154
"""Returns True if connected to Adafruit.IO and False if not connected.
155
+
148
156
"""
149
157
return self ._connected
150
158
@@ -157,9 +165,9 @@ def loop_background(self, stop=None):
157
165
"""Starts a background thread to listen for messages from Adafruit.IO
158
166
and call the appropriate callbacks when feed events occur. Will return
159
167
immediately and will not block execution. Should only be called once.
160
-
161
- Params:
162
- - stop: boolean, stops the execution of the background loop.
168
+
169
+ :param bool stop: Stops the execution of the background loop.
170
+
163
171
"""
164
172
if stop :
165
173
self ._client .loop_stop ()
@@ -174,6 +182,7 @@ def loop_blocking(self):
174
182
listen and respond to Adafruit.IO feed events. If you need to do other
175
183
processing, consider using the loop_background function to run a loop
176
184
in the background.
185
+
177
186
"""
178
187
self ._client .loop_forever ()
179
188
@@ -185,28 +194,36 @@ def loop(self, timeout_sec=1.0):
185
194
The optional timeout_sec parameter specifies at most how long to block
186
195
execution waiting for messages when this function is called. The default
187
196
is one second.
197
+
188
198
"""
189
199
self ._client .loop (timeout = timeout_sec )
190
200
191
- def subscribe (self , feed_id , feed_user = None ):
201
+ def subscribe (self , feed_id , feed_user = None , qos = 0 ):
192
202
"""Subscribe to changes on the specified feed. When the feed is updated
193
203
the on_message function will be called with the feed_id and new value.
194
204
195
- Params:
196
- - feed_id: The id of the feed to subscribe to.
197
- - feed_user (optional): The user id of the feed. Used for feed sharing functionality.
205
+ :param str feed_id: The key of the feed to subscribe to.
206
+ :param str feed_user: Optional, identifies feed owner. Used for feed sharing.
207
+ :param int qos: The QoS to use when subscribing. Defaults to 0.
208
+
198
209
"""
210
+ if qos > 1 :
211
+ raise MQTTError ("Adafruit IO only supports a QoS level of 0 or 1." )
199
212
if feed_user is not None :
200
- (res , mid ) = self ._client .subscribe ('{0}/feeds/{1}' .format (feed_user , feed_id ))
213
+ (res , mid ) = self ._client .subscribe ('{0}/feeds/{1}' .format (feed_user , feed_id , qos = qos ))
201
214
else :
202
- (res , mid ) = self ._client .subscribe ('{0}/feeds/{1}' .format (self ._username , feed_id ))
215
+ (res , mid ) = self ._client .subscribe ('{0}/feeds/{1}' .format (self ._username , feed_id ), qos = qos )
203
216
return res , mid
204
217
205
- def subscribe_group (self , group_id ):
218
+ def subscribe_group (self , group_id , qos = 0 ):
206
219
"""Subscribe to changes on the specified group. When the group is updated
207
220
the on_message function will be called with the group_id and the new value.
221
+
222
+ :param str feed_id: The key of the feed to subscribe to.
223
+ :param int qos: The QoS to use when subscribing. Defaults to 0.
224
+
208
225
"""
209
- self ._client .subscribe ('{0}/groups/{1}' .format (self ._username , group_id ))
226
+ self ._client .subscribe ('{0}/groups/{1}' .format (self ._username , group_id ), qos = qos )
210
227
211
228
def subscribe_randomizer (self , randomizer_id ):
212
229
"""Subscribe to changes on a specified random data stream from
@@ -216,6 +233,7 @@ def subscribe_randomizer(self, randomizer_id):
216
233
every client that is subscribed to the same topic.
217
234
218
235
:param int randomizer_id: ID of the random word record you want data for.
236
+
219
237
"""
220
238
self ._client .subscribe ('{0}/integration/words/{1}' .format (self ._username , randomizer_id ))
221
239
0 commit comments