@@ -255,16 +255,16 @@ def process_data(self, block) -> float:
255255
256256
257257class WSLogLatencyMetric (WebSocketMetric ):
258- """Collects log latency for EVM providers using predictable log events.
258+ """Collects log latency for EVM providers using Transfer events.
259259
260- This metric subscribes to Transfer events from USDT contracts, which have
261- predictable, consistent message sizes across all supported chains .
260+ This metric subscribes to Transfer events from major token contracts,
261+ gets the first event, immediately unsubscribes, and calculates latency .
262262 """
263263
264- # Use USDT (Tether) which is available on all chains
264+ # Use major tokens which have frequent but not overwhelming transfer activity
265265 TOKEN_CONTRACTS : dict [str , str ] = {
266266 "ethereum" : "0xdAC17F958D2ee523a2206206994597C13D831ec7" , # USDT
267- "base" : "0xfde4C96c8593536E31F229EA8f37b2ADa2699bb2 " , # USDT
267+ "base" : "0x833589fCD6eDb6E08f4c7C32D4f71b54bdA02913 " , # USDC
268268 "arbitrum" : "0xFd086bC7CD5C481DCC9C85ebE478A1C0b69FCbb9" , # USDT
269269 "bnb" : "0x55d398326f99059fF775485246999027B3197955" , # BSC-USD
270270 }
@@ -302,7 +302,6 @@ def __init__(
302302 self .token_contract : str = self .TOKEN_CONTRACTS ["ethereum" ]
303303
304304 self .labels .update_label (MetricLabelKey .API_METHOD , "eth_subscribe_logs" )
305- self .first_event_received = False # Flag to track first event
306305
307306 async def send_with_timeout (self , websocket , message : str , timeout : float ) -> None :
308307 """Send a message with a timeout."""
@@ -329,7 +328,7 @@ async def recv_with_timeout(self, websocket, timeout: float) -> str:
329328 )
330329
331330 async def subscribe (self , websocket ) -> None :
332- """Subscribe to Transfer logs from USDT contract ."""
331+ """Subscribe to Transfer logs from major token contracts ."""
333332 subscription_msg : str = json .dumps (
334333 {
335334 "id" : 1 ,
@@ -376,15 +375,15 @@ async def unsubscribe(self, websocket) -> None:
376375 logging .warning (f"Error during unsubscribe: { e } " )
377376
378377 async def listen_for_data (self , websocket ) -> Optional [Any ]:
379- """Listen for the FIRST log event only and extract block information."""
380- while not self .first_event_received :
381- response : str = await self .recv_with_timeout (websocket , WS_DEFAULT_TIMEOUT )
382- response_data = json .loads (response )
378+ """Listen for the FIRST log event only and immediately unsubscribe."""
379+ response : str = await self .recv_with_timeout (websocket , WS_DEFAULT_TIMEOUT )
380+ response_data = json .loads (response )
383381
384- if "params" in response_data and "result" in response_data ["params" ]:
385- log_data = response_data ["params" ]["result" ]
386- self .first_event_received = True # Mark that we got the first event
387- return log_data
382+ if "params" in response_data and "result" in response_data ["params" ]:
383+ log_data = response_data ["params" ]["result" ]
384+ # Immediately unsubscribe after getting first event
385+ await self .unsubscribe (websocket )
386+ return log_data
388387
389388 return None
390389
@@ -412,7 +411,7 @@ async def collect_metric(self) -> None:
412411 finally :
413412 if websocket :
414413 try :
415- await self . unsubscribe ( websocket )
414+ # Don't call unsubscribe here since it's already called in listen_for_data
416415 await websocket .close ()
417416 except Exception as e :
418417 logging .error (f"Error closing websocket: { e !s} " )
0 commit comments