Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions api/support/update_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,15 @@ async def fetch_single(
return blockchain, {
"block": data.block_id,
"tx": data.transaction_id,
"old_block": data.old_block_id, # Add new field
}

if blockchain in previous_data:
self.logger.warning(f"Using previous data for {blockchain}")
return blockchain, previous_data[blockchain]

self.logger.warning(f"Returning empty data for {blockchain}")
return blockchain, {"block": "", "tx": ""}
return blockchain, {"block": "", "tx": "", "old_block": ""}
except Exception as e:
self.logger.error(f"Failed to fetch {blockchain} data: {e}")
if blockchain in previous_data:
Expand All @@ -105,7 +106,7 @@ async def fetch_single(
)
return blockchain, previous_data[blockchain]
self.logger.warning(f"Returning empty data for {blockchain}")
return blockchain, {"block": "", "tx": ""}
return blockchain, {"block": "", "tx": "", "old_block": ""}

tasks = [
fetch_single(blockchain, endpoint)
Expand Down
43 changes: 27 additions & 16 deletions common/metric_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,23 +175,42 @@ async def fetch_data(self) -> float:
endpoint = self.config.endpoints.get_endpoint(self.method)

async with aiohttp.ClientSession() as session:
start_time = time.monotonic()
async with await self._send_request(session, endpoint, 0) as response: # type: ignore
response_time = 0.0 # Do not include retried requests after 429 error
response = None

for retry_count in range(MAX_RETRIES):
start_time = time.monotonic()
response = await self._send_request(session, endpoint, retry_count)
response_time = time.monotonic() - start_time

if response.status == 429 and retry_count < MAX_RETRIES - 1:
wait_time = int(response.headers.get("Retry-After", 15))
await response.release() # Release before retry
await asyncio.sleep(wait_time)
continue

break

if not response:
raise ValueError("No response received")

try:
if response.status != 200:
raise ValueError(f"Status code: {response.status}")

json_response = await response.json()
if "error" in json_response:
raise ValueError(f"JSON-RPC error: {json_response['error']}")
return time.monotonic() - start_time

return response_time
finally:
await response.release()

async def _send_request(
self, session: aiohttp.ClientSession, endpoint: str, retry_count: int
) -> aiohttp.ClientResponse:
"""Send the request and handle rate limiting with retries."""
if retry_count >= MAX_RETRIES:
raise ValueError("Status code: 429. Max retries exceeded")

response = await session.post(
"""Send the request without retry logic."""
return await session.post(
endpoint,
headers={
"Accept": "application/json",
Expand All @@ -201,14 +220,6 @@ async def _send_request(
timeout=self.config.timeout, # type: ignore
)

if response.status == 429 and retry_count < MAX_RETRIES:
wait_time = int(response.headers.get("Retry-After", 10))
await response.release()
await asyncio.sleep(wait_time)
return await self._send_request(session, endpoint, retry_count + 1)

return response

def process_data(self, value: float) -> float:
"""Process raw latency measurement."""
return value
107 changes: 81 additions & 26 deletions common/state/blockchain_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,22 @@

import asyncio
import logging
import random
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union

import aiohttp

from config.defaults import MetricsServiceConfig


@dataclass
class BlockchainData:
"""Container for blockchain state data."""

block_id: str
transaction_id: str
old_block_id: str = ""


class BlockchainDataFetcher:
Expand Down Expand Up @@ -54,49 +58,58 @@ async def _make_rpc_request(
self._logger.error(f"All {self._max_retries} attempts failed")
raise

async def _fetch_evm_data(self) -> BlockchainData:
async def _fetch_evm_data(self, blockchain: str) -> BlockchainData:
try:
block = await self._make_rpc_request(
latest_block = await self._make_rpc_request(
"eth_getBlockByNumber", ["latest", True]
)
if not isinstance(block, dict):
self._logger.error(f"Invalid block format: {type(block)}")
return BlockchainData(block_id="", transaction_id="")

block_hash = block.get("hash", "")
latest_number = int(latest_block["number"], 16)
offset_range = MetricsServiceConfig.BLOCK_OFFSET_RANGES.get(
blockchain.lower(), (20, 100)
)
offset = random.randint(offset_range[0], offset_range[1])
old_number = max(0, latest_number - offset)

if not isinstance(latest_block, dict):
return BlockchainData(block_id="", transaction_id="", old_block_id="")

tx_hash = ""

transactions = block.get("transactions", [])
transactions = latest_block.get("transactions", [])
if transactions and isinstance(transactions[0], (dict, str)):
tx_hash = (
transactions[0].get("hash", "")
if isinstance(transactions[0], dict)
else transactions[0]
)

# self._logger.info(f"{block_hash} {tx_hash}")
return BlockchainData(block_id=block_hash, transaction_id=tx_hash)
return BlockchainData(
block_id=latest_block["number"],
transaction_id=tx_hash,
old_block_id=hex(old_number),
)

except Exception as e:
self._logger.error(f"EVM fetch failed: {e!s}")
return BlockchainData(block_id="", transaction_id="")
return BlockchainData(block_id="", transaction_id="", old_block_id="")

async def _fetch_solana_data(self) -> BlockchainData:
try:
block_info = await self._make_rpc_request(
"getLatestBlockhash", [{"commitment": "finalized"}]
)
if not isinstance(block_info, dict):
return BlockchainData(block_id="", transaction_id="")
return BlockchainData(block_id="", transaction_id="", old_block_id="")

block_slot = block_info.get("context", {}).get("slot", "")
if not block_slot:
return BlockchainData(block_id="", transaction_id="")
latest_slot = block_info.get("context", {}).get("slot", "")
if not latest_slot:
return BlockchainData(block_id="", transaction_id="", old_block_id="")

block = await self._make_rpc_request(
latest_block = await self._make_rpc_request(
"getBlock",
[
block_slot,
latest_slot,
{
"encoding": "json",
"maxSupportedTransactionVersion": 0,
Expand All @@ -107,17 +120,49 @@ async def _fetch_solana_data(self) -> BlockchainData:
)

tx_sig = ""
if isinstance(block, dict):
signatures = block.get("signatures", [])
if isinstance(latest_block, dict):
signatures = latest_block.get("signatures", [])
if signatures:
tx_sig = signatures[0]

# self._logger.info(f"{block_slot} {tx_sig}")
return BlockchainData(block_id=str(block_slot), transaction_id=tx_sig)
offset_range = MetricsServiceConfig.BLOCK_OFFSET_RANGES.get(
"solana", (100, 1000)
)
offset = random.randint(offset_range[0], offset_range[1])
target_slot = max(0, latest_slot - offset)
old_slot = None

for slot in range(target_slot - 100, target_slot):
try:
block_exists = await self._make_rpc_request(
"getBlock",
[
slot,
{
"encoding": "json",
"maxSupportedTransactionVersion": 0,
"transactionDetails": "none",
"rewards": False,
},
],
)
if block_exists:
old_slot = slot
break
except Exception as e:
if "Block not available" not in str(e):
self._logger.warning(f"Error checking slot {slot}: {e}")
continue

return BlockchainData(
block_id=str(latest_slot),
transaction_id=tx_sig,
old_block_id=str(old_slot) if old_slot is not None else "",
)

except Exception as e:
self._logger.error(f"Solana fetch failed: {e!s}")
return BlockchainData(block_id="", transaction_id="")
return BlockchainData(block_id="", transaction_id="", old_block_id="")

async def _fetch_ton_data(self) -> BlockchainData:
try:
Expand All @@ -129,9 +174,16 @@ async def _fetch_ton_data(self) -> BlockchainData:
if not isinstance(last_block, dict):
raise ValueError("Invalid last block format")

block_id = (
offset_range = MetricsServiceConfig.BLOCK_OFFSET_RANGES.get("ton", (10, 50))
offset = random.randint(offset_range[0], offset_range[1])
old_seqno = max(0, last_block["seqno"] - offset)

latest_block_id = (
f"{last_block['workchain']}:{last_block['shard']}:{last_block['seqno']}"
)
old_block_id = (
f"{last_block['workchain']}:{last_block['shard']}:{old_seqno}"
)

block = await self._make_rpc_request(
"getBlockTransactions",
Expand All @@ -147,17 +199,20 @@ async def _fetch_ton_data(self) -> BlockchainData:
if isinstance(block, dict) and block.get("transactions"):
tx_id = block["transactions"][0].get("hash", "")

# self._logger.info(f"{block_id} {tx_id}")
return BlockchainData(block_id=block_id, transaction_id=tx_id)
return BlockchainData(
block_id=latest_block_id,
transaction_id=tx_id,
old_block_id=old_block_id,
)

except Exception as e:
self._logger.error(f"TON fetch failed: {e!s}")
return BlockchainData(block_id="", transaction_id="")
return BlockchainData(block_id="", transaction_id="", old_block_id="")

async def fetch_latest_data(self, blockchain: str) -> BlockchainData:
try:
if blockchain in ("ethereum", "base"):
return await self._fetch_evm_data()
return await self._fetch_evm_data(blockchain)
elif blockchain == "solana":
return await self._fetch_solana_data()
elif blockchain == "ton":
Expand Down
9 changes: 8 additions & 1 deletion common/state/blockchain_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,14 @@ async def _fetch_state_data(session: aiohttp.ClientSession, blob_url: str) -> Di
async with session.get(blob_url, headers=headers) as response:
if response.status != 200:
raise ValueError(f"Failed to fetch state: {response.status}")
return await response.json()
data = await response.json()

# Ensure backward compatibility for old state data
for chain in data:
if isinstance(data[chain], dict) and "old_block" not in data[chain]:
data[chain]["old_block"] = ""

return data

@staticmethod
async def get_data(blockchain: str) -> dict:
Expand Down
8 changes: 8 additions & 0 deletions config/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,14 @@ class MetricsServiceConfig:
"dev_" if os.getenv("VERCEL_ENV") != "production" else ""
) # System env var, standard name

# Block offset configuration (N blocks back from latest)
BLOCK_OFFSET_RANGES = {
"ethereum": (200, 1024),
"base": (200, 1024),
"solana": (324000, 334000),
"ton": (1600000, 1610000),
}


class BlobStorageConfig:
"""Default configuration for blob storage."""
Expand Down
7 changes: 6 additions & 1 deletion metrics/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,15 @@ class HTTPAccBalanceLatencyMetric(HttpCallLatencyMetricBase):
def method(self) -> str:
return "eth_getBalance"

@staticmethod
def validate_state(state_data: dict) -> bool:
"""Validates that required block number (hex) exists in state data."""
return bool(state_data and state_data.get("old_block"))

@staticmethod
def get_params_from_state(state_data: dict) -> list:
"""Get parameters with fixed monitoring address."""
return ["0xF977814e90dA44bFA03b6295A0616a897441aceC", "latest"]
return ["0xF977814e90dA44bFA03b6295A0616a897441aceC", state_data["old_block"]]


class HTTPDebugTraceTxLatencyMetric(HttpCallLatencyMetricBase):
Expand Down
7 changes: 6 additions & 1 deletion metrics/ethereum.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,15 @@ class HTTPAccBalanceLatencyMetric(HttpCallLatencyMetricBase):
def method(self) -> str:
return "eth_getBalance"

@staticmethod
def validate_state(state_data: dict) -> bool:
"""Validates that required block number (hex) exists in state data."""
return bool(state_data and state_data.get("old_block"))

@staticmethod
def get_params_from_state(state_data: dict) -> list:
"""Returns parameters for balance check of monitoring address."""
return ["0x690B9A9E9aa1C9dB991C7721a92d351Db4FaC990", "pending"]
return ["0x690B9A9E9aa1C9dB991C7721a92d351Db4FaC990", state_data["old_block"]]


class HTTPDebugTraceBlockByNumberLatencyMetric(HttpCallLatencyMetricBase):
Expand Down
11 changes: 6 additions & 5 deletions metrics/solana.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ def get_params_from_state(state_data: dict) -> list:
"""Get parameters for simulating a token transfer."""
return [
"AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAAEDArczbMia1tLmq7zz4DinMNN0pJ1JtLdqIJPUw3YrGCzYAMHBsgN27lcgB6H2WQvFgyZuJYHa46puOQo9yQ8CVQbd9uHXZaGT2cvhRs7reawctIXtX1s3kTqM9YV+/wCp20C7Wj2aiuk5TReAXo+VTVg8QTHjs0UjNMMKCvpzZ+ABAgEBARU=",
{"encoding": "base64"},
# The transaction recent blockhash will be replaced with the most recent blockhash.
{"encoding": "base64", "replaceRecentBlockhash": True},
]


Expand Down Expand Up @@ -76,19 +77,19 @@ def method(self) -> str:
@staticmethod
def validate_state(state_data: dict) -> bool:
"""Validate blockchain state contains block slot number."""
return bool(state_data and state_data.get("block"))
return bool(state_data and state_data.get("old_block"))

@staticmethod
def get_params_from_state(state_data: dict) -> list:
"""Get parameters using block slot from state."""
return [
int(state_data["block"]),
int(state_data["old_block"]),
{
"encoding": "jsonParsed",
"maxSupportedTransactionVersion": 0,
"transactionDetails": "none", # Reduce response size
"rewards": False # Further reduce response size
}
"rewards": False, # Further reduce response size
},
]


Expand Down
Loading