From 3086b7c87fb074b87a5660975b85e7eba1e4cc59 Mon Sep 17 00:00:00 2001 From: smypmsa Date: Tue, 11 Mar 2025 16:01:54 +0000 Subject: [PATCH 1/2] chore: cron schedule, get_endpoint method, fomatting, typing --- .gitignore | 2 ++ api/support/update_state.py | 55 ++++++++++++++++-------------- api/write/solana.py | 2 +- common/base_metric.py | 12 +++---- common/factory.py | 44 +++++++++++------------- common/metric_config.py | 8 ++--- common/metric_types.py | 32 ++++++++--------- common/metrics_handler.py | 51 ++++++++++++++------------- common/state/blob_storage.py | 28 ++++++++------- common/state/blockchain_fetcher.py | 12 +++---- common/state/blockchain_state.py | 19 ++++++----- config/defaults.py | 4 ++- metrics/ethereum.py | 8 ++--- metrics/solana_landing_rate.py | 48 ++++++++++++++------------ vercel.json | 12 +++---- 15 files changed, 175 insertions(+), 162 deletions(-) diff --git a/.gitignore b/.gitignore index 2f9fd22..c427b6e 100644 --- a/.gitignore +++ b/.gitignore @@ -7,6 +7,8 @@ venv *.env *.local +.vscode + api/test endpoints*.json endpoints \ No newline at end of file diff --git a/api/support/update_state.py b/api/support/update_state.py index 6d23496..609c1c0 100644 --- a/api/support/update_state.py +++ b/api/support/update_state.py @@ -4,42 +4,43 @@ import json import logging import os +from collections.abc import Coroutine from http.server import BaseHTTPRequestHandler -from typing import Any, Dict, Set, Tuple +from typing import Any from common.state.blob_storage import BlobConfig, BlobStorageHandler from common.state.blockchain_fetcher import BlockchainData, BlockchainDataFetcher from common.state.blockchain_state import BlockchainState -SUPPORTED_BLOCKCHAINS = ["ethereum", "solana", "ton", "base"] -ALLOWED_PROVIDERS = {"Chainstack"} -ALLOWED_REGIONS = {"fra1"} +SUPPORTED_BLOCKCHAINS: list[str] = ["ethereum", "solana", "ton", "base"] +ALLOWED_PROVIDERS: set[str] = {"Chainstack"} +ALLOWED_REGIONS: set[str] = {"fra1"} class MissingEndpointsError(Exception): """Raised when required blockchain endpoints are not found.""" - def __init__(self, missing_chains: Set[str]): - self.missing_chains = missing_chains - chains = ", ".join(missing_chains) + def __init__(self, missing_chains: set[str]) -> None: + self.missing_chains: set[str] = missing_chains + chains: str = ", ".join(missing_chains) super().__init__(f"Missing Chainstack endpoints for: {chains}") class StateUpdateManager: - def __init__(self): - store_id = os.getenv("STORE_ID") - token = os.getenv("VERCEL_BLOB_TOKEN") + def __init__(self) -> None: + store_id: str | None = os.getenv("STORE_ID") + token: str | None = os.getenv("VERCEL_BLOB_TOKEN") if not all([store_id, token]): raise ValueError("Missing required blob storage configuration") self.blob_config = BlobConfig(store_id=store_id, token=token) # type: ignore - self.logger = logging.getLogger(__name__) + self.logger: logging.Logger = logging.getLogger(__name__) - async def _get_chainstack_endpoints(self) -> Dict[str, str]: + async def _get_chainstack_endpoints(self) -> dict[str, str]: """Get Chainstack endpoints for supported blockchains.""" endpoints = json.loads(os.getenv("ENDPOINTS", "{}")) - chainstack_endpoints: Dict[str, str] = {} - missing_chains: Set[str] = set(SUPPORTED_BLOCKCHAINS) + chainstack_endpoints: dict[str, str] = {} + missing_chains: set[str] = set(SUPPORTED_BLOCKCHAINS) for provider in endpoints.get("providers", []): blockchain = provider["blockchain"].lower() @@ -56,8 +57,8 @@ async def _get_chainstack_endpoints(self) -> Dict[str, str]: return chainstack_endpoints - async def _get_previous_data(self) -> Dict[str, Any]: - """Fetch previous blockchain state data""" + async def _get_previous_data(self) -> dict[str, Any]: + """Fetch previous blockchain state data.""" try: state = BlockchainState() previous_data = {} @@ -76,11 +77,11 @@ async def _get_previous_data(self) -> Dict[str, Any]: return {} async def _collect_blockchain_data( - self, providers: Dict[str, str], previous_data: Dict[str, Any] - ) -> Dict[str, dict]: + self, providers: dict[str, str], previous_data: dict[str, Any] + ) -> dict[str, dict]: async def fetch_single( blockchain: str, endpoint: str - ) -> Tuple[str, Dict[str, str]]: + ) -> tuple[str, dict[str, str]]: try: fetcher = BlockchainDataFetcher(endpoint) data: BlockchainData = await fetcher.fetch_latest_data(blockchain) @@ -108,7 +109,7 @@ async def fetch_single( self.logger.warning(f"Returning empty data for {blockchain}") return blockchain, {"block": "", "tx": "", "old_block": ""} - tasks = [ + tasks: list[Coroutine[Any, Any, tuple[str, dict[str, str]]]] = [ fetch_single(blockchain, endpoint) for blockchain, endpoint in providers.items() ] @@ -125,9 +126,11 @@ async def update(self) -> str: return "Region not authorized for state updates" try: - previous_data = await self._get_previous_data() + previous_data: dict[str, Any] = await self._get_previous_data() - chainstack_endpoints = await self._get_chainstack_endpoints() + chainstack_endpoints: dict[str, str] = ( + await self._get_chainstack_endpoints() + ) blockchain_data = await self._collect_blockchain_data( chainstack_endpoints, previous_data ) @@ -136,7 +139,7 @@ async def update(self) -> str: if not blockchain_data: if previous_data: self.logger.warning("Using complete previous state as fallback") - blockchain_data = previous_data + blockchain_data: dict[str, Any] = previous_data else: return "No blockchain data collected and no previous data available" @@ -157,17 +160,17 @@ class handler(BaseHTTPRequestHandler): def _check_auth(self) -> bool: if os.getenv("SKIP_AUTH", "").lower() == "true": return True - token = self.headers.get("Authorization", "") + token: str = self.headers.get("Authorization", "") return token == f"Bearer {os.getenv('CRON_SECRET', '')}" - def do_GET(self): + def do_GET(self) -> None: if not self._check_auth(): self.send_response(401) self.end_headers() self.wfile.write(b"Unauthorized") return - loop = asyncio.new_event_loop() + loop: asyncio.AbstractEventLoop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: diff --git a/api/write/solana.py b/api/write/solana.py index 5f4f814..4186795 100644 --- a/api/write/solana.py +++ b/api/write/solana.py @@ -8,7 +8,7 @@ target_region = "fra1" # Run this metric only in EU (fra1) -METRICS = ( +METRICS: list[tuple[type[SolanaLandingMetric], str]] = ( [] if os.getenv("VERCEL_REGION") != target_region # System env var, standard name else [(SolanaLandingMetric, metric_name)] diff --git a/common/base_metric.py b/common/base_metric.py index 9da7d22..6efbe86 100644 --- a/common/base_metric.py +++ b/common/base_metric.py @@ -4,7 +4,7 @@ import uuid from abc import ABC, abstractmethod from dataclasses import dataclass -from typing import Any, Dict, List, Optional, Union +from typing import Any, Optional, Union from common.metric_config import MetricConfig, MetricLabelKey, MetricLabels @@ -16,7 +16,7 @@ class MetricValue: """Container for a single metric value and its specific labels.""" value: Union[int, float] - labels: Optional[Dict[str, str]] = None + labels: Optional[dict[str, str]] = None class BaseMetric(ABC): @@ -24,7 +24,7 @@ class BaseMetric(ABC): def __init__( self, - handler: "MetricsHandler", # type: ignore + handler: "MetricsHandler", # type: ignore # noqa: F821 metric_name: str, labels: MetricLabels, config: MetricConfig, @@ -37,7 +37,7 @@ def __init__( self.config = config self.ws_endpoint = ws_endpoint self.http_endpoint = http_endpoint - self.values: Dict[str, MetricValue] = {} + self.values: dict[str, MetricValue] = {} handler._instances.append(self) @abstractmethod @@ -48,7 +48,7 @@ async def collect_metric(self) -> None: def process_data(self, data: Any) -> Union[int, float]: """Processes raw data into metric value.""" - def get_influx_format(self) -> List[str]: + def get_influx_format(self) -> list[str]: """Returns metrics in Influx line protocol format.""" if not self.values: raise ValueError("No metric values set") @@ -78,7 +78,7 @@ def update_metric_value( self, value: Union[int, float], value_type: str = "response_time", - labels: Optional[Dict[str, str]] = None, + labels: Optional[dict[str, str]] = None, ) -> None: """Updates metric value, preserving existing labels if present.""" if value_type in self.values: diff --git a/common/factory.py b/common/factory.py index b697c93..8947a1a 100644 --- a/common/factory.py +++ b/common/factory.py @@ -2,28 +2,25 @@ import copy from dataclasses import dataclass -from typing import Dict, List, Tuple, Type from common.base_metric import BaseMetric from common.metric_config import EndpointConfig, MetricConfig, MetricLabels @dataclass -class MetricRegistration: # Added: Type-safe registration container - metric_class: Type[BaseMetric] +class MetricRegistration: + metric_class: type[BaseMetric] metric_name: str class MetricFactory: """Creates metric instances for blockchains.""" - _registry: Dict[str, List[MetricRegistration]] = ( - {} - ) # Modified: Using type-safe registration + _registry: dict[str, list[MetricRegistration]] = {} @classmethod def register( - cls, blockchain_metrics: Dict[str, List[Tuple[Type[BaseMetric], str]]] + cls, blockchain_metrics: dict[str, list[tuple[type[BaseMetric], str]]] ) -> None: """Registers metric classes for blockchains.""" for blockchain_name, metrics in blockchain_metrics.items(): @@ -46,8 +43,8 @@ def create_metrics( blockchain_name: str, metrics_handler: "MetricsHandler", # type: ignore # noqa: F821 config: MetricConfig, - **kwargs: Dict, - ) -> List[BaseMetric]: + **kwargs: dict, + ) -> list[BaseMetric]: """Creates metric instances for a specific blockchain.""" if blockchain_name not in cls._registry: available = list(cls._registry.keys()) @@ -55,12 +52,11 @@ def create_metrics( f"No metric classes registered for blockchain '{blockchain_name}'. Available blockchains: {available}" ) - # Added: Extracted config setup to separate method cls._setup_endpoint_config(config, kwargs) - source_region = kwargs.get("source_region", "default") - target_region = kwargs.get("target_region", "default") - provider = kwargs.get("provider", "default") + source_region: str = str(kwargs.get("source_region", "default")) + target_region: str = str(kwargs.get("target_region", "default")) + provider: str = str(kwargs.get("provider", "default")) metrics = [] for registration in cls._registry[blockchain_name]: @@ -96,9 +92,7 @@ def create_metrics( return metrics @staticmethod - def _setup_endpoint_config( - config: MetricConfig, kwargs: Dict - ) -> None: # Added: Extracted method + def _setup_endpoint_config(config: MetricConfig, kwargs: dict) -> None: """Sets up endpoint configuration from kwargs.""" config.endpoints = EndpointConfig( main_endpoint=kwargs.get("http_endpoint"), @@ -107,16 +101,16 @@ def _setup_endpoint_config( @staticmethod def _create_solana_metrics( - blockchain_name: str, # Added: Type hints - metric_class: Type[BaseMetric], + blockchain_name: str, + metric_class: type[BaseMetric], metric_name: str, - metrics_handler: "MetricsHandler", + metrics_handler: "MetricsHandler", # noqa: F821 # type: ignore config: MetricConfig, - kwargs: Dict, + kwargs: dict, source_region: str, target_region: str, provider: str, - ) -> List[BaseMetric]: + ) -> list[BaseMetric]: """Creates SolanaLandingMetric-specific instances.""" metrics = [] @@ -157,12 +151,12 @@ def _create_solana_metrics( @staticmethod def _create_single_metric( - blockchain_name: str, # Added: Type hints - metric_class: Type[BaseMetric], + blockchain_name: str, + metric_class: type[BaseMetric], metric_name: str, - metrics_handler: "MetricsHandler", + metrics_handler: "MetricsHandler", # noqa: F821 # type: ignore config: MetricConfig, - kwargs: Dict, + kwargs: dict, source_region: str, target_region: str, provider: str, diff --git a/common/metric_config.py b/common/metric_config.py index 588b691..fd69e0b 100644 --- a/common/metric_config.py +++ b/common/metric_config.py @@ -2,7 +2,7 @@ import logging from enum import Enum -from typing import Any, Dict, Optional +from typing import Any, Optional class MetricLabelKey(Enum): @@ -29,10 +29,8 @@ def __init__( self.tx_endpoint = tx_endpoint self.ws_endpoint = ws_endpoint - def get_endpoint(self, method: str) -> Optional[str]: + def get_endpoint(self) -> Optional[str]: """Returns appropriate endpoint based on method.""" - if method == "NOT_USED_ANYMORE" and self.tx_endpoint: - return self.tx_endpoint return self.main_endpoint @@ -43,7 +41,7 @@ def __init__( self, timeout: int, max_latency: int, - extra_params: Optional[Dict[str, Any]] = None, + extra_params: Optional[dict[str, Any]] = None, endpoints: Optional[EndpointConfig] = None, ) -> None: self.timeout = timeout diff --git a/common/metric_types.py b/common/metric_types.py index efa9f8d..49e08f9 100644 --- a/common/metric_types.py +++ b/common/metric_types.py @@ -4,7 +4,7 @@ import logging import time from abc import abstractmethod -from typing import Any, Dict, Optional +from typing import Any, Optional import aiohttp import websockets @@ -47,8 +47,8 @@ async def listen_for_data(self, websocket: Any) -> Optional[Any]: async def connect(self) -> Any: """Creates WebSocket connection.""" - websocket = await websockets.connect( - self.ws_endpoint, + websocket: websockets.WebSocketClientProtocol = await websockets.connect( + self.ws_endpoint, # type: ignore ping_timeout=self.config.timeout, close_timeout=self.config.timeout, ) @@ -64,7 +64,7 @@ async def collect_metric(self) -> None: data = await self.listen_for_data(websocket) if data is not None: - latency = self.process_data(data) + latency: int | float = self.process_data(data) self.update_metric_value(latency) self.mark_success() return @@ -98,7 +98,7 @@ async def collect_metric(self) -> None: try: data = await self.fetch_data() if data is not None: - latency = self.process_data(data) + latency: int | float = self.process_data(data) self.update_metric_value(latency) self.mark_success() return @@ -127,7 +127,7 @@ def __init__( metric_name: str, labels: MetricLabels, config: MetricConfig, - method_params: Optional[Dict[str, Any]] = None, + method_params: Optional[dict[str, Any]] = None, **kwargs: Any, ) -> None: state_data = kwargs.get("state_data", {}) @@ -141,7 +141,7 @@ def __init__( config=config, ) - self.method_params = ( + self.method_params: dict[str, Any] = ( self.get_params_from_state(state_data) if method_params is None else method_params @@ -149,7 +149,7 @@ def __init__( self.labels.update_label(MetricLabelKey.API_METHOD, self.method) self._base_request = self._build_base_request() - def _build_base_request(self) -> Dict[str, Any]: + def _build_base_request(self) -> dict[str, Any]: """Build the base JSON-RPC request object.""" request = { "id": 1, @@ -161,27 +161,27 @@ def _build_base_request(self) -> Dict[str, Any]: return request @staticmethod - def validate_state(state_data: Dict[str, Any]) -> bool: + def validate_state(state_data: dict[str, Any]) -> bool: """Validate blockchain state data.""" return True @staticmethod - def get_params_from_state(state_data: Dict[str, Any]) -> Dict[str, Any]: + def get_params_from_state(state_data: dict[str, Any]) -> dict[str, Any]: """Get RPC method parameters from state data.""" return {} async def fetch_data(self) -> float: """Measure single request latency with a retry on 429 error.""" - endpoint = self.config.endpoints.get_endpoint(self.method) + endpoint: str | None = self.config.endpoints.get_endpoint() async with aiohttp.ClientSession() as session: response_time = 0.0 # Do not include retried requests after 429 error - response = None + response = None # type: ignore for retry_count in range(MAX_RETRIES): - start_time = time.monotonic() - response = await self._send_request(session, endpoint, retry_count) - response_time = time.monotonic() - start_time + start_time: float = time.monotonic() + response: aiohttp.ClientResponse = await self._send_request(session, endpoint) # type: ignore + response_time: float = time.monotonic() - start_time if response.status == 429 and retry_count < MAX_RETRIES - 1: wait_time = int(response.headers.get("Retry-After", 15)) @@ -207,7 +207,7 @@ async def fetch_data(self) -> float: await response.release() async def _send_request( - self, session: aiohttp.ClientSession, endpoint: str, retry_count: int + self, session: aiohttp.ClientSession, endpoint: str ) -> aiohttp.ClientResponse: """Send the request without retry logic.""" return await session.post( diff --git a/common/metrics_handler.py b/common/metrics_handler.py index ad4fa06..696d81e 100644 --- a/common/metrics_handler.py +++ b/common/metrics_handler.py @@ -6,7 +6,6 @@ import os import time from http.server import BaseHTTPRequestHandler -from typing import List, Tuple, Type import aiohttp @@ -20,10 +19,10 @@ class MetricsHandler: """Manages collection and pushing of blockchain metrics.""" - def __init__(self, blockchain: str, metrics: List[Tuple[Type, str]]): - self._instances: List[BaseMetric] = [] - self.blockchain = blockchain - self.metrics = metrics + def __init__(self, blockchain: str, metrics: list[tuple[type, str]]) -> None: + self._instances: list[BaseMetric] = [] + self.blockchain: str = blockchain + self.metrics: list[tuple[type, str]] = metrics self.grafana_config = { "current_region": os.getenv( "VERCEL_REGION" @@ -38,7 +37,7 @@ def __init__(self, blockchain: str, metrics: List[Tuple[Type, str]]): "metric_max_latency": MetricsServiceConfig.METRIC_MAX_LATENCY, } - def get_metrics_influx_format(self) -> List[str]: + def get_metrics_influx_format(self) -> list[str]: """Returns all metric values in Influx format.""" metrics = [] for instance in self._instances: @@ -48,10 +47,12 @@ def get_metrics_influx_format(self) -> List[str]: def get_metrics_text(self) -> str: current_time = int(time.time_ns()) - metrics = self.get_metrics_influx_format() + metrics: list[str] = self.get_metrics_influx_format() return "\n".join(f"{metric} {current_time}" for metric in metrics) - async def collect_metrics(self, provider: dict, config: dict, state_data: dict): + async def collect_metrics( + self, provider: dict, config: dict, state_data: dict + ) -> None: metric_config = MetricConfig( timeout=self.grafana_config["metric_request_timeout"], max_latency=self.grafana_config["metric_max_latency"], @@ -59,21 +60,21 @@ async def collect_metrics(self, provider: dict, config: dict, state_data: dict): extra_params={"tx_data": provider.get("data")}, ) - metrics = MetricFactory.create_metrics( + metrics: list[BaseMetric] = MetricFactory.create_metrics( blockchain_name=self.blockchain, metrics_handler=self, config=metric_config, provider=provider["name"], source_region=self.grafana_config["current_region"], target_region=config.get("region", "default"), - ws_endpoint=provider.get("websocket_endpoint"), - http_endpoint=provider.get("http_endpoint"), - tx_endpoint=provider.get("tx_endpoint"), + ws_endpoint=provider.get("websocket_endpoint"), # type: ignore + http_endpoint=provider.get("http_endpoint"), # type: ignore + tx_endpoint=provider.get("tx_endpoint"), # type: ignore state_data=state_data, ) await asyncio.gather(*(m.collect_metric() for m in metrics)) - async def push_to_grafana(self, metrics_text: str): + async def push_to_grafana(self, metrics_text: str) -> None: if not all( [ self.grafana_config["url"], @@ -102,11 +103,11 @@ async def push_to_grafana(self, metrics_text: str): if attempt < self.grafana_config["push_retries"]: await asyncio.sleep(self.grafana_config["push_retry_delay"]) - async def handle(self) -> Tuple[str, str]: + async def handle(self) -> tuple[str, str]: """Main handler for metric collection and pushing.""" self._instances = [] try: - config = json.loads(os.getenv("ENDPOINTS")) + config = json.loads(os.getenv("ENDPOINTS")) # type: ignore MetricFactory._registry.clear() MetricFactory.register({self.blockchain: self.metrics}) rpc_providers = [ @@ -123,7 +124,7 @@ async def handle(self) -> Tuple[str, str]: ] await asyncio.gather(*collection_tasks, return_exceptions=True) - metrics_text = self.get_metrics_text() + metrics_text: str = self.get_metrics_text() if metrics_text: await self.push_to_grafana(metrics_text) else: @@ -139,15 +140,17 @@ async def handle(self) -> Tuple[str, str]: class BaseVercelHandler(BaseHTTPRequestHandler): """HTTP handler for Vercel serverless endpoint.""" - metrics_handler: MetricsHandler = None + metrics_handler: MetricsHandler = None # type: ignore - def validate_token(self): - auth_token = self.headers.get("Authorization") - expected_token = os.environ.get("CRON_SECRET") # System env var, standard name + def validate_token(self) -> bool: + auth_token: str | None = self.headers.get("Authorization") + expected_token: str | None = os.environ.get( + "CRON_SECRET" + ) # System env var, standard name return auth_token == f"Bearer {expected_token}" - def do_GET(self): - skip_auth = os.environ.get("SKIP_AUTH", "false").lower() == "true" + def do_GET(self) -> None: + skip_auth: bool = os.environ.get("SKIP_AUTH", "false").lower() == "true" if not skip_auth and not self.validate_token(): self.send_response(401) self.send_header("Content-type", "text/plain") @@ -155,7 +158,7 @@ def do_GET(self): self.wfile.write(b"Unauthorized") return - loop = asyncio.new_event_loop() + loop: asyncio.AbstractEventLoop = asyncio.new_event_loop() asyncio.set_event_loop(loop) try: @@ -163,7 +166,7 @@ def do_GET(self): self.send_response(200) self.send_header("Content-type", "text/plain") self.end_headers() - response = ( + response: str = ( f"{self.metrics_handler.blockchain} metrics collection " f"completed\n\nMetrics:\n{metrics_text}" ) diff --git a/common/state/blob_storage.py b/common/state/blob_storage.py index a40d2bd..fdaa029 100644 --- a/common/state/blob_storage.py +++ b/common/state/blob_storage.py @@ -1,7 +1,9 @@ +"""Blob storage handler for managing blobs in Vercel Blob Storage.""" + import json import time from dataclasses import dataclass -from typing import Dict, List, Optional +from typing import Optional import aiohttp @@ -21,8 +23,8 @@ class BlobConfig: class BlobStorageHandler: def __init__(self, config: BlobConfig): - self.config = config - self._headers = { + self.config: BlobConfig = config + self._headers: dict[str, str] = { "Authorization": f"Bearer {config.token}", "Content-Type": "application/json", "x-store-id": config.store_id, @@ -33,8 +35,8 @@ def __init__(self, config: BlobConfig): } async def _make_request( - self, method: str, url: str, data: Optional[Dict] = None - ) -> Dict: + self, method: str, url: str, data: Optional[dict] = None + ) -> dict: async with aiohttp.ClientSession() as session: async with session.request( method, @@ -47,25 +49,27 @@ async def _make_request( raise Exception(f"Blob operation failed: {resp.status} - {text}") return await resp.json() - async def list_files(self) -> List[Dict[str, str]]: - list_url = f"{self.config.base_url}?prefix={self.config.folder}/" + async def list_files(self) -> list[dict[str, str]]: + list_url: str = f"{self.config.base_url}?prefix={self.config.folder}/" response = await self._make_request("GET", list_url) return response.get("blobs", []) - async def delete_blobs(self, urls: List[str]) -> None: + async def delete_blobs(self, urls: list[str]) -> None: if not urls: return delete_url = f"{self.config.base_url}/delete" await self._make_request("POST", delete_url, {"urls": urls}) async def delete_all_files(self) -> None: - files = await self.list_files() + files: list[dict[str, str]] = await self.list_files() if files: - urls = [file["url"] for file in files] + urls: list[str] = [file["url"] for file in files] await self.delete_blobs(urls) - async def update_data(self, blockchain_data: Dict[str, Dict[str, str]]) -> None: + async def update_data(self, blockchain_data: dict[str, dict[str, str]]) -> None: await self.delete_all_files() data = {**blockchain_data, "updated_at": int(time.time())} - blob_url = f"{self.config.base_url}/{self.config.folder}/{self.config.filename}" + blob_url: str = ( + f"{self.config.base_url}/{self.config.folder}/{self.config.filename}" + ) await self._make_request("PUT", blob_url, data) diff --git a/common/state/blockchain_fetcher.py b/common/state/blockchain_fetcher.py index 542bb9e..9c1b01b 100644 --- a/common/state/blockchain_fetcher.py +++ b/common/state/blockchain_fetcher.py @@ -4,7 +4,7 @@ import logging import random from dataclasses import dataclass -from typing import Any, Dict, List, Optional, Tuple, Union +from typing import Any, Optional, Union import aiohttp @@ -28,8 +28,8 @@ class BlockchainDataFetcher: """Fetches blockchain data from RPC nodes using JSON-RPC protocol.""" def __init__(self, http_endpoint: str) -> None: - self.http_endpoint = http_endpoint - self._headers = {"Content-Type": "application/json"} + self.http_endpoint: str = http_endpoint + self._headers: dict[str, str] = {"Content-Type": "application/json"} self._timeout = aiohttp.ClientTimeout(total=15) self._max_retries = 3 self._retry_delay = 5 @@ -37,10 +37,10 @@ def __init__(self, http_endpoint: str) -> None: logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) - self._logger = logging.getLogger(__name__) + self._logger: logging.Logger = logging.getLogger(__name__) async def _make_rpc_request( - self, method: str, params: Optional[Union[List, Dict]] = None + self, method: str, params: Optional[Union[list, dict]] = None ) -> Any: """Makes a JSON-RPC request with retries.""" request = {"jsonrpc": "2.0", "method": method, "params": params or [], "id": 1} @@ -78,7 +78,7 @@ async def _make_rpc_request( async def _get_block_in_range( self, slot_start: int, slot_end: int - ) -> Tuple[Optional[int], Optional[Dict]]: + ) -> tuple[Optional[int], Optional[dict]]: """Search for available block in given slot range.""" current_slot = slot_end while current_slot >= slot_start: diff --git a/common/state/blockchain_state.py b/common/state/blockchain_state.py index a4f5917..bf85c13 100644 --- a/common/state/blockchain_state.py +++ b/common/state/blockchain_state.py @@ -1,7 +1,8 @@ +"""Manages blockchain state data by fetching and processing data from blob storage.""" + import asyncio import logging import os -from typing import Dict import aiohttp @@ -16,11 +17,11 @@ class BlockchainState: _RETRY_DELAY = 3 @staticmethod - def _get_headers() -> Dict[str, str]: + def _get_headers() -> dict[str, str]: """Get the authorization headers for blob storage requests.""" return { "Authorization": f"Bearer {os.getenv('VERCEL_BLOB_TOKEN')}", - "x-store-id": os.getenv("STORE_ID"), + "x-store-id": os.getenv("STORE_ID"), # type: ignore } @staticmethod @@ -29,7 +30,7 @@ async def _get_blob_url(session: aiohttp.ClientSession) -> str: list_url = ( f"{BlobStorageConfig.BLOB_BASE_URL}?prefix={BlobStorageConfig.BLOB_FOLDER}/" ) - headers = BlockchainState._get_headers() + headers: dict[str, str] = BlockchainState._get_headers() async with session.get(list_url, headers=headers) as response: if response.status != 200: @@ -42,9 +43,9 @@ async def _get_blob_url(session: aiohttp.ClientSession) -> str: raise ValueError("Blockchain data blob not found") @staticmethod - async def _fetch_state_data(session: aiohttp.ClientSession, blob_url: str) -> Dict: + async def _fetch_state_data(session: aiohttp.ClientSession, blob_url: str) -> dict: """Fetch state data from blob storage.""" - headers = BlockchainState._get_headers() + headers: dict[str, str] = BlockchainState._get_headers() async with session.get(blob_url, headers=headers) as response: if response.status != 200: @@ -61,20 +62,20 @@ async def _fetch_state_data(session: aiohttp.ClientSession, blob_url: str) -> Di @staticmethod async def get_data(blockchain: str) -> dict: """Get blockchain state data with retries.""" - last_exception = None + last_exception = None # type: ignore for attempt in range(1, BlockchainState._RETRIES + 1): try: async with aiohttp.ClientSession( timeout=BlockchainState._TIMEOUT ) as session: - blob_url = await BlockchainState._get_blob_url(session) + blob_url: str = await BlockchainState._get_blob_url(session) state_data = await BlockchainState._fetch_state_data( session, blob_url ) return state_data.get(blockchain.lower(), {}) except Exception as e: - last_exception = str(e) if str(e) else "Unknown error occurred" + last_exception: str = str(e) if str(e) else "Unknown error occurred" logging.warning( f"Attempt {attempt}: State fetch failed: {last_exception}" ) diff --git a/config/defaults.py b/config/defaults.py index 69237da..d0b9df7 100644 --- a/config/defaults.py +++ b/config/defaults.py @@ -1,3 +1,5 @@ +"""Default configuration.""" + import os @@ -23,7 +25,7 @@ class MetricsServiceConfig: ) # System env var, standard name # Block offset configuration (N blocks back from latest) - BLOCK_OFFSET_RANGES = { + BLOCK_OFFSET_RANGES = { # noqa: RUF012 "ethereum": (7200, 14400), "base": (7200, 14400), "solana": (432000, 648000), diff --git a/metrics/ethereum.py b/metrics/ethereum.py index d69db36..a4686df 100644 --- a/metrics/ethereum.py +++ b/metrics/ethereum.py @@ -149,7 +149,7 @@ async def subscribe(self, websocket) -> None: Raises: ValueError: If subscription to newHeads fails """ - subscription_msg = json.dumps( + subscription_msg: str = json.dumps( { "id": 1, "jsonrpc": "2.0", @@ -204,7 +204,7 @@ def process_data(self, block) -> float: """ block_timestamp_hex = block.get("timestamp", "0x0") block_timestamp = int(block_timestamp_hex, 16) - block_time = datetime.fromtimestamp(block_timestamp, timezone.utc) - current_time = datetime.now(timezone.utc) - latency = (current_time - block_time).total_seconds() + block_time: datetime = datetime.fromtimestamp(block_timestamp, timezone.utc) + current_time: datetime = datetime.now(timezone.utc) + latency: float = (current_time - block_time).total_seconds() return latency diff --git a/metrics/solana_landing_rate.py b/metrics/solana_landing_rate.py index 3d8f3e5..32cb4ce 100644 --- a/metrics/solana_landing_rate.py +++ b/metrics/solana_landing_rate.py @@ -14,7 +14,11 @@ from solders.instruction import Instruction from solders.keypair import Keypair from solders.pubkey import Pubkey -from solders.rpc.responses import GetSignatureStatusesResp +from solders.rpc.responses import ( + GetLatestBlockhashResp, + GetSignatureStatusesResp, + SendTransactionResp, +) from solders.transaction import Transaction from solders.transaction_status import TransactionConfirmationStatus, TransactionStatus @@ -64,8 +68,8 @@ def __init__( self.method = "sendTransaction" self.labels.update_label(MetricLabelKey.API_METHOD, self.method) - self.private_key = base58.b58decode(os.environ["SOLANA_PRIVATE_KEY"]) - self.keypair = Keypair.from_bytes(self.private_key) + self.private_key: bytes = base58.b58decode(os.environ["SOLANA_PRIVATE_KEY"]) + self.keypair: Keypair = Keypair.from_bytes(self.private_key) self._slot_diff = 0 async def _create_client(self) -> AsyncClient: @@ -73,15 +77,15 @@ async def _create_client(self) -> AsyncClient: return AsyncClient(endpoint) async def _get_slot(self, client: AsyncClient) -> int: - response = await client.get_slot(MetricsServiceConfig.SOLANA_CONFIRMATION_LEVEL) + response = await client.get_slot(MetricsServiceConfig.SOLANA_CONFIRMATION_LEVEL) # type: ignore if not response or response.value is None: raise ValueError("Failed to get current slot") return response.value - async def _check_status(self, client: AsyncClient, signature: str) -> int: + async def _check_status(self, client: AsyncClient, signature: str) -> int | None: """Check single transaction status.""" response: GetSignatureStatusesResp = await client.get_signature_statuses( - [signature] + [signature] # type: ignore ) if not response or not response.value: return None @@ -102,9 +106,9 @@ async def _wait_for_confirmation( self, client: AsyncClient, signature: str, timeout: int ) -> int: """Wait for transaction confirmation using direct status checks.""" - end_time = time.monotonic() + timeout + end_time: float = time.monotonic() + timeout while time.monotonic() < end_time: - status = await self._check_status(client, signature) + status: int | None = await self._check_status(client, signature) if status: return status await asyncio.sleep(self.POLL_INTERVAL) @@ -113,11 +117,13 @@ async def _wait_for_confirmation( async def _prepare_memo_transaction(self, client: AsyncClient) -> Transaction: memo_text = generate_fixed_memo( - self.labels.get_label(MetricLabelKey.SOURCE_REGION) + self.labels.get_label(MetricLabelKey.SOURCE_REGION) # type: ignore ) - compute_limit_ix = set_compute_unit_limit(MetricsServiceConfig.COMPUTE_LIMIT) - compute_price_ix = set_compute_unit_price( + compute_limit_ix: Instruction = set_compute_unit_limit( + MetricsServiceConfig.COMPUTE_LIMIT + ) + compute_price_ix: Instruction = set_compute_unit_price( MetricsServiceConfig.PRIORITY_FEE_MICROLAMPORTS ) @@ -127,7 +133,7 @@ async def _prepare_memo_transaction(self, client: AsyncClient) -> Transaction: data=memo_text.encode(), ) - blockhash = await client.get_latest_blockhash() + blockhash: GetLatestBlockhashResp = await client.get_latest_blockhash() if not blockhash or not blockhash.value: raise ValueError("Failed to get latest blockhash") @@ -144,15 +150,15 @@ async def fetch_data(self) -> Optional[float]: self.update_metric_value(0, "response_time") self.update_metric_value(0, "slot_latency") - client = None + client = None # type: ignore try: - client = await self._create_client() - tx = await self._prepare_memo_transaction(client) + client: AsyncClient = await self._create_client() + tx: Transaction = await self._prepare_memo_transaction(client) - start_slot = await self._get_slot(client) - start_time = time.monotonic() + start_slot: int = await self._get_slot(client) + start_time: float = time.monotonic() - signature_response = await client.send_transaction( + signature_response: SendTransactionResp = await client.send_transaction( tx, TxOpts(skip_preflight=True, max_retries=0) ) if not signature_response or not signature_response.value: @@ -160,14 +166,14 @@ async def fetch_data(self) -> Optional[float]: confirmation_slot = await self._wait_for_confirmation( client, - signature_response.value, + signature_response.value, # type: ignore self.config.timeout, ) # `response_time` is not representative, # we don't use it in the visualizations - response_time = time.monotonic() - start_time - self._slot_diff = max(confirmation_slot - start_slot, 0) + response_time: float = time.monotonic() - start_time + self._slot_diff: int = max(confirmation_slot - start_slot, 0) self.update_metric_value(self._slot_diff, "slot_latency") return response_time diff --git a/vercel.json b/vercel.json index c1d1764..1cc8de6 100644 --- a/vercel.json +++ b/vercel.json @@ -28,19 +28,19 @@ "crons": [ { "path": "/api/read/ethereum", - "schedule": "*/2 * * * *" + "schedule": "* * * * *" }, { "path": "/api/read/base", - "schedule": "*/2 * * * *" + "schedule": "* * * * *" }, { "path": "/api/read/solana", - "schedule": "*/2 * * * *" + "schedule": "* * * * *" }, { "path": "/api/read/ton", - "schedule": "*/2 * * * *" + "schedule": "* * * * *" }, { "path": "/api/write/solana", @@ -48,7 +48,7 @@ }, { "path": "/api/support/update_state", - "schedule": "*/20 * * * *" + "schedule": "*/30 * * * *" } ] -} +} \ No newline at end of file From 0e049eb9ea5cc43bdde79d9d821c77e19a103755 Mon Sep 17 00:00:00 2001 From: smypmsa Date: Tue, 11 Mar 2025 17:28:44 +0000 Subject: [PATCH 2/2] minor fixes, typing, comments --- api/support/update_state.py | 19 +++++++++++++++---- common/base_metric.py | 18 +++++++++--------- common/factory.py | 11 +++++++++-- common/metric_config.py | 20 ++++++++++---------- common/metric_types.py | 4 ++-- metrics/solana_landing_rate.py | 9 +++++---- tests/test_api_read.py | 12 ++++++------ tests/test_api_write.py | 12 ++++++------ tests/test_update_state.py | 8 ++++---- 9 files changed, 66 insertions(+), 47 deletions(-) diff --git a/api/support/update_state.py b/api/support/update_state.py index 609c1c0..e5f2a7d 100644 --- a/api/support/update_state.py +++ b/api/support/update_state.py @@ -13,8 +13,12 @@ from common.state.blockchain_state import BlockchainState SUPPORTED_BLOCKCHAINS: list[str] = ["ethereum", "solana", "ton", "base"] -ALLOWED_PROVIDERS: set[str] = {"Chainstack"} -ALLOWED_REGIONS: set[str] = {"fra1"} +ALLOWED_PROVIDERS: set[str] = { + "Chainstack" +} # To reduce number of RPC calls, use only one provider here +ALLOWED_REGIONS: set[str] = { + "fra1" +} # To reduce number of RPC calls, use only one region here class MissingEndpointsError(Exception): @@ -27,6 +31,13 @@ def __init__(self, missing_chains: set[str]) -> None: class StateUpdateManager: + """Manages the collection, processing, and storage of blockchain state data. + + This class orchestrates the retrieval of blockchain state data from configured endpoints, + handles fallback to previous data in case of errors, and updates the centralized blob storage. + It enforces provider and region filtering to optimize RPC calls and ensures data consistency. + """ + def __init__(self) -> None: store_id: str | None = os.getenv("STORE_ID") token: str | None = os.getenv("VERCEL_BLOB_TOKEN") @@ -90,7 +101,7 @@ async def fetch_single( return blockchain, { "block": data.block_id, "tx": data.transaction_id, - "old_block": data.old_block_id, # Add new field + "old_block": data.old_block_id, } if blockchain in previous_data: @@ -174,7 +185,7 @@ def do_GET(self) -> None: asyncio.set_event_loop(loop) try: - result = loop.run_until_complete(StateUpdateManager().update()) + result: str = loop.run_until_complete(StateUpdateManager().update()) self.send_response(200) self.send_header("Content-type", "text/plain") self.end_headers() diff --git a/common/base_metric.py b/common/base_metric.py index 6efbe86..5986d80 100644 --- a/common/base_metric.py +++ b/common/base_metric.py @@ -32,11 +32,11 @@ def __init__( http_endpoint: Optional[str] = None, ) -> None: self.metric_id = str(uuid.uuid4()) - self.metric_name = metric_name - self.labels = labels - self.config = config - self.ws_endpoint = ws_endpoint - self.http_endpoint = http_endpoint + self.metric_name: str = metric_name + self.labels: MetricLabels = labels + self.config: MetricConfig = config + self.ws_endpoint: str | None = ws_endpoint + self.http_endpoint: str | None = http_endpoint self.values: dict[str, MetricValue] = {} handler._instances.append(self) @@ -54,18 +54,18 @@ def get_influx_format(self) -> list[str]: raise ValueError("No metric values set") metrics = [] - base_tags = ",".join( + base_tags: str = ",".join( [f"{label.key.value}={label.value}" for label in self.labels.labels] ) for value_type, metric_value in self.values.items(): - tags = base_tags + tags: str = base_tags if tags: tags = f"{base_tags},metric_type={value_type}" else: tags = f"metric_type={value_type}" - metric_line = f"{self.metric_name}" + metric_line: str = f"{self.metric_name}" if tags: metric_line += f",{tags}" metric_line += f" value={metric_value.value}" @@ -101,7 +101,7 @@ def handle_error(self, error: Exception) -> None: if not self.values: self.update_metric_value(0) - error_type = error.__class__.__name__ + error_type: str = error.__class__.__name__ error_details = getattr(error, "error_msg", str(error)) logging.error( diff --git a/common/factory.py b/common/factory.py index 8947a1a..8976686 100644 --- a/common/factory.py +++ b/common/factory.py @@ -9,12 +9,19 @@ @dataclass class MetricRegistration: + """Stores metadata for registering a metric, including its class and name.""" + metric_class: type[BaseMetric] metric_name: str class MetricFactory: - """Creates metric instances for blockchains.""" + """Creates metric instances for blockchains. + + For SolanaLandingMetric, a special logic is applied where both the default http endpoint + and an enhanced transaction endpoint (if available) are used to create separate metric + instances, allowing for differentiated provider names and richer data collection. + """ _registry: dict[str, list[MetricRegistration]] = {} @@ -131,7 +138,7 @@ def _create_solana_metrics( # Second instance using tx_endpoint as main endpoint and updated provider name if kwargs.get("tx_endpoint"): - config_copy = copy.deepcopy(config) + config_copy: MetricConfig = copy.deepcopy(config) config_copy.endpoints.main_endpoint = kwargs.get("tx_endpoint") metrics.append( MetricFactory._create_single_metric( diff --git a/common/metric_config.py b/common/metric_config.py index fd69e0b..4a70cc1 100644 --- a/common/metric_config.py +++ b/common/metric_config.py @@ -25,9 +25,9 @@ def __init__( tx_endpoint: Optional[str] = None, ws_endpoint: Optional[str] = None, ) -> None: - self.main_endpoint = main_endpoint - self.tx_endpoint = tx_endpoint - self.ws_endpoint = ws_endpoint + self.main_endpoint: str | None = main_endpoint + self.tx_endpoint: str | None = tx_endpoint + self.ws_endpoint: str | None = ws_endpoint def get_endpoint(self) -> Optional[str]: """Returns appropriate endpoint based on method.""" @@ -44,10 +44,10 @@ def __init__( extra_params: Optional[dict[str, Any]] = None, endpoints: Optional[EndpointConfig] = None, ) -> None: - self.timeout = timeout - self.max_latency = max_latency - self.endpoints = endpoints or EndpointConfig() - self.extra_params = extra_params or {} + self.timeout: int = timeout + self.max_latency: int = max_latency + self.endpoints: EndpointConfig = endpoints or EndpointConfig() + self.extra_params: dict[str, Any] = extra_params or {} class MetricLabel: @@ -58,8 +58,8 @@ def __init__(self, key: MetricLabelKey, value: str) -> None: raise ValueError( f"Invalid key, must be an instance of MetricLabelKey Enum: {key}" ) - self.key = key - self.value = value + self.key: MetricLabelKey = key + self.value: str = value class MetricLabels: @@ -74,7 +74,7 @@ def __init__( api_method: str = "default", response_status: str = "pending", ) -> None: - self.labels = [ + self.labels: list[MetricLabel] = [ MetricLabel(MetricLabelKey.SOURCE_REGION, source_region), MetricLabel(MetricLabelKey.TARGET_REGION, target_region), MetricLabel(MetricLabelKey.BLOCKCHAIN, blockchain), diff --git a/common/metric_types.py b/common/metric_types.py index 49e08f9..8d4f659 100644 --- a/common/metric_types.py +++ b/common/metric_types.py @@ -90,9 +90,9 @@ class HttpMetric(BaseMetric): async def fetch_data(self) -> Optional[Any]: """Fetches HTTP endpoint data.""" - def get_endpoint(self, method: str) -> str: + def get_endpoint(self) -> str: """Returns appropriate endpoint based on method.""" - return self.config.endpoints.get_endpoint(method) # type: ignore + return str(self.config.endpoints.get_endpoint()) async def collect_metric(self) -> None: try: diff --git a/metrics/solana_landing_rate.py b/metrics/solana_landing_rate.py index 32cb4ce..f751e5c 100644 --- a/metrics/solana_landing_rate.py +++ b/metrics/solana_landing_rate.py @@ -17,6 +17,7 @@ from solders.rpc.responses import ( GetLatestBlockhashResp, GetSignatureStatusesResp, + GetSlotResp, SendTransactionResp, ) from solders.transaction import Transaction @@ -73,11 +74,11 @@ def __init__( self._slot_diff = 0 async def _create_client(self) -> AsyncClient: - endpoint = self.get_endpoint("sendTransaction") + endpoint: str = self.get_endpoint() return AsyncClient(endpoint) async def _get_slot(self, client: AsyncClient) -> int: - response = await client.get_slot(MetricsServiceConfig.SOLANA_CONFIRMATION_LEVEL) # type: ignore + response: GetSlotResp = await client.get_slot(MetricsServiceConfig.SOLANA_CONFIRMATION_LEVEL) # type: ignore if not response or response.value is None: raise ValueError("Failed to get current slot") return response.value @@ -116,7 +117,7 @@ async def _wait_for_confirmation( raise ValueError(f"Transaction confirmation timeout after {timeout}s") async def _prepare_memo_transaction(self, client: AsyncClient) -> Transaction: - memo_text = generate_fixed_memo( + memo_text: str = generate_fixed_memo( self.labels.get_label(MetricLabelKey.SOURCE_REGION) # type: ignore ) @@ -164,7 +165,7 @@ async def fetch_data(self) -> Optional[float]: if not signature_response or not signature_response.value: raise ValueError("Failed to send transaction") - confirmation_slot = await self._wait_for_confirmation( + confirmation_slot: int = await self._wait_for_confirmation( client, signature_response.value, # type: ignore self.config.timeout, diff --git a/tests/test_api_read.py b/tests/test_api_read.py index 4574d8e..18014ad 100644 --- a/tests/test_api_read.py +++ b/tests/test_api_read.py @@ -12,26 +12,26 @@ sys.path.append(project_root) -def setup_environment(): +def setup_environment() -> None: """Load environment and endpoints configuration.""" - env_path = Path(project_root) / ".env.local" + env_path: Path = Path(project_root) / ".env.local" print(f"Looking for .env.local at: {env_path}") dotenv.load_dotenv(env_path) - endpoints_path = Path(project_root) / "endpoints.json" + endpoints_path: Path = Path(project_root) / "endpoints.json" with open(endpoints_path) as f: os.environ["ENDPOINTS"] = json.dumps(json.load(f)) -def main(): +def main() -> None: """Start local development server.""" setup_environment() # Import handler after environment setup - from api.read.base import handler as Handler + from api.read.ton import handler - server = HTTPServer(("localhost", 8000), Handler) + server = HTTPServer(("localhost", 8000), handler) print("Server started at http://localhost:8000") server.serve_forever() diff --git a/tests/test_api_write.py b/tests/test_api_write.py index 46d6ab1..2dca910 100644 --- a/tests/test_api_write.py +++ b/tests/test_api_write.py @@ -12,26 +12,26 @@ sys.path.append(project_root) -def setup_environment(): +def setup_environment() -> None: """Load environment and endpoints configuration.""" - env_path = Path(project_root) / ".env.local" + env_path: Path = Path(project_root) / ".env.local" print(f"Looking for .env.local at: {env_path}") dotenv.load_dotenv(env_path) - endpoints_path = Path(project_root) / "endpoints.json" + endpoints_path: Path = Path(project_root) / "endpoints.json" with open(endpoints_path) as f: os.environ["ENDPOINTS"] = json.dumps(json.load(f)) -def main(): +def main() -> None: """Start local development server.""" setup_environment() # Import handler after environment setup - from api.write.solana import handler as Handler + from api.write.solana import handler - server = HTTPServer(("localhost", 8000), Handler) + server = HTTPServer(("localhost", 8000), handler) print("Server started at http://localhost:8000") server.serve_forever() diff --git a/tests/test_update_state.py b/tests/test_update_state.py index 1568abc..90c067b 100644 --- a/tests/test_update_state.py +++ b/tests/test_update_state.py @@ -12,19 +12,19 @@ sys.path.append(project_root) -def setup_environment(): +def setup_environment() -> None: """Load environment and endpoints configuration.""" - env_path = Path(project_root) / ".env.local" + env_path: Path = Path(project_root) / ".env.local" print(f"Looking for .env.local at: {env_path}") dotenv.load_dotenv(env_path) - endpoints_path = Path(project_root) / "endpoints.json" + endpoints_path: Path = Path(project_root) / "endpoints.json" with open(endpoints_path) as f: os.environ["ENDPOINTS"] = json.dumps(json.load(f)) -def main(): +def main() -> None: """Start local development server.""" setup_environment()