@@ -52,6 +52,7 @@ def __init__(self, username, key, service_host='io.adafruit.com', secure=True):
52
52
self .on_connect = None
53
53
self .on_disconnect = None
54
54
self .on_message = None
55
+ self .on_subscribe = None
55
56
# Initialize MQTT client.
56
57
self ._client = mqtt .Client ()
57
58
if secure :
@@ -66,6 +67,7 @@ def __init__(self, username, key, service_host='io.adafruit.com', secure=True):
66
67
self ._client .on_message = self ._mqtt_message
67
68
self ._connected = False
68
69
70
+
69
71
def _mqtt_connect (self , client , userdata , flags , rc ):
70
72
logger .debug ('Client on_connect called.' )
71
73
# Check if the result code is success (0) or some error (non-zero) and
@@ -75,7 +77,7 @@ def _mqtt_connect(self, client, userdata, flags, rc):
75
77
self ._connected = True
76
78
print ('Connected to Adafruit IO!' )
77
79
else :
78
- # handle RC errors within `errors.py`'s MQTTError class
80
+ # handle RC errors within MQTTError class
79
81
raise MQTTError (rc )
80
82
# Call the on_connect callback if available.
81
83
if self .on_connect is not None :
@@ -88,6 +90,7 @@ def _mqtt_disconnect(self, client, userdata, rc):
88
90
# log the RC as an error. Continue on to call any disconnect handler
89
91
# so clients can potentially recover gracefully.
90
92
if rc != 0 :
93
+ print ("Unexpected disconnection." )
91
94
raise MQTTError (rc )
92
95
print ('Disconnected from Adafruit IO!' )
93
96
# Call the on_disconnect callback if available.
@@ -99,13 +102,17 @@ def _mqtt_message(self, client, userdata, msg):
99
102
# Parse out the feed id and call on_message callback.
100
103
# Assumes topic looks like "username/feeds/id"
101
104
parsed_topic = msg .topic .split ('/' )
102
- if self .on_message is not None and self . _username == parsed_topic [ 0 ] :
105
+ if self .on_message is not None :
103
106
feed = parsed_topic [2 ]
104
107
payload = '' if msg .payload is None else msg .payload .decode ('utf-8' )
105
108
elif self .on_message is not None and parsed_topic [0 ] == 'time' :
106
109
feed = parsed_topic [0 ]
107
110
payload = msg .payload .decode ('utf-8' )
108
111
self .on_message (self , feed , payload )
112
+
113
+ def _mqtt_subscribe (client , userdata , mid , granted_qos ):
114
+ """Called when broker responds to a subscribe request."""
115
+
109
116
110
117
def connect (self , ** kwargs ):
111
118
"""Connect to the Adafruit.IO service. Must be called before any loop
@@ -162,16 +169,24 @@ def loop(self, timeout_sec=1.0):
162
169
"""
163
170
self ._client .loop (timeout = timeout_sec )
164
171
165
- def subscribe (self , feed_id ):
172
+ def subscribe (self , feed_id , feed_user = None ):
166
173
"""Subscribe to changes on the specified feed. When the feed is updated
167
174
the on_message function will be called with the feed_id and new value.
175
+
176
+ Params:
177
+ - feed_id: The id of the feed to update.
178
+ - feed_user (optional): The user id of the feed. Used for feed sharing.
168
179
"""
169
- self ._client .subscribe ('{0}/feeds/{1}' .format (self ._username , feed_id ))
180
+ if feed_user is not None :
181
+ (res , mid ) = self ._client .subscribe ('{0}/feeds/{1}' .format (feed_user , feed_id ))
182
+ else :
183
+ (res , mid ) = self ._client .subscribe ('{0}/feeds/{1}' .format (self ._username , feed_id ))
184
+ return res , mid
170
185
171
186
def subscribe_time (self , time ):
172
187
"""Subscribe to changes on the Adafruit IO time feeds. When the feed is
173
188
updated, the on_message function will be called and publish a new value:
174
- time =
189
+ time feeds:
175
190
millis: milliseconds
176
191
seconds: seconds
177
192
iso: ISO-8601 (https://en.wikipedia.org/wiki/ISO_8601)
@@ -181,15 +196,27 @@ def subscribe_time(self, time):
181
196
elif time == 'iso' :
182
197
self ._client .subscribe ('time/ISO-8601' )
183
198
else :
184
- print ( 'ERROR: Invalid time type specified ' )
199
+ raise TypeError ( ' Invalid Time Feed Specified. ' )
185
200
return
201
+
202
+ def unsubscribe (self , feed_id ):
203
+ """Unsubscribes from a specified MQTT feed.
204
+ Note: this does not prevent publishing to a feed, it will unsubscribe
205
+ from receiving messages via on_message.
206
+ """
207
+ (res , mid ) = self ._client .unsubscribe ('{0}/feeds/{1}' .format (self ._username , feed_id ))
186
208
187
- def publish (self , feed_id , value ):
209
+ def publish (self , feed_id , value = None , feed_user = None ):
188
210
"""Publish a value to a specified feed.
189
211
190
- Required parameters :
212
+ Params :
191
213
- feed_id: The id of the feed to update.
214
+ - feed_user (optional): The user id of the feed. Used for feed sharing.
192
215
- value: The new value to publish to the feed.
193
216
"""
194
- self ._client .publish ('{0}/feeds/{1}' .format (self ._username , feed_id ),
195
- payload = value )
217
+ if feed_user is not None :
218
+ (res , self ._pub_mid ) = self ._client .publish ('{0}/feeds/{1}' .format (feed_user , feed_id ),
219
+ payload = value )
220
+ else :
221
+ (res , self ._pub_mid ) = self ._client .publish ('{0}/feeds/{1}' .format (self ._username , feed_id ),
222
+ payload = value )
0 commit comments