44import json
55import logging
66import os
7+ from collections .abc import Coroutine
78from http .server import BaseHTTPRequestHandler
8- from typing import Any , Dict , Set , Tuple
9+ from typing import Any
910
1011from common .state .blob_storage import BlobConfig , BlobStorageHandler
1112from common .state .blockchain_fetcher import BlockchainData , BlockchainDataFetcher
1213from common .state .blockchain_state import BlockchainState
1314
14- SUPPORTED_BLOCKCHAINS = ["ethereum" , "solana" , "ton" , "base" ]
15- ALLOWED_PROVIDERS = {"Chainstack" }
16- ALLOWED_REGIONS = {"fra1" }
15+ SUPPORTED_BLOCKCHAINS : list [str ] = ["ethereum" , "solana" , "ton" , "base" ]
16+ ALLOWED_PROVIDERS : set [str ] = {
17+ "Chainstack"
18+ } # To reduce number of RPC calls, use only one provider here
19+ ALLOWED_REGIONS : set [str ] = {
20+ "fra1"
21+ } # To reduce number of RPC calls, use only one region here
1722
1823
1924class MissingEndpointsError (Exception ):
2025 """Raised when required blockchain endpoints are not found."""
2126
22- def __init__ (self , missing_chains : Set [str ]):
23- self .missing_chains = missing_chains
24- chains = ", " .join (missing_chains )
27+ def __init__ (self , missing_chains : set [str ]) -> None :
28+ self .missing_chains : set [ str ] = missing_chains
29+ chains : str = ", " .join (missing_chains )
2530 super ().__init__ (f"Missing Chainstack endpoints for: { chains } " )
2631
2732
2833class StateUpdateManager :
29- def __init__ (self ):
30- store_id = os .getenv ("STORE_ID" )
31- token = os .getenv ("VERCEL_BLOB_TOKEN" )
34+ """Manages the collection, processing, and storage of blockchain state data.
35+
36+ This class orchestrates the retrieval of blockchain state data from configured endpoints,
37+ handles fallback to previous data in case of errors, and updates the centralized blob storage.
38+ It enforces provider and region filtering to optimize RPC calls and ensures data consistency.
39+ """
40+
41+ def __init__ (self ) -> None :
42+ store_id : str | None = os .getenv ("STORE_ID" )
43+ token : str | None = os .getenv ("VERCEL_BLOB_TOKEN" )
3244 if not all ([store_id , token ]):
3345 raise ValueError ("Missing required blob storage configuration" )
3446
3547 self .blob_config = BlobConfig (store_id = store_id , token = token ) # type: ignore
36- self .logger = logging .getLogger (__name__ )
48+ self .logger : logging . Logger = logging .getLogger (__name__ )
3749
38- async def _get_chainstack_endpoints (self ) -> Dict [str , str ]:
50+ async def _get_chainstack_endpoints (self ) -> dict [str , str ]:
3951 """Get Chainstack endpoints for supported blockchains."""
4052 endpoints = json .loads (os .getenv ("ENDPOINTS" , "{}" ))
41- chainstack_endpoints : Dict [str , str ] = {}
42- missing_chains : Set [str ] = set (SUPPORTED_BLOCKCHAINS )
53+ chainstack_endpoints : dict [str , str ] = {}
54+ missing_chains : set [str ] = set (SUPPORTED_BLOCKCHAINS )
4355
4456 for provider in endpoints .get ("providers" , []):
4557 blockchain = provider ["blockchain" ].lower ()
@@ -56,8 +68,8 @@ async def _get_chainstack_endpoints(self) -> Dict[str, str]:
5668
5769 return chainstack_endpoints
5870
59- async def _get_previous_data (self ) -> Dict [str , Any ]:
60- """Fetch previous blockchain state data"""
71+ async def _get_previous_data (self ) -> dict [str , Any ]:
72+ """Fetch previous blockchain state data. """
6173 try :
6274 state = BlockchainState ()
6375 previous_data = {}
@@ -76,11 +88,11 @@ async def _get_previous_data(self) -> Dict[str, Any]:
7688 return {}
7789
7890 async def _collect_blockchain_data (
79- self , providers : Dict [str , str ], previous_data : Dict [str , Any ]
80- ) -> Dict [str , dict ]:
91+ self , providers : dict [str , str ], previous_data : dict [str , Any ]
92+ ) -> dict [str , dict ]:
8193 async def fetch_single (
8294 blockchain : str , endpoint : str
83- ) -> Tuple [str , Dict [str , str ]]:
95+ ) -> tuple [str , dict [str , str ]]:
8496 try :
8597 fetcher = BlockchainDataFetcher (endpoint )
8698 data : BlockchainData = await fetcher .fetch_latest_data (blockchain )
@@ -89,7 +101,7 @@ async def fetch_single(
89101 return blockchain , {
90102 "block" : data .block_id ,
91103 "tx" : data .transaction_id ,
92- "old_block" : data .old_block_id , # Add new field
104+ "old_block" : data .old_block_id ,
93105 }
94106
95107 if blockchain in previous_data :
@@ -108,7 +120,7 @@ async def fetch_single(
108120 self .logger .warning (f"Returning empty data for { blockchain } " )
109121 return blockchain , {"block" : "" , "tx" : "" , "old_block" : "" }
110122
111- tasks = [
123+ tasks : list [ Coroutine [ Any , Any , tuple [ str , dict [ str , str ]]]] = [
112124 fetch_single (blockchain , endpoint )
113125 for blockchain , endpoint in providers .items ()
114126 ]
@@ -125,9 +137,11 @@ async def update(self) -> str:
125137 return "Region not authorized for state updates"
126138
127139 try :
128- previous_data = await self ._get_previous_data ()
140+ previous_data : dict [ str , Any ] = await self ._get_previous_data ()
129141
130- chainstack_endpoints = await self ._get_chainstack_endpoints ()
142+ chainstack_endpoints : dict [str , str ] = (
143+ await self ._get_chainstack_endpoints ()
144+ )
131145 blockchain_data = await self ._collect_blockchain_data (
132146 chainstack_endpoints , previous_data
133147 )
@@ -136,7 +150,7 @@ async def update(self) -> str:
136150 if not blockchain_data :
137151 if previous_data :
138152 self .logger .warning ("Using complete previous state as fallback" )
139- blockchain_data = previous_data
153+ blockchain_data : dict [ str , Any ] = previous_data
140154 else :
141155 return "No blockchain data collected and no previous data available"
142156
@@ -157,21 +171,21 @@ class handler(BaseHTTPRequestHandler):
157171 def _check_auth (self ) -> bool :
158172 if os .getenv ("SKIP_AUTH" , "" ).lower () == "true" :
159173 return True
160- token = self .headers .get ("Authorization" , "" )
174+ token : str = self .headers .get ("Authorization" , "" )
161175 return token == f"Bearer { os .getenv ('CRON_SECRET' , '' )} "
162176
163- def do_GET (self ):
177+ def do_GET (self ) -> None :
164178 if not self ._check_auth ():
165179 self .send_response (401 )
166180 self .end_headers ()
167181 self .wfile .write (b"Unauthorized" )
168182 return
169183
170- loop = asyncio .new_event_loop ()
184+ loop : asyncio . AbstractEventLoop = asyncio .new_event_loop ()
171185 asyncio .set_event_loop (loop )
172186
173187 try :
174- result = loop .run_until_complete (StateUpdateManager ().update ())
188+ result : str = loop .run_until_complete (StateUpdateManager ().update ())
175189 self .send_response (200 )
176190 self .send_header ("Content-type" , "text/plain" )
177191 self .end_headers ()
0 commit comments