Skip to content

Commit 596e463

Browse files
committed
broker.py: Agents can have args.
1 parent 709663d commit 596e463

File tree

3 files changed

+166
-55
lines changed

3 files changed

+166
-55
lines changed

v3/docs/DRIVERS.md

Lines changed: 122 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ MicroPython's `asyncio` when used in a microcontroller context.
2929
9. [Message Broker](./DRIVERS.md#9-message-broker) A flexible means of messaging between
3030
tasks.
3131
9.1 [Further examples](./DRIVERS.md#91-further-examples)
32-
9.2 [User agents](./DRIVERS.md#92-user-agents)
32+
9.2 [User agents](./DRIVERS.md#92-user-agents) User defined Agent classes.
33+
9.3 [Notes](./DRIVERS.md#93-notes)
3334
10. [Additional functions](./DRIVERS.md#10-additional-functions)
3435
10.1 [launch](./DRIVERS.md#101-launch) Run a coro or callback interchangeably.
3536
10.2 [set_global_exception](./DRIVERS.md#102-set_global_exception) Simplify debugging with a global exception handler.
@@ -1027,6 +1028,9 @@ def add_item(q, data):
10271028

10281029
# 8. Delay_ms class
10291030

1031+
```python
1032+
from primitives import Delay_ms # delay_ms.py
1033+
```
10301034
This implements the software equivalent of a retriggerable monostable or a
10311035
watchdog timer. It has an internal boolean `running` state. When instantiated
10321036
the `Delay_ms` instance does nothing, with `running` `False` until triggered.
@@ -1132,50 +1136,70 @@ finally:
11321136

11331137
# 9. Message Broker
11341138

1135-
This is under development: please check for updates. See
1136-
[code](https://github.com/peterhinch/micropython-async/blob/master/v3/primitives/broker.py).
1137-
```py
1138-
from primitives import Broker
1139+
This is under development: please check for updates.
1140+
1141+
```python
1142+
from primitives import Broker # broker.py
11391143
```
11401144
The `Broker` class provides a flexible means of messaging between running tasks.
11411145
It uses a publish-subscribe model (akin to MQTT) whereby the transmitting task
11421146
publishes to a topic. Any tasks subscribed to that topic will receive the
1143-
message. This enables one to one, one to many or many to many messaging.
1147+
message. This enables one to one, one to many, many to one or many to many
1148+
messaging.
11441149

11451150
A task subscribes to a topic with an `agent`. This is stored by the broker. When
11461151
the broker publishes a message, the `agent` of each task subscribed to its topic
11471152
will be triggered. In the simplest case the `agent` is a `Queue` instance: the
11481153
broker puts the topic and message onto the subscriber's queue for retrieval.
11491154

11501155
More advanced agents can perform actions in response to a message, such as
1151-
calling a function or launching a `task`.
1156+
calling a function, launching a `task` or lighting an LED.
1157+
1158+
#### Broker methods
11521159

1153-
Broker methods. All are synchronous, constructor has no args:
1154-
* `subscribe(topic, agent)` Passed `agent` will be triggered by messages with a
1155-
matching `topic`.
1156-
* `unsubscribe(topic, agent)` The `agent` will stop being triggered.
1160+
All are synchronous. They are not threadsafe so should not be called from a hard
1161+
ISR or from another thread. The constructor has no args.
1162+
* `subscribe(topic, agent, *args)` Passed `agent` will be triggered by messages
1163+
with a matching `topic`. Any additional args will be passed to the `agent` when
1164+
it is triggered.
1165+
* `unsubscribe(topic, agent, *args)` The `agent` will stop being triggered. If
1166+
args were passed on subscription, the same args must be passed.
11571167
* `publish(topic, message)` All `agent` instances subscribed to `topic` will be
1158-
triggered, receiving `topic` and `message` args. The method is not threadsafe;
1159-
it should not be called from a hard ISR or from another thread.
1168+
triggered, receiving `topic` and `message` plus any further args that were
1169+
passed to `subscribe`.
11601170

11611171
The `topic` arg is typically a string but may be any hashable object. A
1162-
`message` is an arbitrary Python object. An `agent` may be any of the following:
1163-
* `Queue` When a message is received receives 2-tuple `(topic, message)`.
1164-
* `RingbufQueue` When a message is received receives 2-tuple `(topic, message)`.
1165-
* `function` Called when a message is received. Gets 2 args, topic and message.
1166-
* `bound method` Called when a message is received. Gets 2 args, topic and
1167-
message.
1168-
* `coroutine` Task created when a message is received with 2 args, topic and
1169-
message.
1170-
* `bound coroutine` Task created when a message is received with 2 args, topic
1171-
and message.
1172-
* Instance of a user class. See user agents below.
1172+
`message` is an arbitrary Python object.
1173+
1174+
#### Agent types
1175+
1176+
An `agent` may be any of the following:
1177+
1178+
* `Queue` When a message is received it receives 2-tuple `(topic, message)`. If
1179+
extra args were passed on subscription the queue receives a 3-tuple.
1180+
`(topic, message, (args...))`.
1181+
* `RingbufQueue` When a message is received it receives 2-tuple `(topic, message)`.
1182+
If extra args were passed on subscription it receives a 3-tuple,
1183+
`(topic, message, (args...))`.
1184+
* `function` Called when a message is received. Args: topic, message plus any
1185+
further args.
1186+
* `bound method` Called when a message is received. Args: topic, message plus any
1187+
further args.
1188+
* `coroutine` Converted to a `task` when a message is received. Args: topic,
1189+
message plus any further args.
1190+
* `bound coroutine` Converted to a `task` when a message is received. Args: topic,
1191+
message plus any further args.
1192+
* `user_agent` Instance of a user class. See user agents below.
11731193
* `Event` Set when a message is received.
11741194

11751195
Note that synchronous `agent` instances must run to completion quickly otherwise
11761196
the `publish` method will be slowed.
11771197

1178-
The following is a simple example:
1198+
#### Broker class variable
1199+
1200+
* `Verbose=True` Enables printing of debug messages.
1201+
1202+
#### example
11791203
```py
11801204
import asyncio
11811205
from primitives import Broker, Queue
@@ -1212,32 +1236,100 @@ async def messages(client):
12121236
Assuming the MQTT client is subscribed to multiple topics, message strings are
12131237
directed to individual tasks each supporting one topic.
12141238

1239+
The following illustrates a use case for `agent` args.
1240+
```py
1241+
import asyncio
1242+
from primitives import Broker
1243+
from machine import Pin
1244+
red = Pin("A13", Pin.OUT, value=0) # Pin nos. for Pyboard V1.1
1245+
green = Pin("A14", Pin.OUT, value=0)
1246+
broker = Broker()
1247+
1248+
async def flash():
1249+
broker.publish("led", 1)
1250+
await asyncio.sleep(1)
1251+
broker.publish("led", 0)
1252+
1253+
def recv(topic, message, led):
1254+
led(message) # Light or extinguish an LED
1255+
1256+
async def main():
1257+
broker.subscribe("led", recv, red)
1258+
broker.subscribe("led", recv, green)
1259+
for _ in range(10):
1260+
await flash()
1261+
await asyncio.sleep(1)
1262+
broker.unsubscribe("led", recv, green) # Arg(s) must be passed
1263+
for _ in range(3):
1264+
await flash()
1265+
await asyncio.sleep(1)
1266+
1267+
asyncio.run(main())
1268+
```
1269+
12151270
## 9.2 User agents
12161271

12171272
An `agent` can be an instance of a user class. The class must be a subclass of
1218-
`Agent`, and it must support a synchronous `.put` method. The latter takes two
1219-
args, being `topic` and `message`. It should run to completion quickly.
1273+
`Agent`, and it must support a synchronous `.put` method. Arguments are `topic`
1274+
and `message`, followed by any further args passed on subscription. The method
1275+
should run to completion quickly.
12201276

12211277
```py
12221278
import asyncio
12231279
from primitives import Broker, Agent
12241280

12251281
broker = Broker()
12261282
class MyAgent(Agent):
1227-
def put(sef, topic, message):
1228-
print(f"User agent. Topic: {topic} Message: {message}")
1283+
def put(sef, topic, message, arg):
1284+
print(f"User agent. Topic: {topic} Message: {message} Arg: {arg}")
12291285

12301286
async def sender(t):
12311287
for x in range(t):
12321288
await asyncio.sleep(1)
12331289
broker.publish("foo_topic", f"test {x}")
12341290

12351291
async def main():
1236-
broker.subscribe("foo_topic", MyAgent())
1292+
broker.subscribe("foo_topic", MyAgent(), 42)
12371293
await sender(10)
12381294

12391295
asyncio.run(main())
12401296
```
1297+
## 9.3 Notes
1298+
1299+
#### The publish/subscribe model
1300+
1301+
As in the real world publication carries no guarantee of reception. If at the
1302+
time of publication there are no tasks with subscribed `agent` instances, the
1303+
message will silently be lost.
1304+
1305+
#### agent arguments
1306+
1307+
Arguments must be hashable objects. Mutable objects such as lists and
1308+
dictionaries are not permitted. If an object can be added to a `set` it is
1309+
valid. In general, interfaces such as `Pin` instances are OK.
1310+
1311+
#### agent uniqueness
1312+
1313+
An `agent` can be subscribed to multiple `topic`s. An `agent` may be subscribed
1314+
to a `topic` multiple times only if each instance has different arguments.
1315+
1316+
#### queues
1317+
1318+
If a message causes a queue to fill, a message will silently be lost. It is the
1319+
responsibility of the subscriber to avoid this. In the case of a `Queue`
1320+
instance the lost message is the one causing the overflow. In the case of
1321+
`RingbufQueue` the oldest message in the queue is discarded. In some
1322+
applications this behaviour is preferable.
1323+
1324+
#### exceptions
1325+
1326+
An instance of an `agent` objects is owned by a subscribing tasks but is
1327+
executed by a publishing task. If a function used as an `agent` throws an
1328+
exception, the traceback will point to a `Broker.publish` call.
1329+
1330+
The `Broker` class does not throw exceptions. There are a number of non-fatal
1331+
conditions which can occur such as a queue overflow or an attempt to unsubscribe
1332+
an `agent` twice. The `Broker` will report these if `Broker.Verboase=True`.
12411333

12421334
###### [Contents](./DRIVERS.md#0-contents)
12431335

v3/primitives/broker.py

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,36 +15,45 @@ class Agent:
1515

1616

1717
class Broker(dict):
18-
def subscribe(self, topic, agent):
19-
if not self.get(topic, False):
20-
self[topic] = {agent}
21-
else:
22-
self[topic].add(agent)
18+
Verbose = True
2319

24-
def unsubscribe(self, topic, agent):
25-
try:
26-
self[topic].remove(agent)
20+
def subscribe(self, topic, agent, *args):
21+
aa = (agent, args)
22+
if not (t := self.get(topic, False)):
23+
self[topic] = {aa}
24+
else:
25+
if aa in t and Broker.Verbose:
26+
print(f"Duplicate agent {aa} in topic {topic}.")
27+
t.add(aa)
28+
29+
def unsubscribe(self, topic, agent, *args):
30+
if topic in self:
31+
if (aa := (agent, args)) in self[topic]:
32+
self[topic].remove(aa)
33+
elif Broker.Verbose:
34+
print(f"Unsubscribe agent {aa} from topic {topic} fail: agent not subscribed.")
2735
if len(self[topic]) == 0:
2836
del self[topic]
29-
except KeyError:
30-
pass # Topic already removed
37+
elif Broker.Verbose:
38+
print(f"Unsubscribe topic {topic} fail: topic not subscribed.")
3139

3240
def publish(self, topic, message):
3341
agents = self.get(topic, [])
34-
for agent in agents:
42+
for agent, args in agents:
3543
if isinstance(agent, asyncio.Event):
3644
agent.set()
3745
continue
3846
if isinstance(agent, Agent): # User class
39-
agent.put(topic, message) # Must support .put
47+
agent.put(topic, message, *args) # Must support .put
4048
continue
4149
if isinstance(agent, Queue) or isinstance(agent, RingbufQueue):
50+
t = (topic, message, args)
4251
try:
43-
agent.put_nowait((topic, message))
44-
except Exception: # TODO
45-
pass
52+
agent.put_nowait(t if args else t[:2])
53+
except Exception: # Queue discards current message. RingbufQueue discards oldest
54+
Broker.verbose and print(f"Message lost topic {topic} message {message}")
4655
continue
4756
# agent is function, method, coroutine or bound coroutine
48-
res = agent(topic, message)
57+
res = agent(topic, message, *args)
4958
if isinstance(res, type_coro):
5059
asyncio.create_task(res)

v3/primitives/tests/broker_test.py

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ async def event_test():
3939

4040

4141
class TestClass:
42-
async def fetch_data(self, topic, message):
42+
async def fetch_data(self, topic, message, arg1, arg2):
4343
await asyncio.sleep_ms(100)
44-
print("bound coro", topic, message)
44+
print("bound coro", topic, message, arg1, arg2)
4545

4646
def get_data(self, topic, message):
4747
print("bound method", topic, message)
@@ -53,16 +53,21 @@ async def print_queue(q):
5353
print(topic, message)
5454

5555

56+
async def print_ringbuf_q(q):
57+
async for topic, message, args in q:
58+
print(topic, message, args)
59+
60+
5661
async def main():
5762
tc = TestClass()
5863
q = Queue(10)
5964
rq = RingbufQueue(10)
6065
print("Subscribing Event, coroutine, Queue, RingbufQueue and bound coroutine.")
61-
broker.subscribe("foo_topic", tc.fetch_data) # Bound coroutine
66+
broker.subscribe("foo_topic", tc.fetch_data, 1, 42) # Bound coroutine
6267
broker.subscribe("bar_topic", subs) # Coroutine
6368
broker.subscribe("bar_topic", event)
6469
broker.subscribe("foo_topic", q)
65-
broker.subscribe("bar_topic", rq)
70+
broker.subscribe("bar_topic", rq, "args", "added")
6671

6772
asyncio.create_task(test(30)) # Publish to topics for 30s
6873
asyncio.create_task(event_test())
@@ -83,24 +88,29 @@ async def main():
8388
broker.unsubscribe("bar_topic", func)
8489
print()
8590
print("Unsubscribing bound coroutine")
86-
broker.unsubscribe("foo_topic", tc.fetch_data) # Async method
91+
broker.unsubscribe("foo_topic", tc.fetch_data, 1, 42) # Async method
8792
print()
8893
print("Subscribing method")
8994
broker.subscribe("foo_topic", tc.get_data) # Sync method
9095
await asyncio.sleep(5)
9196
print()
9297
print("Unsubscribing method")
9398
broker.unsubscribe("foo_topic", tc.get_data) # Async method
94-
print("Pause 5s")
95-
await asyncio.sleep(5)
99+
# print("Pause 5s")
100+
# await asyncio.sleep(5)
96101
print("Retrieving foo_topic messages from Queue")
97102
try:
98103
await asyncio.wait_for(print_queue(q), 5)
99104
except asyncio.TimeoutError:
100105
print("Timeout")
101106
print("Retrieving bar_topic messages from RingbufQueue")
102-
async for topic, message in rq:
103-
print(topic, message)
107+
try:
108+
await asyncio.wait_for(print_ringbuf_q(rq), 5)
109+
except asyncio.TimeoutError:
110+
print("Timeout")
111+
print("Check error on invalid unsubscribe")
112+
broker.unsubscribe("rats", "more rats") # Invalid topic
113+
broker.unsubscribe("foo_topic", "rats") # Invalid agent
104114

105115

106116
asyncio.run(main())

0 commit comments

Comments
 (0)