67 changes: 58 additions & 9 deletions api/support/update_state.py
@@ -5,10 +5,11 @@
import logging
import os
from http.server import BaseHTTPRequestHandler
from typing import Dict, Set, Tuple
from typing import Any, Dict, Set, Tuple

from common.state.blob_storage import BlobConfig, BlobStorageHandler
from common.state.blockchain_fetcher import BlockchainData, BlockchainDataFetcher
from common.state.blockchain_state import BlockchainState

SUPPORTED_BLOCKCHAINS = ["ethereum", "solana", "ton", "base"]
ALLOWED_PROVIDERS = {"Chainstack"}
@@ -32,6 +33,7 @@ def __init__(self):
raise ValueError("Missing required blob storage configuration")

self.blob_config = BlobConfig(store_id=store_id, token=token) # type: ignore
self.logger = logging.getLogger(__name__)

async def _get_chainstack_endpoints(self) -> Dict[str, str]:
"""Get Chainstack endpoints for supported blockchains."""
@@ -54,18 +56,55 @@ async def _get_chainstack_endpoints(self) -> Dict[str, str]:

return chainstack_endpoints

async def _get_previous_data(self) -> Dict[str, Any]:
"""Fetch previous blockchain state data"""
try:
state = BlockchainState()
previous_data = {}
for blockchain in SUPPORTED_BLOCKCHAINS:
try:
chain_data = await state.get_data(blockchain)
if chain_data:
previous_data[blockchain] = chain_data
except Exception as e:
self.logger.warning(
f"Failed to get previous data for {blockchain}: {e}"
)
return previous_data
except Exception as e:
self.logger.error(f"Failed to get previous state data: {e}")
return {}

async def _collect_blockchain_data(
self, providers: Dict[str, str]
self, providers: Dict[str, str], previous_data: Dict[str, Any]
) -> Dict[str, dict]:
async def fetch_single(
blockchain: str, endpoint: str
) -> Tuple[str, Dict[str, str]]:
try:
fetcher = BlockchainDataFetcher(endpoint)
data: BlockchainData = await fetcher.fetch_latest_data(blockchain)
return blockchain, {"block": data.block_id, "tx": data.transaction_id}

if data.block_id and data.transaction_id:
return blockchain, {
"block": data.block_id,
"tx": data.transaction_id,
}

if blockchain in previous_data:
self.logger.warning(f"Using previous data for {blockchain}")
return blockchain, previous_data[blockchain]

self.logger.warning(f"Returning empty data for {blockchain}")
return blockchain, {"block": "", "tx": ""}
except Exception as e:
logging.error(f"Failed to fetch {blockchain} data: {e}")
self.logger.error(f"Failed to fetch {blockchain} data: {e}")
if blockchain in previous_data:
self.logger.warning(
f"Using previous data for {blockchain} after error"
)
return blockchain, previous_data[blockchain]
self.logger.warning(f"Returning empty data for {blockchain}")
return blockchain, {"block": "", "tx": ""}

tasks = [
Expand All @@ -85,21 +124,31 @@ async def update(self) -> str:
return "Region not authorized for state updates"

try:
providers = await self._get_chainstack_endpoints()
blockchain_data = await self._collect_blockchain_data(providers)
previous_data = await self._get_previous_data()

chainstack_endpoints = await self._get_chainstack_endpoints()
blockchain_data = await self._collect_blockchain_data(
chainstack_endpoints, previous_data
)

# If we didn't get any data, use previous data
if not blockchain_data:
return "No blockchain data collected"
if previous_data:
self.logger.warning("Using complete previous state as fallback")
blockchain_data = previous_data
else:
return "No blockchain data collected and no previous data available"

blob_handler = BlobStorageHandler(self.blob_config)
await blob_handler.update_data(blockchain_data)

return "State updated successfully"

except MissingEndpointsError as e:
logging.error(f"Configuration error: {e}")
self.logger.error(f"Configuration error: {e}")
raise
except Exception as e:
logging.error(f"State update failed: {e}")
self.logger.error(f"State update failed: {e}")
raise
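
A minimal, self-contained sketch (not part of this PR) of the per-chain fallback rule that _collect_blockchain_data now applies: fresh data wins when both fields are present, otherwise the previously stored record for that chain, otherwise an explicitly empty placeholder. The function and sample values below are illustrative only.

from typing import Dict, Optional


def resolve_chain_record(
    fresh: Optional[Dict[str, str]],
    previous: Dict[str, Dict[str, str]],
    blockchain: str,
) -> Dict[str, str]:
    # Fresh data is used only when both the block and the transaction are present.
    if fresh and fresh.get("block") and fresh.get("tx"):
        return fresh
    # Fall back to the last known good record for this chain, if any.
    if blockchain in previous:
        return previous[blockchain]
    # Nothing to fall back to: store an explicitly empty record.
    return {"block": "", "tx": ""}


# Example: the Solana fetch came back empty, so the stored record is reused.
print(resolve_chain_record(None, {"solana": {"block": "312", "tx": "5Kd"}}, "solana"))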


10 changes: 7 additions & 3 deletions common/metrics_handler.py
@@ -51,14 +51,13 @@ def get_metrics_text(self) -> str:
metrics = self.get_metrics_influx_format()
return "\n".join(f"{metric} {current_time}" for metric in metrics)

async def collect_metrics(self, provider: dict, config: dict):
async def collect_metrics(self, provider: dict, config: dict, state_data: dict):
metric_config = MetricConfig(
timeout=self.grafana_config["metric_request_timeout"],
max_latency=self.grafana_config["metric_max_latency"],
endpoints=None, # Will be set in factory
extra_params={"tx_data": provider.get("data")},
)
state_data = await BlockchainState.get_data(self.blockchain)

metrics = MetricFactory.create_metrics(
blockchain_name=self.blockchain,
@@ -116,14 +115,19 @@ async def handle(self) -> Tuple[str, str]:
if p["blockchain"] == self.blockchain
]

state_data = await BlockchainState.get_data(self.blockchain)

collection_tasks = [
self.collect_metrics(provider, config) for provider in rpc_providers
self.collect_metrics(provider, config, state_data)
for provider in rpc_providers
]
await asyncio.gather(*collection_tasks, return_exceptions=True)

metrics_text = self.get_metrics_text()
if metrics_text:
await self.push_to_grafana(metrics_text)
else:
logging.warning("Noting to push to Grafana.")

return "done", metrics_text

38 changes: 25 additions & 13 deletions common/state/blockchain_fetcher.py
@@ -1,5 +1,6 @@
"""Fetches latest block and transaction data from blockchain RPC nodes."""

import asyncio
import logging
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Union
@@ -21,7 +22,9 @@ class BlockchainDataFetcher:
def __init__(self, http_endpoint: str) -> None:
self.http_endpoint = http_endpoint
self._headers = {"Content-Type": "application/json"}
self._timeout = aiohttp.ClientTimeout(total=10)
self._timeout = aiohttp.ClientTimeout(total=15)
self._max_retries = 3
self._retry_delay = 5

logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
@@ -33,14 +36,23 @@ async def _make_rpc_request(
) -> Any:
request = {"jsonrpc": "2.0", "method": method, "params": params or [], "id": 1}

async with aiohttp.ClientSession(timeout=self._timeout) as session:
async with session.post(
self.http_endpoint, headers=self._headers, json=request
) as response:
data = await response.json()
if "error" in data:
raise Exception(f"RPC error: {data['error']}")
return data.get("result")
for attempt in range(1, self._max_retries + 1):
try:
async with aiohttp.ClientSession(timeout=self._timeout) as session:
async with session.post(
self.http_endpoint, headers=self._headers, json=request
) as response:
data = await response.json()
if "error" in data:
raise Exception(f"RPC error: {data['error']}")
return data.get("result")
except Exception as e:
self._logger.warning(f"Attempt {attempt} failed: {e}")
if attempt < self._max_retries:
await asyncio.sleep(self._retry_delay)
else:
self._logger.error(f"All {self._max_retries} attempts failed")
raise

async def _fetch_evm_data(self) -> BlockchainData:
try:
@@ -62,7 +74,7 @@ async def _fetch_evm_data(self) -> BlockchainData:
else transactions[0]
)

logging.info(f"{block_hash} {tx_hash}")
self._logger.info(f"{block_hash} {tx_hash}")
return BlockchainData(block_id=block_hash, transaction_id=tx_hash)

except Exception as e:
@@ -100,7 +112,7 @@ async def _fetch_solana_data(self) -> BlockchainData:
if signatures:
tx_sig = signatures[0]

logging.info(f"{block_slot} {tx_sig}")
self._logger.info(f"{block_slot} {tx_sig}")
return BlockchainData(block_id=str(block_slot), transaction_id=tx_sig)

except Exception as e:
@@ -135,7 +147,7 @@ async def _fetch_ton_data(self) -> BlockchainData:
if isinstance(block, dict) and block.get("transactions"):
tx_id = block["transactions"][0].get("hash", "")

logging.info(f"{block_id} {tx_id}")
self._logger.info(f"{block_id} {tx_id}")
return BlockchainData(block_id=block_id, transaction_id=tx_id)

except Exception as e:
@@ -153,5 +165,5 @@ async def fetch_latest_data(self, blockchain: str) -> BlockchainData:
raise ValueError(f"Unsupported blockchain: {blockchain}")

except Exception as e:
logging.error(f"Failed to fetch {blockchain} data: {e}")
self._logger.error(f"Failed to fetch {blockchain} data: {e}")
return BlockchainData(block_id="", transaction_id="")