Skip to content

Commit 05566a2

Browse files
dido18Copilotlucarin91
authored
refact(orchestrator): make the apt upgrade as a package that handle the events
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Luca Rinaldi <lucarin@protonmail.com>
1 parent e708e75 commit 05566a2

File tree

8 files changed

+350
-247
lines changed

8 files changed

+350
-247
lines changed

cmd/arduino-app-cli/daemon.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import (
88
"time"
99

1010
"github.com/arduino/arduino-app-cli/internal/api"
11-
"github.com/arduino/arduino-app-cli/internal/api/handlers"
11+
"github.com/arduino/arduino-app-cli/internal/apt"
1212
"github.com/arduino/arduino-app-cli/internal/orchestrator"
1313
"github.com/arduino/arduino-app-cli/pkg/httprecover"
1414

@@ -44,7 +44,7 @@ func newDaemonCmd(docker *dockerClient.Client) *cobra.Command {
4444
func httpHandler(ctx context.Context, dockerClient *dockerClient.Client, daemonPort string) {
4545
slog.Info("Starting HTTP server", slog.String("address", ":"+daemonPort))
4646

47-
apiSrv := api.NewHTTPRouter(dockerClient, Version, handlers.NewUpdateEventsBroker())
47+
apiSrv := api.NewHTTPRouter(dockerClient, Version, apt.New())
4848

4949
corsMiddlware, err := cors.NewMiddleware(
5050
cors.Config{

internal/api/api.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,25 @@ import (
55
"net/http"
66

77
"github.com/arduino/arduino-app-cli/internal/api/handlers"
8+
"github.com/arduino/arduino-app-cli/internal/apt"
89

910
dockerClient "github.com/docker/docker/client"
1011
)
1112

1213
//go:embed docs
1314
var docsFS embed.FS
1415

15-
func NewHTTPRouter(dockerClient *dockerClient.Client, version string, eventsBroker *handlers.UpdateEventsBroker) http.Handler {
16+
func NewHTTPRouter(dockerClient *dockerClient.Client, version string, aptClient *apt.Service) http.Handler {
1617
mux := http.NewServeMux()
1718

1819
mux.Handle("GET /v1/version", handlers.HandlerVersion(version))
1920
mux.Handle("GET /v1/config", handlers.HandleConfig())
2021
mux.Handle("GET /v1/bricks", handlers.HandleBrickList())
2122
mux.Handle("GET /v1/bricks/{brickID}", handlers.HandleBrickDetails())
22-
mux.Handle("GET /v1/system/update/check", handlers.HandleCheckUpgradable())
23-
mux.Handle("GET /v1/system/update/events", handlers.HandleUpdateEvents(eventsBroker))
24-
mux.Handle("PUT /v1/system/update/apply", handlers.HandleUpdateApply(eventsBroker))
23+
24+
mux.Handle("GET /v1/system/update/check", handlers.HandleCheckUpgradable(aptClient))
25+
mux.Handle("GET /v1/system/update/events", handlers.HandleUpdateEvents(aptClient))
26+
mux.Handle("PUT /v1/system/update/apply", handlers.HandleUpdateApply(aptClient))
2527

2628
mux.Handle("GET /v1/models", handlers.HandleModelsList())
2729
mux.Handle("GET /v1/models/{modelID}", handlers.HandlerModelByID())

internal/api/handlers/events.go

Lines changed: 0 additions & 62 deletions
This file was deleted.

internal/api/handlers/update.go

Lines changed: 47 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,27 @@
11
package handlers
22

33
import (
4+
"errors"
45
"net/http"
56
"strings"
6-
"sync/atomic"
77

88
"log/slog"
99

1010
"go.bug.st/f"
1111

12-
"github.com/arduino/arduino-app-cli/internal/orchestrator"
12+
"github.com/arduino/arduino-app-cli/internal/apt"
1313
"github.com/arduino/arduino-app-cli/pkg/render"
1414
)
1515

16-
var matchArduinoPackage = func(p orchestrator.UpgradablePackage) bool {
16+
var matchArduinoPackage = func(p apt.UpgradablePackage) bool {
1717
return strings.HasPrefix(p.Name, "arduino-")
1818
}
1919

20-
var matchAllPackages = func(p orchestrator.UpgradablePackage) bool {
20+
var matchAllPackages = func(p apt.UpgradablePackage) bool {
2121
return true
2222
}
2323

24-
func HandleCheckUpgradable() http.HandlerFunc {
24+
func HandleCheckUpgradable(aptClient *apt.Service) http.HandlerFunc {
2525
return func(w http.ResponseWriter, r *http.Request) {
2626
queryParams := r.URL.Query()
2727

@@ -35,8 +35,12 @@ func HandleCheckUpgradable() http.HandlerFunc {
3535
filterFunc = matchArduinoPackage
3636
}
3737

38-
pkgs, err := orchestrator.GetUpgradablePackages(r.Context(), filterFunc)
38+
pkgs, err := aptClient.ListUpgradablePackages(r.Context(), filterFunc)
3939
if err != nil {
40+
if errors.Is(err, apt.ErrOperationAlreadyInProgress) {
41+
render.EncodeResponse(w, http.StatusConflict, err.Error())
42+
return
43+
}
4044
render.EncodeResponse(w, http.StatusBadRequest, "Error checking for upgradable packages: "+err.Error())
4145
return
4246
}
@@ -53,25 +57,11 @@ func HandleCheckUpgradable() http.HandlerFunc {
5357
}
5458

5559
type UpdateCheckResult struct {
56-
Packages []orchestrator.UpgradablePackage `json:"packages"`
60+
Packages []apt.UpgradablePackage `json:"packages"`
5761
}
5862

59-
var inProgress atomic.Bool
60-
61-
func HandleUpdateApply(eventsBroker *UpdateEventsBroker) http.HandlerFunc {
63+
func HandleUpdateApply(aptClient *apt.Service) http.HandlerFunc {
6264
return func(w http.ResponseWriter, r *http.Request) {
63-
// Check if an upgrade is already in progress
64-
if inProgress.Load() {
65-
render.EncodeResponse(w, http.StatusConflict, "Upgrade already in progress")
66-
return
67-
}
68-
69-
// Set upgrade in progress
70-
if !inProgress.CompareAndSwap(false, true) {
71-
render.EncodeResponse(w, http.StatusConflict, "Upgrade already in progress")
72-
return
73-
}
74-
7565
queryParams := r.URL.Query()
7666
onlyArduinoPackages := false
7767
if val := queryParams.Get("only-arduino"); val != "" {
@@ -83,8 +73,12 @@ func HandleUpdateApply(eventsBroker *UpdateEventsBroker) http.HandlerFunc {
8373
filterFunc = matchArduinoPackage
8474
}
8575

86-
pkgs, err := orchestrator.GetUpgradablePackages(r.Context(), filterFunc)
76+
pkgs, err := aptClient.ListUpgradablePackages(r.Context(), filterFunc)
8777
if err != nil {
78+
if errors.Is(err, apt.ErrOperationAlreadyInProgress) {
79+
render.EncodeResponse(w, http.StatusConflict, err.Error())
80+
return
81+
}
8882
slog.Error("Unable to get upgradable packages", slog.String("error", err.Error()))
8983
render.EncodeResponse(w, http.StatusInternalServerError, "Error checking for upgradable packages")
9084
return
@@ -95,41 +89,25 @@ func HandleUpdateApply(eventsBroker *UpdateEventsBroker) http.HandlerFunc {
9589
return
9690
}
9791

98-
go func() {
99-
defer inProgress.Store(false)
100-
101-
names := f.Map(pkgs, func(p orchestrator.UpgradablePackage) string {
102-
return p.Name
103-
})
104-
105-
eventsBroker.PublishLog("Upgrading: " + strings.Join(names, ", "))
106-
107-
iter, err := orchestrator.RunUpgradeCommand(r.Context(), names)
108-
if err != nil {
109-
slog.Error("Error running upgrade command", slog.String("error", err.Error()))
110-
eventsBroker.PublishError(render.SSEErrorData{Message: "failed to upgrade the packages"})
111-
return
112-
}
113-
114-
for item := range iter {
115-
eventsBroker.PublishLog(item)
116-
}
117-
118-
eventsBroker.Restarting()
92+
names := f.Map(pkgs, func(p apt.UpgradablePackage) string {
93+
return p.Name
94+
})
11995

120-
err = orchestrator.RestartServices(r.Context())
121-
if err != nil {
122-
slog.Error("Error restarting services", slog.String("error", err.Error()))
123-
eventsBroker.PublishError(render.SSEErrorData{Message: "failed to restart services"})
96+
err = aptClient.UpgradePackages(names)
97+
if err != nil {
98+
if errors.Is(err, apt.ErrOperationAlreadyInProgress) {
99+
render.EncodeResponse(w, http.StatusConflict, err.Error())
124100
return
125101
}
126-
}()
102+
render.EncodeResponse(w, http.StatusInternalServerError, "Error upgrading packages")
103+
return
104+
}
127105

128106
render.EncodeResponse(w, http.StatusAccepted, "Upgrade started")
129107
}
130108
}
131109

132-
func HandleUpdateEvents(eventsBroker *UpdateEventsBroker) http.HandlerFunc {
110+
func HandleUpdateEvents(aptClient *apt.Service) http.HandlerFunc {
133111
return func(w http.ResponseWriter, r *http.Request) {
134112
sseStream, err := render.NewSSEStream(r.Context(), w)
135113
if err != nil {
@@ -139,13 +117,28 @@ func HandleUpdateEvents(eventsBroker *UpdateEventsBroker) http.HandlerFunc {
139117
}
140118
defer sseStream.Close()
141119

142-
ch := eventsBroker.Subscribe()
143-
defer eventsBroker.Unsubscribe(ch)
120+
ch := aptClient.Subscribe()
121+
defer aptClient.Unsubscribe(ch)
144122

145123
for {
146124
select {
147-
case event := <-ch:
148-
sseStream.Send(event)
125+
case event, ok := <-ch:
126+
if !ok {
127+
slog.Info("APT event channel closed, stopping SSE stream")
128+
return
129+
}
130+
if event.Type == apt.ErrorEvent {
131+
sseStream.SendError(render.SSEErrorData{
132+
Code: render.InternalServiceErr,
133+
Message: event.Data,
134+
})
135+
} else {
136+
sseStream.Send(render.SSEEvent{
137+
Type: event.Type.String(),
138+
Data: event.Data,
139+
})
140+
}
141+
149142
case <-r.Context().Done():
150143
return
151144
}

internal/apt/event.go

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
package apt
2+
3+
// EventType defines the type of upgrade event.
4+
type EventType int
5+
6+
const (
7+
UpgradeLineEvent EventType = iota
8+
StartEvent
9+
RestartEvent
10+
ErrorEvent
11+
)
12+
13+
// Event represents a single event in the upgrade process.
14+
type Event struct {
15+
Type EventType
16+
Data string
17+
Err error // Optional error field for error events
18+
}
19+
20+
func (t EventType) String() string {
21+
switch t {
22+
case UpgradeLineEvent:
23+
return "log"
24+
case RestartEvent:
25+
return "restarting"
26+
case StartEvent:
27+
return "starting"
28+
case ErrorEvent:
29+
return "error"
30+
default:
31+
panic("unreachable")
32+
}
33+
}

0 commit comments

Comments
 (0)