@@ -2,28 +2,35 @@ package monitorapi
22
33import (
44 "context"
5- "errors"
65 "fmt"
76 "log/slog"
87 "net"
9- "os"
108 "sync"
9+ "sync/atomic"
1110 "time"
1211
12+ "github.com/djherbis/buffer"
13+ "github.com/djherbis/nio/v3"
14+
1315 "github.com/arduino/arduino-router/msgpackrouter"
1416 "github.com/arduino/arduino-router/msgpackrpc"
1517)
1618
17- var lock sync.RWMutex
18- var socket net.Conn
19- var monitorConnectionLost sync.Cond = * sync .NewCond (& lock )
19+ var socketsLock sync.RWMutex
20+ var sockets map [net.Conn ]struct {}
21+ var monSendPipeRd * nio.PipeReader
22+ var monSendPipeWr * nio.PipeWriter
23+ var bytesInSendPipe atomic.Int64
2024
2125// Register the Monitor API methods
2226func Register (router * msgpackrouter.Router , addr string ) error {
2327 listener , err := net .Listen ("tcp" , addr )
2428 if err != nil {
2529 return fmt .Errorf ("failed to start listener: %w" , err )
2630 }
31+ sockets = make (map [net.Conn ]struct {})
32+ monSendPipeRd , monSendPipeWr = nio .Pipe (buffer .New (1024 ))
33+
2734 go connectionHandler (listener )
2835 _ = router .RegisterMethod ("mon/connected" , connected )
2936 _ = router .RegisterMethod ("mon/read" , read )
@@ -41,15 +48,26 @@ func connectionHandler(listener net.Listener) {
4148 }
4249
4350 slog .Info ("Accepted monitor connection" , "from" , conn .RemoteAddr ())
44- lock .Lock ()
45- socket = conn
46- lock .Unlock ()
47-
48- lock .Lock ()
49- for socket != nil {
50- monitorConnectionLost .Wait ()
51- }
52- lock .Unlock ()
51+ socketsLock .Lock ()
52+ sockets [conn ] = struct {}{}
53+ socketsLock .Unlock ()
54+
55+ go func () {
56+ defer close (conn )
57+
58+ // Read from the connection and write to the monitor send pipe
59+ buff := make ([]byte , 1024 )
60+ for {
61+ if n , err := conn .Read (buff ); err != nil {
62+ // Connection closed from client
63+ return
64+ } else if written , err := monSendPipeWr .Write (buff [:n ]); err != nil {
65+ return
66+ } else {
67+ bytesInSendPipe .Add (int64 (written ))
68+ }
69+ }
70+ }()
5371 }
5472}
5573
@@ -58,9 +76,9 @@ func connected(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_
5876 return nil , []any {1 , "Invalid number of parameters, expected no parameters" }
5977 }
6078
61- lock .RLock ()
62- connected := socket != nil
63- lock .RUnlock ()
79+ socketsLock .RLock ()
80+ connected := len ( sockets ) > 0
81+ socketsLock .RUnlock ()
6482
6583 return connected , nil
6684}
@@ -74,33 +92,18 @@ func read(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_resul
7492 return nil , []any {1 , "Invalid parameter type, expected positive int for max bytes to read" }
7593 }
7694
77- lock .RLock ()
78- conn := socket
79- lock .RUnlock ()
80-
81- // No active connection, return empty slice
82- if conn == nil {
95+ if bytesInSendPipe .Load () == 0 {
8396 return []byte {}, nil
8497 }
8598
8699 buffer := make ([]byte , maxBytes )
87- // It seems that the only way to make a non-blocking read is to set a read deadline.
88- // BTW setting the read deadline to time.Now() will always returns an empty (zero bytes)
89- // read, so we set it to a very short duration in the future.
90- if err := conn .SetReadDeadline (time .Now ().Add (time .Millisecond )); err != nil {
91- return nil , []any {3 , "Failed to set read timeout: " + err .Error ()}
92- }
93- n , err := conn .Read (buffer )
94- if errors .Is (err , os .ErrDeadlineExceeded ) {
95- // timeout
96- } else if err != nil {
97- // If we get an error other than timeout, we assume the connection is lost.
98- slog .Error ("Monitor connection lost, closing connection" , "error" , err )
99- close ()
100+ if readed , err := monSendPipeRd .Read (buffer ); err != nil {
101+ slog .Error ("Error reading monitor" , "error" , err )
100102 return nil , []any {3 , "Failed to read from connection: " + err .Error ()}
103+ } else {
104+ bytesInSendPipe .Add (int64 (- readed ))
105+ return buffer [:readed ], nil
101106 }
102-
103- return buffer [:n ], nil
104107}
105108
106109func write (ctx context.Context , rpc * msgpackrpc.Connection , params []any ) (_result any , _err any ) {
@@ -117,41 +120,52 @@ func write(ctx context.Context, rpc *msgpackrpc.Connection, params []any) (_resu
117120 }
118121 }
119122
120- lock .RLock ()
121- conn := socket
122- lock .RUnlock ()
123-
124- if conn == nil { // No active connection, drop the data
125- return len (data ), nil
123+ socketsLock .RLock ()
124+ clients := make ([]net.Conn , 0 , len (sockets ))
125+ for c := range sockets {
126+ clients = append (clients , c )
126127 }
128+ socketsLock .RUnlock ()
127129
128- n , err := conn .Write (data )
129- if err != nil {
130- // If we get an error, we assume the connection is lost.
131- slog .Error ("Monitor connection lost, closing connection" , "error" , err )
132- close ()
133-
134- return nil , []any {3 , "Failed to write to connection: " + err .Error ()}
130+ for _ , conn := range clients {
131+ if len (clients ) > 1 {
132+ // If there are multiple clients, allow 500 ms for the data to
133+ // get through each one.
134+ _ = conn .SetWriteDeadline (time .Now ().Add (time .Millisecond * 500 ))
135+ } else {
136+ _ = conn .SetWriteDeadline (time.Time {})
137+ }
138+ if _ , err := conn .Write (data ); err != nil {
139+ // If we get an error, we assume the connection is lost.
140+ slog .Error ("Monitor connection lost, closing connection" , "error" , err )
141+ close (conn )
142+ }
135143 }
136144
137- return n , nil
145+ return len (data ), nil
146+ }
147+
148+ func close (conn net.Conn ) {
149+ socketsLock .Lock ()
150+ delete (sockets , conn )
151+ socketsLock .Unlock ()
152+ _ = conn .Close ()
138153}
139154
140155func reset (ctx context.Context , rpc * msgpackrpc.Connection , params []any ) (_result any , _err any ) {
141156 if len (params ) != 0 {
142157 return nil , []any {1 , "Invalid number of parameters, expected no parameters" }
143158 }
144- close ()
145- slog .Info ("Monitor connection reset" )
146- return true , nil
147- }
148159
149- func close () {
150- lock .Lock ()
151- if socket != nil {
152- _ = socket .Close ()
160+ socketsLock .Lock ()
161+ socketsToClose := sockets
162+ sockets = make (map [net.Conn ]struct {})
163+ socketsLock .Unlock ()
164+
165+ for c := range socketsToClose {
166+ _ = c .Close ()
153167 }
154- socket = nil
155- monitorConnectionLost . Broadcast ( )
156- lock . Unlock ()
168+
169+ slog . Info ( "Monitor connection reset" )
170+ return true , nil
157171}
0 commit comments