From 208701d884a529b42e0cc94250c6d1766d06cfba Mon Sep 17 00:00:00 2001 From: smypmsa Date: Sat, 22 Feb 2025 17:04:53 +0000 Subject: [PATCH 1/2] fix: improve error handling and Solana block fetching --- common/state/blockchain_fetcher.py | 93 +++++++++++++++++------------- 1 file changed, 52 insertions(+), 41 deletions(-) diff --git a/common/state/blockchain_fetcher.py b/common/state/blockchain_fetcher.py index a802c8e..96d31b1 100644 --- a/common/state/blockchain_fetcher.py +++ b/common/state/blockchain_fetcher.py @@ -4,7 +4,7 @@ import logging import random from dataclasses import dataclass -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Tuple, Union import aiohttp @@ -48,15 +48,29 @@ async def _make_rpc_request( ) as response: data = await response.json() if "error" in data: - raise Exception(f"RPC error: {data['error']}") + error = data["error"] + if error.get("code") == -32004: + raise Exception(f"Block not available: {error}") + + if attempt < self._max_retries: + self._logger.warning( + f"Attempt {attempt} failed: {error}" + ) + await asyncio.sleep(self._retry_delay) + continue + + raise Exception(f"RPC error after all retries: {error}") + return data.get("result") + except Exception as e: - self._logger.warning(f"Attempt {attempt} failed: {e}") + if "Block not available" in str(e): + raise if attempt < self._max_retries: + self._logger.warning(f"Attempt {attempt} failed: {e}") await asyncio.sleep(self._retry_delay) - else: - self._logger.error(f"All {self._max_retries} attempts failed") - raise + continue + raise async def _fetch_evm_data(self, blockchain: str) -> BlockchainData: try: @@ -94,6 +108,34 @@ async def _fetch_evm_data(self, blockchain: str) -> BlockchainData: self._logger.error(f"EVM fetch failed: {e!s}") return BlockchainData(block_id="", transaction_id="", old_block_id="") + async def _get_block_in_range( + self, slot_start: int, slot_end: int, get_signatures: bool = False + ) -> Tuple[Optional[int], Optional[Dict]]: + for slot in range(slot_start, slot_end + 1): + try: + block = await self._make_rpc_request( + "getBlock", + [ + slot, + { + "encoding": "json", + "maxSupportedTransactionVersion": 0, + "transactionDetails": ( + "signatures" if get_signatures else "none" + ), + "rewards": False, + }, + ], + ) + if block: + return slot, block + except Exception as e: + if "Block not available" in str(e): + continue + self._logger.warning(f"Unexpected error checking slot {slot}: {e}") + raise + raise Exception(f"No blocks found in range {slot_start} to {slot_end}") + async def _fetch_solana_data(self) -> BlockchainData: try: block_info = await self._make_rpc_request( @@ -106,21 +148,12 @@ async def _fetch_solana_data(self) -> BlockchainData: if not latest_slot: return BlockchainData(block_id="", transaction_id="", old_block_id="") - latest_block = await self._make_rpc_request( - "getBlock", - [ - latest_slot, - { - "encoding": "json", - "maxSupportedTransactionVersion": 0, - "transactionDetails": "signatures", - "rewards": False, - }, - ], + _, latest_block = await self._get_block_in_range( + latest_slot, latest_slot, get_signatures=True ) tx_sig = "" - if isinstance(latest_block, dict): + if latest_block: signatures = latest_block.get("signatures", []) if signatures: tx_sig = signatures[0] @@ -130,29 +163,7 @@ async def _fetch_solana_data(self) -> BlockchainData: ) offset = random.randint(offset_range[0], offset_range[1]) target_slot = max(0, latest_slot - offset) - old_slot = None - - for slot in range(target_slot - 100, target_slot): - try: - block_exists = await self._make_rpc_request( - "getBlock", - [ - slot, - { - "encoding": "json", - "maxSupportedTransactionVersion": 0, - "transactionDetails": "none", - "rewards": False, - }, - ], - ) - if block_exists: - old_slot = slot - break - except Exception as e: - if "Block not available" not in str(e): - self._logger.warning(f"Error checking slot {slot}: {e}") - continue + old_slot, _ = await self._get_block_in_range(target_slot - 100, target_slot) return BlockchainData( block_id=str(latest_slot), From 0474e1e32eec0c93be88e0492f3e44bf6a479ae9 Mon Sep 17 00:00:00 2001 From: smypmsa Date: Sat, 22 Feb 2025 19:45:15 +0000 Subject: [PATCH 2/2] fix: improve Solana block search with descending slot traversal --- common/state/blockchain_fetcher.py | 147 +++++++++++++++-------------- 1 file changed, 77 insertions(+), 70 deletions(-) diff --git a/common/state/blockchain_fetcher.py b/common/state/blockchain_fetcher.py index 96d31b1..542bb9e 100644 --- a/common/state/blockchain_fetcher.py +++ b/common/state/blockchain_fetcher.py @@ -19,6 +19,10 @@ class BlockchainData: transaction_id: str old_block_id: str = "" + @classmethod + def empty(cls) -> "BlockchainData": + return cls(block_id="", transaction_id="", old_block_id="") + class BlockchainDataFetcher: """Fetches blockchain data from RPC nodes using JSON-RPC protocol.""" @@ -38,6 +42,7 @@ def __init__(self, http_endpoint: str) -> None: async def _make_rpc_request( self, method: str, params: Optional[Union[List, Dict]] = None ) -> Any: + """Makes a JSON-RPC request with retries.""" request = {"jsonrpc": "2.0", "method": method, "params": params or [], "id": 1} for attempt in range(1, self._max_retries + 1): @@ -47,19 +52,18 @@ async def _make_rpc_request( self.http_endpoint, headers=self._headers, json=request ) as response: data = await response.json() + if "error" in data: error = data["error"] if error.get("code") == -32004: - raise Exception(f"Block not available: {error}") - + raise ValueError(f"Block not available: {error}") if attempt < self._max_retries: self._logger.warning( f"Attempt {attempt} failed: {error}" ) await asyncio.sleep(self._retry_delay) continue - - raise Exception(f"RPC error after all retries: {error}") + raise ValueError(f"RPC error after all retries: {error}") return data.get("result") @@ -72,11 +76,45 @@ async def _make_rpc_request( continue raise + async def _get_block_in_range( + self, slot_start: int, slot_end: int + ) -> Tuple[Optional[int], Optional[Dict]]: + """Search for available block in given slot range.""" + current_slot = slot_end + while current_slot >= slot_start: + try: + block = await self._make_rpc_request( + "getBlock", + [ + current_slot, + { + "encoding": "json", + "maxSupportedTransactionVersion": 0, + "transactionDetails": "signatures", + "rewards": False, + }, + ], + ) + if block: + return current_slot, block + current_slot -= 1 + + except ValueError as e: + if "Block not available" in str(e): + current_slot -= 1 + continue + raise + + return None, None + async def _fetch_evm_data(self, blockchain: str) -> BlockchainData: + """Fetches latest block data from EVM-compatible chains.""" try: latest_block = await self._make_rpc_request( - "eth_getBlockByNumber", ["latest", True] + "eth_getBlockByNumber", ["latest", False] ) + if not isinstance(latest_block, dict): + return BlockchainData.empty() latest_number = int(latest_block["number"], 16) offset_range = MetricsServiceConfig.BLOCK_OFFSET_RANGES.get( @@ -85,18 +123,12 @@ async def _fetch_evm_data(self, blockchain: str) -> BlockchainData: offset = random.randint(offset_range[0], offset_range[1]) old_number = max(0, latest_number - offset) - if not isinstance(latest_block, dict): - return BlockchainData(block_id="", transaction_id="", old_block_id="") - - tx_hash = "" - transactions = latest_block.get("transactions", []) - if transactions and isinstance(transactions[0], (dict, str)): - tx_hash = ( - transactions[0].get("hash", "") - if isinstance(transactions[0], dict) - else transactions[0] - ) + tx_hash = ( + transactions[0].get("hash", "") + if isinstance(transactions[0], dict) + else transactions[0] if transactions else "" + ) return BlockchainData( block_id=latest_block["number"], @@ -106,84 +138,56 @@ async def _fetch_evm_data(self, blockchain: str) -> BlockchainData: except Exception as e: self._logger.error(f"EVM fetch failed: {e!s}") - return BlockchainData(block_id="", transaction_id="", old_block_id="") - - async def _get_block_in_range( - self, slot_start: int, slot_end: int, get_signatures: bool = False - ) -> Tuple[Optional[int], Optional[Dict]]: - for slot in range(slot_start, slot_end + 1): - try: - block = await self._make_rpc_request( - "getBlock", - [ - slot, - { - "encoding": "json", - "maxSupportedTransactionVersion": 0, - "transactionDetails": ( - "signatures" if get_signatures else "none" - ), - "rewards": False, - }, - ], - ) - if block: - return slot, block - except Exception as e: - if "Block not available" in str(e): - continue - self._logger.warning(f"Unexpected error checking slot {slot}: {e}") - raise - raise Exception(f"No blocks found in range {slot_start} to {slot_end}") + return BlockchainData.empty() async def _fetch_solana_data(self) -> BlockchainData: + """Fetches latest block data from Solana.""" try: block_info = await self._make_rpc_request( "getLatestBlockhash", [{"commitment": "finalized"}] ) if not isinstance(block_info, dict): - return BlockchainData(block_id="", transaction_id="", old_block_id="") + return BlockchainData.empty() - latest_slot = block_info.get("context", {}).get("slot", "") + latest_slot = block_info.get("context", {}).get("slot") if not latest_slot: - return BlockchainData(block_id="", transaction_id="", old_block_id="") + return BlockchainData.empty() - _, latest_block = await self._get_block_in_range( - latest_slot, latest_slot, get_signatures=True + actual_latest_slot, latest_block = await self._get_block_in_range( + latest_slot - 10, latest_slot ) + if not actual_latest_slot or not latest_block: + return BlockchainData.empty() - tx_sig = "" - if latest_block: - signatures = latest_block.get("signatures", []) - if signatures: - tx_sig = signatures[0] + tx_sig = latest_block.get("signatures", [""])[0] offset_range = MetricsServiceConfig.BLOCK_OFFSET_RANGES.get( "solana", (100, 1000) ) offset = random.randint(offset_range[0], offset_range[1]) - target_slot = max(0, latest_slot - offset) - old_slot, _ = await self._get_block_in_range(target_slot - 100, target_slot) + target_slot = max(0, actual_latest_slot - offset) + old_slot, _ = await self._get_block_in_range(target_slot - 10, target_slot) return BlockchainData( - block_id=str(latest_slot), + block_id=str(actual_latest_slot), transaction_id=tx_sig, - old_block_id=str(old_slot) if old_slot is not None else "", + old_block_id=str(old_slot or target_slot), ) except Exception as e: self._logger.error(f"Solana fetch failed: {e!s}") - return BlockchainData(block_id="", transaction_id="", old_block_id="") + return BlockchainData.empty() async def _fetch_ton_data(self) -> BlockchainData: + """Fetches latest block data from TON.""" try: info = await self._make_rpc_request("getMasterchainInfo") if not isinstance(info, dict) or "last" not in info: - raise ValueError("Invalid masterchain info") + return BlockchainData.empty() last_block = info["last"] if not isinstance(last_block, dict): - raise ValueError("Invalid last block format") + return BlockchainData.empty() offset_range = MetricsServiceConfig.BLOCK_OFFSET_RANGES.get("ton", (10, 50)) offset = random.randint(offset_range[0], offset_range[1]) @@ -206,9 +210,11 @@ async def _fetch_ton_data(self) -> BlockchainData: }, ) - tx_id = "" - if isinstance(block, dict) and block.get("transactions"): - tx_id = block["transactions"][0].get("hash", "") + tx_id = ( + block.get("transactions", [{}])[0].get("hash", "") + if isinstance(block, dict) + else "" + ) return BlockchainData( block_id=latest_block_id, @@ -218,18 +224,19 @@ async def _fetch_ton_data(self) -> BlockchainData: except Exception as e: self._logger.error(f"TON fetch failed: {e!s}") - return BlockchainData(block_id="", transaction_id="", old_block_id="") + return BlockchainData.empty() async def fetch_latest_data(self, blockchain: str) -> BlockchainData: + """Fetches latest block and transaction data for specified blockchain.""" try: - if blockchain in ("ethereum", "base"): + if blockchain.lower() in ("ethereum", "base"): return await self._fetch_evm_data(blockchain) - elif blockchain == "solana": + elif blockchain.lower() == "solana": return await self._fetch_solana_data() - elif blockchain == "ton": + elif blockchain.lower() == "ton": return await self._fetch_ton_data() raise ValueError(f"Unsupported blockchain: {blockchain}") except Exception as e: self._logger.error(f"Failed to fetch {blockchain} data: {e}") - return BlockchainData(block_id="", transaction_id="") + return BlockchainData.empty()