forked from adafruit/Adafruit_CircuitPython_HTTPServer
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathhttpserver_sse.py
64 lines (46 loc) · 1.62 KB
/
httpserver_sse.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
# SPDX-FileCopyrightText: 2023 Michał Pokusa
#
# SPDX-License-Identifier: Unlicense
from time import monotonic
import microcontroller
import socketpool
import wifi
from adafruit_httpserver import Server, Request, Response, SSEResponse, GET
# Socket pool built on the board's Wi-Fi radio; passed to the HTTP server below.
pool = socketpool.SocketPool(wifi.radio)
# HTTP server instance on which the route handlers in this file are registered.
server = Server(pool, debug=True)
# The SSE stream created by the most recent "/connect-client" request.
# Stays None until a client has connected; the main loop skips sending while None.
sse_response: SSEResponse = None
# monotonic() timestamp after which the main loop sends the next event.
next_event_time = monotonic()
# Page returned by the "/client" route. Its inline script opens an EventSource
# on "/connect-client", writes the data of every received message into the
# <strong> element, and writes the error object there when the stream errors.
HTML_TEMPLATE = """
<html lang="en">
<head>
<title>Server-Sent Events Client</title>
</head>
<body>
<p>CPU temperature: <strong>-</strong>°C</p>
<script>
const eventSource = new EventSource('/connect-client');
const cpuTemp = document.querySelector('strong');
eventSource.onmessage = event => cpuTemp.textContent = event.data;
eventSource.onerror = error => cpuTemp.textContent = error;
</script>
</body>
</html>
"""
@server.route("/client", GET)
def client(request: Request):
    """Answer GET /client with the HTML page stored in ``HTML_TEMPLATE``.

    :param request: the incoming request the response is bound to.
    :return: a ``Response`` carrying the template with content type ``text/html``.
    """
    page = Response(request, HTML_TEMPLATE, content_type="text/html")
    return page
@server.route("/connect-client", GET)
def connect_client(request: Request):
    """Answer GET /connect-client by opening a new SSE stream.

    The new stream is stored in the module-level ``sse_response`` so the main
    loop can send events on it. Only one stream is kept: a stream stored by an
    earlier request is closed before it is replaced.

    :param request: the incoming request the stream is bound to.
    :return: the newly created ``SSEResponse``.
    """
    global sse_response  # pylint: disable=global-statement

    # Close the stream stored by an earlier request, if there is one.
    previous = sse_response
    if previous is not None:
        previous.close()

    sse_response = SSEResponse(request)
    return sse_response
# Start the server on the radio's current IPv4 address.
server.start(str(wifi.radio.ipv4_address))

while True:
    server.poll()

    # Nothing to send until a client has connected and the scheduled time
    # for the next event has passed.
    if sse_response is None or next_event_time >= monotonic():
        continue

    # Send the CPU temperature, rounded to two decimals, then schedule the
    # next event one second from now.
    cpu_temp = round(microcontroller.cpu.temperature, 2)
    sse_response.send_event(str(cpu_temp))
    next_event_time = monotonic() + 1