Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 93 additions & 75 deletions common/state/blockchain_fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import logging
import random
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union
from typing import Any, Dict, List, Optional, Tuple, Union

import aiohttp

Expand All @@ -19,6 +19,10 @@ class BlockchainData:
transaction_id: str
old_block_id: str = ""

@classmethod
def empty(cls) -> "BlockchainData":
return cls(block_id="", transaction_id="", old_block_id="")


class BlockchainDataFetcher:
"""Fetches blockchain data from RPC nodes using JSON-RPC protocol."""
Expand All @@ -38,6 +42,7 @@ def __init__(self, http_endpoint: str) -> None:
async def _make_rpc_request(
self, method: str, params: Optional[Union[List, Dict]] = None
) -> Any:
"""Makes a JSON-RPC request with retries."""
request = {"jsonrpc": "2.0", "method": method, "params": params or [], "id": 1}

for attempt in range(1, self._max_retries + 1):
Expand All @@ -47,22 +52,69 @@ async def _make_rpc_request(
self.http_endpoint, headers=self._headers, json=request
) as response:
data = await response.json()

if "error" in data:
raise Exception(f"RPC error: {data['error']}")
error = data["error"]
if error.get("code") == -32004:
raise ValueError(f"Block not available: {error}")
if attempt < self._max_retries:
self._logger.warning(
f"Attempt {attempt} failed: {error}"
)
await asyncio.sleep(self._retry_delay)
continue
raise ValueError(f"RPC error after all retries: {error}")

return data.get("result")

except Exception as e:
self._logger.warning(f"Attempt {attempt} failed: {e}")
if "Block not available" in str(e):
raise
if attempt < self._max_retries:
self._logger.warning(f"Attempt {attempt} failed: {e}")
await asyncio.sleep(self._retry_delay)
else:
self._logger.error(f"All {self._max_retries} attempts failed")
raise
continue
raise

async def _get_block_in_range(
self, slot_start: int, slot_end: int
) -> Tuple[Optional[int], Optional[Dict]]:
"""Search for available block in given slot range."""
current_slot = slot_end
while current_slot >= slot_start:
try:
block = await self._make_rpc_request(
"getBlock",
[
current_slot,
{
"encoding": "json",
"maxSupportedTransactionVersion": 0,
"transactionDetails": "signatures",
"rewards": False,
},
],
)
if block:
return current_slot, block
current_slot -= 1

except ValueError as e:
if "Block not available" in str(e):
current_slot -= 1
continue
raise

return None, None

async def _fetch_evm_data(self, blockchain: str) -> BlockchainData:
"""Fetches latest block data from EVM-compatible chains."""
try:
latest_block = await self._make_rpc_request(
"eth_getBlockByNumber", ["latest", True]
"eth_getBlockByNumber", ["latest", False]
)
if not isinstance(latest_block, dict):
return BlockchainData.empty()

latest_number = int(latest_block["number"], 16)
offset_range = MetricsServiceConfig.BLOCK_OFFSET_RANGES.get(
Expand All @@ -71,18 +123,12 @@ async def _fetch_evm_data(self, blockchain: str) -> BlockchainData:
offset = random.randint(offset_range[0], offset_range[1])
old_number = max(0, latest_number - offset)

if not isinstance(latest_block, dict):
return BlockchainData(block_id="", transaction_id="", old_block_id="")

tx_hash = ""

transactions = latest_block.get("transactions", [])
if transactions and isinstance(transactions[0], (dict, str)):
tx_hash = (
transactions[0].get("hash", "")
if isinstance(transactions[0], dict)
else transactions[0]
)
tx_hash = (
transactions[0].get("hash", "")
if isinstance(transactions[0], dict)
else transactions[0] if transactions else ""
)

return BlockchainData(
block_id=latest_block["number"],
Expand All @@ -92,87 +138,56 @@ async def _fetch_evm_data(self, blockchain: str) -> BlockchainData:

except Exception as e:
self._logger.error(f"EVM fetch failed: {e!s}")
return BlockchainData(block_id="", transaction_id="", old_block_id="")
return BlockchainData.empty()

async def _fetch_solana_data(self) -> BlockchainData:
"""Fetches latest block data from Solana."""
try:
block_info = await self._make_rpc_request(
"getLatestBlockhash", [{"commitment": "finalized"}]
)
if not isinstance(block_info, dict):
return BlockchainData(block_id="", transaction_id="", old_block_id="")
return BlockchainData.empty()

latest_slot = block_info.get("context", {}).get("slot", "")
latest_slot = block_info.get("context", {}).get("slot")
if not latest_slot:
return BlockchainData(block_id="", transaction_id="", old_block_id="")
return BlockchainData.empty()

latest_block = await self._make_rpc_request(
"getBlock",
[
latest_slot,
{
"encoding": "json",
"maxSupportedTransactionVersion": 0,
"transactionDetails": "signatures",
"rewards": False,
},
],
actual_latest_slot, latest_block = await self._get_block_in_range(
latest_slot - 10, latest_slot
)
if not actual_latest_slot or not latest_block:
return BlockchainData.empty()

tx_sig = ""
if isinstance(latest_block, dict):
signatures = latest_block.get("signatures", [])
if signatures:
tx_sig = signatures[0]
tx_sig = latest_block.get("signatures", [""])[0]

offset_range = MetricsServiceConfig.BLOCK_OFFSET_RANGES.get(
"solana", (100, 1000)
)
offset = random.randint(offset_range[0], offset_range[1])
target_slot = max(0, latest_slot - offset)
old_slot = None

for slot in range(target_slot - 100, target_slot):
try:
block_exists = await self._make_rpc_request(
"getBlock",
[
slot,
{
"encoding": "json",
"maxSupportedTransactionVersion": 0,
"transactionDetails": "none",
"rewards": False,
},
],
)
if block_exists:
old_slot = slot
break
except Exception as e:
if "Block not available" not in str(e):
self._logger.warning(f"Error checking slot {slot}: {e}")
continue
target_slot = max(0, actual_latest_slot - offset)
old_slot, _ = await self._get_block_in_range(target_slot - 10, target_slot)

return BlockchainData(
block_id=str(latest_slot),
block_id=str(actual_latest_slot),
transaction_id=tx_sig,
old_block_id=str(old_slot) if old_slot is not None else "",
old_block_id=str(old_slot or target_slot),
)

except Exception as e:
self._logger.error(f"Solana fetch failed: {e!s}")
return BlockchainData(block_id="", transaction_id="", old_block_id="")
return BlockchainData.empty()

async def _fetch_ton_data(self) -> BlockchainData:
"""Fetches latest block data from TON."""
try:
info = await self._make_rpc_request("getMasterchainInfo")
if not isinstance(info, dict) or "last" not in info:
raise ValueError("Invalid masterchain info")
return BlockchainData.empty()

last_block = info["last"]
if not isinstance(last_block, dict):
raise ValueError("Invalid last block format")
return BlockchainData.empty()

offset_range = MetricsServiceConfig.BLOCK_OFFSET_RANGES.get("ton", (10, 50))
offset = random.randint(offset_range[0], offset_range[1])
Expand All @@ -195,9 +210,11 @@ async def _fetch_ton_data(self) -> BlockchainData:
},
)

tx_id = ""
if isinstance(block, dict) and block.get("transactions"):
tx_id = block["transactions"][0].get("hash", "")
tx_id = (
block.get("transactions", [{}])[0].get("hash", "")
if isinstance(block, dict)
else ""
)

return BlockchainData(
block_id=latest_block_id,
Expand All @@ -207,18 +224,19 @@ async def _fetch_ton_data(self) -> BlockchainData:

except Exception as e:
self._logger.error(f"TON fetch failed: {e!s}")
return BlockchainData(block_id="", transaction_id="", old_block_id="")
return BlockchainData.empty()

async def fetch_latest_data(self, blockchain: str) -> BlockchainData:
"""Fetches latest block and transaction data for specified blockchain."""
try:
if blockchain in ("ethereum", "base"):
if blockchain.lower() in ("ethereum", "base"):
return await self._fetch_evm_data(blockchain)
elif blockchain == "solana":
elif blockchain.lower() == "solana":
return await self._fetch_solana_data()
elif blockchain == "ton":
elif blockchain.lower() == "ton":
return await self._fetch_ton_data()
raise ValueError(f"Unsupported blockchain: {blockchain}")

except Exception as e:
self._logger.error(f"Failed to fetch {blockchain} data: {e}")
return BlockchainData(block_id="", transaction_id="")
return BlockchainData.empty()