Skip to content

Commit 165fd04

Browse files
authored
Merge pull request #22 from chainstacklabs/feature/preceding-block-state
Add randomized historical blocks and fix RPC metrics timing
2 parents bc4bbcf + 115ed3c commit 165fd04

File tree

9 files changed

+147
-54
lines changed

9 files changed

+147
-54
lines changed

api/support/update_state.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,15 @@ async def fetch_single(
8989
return blockchain, {
9090
"block": data.block_id,
9191
"tx": data.transaction_id,
92+
"old_block": data.old_block_id, # Add new field
9293
}
9394

9495
if blockchain in previous_data:
9596
self.logger.warning(f"Using previous data for {blockchain}")
9697
return blockchain, previous_data[blockchain]
9798

9899
self.logger.warning(f"Returning empty data for {blockchain}")
99-
return blockchain, {"block": "", "tx": ""}
100+
return blockchain, {"block": "", "tx": "", "old_block": ""}
100101
except Exception as e:
101102
self.logger.error(f"Failed to fetch {blockchain} data: {e}")
102103
if blockchain in previous_data:
@@ -105,7 +106,7 @@ async def fetch_single(
105106
)
106107
return blockchain, previous_data[blockchain]
107108
self.logger.warning(f"Returning empty data for {blockchain}")
108-
return blockchain, {"block": "", "tx": ""}
109+
return blockchain, {"block": "", "tx": "", "old_block": ""}
109110

110111
tasks = [
111112
fetch_single(blockchain, endpoint)

common/metric_types.py

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -175,23 +175,42 @@ async def fetch_data(self) -> float:
175175
endpoint = self.config.endpoints.get_endpoint(self.method)
176176

177177
async with aiohttp.ClientSession() as session:
178-
start_time = time.monotonic()
179-
async with await self._send_request(session, endpoint, 0) as response: # type: ignore
178+
response_time = 0.0 # Do not include retried requests after 429 error
179+
response = None
180+
181+
for retry_count in range(MAX_RETRIES):
182+
start_time = time.monotonic()
183+
response = await self._send_request(session, endpoint, retry_count)
184+
response_time = time.monotonic() - start_time
185+
186+
if response.status == 429 and retry_count < MAX_RETRIES - 1:
187+
wait_time = int(response.headers.get("Retry-After", 15))
188+
await response.release() # Release before retry
189+
await asyncio.sleep(wait_time)
190+
continue
191+
192+
break
193+
194+
if not response:
195+
raise ValueError("No response received")
196+
197+
try:
180198
if response.status != 200:
181199
raise ValueError(f"Status code: {response.status}")
200+
182201
json_response = await response.json()
183202
if "error" in json_response:
184203
raise ValueError(f"JSON-RPC error: {json_response['error']}")
185-
return time.monotonic() - start_time
204+
205+
return response_time
206+
finally:
207+
await response.release()
186208

187209
async def _send_request(
188210
self, session: aiohttp.ClientSession, endpoint: str, retry_count: int
189211
) -> aiohttp.ClientResponse:
190-
"""Send the request and handle rate limiting with retries."""
191-
if retry_count >= MAX_RETRIES:
192-
raise ValueError("Status code: 429. Max retries exceeded")
193-
194-
response = await session.post(
212+
"""Send the request without retry logic."""
213+
return await session.post(
195214
endpoint,
196215
headers={
197216
"Accept": "application/json",
@@ -201,14 +220,6 @@ async def _send_request(
201220
timeout=self.config.timeout, # type: ignore
202221
)
203222

204-
if response.status == 429 and retry_count < MAX_RETRIES:
205-
wait_time = int(response.headers.get("Retry-After", 10))
206-
await response.release()
207-
await asyncio.sleep(wait_time)
208-
return await self._send_request(session, endpoint, retry_count + 1)
209-
210-
return response
211-
212223
def process_data(self, value: float) -> float:
213224
"""Process raw latency measurement."""
214225
return value

common/state/blockchain_fetcher.py

Lines changed: 81 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,22 @@
22

33
import asyncio
44
import logging
5+
import random
56
from dataclasses import dataclass
67
from typing import Any, Dict, List, Optional, Union
78

89
import aiohttp
910

11+
from config.defaults import MetricsServiceConfig
12+
1013

1114
@dataclass
1215
class BlockchainData:
1316
"""Container for blockchain state data."""
1417

1518
block_id: str
1619
transaction_id: str
20+
old_block_id: str = ""
1721

1822

1923
class BlockchainDataFetcher:
@@ -54,49 +58,58 @@ async def _make_rpc_request(
5458
self._logger.error(f"All {self._max_retries} attempts failed")
5559
raise
5660

57-
async def _fetch_evm_data(self) -> BlockchainData:
61+
async def _fetch_evm_data(self, blockchain: str) -> BlockchainData:
5862
try:
59-
block = await self._make_rpc_request(
63+
latest_block = await self._make_rpc_request(
6064
"eth_getBlockByNumber", ["latest", True]
6165
)
62-
if not isinstance(block, dict):
63-
self._logger.error(f"Invalid block format: {type(block)}")
64-
return BlockchainData(block_id="", transaction_id="")
6566

66-
block_hash = block.get("hash", "")
67+
latest_number = int(latest_block["number"], 16)
68+
offset_range = MetricsServiceConfig.BLOCK_OFFSET_RANGES.get(
69+
blockchain.lower(), (20, 100)
70+
)
71+
offset = random.randint(offset_range[0], offset_range[1])
72+
old_number = max(0, latest_number - offset)
73+
74+
if not isinstance(latest_block, dict):
75+
return BlockchainData(block_id="", transaction_id="", old_block_id="")
76+
6777
tx_hash = ""
6878

69-
transactions = block.get("transactions", [])
79+
transactions = latest_block.get("transactions", [])
7080
if transactions and isinstance(transactions[0], (dict, str)):
7181
tx_hash = (
7282
transactions[0].get("hash", "")
7383
if isinstance(transactions[0], dict)
7484
else transactions[0]
7585
)
7686

77-
# self._logger.info(f"{block_hash} {tx_hash}")
78-
return BlockchainData(block_id=block_hash, transaction_id=tx_hash)
87+
return BlockchainData(
88+
block_id=latest_block["number"],
89+
transaction_id=tx_hash,
90+
old_block_id=hex(old_number),
91+
)
7992

8093
except Exception as e:
8194
self._logger.error(f"EVM fetch failed: {e!s}")
82-
return BlockchainData(block_id="", transaction_id="")
95+
return BlockchainData(block_id="", transaction_id="", old_block_id="")
8396

8497
async def _fetch_solana_data(self) -> BlockchainData:
8598
try:
8699
block_info = await self._make_rpc_request(
87100
"getLatestBlockhash", [{"commitment": "finalized"}]
88101
)
89102
if not isinstance(block_info, dict):
90-
return BlockchainData(block_id="", transaction_id="")
103+
return BlockchainData(block_id="", transaction_id="", old_block_id="")
91104

92-
block_slot = block_info.get("context", {}).get("slot", "")
93-
if not block_slot:
94-
return BlockchainData(block_id="", transaction_id="")
105+
latest_slot = block_info.get("context", {}).get("slot", "")
106+
if not latest_slot:
107+
return BlockchainData(block_id="", transaction_id="", old_block_id="")
95108

96-
block = await self._make_rpc_request(
109+
latest_block = await self._make_rpc_request(
97110
"getBlock",
98111
[
99-
block_slot,
112+
latest_slot,
100113
{
101114
"encoding": "json",
102115
"maxSupportedTransactionVersion": 0,
@@ -107,17 +120,49 @@ async def _fetch_solana_data(self) -> BlockchainData:
107120
)
108121

109122
tx_sig = ""
110-
if isinstance(block, dict):
111-
signatures = block.get("signatures", [])
123+
if isinstance(latest_block, dict):
124+
signatures = latest_block.get("signatures", [])
112125
if signatures:
113126
tx_sig = signatures[0]
114127

115-
# self._logger.info(f"{block_slot} {tx_sig}")
116-
return BlockchainData(block_id=str(block_slot), transaction_id=tx_sig)
128+
offset_range = MetricsServiceConfig.BLOCK_OFFSET_RANGES.get(
129+
"solana", (100, 1000)
130+
)
131+
offset = random.randint(offset_range[0], offset_range[1])
132+
target_slot = max(0, latest_slot - offset)
133+
old_slot = None
134+
135+
for slot in range(target_slot - 100, target_slot):
136+
try:
137+
block_exists = await self._make_rpc_request(
138+
"getBlock",
139+
[
140+
slot,
141+
{
142+
"encoding": "json",
143+
"maxSupportedTransactionVersion": 0,
144+
"transactionDetails": "none",
145+
"rewards": False,
146+
},
147+
],
148+
)
149+
if block_exists:
150+
old_slot = slot
151+
break
152+
except Exception as e:
153+
if "Block not available" not in str(e):
154+
self._logger.warning(f"Error checking slot {slot}: {e}")
155+
continue
156+
157+
return BlockchainData(
158+
block_id=str(latest_slot),
159+
transaction_id=tx_sig,
160+
old_block_id=str(old_slot) if old_slot is not None else "",
161+
)
117162

118163
except Exception as e:
119164
self._logger.error(f"Solana fetch failed: {e!s}")
120-
return BlockchainData(block_id="", transaction_id="")
165+
return BlockchainData(block_id="", transaction_id="", old_block_id="")
121166

122167
async def _fetch_ton_data(self) -> BlockchainData:
123168
try:
@@ -129,9 +174,16 @@ async def _fetch_ton_data(self) -> BlockchainData:
129174
if not isinstance(last_block, dict):
130175
raise ValueError("Invalid last block format")
131176

132-
block_id = (
177+
offset_range = MetricsServiceConfig.BLOCK_OFFSET_RANGES.get("ton", (10, 50))
178+
offset = random.randint(offset_range[0], offset_range[1])
179+
old_seqno = max(0, last_block["seqno"] - offset)
180+
181+
latest_block_id = (
133182
f"{last_block['workchain']}:{last_block['shard']}:{last_block['seqno']}"
134183
)
184+
old_block_id = (
185+
f"{last_block['workchain']}:{last_block['shard']}:{old_seqno}"
186+
)
135187

136188
block = await self._make_rpc_request(
137189
"getBlockTransactions",
@@ -147,17 +199,20 @@ async def _fetch_ton_data(self) -> BlockchainData:
147199
if isinstance(block, dict) and block.get("transactions"):
148200
tx_id = block["transactions"][0].get("hash", "")
149201

150-
# self._logger.info(f"{block_id} {tx_id}")
151-
return BlockchainData(block_id=block_id, transaction_id=tx_id)
202+
return BlockchainData(
203+
block_id=latest_block_id,
204+
transaction_id=tx_id,
205+
old_block_id=old_block_id,
206+
)
152207

153208
except Exception as e:
154209
self._logger.error(f"TON fetch failed: {e!s}")
155-
return BlockchainData(block_id="", transaction_id="")
210+
return BlockchainData(block_id="", transaction_id="", old_block_id="")
156211

157212
async def fetch_latest_data(self, blockchain: str) -> BlockchainData:
158213
try:
159214
if blockchain in ("ethereum", "base"):
160-
return await self._fetch_evm_data()
215+
return await self._fetch_evm_data(blockchain)
161216
elif blockchain == "solana":
162217
return await self._fetch_solana_data()
163218
elif blockchain == "ton":

common/state/blockchain_state.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,14 @@ async def _fetch_state_data(session: aiohttp.ClientSession, blob_url: str) -> Di
4949
async with session.get(blob_url, headers=headers) as response:
5050
if response.status != 200:
5151
raise ValueError(f"Failed to fetch state: {response.status}")
52-
return await response.json()
52+
data = await response.json()
53+
54+
# Ensure backward compatibility for old state data
55+
for chain in data:
56+
if isinstance(data[chain], dict) and "old_block" not in data[chain]:
57+
data[chain]["old_block"] = ""
58+
59+
return data
5360

5461
@staticmethod
5562
async def get_data(blockchain: str) -> dict:

config/defaults.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@ class MetricsServiceConfig:
2222
"dev_" if os.getenv("VERCEL_ENV") != "production" else ""
2323
) # System env var, standard name
2424

25+
# Block offset configuration (N blocks back from latest)
26+
BLOCK_OFFSET_RANGES = {
27+
"ethereum": (200, 1024),
28+
"base": (200, 1024),
29+
"solana": (324000, 334000),
30+
"ton": (1600000, 1610000),
31+
}
32+
2533

2634
class BlobStorageConfig:
2735
"""Default configuration for blob storage."""

metrics/base.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,15 @@ class HTTPAccBalanceLatencyMetric(HttpCallLatencyMetricBase):
4747
def method(self) -> str:
4848
return "eth_getBalance"
4949

50+
@staticmethod
51+
def validate_state(state_data: dict) -> bool:
52+
"""Validates that required block number (hex) exists in state data."""
53+
return bool(state_data and state_data.get("old_block"))
54+
5055
@staticmethod
5156
def get_params_from_state(state_data: dict) -> list:
5257
"""Get parameters with fixed monitoring address."""
53-
return ["0xF977814e90dA44bFA03b6295A0616a897441aceC", "latest"]
58+
return ["0xF977814e90dA44bFA03b6295A0616a897441aceC", state_data["old_block"]]
5459

5560

5661
class HTTPDebugTraceTxLatencyMetric(HttpCallLatencyMetricBase):

metrics/ethereum.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,15 @@ class HTTPAccBalanceLatencyMetric(HttpCallLatencyMetricBase):
6565
def method(self) -> str:
6666
return "eth_getBalance"
6767

68+
@staticmethod
69+
def validate_state(state_data: dict) -> bool:
70+
"""Validates that required block number (hex) exists in state data."""
71+
return bool(state_data and state_data.get("old_block"))
72+
6873
@staticmethod
6974
def get_params_from_state(state_data: dict) -> list:
7075
"""Returns parameters for balance check of monitoring address."""
71-
return ["0x690B9A9E9aa1C9dB991C7721a92d351Db4FaC990", "pending"]
76+
return ["0x690B9A9E9aa1C9dB991C7721a92d351Db4FaC990", state_data["old_block"]]
7277

7378

7479
class HTTPDebugTraceBlockByNumberLatencyMetric(HttpCallLatencyMetricBase):

metrics/solana.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ def get_params_from_state(state_data: dict) -> list:
1515
"""Get parameters for simulating a token transfer."""
1616
return [
1717
"AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAAEDArczbMia1tLmq7zz4DinMNN0pJ1JtLdqIJPUw3YrGCzYAMHBsgN27lcgB6H2WQvFgyZuJYHa46puOQo9yQ8CVQbd9uHXZaGT2cvhRs7reawctIXtX1s3kTqM9YV+/wCp20C7Wj2aiuk5TReAXo+VTVg8QTHjs0UjNMMKCvpzZ+ABAgEBARU=",
18-
{"encoding": "base64"},
18+
# The transaction recent blockhash will be replaced with the most recent blockhash.
19+
{"encoding": "base64", "replaceRecentBlockhash": True},
1920
]
2021

2122

@@ -76,19 +77,19 @@ def method(self) -> str:
7677
@staticmethod
7778
def validate_state(state_data: dict) -> bool:
7879
"""Validate blockchain state contains block slot number."""
79-
return bool(state_data and state_data.get("block"))
80+
return bool(state_data and state_data.get("old_block"))
8081

8182
@staticmethod
8283
def get_params_from_state(state_data: dict) -> list:
8384
"""Get parameters using block slot from state."""
8485
return [
85-
int(state_data["block"]),
86+
int(state_data["old_block"]),
8687
{
8788
"encoding": "jsonParsed",
8889
"maxSupportedTransactionVersion": 0,
8990
"transactionDetails": "none", # Reduce response size
90-
"rewards": False # Further reduce response size
91-
}
91+
"rewards": False, # Further reduce response size
92+
},
9293
]
9394

9495

0 commit comments

Comments
 (0)