Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions api/read/arbitrum.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
HTTPTxReceiptLatencyMetric,
)
from metrics.ethereum import (
WSBlockLatencyMetric,
# WSBlockLatencyMetric,
WSLogLatencyMetric,
)

metric_name = f"{MetricsServiceConfig.METRIC_PREFIX}response_latency_seconds"

METRICS = [
(WSBlockLatencyMetric, metric_name),
# (WSBlockLatencyMetric, metric_name),
(WSLogLatencyMetric, metric_name),
(HTTPBlockNumberLatencyMetric, metric_name),
(HTTPEthCallLatencyMetric, metric_name),
(HTTPAccBalanceLatencyMetric, metric_name),
Expand Down
6 changes: 4 additions & 2 deletions api/read/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
HTTPTxReceiptLatencyMetric,
)
from metrics.ethereum import (
WSBlockLatencyMetric,
# WSBlockLatencyMetric,
WSLogLatencyMetric,
)

metric_name = f"{MetricsServiceConfig.METRIC_PREFIX}response_latency_seconds"

METRICS = [
(WSBlockLatencyMetric, metric_name),
# (WSBlockLatencyMetric, metric_name),
(WSLogLatencyMetric, metric_name),
(HTTPBlockNumberLatencyMetric, metric_name),
(HTTPEthCallLatencyMetric, metric_name),
(HTTPAccBalanceLatencyMetric, metric_name),
Expand Down
6 changes: 4 additions & 2 deletions api/read/bnbsc.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@
HTTPTxReceiptLatencyMetric,
)
from metrics.ethereum import (
WSBlockLatencyMetric,
# WSBlockLatencyMetric,
WSLogLatencyMetric,
)

metric_name = f"{MetricsServiceConfig.METRIC_PREFIX}response_latency_seconds"

METRICS = [
(WSBlockLatencyMetric, metric_name),
# (WSBlockLatencyMetric, metric_name),
(WSLogLatencyMetric, metric_name),
(HTTPBlockNumberLatencyMetric, metric_name),
(HTTPEthCallLatencyMetric, metric_name),
(HTTPAccBalanceLatencyMetric, metric_name),
Expand Down
6 changes: 4 additions & 2 deletions api/read/ethereum.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@
HTTPDebugTraceTxLatencyMetric,
HTTPEthCallLatencyMetric,
HTTPTxReceiptLatencyMetric,
WSBlockLatencyMetric,
# WSBlockLatencyMetric,
WSLogLatencyMetric,
)

metric_name = f"{MetricsServiceConfig.METRIC_PREFIX}response_latency_seconds"

METRICS = [
(WSBlockLatencyMetric, metric_name),
# (WSBlockLatencyMetric, metric_name),
(WSLogLatencyMetric, metric_name),
(HTTPBlockNumberLatencyMetric, metric_name),
(HTTPEthCallLatencyMetric, metric_name),
(HTTPAccBalanceLatencyMetric, metric_name),
Expand Down
1 change: 1 addition & 0 deletions common/base_metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import aiohttp
import websockets
import websockets.exceptions

from common.metric_config import MetricConfig, MetricLabelKey, MetricLabels
from config.defaults import MetricsServiceConfig
Expand Down
4 changes: 3 additions & 1 deletion common/metric_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,9 @@ async def fetch_data(self) -> float:
"""Measure single request latency with a retry on 429 error."""
endpoint: str | None = self.config.endpoints.get_endpoint()

async with aiohttp.ClientSession() as session:
async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=self.config.timeout)
) as session:
response_time = 0.0 # Do not include retried requests after 429 error
response = None # type: ignore

Expand Down
232 changes: 231 additions & 1 deletion metrics/ethereum.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
import json
import logging
from datetime import datetime, timezone
from typing import Any, Optional

import aiohttp

from common.metric_config import MetricConfig, MetricLabelKey, MetricLabels
from common.metric_types import HttpCallLatencyMetricBase, WebSocketMetric
Expand Down Expand Up @@ -155,7 +158,13 @@ async def send_with_timeout(self, websocket, message: str, timeout: float) -> No
async def recv_with_timeout(self, websocket, timeout: float) -> str:
"""Receive a message with a timeout."""
try:
return await asyncio.wait_for(websocket.recv(), timeout)
message = await asyncio.wait_for(websocket.recv(), timeout)
# Log incoming message size in bytes
message_size: int = len(message.encode("utf-8"))
logging.info(
f"WebSocket received {message_size} bytes from {self.labels.get_prometheus_labels()}"
)
return message
except asyncio.TimeoutError:
raise TimeoutError(
f"WebSocket message reception timed out after {timeout} seconds"
Expand Down Expand Up @@ -243,3 +252,224 @@ def process_data(self, block) -> float:
current_time: datetime = datetime.now(timezone.utc)
latency: float = (current_time - block_time).total_seconds()
return latency


class WSLogLatencyMetric(WebSocketMetric):
"""Collects log latency for EVM providers using predictable log events.

This metric subscribes to Transfer events from USDT contracts, which have
predictable, consistent message sizes across all supported chains.
"""

# Use USDT (Tether) which is available on all chains
TOKEN_CONTRACTS: dict[str, str] = {
"ethereum": "0xdAC17F958D2ee523a2206206994597C13D831ec7", # USDT
"base": "0xfde4C96c8593536E31F229EA8f37b2ADa2699bb2", # USDT
"arbitrum": "0xFd086bC7CD5C481DCC9C85ebE478A1C0b69FCbb9", # USDT
"bnb": "0x55d398326f99059fF775485246999027B3197955", # BSC-USD
}

# Transfer event signature: Transfer(address,address,uint256)
TRANSFER_SIGNATURE = (
"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"
)

def __init__(
self,
handler: "MetricsHandler", # type: ignore # noqa: F821
metric_name: str,
labels: MetricLabels,
config: MetricConfig,
**kwargs,
) -> None:
ws_endpoint = kwargs.pop("ws_endpoint", None)
http_endpoint = kwargs.pop("http_endpoint", None)
super().__init__(
handler=handler,
metric_name=metric_name,
labels=labels,
config=config,
ws_endpoint=ws_endpoint,
http_endpoint=http_endpoint,
)

# Get blockchain name from labels and determine token contract
blockchain: str | None = labels.get_label(MetricLabelKey.BLOCKCHAIN)
if blockchain:
self.token_contract = self.TOKEN_CONTRACTS.get(blockchain.lower()) # type: ignore
else:
# Fallback to Ethereum USDT if blockchain not specified
self.token_contract: str = self.TOKEN_CONTRACTS["ethereum"]

self.labels.update_label(MetricLabelKey.API_METHOD, "eth_subscribe_logs")
self.first_event_received = False # Flag to track first event

async def send_with_timeout(self, websocket, message: str, timeout: float) -> None:
"""Send a message with a timeout."""
try:
await asyncio.wait_for(websocket.send(message), timeout)
except asyncio.TimeoutError:
raise TimeoutError(
f"WebSocket message send timed out after {timeout} seconds"
)

async def recv_with_timeout(self, websocket, timeout: float) -> str:
"""Receive a message with a timeout."""
try:
message = await asyncio.wait_for(websocket.recv(), timeout)
# Log incoming message size in bytes
message_size: int = len(message.encode("utf-8"))
logging.info(
f"WebSocket received {message_size} bytes from {self.labels.get_prometheus_labels()}"
)
return message
except asyncio.TimeoutError:
raise TimeoutError(
f"WebSocket message reception timed out after {timeout} seconds"
)

async def subscribe(self, websocket) -> None:
"""Subscribe to Transfer logs from USDT contract."""
subscription_msg: str = json.dumps(
{
"id": 1,
"jsonrpc": "2.0",
"method": "eth_subscribe",
"params": [
"logs",
{
"address": self.token_contract,
"topics": [self.TRANSFER_SIGNATURE],
},
],
}
)

await self.send_with_timeout(websocket, subscription_msg, WS_DEFAULT_TIMEOUT)
response: str = await self.recv_with_timeout(websocket, WS_DEFAULT_TIMEOUT)
subscription_data = json.loads(response)

if subscription_data.get("result") is None:
raise ValueError(f"Subscription to logs failed: {subscription_data}")

self.subscription_id = subscription_data["result"]

async def unsubscribe(self, websocket) -> None:
"""Unsubscribe from the WebSocket connection."""
if self.subscription_id is None:
logging.warning("No subscription ID available, skipping unsubscribe.")
return

unsubscribe_msg: str = json.dumps(
{
"id": 2,
"jsonrpc": "2.0",
"method": "eth_unsubscribe",
"params": [self.subscription_id],
}
)

try:
await self.send_with_timeout(websocket, unsubscribe_msg, WS_DEFAULT_TIMEOUT)
await self.recv_with_timeout(websocket, WS_DEFAULT_TIMEOUT)
except Exception as e:
logging.warning(f"Error during unsubscribe: {e}")

async def listen_for_data(self, websocket) -> Optional[Any]:
"""Listen for the FIRST log event only and extract block information."""
while not self.first_event_received:
response: str = await self.recv_with_timeout(websocket, WS_DEFAULT_TIMEOUT)
response_data = json.loads(response)

if "params" in response_data and "result" in response_data["params"]:
log_data = response_data["params"]["result"]
self.first_event_received = True # Mark that we got the first event
return log_data

return None

async def collect_metric(self) -> None:
"""Collects single WebSocket message and calculates timestamp-based latency."""
websocket = None

try:
websocket = await self.connect()
await self.subscribe(websocket)
log_data = await self.listen_for_data(websocket)

if log_data is not None:
# Get block timestamp using HTTP and calculate latency
latency: float = await self.calculate_timestamp_latency_http(log_data)
self.update_metric_value(latency)
self.mark_success()
return
raise ValueError("No data in response")

except Exception as e:
self.mark_failure()
self.handle_error(e)

finally:
if websocket:
try:
await self.unsubscribe(websocket)
await websocket.close()
except Exception as e:
logging.error(f"Error closing websocket: {e!s}")

async def calculate_timestamp_latency_http(self, log_data: dict) -> float:
"""Calculate latency between block timestamp and current time using HTTP."""
current_time: datetime = datetime.now(
timezone.utc
) # Get current time before making HTTP request

block_number = log_data.get("blockNumber")
if not block_number:
raise ValueError("No blockNumber in log data")

# Use HTTP endpoint to fetch block timestamp
block_request = {
"id": 1,
"jsonrpc": "2.0",
"method": "eth_getBlockByNumber",
"params": [block_number, False], # False = don't include full transactions
}

async with aiohttp.ClientSession() as session:
async with session.post(
self.http_endpoint, # type: ignore
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
json=block_request,
timeout=aiohttp.ClientTimeout(total=10),
) as response:
if response.status != 200:
raise ValueError(
f"HTTP request failed with status {response.status}"
)

block_data = await response.json()

if "error" in block_data:
raise ValueError(f"RPC error: {block_data['error']}")

if not block_data.get("result") or not block_data["result"].get(
"timestamp"
):
raise ValueError("Failed to get block timestamp")

block_timestamp_hex = block_data["result"]["timestamp"]
block_timestamp = int(block_timestamp_hex, 16)
block_time: datetime = datetime.fromtimestamp(
block_timestamp, timezone.utc
)
latency: float = (current_time - block_time).total_seconds()

return latency

def process_data(self, log_data: dict) -> float:
"""This method is not used in the updated flow."""
# The latency calculation is now handled in calculate_timestamp_latency_http
return 0.0
2 changes: 1 addition & 1 deletion tests/test_api_read.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def main() -> None:
setup_environment()

# Import handler after environment setup
from api.read.test_blockchain import handler
from api.read.bnbsc import handler

server = HTTPServer(("localhost", 8000), handler)
print("Server started at http://localhost:8000")
Expand Down