From 16401f1667884c315306bedc7c1916d2c8b7ac02 Mon Sep 17 00:00:00 2001 From: smypmsa Date: Mon, 10 Feb 2025 10:19:58 +0000 Subject: [PATCH] added dynamic params --- api/read/base.py | 4 +- common/metric_types.py | 64 +++++-- common/metrics_handler.py | 4 +- common/state/blockchain_state.py | 65 +++++++ metrics/base.py | 174 ++++++++--------- metrics/ethereum.py | 302 ++++++++++++++--------------- metrics/solana.py | 316 +++++++++---------------------- metrics/ton.py | 198 ++++++++----------- tests/test_api_read.py | 4 +- 9 files changed, 514 insertions(+), 617 deletions(-) create mode 100644 common/state/blockchain_state.py diff --git a/api/read/base.py b/api/read/base.py index 11bd588..1aaaa32 100644 --- a/api/read/base.py +++ b/api/read/base.py @@ -2,13 +2,13 @@ from config.defaults import MetricsServiceConfig from metrics.base import ( HTTPAccBalanceLatencyMetric, + HTTPBlockNumberLatencyMetric, + HTTPDebugTraceBlockByNumberLatencyMetric, HTTPDebugTraceTxLatencyMetric, HTTPEthCallLatencyMetric, HTTPTxReceiptLatencyMetric, ) from metrics.ethereum import ( - HTTPBlockNumberLatencyMetric, - HTTPDebugTraceBlockByNumberLatencyMetric, WSBlockLatencyMetric, ) diff --git a/common/metric_types.py b/common/metric_types.py index 238b411..aa98e82 100644 --- a/common/metric_types.py +++ b/common/metric_types.py @@ -3,7 +3,7 @@ import logging import time from abc import abstractmethod -from typing import Any, Optional +from typing import Any, Dict, Optional import aiohttp import websockets @@ -110,7 +110,17 @@ async def collect_metric(self) -> None: class HttpCallLatencyMetricBase(HttpMetric): - """Base class for JSON-RPC HTTP endpoint latency metrics.""" + """Base class for JSON-RPC HTTP endpoint latency metrics. + + Handles request configuration, state validation, and response time measurement + for blockchain RPC endpoints. + """ + + @property + @abstractmethod + def method(self) -> str: + """RPC method name to be implemented by subclasses.""" + pass def __init__( self, @@ -118,49 +128,73 @@ def __init__( metric_name: str, labels: MetricLabels, config: MetricConfig, - method: str, - method_params: dict = None, - **kwargs, - ): + method_params: Optional[Dict[str, Any]] = None, + **kwargs: Any, + ) -> None: + state_data = kwargs.get("state_data", {}) + if not self.validate_state(state_data): + raise ValueError(f"Invalid state data for {self.method}") + super().__init__( handler=handler, metric_name=metric_name, labels=labels, config=config, ) - self.method = method - self.method_params = method_params or None - self.labels.update_label(MetricLabelKey.API_METHOD, method) - self._base_request = { + + self.method_params = ( + self.get_params_from_state(state_data) + if method_params is None + else method_params + ) + self.labels.update_label(MetricLabelKey.API_METHOD, self.method) + self._base_request = self._build_base_request() + + def _build_base_request(self) -> Dict[str, Any]: + """Build the base JSON-RPC request object.""" + request = { "id": 1, "jsonrpc": "2.0", "method": self.method, } if self.method_params: - self._base_request["params"] = self.method_params + request["params"] = self.method_params + return request + + @staticmethod + def validate_state(state_data: Dict[str, Any]) -> bool: + """Validate blockchain state data.""" + return True + + @staticmethod + def get_params_from_state(state_data: Dict[str, Any]) -> Dict[str, Any]: + """Get RPC method parameters from state data.""" + return {} async def fetch_data(self) -> float: - """Measures single request latency.""" + """Measure single request latency.""" start_time = time.monotonic() - endpoint = self.config.endpoints.get_endpoint(self.method) async with aiohttp.ClientSession() as session: async with session.post( - endpoint, + endpoint, # type: ignore headers={ "Accept": "application/json", "Content-Type": "application/json", }, json=self._base_request, - timeout=self.config.timeout, + timeout=self.config.timeout, # type: ignore ) as response: if response.status != 200: raise ValueError(f"Status code: {response.status}") + json_response = await response.json() if "error" in json_response: raise ValueError(f"JSON-RPC error: {json_response['error']}") + return time.monotonic() - start_time def process_data(self, value: float) -> float: + """Process raw latency measurement.""" return value diff --git a/common/metrics_handler.py b/common/metrics_handler.py index d6ac359..a45847c 100644 --- a/common/metrics_handler.py +++ b/common/metrics_handler.py @@ -13,6 +13,7 @@ from common.base_metric import BaseMetric from common.factory import MetricFactory from common.metric_config import MetricConfig +from common.state.blockchain_state import BlockchainState from config.defaults import MetricsServiceConfig @@ -57,6 +58,7 @@ async def collect_metrics(self, provider: dict, config: dict): endpoints=None, # Will be set in factory extra_params={"tx_data": provider.get("data")}, ) + state_data = await BlockchainState.get_data(self.blockchain) metrics = MetricFactory.create_metrics( blockchain_name=self.blockchain, @@ -68,6 +70,7 @@ async def collect_metrics(self, provider: dict, config: dict): ws_endpoint=provider.get("websocket_endpoint"), http_endpoint=provider.get("http_endpoint"), tx_endpoint=provider.get("tx_endpoint"), + state_data=state_data, ) await asyncio.gather(*(m.collect_metric() for m in metrics)) @@ -103,7 +106,6 @@ async def push_to_grafana(self, metrics_text: str): async def handle(self) -> Tuple[str, str]: """Main handler for metric collection and pushing.""" self._instances = [] - try: config = json.loads(os.getenv("ENDPOINTS")) MetricFactory._registry.clear() diff --git a/common/state/blockchain_state.py b/common/state/blockchain_state.py new file mode 100644 index 0000000..f3e544b --- /dev/null +++ b/common/state/blockchain_state.py @@ -0,0 +1,65 @@ +import logging +import os +from typing import Dict + +import aiohttp + +from config.defaults import BlobStorageConfig + + +class BlockchainState: + """Manages blockchain state data retrieval from blob storage.""" + + _TIMEOUT = aiohttp.ClientTimeout(total=10) + + @staticmethod + async def _get_blob_url(session: aiohttp.ClientSession) -> str: + """Get URL of the latest blockchain data blob.""" + list_url = ( + f"{BlobStorageConfig.BLOB_BASE_URL}?prefix={BlobStorageConfig.BLOB_FOLDER}/" + ) + headers = { + "Authorization": f"Bearer {os.getenv('VERCEL_BLOB_TOKEN')}", + "x-store-id": os.getenv("STORE_ID"), + } + + async with session.get(list_url, headers=headers) as response: + if response.status != 200: + raise ValueError(f"Failed to list blobs: {response.status}") + + data = await response.json() + blobs = data.get("blobs", []) + + for blob in blobs: + if blob["pathname"].endswith(BlobStorageConfig.BLOB_FILENAME): + return blob["url"] + + raise ValueError("Blockchain data blob not found") + + @staticmethod + async def _fetch_state_data(session: aiohttp.ClientSession, blob_url: str) -> Dict: + """Fetch state data from blob storage.""" + async with session.get(blob_url) as response: + if response.status != 200: + raise ValueError(f"Failed to fetch state: {response.status}") + return await response.json() + + @staticmethod + async def get_data(blockchain: str) -> dict: + """Get blockchain state data.""" + try: + async with aiohttp.ClientSession( + timeout=BlockchainState._TIMEOUT + ) as session: + blob_url = await BlockchainState._get_blob_url(session) + state_data = await BlockchainState._fetch_state_data(session, blob_url) + return state_data.get(blockchain.lower(), {}) + + except Exception as e: + logging.error(f"State fetch failed: {e!s}") + raise + + @staticmethod + def clear_cache() -> None: + """Maintained for API compatibility.""" + pass diff --git a/metrics/base.py b/metrics/base.py index 2a1289a..8193a5b 100644 --- a/metrics/base.py +++ b/metrics/base.py @@ -1,105 +1,97 @@ -"""Base EVM metrics implementation for WebSocket and HTTP endpoints.""" +"""Base EVM metrics implementation for HTTP endpoints.""" -from common.metric_config import MetricConfig, MetricLabels from common.metric_types import HttpCallLatencyMetricBase class HTTPEthCallLatencyMetric(HttpCallLatencyMetricBase): - """Collects transaction latency for endpoints using eth_call to simulate a transaction. - This metric tracks the time taken for a simulated transaction (eth_call) to be processed by the RPC node. - """ - - def __init__( - self, - handler: "MetricsHandler", # type: ignore - metric_name: str, - labels: MetricLabels, - config: MetricConfig, - **kwargs, - ): - super().__init__( - handler=handler, - metric_name=metric_name, - labels=labels, - config=config, - method="eth_call", - method_params=[ - { - "to": "0x833589fcd6edb6e08f4c7c32d4f71b54bda02913", - "data": "0x70a082310000000000000000000000001985ea6e9c68e1c272d8209f3b478ac2fdb25c87", - }, - "latest", - ], - **kwargs, - ) + """Collects response time for eth_call simulation.""" + + @property + def method(self) -> str: + return "eth_call" + + @staticmethod + def get_params_from_state(state_data: dict) -> list: + """Get eth_call parameters for USDC token balance query.""" + return [ + { + "to": "0x833589fcd6edb6e08f4c7c32d4f71b54bda02913", + "data": "0x70a082310000000000000000000000001985ea6e9c68e1c272d8209f3b478ac2fdb25c87", + }, + "latest", + ] class HTTPTxReceiptLatencyMetric(HttpCallLatencyMetricBase): - """Collects call latency for the `eth_getTransactionReceipt` method.""" - - def __init__( - self, - handler: "MetricsHandler", # type: ignore - metric_name: str, - labels: MetricLabels, - config: MetricConfig, - **kwargs, - ): - super().__init__( - handler=handler, - metric_name=metric_name, - labels=labels, - config=config, - method="eth_getTransactionReceipt", - method_params=[ - "0x1759c699e6e2b1f249fa0ed605c0de18998bc66556cd6ea3362f92f511aeb06a" - ], - **kwargs, - ) + """Collects latency for transaction receipt retrieval.""" + + @property + def method(self) -> str: + return "eth_getTransactionReceipt" + + @staticmethod + def validate_state(state_data: dict) -> bool: + """Validate blockchain state contains transaction hash.""" + return bool(state_data and state_data.get("tx")) + + @staticmethod + def get_params_from_state(state_data: dict) -> list: + """Get parameters using transaction hash from state.""" + return [state_data["tx"]] class HTTPAccBalanceLatencyMetric(HttpCallLatencyMetricBase): - """Collects call latency for the `eth_getBalance` method.""" - - def __init__( - self, - handler: "MetricsHandler", # type: ignore - metric_name: str, - labels: MetricLabels, - config: MetricConfig, - **kwargs, - ): - super().__init__( - handler=handler, - metric_name=metric_name, - labels=labels, - config=config, - method="eth_getBalance", - method_params=["0xF977814e90dA44bFA03b6295A0616a897441aceC", "latest"], - **kwargs, - ) + """Collects latency for account balance queries.""" + + @property + def method(self) -> str: + return "eth_getBalance" + + @staticmethod + def get_params_from_state(state_data: dict) -> list: + """Get parameters with fixed monitoring address.""" + return ["0xF977814e90dA44bFA03b6295A0616a897441aceC", "latest"] class HTTPDebugTraceTxLatencyMetric(HttpCallLatencyMetricBase): - """Collects call latency for the `debug_traceTransaction` method.""" - - def __init__( - self, - handler: "MetricsHandler", # type: ignore - metric_name: str, - labels: MetricLabels, - config: MetricConfig, - **kwargs, - ): - super().__init__( - handler=handler, - metric_name=metric_name, - labels=labels, - config=config, - method="debug_traceTransaction", - method_params=[ - "0x317888c89fe0914c6d11be51acf758742afbe0cf1fdac11f19d35d6ed652ac29", - {"tracer": "callTracer"}, - ], - **kwargs, - ) + """Collects latency for transaction tracing.""" + + @property + def method(self) -> str: + return "debug_traceTransaction" + + @staticmethod + def validate_state(state_data: dict) -> bool: + """Validate blockchain state contains transaction hash.""" + return bool(state_data and state_data.get("tx")) + + @staticmethod + def get_params_from_state(state_data: dict) -> list: + """Get parameters using transaction hash from state.""" + return [state_data["tx"], {"tracer": "callTracer"}] + + +class HTTPDebugTraceBlockByNumberLatencyMetric(HttpCallLatencyMetricBase): + """Collects call latency for the `debug_traceBlockByNumber` method.""" + + @property + def method(self) -> str: + return "debug_traceBlockByNumber" + + @staticmethod + def get_params_from_state(state_data: dict) -> list: + """Get fixed parameters for latest block tracing.""" + return ["latest", {"tracer": "callTracer"}] + + +class HTTPBlockNumberLatencyMetric(HttpCallLatencyMetricBase): + """Collects call latency for the `eth_blockNumber` method.""" + + @property + def method(self) -> str: + return "eth_blockNumber" + + @staticmethod + def get_params_from_state(state_data: dict) -> list: + """Get empty parameter list for block number query.""" + return [] diff --git a/metrics/ethereum.py b/metrics/ethereum.py index bbeb5c4..b4e2e08 100644 --- a/metrics/ethereum.py +++ b/metrics/ethereum.py @@ -8,19 +8,123 @@ from common.metric_types import HttpCallLatencyMetricBase, WebSocketMetric +class HTTPEthCallLatencyMetric(HttpCallLatencyMetricBase): + """Collects transaction latency for eth_call simulation.""" + + @property + def method(self) -> str: + return "eth_call" + + @staticmethod + def get_params_from_state(state_data: dict) -> list: + """Returns parameters for eth_call with fixed USDC token query.""" + return [ + { + "to": "0xc2edad668740f1aa35e4d8f227fb8e17dca888cd", + "data": "0x1526fe270000000000000000000000000000000000000000000000000000000000000001", # noqa: E501 + }, + "latest", + ] + + +class HTTPBlockNumberLatencyMetric(HttpCallLatencyMetricBase): + """Collects call latency for the eth_blockNumber method.""" + + @property + def method(self) -> str: + return "eth_blockNumber" + + @staticmethod + def get_params_from_state(state_data: dict) -> list: + """Returns empty parameters list for eth_blockNumber.""" + return [] + + +class HTTPTxReceiptLatencyMetric(HttpCallLatencyMetricBase): + """Collects call latency for the eth_getTransactionReceipt method.""" + + @property + def method(self) -> str: + return "eth_getTransactionReceipt" + + @staticmethod + def validate_state(state_data: dict) -> bool: + """Validates that required transaction hash exists in state data.""" + return bool(state_data and state_data.get("tx")) + + @staticmethod + def get_params_from_state(state_data: dict) -> list: + """Returns parameters using transaction hash from state.""" + return [state_data["tx"]] + + +class HTTPAccBalanceLatencyMetric(HttpCallLatencyMetricBase): + """Collects call latency for the eth_getBalance method.""" + + @property + def method(self) -> str: + return "eth_getBalance" + + @staticmethod + def get_params_from_state(state_data: dict) -> list: + """Returns parameters for balance check of monitoring address.""" + return ["0x690B9A9E9aa1C9dB991C7721a92d351Db4FaC990", "pending"] + + +class HTTPDebugTraceBlockByNumberLatencyMetric(HttpCallLatencyMetricBase): + """Collects call latency for the debug_traceBlockByNumber method.""" + + @property + def method(self) -> str: + return "debug_traceBlockByNumber" + + @staticmethod + def get_params_from_state(state_data: dict) -> list: + """Returns parameters for tracing latest block.""" + return ["latest", {"tracer": "callTracer"}] + + +class HTTPDebugTraceTxLatencyMetric(HttpCallLatencyMetricBase): + """Collects call latency for the debug_traceTransaction method.""" + + @property + def method(self) -> str: + return "debug_traceTransaction" + + @staticmethod + def validate_state(state_data: dict) -> bool: + """Validates that required transaction hash exists in state data.""" + return bool(state_data and state_data.get("tx")) + + @staticmethod + def get_params_from_state(state_data: dict) -> list: + """Returns parameters using transaction hash from state.""" + return [state_data["tx"], {"tracer": "callTracer"}] + + class WSBlockLatencyMetric(WebSocketMetric): """Collects block latency for EVM providers using a WebSocket connection. + Suitable for serverless invocation: connects, subscribes, collects one message, and disconnects. - """ + """ # noqa: E501 def __init__( self, - handler: "MetricsHandler", # type: ignore + handler: "MetricsHandler", # type: ignore # noqa: F821 metric_name: str, labels: MetricLabels, config: MetricConfig, **kwargs, - ): + ) -> None: + """Initialize WebSocket block latency metric. + + Args: + handler: Metrics handler instance + metric_name: Name of the metric + labels: Metric labels container + config: Metric configuration + **kwargs: Additional arguments including ws_endpoint + """ ws_endpoint = kwargs.pop("ws_endpoint", None) super().__init__( handler=handler, @@ -31,8 +135,15 @@ def __init__( ) self.labels.update_label(MetricLabelKey.API_METHOD, "eth_subscribe") - async def subscribe(self, websocket): - """Subscribe to the newHeads event on the WebSocket endpoint.""" + async def subscribe(self, websocket) -> None: + """Subscribe to the newHeads event on the WebSocket endpoint. + + Args: + websocket: WebSocket connection instance + + Raises: + ValueError: If subscription to newHeads fails + """ subscription_msg = json.dumps( { "id": 1, @@ -44,174 +155,51 @@ async def subscribe(self, websocket): await websocket.send(subscription_msg) response = await websocket.recv() subscription_data = json.loads(response) - if subscription_data.get("result") is None: raise ValueError("Subscription to newHeads failed") - async def unsubscribe(self, websocket): + async def unsubscribe(self, websocket) -> None: + """Clean up WebSocket subscription. + + Args: + websocket: WebSocket connection instance + """ pass async def listen_for_data(self, websocket): - """Listen for a single data message from the WebSocket and process block latency.""" + """Listen for a single data message from the WebSocket and process block latency. + + Args: + websocket: WebSocket connection instance + + Returns: + dict: Block data if received successfully, None otherwise + + Raises: + asyncio.TimeoutError: If no message received within timeout period + """ response = await asyncio.wait_for(websocket.recv(), timeout=self.config.timeout) response_data = json.loads(response) - if "params" in response_data: block = response_data["params"]["result"] return block - return None - def process_data(self, block): - """Calculate block latency in seconds.""" + def process_data(self, block) -> float: + """Calculate block latency in seconds. + + Args: + block (dict): Block data containing timestamp + + Returns: + float: Latency in seconds between block timestamp and current time + + Raises: + ValueError: If block timestamp is invalid or missing + """ block_timestamp_hex = block.get("timestamp", "0x0") block_timestamp = int(block_timestamp_hex, 16) block_time = datetime.fromtimestamp(block_timestamp, timezone.utc) current_time = datetime.now(timezone.utc) latency = (current_time - block_time).total_seconds() return latency - - -class HTTPEthCallLatencyMetric(HttpCallLatencyMetricBase): - """Collects transaction latency for endpoints using eth_call to simulate a transaction. - This metric tracks the time taken for a simulated transaction (eth_call) to be processed by the RPC node. - """ - - def __init__( - self, - handler: "MetricsHandler", # type: ignore - metric_name: str, - labels: MetricLabels, - config: MetricConfig, - **kwargs, - ): - super().__init__( - handler=handler, - metric_name=metric_name, - labels=labels, - config=config, - method="eth_call", - method_params=[ - { - "to": "0xc2edad668740f1aa35e4d8f227fb8e17dca888cd", - "data": "0x1526fe270000000000000000000000000000000000000000000000000000000000000001", - }, - "latest", - ], - **kwargs, - ) - - -class HTTPBlockNumberLatencyMetric(HttpCallLatencyMetricBase): - """Collects call latency for the `eth_blockNumber` method.""" - - def __init__( - self, - handler: "MetricsHandler", # type: ignore - metric_name: str, - labels: MetricLabels, - config: MetricConfig, - **kwargs, - ): - super().__init__( - handler=handler, - metric_name=metric_name, - labels=labels, - config=config, - method="eth_blockNumber", - method_params=None, - **kwargs, - ) - - -class HTTPTxReceiptLatencyMetric(HttpCallLatencyMetricBase): - """Collects call latency for the `eth_getTransactionReceipt` method.""" - - def __init__( - self, - handler: "MetricsHandler", # type: ignore - metric_name: str, - labels: MetricLabels, - config: MetricConfig, - **kwargs, - ): - super().__init__( - handler=handler, - metric_name=metric_name, - labels=labels, - config=config, - method="eth_getTransactionReceipt", - method_params=[ - "0xf033310487c37a86db8099a738ffa2bb62bb06efeb486a65ff595d411b5321f4" - ], - **kwargs, - ) - - -class HTTPAccBalanceLatencyMetric(HttpCallLatencyMetricBase): - """Collects call latency for the `eth_getBalance` method.""" - - def __init__( - self, - handler: "MetricsHandler", # type: ignore - metric_name: str, - labels: MetricLabels, - config: MetricConfig, - **kwargs, - ): - super().__init__( - handler=handler, - metric_name=metric_name, - labels=labels, - config=config, - method="eth_getBalance", - method_params=["0x690B9A9E9aa1C9dB991C7721a92d351Db4FaC990", "pending"], - **kwargs, - ) - - -class HTTPDebugTraceBlockByNumberLatencyMetric(HttpCallLatencyMetricBase): - """Collects call latency for the `debug_traceBlockByNumber` method.""" - - def __init__( - self, - handler: "MetricsHandler", # type: ignore - metric_name: str, - labels: MetricLabels, - config: MetricConfig, - **kwargs, - ): - super().__init__( - handler=handler, - metric_name=metric_name, - labels=labels, - config=config, - method="debug_traceBlockByNumber", - method_params=["latest", {"tracer": "callTracer"}], - **kwargs, - ) - - -class HTTPDebugTraceTxLatencyMetric(HttpCallLatencyMetricBase): - """Collects call latency for the `debug_traceTransaction` method.""" - - def __init__( - self, - handler: "MetricsHandler", # type: ignore - metric_name: str, - labels: MetricLabels, - config: MetricConfig, - **kwargs, - ): - super().__init__( - handler=handler, - metric_name=metric_name, - labels=labels, - config=config, - method="debug_traceTransaction", - method_params=[ - "0x4fc2005859dccab5d9c73c543f533899fe50e25e8d6365c9c335f267d6d12541", - {"tracer": "callTracer"}, - ], - **kwargs, - ) diff --git a/metrics/solana.py b/metrics/solana.py index 3f93bbb..e1df848 100644 --- a/metrics/solana.py +++ b/metrics/solana.py @@ -1,249 +1,103 @@ -"""Solana metrics implementation for WebSocket and HTTP endpoints.""" - -import asyncio -import json -import logging -from datetime import datetime, timezone -from typing import Any, Dict, Optional - -from websockets.client import WebSocketClientProtocol - -from common.metric_config import MetricConfig, MetricLabelKey, MetricLabels -from common.metric_types import HttpCallLatencyMetricBase, WebSocketMetric - - -class WSBlockLatencyMetric(WebSocketMetric): - """Collects block latency for Solana providers using a WebSocket connection. - Suitable for serverless invocation: connects, subscribes, collects one message, and disconnects. - """ - - def __init__( - self, - handler: "MetricsHandler", # type: ignore - metric_name: str, - labels: MetricLabels, - config: MetricConfig, - **kwargs: Dict[str, Any], - ): - ws_endpoint: str = kwargs.get("ws_endpoint", "") - super().__init__( - handler=handler, - metric_name=metric_name, - labels=labels, - config=config, - ws_endpoint=ws_endpoint, - ) - self.labels.update_label(MetricLabelKey.API_METHOD, "blockSubscribe") - self.last_block_hash: Optional[str] = None - - async def subscribe(self, websocket: WebSocketClientProtocol) -> None: - """Subscribe to the newBlocks event on the WebSocket endpoint.""" - subscription_msg: str = json.dumps( - { - "jsonrpc": "2.0", - "id": 1, - "method": "blockSubscribe", - "params": [ - { - "commitment": "confirmed", - "encoding": "jsonParsed", - } - ], - } - ) - await websocket.send(subscription_msg) - response: str = await websocket.recv() - subscription_data: Dict[str, Any] = json.loads(response) - - if subscription_data.get("result") is None: - raise ValueError("Subscription to new blocks failed") - - self.subscription_id = subscription_data.get("result") - - async def unsubscribe(self, websocket: WebSocketClientProtocol) -> None: - """Unsubscribe from the block subscription.""" - unsubscribe_msg: str = json.dumps( - { - "jsonrpc": "2.0", - "id": 1, - "method": "blockUnsubscribe", - "params": [self.subscription_id], - } - ) - await websocket.send(unsubscribe_msg) - response = await websocket.recv() - response_data = json.loads(response) - - if not response_data.get("result", False): - logging.warning("Unsubscribe call failed or returned false") - else: - logging.debug("Successfully unsubscribed from block subscription") - - async def listen_for_data( - self, websocket: WebSocketClientProtocol - ) -> Optional[Dict[str, Any]]: - """Listen for a single data message from the WebSocket and process block latency.""" - response = await asyncio.wait_for(websocket.recv(), timeout=self.config.timeout) - response_data: Dict[str, Any] = json.loads(response) - - if "params" in response_data: - block = response_data["params"]["result"] - return block - - return None - - def process_data(self, block_info: Dict[str, Any]) -> float: - """Calculate block latency in seconds.""" - block_time: Optional[int] = block_info.get("blockTime") - - if block_time is None: - raise ValueError("Block time missing in block data") - - block_datetime: datetime = datetime.fromtimestamp(block_time, timezone.utc) - current_time: datetime = datetime.now(timezone.utc) - latency: float = (current_time - block_datetime).total_seconds() - return latency +"""Solana metrics implementation for HTTP endpoints.""" + +from common.metric_types import HttpCallLatencyMetricBase class HTTPSimulateTxLatencyMetric(HttpCallLatencyMetricBase): - """Collects call latency for the `simulateTransaction` method.""" - - def __init__( - self, - handler: "MetricsHandler", # type: ignore - metric_name: str, - labels: MetricLabels, - config: MetricConfig, - **kwargs, - ): - super().__init__( - handler=handler, - metric_name=metric_name, - labels=labels, - config=config, - method="simulateTransaction", - method_params=[ - "AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAAEDArczbMia1tLmq7zz4DinMNN0pJ1JtLdqIJPUw3YrGCzYAMHBsgN27lcgB6H2WQvFgyZuJYHa46puOQo9yQ8CVQbd9uHXZaGT2cvhRs7reawctIXtX1s3kTqM9YV+/wCp20C7Wj2aiuk5TReAXo+VTVg8QTHjs0UjNMMKCvpzZ+ABAgEBARU=", - {"encoding": "base64"}, - ], - **kwargs, - ) + """Collects call latency for the simulateTransaction method.""" + + @property + def method(self) -> str: + return "simulateTransaction" + + @staticmethod + def get_params_from_state(state_data: dict) -> list: + """Get parameters for simulating a token transfer.""" + return [ + "AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABAAEDArczbMia1tLmq7zz4DinMNN0pJ1JtLdqIJPUw3YrGCzYAMHBsgN27lcgB6H2WQvFgyZuJYHa46puOQo9yQ8CVQbd9uHXZaGT2cvhRs7reawctIXtX1s3kTqM9YV+/wCp20C7Wj2aiuk5TReAXo+VTVg8QTHjs0UjNMMKCvpzZ+ABAgEBARU=", + {"encoding": "base64"}, + ] class HTTPGetRecentBlockhashLatencyMetric(HttpCallLatencyMetricBase): - """Collects call latency for the `getLatestBlockhash` method.""" - - def __init__( - self, - handler: "MetricsHandler", # type: ignore - metric_name: str, - labels: MetricLabels, - config: MetricConfig, - **kwargs, - ): - super().__init__( - handler=handler, - metric_name=metric_name, - labels=labels, - config=config, - method="getLatestBlockhash", - method_params=None, - **kwargs, - ) + """Collects call latency for the getLatestBlockhash method.""" + + @property + def method(self) -> str: + return "getLatestBlockhash" + + @staticmethod + def get_params_from_state(state_data: dict) -> list: + """Get empty parameters list for blockhash retrieval.""" + return [] class HTTPGetTxLatencyMetric(HttpCallLatencyMetricBase): - """Collects call latency for the `getTransaction` method.""" - - def __init__( - self, - handler: "MetricsHandler", # type: ignore - metric_name: str, - labels: MetricLabels, - config: MetricConfig, - **kwargs, - ): - super().__init__( - handler=handler, - metric_name=metric_name, - labels=labels, - config=config, - method="getTransaction", - method_params=[ - "4VAGET7z5g7ogVGmbmZ6KBtF6DS8ftLWzD65BXZWQJjwASUqLod7LhGB6mqThcqo97QcC7r7uNmBY8GwsnLAA52n", - {"encoding": "jsonParsed", "maxSupportedTransactionVersion": 0}, - ], - **kwargs, - ) + """Collects call latency for the getTransaction method.""" + + @property + def method(self) -> str: + return "getTransaction" + + @staticmethod + def validate_state(state_data: dict) -> bool: + """Validate blockchain state contains transaction signature.""" + return bool(state_data and state_data.get("tx")) + + @staticmethod + def get_params_from_state(state_data: dict) -> list: + """Get parameters using transaction signature from state.""" + return [ + state_data["tx"], + {"encoding": "jsonParsed", "maxSupportedTransactionVersion": 0}, + ] class HTTPGetBalanceLatencyMetric(HttpCallLatencyMetricBase): - """Collects call latency for the `getBalance` method.""" - - def __init__( - self, - handler: "MetricsHandler", # type: ignore - metric_name: str, - labels: MetricLabels, - config: MetricConfig, - **kwargs, - ): - super().__init__( - handler=handler, - metric_name=metric_name, - labels=labels, - config=config, - method="getBalance", - method_params=["9WzDXwBbmkg8ZTbNMqUxvQRAyrZzDsGYdLVL9zYtAWWM"], - **kwargs, - ) + """Collects call latency for the getBalance method.""" + + @property + def method(self) -> str: + return "getBalance" + + @staticmethod + def get_params_from_state(state_data: dict) -> list: + """Get parameters for balance check of monitoring address.""" + return ["9WzDXwBbmkg8ZTbNMqUxvQRAyrZzDsGYdLVL9zYtAWWM"] class HTTPGetBlockLatencyMetric(HttpCallLatencyMetricBase): - """Collects call latency for the `getBlock` method.""" - - def __init__( - self, - handler: "MetricsHandler", # type: ignore - metric_name: str, - labels: MetricLabels, - config: MetricConfig, - **kwargs, - ): - super().__init__( - handler=handler, - metric_name=metric_name, - labels=labels, - config=config, - method="getBlock", - method_params=[ - 239462061, - {"encoding": "jsonParsed", "maxSupportedTransactionVersion": 0}, - ], - **kwargs, - ) + """Collects call latency for the getBlock method.""" + + @property + def method(self) -> str: + return "getBlock" + + @staticmethod + def validate_state(state_data: dict) -> bool: + """Validate blockchain state contains block slot number.""" + return bool(state_data and state_data.get("block")) + + @staticmethod + def get_params_from_state(state_data: dict) -> list: + """Get parameters using block slot from state.""" + return [ + int(state_data["block"]), + {"encoding": "jsonParsed", "maxSupportedTransactionVersion": 0}, + ] class HTTPGetProgramAccsLatencyMetric(HttpCallLatencyMetricBase): - """Collects call latency for the `getProgramAccounts` method.""" - - def __init__( - self, - handler: "MetricsHandler", # type: ignore - metric_name: str, - labels: MetricLabels, - config: MetricConfig, - **kwargs, - ): - super().__init__( - handler=handler, - metric_name=metric_name, - labels=labels, - config=config, - method="getProgramAccounts", - method_params=[ - "FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH", - {"encoding": "jsonParsed"}, - ], - **kwargs, - ) + """Collects call latency for the getProgramAccounts method.""" + + @property + def method(self) -> str: + return "getProgramAccounts" + + @staticmethod + def get_params_from_state(state_data: dict) -> list: + """Get parameters for program accounts query.""" + return [ + "FsJ3A3u2vn5cTVofAjvy6y5kwABJAqYWpe4975bi2epH", + {"encoding": "jsonParsed"}, + ] diff --git a/metrics/ton.py b/metrics/ton.py index 8b46258..2b0c050 100644 --- a/metrics/ton.py +++ b/metrics/ton.py @@ -1,136 +1,98 @@ """TON (The Open Network) metrics implementation for HTTP endpoints.""" -from common.metric_config import MetricConfig, MetricLabels from common.metric_types import HttpCallLatencyMetricBase class HTTPRunGetMethodLatencyMetric(HttpCallLatencyMetricBase): - """Collects call latency for the `runGetMethod` method.""" - - def __init__( - self, - handler: "MetricsHandler", # type: ignore - metric_name: str, - labels: MetricLabels, - config: MetricConfig, - **kwargs, - ): - super().__init__( - handler=handler, - metric_name=metric_name, - labels=labels, - config=config, - method="runGetMethod", - method_params={ - "address": "EQCxE6mUtQJKFnGfaROTKOt1lZbDiiX1kCixRv7Nw2Id_sDs", - "method": "get_wallet_address", - "stack": [ - [ - "tvm.Slice", - "te6cckEBAQEAJAAAQ4AbUzrTQYTUv8s/I9ds2TSZgRjyrgl2S2LKcZMEFcxj6PARy3rF", - ] - ], - }, - **kwargs, - ) + """Collects call latency for smart contract method execution.""" + + @property + def method(self) -> str: + return "runGetMethod" + + @staticmethod + def get_params_from_state(state_data: dict) -> dict: + """Returns parameters for TVM smart contract method call.""" + return { + "address": "EQCxE6mUtQJKFnGfaROTKOt1lZbDiiX1kCixRv7Nw2Id_sDs", + "method": "get_wallet_address", + "stack": [ + [ + "tvm.Slice", + "te6cckEBAQEAJAAAQ4AbUzrTQYTUv8s/I9ds2TSZgRjyrgl2S2LKcZMEFcxj6PARy3rF", + ] + ], + } class HTTPGetBlockHeaderLatencyMetric(HttpCallLatencyMetricBase): - """Collects call latency for the `getBlockHeader` method.""" - - def __init__( - self, - handler: "MetricsHandler", # type: ignore - metric_name: str, - labels: MetricLabels, - config: MetricConfig, - **kwargs, - ): - super().__init__( - handler=handler, - metric_name=metric_name, - labels=labels, - config=config, - method="getBlockHeader", - method_params={ - "workchain": -1, - "shard": "-9223372036854775808", - "seqno": 39064874, - }, - **kwargs, - ) + """Collects call latency for masterchain block header retrieval.""" + + @property + def method(self) -> str: + return "getBlockHeader" + + @staticmethod + def validate_state(state_data: dict) -> bool: + """Validates that required block identifier exists in state data.""" + return bool(state_data and state_data.get("block")) + + @staticmethod + def get_params_from_state(state_data: dict) -> dict: + """Returns parameters using TON block identifier components.""" + workchain, shard, seqno = state_data["block"].split(":") + return { + "workchain": int(workchain), + "shard": shard, + "seqno": int(seqno), + } class HTTPGetWalletTxsLatencyMetric(HttpCallLatencyMetricBase): - """Collects call latency for the `getWalletInformation` method.""" - - def __init__( - self, - handler: "MetricsHandler", # type: ignore - metric_name: str, - labels: MetricLabels, - config: MetricConfig, - **kwargs, - ): - super().__init__( - handler=handler, - metric_name=metric_name, - labels=labels, - config=config, - method="getWalletInformation", - method_params={ - "address": "EQDtFpEwcFAEcRe5mLVh2N6C0x-_hJEM7W61_JLnSF74p4q2" - }, - **kwargs, - ) + """Collects call latency for TON wallet information retrieval.""" + + @property + def method(self) -> str: + return "getWalletInformation" + + @staticmethod + def get_params_from_state(state_data: dict) -> dict: + """Returns parameters for TON wallet query.""" + return {"address": "EQDtFpEwcFAEcRe5mLVh2N6C0x-_hJEM7W61_JLnSF74p4q2"} class HTTPGetAddressBalanceLatencyMetric(HttpCallLatencyMetricBase): - """Collects call latency for the `getAddressBalance` method.""" - - def __init__( - self, - handler: "MetricsHandler", # type: ignore - metric_name: str, - labels: MetricLabels, - config: MetricConfig, - **kwargs, - ): - super().__init__( - handler=handler, - metric_name=metric_name, - labels=labels, - config=config, - method="getAddressBalance", - method_params={ - "address": "EQDtFpEwcFAEcRe5mLVh2N6C0x-_hJEM7W61_JLnSF74p4q2" - }, - **kwargs, - ) + """Collects call latency for TON address balance queries.""" + + @property + def method(self) -> str: + return "getAddressBalance" + + @staticmethod + def get_params_from_state(state_data: dict) -> dict: + """Returns parameters for TON address balance check.""" + return {"address": "EQDtFpEwcFAEcRe5mLVh2N6C0x-_hJEM7W61_JLnSF74p4q2"} class HTTPGetBlockTxsLatencyMetric(HttpCallLatencyMetricBase): - """Collects call latency for the `getBlockTransactions` method.""" - - def __init__( - self, - handler: "MetricsHandler", # type: ignore - metric_name: str, - labels: MetricLabels, - config: MetricConfig, - **kwargs, - ): - super().__init__( - handler=handler, - metric_name=metric_name, - labels=labels, - config=config, - method="getBlockTransactions", - method_params={ - "workchain": -1, - "shard": "-9223372036854775808", - "seqno": 39064874, - "count": 40, - }, - **kwargs, - ) + """Collects call latency for TON block transactions retrieval.""" + + @property + def method(self) -> str: + return "getBlockTransactions" + + @staticmethod + def validate_state(state_data: dict) -> bool: + """Validates that required block identifier exists in state data.""" + return bool(state_data and state_data.get("block")) + + @staticmethod + def get_params_from_state(state_data: dict) -> dict: + """Returns parameters using TON block identifier components.""" + workchain, shard, seqno = state_data["block"].split(":") + return { + "workchain": int(workchain), + "shard": shard, + "seqno": int(seqno), + "count": 40, + } diff --git a/tests/test_api_read.py b/tests/test_api_read.py index 7efbbb9..4574d8e 100644 --- a/tests/test_api_read.py +++ b/tests/test_api_read.py @@ -29,9 +29,9 @@ def main(): setup_environment() # Import handler after environment setup - from api.read.solana import handler as SolanaHandler + from api.read.base import handler as Handler - server = HTTPServer(("localhost", 8000), SolanaHandler) + server = HTTPServer(("localhost", 8000), Handler) print("Server started at http://localhost:8000") server.serve_forever()