1313from common .metric_config import MetricConfig , MetricLabelKey , MetricLabels
1414from common .metrics_handler import MetricsHandler
1515
16- MAX_RETRIES = 3
16+ MAX_RETRIES = 2
1717
1818
1919class WebSocketMetric (BaseMetric ):
@@ -59,27 +59,39 @@ async def collect_metric(self) -> None:
5959 """Collects single WebSocket message."""
6060 websocket = None
6161
62- try :
62+ async def _collect_ws_data ():
63+ nonlocal websocket
6364 websocket = await self .connect ()
6465 await self .subscribe (websocket )
6566 data = await self .listen_for_data (websocket )
66-
67+
6768 if data is not None :
68- latency : int | float = self .process_data (data )
69- self .update_metric_value (latency )
70- self .mark_success ()
71- return
69+ return data
7270 raise ValueError ("No data in response" )
7371
72+ try :
73+ data = await asyncio .wait_for (
74+ _collect_ws_data (),
75+ timeout = self .config .timeout
76+ )
77+ latency : int | float = self .process_data (data )
78+ self .update_metric_value (latency )
79+ self .mark_success ()
80+
81+ except asyncio .TimeoutError :
82+ self .mark_failure ()
83+ self .handle_error (TimeoutError (f"WebSocket metric collection exceeded { self .config .timeout } s timeout" ))
84+
7485 except Exception as e :
7586 self .mark_failure ()
7687 self .handle_error (e )
7788
7889 finally :
7990 if websocket :
8091 try :
81- await self .unsubscribe (websocket )
82- await websocket .close ()
92+ # Shield cleanup from cancellation to ensure proper resource cleanup
93+ await asyncio .shield (self .unsubscribe (websocket ))
94+ await asyncio .shield (websocket .close ())
8395 except Exception as e :
8496 logging .error (f"Error closing websocket: { e !s} " )
8597
@@ -97,13 +109,19 @@ def get_endpoint(self) -> str:
97109
98110 async def collect_metric (self ) -> None :
99111 try :
100- data = await self .fetch_data ()
112+ data = await asyncio .wait_for (
113+ self .fetch_data (),
114+ timeout = self .config .timeout
115+ )
101116 if data is not None :
102117 latency : int | float = self .process_data (data )
103118 self .update_metric_value (latency )
104119 self .mark_success ()
105120 return
106121 raise ValueError ("No data in response" )
122+ except asyncio .TimeoutError :
123+ self .mark_failure ()
124+ self .handle_error (TimeoutError (f"Metric collection exceeded { self .config .timeout } s timeout" ))
107125 except Exception as e :
108126 self .mark_failure ()
109127 self .handle_error (e )
@@ -205,7 +223,6 @@ async def on_request_end(session, context, params):
205223 trace_config .on_request_end .append (on_request_end )
206224
207225 async with aiohttp .ClientSession (
208- timeout = aiohttp .ClientTimeout (total = self .config .timeout ),
209226 trace_configs = [trace_config ],
210227 ) as session :
211228 response_time = 0.0
@@ -219,7 +236,7 @@ async def on_request_end(session, context, params):
219236 response_time : float = time .monotonic () - start_time
220237
221238 if response .status == 429 and retry_count < MAX_RETRIES - 1 :
222- wait_time = int (response .headers .get ("Retry-After" , 15 ))
239+ wait_time = int (response .headers .get ("Retry-After" , 3 ))
223240 await response .release ()
224241 await asyncio .sleep (wait_time )
225242 continue
@@ -239,12 +256,7 @@ async def on_request_end(session, context, params):
239256
240257 total_time = response_time * 1000
241258
242- # Log breakdown
243- provider = self .labels .get_label (MetricLabelKey .PROVIDER )
244- method = self .labels .get_label (MetricLabelKey .API_METHOD )
245- print (
246- f"[{ provider } ] { method } timing: DNS={ dns_time :.0f} ms, Connect={ conn_time :.0f} ms, Total={ total_time :.0f} ms, Endpoint={ endpoint } "
247- )
259+ # Log breakdown removed - use proper logging if needed
248260
249261 if not response :
250262 raise ValueError ("No response received" )
@@ -279,7 +291,6 @@ async def _send_request(
279291 "Content-Type" : "application/json" ,
280292 },
281293 json = self ._base_request ,
282- timeout = self .config .timeout , # type: ignore
283294 )
284295
285296 def process_data (self , value : float ) -> float :
0 commit comments