Skip to content

Commit 8322ab6

Browse files
committed
fix: add more strict timeouts
1 parent 08cd50c commit 8322ab6

File tree

2 files changed

+28
-6
lines changed

2 files changed

+28
-6
lines changed

common/metric_types.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,9 @@ async def connect(self) -> Any:
4949
"""Creates WebSocket connection."""
5050
websocket: websockets.WebSocketClientProtocol = await websockets.connect(
5151
self.ws_endpoint, # type: ignore
52-
ping_timeout=self.config.timeout,
53-
close_timeout=self.config.timeout,
52+
ping_timeout=10, # self.config.timeout,
53+
open_timeout=10, # self.config.timeout,
54+
close_timeout=10, # self.config.timeout,
5455
)
5556
return websocket
5657

metrics/ethereum.py

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
from common.metric_config import MetricConfig, MetricLabelKey, MetricLabels
99
from common.metric_types import HttpCallLatencyMetricBase, WebSocketMetric
1010

11+
WS_DEFAULT_TIMEOUT = 10
12+
1113

1214
class HTTPEthCallLatencyMetric(HttpCallLatencyMetricBase):
1315
"""Collects transaction latency for eth_call simulation."""
@@ -141,6 +143,24 @@ def __init__(
141143
)
142144
self.labels.update_label(MetricLabelKey.API_METHOD, "eth_subscribe")
143145

146+
async def send_with_timeout(self, websocket, message: str, timeout: float) -> None:
147+
"""Send a message with a timeout."""
148+
try:
149+
await asyncio.wait_for(websocket.send(message), timeout)
150+
except asyncio.TimeoutError:
151+
raise TimeoutError(
152+
f"WebSocket message send timed out after {timeout} seconds"
153+
)
154+
155+
async def recv_with_timeout(self, websocket, timeout: float) -> str:
156+
"""Receive a message with a timeout."""
157+
try:
158+
return await asyncio.wait_for(websocket.recv(), timeout)
159+
except asyncio.TimeoutError:
160+
raise TimeoutError(
161+
f"WebSocket message reception timed out after {timeout} seconds"
162+
)
163+
144164
async def subscribe(self, websocket) -> None:
145165
"""Subscribe to the newHeads event on the WebSocket endpoint.
146166
@@ -158,8 +178,8 @@ async def subscribe(self, websocket) -> None:
158178
"params": ["newHeads"],
159179
}
160180
)
161-
await websocket.send(subscription_msg)
162-
response = await websocket.recv()
181+
await self.send_with_timeout(websocket, subscription_msg, WS_DEFAULT_TIMEOUT)
182+
response: str = await self.recv_with_timeout(websocket, WS_DEFAULT_TIMEOUT)
163183
subscription_data = json.loads(response)
164184
if subscription_data.get("result") is None:
165185
raise ValueError("Subscription to newHeads failed")
@@ -183,7 +203,8 @@ async def unsubscribe(self, websocket) -> None:
183203
"params": [self.subscription_id],
184204
}
185205
)
186-
await websocket.send(unsubscribe_msg)
206+
await self.send_with_timeout(websocket, unsubscribe_msg, WS_DEFAULT_TIMEOUT)
207+
await self.recv_with_timeout(websocket, WS_DEFAULT_TIMEOUT)
187208

188209
async def listen_for_data(self, websocket):
189210
"""Listen for a single data message from the WebSocket and process block latency.
@@ -197,7 +218,7 @@ async def listen_for_data(self, websocket):
197218
Raises:
198219
asyncio.TimeoutError: If no message received within timeout period
199220
"""
200-
response = await asyncio.wait_for(websocket.recv(), timeout=self.config.timeout)
221+
response: str = await self.recv_with_timeout(websocket, WS_DEFAULT_TIMEOUT)
201222
response_data = json.loads(response)
202223
if "params" in response_data:
203224
block = response_data["params"]["result"]

0 commit comments

Comments
 (0)