6 changes: 6 additions & 0 deletions api/read/hyperliquid.py
@@ -9,6 +9,10 @@
HTTPGetLogsLatencyMetric,
HTTPTxReceiptLatencyMetric,
)
from metrics.hyperliquid_info import (
HTTPClearinghouseStateLatencyMetric,
HTTPOpenOrdersLatencyMetric,
)

METRIC_NAME = f"{MetricsServiceConfig.METRIC_PREFIX}response_latency_seconds"
ALLOWED_REGIONS: list[str] = [
@@ -26,6 +30,8 @@
(HTTPAccBalanceLatencyMetric, METRIC_NAME),
(HTTPTxReceiptLatencyMetric, METRIC_NAME),
(HTTPGetLogsLatencyMetric, METRIC_NAME),
(HTTPClearinghouseStateLatencyMetric, METRIC_NAME),
(HTTPOpenOrdersLatencyMetric, METRIC_NAME),
]
if os.getenv("VERCEL_REGION") in ALLOWED_REGIONS # System env var, standard name
else []
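A minimal sketch, not part of the diff, of the region-gated registration pattern this hunk extends: the metric list is only populated when the serverless region is allow-listed, so the two new Hyperliquid info metrics are collected nowhere else. The variable name METRICS, the region IDs, and the placeholder classes are assumptions for illustration.

import os

METRIC_NAME = "custom_prefix_response_latency_seconds"  # stand-in for the real prefix
ALLOWED_REGIONS = ["iad1", "fra1"]  # hypothetical region IDs

class HTTPClearinghouseStateLatencyMetric: ...  # placeholder for the metrics.hyperliquid_info import
class HTTPOpenOrdersLatencyMetric: ...          # placeholder for the metrics.hyperliquid_info import

# Same conditional-expression shape as the hunk: a populated list of
# (metric class, metric name) pairs inside allow-listed regions, else nothing.
METRICS = (
    [
        (HTTPClearinghouseStateLatencyMetric, METRIC_NAME),
        (HTTPOpenOrdersLatencyMetric, METRIC_NAME),
    ]
    if os.getenv("VERCEL_REGION") in ALLOWED_REGIONS
    else []
)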
6 changes: 3 additions & 3 deletions api/support/update_state.py
@@ -147,9 +147,9 @@ async def update(self) -> str:
try:
previous_data: dict[str, Any] = await self._get_previous_data()

chainstack_endpoints: dict[str, str] = (
await self._get_chainstack_endpoints()
)
chainstack_endpoints: dict[
str, str
] = await self._get_chainstack_endpoints()
blockchain_data = await self._collect_blockchain_data(
chainstack_endpoints, previous_data
)
89 changes: 89 additions & 0 deletions common/hyperliquid_info_base.py
@@ -0,0 +1,89 @@
"""Base class for Hyperliquid /info endpoint metrics."""

from abc import abstractmethod
from typing import Any

import aiohttp

from common.metric_config import MetricConfig, MetricLabels
from common.metric_types import HttpCallLatencyMetricBase
from common.metrics_handler import MetricsHandler


class HyperliquidInfoMetricBase(HttpCallLatencyMetricBase):
"""Base class for Hyperliquid /info endpoint latency metrics.

Handles request configuration, state validation, and response time
measurement for Hyperliquid info API endpoints.
"""

@property
@abstractmethod
def method(self) -> str:
"""Info API method to be implemented by subclasses."""
pass

def __init__(
self,
handler: "MetricsHandler",
metric_name: str,
labels: MetricLabels,
config: MetricConfig,
**kwargs: Any,
) -> None:
"""Initialize Hyperliquid info metric with state-based parameters."""
# Extract state data before passing to parent
state_data = kwargs.get("state_data", {})
params: dict[str, str] = self.get_params_from_state(state_data)
self.user_address: str = params["user"]

# Call parent constructor with method_params set to None since we
# override _build_base_request
super().__init__(
handler=handler,
metric_name=metric_name,
labels=labels,
config=config,
method_params=None,
**kwargs,
)

def _build_base_request(self) -> dict[str, Any]:
"""Override to build Hyperliquid-specific request payload."""
return {"type": self.method, "user": self.user_address}

@staticmethod
def validate_state(state_data: dict[str, Any]) -> bool:
"""Validate state data. Override in subclasses if needed."""
return True

@staticmethod
def get_params_from_state(state_data: dict[str, Any]) -> dict[str, str]:
"""Get parameters from state data. Override in subclasses if needed."""
return {}

def get_info_endpoint(self) -> str:
"""Transform EVM endpoint to info endpoint."""
base_endpoint: str = self.get_endpoint().rstrip("/")

if base_endpoint.endswith("/info"):
return base_endpoint

if base_endpoint.endswith("/evm"):
base_endpoint = base_endpoint[:-4]

return f"{base_endpoint}/info"

async def _send_request(
self, session: aiohttp.ClientSession, endpoint: str
) -> aiohttp.ClientResponse:
"""Override to use Hyperliquid info endpoint and request format."""
info_endpoint: str = self.get_info_endpoint()
return await session.post(
info_endpoint,
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
json=self._base_request,
)
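A minimal sketch, not part of the diff, of the endpoint rewrite that get_info_endpoint performs, reduced to a standalone function with hypothetical URLs:

def to_info_endpoint(base_endpoint: str) -> str:
    """Mirror of get_info_endpoint: map an EVM endpoint to its /info sibling."""
    base_endpoint = base_endpoint.rstrip("/")
    if base_endpoint.endswith("/info"):
        return base_endpoint  # already an info endpoint
    if base_endpoint.endswith("/evm"):
        base_endpoint = base_endpoint[:-4]  # drop the "/evm" suffix
    return f"{base_endpoint}/info"

# Hypothetical endpoints, for illustration only:
assert to_info_endpoint("https://node.example.com/evm") == "https://node.example.com/info"
assert to_info_endpoint("https://node.example.com/info/") == "https://node.example.com/info"
assert to_info_endpoint("https://node.example.com") == "https://node.example.com/info"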
54 changes: 17 additions & 37 deletions common/metric_types.py
@@ -64,23 +64,26 @@ async def _collect_ws_data():
websocket = await self.connect()
await self.subscribe(websocket)
data = await self.listen_for_data(websocket)

if data is not None:
return data
raise ValueError("No data in response")

try:
data = await asyncio.wait_for(
_collect_ws_data(),
timeout=self.config.timeout
_collect_ws_data(), timeout=self.config.timeout
)
latency: int | float = self.process_data(data)
self.update_metric_value(latency)
self.mark_success()

except asyncio.TimeoutError:
self.mark_failure()
self.handle_error(TimeoutError(f"WebSocket metric collection exceeded {self.config.timeout}s timeout"))
self.handle_error(
TimeoutError(
f"WebSocket metric collection exceeded {self.config.timeout}s timeout"
)
)

except Exception as e:
self.mark_failure()
@@ -110,8 +113,7 @@ def get_endpoint(self) -> str:
async def collect_metric(self) -> None:
try:
data = await asyncio.wait_for(
self.fetch_data(),
timeout=self.config.timeout
self.fetch_data(), timeout=self.config.timeout
)
if data is not None:
latency: int | float = self.process_data(data)
@@ -121,7 +123,11 @@ async def collect_metric(self) -> None:
raise ValueError("No data in response")
except asyncio.TimeoutError:
self.mark_failure()
self.handle_error(TimeoutError(f"Metric collection exceeded {self.config.timeout}s timeout"))
self.handle_error(
TimeoutError(
f"Metric collection exceeded {self.config.timeout}s timeout"
)
)
except Exception as e:
self.mark_failure()
self.handle_error(e)
@@ -197,30 +203,14 @@ async def fetch_data(self) -> float:
trace_config = aiohttp.TraceConfig()
timing = {}

async def on_request_start(session, context, params):
timing["start"] = time.monotonic()

async def on_dns_resolvehost_start(session, context, params):
timing["dns_start"] = time.monotonic()

async def on_dns_resolvehost_end(session, context, params):
timing["dns_end"] = time.monotonic()

async def on_connection_create_start(session, context, params):
async def on_connection_create_start(session, context, params) -> None:
timing["conn_start"] = time.monotonic()

async def on_connection_create_end(session, context, params):
async def on_connection_create_end(session, context, params) -> None:
timing["conn_end"] = time.monotonic()

async def on_request_end(session, context, params):
timing["end"] = time.monotonic()

trace_config.on_request_start.append(on_request_start)
trace_config.on_dns_resolvehost_start.append(on_dns_resolvehost_start)
trace_config.on_dns_resolvehost_end.append(on_dns_resolvehost_end)
trace_config.on_connection_create_start.append(on_connection_create_start)
trace_config.on_connection_create_end.append(on_connection_create_end)
trace_config.on_request_end.append(on_request_end)

async with aiohttp.ClientSession(
trace_configs=[trace_config],
@@ -243,21 +233,11 @@ async def on_request_end(session, context, params):

break

# Log timing breakdown
if "dns_start" in timing and "dns_end" in timing:
dns_time = (timing["dns_end"] - timing["dns_start"]) * 1000
else:
dns_time = 0

if "conn_start" in timing and "conn_end" in timing:
conn_time = (timing["conn_end"] - timing["conn_start"]) * 1000
conn_time = timing["conn_end"] - timing["conn_start"]
else:
conn_time = 0

total_time = response_time * 1000

# Log breakdown removed - use proper logging if needed

if not response:
raise ValueError("No response received")

@@ -276,7 +256,7 @@ async def on_request_end(session, context, params):
raise ValueError(f"JSON-RPC error: {json_response['error']}")

# Return RPC time only (exclude connection time)
return response_time - (conn_time / 1000)
return response_time - conn_time
finally:
await response.release()

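A minimal sketch, not part of the diff, of the aiohttp TraceConfig pattern this file keeps after the cleanup: only connection setup is traced, the timing stays in seconds, and it is subtracted from the total so the returned latency approximates server time alone. The URL is a placeholder.

import asyncio
import time

import aiohttp

async def timed_request(url: str) -> float:
    timing: dict[str, float] = {}
    trace_config = aiohttp.TraceConfig()

    async def on_connection_create_start(session, context, params) -> None:
        timing["conn_start"] = time.monotonic()

    async def on_connection_create_end(session, context, params) -> None:
        timing["conn_end"] = time.monotonic()

    trace_config.on_connection_create_start.append(on_connection_create_start)
    trace_config.on_connection_create_end.append(on_connection_create_end)

    async with aiohttp.ClientSession(trace_configs=[trace_config]) as session:
        start = time.monotonic()
        async with session.get(url) as response:
            await response.read()
        response_time = time.monotonic() - start

    # Connection time is zero when a pooled connection is reused and the
    # callbacks never fire; no millisecond conversion, matching the hunk above.
    if "conn_start" in timing and "conn_end" in timing:
        conn_time = timing["conn_end"] - timing["conn_start"]
    else:
        conn_time = 0.0

    return response_time - conn_time

# asyncio.run(timed_request("https://example.com"))  # illustration only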
2 changes: 1 addition & 1 deletion common/state/blob_storage.py
@@ -22,7 +22,7 @@ class BlobConfig:


class BlobStorageHandler:
def __init__(self, config: BlobConfig):
def __init__(self, config: BlobConfig) -> None:
self.config: BlobConfig = config
self._headers: dict[str, str] = {
"Authorization": f"Bearer {config.token}",
4 changes: 3 additions & 1 deletion common/state/blockchain_fetcher.py
@@ -127,7 +127,9 @@ async def _fetch_evm_data(self, blockchain: str) -> BlockchainData:
tx_hash = (
transactions[0].get("hash", "")
if isinstance(transactions[0], dict)
else transactions[0] if transactions else ""
else transactions[0]
if transactions
else ""
)

return BlockchainData(
20 changes: 13 additions & 7 deletions metrics/hyperliquid.py
@@ -1,4 +1,8 @@
"""Hyperliquid EVM metrics implementation for HTTP endpoints."""
"""Hyperliquid EVM metrics implementation for HTTP endpoints.

For Hyperliquid Info API metrics (clearinghouseState, openOrders, etc.),
see metrics.hyperliquid_info module.
"""

from common.metric_types import HttpCallLatencyMetricBase

@@ -18,7 +22,7 @@ def get_params_from_state(state_data: dict) -> list:
"to": "0x5555555555555555555555555555555555555555",
"data": "0x18160ddd",
},
"latest", # Only latest block is supported
"latest", # Only latest block is supported
]


@@ -50,9 +54,11 @@ def method(self) -> str:
@staticmethod
def get_params_from_state(state_data: dict) -> list:
"""Get parameters with fixed monitoring address."""
return ["0xFC1286EeddF81d6955eDAd5C8D99B8Aa32F3D2AA",
#state_data["old_block"],
"latest"] # Only latest block is supported
return [
"0xFC1286EeddF81d6955eDAd5C8D99B8Aa32F3D2AA",
# state_data["old_block"],
"latest",
] # Only latest block is supported


class HTTPBlockNumberLatencyMetric(HttpCallLatencyMetricBase):
@@ -93,8 +99,8 @@ def get_params_from_state(state_data: dict) -> list:
"fromBlock": from_block_hex,
"toBlock": to_block_hex,
"address": "0x5555555555555555555555555555555555555555", # Wrapped HYPE
#"topics": [
# "topics": [
# " 0x7fcf532c15f0a6db0bd6d0e038bea71d30d808c7d98cb3bf7268a95bf5081b65" # Withdrawal event
#],
# ],
}
]
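A minimal sketch, not part of the diff, of the JSON-RPC body the eth_call parameters at the top of this file would produce, assuming the base class wraps method and params in a standard envelope; 0x18160ddd is the 4-byte selector for totalSupply(), and the request id is arbitrary.

import json

payload = {
    "jsonrpc": "2.0",
    "id": 1,
    "method": "eth_call",
    "params": [
        {
            "to": "0x5555555555555555555555555555555555555555",  # Wrapped HYPE
            "data": "0x18160ddd",  # totalSupply()
        },
        "latest",  # only the latest block is supported
    ],
}
print(json.dumps(payload, indent=2))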
33 changes: 33 additions & 0 deletions metrics/hyperliquid_info.py
@@ -0,0 +1,33 @@
"""Hyperliquid Info API metrics implementation for /info endpoints."""

from typing import Any

from common.hyperliquid_info_base import HyperliquidInfoMetricBase


class HTTPClearinghouseStateLatencyMetric(HyperliquidInfoMetricBase):
"""Collects response time for clearinghouseState queries."""

@property
def method(self) -> str:
"""Return the API method name for clearinghouse state queries."""
return "clearinghouseState"

@staticmethod
def get_params_from_state(state_data: dict[str, Any]) -> dict[str, str]:
"""Get parameters for clearinghouseState query."""
return {"user": "0x31ca8395cf837de08b24da3f660e77761dfb974b"}


class HTTPOpenOrdersLatencyMetric(HyperliquidInfoMetricBase):
"""Collects response time for openOrders queries."""

@property
def method(self) -> str:
"""Return the API method name for open orders queries."""
return "openOrders"

@staticmethod
def get_params_from_state(state_data: dict[str, Any]) -> dict[str, str]:
"""Get parameters for openOrders query."""
return {"user": "0x31ca8395cf837de08b24da3f660e77761dfb974b"}
4 changes: 3 additions & 1 deletion metrics/solana_landing_rate.py
@@ -78,7 +78,9 @@ async def _create_client(self) -> AsyncClient:
return AsyncClient(endpoint)

async def _get_slot(self, client: AsyncClient) -> int:
response: GetSlotResp = await client.get_slot(MetricsServiceConfig.SOLANA_CONFIRMATION_LEVEL) # type: ignore
response: GetSlotResp = await client.get_slot(
MetricsServiceConfig.SOLANA_CONFIRMATION_LEVEL
) # type: ignore
if not response or response.value is None:
raise ValueError("Failed to get current slot")
return response.value