From fae7972cf16150e97c7d33d9077604f662e92ea7 Mon Sep 17 00:00:00 2001 From: Martino Facchin Date: Fri, 23 Aug 2019 14:47:05 +0200 Subject: [PATCH 1/4] Add parallel jobs field to proto and rerun protoc --- rpc/commands/compile.pb.go | 59 ++++++++++++++++++++++---------------- rpc/commands/compile.proto | 1 + 2 files changed, 35 insertions(+), 25 deletions(-) diff --git a/rpc/commands/compile.pb.go b/rpc/commands/compile.pb.go index 74d0f0853cf..acfb56d8f87 100644 --- a/rpc/commands/compile.pb.go +++ b/rpc/commands/compile.pb.go @@ -34,6 +34,7 @@ type CompileReq struct { Quiet bool `protobuf:"varint,11,opt,name=quiet,proto3" json:"quiet,omitempty"` VidPid string `protobuf:"bytes,12,opt,name=vidPid,proto3" json:"vidPid,omitempty"` ExportFile string `protobuf:"bytes,13,opt,name=exportFile,proto3" json:"exportFile,omitempty"` + Jobs int32 `protobuf:"varint,14,opt,name=jobs,proto3" json:"jobs,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` @@ -155,6 +156,13 @@ func (m *CompileReq) GetExportFile() string { return "" } +func (m *CompileReq) GetJobs() int32 { + if m != nil { + return m.Jobs + } + return 0 +} + type CompileResp struct { OutStream []byte `protobuf:"bytes,1,opt,name=out_stream,json=outStream,proto3" json:"out_stream,omitempty"` ErrStream []byte `protobuf:"bytes,2,opt,name=err_stream,json=errStream,proto3" json:"err_stream,omitempty"` @@ -210,29 +218,30 @@ func init() { func init() { proto.RegisterFile("commands/compile.proto", fileDescriptor_86bc582849c76c3d) } var fileDescriptor_86bc582849c76c3d = []byte{ - // 378 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x92, 0x4f, 0x8b, 0xdb, 0x30, - 0x14, 0xc4, 0x71, 0xfe, 0xda, 0x2f, 0x69, 0x0b, 0xa2, 0x4d, 0x45, 0x68, 0x8b, 0x9b, 0x43, 0x31, - 0x94, 0xd8, 0xd0, 0x9e, 0x7b, 0x69, 0xa0, 0x50, 0x7a, 0x09, 0xee, 0x6d, 0x2f, 0x8b, 0x2d, 0xbf, - 0x8d, 0xc5, 0xda, 0x92, 0x22, 0xc9, 0xc9, 0x7e, 0xb7, 0xfd, 0x72, 0x8b, 0xe5, 0x38, 0x09, 0x81, - 0x3d, 0xd9, 0xef, 0x37, 0xa3, 0x91, 0xec, 0x11, 0x2c, 0x98, 0xac, 0xeb, 0x4c, 0x14, 0x26, 0x61, - 0xb2, 0x56, 0xbc, 0xc2, 0x58, 0x69, 0x69, 0x25, 0xf9, 0xc8, 0x58, 0x9c, 0xe9, 0xa2, 0xe1, 0x42, - 0xc6, 0xac, 0xe2, 0x71, 0x6f, 0x5b, 0x7e, 0xb8, 0x5e, 0x50, 0x4b, 0xd1, 0xf9, 0x57, 0xcf, 0x43, - 0x80, 0x4d, 0x97, 0x90, 0xe2, 0x9e, 0xfc, 0x02, 0x9f, 0x0b, 0x63, 0x33, 0xc1, 0x90, 0x7a, 0xa1, - 0x17, 0xcd, 0x7e, 0x7c, 0x8d, 0x5f, 0x49, 0x8c, 0xff, 0x9e, 0x8c, 0xe9, 0x79, 0x09, 0x21, 0x30, - 0x7a, 0xd8, 0xe7, 0x82, 0x0e, 0x42, 0x2f, 0x0a, 0x52, 0xf7, 0x4e, 0xbe, 0x00, 0x98, 0x47, 0xb4, - 0xac, 0xdc, 0x66, 0xb6, 0xa4, 0x43, 0xa7, 0x5c, 0x11, 0xf2, 0x0d, 0xde, 0x9a, 0x52, 0x1e, 0xb7, - 0x5a, 0x2a, 0xd4, 0x96, 0xa3, 0xa1, 0xa3, 0xd0, 0x8b, 0xfc, 0xf4, 0x86, 0xb6, 0x39, 0x4a, 0xa3, - 0xd2, 0x92, 0xa1, 0x31, 0x74, 0xec, 0x3c, 0x57, 0xa4, 0xcd, 0xc9, 0x1b, 0x5e, 0x15, 0x9b, 0x8c, - 0x95, 0xe8, 0xf6, 0x9a, 0xb8, 0xbd, 0x6e, 0x28, 0xf9, 0x04, 0x81, 0x23, 0xce, 0x32, 0x75, 0x96, - 0x0b, 0x20, 0x11, 0xbc, 0xeb, 0x86, 0xcb, 0x71, 0xfc, 0x70, 0x18, 0x05, 0xe9, 0x2d, 0x26, 0x4b, - 0xf0, 0x8f, 0x99, 0x16, 0x5c, 0xec, 0x0c, 0x0d, 0x5c, 0xcc, 0x79, 0x26, 0x14, 0xa6, 0x07, 0xd4, - 0xb9, 0x34, 0x48, 0xc1, 0x1d, 0xb4, 0x1f, 0xc9, 0x7b, 0x18, 0xef, 0x1b, 0x8e, 0x96, 0xce, 0x1c, - 0xef, 0x06, 0xb2, 0x80, 0xc9, 0x81, 0x17, 0x5b, 0x5e, 0xd0, 0xb9, 0x4b, 0x3a, 0x4d, 0xed, 0x37, - 0xe3, 0x93, 0x92, 0xda, 0xfe, 0xe1, 0x15, 0xd2, 0x37, 0xdd, 0xbf, 0xbb, 0x90, 0xd5, 0x3f, 0x98, - 0x9d, 0xcb, 0x33, 0x8a, 0x7c, 0x06, 0x90, 0x8d, 0xbd, 0x37, 0x56, 0x63, 0x56, 0xbb, 0xfe, 0xe6, - 0x69, 0x20, 0x1b, 0xfb, 0xdf, 0x81, 0x56, 0x46, 0xad, 0x7b, 0x79, 0xd0, 0xc9, 0xa8, 0x75, 0x27, - 0xff, 0x5e, 0xdf, 0x7d, 0xdf, 0x71, 0x5b, 0x36, 0x79, 0x5b, 0x71, 0x72, 0xaa, 0xbc, 0x7f, 0xae, - 0x59, 0xc5, 0x13, 0xad, 0x58, 0xd2, 0xd7, 0x9f, 0x4f, 0xdc, 0x05, 0xfa, 0xf9, 0x12, 0x00, 0x00, - 0xff, 0xff, 0xd2, 0xee, 0xb7, 0xd0, 0x8a, 0x02, 0x00, 0x00, + // 392 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x74, 0x92, 0x41, 0x8f, 0xd3, 0x30, + 0x10, 0x85, 0x95, 0xed, 0xb6, 0x9b, 0x4c, 0x97, 0x45, 0xb2, 0x60, 0xb1, 0x2a, 0x40, 0xa1, 0x07, + 0x14, 0x09, 0x35, 0x91, 0xe0, 0xcc, 0x85, 0x4a, 0x48, 0x88, 0x4b, 0x15, 0x6e, 0x5c, 0x50, 0xe2, + 0x0c, 0x8d, 0x21, 0xb1, 0x5d, 0xdb, 0x69, 0xf9, 0x97, 0xfc, 0x25, 0x94, 0x49, 0xd3, 0x56, 0x95, + 0x38, 0xc5, 0xf3, 0xbd, 0xe7, 0x67, 0xc7, 0x33, 0xf0, 0x28, 0x74, 0xdb, 0x16, 0xaa, 0x72, 0x99, + 0xd0, 0xad, 0x91, 0x0d, 0xa6, 0xc6, 0x6a, 0xaf, 0xd9, 0x0b, 0x21, 0xd2, 0xc2, 0x56, 0x9d, 0x54, + 0x3a, 0x15, 0x8d, 0x4c, 0x47, 0xdb, 0xe2, 0xf9, 0xe5, 0x86, 0x56, 0xab, 0xc1, 0xbf, 0xfc, 0x3b, + 0x01, 0x58, 0x0f, 0x09, 0x39, 0xee, 0xd8, 0x47, 0x08, 0xa5, 0x72, 0xbe, 0x50, 0x02, 0x79, 0x10, + 0x07, 0xc9, 0xfc, 0xfd, 0x9b, 0xf4, 0x3f, 0x89, 0xe9, 0x97, 0xa3, 0x31, 0x3f, 0x6d, 0x61, 0x0c, + 0x6e, 0x7f, 0xee, 0x4a, 0xc5, 0x6f, 0xe2, 0x20, 0x89, 0x72, 0x5a, 0xb3, 0xd7, 0x00, 0xee, 0x37, + 0x7a, 0x51, 0x6f, 0x0a, 0x5f, 0xf3, 0x09, 0x29, 0x17, 0x84, 0xbd, 0x85, 0x07, 0x57, 0xeb, 0xc3, + 0xc6, 0x6a, 0x83, 0xd6, 0x4b, 0x74, 0xfc, 0x36, 0x0e, 0x92, 0x30, 0xbf, 0xa2, 0x7d, 0x8e, 0xb1, + 0x68, 0xac, 0x16, 0xe8, 0x1c, 0x9f, 0x92, 0xe7, 0x82, 0xf4, 0x39, 0x65, 0x27, 0x9b, 0x6a, 0x5d, + 0x88, 0x1a, 0xe9, 0xac, 0x19, 0x9d, 0x75, 0x45, 0xd9, 0x4b, 0x88, 0x88, 0x90, 0xe5, 0x8e, 0x2c, + 0x67, 0xc0, 0x12, 0x78, 0x3a, 0x14, 0xe7, 0xeb, 0x84, 0xf1, 0x24, 0x89, 0xf2, 0x6b, 0xcc, 0x16, + 0x10, 0x1e, 0x0a, 0xab, 0xa4, 0xda, 0x3a, 0x1e, 0x51, 0xcc, 0xa9, 0x66, 0x1c, 0xee, 0xf6, 0x68, + 0x4b, 0xed, 0x90, 0x03, 0x5d, 0x74, 0x2c, 0xd9, 0x33, 0x98, 0xee, 0x3a, 0x89, 0x9e, 0xcf, 0x89, + 0x0f, 0x05, 0x7b, 0x84, 0xd9, 0x5e, 0x56, 0x1b, 0x59, 0xf1, 0x7b, 0x4a, 0x3a, 0x56, 0xfd, 0x3f, + 0xe3, 0x1f, 0xa3, 0xad, 0xff, 0x2c, 0x1b, 0xe4, 0x4f, 0x86, 0xb7, 0x3b, 0x93, 0xfe, 0xbd, 0x7f, + 0xe9, 0xd2, 0xf1, 0x87, 0x38, 0x48, 0xa6, 0x39, 0xad, 0x97, 0x5f, 0x61, 0x7e, 0x6a, 0xa8, 0x33, + 0xec, 0x15, 0x80, 0xee, 0xfc, 0x0f, 0xe7, 0x2d, 0x16, 0x2d, 0xf5, 0xf4, 0x3e, 0x8f, 0x74, 0xe7, + 0xbf, 0x11, 0xe8, 0x65, 0xb4, 0x76, 0x94, 0x6f, 0x06, 0x19, 0xad, 0x1d, 0xe4, 0x4f, 0xab, 0xef, + 0xef, 0xb6, 0xd2, 0xd7, 0x5d, 0xd9, 0xb7, 0x3d, 0x3b, 0x8e, 0xc1, 0xf8, 0x5d, 0x89, 0x46, 0x66, + 0xd6, 0x88, 0x6c, 0x1c, 0x89, 0x72, 0x46, 0x43, 0xf5, 0xe1, 0x5f, 0x00, 0x00, 0x00, 0xff, 0xff, + 0x14, 0x81, 0xb3, 0x02, 0x9e, 0x02, 0x00, 0x00, } diff --git a/rpc/commands/compile.proto b/rpc/commands/compile.proto index f81e31e0d3c..c0441db635f 100644 --- a/rpc/commands/compile.proto +++ b/rpc/commands/compile.proto @@ -37,6 +37,7 @@ message CompileReq { bool quiet = 11; // Suppresses almost every output. string vidPid = 12; // VID/PID specific build properties. string exportFile = 13; // The compiled binary is written to this file + int32 jobs = 14; // The max number of concurrent compiler instances to run (as make -jx) } message CompileResp { From da72e2eb7be583665d2b3d282bfe2bf96e4f1800 Mon Sep 17 00:00:00 2001 From: Martino Facchin Date: Fri, 23 Aug 2019 14:48:00 +0200 Subject: [PATCH 2/4] Use jobs flag to really control number of concurrent compilations The issue was due to the peculiar way concurrency and parallelism are handled in go. We used to set GOMAXPROC to 1 to avoid parallelizing the WaitGroup execution. This would work, in theory, unless the goroutines sleep. In that case, another goroutine is allowed to start concurrently (only 1 goroutine running in parallel, so GOMAXPROC is happy). Since our goroutines sleep (wait) after calling gcc, another task is started, without any hard limit, till the WaitGroup is completely spawned. On systems with limited resources (as RaspberryPi) and cores with lots of files this can trigger an out of memory condition. --- commands/compile/compile.go | 7 ++++++ legacy/builder/builder_utils/utils.go | 34 +++++++++++++++++---------- legacy/builder/types/context.go | 3 +++ 3 files changed, 32 insertions(+), 12 deletions(-) diff --git a/commands/compile/compile.go b/commands/compile/compile.go index 0307f7c2d39..f8a17b03843 100644 --- a/commands/compile/compile.go +++ b/commands/compile/compile.go @@ -23,6 +23,7 @@ import ( "fmt" "io" "path/filepath" + "runtime" "sort" "strings" @@ -115,6 +116,12 @@ func Compile(ctx context.Context, req *rpc.CompileReq, outStream, errStream io.W builderCtx.CoreBuildCachePath = paths.TempDir().Join("arduino-core-cache") + jobs := runtime.NumCPU() + if req.GetJobs() > 0 { + jobs = int(req.GetJobs()) + } + builderCtx.Jobs = jobs + builderCtx.USBVidPid = req.GetVidPid() builderCtx.WarningsLevel = req.GetWarnings() diff --git a/legacy/builder/builder_utils/utils.go b/legacy/builder/builder_utils/utils.go index 07b021ee8bf..72c109da793 100644 --- a/legacy/builder/builder_utils/utils.go +++ b/legacy/builder/builder_utils/utils.go @@ -175,20 +175,30 @@ func compileFilesWithRecipe(ctx *types.Context, sourcePath *paths.Path, sources ctx.Progress.Steps = ctx.Progress.Steps / float64(len(sources)) var wg sync.WaitGroup - wg.Add(len(sources)) - for _, source := range sources { - go func(source *paths.Path) { - defer wg.Done() - PrintProgressIfProgressEnabledAndMachineLogger(ctx) - objectFile, err := compileFileWithRecipe(ctx, sourcePath, source, buildPath, buildProperties, includes, recipe) - if err != nil { - errorsChan <- err - } else { - objectFilesChan <- objectFile + // Split jobs into batches of N jobs each; wait for the completion of a batch to start the next + par := ctx.Jobs + + go func() { + for total := 0; total < len(sources); total += par { + for i := total; i < total+par && i < len(sources); i++ { + wg.Add(1) + go func(source *paths.Path) { + defer wg.Done() + PrintProgressIfProgressEnabledAndMachineLogger(ctx) + objectFile, err := compileFileWithRecipe(ctx, sourcePath, source, buildPath, buildProperties, includes, recipe) + if err != nil { + errorsChan <- err + } else { + objectFilesChan <- objectFile + } + }(sources[i]) } - }(source) - } + wg.Wait() + } + + doneChan <- struct{}{} + }() go func() { wg.Wait() diff --git a/legacy/builder/types/context.go b/legacy/builder/types/context.go index e2ed7ff8706..1a8a6b5c788 100644 --- a/legacy/builder/types/context.go +++ b/legacy/builder/types/context.go @@ -111,6 +111,9 @@ type Context struct { // Experimental: use arduino-preprocessor to create prototypes UseArduinoPreprocessor bool + // Parallel processes + Jobs int + // Out and Err stream to redirect all Exec commands ExecStdout io.Writer ExecStderr io.Writer From 99b20c2464d74eccfe3ec47b2eb27832ea99c908 Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Mon, 26 Aug 2019 11:39:09 +0200 Subject: [PATCH 3/4] Fixed race condition at the edge of builds. Using queue channel. The last loop collecting the remaining objectFiles may be run before the last jobs completes. This commit replaces the two channels used to fill objectFiles and to signal error with direct variable access guarded by mutex, this avoids race conditions at the end and streamlines the whole process. Also added a 'queue' channel to feed the goroutines, this is not strictly part of the fix, but helps to fairly distribute the workload. --- legacy/builder/builder_utils/utils.go | 86 +++++++++++++-------------- 1 file changed, 43 insertions(+), 43 deletions(-) diff --git a/legacy/builder/builder_utils/utils.go b/legacy/builder/builder_utils/utils.go index 72c109da793..d1179dc0530 100644 --- a/legacy/builder/builder_utils/utils.go +++ b/legacy/builder/builder_utils/utils.go @@ -169,57 +169,57 @@ func compileFilesWithRecipe(ctx *types.Context, sourcePath *paths.Path, sources if len(sources) == 0 { return objectFiles, nil } - objectFilesChan := make(chan *paths.Path) - errorsChan := make(chan error) - doneChan := make(chan struct{}) + var objectFilesMux sync.Mutex + var errors []error + var errorsMux sync.Mutex ctx.Progress.Steps = ctx.Progress.Steps / float64(len(sources)) - var wg sync.WaitGroup - // Split jobs into batches of N jobs each; wait for the completion of a batch to start the next - par := ctx.Jobs - - go func() { - for total := 0; total < len(sources); total += par { - for i := total; i < total+par && i < len(sources); i++ { - wg.Add(1) - go func(source *paths.Path) { - defer wg.Done() - PrintProgressIfProgressEnabledAndMachineLogger(ctx) - objectFile, err := compileFileWithRecipe(ctx, sourcePath, source, buildPath, buildProperties, includes, recipe) - if err != nil { - errorsChan <- err - } else { - objectFilesChan <- objectFile - } - }(sources[i]) - } - wg.Wait() + queue := make(chan *paths.Path) + job := func(source *paths.Path) { + PrintProgressIfProgressEnabledAndMachineLogger(ctx) + objectFile, err := compileFileWithRecipe(ctx, sourcePath, source, buildPath, buildProperties, includes, recipe) + if err != nil { + errorsMux.Lock() + errors = append(errors, err) + errorsMux.Unlock() + } else { + objectFilesMux.Lock() + objectFiles.Add(objectFile) + objectFilesMux.Unlock() } + } - doneChan <- struct{}{} - }() - - go func() { - wg.Wait() - doneChan <- struct{}{} - }() - - for { - select { - case objectFile := <-objectFilesChan: - objectFiles.Add(objectFile) - case err := <-errorsChan: - return nil, i18n.WrapError(err) - case <-doneChan: - close(objectFilesChan) - for objectFile := range objectFilesChan { - objectFiles.Add(objectFile) + // Spawn jobs runners + var wg sync.WaitGroup + for i := 0; i < ctx.Jobs; i++ { + wg.Add(1) + go func() { + for source := range queue { + job(source) } - objectFiles.Sort() - return objectFiles, nil + wg.Done() + }() + } + + // Feed jobs until error or done + for _, source := range sources { + errorsMux.Lock() + gotError := len(errors) > 0 + errorsMux.Unlock() + if gotError { + break } + queue <- source + } + close(queue) + wg.Wait() + if len(errors) > 0 { + // output the first error + return nil, i18n.WrapError(errors[0]) } + objectFiles.Sort() + return objectFiles, nil } func compileFileWithRecipe(ctx *types.Context, sourcePath *paths.Path, source *paths.Path, buildPath *paths.Path, buildProperties *properties.Map, includes []string, recipe string) (*paths.Path, error) { From cb3c79bffb95464c55b88da79d9cef2617431fd8 Mon Sep 17 00:00:00 2001 From: Cristian Maglie Date: Mon, 26 Aug 2019 11:53:01 +0200 Subject: [PATCH 4/4] Defaults to runtime.NumCPU() if specified jobs number is 0 --- commands/compile/compile.go | 7 +------ legacy/builder/builder_utils/utils.go | 7 ++++++- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/commands/compile/compile.go b/commands/compile/compile.go index f8a17b03843..0a6650a45a0 100644 --- a/commands/compile/compile.go +++ b/commands/compile/compile.go @@ -23,7 +23,6 @@ import ( "fmt" "io" "path/filepath" - "runtime" "sort" "strings" @@ -116,11 +115,7 @@ func Compile(ctx context.Context, req *rpc.CompileReq, outStream, errStream io.W builderCtx.CoreBuildCachePath = paths.TempDir().Join("arduino-core-cache") - jobs := runtime.NumCPU() - if req.GetJobs() > 0 { - jobs = int(req.GetJobs()) - } - builderCtx.Jobs = jobs + builderCtx.Jobs = int(req.GetJobs()) builderCtx.USBVidPid = req.GetVidPid() builderCtx.WarningsLevel = req.GetWarnings() diff --git a/legacy/builder/builder_utils/utils.go b/legacy/builder/builder_utils/utils.go index d1179dc0530..e2ba0650189 100644 --- a/legacy/builder/builder_utils/utils.go +++ b/legacy/builder/builder_utils/utils.go @@ -33,6 +33,7 @@ import ( "os" "os/exec" "path/filepath" + "runtime" "strconv" "strings" "sync" @@ -192,7 +193,11 @@ func compileFilesWithRecipe(ctx *types.Context, sourcePath *paths.Path, sources // Spawn jobs runners var wg sync.WaitGroup - for i := 0; i < ctx.Jobs; i++ { + jobs := ctx.Jobs + if jobs == 0 { + jobs = runtime.NumCPU() + } + for i := 0; i < jobs; i++ { wg.Add(1) go func() { for source := range queue {