From 1c7f3232f572e8cb041bc57db47e4e67a3acfb4c Mon Sep 17 00:00:00 2001 From: smypmsa Date: Sat, 22 Feb 2025 12:34:50 +0000 Subject: [PATCH 1/5] feat: add preceding block tracking to blockchain state --- api/support/update_state.py | 5 +- common/state/blockchain_fetcher.py | 102 +++++++++++++++++++++-------- common/state/blockchain_state.py | 9 ++- config/defaults.py | 3 + 4 files changed, 90 insertions(+), 29 deletions(-) diff --git a/api/support/update_state.py b/api/support/update_state.py index 5e507ab..6d23496 100644 --- a/api/support/update_state.py +++ b/api/support/update_state.py @@ -89,6 +89,7 @@ async def fetch_single( return blockchain, { "block": data.block_id, "tx": data.transaction_id, + "old_block": data.old_block_id, # Add new field } if blockchain in previous_data: @@ -96,7 +97,7 @@ async def fetch_single( return blockchain, previous_data[blockchain] self.logger.warning(f"Returning empty data for {blockchain}") - return blockchain, {"block": "", "tx": ""} + return blockchain, {"block": "", "tx": "", "old_block": ""} except Exception as e: self.logger.error(f"Failed to fetch {blockchain} data: {e}") if blockchain in previous_data: @@ -105,7 +106,7 @@ async def fetch_single( ) return blockchain, previous_data[blockchain] self.logger.warning(f"Returning empty data for {blockchain}") - return blockchain, {"block": "", "tx": ""} + return blockchain, {"block": "", "tx": "", "old_block": ""} tasks = [ fetch_single(blockchain, endpoint) diff --git a/common/state/blockchain_fetcher.py b/common/state/blockchain_fetcher.py index 56dac42..c0f930d 100644 --- a/common/state/blockchain_fetcher.py +++ b/common/state/blockchain_fetcher.py @@ -7,6 +7,8 @@ import aiohttp +from config.defaults import MetricsServiceConfig + @dataclass class BlockchainData: @@ -14,6 +16,7 @@ class BlockchainData: block_id: str transaction_id: str + old_block_id: str = "" class BlockchainDataFetcher: @@ -54,19 +57,27 @@ async def _make_rpc_request( self._logger.error(f"All {self._max_retries} attempts failed") raise - async def _fetch_evm_data(self) -> BlockchainData: + async def _fetch_evm_data(self, blockchain: str) -> BlockchainData: try: - block = await self._make_rpc_request( + latest_block = await self._make_rpc_request( "eth_getBlockByNumber", ["latest", True] ) - if not isinstance(block, dict): - self._logger.error(f"Invalid block format: {type(block)}") - return BlockchainData(block_id="", transaction_id="") - block_hash = block.get("hash", "") + latest_number = int(latest_block["number"], 16) + offset = MetricsServiceConfig.BLOCK_OFFSET.get(blockchain.lower(), 20) + old_number = max(0, latest_number - offset) + old_block = await self._make_rpc_request( + "eth_getBlockByNumber", [hex(old_number), False] + ) + + if not isinstance(latest_block, dict): + return BlockchainData(block_id="", transaction_id="", old_block_id="") + + block_hash = latest_block.get("hash", "") + old_block_hash = old_block.get("hash", "") if old_block else "" tx_hash = "" - transactions = block.get("transactions", []) + transactions = latest_block.get("transactions", []) if transactions and isinstance(transactions[0], (dict, str)): tx_hash = ( transactions[0].get("hash", "") @@ -74,12 +85,13 @@ async def _fetch_evm_data(self) -> BlockchainData: else transactions[0] ) - # self._logger.info(f"{block_hash} {tx_hash}") - return BlockchainData(block_id=block_hash, transaction_id=tx_hash) + return BlockchainData( + block_id=block_hash, transaction_id=tx_hash, old_block_id=old_block_hash + ) except Exception as e: self._logger.error(f"EVM fetch failed: {e!s}") - return BlockchainData(block_id="", transaction_id="") + return BlockchainData(block_id="", transaction_id="", old_block_id="") async def _fetch_solana_data(self) -> BlockchainData: try: @@ -87,16 +99,16 @@ async def _fetch_solana_data(self) -> BlockchainData: "getLatestBlockhash", [{"commitment": "finalized"}] ) if not isinstance(block_info, dict): - return BlockchainData(block_id="", transaction_id="") + return BlockchainData(block_id="", transaction_id="", old_block_id="") - block_slot = block_info.get("context", {}).get("slot", "") - if not block_slot: - return BlockchainData(block_id="", transaction_id="") + latest_slot = block_info.get("context", {}).get("slot", "") + if not latest_slot: + return BlockchainData(block_id="", transaction_id="", old_block_id="") - block = await self._make_rpc_request( + latest_block = await self._make_rpc_request( "getBlock", [ - block_slot, + latest_slot, { "encoding": "json", "maxSupportedTransactionVersion": 0, @@ -107,17 +119,46 @@ async def _fetch_solana_data(self) -> BlockchainData: ) tx_sig = "" - if isinstance(block, dict): - signatures = block.get("signatures", []) + if isinstance(latest_block, dict): + signatures = latest_block.get("signatures", []) if signatures: tx_sig = signatures[0] - # self._logger.info(f"{block_slot} {tx_sig}") - return BlockchainData(block_id=str(block_slot), transaction_id=tx_sig) + offset = MetricsServiceConfig.BLOCK_OFFSET.get("solana", 100) + target_slot = max(0, latest_slot - offset) + old_slot = None + + for slot in range(target_slot - 100, target_slot): + try: + block_exists = await self._make_rpc_request( + "getBlock", + [ + slot, + { + "encoding": "json", + "maxSupportedTransactionVersion": 0, + "transactionDetails": "none", + "rewards": False, + }, + ], + ) + if block_exists: + old_slot = slot + break + except Exception as e: + if "Block not available" not in str(e): + self._logger.warning(f"Error checking slot {slot}: {e}") + continue + + return BlockchainData( + block_id=str(latest_slot), + transaction_id=tx_sig, + old_block_id=str(old_slot) if old_slot is not None else "", + ) except Exception as e: self._logger.error(f"Solana fetch failed: {e!s}") - return BlockchainData(block_id="", transaction_id="") + return BlockchainData(block_id="", transaction_id="", old_block_id="") async def _fetch_ton_data(self) -> BlockchainData: try: @@ -129,9 +170,15 @@ async def _fetch_ton_data(self) -> BlockchainData: if not isinstance(last_block, dict): raise ValueError("Invalid last block format") - block_id = ( + offset = MetricsServiceConfig.BLOCK_OFFSET.get("ton", 10) + old_seqno = max(0, last_block["seqno"] - offset) + + latest_block_id = ( f"{last_block['workchain']}:{last_block['shard']}:{last_block['seqno']}" ) + old_block_id = ( + f"{last_block['workchain']}:{last_block['shard']}:{old_seqno}" + ) block = await self._make_rpc_request( "getBlockTransactions", @@ -147,17 +194,20 @@ async def _fetch_ton_data(self) -> BlockchainData: if isinstance(block, dict) and block.get("transactions"): tx_id = block["transactions"][0].get("hash", "") - # self._logger.info(f"{block_id} {tx_id}") - return BlockchainData(block_id=block_id, transaction_id=tx_id) + return BlockchainData( + block_id=latest_block_id, + transaction_id=tx_id, + old_block_id=old_block_id, + ) except Exception as e: self._logger.error(f"TON fetch failed: {e!s}") - return BlockchainData(block_id="", transaction_id="") + return BlockchainData(block_id="", transaction_id="", old_block_id="") async def fetch_latest_data(self, blockchain: str) -> BlockchainData: try: if blockchain in ("ethereum", "base"): - return await self._fetch_evm_data() + return await self._fetch_evm_data(blockchain) elif blockchain == "solana": return await self._fetch_solana_data() elif blockchain == "ton": diff --git a/common/state/blockchain_state.py b/common/state/blockchain_state.py index a0327d6..a4f5917 100644 --- a/common/state/blockchain_state.py +++ b/common/state/blockchain_state.py @@ -49,7 +49,14 @@ async def _fetch_state_data(session: aiohttp.ClientSession, blob_url: str) -> Di async with session.get(blob_url, headers=headers) as response: if response.status != 200: raise ValueError(f"Failed to fetch state: {response.status}") - return await response.json() + data = await response.json() + + # Ensure backward compatibility for old state data + for chain in data: + if isinstance(data[chain], dict) and "old_block" not in data[chain]: + data[chain]["old_block"] = "" + + return data @staticmethod async def get_data(blockchain: str) -> dict: diff --git a/config/defaults.py b/config/defaults.py index 60c3f5e..204e3ed 100644 --- a/config/defaults.py +++ b/config/defaults.py @@ -22,6 +22,9 @@ class MetricsServiceConfig: "dev_" if os.getenv("VERCEL_ENV") != "production" else "" ) # System env var, standard name + # Block offset configuration (N blocks back from latest) + BLOCK_OFFSET = {"ethereum": 20, "base": 20, "solana": 100, "ton": 10} + class BlobStorageConfig: """Default configuration for blob storage.""" From a993f182f72f8d8972bc1937219afd01fdc75503 Mon Sep 17 00:00:00 2001 From: smypmsa Date: Sat, 22 Feb 2025 13:44:23 +0000 Subject: [PATCH 2/5] feat: add preceding block support to existing metrics --- common/metric_types.py | 2 +- common/state/blockchain_fetcher.py | 14 ++++++++------ metrics/base.py | 7 ++++++- metrics/ethereum.py | 7 ++++++- metrics/solana.py | 11 ++++++----- metrics/ton.py | 4 ++-- 6 files changed, 29 insertions(+), 16 deletions(-) diff --git a/common/metric_types.py b/common/metric_types.py index 64c0dc2..86f8387 100644 --- a/common/metric_types.py +++ b/common/metric_types.py @@ -202,7 +202,7 @@ async def _send_request( ) if response.status == 429 and retry_count < MAX_RETRIES: - wait_time = int(response.headers.get("Retry-After", 10)) + wait_time = int(response.headers.get("Retry-After", 15)) await response.release() await asyncio.sleep(wait_time) return await self._send_request(session, endpoint, retry_count + 1) diff --git a/common/state/blockchain_fetcher.py b/common/state/blockchain_fetcher.py index c0f930d..d35b6af 100644 --- a/common/state/blockchain_fetcher.py +++ b/common/state/blockchain_fetcher.py @@ -66,15 +66,15 @@ async def _fetch_evm_data(self, blockchain: str) -> BlockchainData: latest_number = int(latest_block["number"], 16) offset = MetricsServiceConfig.BLOCK_OFFSET.get(blockchain.lower(), 20) old_number = max(0, latest_number - offset) - old_block = await self._make_rpc_request( - "eth_getBlockByNumber", [hex(old_number), False] - ) + # old_block = await self._make_rpc_request( + # "eth_getBlockByNumber", [hex(old_number), False] + # ) if not isinstance(latest_block, dict): return BlockchainData(block_id="", transaction_id="", old_block_id="") - block_hash = latest_block.get("hash", "") - old_block_hash = old_block.get("hash", "") if old_block else "" + # block_hash = latest_block.get("hash", "") + # old_block_hash = old_block.get("hash", "") if old_block else "" tx_hash = "" transactions = latest_block.get("transactions", []) @@ -86,7 +86,9 @@ async def _fetch_evm_data(self, blockchain: str) -> BlockchainData: ) return BlockchainData( - block_id=block_hash, transaction_id=tx_hash, old_block_id=old_block_hash + block_id=latest_block["number"], + transaction_id=tx_hash, + old_block_id=hex(old_number), ) except Exception as e: diff --git a/metrics/base.py b/metrics/base.py index 8193a5b..278361b 100644 --- a/metrics/base.py +++ b/metrics/base.py @@ -47,10 +47,15 @@ class HTTPAccBalanceLatencyMetric(HttpCallLatencyMetricBase): def method(self) -> str: return "eth_getBalance" + @staticmethod + def validate_state(state_data: dict) -> bool: + """Validates that required block number (hex) exists in state data.""" + return bool(state_data and state_data.get("old_block")) + @staticmethod def get_params_from_state(state_data: dict) -> list: """Get parameters with fixed monitoring address.""" - return ["0xF977814e90dA44bFA03b6295A0616a897441aceC", "latest"] + return ["0xF977814e90dA44bFA03b6295A0616a897441aceC", state_data["old_block"]] class HTTPDebugTraceTxLatencyMetric(HttpCallLatencyMetricBase): diff --git a/metrics/ethereum.py b/metrics/ethereum.py index b4e2e08..d69db36 100644 --- a/metrics/ethereum.py +++ b/metrics/ethereum.py @@ -65,10 +65,15 @@ class HTTPAccBalanceLatencyMetric(HttpCallLatencyMetricBase): def method(self) -> str: return "eth_getBalance" + @staticmethod + def validate_state(state_data: dict) -> bool: + """Validates that required block number (hex) exists in state data.""" + return bool(state_data and state_data.get("old_block")) + @staticmethod def get_params_from_state(state_data: dict) -> list: """Returns parameters for balance check of monitoring address.""" - return ["0x690B9A9E9aa1C9dB991C7721a92d351Db4FaC990", "pending"] + return ["0x690B9A9E9aa1C9dB991C7721a92d351Db4FaC990", state_data["old_block"]] class HTTPDebugTraceBlockByNumberLatencyMetric(HttpCallLatencyMetricBase): diff --git a/metrics/solana.py b/metrics/solana.py index 78576c3..8b99563 100644 --- a/metrics/solana.py +++ b/metrics/solana.py @@ -15,7 +15,8 @@ def get_params_from_state(state_data: dict) -> list: """Get parameters for simulating a token transfer.""" return [ "AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAAEDArczbMia1tLmq7zz4DinMNN0pJ1JtLdqIJPUw3YrGCzYAMHBsgN27lcgB6H2WQvFgyZuJYHa46puOQo9yQ8CVQbd9uHXZaGT2cvhRs7reawctIXtX1s3kTqM9YV+/wCp20C7Wj2aiuk5TReAXo+VTVg8QTHjs0UjNMMKCvpzZ+ABAgEBARU=", - {"encoding": "base64"}, + # The transaction recent blockhash will be replaced with the most recent blockhash. + {"encoding": "base64", "replaceRecentBlockhash": True}, ] @@ -76,19 +77,19 @@ def method(self) -> str: @staticmethod def validate_state(state_data: dict) -> bool: """Validate blockchain state contains block slot number.""" - return bool(state_data and state_data.get("block")) + return bool(state_data and state_data.get("old_block")) @staticmethod def get_params_from_state(state_data: dict) -> list: """Get parameters using block slot from state.""" return [ - int(state_data["block"]), + int(state_data["old_block"]), { "encoding": "jsonParsed", "maxSupportedTransactionVersion": 0, "transactionDetails": "none", # Reduce response size - "rewards": False # Further reduce response size - } + "rewards": False, # Further reduce response size + }, ] diff --git a/metrics/ton.py b/metrics/ton.py index 2b0c050..4936d4b 100644 --- a/metrics/ton.py +++ b/metrics/ton.py @@ -35,12 +35,12 @@ def method(self) -> str: @staticmethod def validate_state(state_data: dict) -> bool: """Validates that required block identifier exists in state data.""" - return bool(state_data and state_data.get("block")) + return bool(state_data and state_data.get("old_block")) @staticmethod def get_params_from_state(state_data: dict) -> dict: """Returns parameters using TON block identifier components.""" - workchain, shard, seqno = state_data["block"].split(":") + workchain, shard, seqno = state_data["old_block"].split(":") return { "workchain": int(workchain), "shard": shard, From 4d3d269a98fecc05d39ef902ebc89c84fc40a7cc Mon Sep 17 00:00:00 2001 From: smypmsa Date: Sat, 22 Feb 2025 13:56:39 +0000 Subject: [PATCH 3/5] feat: implement random block offset ranges for historical queries --- api/support/update_state.py | 1 + common/state/blockchain_fetcher.py | 19 +++++++++++-------- config/defaults.py | 7 ++++++- 3 files changed, 18 insertions(+), 9 deletions(-) diff --git a/api/support/update_state.py b/api/support/update_state.py index 6d23496..16836eb 100644 --- a/api/support/update_state.py +++ b/api/support/update_state.py @@ -142,6 +142,7 @@ async def update(self) -> str: blob_handler = BlobStorageHandler(self.blob_config) await blob_handler.update_data(blockchain_data) + print(blockchain_data) return "State updated successfully" diff --git a/common/state/blockchain_fetcher.py b/common/state/blockchain_fetcher.py index d35b6af..a802c8e 100644 --- a/common/state/blockchain_fetcher.py +++ b/common/state/blockchain_fetcher.py @@ -2,6 +2,7 @@ import asyncio import logging +import random from dataclasses import dataclass from typing import Any, Dict, List, Optional, Union @@ -64,17 +65,15 @@ async def _fetch_evm_data(self, blockchain: str) -> BlockchainData: ) latest_number = int(latest_block["number"], 16) - offset = MetricsServiceConfig.BLOCK_OFFSET.get(blockchain.lower(), 20) + offset_range = MetricsServiceConfig.BLOCK_OFFSET_RANGES.get( + blockchain.lower(), (20, 100) + ) + offset = random.randint(offset_range[0], offset_range[1]) old_number = max(0, latest_number - offset) - # old_block = await self._make_rpc_request( - # "eth_getBlockByNumber", [hex(old_number), False] - # ) if not isinstance(latest_block, dict): return BlockchainData(block_id="", transaction_id="", old_block_id="") - # block_hash = latest_block.get("hash", "") - # old_block_hash = old_block.get("hash", "") if old_block else "" tx_hash = "" transactions = latest_block.get("transactions", []) @@ -126,7 +125,10 @@ async def _fetch_solana_data(self) -> BlockchainData: if signatures: tx_sig = signatures[0] - offset = MetricsServiceConfig.BLOCK_OFFSET.get("solana", 100) + offset_range = MetricsServiceConfig.BLOCK_OFFSET_RANGES.get( + "solana", (100, 1000) + ) + offset = random.randint(offset_range[0], offset_range[1]) target_slot = max(0, latest_slot - offset) old_slot = None @@ -172,7 +174,8 @@ async def _fetch_ton_data(self) -> BlockchainData: if not isinstance(last_block, dict): raise ValueError("Invalid last block format") - offset = MetricsServiceConfig.BLOCK_OFFSET.get("ton", 10) + offset_range = MetricsServiceConfig.BLOCK_OFFSET_RANGES.get("ton", (10, 50)) + offset = random.randint(offset_range[0], offset_range[1]) old_seqno = max(0, last_block["seqno"] - offset) latest_block_id = ( diff --git a/config/defaults.py b/config/defaults.py index 204e3ed..28f2a6e 100644 --- a/config/defaults.py +++ b/config/defaults.py @@ -23,7 +23,12 @@ class MetricsServiceConfig: ) # System env var, standard name # Block offset configuration (N blocks back from latest) - BLOCK_OFFSET = {"ethereum": 20, "base": 20, "solana": 100, "ton": 10} + BLOCK_OFFSET_RANGES = { + "ethereum": (20, 100), # ~5-25 minutes back + "base": (20, 100), # ~5-25 minutes back + "solana": (100, 1000), # ~50-500 seconds back + "ton": (10, 50), # ~50-250 seconds back + } class BlobStorageConfig: From 1e86a42a8ec519c89cbd4ae7ad68c0ade1ed7296 Mon Sep 17 00:00:00 2001 From: smypmsa Date: Sat, 22 Feb 2025 14:16:54 +0000 Subject: [PATCH 4/5] chore: configure block offset ranges for each chain --- api/support/update_state.py | 1 - config/defaults.py | 8 ++++---- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/api/support/update_state.py b/api/support/update_state.py index 16836eb..6d23496 100644 --- a/api/support/update_state.py +++ b/api/support/update_state.py @@ -142,7 +142,6 @@ async def update(self) -> str: blob_handler = BlobStorageHandler(self.blob_config) await blob_handler.update_data(blockchain_data) - print(blockchain_data) return "State updated successfully" diff --git a/config/defaults.py b/config/defaults.py index 28f2a6e..8626a80 100644 --- a/config/defaults.py +++ b/config/defaults.py @@ -24,10 +24,10 @@ class MetricsServiceConfig: # Block offset configuration (N blocks back from latest) BLOCK_OFFSET_RANGES = { - "ethereum": (20, 100), # ~5-25 minutes back - "base": (20, 100), # ~5-25 minutes back - "solana": (100, 1000), # ~50-500 seconds back - "ton": (10, 50), # ~50-250 seconds back + "ethereum": (200, 1024), + "base": (200, 1024), + "solana": (324000, 334000), + "ton": (1600000, 1610000), } From 115ed3c8d5c8cc44b241b8a169c79ff5f734a3ee Mon Sep 17 00:00:00 2001 From: smypmsa Date: Sat, 22 Feb 2025 14:25:07 +0000 Subject: [PATCH 5/5] fix: measure only last request time in RPC metrics after 429 error --- common/metric_types.py | 43 ++++++++++++++++++++++++++---------------- 1 file changed, 27 insertions(+), 16 deletions(-) diff --git a/common/metric_types.py b/common/metric_types.py index 86f8387..efa9f8d 100644 --- a/common/metric_types.py +++ b/common/metric_types.py @@ -175,23 +175,42 @@ async def fetch_data(self) -> float: endpoint = self.config.endpoints.get_endpoint(self.method) async with aiohttp.ClientSession() as session: - start_time = time.monotonic() - async with await self._send_request(session, endpoint, 0) as response: # type: ignore + response_time = 0.0 # Do not include retried requests after 429 error + response = None + + for retry_count in range(MAX_RETRIES): + start_time = time.monotonic() + response = await self._send_request(session, endpoint, retry_count) + response_time = time.monotonic() - start_time + + if response.status == 429 and retry_count < MAX_RETRIES - 1: + wait_time = int(response.headers.get("Retry-After", 15)) + await response.release() # Release before retry + await asyncio.sleep(wait_time) + continue + + break + + if not response: + raise ValueError("No response received") + + try: if response.status != 200: raise ValueError(f"Status code: {response.status}") + json_response = await response.json() if "error" in json_response: raise ValueError(f"JSON-RPC error: {json_response['error']}") - return time.monotonic() - start_time + + return response_time + finally: + await response.release() async def _send_request( self, session: aiohttp.ClientSession, endpoint: str, retry_count: int ) -> aiohttp.ClientResponse: - """Send the request and handle rate limiting with retries.""" - if retry_count >= MAX_RETRIES: - raise ValueError("Status code: 429. Max retries exceeded") - - response = await session.post( + """Send the request without retry logic.""" + return await session.post( endpoint, headers={ "Accept": "application/json", @@ -201,14 +220,6 @@ async def _send_request( timeout=self.config.timeout, # type: ignore ) - if response.status == 429 and retry_count < MAX_RETRIES: - wait_time = int(response.headers.get("Retry-After", 15)) - await response.release() - await asyncio.sleep(wait_time) - return await self._send_request(session, endpoint, retry_count + 1) - - return response - def process_data(self, value: float) -> float: """Process raw latency measurement.""" return value