Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions api/read/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
from config.defaults import MetricsServiceConfig
from metrics.base import (
HTTPAccBalanceLatencyMetric,
HTTPBlockNumberLatencyMetric,
HTTPDebugTraceBlockByNumberLatencyMetric,
HTTPDebugTraceTxLatencyMetric,
HTTPEthCallLatencyMetric,
HTTPTxReceiptLatencyMetric,
)
from metrics.ethereum import (
HTTPBlockNumberLatencyMetric,
HTTPDebugTraceBlockByNumberLatencyMetric,
WSBlockLatencyMetric,
)

Expand Down
64 changes: 49 additions & 15 deletions common/metric_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import time
from abc import abstractmethod
from typing import Any, Optional
from typing import Any, Dict, Optional

import aiohttp
import websockets
Expand Down Expand Up @@ -110,57 +110,91 @@ async def collect_metric(self) -> None:


class HttpCallLatencyMetricBase(HttpMetric):
"""Base class for JSON-RPC HTTP endpoint latency metrics."""
"""Base class for JSON-RPC HTTP endpoint latency metrics.

Handles request configuration, state validation, and response time measurement
for blockchain RPC endpoints.
"""

@property
@abstractmethod
def method(self) -> str:
"""RPC method name to be implemented by subclasses."""
pass

def __init__(
self,
handler: "MetricsHandler",
metric_name: str,
labels: MetricLabels,
config: MetricConfig,
method: str,
method_params: dict = None,
**kwargs,
):
method_params: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> None:
state_data = kwargs.get("state_data", {})
if not self.validate_state(state_data):
raise ValueError(f"Invalid state data for {self.method}")

super().__init__(
handler=handler,
metric_name=metric_name,
labels=labels,
config=config,
)
self.method = method
self.method_params = method_params or None
self.labels.update_label(MetricLabelKey.API_METHOD, method)
self._base_request = {

self.method_params = (
self.get_params_from_state(state_data)
if method_params is None
else method_params
)
self.labels.update_label(MetricLabelKey.API_METHOD, self.method)
self._base_request = self._build_base_request()

def _build_base_request(self) -> Dict[str, Any]:
"""Build the base JSON-RPC request object."""
request = {
"id": 1,
"jsonrpc": "2.0",
"method": self.method,
}
if self.method_params:
self._base_request["params"] = self.method_params
request["params"] = self.method_params
return request

@staticmethod
def validate_state(state_data: Dict[str, Any]) -> bool:
"""Validate blockchain state data."""
return True

@staticmethod
def get_params_from_state(state_data: Dict[str, Any]) -> Dict[str, Any]:
"""Get RPC method parameters from state data."""
return {}

async def fetch_data(self) -> float:
"""Measures single request latency."""
"""Measure single request latency."""
start_time = time.monotonic()

endpoint = self.config.endpoints.get_endpoint(self.method)

async with aiohttp.ClientSession() as session:
async with session.post(
endpoint,
endpoint, # type: ignore
headers={
"Accept": "application/json",
"Content-Type": "application/json",
},
json=self._base_request,
timeout=self.config.timeout,
timeout=self.config.timeout, # type: ignore
) as response:
if response.status != 200:
raise ValueError(f"Status code: {response.status}")

json_response = await response.json()
if "error" in json_response:
raise ValueError(f"JSON-RPC error: {json_response['error']}")

return time.monotonic() - start_time

def process_data(self, value: float) -> float:
"""Process raw latency measurement."""
return value
4 changes: 3 additions & 1 deletion common/metrics_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from common.base_metric import BaseMetric
from common.factory import MetricFactory
from common.metric_config import MetricConfig
from common.state.blockchain_state import BlockchainState
from config.defaults import MetricsServiceConfig


Expand Down Expand Up @@ -57,6 +58,7 @@ async def collect_metrics(self, provider: dict, config: dict):
endpoints=None, # Will be set in factory
extra_params={"tx_data": provider.get("data")},
)
state_data = await BlockchainState.get_data(self.blockchain)

metrics = MetricFactory.create_metrics(
blockchain_name=self.blockchain,
Expand All @@ -68,6 +70,7 @@ async def collect_metrics(self, provider: dict, config: dict):
ws_endpoint=provider.get("websocket_endpoint"),
http_endpoint=provider.get("http_endpoint"),
tx_endpoint=provider.get("tx_endpoint"),
state_data=state_data,
)
await asyncio.gather(*(m.collect_metric() for m in metrics))

Expand Down Expand Up @@ -103,7 +106,6 @@ async def push_to_grafana(self, metrics_text: str):
async def handle(self) -> Tuple[str, str]:
"""Main handler for metric collection and pushing."""
self._instances = []

try:
config = json.loads(os.getenv("ENDPOINTS"))
MetricFactory._registry.clear()
Expand Down
65 changes: 65 additions & 0 deletions common/state/blockchain_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import logging
import os
from typing import Dict

import aiohttp

from config.defaults import BlobStorageConfig


class BlockchainState:
"""Manages blockchain state data retrieval from blob storage."""

_TIMEOUT = aiohttp.ClientTimeout(total=10)

@staticmethod
async def _get_blob_url(session: aiohttp.ClientSession) -> str:
"""Get URL of the latest blockchain data blob."""
list_url = (
f"{BlobStorageConfig.BLOB_BASE_URL}?prefix={BlobStorageConfig.BLOB_FOLDER}/"
)
headers = {
"Authorization": f"Bearer {os.getenv('VERCEL_BLOB_TOKEN')}",
"x-store-id": os.getenv("STORE_ID"),
}

async with session.get(list_url, headers=headers) as response:
if response.status != 200:
raise ValueError(f"Failed to list blobs: {response.status}")

data = await response.json()
blobs = data.get("blobs", [])

for blob in blobs:
if blob["pathname"].endswith(BlobStorageConfig.BLOB_FILENAME):
return blob["url"]

raise ValueError("Blockchain data blob not found")

@staticmethod
async def _fetch_state_data(session: aiohttp.ClientSession, blob_url: str) -> Dict:
"""Fetch state data from blob storage."""
async with session.get(blob_url) as response:
if response.status != 200:
raise ValueError(f"Failed to fetch state: {response.status}")
return await response.json()

@staticmethod
async def get_data(blockchain: str) -> dict:
"""Get blockchain state data."""
try:
async with aiohttp.ClientSession(
timeout=BlockchainState._TIMEOUT
) as session:
blob_url = await BlockchainState._get_blob_url(session)
state_data = await BlockchainState._fetch_state_data(session, blob_url)
return state_data.get(blockchain.lower(), {})

except Exception as e:
logging.error(f"State fetch failed: {e!s}")
raise

@staticmethod
def clear_cache() -> None:
"""Maintained for API compatibility."""
pass
Loading