@@ -20,6 +20,7 @@ import (
20
20
"errors"
21
21
"fmt"
22
22
"io"
23
+ "sync/atomic"
23
24
24
25
"github.com/arduino/arduino-cli/commands"
25
26
"github.com/arduino/arduino-cli/commands/board"
@@ -490,6 +491,10 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
490
491
_ = syncSend .Send (& rpc.MonitorResponse {Success : true })
491
492
492
493
cancelCtx , cancel := context .WithCancel (stream .Context ())
494
+ gracefulCloseInitiated := & atomic.Bool {}
495
+ gracefuleCloseCtx , gracefulCloseCancel := context .WithCancel (context .Background ())
496
+
497
+ // gRPC stream receiver (gRPC data -> monitor, config, close)
493
498
go func () {
494
499
defer cancel ()
495
500
for {
@@ -509,9 +514,11 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
509
514
}
510
515
}
511
516
if closeMsg := msg .GetClose (); closeMsg {
517
+ gracefulCloseInitiated .Store (true )
512
518
if err := portProxy .Close (); err != nil {
513
519
logrus .WithError (err ).Debug ("Error closing monitor port" )
514
520
}
521
+ gracefulCloseCancel ()
515
522
}
516
523
tx := msg .GetTxData ()
517
524
for len (tx ) > 0 {
@@ -528,8 +535,9 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
528
535
}
529
536
}()
530
537
538
+ // gRPC stream sender (monitor -> gRPC)
531
539
go func () {
532
- defer cancel ()
540
+ defer cancel () // unlock the receiver
533
541
buff := make ([]byte , 4096 )
534
542
for {
535
543
n , err := portProxy .Read (buff )
@@ -547,6 +555,11 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
547
555
}()
548
556
549
557
<- cancelCtx .Done ()
550
- portProxy .Close ()
558
+ if gracefulCloseInitiated .Load () {
559
+ // Port closing has been initiated in the receiver
560
+ <- gracefuleCloseCtx .Done ()
561
+ } else {
562
+ portProxy .Close ()
563
+ }
551
564
return nil
552
565
}
0 commit comments