Skip to content

Commit 4fa094a

Browse files
authored
Merge pull request #25 from chainstacklabs/dev
[Promotion] Deploy dev to production (PR #24)
2 parents 489301b + 49d776b commit 4fa094a

File tree

1 file changed

+93
-75
lines changed

1 file changed

+93
-75
lines changed

common/state/blockchain_fetcher.py

Lines changed: 93 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import logging
55
import random
66
from dataclasses import dataclass
7-
from typing import Any, Dict, List, Optional, Union
7+
from typing import Any, Dict, List, Optional, Tuple, Union
88

99
import aiohttp
1010

@@ -19,6 +19,10 @@ class BlockchainData:
1919
transaction_id: str
2020
old_block_id: str = ""
2121

22+
@classmethod
23+
def empty(cls) -> "BlockchainData":
24+
return cls(block_id="", transaction_id="", old_block_id="")
25+
2226

2327
class BlockchainDataFetcher:
2428
"""Fetches blockchain data from RPC nodes using JSON-RPC protocol."""
@@ -38,6 +42,7 @@ def __init__(self, http_endpoint: str) -> None:
3842
async def _make_rpc_request(
3943
self, method: str, params: Optional[Union[List, Dict]] = None
4044
) -> Any:
45+
"""Makes a JSON-RPC request with retries."""
4146
request = {"jsonrpc": "2.0", "method": method, "params": params or [], "id": 1}
4247

4348
for attempt in range(1, self._max_retries + 1):
@@ -47,22 +52,69 @@ async def _make_rpc_request(
4752
self.http_endpoint, headers=self._headers, json=request
4853
) as response:
4954
data = await response.json()
55+
5056
if "error" in data:
51-
raise Exception(f"RPC error: {data['error']}")
57+
error = data["error"]
58+
if error.get("code") == -32004:
59+
raise ValueError(f"Block not available: {error}")
60+
if attempt < self._max_retries:
61+
self._logger.warning(
62+
f"Attempt {attempt} failed: {error}"
63+
)
64+
await asyncio.sleep(self._retry_delay)
65+
continue
66+
raise ValueError(f"RPC error after all retries: {error}")
67+
5268
return data.get("result")
69+
5370
except Exception as e:
54-
self._logger.warning(f"Attempt {attempt} failed: {e}")
71+
if "Block not available" in str(e):
72+
raise
5573
if attempt < self._max_retries:
74+
self._logger.warning(f"Attempt {attempt} failed: {e}")
5675
await asyncio.sleep(self._retry_delay)
57-
else:
58-
self._logger.error(f"All {self._max_retries} attempts failed")
59-
raise
76+
continue
77+
raise
78+
79+
async def _get_block_in_range(
80+
self, slot_start: int, slot_end: int
81+
) -> Tuple[Optional[int], Optional[Dict]]:
82+
"""Search for available block in given slot range."""
83+
current_slot = slot_end
84+
while current_slot >= slot_start:
85+
try:
86+
block = await self._make_rpc_request(
87+
"getBlock",
88+
[
89+
current_slot,
90+
{
91+
"encoding": "json",
92+
"maxSupportedTransactionVersion": 0,
93+
"transactionDetails": "signatures",
94+
"rewards": False,
95+
},
96+
],
97+
)
98+
if block:
99+
return current_slot, block
100+
current_slot -= 1
101+
102+
except ValueError as e:
103+
if "Block not available" in str(e):
104+
current_slot -= 1
105+
continue
106+
raise
107+
108+
return None, None
60109

61110
async def _fetch_evm_data(self, blockchain: str) -> BlockchainData:
111+
"""Fetches latest block data from EVM-compatible chains."""
62112
try:
63113
latest_block = await self._make_rpc_request(
64-
"eth_getBlockByNumber", ["latest", True]
114+
"eth_getBlockByNumber", ["latest", False]
65115
)
116+
if not isinstance(latest_block, dict):
117+
return BlockchainData.empty()
66118

67119
latest_number = int(latest_block["number"], 16)
68120
offset_range = MetricsServiceConfig.BLOCK_OFFSET_RANGES.get(
@@ -71,18 +123,12 @@ async def _fetch_evm_data(self, blockchain: str) -> BlockchainData:
71123
offset = random.randint(offset_range[0], offset_range[1])
72124
old_number = max(0, latest_number - offset)
73125

74-
if not isinstance(latest_block, dict):
75-
return BlockchainData(block_id="", transaction_id="", old_block_id="")
76-
77-
tx_hash = ""
78-
79126
transactions = latest_block.get("transactions", [])
80-
if transactions and isinstance(transactions[0], (dict, str)):
81-
tx_hash = (
82-
transactions[0].get("hash", "")
83-
if isinstance(transactions[0], dict)
84-
else transactions[0]
85-
)
127+
tx_hash = (
128+
transactions[0].get("hash", "")
129+
if isinstance(transactions[0], dict)
130+
else transactions[0] if transactions else ""
131+
)
86132

87133
return BlockchainData(
88134
block_id=latest_block["number"],
@@ -92,87 +138,56 @@ async def _fetch_evm_data(self, blockchain: str) -> BlockchainData:
92138

93139
except Exception as e:
94140
self._logger.error(f"EVM fetch failed: {e!s}")
95-
return BlockchainData(block_id="", transaction_id="", old_block_id="")
141+
return BlockchainData.empty()
96142

97143
async def _fetch_solana_data(self) -> BlockchainData:
144+
"""Fetches latest block data from Solana."""
98145
try:
99146
block_info = await self._make_rpc_request(
100147
"getLatestBlockhash", [{"commitment": "finalized"}]
101148
)
102149
if not isinstance(block_info, dict):
103-
return BlockchainData(block_id="", transaction_id="", old_block_id="")
150+
return BlockchainData.empty()
104151

105-
latest_slot = block_info.get("context", {}).get("slot", "")
152+
latest_slot = block_info.get("context", {}).get("slot")
106153
if not latest_slot:
107-
return BlockchainData(block_id="", transaction_id="", old_block_id="")
154+
return BlockchainData.empty()
108155

109-
latest_block = await self._make_rpc_request(
110-
"getBlock",
111-
[
112-
latest_slot,
113-
{
114-
"encoding": "json",
115-
"maxSupportedTransactionVersion": 0,
116-
"transactionDetails": "signatures",
117-
"rewards": False,
118-
},
119-
],
156+
actual_latest_slot, latest_block = await self._get_block_in_range(
157+
latest_slot - 10, latest_slot
120158
)
159+
if not actual_latest_slot or not latest_block:
160+
return BlockchainData.empty()
121161

122-
tx_sig = ""
123-
if isinstance(latest_block, dict):
124-
signatures = latest_block.get("signatures", [])
125-
if signatures:
126-
tx_sig = signatures[0]
162+
tx_sig = latest_block.get("signatures", [""])[0]
127163

128164
offset_range = MetricsServiceConfig.BLOCK_OFFSET_RANGES.get(
129165
"solana", (100, 1000)
130166
)
131167
offset = random.randint(offset_range[0], offset_range[1])
132-
target_slot = max(0, latest_slot - offset)
133-
old_slot = None
134-
135-
for slot in range(target_slot - 100, target_slot):
136-
try:
137-
block_exists = await self._make_rpc_request(
138-
"getBlock",
139-
[
140-
slot,
141-
{
142-
"encoding": "json",
143-
"maxSupportedTransactionVersion": 0,
144-
"transactionDetails": "none",
145-
"rewards": False,
146-
},
147-
],
148-
)
149-
if block_exists:
150-
old_slot = slot
151-
break
152-
except Exception as e:
153-
if "Block not available" not in str(e):
154-
self._logger.warning(f"Error checking slot {slot}: {e}")
155-
continue
168+
target_slot = max(0, actual_latest_slot - offset)
169+
old_slot, _ = await self._get_block_in_range(target_slot - 10, target_slot)
156170

157171
return BlockchainData(
158-
block_id=str(latest_slot),
172+
block_id=str(actual_latest_slot),
159173
transaction_id=tx_sig,
160-
old_block_id=str(old_slot) if old_slot is not None else "",
174+
old_block_id=str(old_slot or target_slot),
161175
)
162176

163177
except Exception as e:
164178
self._logger.error(f"Solana fetch failed: {e!s}")
165-
return BlockchainData(block_id="", transaction_id="", old_block_id="")
179+
return BlockchainData.empty()
166180

167181
async def _fetch_ton_data(self) -> BlockchainData:
182+
"""Fetches latest block data from TON."""
168183
try:
169184
info = await self._make_rpc_request("getMasterchainInfo")
170185
if not isinstance(info, dict) or "last" not in info:
171-
raise ValueError("Invalid masterchain info")
186+
return BlockchainData.empty()
172187

173188
last_block = info["last"]
174189
if not isinstance(last_block, dict):
175-
raise ValueError("Invalid last block format")
190+
return BlockchainData.empty()
176191

177192
offset_range = MetricsServiceConfig.BLOCK_OFFSET_RANGES.get("ton", (10, 50))
178193
offset = random.randint(offset_range[0], offset_range[1])
@@ -195,9 +210,11 @@ async def _fetch_ton_data(self) -> BlockchainData:
195210
},
196211
)
197212

198-
tx_id = ""
199-
if isinstance(block, dict) and block.get("transactions"):
200-
tx_id = block["transactions"][0].get("hash", "")
213+
tx_id = (
214+
block.get("transactions", [{}])[0].get("hash", "")
215+
if isinstance(block, dict)
216+
else ""
217+
)
201218

202219
return BlockchainData(
203220
block_id=latest_block_id,
@@ -207,18 +224,19 @@ async def _fetch_ton_data(self) -> BlockchainData:
207224

208225
except Exception as e:
209226
self._logger.error(f"TON fetch failed: {e!s}")
210-
return BlockchainData(block_id="", transaction_id="", old_block_id="")
227+
return BlockchainData.empty()
211228

212229
async def fetch_latest_data(self, blockchain: str) -> BlockchainData:
230+
"""Fetches latest block and transaction data for specified blockchain."""
213231
try:
214-
if blockchain in ("ethereum", "base"):
232+
if blockchain.lower() in ("ethereum", "base"):
215233
return await self._fetch_evm_data(blockchain)
216-
elif blockchain == "solana":
234+
elif blockchain.lower() == "solana":
217235
return await self._fetch_solana_data()
218-
elif blockchain == "ton":
236+
elif blockchain.lower() == "ton":
219237
return await self._fetch_ton_data()
220238
raise ValueError(f"Unsupported blockchain: {blockchain}")
221239

222240
except Exception as e:
223241
self._logger.error(f"Failed to fetch {blockchain} data: {e}")
224-
return BlockchainData(block_id="", transaction_id="")
242+
return BlockchainData.empty()

0 commit comments

Comments
 (0)