diff --git a/api/support/update_state.py b/api/support/update_state.py index 5e507ab..6d23496 100644 --- a/api/support/update_state.py +++ b/api/support/update_state.py @@ -89,6 +89,7 @@ async def fetch_single( return blockchain, { "block": data.block_id, "tx": data.transaction_id, + "old_block": data.old_block_id, # Add new field } if blockchain in previous_data: @@ -96,7 +97,7 @@ async def fetch_single( return blockchain, previous_data[blockchain] self.logger.warning(f"Returning empty data for {blockchain}") - return blockchain, {"block": "", "tx": ""} + return blockchain, {"block": "", "tx": "", "old_block": ""} except Exception as e: self.logger.error(f"Failed to fetch {blockchain} data: {e}") if blockchain in previous_data: @@ -105,7 +106,7 @@ async def fetch_single( ) return blockchain, previous_data[blockchain] self.logger.warning(f"Returning empty data for {blockchain}") - return blockchain, {"block": "", "tx": ""} + return blockchain, {"block": "", "tx": "", "old_block": ""} tasks = [ fetch_single(blockchain, endpoint) diff --git a/common/metric_types.py b/common/metric_types.py index 64c0dc2..efa9f8d 100644 --- a/common/metric_types.py +++ b/common/metric_types.py @@ -175,23 +175,42 @@ async def fetch_data(self) -> float: endpoint = self.config.endpoints.get_endpoint(self.method) async with aiohttp.ClientSession() as session: - start_time = time.monotonic() - async with await self._send_request(session, endpoint, 0) as response: # type: ignore + response_time = 0.0 # Do not include retried requests after 429 error + response = None + + for retry_count in range(MAX_RETRIES): + start_time = time.monotonic() + response = await self._send_request(session, endpoint, retry_count) + response_time = time.monotonic() - start_time + + if response.status == 429 and retry_count < MAX_RETRIES - 1: + wait_time = int(response.headers.get("Retry-After", 15)) + await response.release() # Release before retry + await asyncio.sleep(wait_time) + continue + + break + + if not response: + raise ValueError("No response received") + + try: if response.status != 200: raise ValueError(f"Status code: {response.status}") + json_response = await response.json() if "error" in json_response: raise ValueError(f"JSON-RPC error: {json_response['error']}") - return time.monotonic() - start_time + + return response_time + finally: + await response.release() async def _send_request( self, session: aiohttp.ClientSession, endpoint: str, retry_count: int ) -> aiohttp.ClientResponse: - """Send the request and handle rate limiting with retries.""" - if retry_count >= MAX_RETRIES: - raise ValueError("Status code: 429. Max retries exceeded") - - response = await session.post( + """Send the request without retry logic.""" + return await session.post( endpoint, headers={ "Accept": "application/json", @@ -201,14 +220,6 @@ async def _send_request( timeout=self.config.timeout, # type: ignore ) - if response.status == 429 and retry_count < MAX_RETRIES: - wait_time = int(response.headers.get("Retry-After", 10)) - await response.release() - await asyncio.sleep(wait_time) - return await self._send_request(session, endpoint, retry_count + 1) - - return response - def process_data(self, value: float) -> float: """Process raw latency measurement.""" return value diff --git a/common/state/blockchain_fetcher.py b/common/state/blockchain_fetcher.py index 56dac42..a802c8e 100644 --- a/common/state/blockchain_fetcher.py +++ b/common/state/blockchain_fetcher.py @@ -2,11 +2,14 @@ import asyncio import logging +import random from dataclasses import dataclass from typing import Any, Dict, List, Optional, Union import aiohttp +from config.defaults import MetricsServiceConfig + @dataclass class BlockchainData: @@ -14,6 +17,7 @@ class BlockchainData: block_id: str transaction_id: str + old_block_id: str = "" class BlockchainDataFetcher: @@ -54,19 +58,25 @@ async def _make_rpc_request( self._logger.error(f"All {self._max_retries} attempts failed") raise - async def _fetch_evm_data(self) -> BlockchainData: + async def _fetch_evm_data(self, blockchain: str) -> BlockchainData: try: - block = await self._make_rpc_request( + latest_block = await self._make_rpc_request( "eth_getBlockByNumber", ["latest", True] ) - if not isinstance(block, dict): - self._logger.error(f"Invalid block format: {type(block)}") - return BlockchainData(block_id="", transaction_id="") - block_hash = block.get("hash", "") + latest_number = int(latest_block["number"], 16) + offset_range = MetricsServiceConfig.BLOCK_OFFSET_RANGES.get( + blockchain.lower(), (20, 100) + ) + offset = random.randint(offset_range[0], offset_range[1]) + old_number = max(0, latest_number - offset) + + if not isinstance(latest_block, dict): + return BlockchainData(block_id="", transaction_id="", old_block_id="") + tx_hash = "" - transactions = block.get("transactions", []) + transactions = latest_block.get("transactions", []) if transactions and isinstance(transactions[0], (dict, str)): tx_hash = ( transactions[0].get("hash", "") @@ -74,12 +84,15 @@ async def _fetch_evm_data(self) -> BlockchainData: else transactions[0] ) - # self._logger.info(f"{block_hash} {tx_hash}") - return BlockchainData(block_id=block_hash, transaction_id=tx_hash) + return BlockchainData( + block_id=latest_block["number"], + transaction_id=tx_hash, + old_block_id=hex(old_number), + ) except Exception as e: self._logger.error(f"EVM fetch failed: {e!s}") - return BlockchainData(block_id="", transaction_id="") + return BlockchainData(block_id="", transaction_id="", old_block_id="") async def _fetch_solana_data(self) -> BlockchainData: try: @@ -87,16 +100,16 @@ async def _fetch_solana_data(self) -> BlockchainData: "getLatestBlockhash", [{"commitment": "finalized"}] ) if not isinstance(block_info, dict): - return BlockchainData(block_id="", transaction_id="") + return BlockchainData(block_id="", transaction_id="", old_block_id="") - block_slot = block_info.get("context", {}).get("slot", "") - if not block_slot: - return BlockchainData(block_id="", transaction_id="") + latest_slot = block_info.get("context", {}).get("slot", "") + if not latest_slot: + return BlockchainData(block_id="", transaction_id="", old_block_id="") - block = await self._make_rpc_request( + latest_block = await self._make_rpc_request( "getBlock", [ - block_slot, + latest_slot, { "encoding": "json", "maxSupportedTransactionVersion": 0, @@ -107,17 +120,49 @@ async def _fetch_solana_data(self) -> BlockchainData: ) tx_sig = "" - if isinstance(block, dict): - signatures = block.get("signatures", []) + if isinstance(latest_block, dict): + signatures = latest_block.get("signatures", []) if signatures: tx_sig = signatures[0] - # self._logger.info(f"{block_slot} {tx_sig}") - return BlockchainData(block_id=str(block_slot), transaction_id=tx_sig) + offset_range = MetricsServiceConfig.BLOCK_OFFSET_RANGES.get( + "solana", (100, 1000) + ) + offset = random.randint(offset_range[0], offset_range[1]) + target_slot = max(0, latest_slot - offset) + old_slot = None + + for slot in range(target_slot - 100, target_slot): + try: + block_exists = await self._make_rpc_request( + "getBlock", + [ + slot, + { + "encoding": "json", + "maxSupportedTransactionVersion": 0, + "transactionDetails": "none", + "rewards": False, + }, + ], + ) + if block_exists: + old_slot = slot + break + except Exception as e: + if "Block not available" not in str(e): + self._logger.warning(f"Error checking slot {slot}: {e}") + continue + + return BlockchainData( + block_id=str(latest_slot), + transaction_id=tx_sig, + old_block_id=str(old_slot) if old_slot is not None else "", + ) except Exception as e: self._logger.error(f"Solana fetch failed: {e!s}") - return BlockchainData(block_id="", transaction_id="") + return BlockchainData(block_id="", transaction_id="", old_block_id="") async def _fetch_ton_data(self) -> BlockchainData: try: @@ -129,9 +174,16 @@ async def _fetch_ton_data(self) -> BlockchainData: if not isinstance(last_block, dict): raise ValueError("Invalid last block format") - block_id = ( + offset_range = MetricsServiceConfig.BLOCK_OFFSET_RANGES.get("ton", (10, 50)) + offset = random.randint(offset_range[0], offset_range[1]) + old_seqno = max(0, last_block["seqno"] - offset) + + latest_block_id = ( f"{last_block['workchain']}:{last_block['shard']}:{last_block['seqno']}" ) + old_block_id = ( + f"{last_block['workchain']}:{last_block['shard']}:{old_seqno}" + ) block = await self._make_rpc_request( "getBlockTransactions", @@ -147,17 +199,20 @@ async def _fetch_ton_data(self) -> BlockchainData: if isinstance(block, dict) and block.get("transactions"): tx_id = block["transactions"][0].get("hash", "") - # self._logger.info(f"{block_id} {tx_id}") - return BlockchainData(block_id=block_id, transaction_id=tx_id) + return BlockchainData( + block_id=latest_block_id, + transaction_id=tx_id, + old_block_id=old_block_id, + ) except Exception as e: self._logger.error(f"TON fetch failed: {e!s}") - return BlockchainData(block_id="", transaction_id="") + return BlockchainData(block_id="", transaction_id="", old_block_id="") async def fetch_latest_data(self, blockchain: str) -> BlockchainData: try: if blockchain in ("ethereum", "base"): - return await self._fetch_evm_data() + return await self._fetch_evm_data(blockchain) elif blockchain == "solana": return await self._fetch_solana_data() elif blockchain == "ton": diff --git a/common/state/blockchain_state.py b/common/state/blockchain_state.py index a0327d6..a4f5917 100644 --- a/common/state/blockchain_state.py +++ b/common/state/blockchain_state.py @@ -49,7 +49,14 @@ async def _fetch_state_data(session: aiohttp.ClientSession, blob_url: str) -> Di async with session.get(blob_url, headers=headers) as response: if response.status != 200: raise ValueError(f"Failed to fetch state: {response.status}") - return await response.json() + data = await response.json() + + # Ensure backward compatibility for old state data + for chain in data: + if isinstance(data[chain], dict) and "old_block" not in data[chain]: + data[chain]["old_block"] = "" + + return data @staticmethod async def get_data(blockchain: str) -> dict: diff --git a/config/defaults.py b/config/defaults.py index 60c3f5e..8626a80 100644 --- a/config/defaults.py +++ b/config/defaults.py @@ -22,6 +22,14 @@ class MetricsServiceConfig: "dev_" if os.getenv("VERCEL_ENV") != "production" else "" ) # System env var, standard name + # Block offset configuration (N blocks back from latest) + BLOCK_OFFSET_RANGES = { + "ethereum": (200, 1024), + "base": (200, 1024), + "solana": (324000, 334000), + "ton": (1600000, 1610000), + } + class BlobStorageConfig: """Default configuration for blob storage.""" diff --git a/metrics/base.py b/metrics/base.py index 8193a5b..278361b 100644 --- a/metrics/base.py +++ b/metrics/base.py @@ -47,10 +47,15 @@ class HTTPAccBalanceLatencyMetric(HttpCallLatencyMetricBase): def method(self) -> str: return "eth_getBalance" + @staticmethod + def validate_state(state_data: dict) -> bool: + """Validates that required block number (hex) exists in state data.""" + return bool(state_data and state_data.get("old_block")) + @staticmethod def get_params_from_state(state_data: dict) -> list: """Get parameters with fixed monitoring address.""" - return ["0xF977814e90dA44bFA03b6295A0616a897441aceC", "latest"] + return ["0xF977814e90dA44bFA03b6295A0616a897441aceC", state_data["old_block"]] class HTTPDebugTraceTxLatencyMetric(HttpCallLatencyMetricBase): diff --git a/metrics/ethereum.py b/metrics/ethereum.py index b4e2e08..d69db36 100644 --- a/metrics/ethereum.py +++ b/metrics/ethereum.py @@ -65,10 +65,15 @@ class HTTPAccBalanceLatencyMetric(HttpCallLatencyMetricBase): def method(self) -> str: return "eth_getBalance" + @staticmethod + def validate_state(state_data: dict) -> bool: + """Validates that required block number (hex) exists in state data.""" + return bool(state_data and state_data.get("old_block")) + @staticmethod def get_params_from_state(state_data: dict) -> list: """Returns parameters for balance check of monitoring address.""" - return ["0x690B9A9E9aa1C9dB991C7721a92d351Db4FaC990", "pending"] + return ["0x690B9A9E9aa1C9dB991C7721a92d351Db4FaC990", state_data["old_block"]] class HTTPDebugTraceBlockByNumberLatencyMetric(HttpCallLatencyMetricBase): diff --git a/metrics/solana.py b/metrics/solana.py index 78576c3..8b99563 100644 --- a/metrics/solana.py +++ b/metrics/solana.py @@ -15,7 +15,8 @@ def get_params_from_state(state_data: dict) -> list: """Get parameters for simulating a token transfer.""" return [ "AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAAEDArczbMia1tLmq7zz4DinMNN0pJ1JtLdqIJPUw3YrGCzYAMHBsgN27lcgB6H2WQvFgyZuJYHa46puOQo9yQ8CVQbd9uHXZaGT2cvhRs7reawctIXtX1s3kTqM9YV+/wCp20C7Wj2aiuk5TReAXo+VTVg8QTHjs0UjNMMKCvpzZ+ABAgEBARU=", - {"encoding": "base64"}, + # The transaction recent blockhash will be replaced with the most recent blockhash. + {"encoding": "base64", "replaceRecentBlockhash": True}, ] @@ -76,19 +77,19 @@ def method(self) -> str: @staticmethod def validate_state(state_data: dict) -> bool: """Validate blockchain state contains block slot number.""" - return bool(state_data and state_data.get("block")) + return bool(state_data and state_data.get("old_block")) @staticmethod def get_params_from_state(state_data: dict) -> list: """Get parameters using block slot from state.""" return [ - int(state_data["block"]), + int(state_data["old_block"]), { "encoding": "jsonParsed", "maxSupportedTransactionVersion": 0, "transactionDetails": "none", # Reduce response size - "rewards": False # Further reduce response size - } + "rewards": False, # Further reduce response size + }, ] diff --git a/metrics/ton.py b/metrics/ton.py index 2b0c050..4936d4b 100644 --- a/metrics/ton.py +++ b/metrics/ton.py @@ -35,12 +35,12 @@ def method(self) -> str: @staticmethod def validate_state(state_data: dict) -> bool: """Validates that required block identifier exists in state data.""" - return bool(state_data and state_data.get("block")) + return bool(state_data and state_data.get("old_block")) @staticmethod def get_params_from_state(state_data: dict) -> dict: """Returns parameters using TON block identifier components.""" - workchain, shard, seqno = state_data["block"].split(":") + workchain, shard, seqno = state_data["old_block"].split(":") return { "workchain": int(workchain), "shard": shard,