-
Notifications
You must be signed in to change notification settings - Fork 31
/
Copy pathinterfaces.py
168 lines (126 loc) · 4.68 KB
/
interfaces.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# SPDX-FileCopyrightText: Copyright (c) 2023 Michał Pokusa
#
# SPDX-License-Identifier: MIT
"""
`adafruit_httpserver.interfaces`
====================================================
* Author(s): Michał Pokusa
"""
try:
from typing import List, Tuple, Dict, Union, Any
except ImportError:
pass
class _ISocket: # pylint: disable=missing-function-docstring,no-self-use,unused-argument
"""A class for typing necessary methods for a socket object."""
def accept(self) -> Tuple["_ISocket", Tuple[str, int]]:
...
def bind(self, address: Tuple[str, int]) -> None:
...
def setblocking(self, flag: bool) -> None:
...
def settimeout(self, value: "Union[float, None]") -> None:
...
def setsockopt(self, level: int, optname: int, value: int) -> None:
...
def listen(self, backlog: int) -> None:
...
def send(self, data: bytes) -> int:
...
def recv_into(self, buffer: memoryview, nbytes: int) -> int:
...
def close(self) -> None:
...
class _ISocketPool:  # pylint: disable=missing-function-docstring,no-self-use,unused-argument
    """Typing-only outline of the methods and constants expected of a socket pool.

    The constants are declared as annotations only (no values are assigned
    here) and both methods are empty placeholders that return ``None``.
    """

    # Integer constants a socket pool object is expected to expose.
    AF_INET: int
    SO_REUSEADDR: int
    SOCK_STREAM: int
    SOL_SOCKET: int

    def socket(  # pylint: disable=redefined-builtin
        self,
        family: int = ...,
        type: int = ...,
        proto: int = ...,
    ) -> _ISocket:
        """Placeholder; annotated as returning an ``_ISocket``."""

    def getaddrinfo(  # pylint: disable=redefined-builtin,too-many-arguments
        self,
        host: str,
        port: int,
        family: int = ...,
        type: int = ...,
        proto: int = ...,
        flags: int = ...,
    ) -> Tuple[int, int, int, str, Tuple[str, int]]:
        """Placeholder; takes a host string and port plus optional integer settings."""
class _IFieldStorage:
"""Interface with shared methods for QueryParams, FormData and Headers."""
_storage: Dict[str, List[Any]]
def _add_field_value(self, field_name: str, value: Any) -> None:
if field_name not in self._storage:
self._storage[field_name] = [value]
else:
self._storage[field_name].append(value)
def get(self, field_name: str, default: Any = None) -> Union[Any, None]:
"""Get the value of a field."""
return self._storage.get(field_name, [default])[0]
def get_list(self, field_name: str) -> List[Any]:
"""Get the list of values of a field."""
return self._storage.get(field_name, [])
@property
def fields(self):
"""Returns a list of field names."""
return list(self._storage.keys())
def items(self):
"""Returns a list of (name, value) tuples."""
return [(key, value) for key in self.fields for value in self.get_list(key)]
def keys(self):
"""Returns a list of header names."""
return self.fields
def values(self):
"""Returns a list of header values."""
return [value for key in self.keys() for value in self.get_list(key)]
def __getitem__(self, field_name: str):
return self._storage[field_name][0]
def __iter__(self):
return iter(self._storage)
def __len__(self) -> int:
return len(self._storage)
def __contains__(self, key: str) -> bool:
return key in self._storage
def __repr__(self) -> str:
return f"<{self.__class__.__name__} {repr(self._storage)}>"
def _encode_html_entities(value: Union[str, None]) -> Union[str, None]:
"""Encodes unsafe HTML characters that could enable XSS attacks."""
if value is None:
return None
return (
str(value)
.replace("&", "&")
.replace("<", "<")
.replace(">", ">")
.replace('"', """)
.replace("'", "'")
)
class _IXSSSafeFieldStorage(_IFieldStorage):
    """Field storage whose ``get``/``get_list`` HTML-encode values by default.

    Passing ``safe=False`` returns the stored values untouched and prints a
    warning about the XSS risk.
    """

    def get(
        self, field_name: str, default: Any = None, *, safe=True
    ) -> Union[Any, None]:
        """Return the first value of a field, HTML-encoded unless ``safe`` is falsy.

        The fallback ``default`` goes through the same encoding as a stored value.
        """
        if not safe:
            _debug_warning_nonencoded_output()
            return super().get(field_name, default)

        raw_value = super().get(field_name, default)
        return _encode_html_entities(raw_value)

    def get_list(self, field_name: str, *, safe=True) -> List[Any]:
        """Return all values of a field, each HTML-encoded unless ``safe`` is falsy."""
        if not safe:
            _debug_warning_nonencoded_output()
            return super().get_list(field_name)

        raw_values = super().get_list(field_name)
        return [_encode_html_entities(item) for item in raw_values]
def _debug_warning_nonencoded_output():
"""Warns about XSS risks."""
print(
"WARNING: Setting safe to False makes XSS vulnerabilities possible by "
"allowing access to raw untrusted values submitted by users. If this data is reflected "
"or shown within HTML without proper encoding it could enable Cross-Site Scripting."
)