Skip to content

Commit ce4644d

Browse files
mirkoCrobumirkoCrobu
andauthored
Implementing progress value for the start app command
Co-authored-by: mirkoCrobu <mirkocrobu@NB-0531.localdomain>
1 parent 489924a commit ce4644d

File tree

3 files changed

+284
-6
lines changed

3 files changed

+284
-6
lines changed
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
package orchestrator
2+
3+
import (
4+
"log/slog"
5+
"regexp"
6+
"strconv"
7+
"strings"
8+
"sync"
9+
)
10+
11+
// layerProgress keeps track of progress for a single layer.
12+
type layerProgress struct {
13+
total uint64
14+
current uint64
15+
}
16+
17+
// DockerProgressParser parses Docker pull logs to extract and aggregate progress information.
18+
type DockerProgressParser struct {
19+
mu sync.Mutex
20+
layers map[string]*layerProgress
21+
22+
progressRegex *regexp.Regexp
23+
24+
history []uint64 // Memorize the history of progress values
25+
historySize int // the maximum size of the history
26+
smoothedProgress uint64
27+
}
28+
29+
// NewDockerProgressParser creates a new DockerProgressParser instance.
30+
func NewDockerProgressParser(historySize int) *DockerProgressParser {
31+
regex := regexp.MustCompile(`^\s*([a-f0-9]{12})\s+(Downloading|Extracting)\s+\[.*\]\s+([\d.]+[kKmMgG]?[bB])\/([\d.]+[kKmMgG]?[bB])`)
32+
33+
return &DockerProgressParser{
34+
layers: make(map[string]*layerProgress),
35+
progressRegex: regex,
36+
history: make([]uint64, 0, historySize),
37+
historySize: historySize,
38+
}
39+
}
40+
41+
// returns the overall progress percentage (0 to 100) and a boolean indicating if parsing was successful.
42+
func (p *DockerProgressParser) Parse(logLine string) (uint64, bool) {
43+
44+
info, ok := parseProgressLine(logLine, p.progressRegex)
45+
if !ok {
46+
return 0, false
47+
}
48+
49+
p.mu.Lock()
50+
defer p.mu.Unlock()
51+
52+
if _, exists := p.layers[info.layerID]; !exists {
53+
p.layers[info.layerID] = &layerProgress{}
54+
}
55+
56+
p.layers[info.layerID].current = info.currentBytes
57+
p.layers[info.layerID].total = info.totalBytes
58+
59+
rawPercentage := calculateTotalProgress(p.layers)
60+
61+
p.history = append(p.history, rawPercentage)
62+
63+
// if the history exceeds the maximum size, remove the oldest entry
64+
if len(p.history) > p.historySize {
65+
p.history = append(make([]uint64, 0, p.historySize), p.history[1:]...)
66+
}
67+
68+
// calculate the smoothed progress as the average of the history
69+
var sum float64
70+
for _, v := range p.history {
71+
sum += float64(v)
72+
}
73+
currentSmoothedProgress := sum / float64(len(p.history))
74+
newSmoothedIntProgress := uint64(currentSmoothedProgress)
75+
76+
if newSmoothedIntProgress > p.smoothedProgress {
77+
p.smoothedProgress = newSmoothedIntProgress
78+
return p.smoothedProgress, true
79+
}
80+
81+
return 0, false
82+
}
83+
84+
type parsedProgressInfo struct {
85+
layerID string
86+
currentBytes uint64
87+
totalBytes uint64
88+
}
89+
90+
func parseProgressLine(logLine string, regex *regexp.Regexp) (*parsedProgressInfo, bool) {
91+
matches := regex.FindStringSubmatch(logLine)
92+
if len(matches) != 5 {
93+
return nil, false
94+
}
95+
96+
currentBytes, err := parseBytes(matches[3])
97+
if err != nil {
98+
slog.Warn("Could not retrieve currentBytes from docker progress line", "line", logLine, "error", err)
99+
return nil, false
100+
}
101+
102+
totalBytes, err := parseBytes(matches[4])
103+
if err != nil {
104+
slog.Warn("Could not retrieve totalBytes from docker progress line", "line", logLine, "error", err)
105+
return nil, false
106+
}
107+
108+
return &parsedProgressInfo{
109+
layerID: matches[1],
110+
currentBytes: currentBytes,
111+
totalBytes: totalBytes,
112+
}, true
113+
}
114+
115+
func calculateTotalProgress(layers map[string]*layerProgress) uint64 {
116+
var totalCurrent, grandTotal uint64
117+
118+
for _, progress := range layers {
119+
totalCurrent += progress.current
120+
grandTotal += progress.total
121+
}
122+
123+
if grandTotal == 0 {
124+
return 0
125+
}
126+
127+
return uint64((float64(totalCurrent) / float64(grandTotal)) * 100)
128+
}
129+
130+
func parseBytes(s string) (uint64, error) {
131+
s = strings.ToLower(strings.TrimSpace(s))
132+
133+
unit := s[len(s)-2:]
134+
valueStr := s[:len(s)-2]
135+
136+
var multiplier float64 = 1
137+
switch unit {
138+
case "kb":
139+
multiplier = 1024
140+
case "mb":
141+
multiplier = 1024 * 1024
142+
case "gb":
143+
multiplier = 1024 * 1024 * 1024
144+
default:
145+
unit = s[len(s)-1:]
146+
valueStr = s[:len(s)-1]
147+
if unit != "b" {
148+
valueStr = s
149+
}
150+
}
151+
152+
value, err := strconv.ParseFloat(valueStr, 64)
153+
if err != nil {
154+
return 0, err
155+
}
156+
157+
totalBytesFloat := value * multiplier
158+
return uint64(totalBytesFloat), nil
159+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package orchestrator
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestDockerProgressParser_Parse(t *testing.T) {
10+
11+
testCases := []struct {
12+
name string
13+
historySize int
14+
logLines []string
15+
expectedProgress []uint64
16+
finalState map[string]*layerProgress
17+
}{
18+
{
19+
name: "Single layer download should report increasing progress",
20+
historySize: 3,
21+
logLines: []string{
22+
" e756f3fdd6a3 Downloading [===> ] 1.5MB/10.0MB", // 1.5/10 = 15% history: [15.0] (15.0) / historySize = 15.0 / 1 = 15
23+
" e756f3fdd6a3 Downloading [=========> ] 5.0MB/10.0MB", // 5.0/10 = 50% history: [15.0, 50.0] (15.0 + 50.0) / historySize = 65.0 / 2 = 32.5
24+
" irrelevant log",
25+
" e756f3fdd6a3 Downloading [===================> ] 9.0MB/10.0MB", // 9.0/10 = 90% history: [15.0, 50, 90.0] (15.0 + 50.0 + 90.0) / historySize = 155.0.0 / 3 = 51
26+
" e756f3fdd6a3 Downloading [=====================> ] 10.0MB/10.0MB", // 10/10 = 100% history: [50, 90.0, 100.0] (50.0 + 90.0 +100.0) / historySize = 240.0 / 3 = 80
27+
},
28+
expectedProgress: []uint64{15, 32, 51, 80},
29+
},
30+
{
31+
name: "Two parallel downloads should aggregate progress correctly",
32+
historySize: 2,
33+
logLines: []string{
34+
" e756f3fdd6a3 Downloading [====> ] 5.0MB/10.0MB",
35+
" a555d2abb4b2 Downloading [> ] 2.0MB/40.0MB",
36+
" e756f3fdd6a3 Downloading [=====>] 10.0MB/10.0MB",
37+
" a555d2abb4b2 Downloading [=====>] 20.0MB/40.0MB",
38+
" a555d2abb4b2 Downloading [=====>] 40.0MB/40.0MB",
39+
}, // just 2 results instead of 5 because we avoit to returs equal values: if [1] = 50% and [2] = 50%, we retun nil second time
40+
expectedProgress: []uint64{50, 80},
41+
},
42+
{
43+
name: "Irrelevant lines should be ignored",
44+
historySize: 5,
45+
logLines: []string{
46+
" main Pulling",
47+
" e756f3fdd6a3 Pulling fs layer",
48+
" e756f3fdd6a3 Waiting",
49+
" e756f3fdd6a3 Verifying Checksum",
50+
" e756f3fdd6a3 Download complete",
51+
" e756f3fdd6a3 Extracting [> ] 1.0MB/10.0MB", // only one good line
52+
},
53+
expectedProgress: []uint64{10},
54+
},
55+
{
56+
name: "Parser should handle different byte units not showing the last",
57+
historySize: 1,
58+
logLines: []string{
59+
" e756f3fdd6a3 Downloading [> ] 512.0kB/2.0MB", // 25%
60+
" e756f3fdd6a3 Downloading [> ] 1.5MB/2.0MB", // 75%
61+
" a555d2abb4b2 Downloading [> ] 1.0GB/2.0GB", // Raw: (1.5MB+1GB)/(2MB+2GB) ~= 50%
62+
}, // the last one, 50%, is lower than the previous one 75%. So we don't return it and we keep 75%
63+
expectedProgress: []uint64{25, 75},
64+
},
65+
{
66+
name: "Parser should handle different byte units showing the last",
67+
historySize: 1,
68+
logLines: []string{
69+
" e756f3fdd6a3 Downloading [> ] 512.0kB/2.0MB", // 25%
70+
" e756f3fdd6a3 Downloading [> ] 1.5MB/2.0MB", // 75%
71+
" a555d2abb4b2 Downloading [> ] 2.0GB/2.0GB", // 100%
72+
},
73+
expectedProgress: []uint64{25, 75, 99},
74+
},
75+
}
76+
77+
for _, tc := range testCases {
78+
t.Run(tc.name, func(t *testing.T) {
79+
parser := NewDockerProgressParser(tc.historySize)
80+
81+
var reportedProgress []uint64
82+
83+
for _, line := range tc.logLines {
84+
if progress, ok := parser.Parse(line); ok {
85+
reportedProgress = append(reportedProgress, progress)
86+
}
87+
}
88+
89+
assert.Equal(t, tc.expectedProgress, reportedProgress, "The reported progress sequence should match the expected one")
90+
})
91+
}
92+
}

internal/orchestrator/orchestrator.go

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -131,30 +131,44 @@ func StartApp(
131131
return
132132
}
133133
})
134-
134+
if !yield(StreamMessage{progress: &Progress{Name: "preparing", Progress: 0.0}}) {
135+
return
136+
}
135137
if app.MainSketchPath != nil {
136-
if !yield(StreamMessage{data: "Compiling and uploading the sketch..."}) {
137-
cancel()
138+
if !yield(StreamMessage{progress: &Progress{Name: "sketch compiling and uploading", Progress: 0.0}}) {
138139
return
139140
}
140141
if err := compileUploadSketch(ctx, &app, sketchCallbackWriter, cfg); err != nil {
141142
yield(StreamMessage{error: err})
142143
return
143144
}
145+
if !yield(StreamMessage{progress: &Progress{Name: "sketch updated", Progress: 10.0}}) {
146+
return
147+
}
144148
}
149+
145150
if app.MainPythonFile != nil {
146151
envs := getAppEnvironmentVariables(app, bricksIndex, modelsIndex)
147152

148-
if !yield(StreamMessage{data: "Provisioning app..."}) {
153+
if !yield(StreamMessage{data: "python provisioning"}) {
149154
cancel()
150155
return
151156
}
157+
provisionStartProgress := float32(0.0)
158+
if app.MainSketchPath != nil {
159+
provisionStartProgress = 10.0
160+
}
161+
162+
if !yield(StreamMessage{progress: &Progress{Name: "python provisioning", Progress: provisionStartProgress}}) {
163+
return
164+
}
152165

153166
if err := provisioner.App(ctx, bricksIndex, &app, cfg, envs, staticStore); err != nil {
154167
yield(StreamMessage{error: err})
155168
return
156169
}
157-
if !yield(StreamMessage{data: "Starting app..."}) {
170+
171+
if !yield(StreamMessage{data: "python downloading"}) {
158172
cancel()
159173
return
160174
}
@@ -169,14 +183,26 @@ func StartApp(
169183
}
170184
commands = append(commands, "up", "-d", "--remove-orphans", "--pull", "missing")
171185

186+
dockerParser := NewDockerProgressParser(200)
187+
172188
var customError error
173189
callbackDockerWriter := NewCallbackWriter(func(line string) {
174190
// docker compose sometimes returns errors as info lines, we try to parse them here and return a proper error
191+
175192
if e := GetCustomErrorFomDockerEvent(line); e != nil {
176193
customError = e
177194
}
195+
if percentage, ok := dockerParser.Parse(line); ok {
196+
197+
// assumption: docker pull progress goes from 0 to 80% of the total app start progress
198+
totalProgress := 20.0 + (percentage/100.0)*80.0
178199

179-
if !yield(StreamMessage{data: line}) {
200+
if !yield(StreamMessage{progress: &Progress{Name: "python starting", Progress: float32(totalProgress)}}) {
201+
cancel()
202+
return
203+
}
204+
return
205+
} else if !yield(StreamMessage{data: line}) {
180206
cancel()
181207
return
182208
}
@@ -194,6 +220,7 @@ func StartApp(
194220
if customError != nil {
195221
err = customError
196222
}
223+
197224
yield(StreamMessage{error: err})
198225
return
199226
}

0 commit comments

Comments
 (0)