@@ -20,6 +20,7 @@ import (
20
20
"errors"
21
21
"fmt"
22
22
"io"
23
+ "sync/atomic"
23
24
24
25
"github.com/arduino/arduino-cli/arduino"
25
26
"github.com/arduino/arduino-cli/commands"
@@ -451,6 +452,10 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
451
452
_ = syncSend .Send (& rpc.MonitorResponse {Success : true })
452
453
453
454
cancelCtx , cancel := context .WithCancel (stream .Context ())
455
+ gracefulCloseInitiated := & atomic.Bool {}
456
+ gracefuleCloseCtx , gracefulCloseCancel := context .WithCancel (context .Background ())
457
+
458
+ // gRPC stream receiver (gRPC data -> monitor, config, close)
454
459
go func () {
455
460
defer cancel ()
456
461
for {
@@ -470,9 +475,11 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
470
475
}
471
476
}
472
477
if closeMsg := msg .GetClose (); closeMsg {
478
+ gracefulCloseInitiated .Store (true )
473
479
if err := portProxy .Close (); err != nil {
474
480
logrus .WithError (err ).Debug ("Error closing monitor port" )
475
481
}
482
+ gracefulCloseCancel ()
476
483
}
477
484
tx := msg .GetTxData ()
478
485
for len (tx ) > 0 {
@@ -489,8 +496,9 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
489
496
}
490
497
}()
491
498
499
+ // gRPC stream sender (monitor -> gRPC)
492
500
go func () {
493
- defer cancel ()
501
+ defer cancel () // unlock the receiver
494
502
buff := make ([]byte , 4096 )
495
503
for {
496
504
n , err := portProxy .Read (buff )
@@ -508,6 +516,11 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
508
516
}()
509
517
510
518
<- cancelCtx .Done ()
511
- portProxy .Close ()
519
+ if gracefulCloseInitiated .Load () {
520
+ // Port closing has been initiated in the receiver
521
+ <- gracefuleCloseCtx .Done ()
522
+ } else {
523
+ portProxy .Close ()
524
+ }
512
525
return nil
513
526
}
0 commit comments