Skip to content

Commit 5848c64

Browse files
authored
Merge pull request #12 from chainstacklabs/fix/improved-logging-and-state-update
Add resilient state updates with fallback
2 parents 673480f + ce9f4eb commit 5848c64

File tree

3 files changed

+90
-25
lines changed

3 files changed

+90
-25
lines changed

api/support/update_state.py

Lines changed: 58 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@
55
import logging
66
import os
77
from http.server import BaseHTTPRequestHandler
8-
from typing import Dict, Set, Tuple
8+
from typing import Any, Dict, Set, Tuple
99

1010
from common.state.blob_storage import BlobConfig, BlobStorageHandler
1111
from common.state.blockchain_fetcher import BlockchainData, BlockchainDataFetcher
12+
from common.state.blockchain_state import BlockchainState
1213

1314
SUPPORTED_BLOCKCHAINS = ["ethereum", "solana", "ton", "base"]
1415
ALLOWED_PROVIDERS = {"Chainstack"}
@@ -32,6 +33,7 @@ def __init__(self):
3233
raise ValueError("Missing required blob storage configuration")
3334

3435
self.blob_config = BlobConfig(store_id=store_id, token=token) # type: ignore
36+
self.logger = logging.getLogger(__name__)
3537

3638
async def _get_chainstack_endpoints(self) -> Dict[str, str]:
3739
"""Get Chainstack endpoints for supported blockchains."""
@@ -54,18 +56,55 @@ async def _get_chainstack_endpoints(self) -> Dict[str, str]:
5456

5557
return chainstack_endpoints
5658

59+
async def _get_previous_data(self) -> Dict[str, Any]:
60+
"""Fetch previous blockchain state data"""
61+
try:
62+
state = BlockchainState()
63+
previous_data = {}
64+
for blockchain in SUPPORTED_BLOCKCHAINS:
65+
try:
66+
chain_data = await state.get_data(blockchain)
67+
if chain_data:
68+
previous_data[blockchain] = chain_data
69+
except Exception as e:
70+
self.logger.warning(
71+
f"Failed to get previous data for {blockchain}: {e}"
72+
)
73+
return previous_data
74+
except Exception as e:
75+
self.logger.error(f"Failed to get previous state data: {e}")
76+
return {}
77+
5778
async def _collect_blockchain_data(
58-
self, providers: Dict[str, str]
79+
self, providers: Dict[str, str], previous_data: Dict[str, Any]
5980
) -> Dict[str, dict]:
6081
async def fetch_single(
6182
blockchain: str, endpoint: str
6283
) -> Tuple[str, Dict[str, str]]:
6384
try:
6485
fetcher = BlockchainDataFetcher(endpoint)
6586
data: BlockchainData = await fetcher.fetch_latest_data(blockchain)
66-
return blockchain, {"block": data.block_id, "tx": data.transaction_id}
87+
88+
if data.block_id and data.transaction_id:
89+
return blockchain, {
90+
"block": data.block_id,
91+
"tx": data.transaction_id,
92+
}
93+
94+
if blockchain in previous_data:
95+
self.logger.warning(f"Using previous data for {blockchain}")
96+
return blockchain, previous_data[blockchain]
97+
98+
self.logger.warning(f"Returning empty data for {blockchain}")
99+
return blockchain, {"block": "", "tx": ""}
67100
except Exception as e:
68-
logging.error(f"Failed to fetch {blockchain} data: {e}")
101+
self.logger.error(f"Failed to fetch {blockchain} data: {e}")
102+
if blockchain in previous_data:
103+
self.logger.warning(
104+
f"Using previous data for {blockchain} after error"
105+
)
106+
return blockchain, previous_data[blockchain]
107+
self.logger.warning(f"Returning empty data for {blockchain}")
69108
return blockchain, {"block": "", "tx": ""}
70109

71110
tasks = [
@@ -85,21 +124,31 @@ async def update(self) -> str:
85124
return "Region not authorized for state updates"
86125

87126
try:
88-
providers = await self._get_chainstack_endpoints()
89-
blockchain_data = await self._collect_blockchain_data(providers)
127+
previous_data = await self._get_previous_data()
128+
129+
chainstack_endpoints = await self._get_chainstack_endpoints()
130+
blockchain_data = await self._collect_blockchain_data(
131+
chainstack_endpoints, previous_data
132+
)
133+
134+
# If we didn't get any data, use previous data
90135
if not blockchain_data:
91-
return "No blockchain data collected"
136+
if previous_data:
137+
self.logger.warning("Using complete previous state as fallback")
138+
blockchain_data = previous_data
139+
else:
140+
return "No blockchain data collected and no previous data available"
92141

93142
blob_handler = BlobStorageHandler(self.blob_config)
94143
await blob_handler.update_data(blockchain_data)
95144

96145
return "State updated successfully"
97146

98147
except MissingEndpointsError as e:
99-
logging.error(f"Configuration error: {e}")
148+
self.logger.error(f"Configuration error: {e}")
100149
raise
101150
except Exception as e:
102-
logging.error(f"State update failed: {e}")
151+
self.logger.error(f"State update failed: {e}")
103152
raise
104153

105154

common/metrics_handler.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,14 +51,13 @@ def get_metrics_text(self) -> str:
5151
metrics = self.get_metrics_influx_format()
5252
return "\n".join(f"{metric} {current_time}" for metric in metrics)
5353

54-
async def collect_metrics(self, provider: dict, config: dict):
54+
async def collect_metrics(self, provider: dict, config: dict, state_data: dict):
5555
metric_config = MetricConfig(
5656
timeout=self.grafana_config["metric_request_timeout"],
5757
max_latency=self.grafana_config["metric_max_latency"],
5858
endpoints=None, # Will be set in factory
5959
extra_params={"tx_data": provider.get("data")},
6060
)
61-
state_data = await BlockchainState.get_data(self.blockchain)
6261

6362
metrics = MetricFactory.create_metrics(
6463
blockchain_name=self.blockchain,
@@ -116,14 +115,19 @@ async def handle(self) -> Tuple[str, str]:
116115
if p["blockchain"] == self.blockchain
117116
]
118117

118+
state_data = await BlockchainState.get_data(self.blockchain)
119+
119120
collection_tasks = [
120-
self.collect_metrics(provider, config) for provider in rpc_providers
121+
self.collect_metrics(provider, config, state_data)
122+
for provider in rpc_providers
121123
]
122124
await asyncio.gather(*collection_tasks, return_exceptions=True)
123125

124126
metrics_text = self.get_metrics_text()
125127
if metrics_text:
126128
await self.push_to_grafana(metrics_text)
129+
else:
130+
logging.warning("Noting to push to Grafana.")
127131

128132
return "done", metrics_text
129133

common/state/blockchain_fetcher.py

Lines changed: 25 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Fetches latest block and transaction data from blockchain RPC nodes."""
22

3+
import asyncio
34
import logging
45
from dataclasses import dataclass
56
from typing import Any, Dict, List, Optional, Union
@@ -21,7 +22,9 @@ class BlockchainDataFetcher:
2122
def __init__(self, http_endpoint: str) -> None:
2223
self.http_endpoint = http_endpoint
2324
self._headers = {"Content-Type": "application/json"}
24-
self._timeout = aiohttp.ClientTimeout(total=10)
25+
self._timeout = aiohttp.ClientTimeout(total=15)
26+
self._max_retries = 3
27+
self._retry_delay = 5
2528

2629
logging.basicConfig(
2730
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
@@ -33,14 +36,23 @@ async def _make_rpc_request(
3336
) -> Any:
3437
request = {"jsonrpc": "2.0", "method": method, "params": params or [], "id": 1}
3538

36-
async with aiohttp.ClientSession(timeout=self._timeout) as session:
37-
async with session.post(
38-
self.http_endpoint, headers=self._headers, json=request
39-
) as response:
40-
data = await response.json()
41-
if "error" in data:
42-
raise Exception(f"RPC error: {data['error']}")
43-
return data.get("result")
39+
for attempt in range(1, self._max_retries + 1):
40+
try:
41+
async with aiohttp.ClientSession(timeout=self._timeout) as session:
42+
async with session.post(
43+
self.http_endpoint, headers=self._headers, json=request
44+
) as response:
45+
data = await response.json()
46+
if "error" in data:
47+
raise Exception(f"RPC error: {data['error']}")
48+
return data.get("result")
49+
except Exception as e:
50+
self._logger.warning(f"Attempt {attempt} failed: {e}")
51+
if attempt < self._max_retries:
52+
await asyncio.sleep(self._retry_delay)
53+
else:
54+
self._logger.error(f"All {self._max_retries} attempts failed")
55+
raise
4456

4557
async def _fetch_evm_data(self) -> BlockchainData:
4658
try:
@@ -62,7 +74,7 @@ async def _fetch_evm_data(self) -> BlockchainData:
6274
else transactions[0]
6375
)
6476

65-
logging.info(f"{block_hash} {tx_hash}")
77+
self._logger.info(f"{block_hash} {tx_hash}")
6678
return BlockchainData(block_id=block_hash, transaction_id=tx_hash)
6779

6880
except Exception as e:
@@ -100,7 +112,7 @@ async def _fetch_solana_data(self) -> BlockchainData:
100112
if signatures:
101113
tx_sig = signatures[0]
102114

103-
logging.info(f"{block_slot} {tx_sig}")
115+
self._logger.info(f"{block_slot} {tx_sig}")
104116
return BlockchainData(block_id=str(block_slot), transaction_id=tx_sig)
105117

106118
except Exception as e:
@@ -135,7 +147,7 @@ async def _fetch_ton_data(self) -> BlockchainData:
135147
if isinstance(block, dict) and block.get("transactions"):
136148
tx_id = block["transactions"][0].get("hash", "")
137149

138-
logging.info(f"{block_id} {tx_id}")
150+
self._logger.info(f"{block_id} {tx_id}")
139151
return BlockchainData(block_id=block_id, transaction_id=tx_id)
140152

141153
except Exception as e:
@@ -153,5 +165,5 @@ async def fetch_latest_data(self, blockchain: str) -> BlockchainData:
153165
raise ValueError(f"Unsupported blockchain: {blockchain}")
154166

155167
except Exception as e:
156-
logging.error(f"Failed to fetch {blockchain} data: {e}")
168+
self._logger.error(f"Failed to fetch {blockchain} data: {e}")
157169
return BlockchainData(block_id="", transaction_id="")

0 commit comments

Comments
 (0)