-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
Copy pathnet.py
75 lines (59 loc) · 1.53 KB
/
net.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from typing import (
Awaitable,
Callable,
)
from web3._utils.rpc_abi import (
RPC,
)
from web3.method import (
Method,
default_root_munger,
)
from web3.module import (
Module,
)
class Net(Module):
_listening: Method[Callable[[], bool]] = Method(
RPC.net_listening,
mungers=[default_root_munger],
)
_peer_count: Method[Callable[[], int]] = Method(
RPC.net_peerCount,
mungers=[default_root_munger],
)
_version: Method[Callable[[], str]] = Method(
RPC.net_version,
mungers=[default_root_munger],
)
@property
def listening(self) -> bool:
return self._listening()
@property
def peer_count(self) -> int:
return self._peer_count()
@property
def version(self) -> str:
return self._version()
class AsyncNet(Module):
is_async = True
_listening: Method[Callable[[], Awaitable[bool]]] = Method(
RPC.net_listening,
mungers=[default_root_munger],
)
_peer_count: Method[Callable[[], Awaitable[int]]] = Method(
RPC.net_peerCount,
mungers=[default_root_munger],
)
_version: Method[Callable[[], Awaitable[str]]] = Method(
RPC.net_version,
mungers=[default_root_munger],
)
@property
async def listening(self) -> bool:
return await self._listening()
@property
async def peer_count(self) -> int:
return await self._peer_count()
@property
async def version(self) -> str:
return await self._version()