diff --git a/common/metric_types.py b/common/metric_types.py index 42ba5fd..d86dc8a 100644 --- a/common/metric_types.py +++ b/common/metric_types.py @@ -13,7 +13,7 @@ from common.metric_config import MetricConfig, MetricLabelKey, MetricLabels from common.metrics_handler import MetricsHandler -MAX_RETRIES = 3 +MAX_RETRIES = 2 class WebSocketMetric(BaseMetric): @@ -59,18 +59,29 @@ async def collect_metric(self) -> None: """Collects single WebSocket message.""" websocket = None - try: + async def _collect_ws_data(): + nonlocal websocket websocket = await self.connect() await self.subscribe(websocket) data = await self.listen_for_data(websocket) - + if data is not None: - latency: int | float = self.process_data(data) - self.update_metric_value(latency) - self.mark_success() - return + return data raise ValueError("No data in response") + try: + data = await asyncio.wait_for( + _collect_ws_data(), + timeout=self.config.timeout + ) + latency: int | float = self.process_data(data) + self.update_metric_value(latency) + self.mark_success() + + except asyncio.TimeoutError: + self.mark_failure() + self.handle_error(TimeoutError(f"WebSocket metric collection exceeded {self.config.timeout}s timeout")) + except Exception as e: self.mark_failure() self.handle_error(e) @@ -78,8 +89,9 @@ async def collect_metric(self) -> None: finally: if websocket: try: - await self.unsubscribe(websocket) - await websocket.close() + # Shield cleanup from cancellation to ensure proper resource cleanup + await asyncio.shield(self.unsubscribe(websocket)) + await asyncio.shield(websocket.close()) except Exception as e: logging.error(f"Error closing websocket: {e!s}") @@ -97,13 +109,19 @@ def get_endpoint(self) -> str: async def collect_metric(self) -> None: try: - data = await self.fetch_data() + data = await asyncio.wait_for( + self.fetch_data(), + timeout=self.config.timeout + ) if data is not None: latency: int | float = self.process_data(data) self.update_metric_value(latency) self.mark_success() return raise ValueError("No data in response") + except asyncio.TimeoutError: + self.mark_failure() + self.handle_error(TimeoutError(f"Metric collection exceeded {self.config.timeout}s timeout")) except Exception as e: self.mark_failure() self.handle_error(e) @@ -205,7 +223,6 @@ async def on_request_end(session, context, params): trace_config.on_request_end.append(on_request_end) async with aiohttp.ClientSession( - timeout=aiohttp.ClientTimeout(total=self.config.timeout), trace_configs=[trace_config], ) as session: response_time = 0.0 @@ -219,7 +236,7 @@ async def on_request_end(session, context, params): response_time: float = time.monotonic() - start_time if response.status == 429 and retry_count < MAX_RETRIES - 1: - wait_time = int(response.headers.get("Retry-After", 15)) + wait_time = int(response.headers.get("Retry-After", 3)) await response.release() await asyncio.sleep(wait_time) continue @@ -239,12 +256,7 @@ async def on_request_end(session, context, params): total_time = response_time * 1000 - # Log breakdown - provider = self.labels.get_label(MetricLabelKey.PROVIDER) - method = self.labels.get_label(MetricLabelKey.API_METHOD) - print( - f"[{provider}] {method} timing: DNS={dns_time:.0f}ms, Connect={conn_time:.0f}ms, Total={total_time:.0f}ms, Endpoint={endpoint}" - ) + # Log breakdown removed - use proper logging if needed if not response: raise ValueError("No response received") @@ -279,7 +291,6 @@ async def _send_request( "Content-Type": "application/json", }, json=self._base_request, - timeout=self.config.timeout, # type: ignore ) def process_data(self, value: float) -> float: diff --git a/common/state/blockchain_fetcher.py b/common/state/blockchain_fetcher.py index 749c2e0..c497e13 100644 --- a/common/state/blockchain_fetcher.py +++ b/common/state/blockchain_fetcher.py @@ -47,7 +47,7 @@ async def _make_rpc_request( for attempt in range(1, self._max_retries + 1): try: - async with aiohttp.ClientSession(timeout=self._timeout) as session: + async with aiohttp.ClientSession() as session: async with session.post( self.http_endpoint, headers=self._headers, json=request ) as response: