Skip to content

feature: Added gRPC close signal to Monitor call (allows graceful close of monitor) #2276

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Jan 2, 2024
30 changes: 26 additions & 4 deletions commands/daemon/daemon.go
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"io"
"sync/atomic"

"github.com/arduino/arduino-cli/commands"
"github.com/arduino/arduino-cli/commands/board"
@@ -477,7 +478,11 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
return err
}

portProxy, _, err := monitor.Monitor(stream.Context(), req)
openReq := req.GetOpenRequest()
if openReq == nil {
return &cmderrors.InvalidInstanceError{}
}
portProxy, _, err := monitor.Monitor(stream.Context(), openReq)
if err != nil {
return err
}
@@ -486,6 +491,10 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
_ = syncSend.Send(&rpc.MonitorResponse{Success: true})

cancelCtx, cancel := context.WithCancel(stream.Context())
gracefulCloseInitiated := &atomic.Bool{}
gracefuleCloseCtx, gracefulCloseCancel := context.WithCancel(context.Background())

// gRPC stream receiver (gRPC data -> monitor, config, close)
go func() {
defer cancel()
for {
@@ -497,13 +506,20 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
return
}
if conf := msg.GetPortConfiguration(); conf != nil {
if conf := msg.GetUpdatedConfiguration(); conf != nil {
for _, c := range conf.GetSettings() {
if err := portProxy.Config(c.GetSettingId(), c.GetValue()); err != nil {
syncSend.Send(&rpc.MonitorResponse{Error: err.Error()})
}
}
}
if closeMsg := msg.GetClose(); closeMsg {
gracefulCloseInitiated.Store(true)
if err := portProxy.Close(); err != nil {
logrus.WithError(err).Debug("Error closing monitor port")
}
gracefulCloseCancel()
}
tx := msg.GetTxData()
for len(tx) > 0 {
n, err := portProxy.Write(tx)
@@ -519,8 +535,9 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
}
}()

// gRPC stream sender (monitor -> gRPC)
go func() {
defer cancel()
defer cancel() // unlock the receiver
buff := make([]byte, 4096)
for {
n, err := portProxy.Read(buff)
@@ -538,6 +555,11 @@ func (s *ArduinoCoreServerImpl) Monitor(stream rpc.ArduinoCoreService_MonitorSer
}()

<-cancelCtx.Done()
portProxy.Close()
if gracefulCloseInitiated.Load() {
// Port closing has been initiated in the receiver
<-gracefuleCloseCtx.Done()
} else {
portProxy.Close()
}
return nil
}
10 changes: 6 additions & 4 deletions commands/daemon/term_example/main.go
Original file line number Diff line number Diff line change
@@ -89,8 +89,10 @@ func connectToPort(cli commands.ArduinoCoreServiceClient, instance *commands.Ins
log.Fatal("Error opening Monitor:", err)
}
if err := monitorClient.Send(&commands.MonitorRequest{
Instance: instance,
Port: port,
Message: &commands.MonitorRequest_OpenRequest{OpenRequest: &commands.MonitorPortOpenRequest{
Instance: instance,
Port: port,
}},
}); err != nil {
log.Fatal("Error sending Monitor config:", err)
}
@@ -106,9 +108,9 @@ func connectToPort(cli commands.ArduinoCoreServiceClient, instance *commands.Ins
}
}()

hello := &commands.MonitorRequest{
hello := &commands.MonitorRequest{Message: &commands.MonitorRequest_TxData{
TxData: []byte("HELLO!"),
}
}}
fmt.Println("Send:", hello)
if err := monitorClient.Send(hello); err != nil {
log.Fatal("Monitor send HELLO:", err)
2 changes: 1 addition & 1 deletion commands/monitor/monitor.go
Original file line number Diff line number Diff line change
@@ -60,7 +60,7 @@ func (p *PortProxy) Close() error {

// Monitor opens a communication port. It returns a PortProxy to communicate with the port and a PortDescriptor
// that describes the available configuration settings.
func Monitor(ctx context.Context, req *rpc.MonitorRequest) (*PortProxy, *pluggableMonitor.PortDescriptor, error) {
func Monitor(ctx context.Context, req *rpc.MonitorPortOpenRequest) (*PortProxy, *pluggableMonitor.PortDescriptor, error) {
pme, release, err := instances.GetPackageManagerExplorer(req.GetInstance())
if err != nil {
return nil, nil, err
59 changes: 59 additions & 0 deletions docs/UPGRADING.md
Original file line number Diff line number Diff line change
@@ -8,6 +8,65 @@ Here you can find a list of migration guides to handle breaking changes between

We're dropping the `builtin.tools` support. It was the equivalent of Arduino IDE 1.x bundled tools directory.

### The gRPC `cc.arduino.cli.commands.v1.MonitorRequest` message has been changed.

Previously the `MonitorRequest` was a single message used to open the monitor, to stream data, and to change the port
configuration:

```proto
message MonitorRequest {
// Arduino Core Service instance from the `Init` response.
Instance instance = 1;
// Port to open, must be filled only on the first request
Port port = 2;
// The board FQBN we are trying to connect to. This is optional, and it's
// needed to disambiguate if more than one platform provides the pluggable
// monitor for a given port protocol.
string fqbn = 3;
// Data to send to the port
bytes tx_data = 4;
// Port configuration, optional, contains settings of the port to be applied
MonitorPortConfiguration port_configuration = 5;
}
```

Now the meaning of the fields has been clarified with the `oneof` clause, making it more explicit:

```proto
message MonitorRequest {
oneof message {
// Open request, it must be the first incoming message
MonitorPortOpenRequest open_request = 1;
// Data to send to the port
bytes tx_data = 2;
// Port configuration, contains settings of the port to be changed
MonitorPortConfiguration updated_configuration = 3;
// Close message, set to true to gracefully close a port (this ensure
// that the gRPC streaming call is closed by the daemon AFTER the port
// has been successfully closed)
bool close = 4;
}
}
message MonitorPortOpenRequest {
// Arduino Core Service instance from the `Init` response.
Instance instance = 1;
// Port to open, must be filled only on the first request
Port port = 2;
// The board FQBN we are trying to connect to. This is optional, and it's
// needed to disambiguate if more than one platform provides the pluggable
// monitor for a given port protocol.
string fqbn = 3;
// Port configuration, optional, contains settings of the port to be applied
MonitorPortConfiguration port_configuration = 4;
}
```

Now the message field `MonitorPortOpenRequest.open_request` must be sent in the first message after opening the
streaming gRPC call.

The identification number of the fields has been changed, this change is not binary compatible with old clients.

### Some golang modules from `github.com/arduino/arduino-cli/*` have been made private.

The following golang modules are no longer available as public API:
2 changes: 1 addition & 1 deletion internal/arduino/monitor/monitor.go
Original file line number Diff line number Diff line change
@@ -292,7 +292,7 @@ func (mon *PluggableMonitor) Close() error {
if err := mon.sendCommand("CLOSE\n"); err != nil {
return err
}
_, err := mon.waitMessage(time.Millisecond*250, "close")
_, err := mon.waitMessage(time.Millisecond*5000, "close")
return err
}

2 changes: 1 addition & 1 deletion internal/cli/monitor/monitor.go
Original file line number Diff line number Diff line change
@@ -203,7 +203,7 @@ func runMonitorCmd(
}
}
}
portProxy, _, err := monitor.Monitor(context.Background(), &rpc.MonitorRequest{
portProxy, _, err := monitor.Monitor(context.Background(), &rpc.MonitorPortOpenRequest{
Instance: inst,
Port: &rpc.Port{Address: portAddress, Protocol: portProtocol},
Fqbn: fqbn,
34 changes: 34 additions & 0 deletions internal/integrationtest/arduino-cli.go
Original file line number Diff line number Diff line change
@@ -119,6 +119,21 @@ func NewArduinoCliWithinEnvironment(env *Environment, config *ArduinoCLIConfig)
return cli
}

// CreateEnvForDaemon performs the minimum required operations to start the arduino-cli daemon.
// It returns a testsuite.Environment and an ArduinoCLI client to perform the integration tests.
// The Environment must be disposed by calling the CleanUp method via defer.
func CreateEnvForDaemon(t *testing.T) (*Environment, *ArduinoCLI) {
env := NewEnvironment(t)

cli := NewArduinoCliWithinEnvironment(env, &ArduinoCLIConfig{
ArduinoCLIPath: FindRepositoryRootPath(t).Join("arduino-cli"),
UseSharedStagingFolder: true,
})

_ = cli.StartDaemon(false)
return env, cli
}

// CleanUp closes the Arduino CLI client.
func (cli *ArduinoCLI) CleanUp() {
if cli.proc != nil {
@@ -596,3 +611,22 @@ func (inst *ArduinoCLIInstance) PlatformSearch(ctx context.Context, args string,
resp, err := inst.cli.daemonClient.PlatformSearch(ctx, req)
return resp, err
}

// Monitor calls the "Monitor" gRPC method and sends the OpenRequest message.
func (inst *ArduinoCLIInstance) Monitor(ctx context.Context, port *commands.Port) (commands.ArduinoCoreService_MonitorClient, error) {
req := &commands.MonitorRequest{}
logCallf(">>> Monitor(%+v)\n", req)
monitorClient, err := inst.cli.daemonClient.Monitor(ctx)
if err != nil {
return nil, err
}
err = monitorClient.Send(&commands.MonitorRequest{
Message: &commands.MonitorRequest_OpenRequest{
OpenRequest: &commands.MonitorPortOpenRequest{
Instance: inst.instance,
Port: port,
},
},
})
return monitorClient, err
}
3 changes: 2 additions & 1 deletion internal/integrationtest/daemon/daemon_concurrency_test.go
Original file line number Diff line number Diff line change
@@ -23,6 +23,7 @@ import (
"testing"
"time"

"github.com/arduino/arduino-cli/internal/integrationtest"
"github.com/arduino/arduino-cli/rpc/cc/arduino/cli/commands/v1"
"github.com/arduino/go-paths-helper"
"github.com/stretchr/testify/require"
@@ -31,7 +32,7 @@ import (
func TestArduinoCliDaemonCompileWithLotOfOutput(t *testing.T) {
// See: https://github.com/arduino/arduino-cli/issues/2169

env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

_, _, err := cli.Run("core", "install", "arduino:avr")
37 changes: 11 additions & 26 deletions internal/integrationtest/daemon/daemon_test.go
Original file line number Diff line number Diff line change
@@ -37,7 +37,7 @@ import (
func TestArduinoCliDaemon(t *testing.T) {
// See: https://github.com/arduino/arduino-cli/pull/1804

env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

grpcInst := cli.Create()
@@ -96,7 +96,7 @@ func TestArduinoCliDaemon(t *testing.T) {
func TestDaemonAutoUpdateIndexOnFirstInit(t *testing.T) {
// https://github.com/arduino/arduino-cli/issues/1529

env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

grpcInst := cli.Create()
@@ -110,26 +110,11 @@ func TestDaemonAutoUpdateIndexOnFirstInit(t *testing.T) {
require.FileExists(t, cli.DataDir().Join("package_index.json").String())
}

// createEnvForDaemon performs the minimum required operations to start the arduino-cli daemon.
// It returns a testsuite.Environment and an ArduinoCLI client to perform the integration tests.
// The Environment must be disposed by calling the CleanUp method via defer.
func createEnvForDaemon(t *testing.T) (*integrationtest.Environment, *integrationtest.ArduinoCLI) {
env := integrationtest.NewEnvironment(t)

cli := integrationtest.NewArduinoCliWithinEnvironment(env, &integrationtest.ArduinoCLIConfig{
ArduinoCLIPath: integrationtest.FindRepositoryRootPath(t).Join("arduino-cli"),
UseSharedStagingFolder: true,
})

_ = cli.StartDaemon(false)
return env, cli
}

func TestDaemonCompileOptions(t *testing.T) {
// See: https://github.com/arduino/arduino-cli/issues/1614
// See: https://github.com/arduino/arduino-cli/pull/1820

env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

grpcInst := cli.Create()
@@ -203,7 +188,7 @@ func TestDaemonCompileOptions(t *testing.T) {
func TestDaemonCompileAfterFailedLibInstall(t *testing.T) {
// See: https://github.com/arduino/arduino-cli/issues/1812

env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

grpcInst := cli.Create()
@@ -233,7 +218,7 @@ func TestDaemonCompileAfterFailedLibInstall(t *testing.T) {
}

func TestDaemonCoreUpdateIndex(t *testing.T) {
env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

grpcInst := cli.Create()
@@ -269,7 +254,7 @@ func TestDaemonCoreUpdateIndex(t *testing.T) {
}

func TestDaemonBundleLibInstall(t *testing.T) {
env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

grpcInst := cli.Create()
@@ -409,7 +394,7 @@ func TestDaemonLibrariesRescanOnInstall(t *testing.T) {
with the gprc instance
The last attempt is expected to not raise an error
*/
env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

grpcInst := cli.Create()
@@ -465,7 +450,7 @@ func TestDaemonCoreUpgradePlatform(t *testing.T) {

t.Run("upgraded successfully with additional urls", func(t *testing.T) {
t.Run("and install.json is present", func(t *testing.T) {
env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

grpcInst := cli.Create()
@@ -481,7 +466,7 @@ func TestDaemonCoreUpgradePlatform(t *testing.T) {
require.False(t, platform.GetRelease().GetMissingMetadata()) // install.json is present
})
t.Run("and install.json is missing", func(t *testing.T) {
env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

grpcInst := cli.Create()
@@ -504,7 +489,7 @@ func TestDaemonCoreUpgradePlatform(t *testing.T) {

t.Run("upgrade failed", func(t *testing.T) {
t.Run("without additional URLs", func(t *testing.T) {
env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

grpcInst := cli.Create()
@@ -524,7 +509,7 @@ func TestDaemonCoreUpgradePlatform(t *testing.T) {
require.False(t, platform.GetRelease().GetMissingMetadata()) // install.json is present
})
t.Run("missing both additional URLs and install.json", func(t *testing.T) {
env, cli := createEnvForDaemon(t)
env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

grpcInst := cli.Create()
117 changes: 117 additions & 0 deletions internal/integrationtest/monitor/monitor_grpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// This file is part of arduino-cli.
//
// Copyright 2023 ARDUINO SA (http://www.arduino.cc/)
//
// This software is released under the GNU General Public License version 3,
// which covers the main part of arduino-cli.
// The terms of this license can be found at:
// https://www.gnu.org/licenses/gpl-3.0.en.html
//
// You can be released from the requirements of the above licenses by purchasing
// a commercial license. Buying such a license is mandatory if you want to
// modify or otherwise use the software for commercial activities involving the
// Arduino software without disclosing the source code of your own applications.
// To purchase a commercial license, send an email to license@arduino.cc.

package monitor_test

import (
"context"
"errors"
"fmt"
"io"
"regexp"
"testing"
"time"

"github.com/arduino/arduino-cli/internal/integrationtest"
"github.com/arduino/arduino-cli/rpc/cc/arduino/cli/commands/v1"
"github.com/arduino/go-paths-helper"
"github.com/stretchr/testify/require"
)

func TestMonitorGRPCClose(t *testing.T) {
// See: https://github.com/arduino/arduino-cli/issues/2271

env, cli := integrationtest.CreateEnvForDaemon(t)
defer env.CleanUp()

_, _, err := cli.Run("core", "install", "arduino:avr@1.8.6")
require.NoError(t, err)

cli.InstallMockedSerialDiscovery(t)
cli.InstallMockedSerialMonitor(t)

grpcInst := cli.Create()
require.NoError(t, grpcInst.Init("", "", func(ir *commands.InitResponse) {
fmt.Printf("INIT> %v\n", ir.GetMessage())
}))

// Run a one-shot board list
boardListResp, err := grpcInst.BoardList(time.Second)
require.NoError(t, err)
ports := boardListResp.GetPorts()
require.NotEmpty(t, ports)
fmt.Printf("Got boardlist response with %d ports\n", len(ports))

// Open mocked serial-monitor and close it client-side
tmpFileMatcher := regexp.MustCompile("Tmpfile: (.*)\n")
{
ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
mon, err := grpcInst.Monitor(ctx, ports[0].Port)
var tmpFile *paths.Path
for {
monResp, err := mon.Recv()
if err != nil {
fmt.Println("MON>", err)
break
}
fmt.Printf("MON> %v\n", monResp)
if rx := monResp.GetRxData(); rx != nil {
if matches := tmpFileMatcher.FindAllStringSubmatch(string(rx), -1); len(matches) > 0 {
fmt.Println("Found tmpFile", matches[0][1])
tmpFile = paths.New(matches[0][1])
}
}
}
require.NotNil(t, tmpFile)
// The port is close client-side, it may be still open server-side
require.True(t, tmpFile.Exist())
cancel()
require.NoError(t, err)
}

// Now close the monitor using MonitorRequest_Close
{
// Keep a timeout to allow the test to exit in any case
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
mon, err := grpcInst.Monitor(ctx, ports[0].Port)
var tmpFile *paths.Path
for {
monResp, err := mon.Recv()
if errors.Is(err, io.EOF) {
fmt.Println("MON>", err)
break
}

require.NoError(t, err)
fmt.Printf("MON> %v\n", monResp)
if rx := monResp.GetRxData(); rx != nil {
if matches := tmpFileMatcher.FindAllStringSubmatch(string(rx), -1); len(matches) > 0 {
fmt.Println("Found tmpFile", matches[0][1])
tmpFile = paths.New(matches[0][1])
go func() {
time.Sleep(time.Second)
fmt.Println("<MON Sent close command")
mon.Send(&commands.MonitorRequest{Message: &commands.MonitorRequest_Close{Close: true}})
}()
}
}
}
require.NotNil(t, tmpFile)
// The port is closed serverd-side, it must be already closed once the client has received the EOF
require.False(t, tmpFile.Exist())
cancel()
require.NoError(t, err)
}
}
15 changes: 15 additions & 0 deletions internal/mock_serial_monitor/main.go
Original file line number Diff line number Diff line change
@@ -24,7 +24,9 @@ import (
"os"
"slices"
"strings"
"time"

"github.com/arduino/go-paths-helper"
monitor "github.com/arduino/pluggable-monitor-protocol-handler"
)

@@ -41,6 +43,7 @@ type SerialMonitor struct {
mockedSerialPort io.ReadWriteCloser
serialSettings *monitor.PortDescriptor
openedPort bool
muxFile *paths.Path
}

// NewSerialMonitor will initialize and return a SerialMonitor
@@ -129,9 +132,16 @@ func (d *SerialMonitor) Open(boardPort string) (io.ReadWriter, error) {
d.openedPort = true
sideA, sideB := newBidirectionalPipe()
d.mockedSerialPort = sideA
if muxFile, err := paths.MkTempFile(nil, ""); err == nil {
d.muxFile = paths.NewFromFile(muxFile)
muxFile.Close()
}
go func() {
buff := make([]byte, 1024)
d.mockedSerialPort.Write([]byte("Opened port: " + boardPort + "\n"))
if d.muxFile != nil {
d.mockedSerialPort.Write([]byte("Tmpfile: " + d.muxFile.String() + "\n"))
}
for parameter, descriptor := range d.serialSettings.ConfigurationParameter {
d.mockedSerialPort.Write([]byte(
fmt.Sprintf("Configuration %s = %s\n", parameter, descriptor.Selected)))
@@ -186,6 +196,11 @@ func (d *SerialMonitor) Close() error {
}
d.mockedSerialPort.Close()
d.openedPort = false
if d.muxFile != nil {
time.Sleep(2000 * time.Millisecond) // Emulate a small delay closing the port to check gRPC synchronization
d.muxFile.Remove()
d.muxFile = nil
}
return nil
}

423 changes: 283 additions & 140 deletions rpc/cc/arduino/cli/commands/v1/monitor.pb.go

Large diffs are not rendered by default.

19 changes: 16 additions & 3 deletions rpc/cc/arduino/cli/commands/v1/monitor.proto
Original file line number Diff line number Diff line change
@@ -23,6 +23,21 @@ import "cc/arduino/cli/commands/v1/common.proto";
import "cc/arduino/cli/commands/v1/port.proto";

message MonitorRequest {
oneof message {
// Open request, it must be the first incoming message
MonitorPortOpenRequest open_request = 1;
// Data to send to the port
bytes tx_data = 2;
// Port configuration, contains settings of the port to be changed
MonitorPortConfiguration updated_configuration = 3;
// Close message, set to true to gracefully close a port (this ensure
// that the gRPC streaming call is closed by the daemon AFTER the port
// has been successfully closed)
bool close = 4;
}
}

message MonitorPortOpenRequest {
// Arduino Core Service instance from the `Init` response.
Instance instance = 1;
// Port to open, must be filled only on the first request
@@ -31,10 +46,8 @@ message MonitorRequest {
// needed to disambiguate if more than one platform provides the pluggable
// monitor for a given port protocol.
string fqbn = 3;
// Data to send to the port
bytes tx_data = 4;
// Port configuration, optional, contains settings of the port to be applied
MonitorPortConfiguration port_configuration = 5;
MonitorPortConfiguration port_configuration = 4;
}

message MonitorPortConfiguration {