Skip to content

Commit ab85d52

Browse files
committed
Add support for multiple concurrent WebSocket connections
This commit implements multi-connection support for the Python Deriv API, allowing users to: - Create and manage multiple WebSocket connections - Send requests through specific connections - Subscribe to market data on dedicated connections - Handle connection-specific errors - Monitor events across all connections Key additions: - Connection class to encapsulate individual WebSocket connections - ConnectionManager to handle multiple connections - Updated SubscriptionManager for connection-specific subscriptions - Added comprehensive documentation and examples - Added unit and integration tests for the new functionality The implementation is fully backward compatible with existing code.
1 parent 7e8dd0f commit ab85d52

File tree

10 files changed

+1722
-228
lines changed

10 files changed

+1722
-228
lines changed

deriv_api/connection.py

Lines changed: 332 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,332 @@
1+
import asyncio
2+
import json
3+
import logging
4+
from asyncio import Future
5+
from typing import Dict, Optional, Union, Coroutine
6+
7+
import websockets
8+
from reactivex import operators as op
9+
from reactivex.subject import Subject
10+
from reactivex import Observable
11+
from websockets.legacy.client import WebSocketClientProtocol
12+
from websockets.exceptions import ConnectionClosedOK, ConnectionClosed
13+
from websockets.frames import Close
14+
15+
from deriv_api.easy_future import EasyFuture
16+
from deriv_api.errors import APIError, ConstructionError, ResponseError, AddedTaskError
17+
from deriv_api.utils import is_valid_url
18+
19+
class Connection:
20+
"""
21+
Encapsulates a single WebSocket connection to the Deriv API.
22+
23+
This class handles the low-level connection management and message handling
24+
for a single WebSocket connection to the Deriv API.
25+
26+
Parameters
27+
----------
28+
connection_id : int
29+
Unique identifier for this connection
30+
options : dict
31+
Connection configuration options
32+
endpoint : str
33+
API server to connect to
34+
app_id : str
35+
Application ID of the API user
36+
connection : WebSocketClientProtocol
37+
A ready to use connection (optional)
38+
lang : str
39+
Language of the API communication
40+
brand : str
41+
Brand name
42+
auto_reconnect : bool
43+
Whether to automatically reconnect on connection loss
44+
45+
Properties
46+
----------
47+
events : Subject
48+
An Observable object that will send data when events like 'connect', 'send', 'message' happen
49+
connected : EasyFuture
50+
A future that resolves when the connection is established
51+
"""
52+
53+
def __init__(self, connection_id: int, **options):
54+
self.connection_id = connection_id
55+
self.endpoint = options.get('endpoint', 'ws.derivws.com')
56+
self.lang = options.get('lang', 'EN')
57+
self.brand = options.get('brand', '')
58+
self.app_id = options.get('app_id')
59+
self.auto_reconnect = options.get('auto_reconnect', False)
60+
self.max_retry_count = options.get('max_retry_count', 5)
61+
62+
self.wsconnection: Optional[WebSocketClientProtocol] = None
63+
self.wsconnection_from_inside = True
64+
self.events = Subject()
65+
66+
if options.get('connection'):
67+
self.wsconnection = options.get('connection')
68+
self.wsconnection_from_inside = False
69+
else:
70+
if not self.app_id:
71+
raise ConstructionError('An app_id is required to connect to the API')
72+
73+
self.api_url = self._build_api_url()
74+
75+
self.req_id = 0
76+
self.pending_requests: Dict[str, Subject] = {}
77+
self.connected = EasyFuture()
78+
self.receive_task = None
79+
self.reconnect_task = None
80+
self.is_closing = False
81+
82+
def _build_api_url(self) -> str:
83+
"""
84+
Construct the WebSocket URL for the API.
85+
86+
Returns
87+
-------
88+
str
89+
The complete WebSocket URL
90+
"""
91+
endpoint_url = self.get_url(self.endpoint)
92+
return f"{endpoint_url}/websockets/v3?app_id={self.app_id}&l={self.lang}&brand={self.brand}"
93+
94+
def get_url(self, original_endpoint: str) -> str:
95+
"""
96+
Validate and return the URL.
97+
98+
Parameters
99+
----------
100+
original_endpoint : str
101+
Endpoint argument passed to constructor
102+
103+
Returns
104+
-------
105+
str
106+
Returns API URL. If validation fails then throws ConstructionError
107+
"""
108+
if not isinstance(original_endpoint, str):
109+
raise ConstructionError(f"Endpoint must be a string, passed: {type(original_endpoint)}")
110+
111+
import re
112+
match = re.match(r'((?:\w*://)*)(.*)', original_endpoint).groups()
113+
protocol = match[0] if match[0] == "ws://" else "wss://"
114+
endpoint = match[1]
115+
116+
url = protocol + endpoint
117+
if not is_valid_url(url):
118+
raise ConstructionError(f'Invalid URL:{original_endpoint}')
119+
120+
return url
121+
122+
async def connect(self):
123+
"""
124+
Establish the WebSocket connection.
125+
126+
Returns
127+
-------
128+
WebSocketClientProtocol
129+
The established WebSocket connection
130+
"""
131+
if not self.wsconnection and self.wsconnection_from_inside:
132+
self.events.on_next({'name': 'connect', 'connection_id': self.connection_id})
133+
self.wsconnection = await websockets.connect(self.api_url)
134+
135+
if self.connected.is_pending():
136+
self.connected.resolve(True)
137+
else:
138+
self.connected = EasyFuture().resolve(True)
139+
140+
# Start the message receiving task
141+
self.receive_task = asyncio.create_task(self._receive_messages())
142+
143+
return self.wsconnection
144+
145+
async def disconnect(self):
146+
"""
147+
Disconnect the WebSocket connection.
148+
"""
149+
self.is_closing = True
150+
151+
if not self.connected.is_resolved():
152+
return
153+
154+
self.connected = EasyFuture().reject(ConnectionClosedOK(None, Close(1000, 'Closed by disconnect')))
155+
self.connected.exception() # Fetch exception to avoid the warning of 'exception never retrieved'
156+
157+
if self.wsconnection_from_inside and self.wsconnection:
158+
self.events.on_next({'name': 'close', 'connection_id': self.connection_id})
159+
await self.wsconnection.close()
160+
self.wsconnection = None
161+
162+
# Cancel tasks
163+
if self.receive_task and not self.receive_task.done():
164+
self.receive_task.cancel()
165+
166+
if self.reconnect_task and not self.reconnect_task.done():
167+
self.reconnect_task.cancel()
168+
169+
async def _receive_messages(self):
170+
"""
171+
Receive and process messages from the WebSocket.
172+
"""
173+
await self.connected
174+
175+
while self.connected.is_resolved():
176+
try:
177+
data = await self.wsconnection.recv()
178+
await self._process_message(data)
179+
except ConnectionClosed as err:
180+
if self.connected.is_resolved():
181+
self.connected = EasyFuture().reject(err)
182+
self.connected.exception() # Call it to hide the warning of 'exception never retrieved'
183+
184+
self.events.on_next({'name': 'connection_closed', 'connection_id': self.connection_id, 'error': err})
185+
186+
# Attempt reconnection if configured to do so
187+
if self.auto_reconnect and not self.is_closing:
188+
self.reconnect_task = asyncio.create_task(self._attempt_reconnection())
189+
190+
break
191+
except asyncio.CancelledError:
192+
# Task was cancelled, exit gracefully
193+
break
194+
except Exception as err:
195+
self.events.on_next({'name': 'error', 'connection_id': self.connection_id, 'error': err})
196+
197+
async def _process_message(self, data):
198+
"""
199+
Process a message received from the WebSocket.
200+
201+
Parameters
202+
----------
203+
data : str
204+
The raw message data received from the WebSocket
205+
"""
206+
response = json.loads(data)
207+
208+
self.events.on_next({'name': 'message', 'connection_id': self.connection_id, 'data': response})
209+
210+
req_id = response.get('req_id', None)
211+
if not req_id or req_id not in self.pending_requests:
212+
self.events.on_next({'name': 'unmatched_response', 'connection_id': self.connection_id, 'data': response})
213+
return
214+
215+
request = response.get('echo_req', {})
216+
217+
# Check for error in response
218+
is_parent_subscription = request and request.get('proposal_open_contract') and not request.get('contract_id')
219+
if response.get('error') and not is_parent_subscription:
220+
self.pending_requests[req_id].on_error(ResponseError(response))
221+
return
222+
223+
# Handle completed subscriptions
224+
if self.pending_requests[req_id].is_stopped and response.get('subscription'):
225+
# Source is already marked as completed. In this case we should
226+
# send a forget request with the subscription id and ignore the response received.
227+
subs_id = response['subscription']['id']
228+
self.events.on_next({'name': 'forget_subscription', 'connection_id': self.connection_id, 'subscription_id': subs_id})
229+
return
230+
231+
# Forward the response to the appropriate Subject
232+
self.pending_requests[req_id].on_next(response)
233+
234+
async def _attempt_reconnection(self):
235+
"""
236+
Attempt to reconnect with exponential backoff.
237+
"""
238+
retry_delay = 1 # Start with 1 second delay
239+
max_delay = 60 # Maximum delay of 60 seconds
240+
retries = 0
241+
242+
while retries < self.max_retry_count and not self.is_closing:
243+
try:
244+
self.events.on_next({
245+
'name': 'reconnecting',
246+
'connection_id': self.connection_id,
247+
'attempt': retries + 1
248+
})
249+
250+
await asyncio.sleep(retry_delay)
251+
252+
# Try to reconnect
253+
self.wsconnection = await websockets.connect(self.api_url)
254+
self.connected = EasyFuture().resolve(True)
255+
256+
# Restart message receiving
257+
self.receive_task = asyncio.create_task(self._receive_messages())
258+
259+
self.events.on_next({
260+
'name': 'reconnected',
261+
'connection_id': self.connection_id
262+
})
263+
264+
return True
265+
except Exception as err:
266+
retries += 1
267+
retry_delay = min(retry_delay * 2, max_delay) # Exponential backoff
268+
269+
self.events.on_next({
270+
'name': 'reconnect_failed',
271+
'connection_id': self.connection_id,
272+
'error': err,
273+
'attempt': retries
274+
})
275+
276+
# All retries failed
277+
self.events.on_next({
278+
'name': 'reconnect_max_retries_exceeded',
279+
'connection_id': self.connection_id
280+
})
281+
282+
return False
283+
284+
async def send(self, request: dict) -> dict:
285+
"""
286+
Send a request and get the response.
287+
288+
Parameters
289+
----------
290+
request : dict
291+
The API request to send
292+
293+
Returns
294+
-------
295+
dict
296+
The API response
297+
"""
298+
response_future = self.send_and_get_source(request).pipe(op.first(), op.to_future())
299+
return await response_future
300+
301+
def send_and_get_source(self, request: dict) -> Subject:
302+
"""
303+
Send a message and return a Subject that will emit the response.
304+
305+
Parameters
306+
----------
307+
request : dict
308+
The API request to send
309+
310+
Returns
311+
-------
312+
Subject
313+
A Subject that will emit the response
314+
"""
315+
pending = Subject()
316+
317+
if 'req_id' not in request:
318+
self.req_id += 1
319+
request['req_id'] = self.req_id
320+
321+
self.pending_requests[request['req_id']] = pending
322+
323+
async def send_message():
324+
try:
325+
await self.connected
326+
await self.wsconnection.send(json.dumps(request))
327+
self.events.on_next({'name': 'send', 'connection_id': self.connection_id, 'data': request})
328+
except Exception as err:
329+
pending.on_error(err)
330+
331+
asyncio.create_task(send_message())
332+
return pending

0 commit comments

Comments
 (0)