diff --git a/api/support/update_state.py b/api/support/update_state.py index c63c8fc..5e507ab 100644 --- a/api/support/update_state.py +++ b/api/support/update_state.py @@ -5,10 +5,11 @@ import logging import os from http.server import BaseHTTPRequestHandler -from typing import Dict, Set, Tuple +from typing import Any, Dict, Set, Tuple from common.state.blob_storage import BlobConfig, BlobStorageHandler from common.state.blockchain_fetcher import BlockchainData, BlockchainDataFetcher +from common.state.blockchain_state import BlockchainState SUPPORTED_BLOCKCHAINS = ["ethereum", "solana", "ton", "base"] ALLOWED_PROVIDERS = {"Chainstack"} @@ -32,6 +33,7 @@ def __init__(self): raise ValueError("Missing required blob storage configuration") self.blob_config = BlobConfig(store_id=store_id, token=token) # type: ignore + self.logger = logging.getLogger(__name__) async def _get_chainstack_endpoints(self) -> Dict[str, str]: """Get Chainstack endpoints for supported blockchains.""" @@ -54,8 +56,27 @@ async def _get_chainstack_endpoints(self) -> Dict[str, str]: return chainstack_endpoints + async def _get_previous_data(self) -> Dict[str, Any]: + """Fetch previous blockchain state data""" + try: + state = BlockchainState() + previous_data = {} + for blockchain in SUPPORTED_BLOCKCHAINS: + try: + chain_data = await state.get_data(blockchain) + if chain_data: + previous_data[blockchain] = chain_data + except Exception as e: + self.logger.warning( + f"Failed to get previous data for {blockchain}: {e}" + ) + return previous_data + except Exception as e: + self.logger.error(f"Failed to get previous state data: {e}") + return {} + async def _collect_blockchain_data( - self, providers: Dict[str, str] + self, providers: Dict[str, str], previous_data: Dict[str, Any] ) -> Dict[str, dict]: async def fetch_single( blockchain: str, endpoint: str @@ -63,9 +84,27 @@ async def fetch_single( try: fetcher = BlockchainDataFetcher(endpoint) data: BlockchainData = await fetcher.fetch_latest_data(blockchain) - return blockchain, {"block": data.block_id, "tx": data.transaction_id} + + if data.block_id and data.transaction_id: + return blockchain, { + "block": data.block_id, + "tx": data.transaction_id, + } + + if blockchain in previous_data: + self.logger.warning(f"Using previous data for {blockchain}") + return blockchain, previous_data[blockchain] + + self.logger.warning(f"Returning empty data for {blockchain}") + return blockchain, {"block": "", "tx": ""} except Exception as e: - logging.error(f"Failed to fetch {blockchain} data: {e}") + self.logger.error(f"Failed to fetch {blockchain} data: {e}") + if blockchain in previous_data: + self.logger.warning( + f"Using previous data for {blockchain} after error" + ) + return blockchain, previous_data[blockchain] + self.logger.warning(f"Returning empty data for {blockchain}") return blockchain, {"block": "", "tx": ""} tasks = [ @@ -85,10 +124,20 @@ async def update(self) -> str: return "Region not authorized for state updates" try: - providers = await self._get_chainstack_endpoints() - blockchain_data = await self._collect_blockchain_data(providers) + previous_data = await self._get_previous_data() + + chainstack_endpoints = await self._get_chainstack_endpoints() + blockchain_data = await self._collect_blockchain_data( + chainstack_endpoints, previous_data + ) + + # If we didn't get any data, use previous data if not blockchain_data: - return "No blockchain data collected" + if previous_data: + self.logger.warning("Using complete previous state as fallback") + blockchain_data = previous_data + else: + return "No blockchain data collected and no previous data available" blob_handler = BlobStorageHandler(self.blob_config) await blob_handler.update_data(blockchain_data) @@ -96,10 +145,10 @@ async def update(self) -> str: return "State updated successfully" except MissingEndpointsError as e: - logging.error(f"Configuration error: {e}") + self.logger.error(f"Configuration error: {e}") raise except Exception as e: - logging.error(f"State update failed: {e}") + self.logger.error(f"State update failed: {e}") raise diff --git a/common/metrics_handler.py b/common/metrics_handler.py index a45847c..ad4fa06 100644 --- a/common/metrics_handler.py +++ b/common/metrics_handler.py @@ -51,14 +51,13 @@ def get_metrics_text(self) -> str: metrics = self.get_metrics_influx_format() return "\n".join(f"{metric} {current_time}" for metric in metrics) - async def collect_metrics(self, provider: dict, config: dict): + async def collect_metrics(self, provider: dict, config: dict, state_data: dict): metric_config = MetricConfig( timeout=self.grafana_config["metric_request_timeout"], max_latency=self.grafana_config["metric_max_latency"], endpoints=None, # Will be set in factory extra_params={"tx_data": provider.get("data")}, ) - state_data = await BlockchainState.get_data(self.blockchain) metrics = MetricFactory.create_metrics( blockchain_name=self.blockchain, @@ -116,14 +115,19 @@ async def handle(self) -> Tuple[str, str]: if p["blockchain"] == self.blockchain ] + state_data = await BlockchainState.get_data(self.blockchain) + collection_tasks = [ - self.collect_metrics(provider, config) for provider in rpc_providers + self.collect_metrics(provider, config, state_data) + for provider in rpc_providers ] await asyncio.gather(*collection_tasks, return_exceptions=True) metrics_text = self.get_metrics_text() if metrics_text: await self.push_to_grafana(metrics_text) + else: + logging.warning("Noting to push to Grafana.") return "done", metrics_text diff --git a/common/state/blockchain_fetcher.py b/common/state/blockchain_fetcher.py index b87d4b0..261b470 100644 --- a/common/state/blockchain_fetcher.py +++ b/common/state/blockchain_fetcher.py @@ -1,5 +1,6 @@ """Fetches latest block and transaction data from blockchain RPC nodes.""" +import asyncio import logging from dataclasses import dataclass from typing import Any, Dict, List, Optional, Union @@ -21,7 +22,9 @@ class BlockchainDataFetcher: def __init__(self, http_endpoint: str) -> None: self.http_endpoint = http_endpoint self._headers = {"Content-Type": "application/json"} - self._timeout = aiohttp.ClientTimeout(total=10) + self._timeout = aiohttp.ClientTimeout(total=15) + self._max_retries = 3 + self._retry_delay = 5 logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" @@ -33,14 +36,23 @@ async def _make_rpc_request( ) -> Any: request = {"jsonrpc": "2.0", "method": method, "params": params or [], "id": 1} - async with aiohttp.ClientSession(timeout=self._timeout) as session: - async with session.post( - self.http_endpoint, headers=self._headers, json=request - ) as response: - data = await response.json() - if "error" in data: - raise Exception(f"RPC error: {data['error']}") - return data.get("result") + for attempt in range(1, self._max_retries + 1): + try: + async with aiohttp.ClientSession(timeout=self._timeout) as session: + async with session.post( + self.http_endpoint, headers=self._headers, json=request + ) as response: + data = await response.json() + if "error" in data: + raise Exception(f"RPC error: {data['error']}") + return data.get("result") + except Exception as e: + self._logger.warning(f"Attempt {attempt} failed: {e}") + if attempt < self._max_retries: + await asyncio.sleep(self._retry_delay) + else: + self._logger.error(f"All {self._max_retries} attempts failed") + raise async def _fetch_evm_data(self) -> BlockchainData: try: @@ -62,7 +74,7 @@ async def _fetch_evm_data(self) -> BlockchainData: else transactions[0] ) - logging.info(f"{block_hash} {tx_hash}") + self._logger.info(f"{block_hash} {tx_hash}") return BlockchainData(block_id=block_hash, transaction_id=tx_hash) except Exception as e: @@ -100,7 +112,7 @@ async def _fetch_solana_data(self) -> BlockchainData: if signatures: tx_sig = signatures[0] - logging.info(f"{block_slot} {tx_sig}") + self._logger.info(f"{block_slot} {tx_sig}") return BlockchainData(block_id=str(block_slot), transaction_id=tx_sig) except Exception as e: @@ -135,7 +147,7 @@ async def _fetch_ton_data(self) -> BlockchainData: if isinstance(block, dict) and block.get("transactions"): tx_id = block["transactions"][0].get("hash", "") - logging.info(f"{block_id} {tx_id}") + self._logger.info(f"{block_id} {tx_id}") return BlockchainData(block_id=block_id, transaction_id=tx_id) except Exception as e: @@ -153,5 +165,5 @@ async def fetch_latest_data(self, blockchain: str) -> BlockchainData: raise ValueError(f"Unsupported blockchain: {blockchain}") except Exception as e: - logging.error(f"Failed to fetch {blockchain} data: {e}") + self._logger.error(f"Failed to fetch {blockchain} data: {e}") return BlockchainData(block_id="", transaction_id="")