2323# TODO NEXT subscribe is not calling deriv_api_calls. that's , args not verified. can we improve it ?
2424# TODO list these features missed
2525# middleware is missed
26- # events is missed
2726
2827logging .basicConfig (
2928 format = "%(asctime)s %(message)s" ,
@@ -76,6 +75,8 @@ class DerivAPI(DerivAPICalls):
7675 ----------
7776 storage : Cache
7877 If specified, uses a more persistent cache (local storage, etc.)
78+ events: Observable
79+ An Observable object that will send data when events like 'connect', 'send', 'message' happen
7980 """
8081
8182 storage : None
@@ -89,6 +90,7 @@ def __init__(self, **options: str) -> None:
8990 self .wsconnection : Optional [WebSocketClientProtocol ] = None
9091 self .wsconnection_from_inside = True
9192 self .shouldReconnect = False
93+ self .events : Subject = Subject ()
9294 if options .get ('connection' ):
9395 self .wsconnection : Optional [WebSocketClientProtocol ] = options .get ('connection' )
9496 self .wsconnection_from_inside = False
@@ -137,8 +139,8 @@ async def __wait_data(self):
137139 self .sanity_errors .on_next (err )
138140 continue
139141 response = json .loads (data )
140- # TODO NEXT add self.events stream
141142
143+ self .events .on_next ({'name' : 'message' , 'data' : response })
142144 # TODO NEXT onopen onclose, can be set by await connection
143145 req_id = response .get ('req_id' , None )
144146 if not req_id or req_id not in self .pending_requests :
@@ -226,6 +228,7 @@ async def api_connect(self) -> websockets.WebSocketClientProtocol:
226228 Returns websockets.WebSocketClientProtocol
227229 """
228230 if not self .wsconnection and self .shouldReconnect :
231+ self .events .on_text ({'name' : 'connect' })
229232 self .wsconnection = await websockets .connect (self .api_url )
230233 if self .connected .is_pending ():
231234 self .connected .resolve (True )
@@ -246,6 +249,8 @@ async def send(self, request: dict) -> dict:
246249 -------
247250 API response
248251 """
252+
253+ self .events .on_next ({'name' : 'send' , 'data' : request })
249254 response_future = self .send_and_get_source (request ).pipe (op .first (), op .to_future ())
250255
251256 response = await response_future
@@ -351,6 +356,7 @@ async def disconnect(self) -> None:
351356 if self .wsconnection_from_inside :
352357 # TODO NEXT reconnect feature
353358 self .shouldReconnect = False
359+ self .events .on_next ({'name' : 'close' })
354360 await self .wsconnection .close ()
355361
356362 def expect_response (self , * msg_types ):
0 commit comments