Skip to content

Commit fd4ea9c

Browse files
authored
Implementation of Msgpack RPC Monitor API
1 parent 26e907b commit fd4ea9c

File tree

2 files changed

+170
-4
lines changed

2 files changed

+170
-4
lines changed

cmd/router/main.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414
"syscall"
1515
"time"
1616

17+
"github.com/arduino/router/monitorapi"
1718
"github.com/arduino/router/msgpackrouter"
1819
"github.com/arduino/router/msgpackrpc"
1920
networkapi "github.com/arduino/router/network-api"
@@ -25,10 +26,11 @@ import (
2526

2627
// Server configuration
2728
type Config struct {
28-
LogLevel slog.Level
29-
ListenTCPAddr string
30-
ListenUnixAddr string
31-
SerialPortAddr string
29+
LogLevel slog.Level
30+
ListenTCPAddr string
31+
ListenUnixAddr string
32+
SerialPortAddr string
33+
MonitorPortAddr string
3234
}
3335

3436
func main() {
@@ -56,6 +58,7 @@ func main() {
5658
cmd.Flags().StringVarP(&cfg.ListenTCPAddr, "listen-port", "l", ":8900", "Listening port for RPC services")
5759
cmd.Flags().StringVarP(&cfg.ListenUnixAddr, "unix-port", "u", "/var/run/arduino-router.sock", "Listening port for RPC services")
5860
cmd.Flags().StringVarP(&cfg.SerialPortAddr, "serial-port", "p", "", "Serial port address")
61+
cmd.Flags().StringVarP(&cfg.MonitorPortAddr, "monitor-port", "m", "127.0.0.1:7500", "Listening port for MCU monitor proxy")
5962
if err := cmd.Execute(); err != nil {
6063
slog.Error("Error executing command.", "error", err)
6164
}
@@ -107,6 +110,7 @@ func startRouter(cfg Config) error {
107110

108111
// Open listening UNIX socket
109112
if cfg.ListenUnixAddr != "" {
113+
_ = os.Remove(cfg.ListenUnixAddr) // Remove the socket file if it exists
110114
if l, err := net.Listen("unix", cfg.ListenUnixAddr); err != nil {
111115
return fmt.Errorf("failed to listen on UNIX socket %s: %w", cfg.ListenUnixAddr, err)
112116
} else {
@@ -121,6 +125,11 @@ func startRouter(cfg Config) error {
121125
// Register TCP network API methods
122126
networkapi.Register(router)
123127

128+
// Register monitor API methods
129+
if err := monitorapi.Register(router, cfg.MonitorPortAddr); err != nil {
130+
slog.Error("Failed to register monitor API", "err", err)
131+
}
132+
124133
// Open serial port if specified
125134
if cfg.SerialPortAddr != "" {
126135
var serialLock sync.Mutex
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
package monitorapi
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"log/slog"
8+
"net"
9+
"os"
10+
"sync"
11+
"time"
12+
13+
"github.com/arduino/router/msgpackrouter"
14+
"github.com/arduino/router/msgpackrpc"
15+
)
16+
17+
var lock sync.RWMutex
18+
var socket net.Conn
19+
var monitorConnectionLost sync.Cond = *sync.NewCond(&lock)
20+
21+
// Register the Monitor API methods
22+
func Register(router *msgpackrouter.Router, addr string) error {
23+
listener, err := net.Listen("tcp", addr)
24+
if err != nil {
25+
return fmt.Errorf("failed to start listener: %w", err)
26+
}
27+
go connectionHandler(listener)
28+
_ = router.RegisterMethod("mon/connected", connected)
29+
_ = router.RegisterMethod("mon/read", read)
30+
_ = router.RegisterMethod("mon/write", write)
31+
_ = router.RegisterMethod("mon/reset", reset)
32+
return nil
33+
}
34+
35+
func connectionHandler(listener net.Listener) {
36+
for {
37+
conn, err := listener.Accept()
38+
if err != nil {
39+
slog.Error("Failed to accept monitor connection", "error", err)
40+
return
41+
}
42+
43+
slog.Info("Accepted monitor connection", "from", conn.RemoteAddr())
44+
lock.Lock()
45+
socket = conn
46+
lock.Unlock()
47+
48+
lock.Lock()
49+
for socket != nil {
50+
monitorConnectionLost.Wait()
51+
}
52+
lock.Unlock()
53+
}
54+
}
55+
56+
func connected(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) {
57+
if len(params) != 0 {
58+
return nil, []any{1, "Invalid number of parameters, expected no parameters"}
59+
}
60+
61+
lock.RLock()
62+
connected := socket != nil
63+
lock.RUnlock()
64+
65+
return connected, nil
66+
}
67+
68+
func read(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) {
69+
if len(params) != 1 {
70+
return nil, []any{1, "Invalid number of parameters, expected max bytes to read"}
71+
}
72+
maxBytes, ok := msgpackrpc.ToUint(params[0])
73+
if !ok {
74+
return nil, []any{1, "Invalid parameter type, expected positive int for max bytes to read"}
75+
}
76+
77+
lock.RLock()
78+
conn := socket
79+
lock.RUnlock()
80+
81+
// No active connection, return empty slice
82+
if conn == nil {
83+
return []byte{}, nil
84+
}
85+
86+
buffer := make([]byte, maxBytes)
87+
// It seems that the only way to make a non-blocking read is to set a read deadline.
88+
// BTW setting the read deadline to time.Now() will always returns an empty (zero bytes)
89+
// read, so we set it to a very short duration in the future.
90+
if err := conn.SetReadDeadline(time.Now().Add(time.Millisecond)); err != nil {
91+
return nil, []any{3, "Failed to set read timeout: " + err.Error()}
92+
}
93+
n, err := conn.Read(buffer)
94+
if errors.Is(err, os.ErrDeadlineExceeded) {
95+
// timeout
96+
} else if err != nil {
97+
// If we get an error other than timeout, we assume the connection is lost.
98+
slog.Error("Monitor connection lost, closing connection", "error", err)
99+
close()
100+
return nil, []any{3, "Failed to read from connection: " + err.Error()}
101+
}
102+
103+
return buffer[:n], nil
104+
}
105+
106+
func write(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) {
107+
if len(params) != 1 {
108+
return nil, []any{1, "Invalid number of parameters, expected data to write"}
109+
}
110+
data, ok := params[0].([]byte)
111+
if !ok {
112+
if dataStr, ok := params[0].(string); ok {
113+
data = []byte(dataStr)
114+
} else {
115+
// If data is not []byte or string, return an error
116+
return nil, []any{1, "Invalid parameter type, expected []byte or string for data to write"}
117+
}
118+
}
119+
120+
lock.RLock()
121+
conn := socket
122+
lock.RUnlock()
123+
124+
if conn == nil { // No active connection, drop the data
125+
return len(data), nil
126+
}
127+
128+
n, err := conn.Write(data)
129+
if err != nil {
130+
// If we get an error, we assume the connection is lost.
131+
slog.Error("Monitor connection lost, closing connection", "error", err)
132+
close()
133+
134+
return nil, []any{3, "Failed to write to connection: " + err.Error()}
135+
}
136+
137+
return n, nil
138+
}
139+
140+
func reset(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_result any, _err any) {
141+
if len(params) != 0 {
142+
return nil, []any{1, "Invalid number of parameters, expected no parameters"}
143+
}
144+
close()
145+
slog.Info("Monitor connection reset")
146+
return true, nil
147+
}
148+
149+
func close() {
150+
lock.Lock()
151+
if socket != nil {
152+
_ = socket.Close()
153+
}
154+
socket = nil
155+
monitorConnectionLost.Broadcast()
156+
lock.Unlock()
157+
}

0 commit comments

Comments
 (0)