@@ -1141,22 +1141,26 @@ from primitives import Broker # broker.py
11411141```
11421142The ` Broker ` class provides a flexible means of messaging between running tasks.
11431143It uses a publish-subscribe model (akin to MQTT) whereby the transmitting task
1144- publishes to a topic. Any tasks subscribed to that topic will receive the
1145- message. This enables one to one, one to many, many to one or many to many
1146- messaging.
1144+ publishes to a topic. Objects subscribed to that topic will receive the message.
1145+ This enables one to one, one to many, many to one or many to many messaging.
11471146
1148- A task subscribes to a topic with an ` agent ` . This is stored by the broker. When
1147+ A task subscribes to a topic via an ` agent ` . This is stored by the broker. When
11491148the broker publishes a message, every ` agent ` subscribed to the message topic
11501149will be triggered. In the simplest case the ` agent ` is a ` Queue ` instance: the
11511150broker puts the topic and message onto the subscriber's queue for retrieval.
11521151
11531152More advanced agents can perform actions in response to a message, such as
11541153calling a function, launching a ` task ` or lighting an LED.
11551154
1155+ Agents may be subscribed and unsubscribed dynamically. The publishing task has
1156+ no "knowledge" of the number or type of agents subscribed to a topic. The module
1157+ is not threadsafe: ` Broker ` methods should not be called from a hard ISR or from
1158+ another thread.
1159+
11561160#### Broker methods
11571161
1158- All are synchronous. They are not threadsafe so should not be called from a hard
1159- ISR or from another thread. The constructor has no args.
1162+ All are synchronous.
1163+ * Constructor This has no args.
11601164* ` subscribe(topic, agent, *args) ` Passed ` agent ` will be triggered by messages
11611165with a matching ` topic ` . Any additional args will be passed to the ` agent ` when
11621166it is triggered.
@@ -1172,21 +1176,21 @@ The `topic` arg is typically a string but may be any hashable object. A
11721176#### Agent types
11731177
11741178An ` agent ` may be an instance of any of the following types. Args refers to any
1175- arguments passed to the ` agent ` 's' subscription.
1179+ arguments passed to the ` agent ` on subscription.
11761180
11771181* ` RingbufQueue ` Received messages are queued as a 2-tuple ` (topic, message) `
1178- assuming no args.
1179- * ` Queue ` Received messages are queued as a 2-tuple ` (topic, message) ` .
1182+ assuming no subscription args - otheriwse ` (topic, message, (args...)) ` .
1183+ * ` Queue ` Received messages are queued as described above .
11801184* ` function ` Called when a message is received. Args: ` topic ` , ` message ` plus any
1181- further args.
1185+ further subscription args.
11821186* ` bound method ` Called when a message is received. Args: ` topic ` , ` message `
11831187plus any further args.
11841188* ` coroutine ` Converted to a ` task ` when a message is received. Args: ` topic ` ,
1185- ` message ` plus any further args.
1189+ ` message ` plus any further subscription args.
11861190* ` bound coroutine ` Converted to a ` task ` when a message is received. Args: ` topic ` ,
1187- ` message ` plus any further args.
1188- * ` user_agent ` Instance of a user class. See user agents below.
1191+ ` message ` plus any further subscription args.
11891192* ` Event ` Set when a message is received.
1193+ * ` user_agent ` Instance of a user class. See user agents below.
11901194
11911195Note that synchronous ` agent ` instances must run to completion quickly otherwise
11921196the ` publish ` method will be slowed. See [ Notes] ( ./DRIVERS.md#93-notes ) for
@@ -1202,18 +1206,18 @@ import asyncio
12021206from primitives import Broker, RingbufQueue
12031207
12041208broker = Broker()
1205- queue = RingbufQueue(20 )
12061209async def sender (t ):
12071210 for x in range (t):
12081211 await asyncio.sleep(1 )
12091212 broker.publish(" foo_topic" , f " test { x} " )
12101213
12111214async def receiver ():
1215+ queue = RingbufQueue(20 )
1216+ broker.subscribe(" foo_topic" , queue)
12121217 async for topic, message in queue:
12131218 print (topic, message)
12141219
12151220async def main ():
1216- broker.subscribe(" foo_topic" , queue)
12171221 rx = asyncio.create_task(receiver())
12181222 await sender(10 )
12191223 await asyncio.sleep(2 )
@@ -1266,6 +1270,40 @@ async def main():
12661270
12671271asyncio.run(main())
12681272```
1273+ A task can wait on multiple topics using a ` RingbufQueue ` :
1274+ ``` python
1275+ import asyncio
1276+ from primitives import Broker, RingbufQueue
1277+
1278+ broker = Broker()
1279+
1280+ async def receiver ():
1281+ q = RingbufQueue(10 )
1282+ broker.subscribe(" foo_topic" , q)
1283+ broker.subscribe(" bar_topic" , q)
1284+ async for topic, message in q:
1285+ print (f " Received Topic: { topic} Message: { message} " )
1286+
1287+
1288+ async def sender (t ):
1289+ for x in range (t):
1290+ await asyncio.sleep(1 )
1291+ broker.publish(" foo_topic" , f " test { x} " )
1292+ broker.publish(" bar_topic" , f " test { x} " )
1293+ broker.publish(" ignore me" , f " test { x} " )
1294+
1295+
1296+ async def main ():
1297+ rx = asyncio.create_task(receiver())
1298+ await sender(10 )
1299+ await asyncio.sleep(2 )
1300+ rx.cancel()
1301+
1302+
1303+ asyncio.run(main())
1304+ ```
1305+ here the ` receiver ` task waits on two topics. The asynchronous iterator returns
1306+ messages as they are published.
12691307
12701308## 9.2 User agents
12711309
@@ -1298,7 +1336,7 @@ asyncio.run(main())
12981336
12991337#### The publish/subscribe model
13001338
1301- As in the real world publication carries no guarantee of reception . If at the
1339+ As in the real world, publication carries no guarantee of readership . If at the
13021340time of publication there are no tasks with subscribed ` agent ` instances, the
13031341message will silently be lost.
13041342
0 commit comments