-
Notifications
You must be signed in to change notification settings - Fork 91
/
Copy pathobserver_async.py
132 lines (90 loc) · 3.9 KB
/
observer_async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# Code Listing #15
"""
Asynchronous Observer Example
NOTE: This requires the package aiohttp.
"""
import weakref
import asyncio
import aiohttp
from collections import defaultdict, deque
class NewsPublisher(object):
    """A news publisher that notifies its subscribers asynchronously.

    Story URLs are queued per channel.  The ``notify`` coroutine
    repeatedly pops one URL from every channel and hands it to each
    subscriber registered on that channel.
    """

    def __init__(self):
        # channel name -> FIFO queue of pending story URLs
        self.channels = defaultdict(deque)
        # channel name -> list of weak proxies to registered subscribers
        self.subscribers = defaultdict(list)
        # The notify() loop keeps running while this is true
        self.flag = True
        # Number of times notify() found a channel with nothing queued
        self.data_null_count = 0

    def add_news(self, channel, url):
        """Queue the story ``url`` on ``channel``."""
        self.channels[channel].append(url)

    def register(self, subscriber, channel):
        """Register ``subscriber`` for the news channel ``channel``.

        Only a ``weakref.proxy`` is stored, so the caller must keep its
        own reference to the subscriber for as long as it should receive
        notifications.
        """
        self.subscribers[channel].append(weakref.proxy(subscriber))

    def stop(self):
        """Ask the ``notify`` loop to finish after its current pass."""
        self.flag = False

    async def notify(self, interval=2.0):
        """Notify subscribers until ``stop`` is called.

        On every pass, one story is popped from each channel and passed
        to ``callback(channel, data)`` of every subscriber of that
        channel.  Channels with nothing queued are counted in
        ``data_null_count``.

        :param interval: seconds to sleep between two passes.
        """
        self.data_null_count = 0

        while self.flag:
            # Iterate over a snapshot of the channel names: a callback
            # (or any other task running while we await) may call
            # add_news() for a new channel, and mutating the dict while
            # iterating it directly raises RuntimeError.
            for channel in list(self.channels):
                try:
                    data = self.channels[channel].popleft()
                except IndexError:
                    self.data_null_count += 1
                    continue

                for sub in self.subscribers[channel]:
                    print('Notifying',sub,'on channel',channel,'with data=>',data)
                    response = await sub.callback(channel, data)
                    print('Response from',sub,'for channel',channel,'=>',response)

            await asyncio.sleep(interval)
class NewsSubscriber(object):
    """A news subscriber with asynchronous callbacks.

    ``callback`` only queues a request for the announced URL;
    ``fetch_urls`` is the long-running coroutine that performs the
    queued requests and stores the downloaded bodies in ``stories``.
    """

    def __init__(self):
        # response URL -> downloaded body
        self.stories = {}
        # Request objects created by callback(), in arrival order
        self.futures = []
        # request object -> 1 once its body has been stored
        self.future_status = {}
        # The fetch_urls() loop keeps running while this is true
        self.flag = True

    def stop(self):
        """Ask the ``fetch_urls`` loop to finish after its current pass."""
        self.flag = False

    async def callback(self, channel, data):
        """Queue a GET request for the story announced on ``channel``.

        :param channel: name of the channel the story was published on.
        :param data: the story URL.
        :return: the not-yet-started request object, which is also
            appended to ``futures`` for ``fetch_urls`` to pick up.
        """
        # The data is a URL
        url = data
        # Return immediately; the download itself happens in fetch_urls()
        print('Fetching URL',url,'...')
        future = aiohttp.request('GET', url)
        self.futures.append(future)
        return future

    async def fetch_urls(self, interval=2.0):
        """Download every queued request until ``flag`` is cleared.

        :param interval: seconds to sleep between two passes over
            ``futures``.
        """
        while self.flag:
            for future in self.futures:
                if self.future_status.get(future):
                    continue

                # aiohttp.request() is meant to be used as an async
                # context manager: recent aiohttp releases no longer
                # allow awaiting it directly, and leaving the context
                # releases the response and its connection.
                async with future as response:
                    # Read data
                    data = await response.read()

                print('\t',self,'Got data for URL',response.url,'length:',len(data))
                self.stories[response.url] = data
                # Mark as such
                self.future_status[future] = 1

            await asyncio.sleep(interval)
if __name__ == "__main__":
    # Demo: one publisher, two subscribers, run for at most 120 seconds.
    publisher = NewsPublisher()

    # Append some stories
    publisher.add_news('sports', 'http://www.cricbuzz.com/cricket-news/94018/collective-dd-show-hands-massive-loss-to-kings-xi-punjab')
    publisher.add_news('sports', 'https://sports.ndtv.com/indian-premier-league-2017/ipl-2017-this-is-how-virat-kohli-recovered-from-the-loss-against-mumbai-indians-1681955')
    publisher.add_news('india','http://www.business-standard.com/article/current-affairs/mumbai-chennai-and-hyderabad-airports-put-on-hijack-alert-report-117041600183_1.html')
    publisher.add_news('india','http://timesofindia.indiatimes.com/india/pakistan-to-submit-new-dossier-on-jadhav-to-un-report/articleshow/58204955.cms')

    subscriber1 = NewsSubscriber()
    subscriber2 = NewsSubscriber()
    publisher.register(subscriber1, 'sports')
    publisher.register(subscriber2, 'india')

    # Use an explicit loop: asyncio.get_event_loop() without a running
    # loop is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # asyncio.wait() must be given tasks; bare coroutines are
        # rejected by newer Python versions.
        tasks = [
            loop.create_task(coro)
            for coro in (
                publisher.notify(),
                subscriber1.fetch_urls(),
                subscriber2.fetch_urls(),
            )
        ]
        done, pending = loop.run_until_complete(
            asyncio.wait(tasks, timeout=120))
        print('Ending loop')

        # The three coroutines loop forever, so after the timeout they
        # are still pending: stop the publisher, cancel what is left and
        # let the cancellations finish before the loop is closed.
        publisher.stop()
        for task in pending:
            task.cancel()
        if pending:
            loop.run_until_complete(
                asyncio.gather(*pending, return_exceptions=True))
    finally:
        loop.close()