Skip to content

Feed sharing #64

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Aug 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 37 additions & 10 deletions Adafruit_IO/mqtt_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def __init__(self, username, key, service_host='io.adafruit.com', secure=True):
self.on_connect = None
self.on_disconnect = None
self.on_message = None
self.on_subscribe = None
# Initialize MQTT client.
self._client = mqtt.Client()
if secure:
Expand All @@ -66,6 +67,7 @@ def __init__(self, username, key, service_host='io.adafruit.com', secure=True):
self._client.on_message = self._mqtt_message
self._connected = False


def _mqtt_connect(self, client, userdata, flags, rc):
logger.debug('Client on_connect called.')
# Check if the result code is success (0) or some error (non-zero) and
Expand All @@ -75,7 +77,7 @@ def _mqtt_connect(self, client, userdata, flags, rc):
self._connected = True
print('Connected to Adafruit IO!')
else:
# handle RC errors within `errors.py`'s MQTTError class
# handle RC errors within MQTTError class
raise MQTTError(rc)
# Call the on_connect callback if available.
if self.on_connect is not None:
Expand All @@ -88,6 +90,7 @@ def _mqtt_disconnect(self, client, userdata, rc):
# log the RC as an error. Continue on to call any disconnect handler
# so clients can potentially recover gracefully.
if rc != 0:
print("Unexpected disconnection.")
raise MQTTError(rc)
print('Disconnected from Adafruit IO!')
# Call the on_disconnect callback if available.
Expand All @@ -99,13 +102,17 @@ def _mqtt_message(self, client, userdata, msg):
# Parse out the feed id and call on_message callback.
# Assumes topic looks like "username/feeds/id"
parsed_topic = msg.topic.split('/')
if self.on_message is not None and self._username == parsed_topic[0]:
if self.on_message is not None:
feed = parsed_topic[2]
payload = '' if msg.payload is None else msg.payload.decode('utf-8')
elif self.on_message is not None and parsed_topic[0] == 'time':
feed = parsed_topic[0]
payload = msg.payload.decode('utf-8')
self.on_message(self, feed, payload)

def _mqtt_subscribe(client, userdata, mid, granted_qos):
"""Called when broker responds to a subscribe request."""


def connect(self, **kwargs):
"""Connect to the Adafruit.IO service. Must be called before any loop
Expand Down Expand Up @@ -162,16 +169,24 @@ def loop(self, timeout_sec=1.0):
"""
self._client.loop(timeout=timeout_sec)

def subscribe(self, feed_id):
def subscribe(self, feed_id, feed_user=None):
"""Subscribe to changes on the specified feed. When the feed is updated
the on_message function will be called with the feed_id and new value.

Params:
- feed_id: The id of the feed to update.
- feed_user (optional): The user id of the feed. Used for feed sharing.
"""
self._client.subscribe('{0}/feeds/{1}'.format(self._username, feed_id))
if feed_user is not None:
(res, mid) = self._client.subscribe('{0}/feeds/{1}'.format(feed_user, feed_id))
else:
(res, mid) = self._client.subscribe('{0}/feeds/{1}'.format(self._username, feed_id))
return res, mid

def subscribe_time(self, time):
"""Subscribe to changes on the Adafruit IO time feeds. When the feed is
updated, the on_message function will be called and publish a new value:
time =
time feeds:
millis: milliseconds
seconds: seconds
iso: ISO-8601 (https://en.wikipedia.org/wiki/ISO_8601)
Expand All @@ -181,15 +196,27 @@ def subscribe_time(self, time):
elif time == 'iso':
self._client.subscribe('time/ISO-8601')
else:
print('ERROR: Invalid time type specified')
raise TypeError('Invalid Time Feed Specified.')
return

def unsubscribe(self, feed_id):
"""Unsubscribes from a specified MQTT feed.
Note: this does not prevent publishing to a feed, it will unsubscribe
from receiving messages via on_message.
"""
(res, mid) = self._client.unsubscribe('{0}/feeds/{1}'.format(self._username, feed_id))

def publish(self, feed_id, value):
def publish(self, feed_id, value=None, feed_user=None):
"""Publish a value to a specified feed.

Required parameters:
Params:
- feed_id: The id of the feed to update.
- feed_user (optional): The user id of the feed. Used for feed sharing.
- value: The new value to publish to the feed.
"""
self._client.publish('{0}/feeds/{1}'.format(self._username, feed_id),
payload=value)
if feed_user is not None:
(res, self._pub_mid) = self._client.publish('{0}/feeds/{1}'.format(feed_user, feed_id),
payload=value)
else:
(res, self._pub_mid) = self._client.publish('{0}/feeds/{1}'.format(self._username, feed_id),
payload=value)
14 changes: 14 additions & 0 deletions docs/feed-sharing.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
Feed Sharing
------------
Feed sharing is a feature of Adafruit IO which allows you to share your feeds with people you specify.

If you want to share a feed on your Adafruit IO Account with another user, visit the `Sharing a feed page <https://learn.adafruit.com/adafruit-io-basics-feeds/sharing-a-feed>`_
on the Adafruit Learning System.

The Adafruit IO Python client supports Feed Sharing in the mqtt_client.py class.

Usage Example
~~~~~~~~~~~~~


.. literalinclude:: ../examples/mqtt/mqtt_shared_feeds.py
1 change: 1 addition & 0 deletions docs/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Table of Contents
:maxdepth: 6

feeds
feed-sharing
data
groups

Expand Down
74 changes: 74 additions & 0 deletions examples/mqtt/mqtt_shared_feeds.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""
`mqtt_shared_feeds.py`
---------------------------------------------------------
Example of reading and writing to a shared Adafruit IO Feed.

learn.adafruit.com/adafruit-io-basics-feeds/sharing-a-feed

Author: Brent Rubell for Adafruit Industries 2018
"""

# Import standard python modules.
import sys
import time
import random

# Import Adafruit IO MQTT client.
from Adafruit_IO import MQTTClient

# Set to your Adafruit IO key.
# Remember, your key is a secret,
# so make sure not to publish it when you publish this code!
ADAFRUIT_IO_KEY = 'YOUR_AIO_KEY'

# Set to your Adafruit IO username.
# (go to https://accounts.adafruit.com to find your username)
ADAFRUIT_IO_USERNAME = 'YOUR_AIO_USERNAME'

# Shared IO Feed
# Make sure you have read AND write access to this feed to publish.
IO_FEED = 'SHARED_AIO_FEED_NAME'

# IO Feed Owner's username
IO_FEED_USERNAME = 'SHARED_AIO_FEED_USERNAME'


# Define callback functions which will be called when certain events happen.
def connected(client):
"""Connected function will be called when the client connects.
"""
client.subscribe(IO_FEED, IO_FEED_USERNAME)

def disconnected(client):
"""Disconnected function will be called when the client disconnects.
"""
print('Disconnected from Adafruit IO!')
sys.exit(1)

def message(client, feed_id, payload):
"""Message function will be called when a subscribed feed has a new value.
The feed_id parameter identifies the feed, and the payload parameter has
the new value.
"""
print('Feed {0} received new value: {1}'.format(feed_id, payload))


# Create an MQTT client instance.
client = MQTTClient(ADAFRUIT_IO_USERNAME, ADAFRUIT_IO_KEY)

# Setup the callback functions defined above.
client.on_connect = connected
client.on_disconnect = disconnected
client.on_message = message

# Connect to the Adafruit IO server.
client.connect()

client.loop_background()
print('Publishing a new message every 10 seconds (press Ctrl-C to quit)...')

while True:
value = random.randint(0, 100)
print('Publishing {0} to {1}.'.format(value, IO_FEED))
client.publish(IO_FEED, value, IO_FEED_USERNAME)
time.sleep(10)