Skip to content

Commit 93819d1

Browse files
committed
Simplified pipe-streamer creation in GRPC daemon
1 parent 49d678d commit 93819d1

File tree

1 file changed

+37
-43
lines changed

1 file changed

+37
-43
lines changed

daemon/daemon.go

Lines changed: 37 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -57,91 +57,85 @@ func (s *ArduinoCoreServerImpl) Init(ctx context.Context, req *rpc.InitReq) (*rp
5757
}
5858

5959
func (s *ArduinoCoreServerImpl) Compile(req *rpc.CompileReq, stream rpc.ArduinoCore_CompileServer) error {
60-
r, w := io.Pipe()
61-
go func() {
62-
data := make([]byte, 1024)
63-
for {
64-
if n, err := r.Read(data); err != nil {
65-
return
66-
} else {
67-
stream.Send(&rpc.CompileResp{Output: data[:n]})
68-
}
69-
}
70-
}()
71-
resp, err := compile.Compile(stream.Context(), req, w, func(taskProgress *rpc.TaskProgress) {
72-
stream.Send(&rpc.CompileResp{TaskProgress: taskProgress})
73-
}, func(downloadProgress *rpc.DownloadProgress) {
74-
stream.Send(&rpc.CompileResp{DownloadProgress: downloadProgress})
75-
})
60+
resp, err := compile.Compile(
61+
stream.Context(), req,
62+
feedStream(func(data []byte) { stream.Send(&rpc.CompileResp{Output: data}) }),
63+
func(p *rpc.TaskProgress) { stream.Send(&rpc.CompileResp{TaskProgress: p}) },
64+
func(p *rpc.DownloadProgress) { stream.Send(&rpc.CompileResp{DownloadProgress: p}) },
65+
)
7666
stream.Send(resp)
7767
return err
7868
}
7969

8070
func (s *ArduinoCoreServerImpl) PlatformInstall(req *rpc.PlatformInstallReq, stream rpc.ArduinoCore_PlatformInstallServer) error {
81-
resp, err := core.PlatformInstall(stream.Context(), req, func(progress *rpc.DownloadProgress) {
82-
stream.Send(&rpc.PlatformInstallResp{Progress: progress})
83-
}, func(taskProgress *rpc.TaskProgress) {
84-
stream.Send(&rpc.PlatformInstallResp{TaskProgress: taskProgress})
85-
})
71+
resp, err := core.PlatformInstall(
72+
stream.Context(), req,
73+
func(p *rpc.DownloadProgress) { stream.Send(&rpc.PlatformInstallResp{Progress: p}) },
74+
func(p *rpc.TaskProgress) { stream.Send(&rpc.PlatformInstallResp{TaskProgress: p}) },
75+
)
8676
if err != nil {
8777
return err
8878
}
8979
return stream.Send(resp)
9080
}
9181

9282
func (s *ArduinoCoreServerImpl) PlatformDownload(req *rpc.PlatformDownloadReq, stream rpc.ArduinoCore_PlatformDownloadServer) error {
93-
resp, err := core.PlatformDownload(stream.Context(), req, func(progress *rpc.DownloadProgress) {
94-
stream.Send(&rpc.PlatformDownloadResp{Progress: progress})
95-
})
83+
resp, err := core.PlatformDownload(
84+
stream.Context(), req,
85+
func(p *rpc.DownloadProgress) { stream.Send(&rpc.PlatformDownloadResp{Progress: p}) },
86+
)
9687
if err != nil {
9788
return err
9889
}
9990
return stream.Send(resp)
10091
}
10192

10293
func (s *ArduinoCoreServerImpl) PlatformUninstall(req *rpc.PlatformUninstallReq, stream rpc.ArduinoCore_PlatformUninstallServer) error {
103-
resp, err := core.PlatformUninstall(stream.Context(), req, func(taskProgress *rpc.TaskProgress) {
104-
stream.Send(&rpc.PlatformUninstallResp{TaskProgress: taskProgress})
105-
})
94+
resp, err := core.PlatformUninstall(
95+
stream.Context(), req,
96+
func(p *rpc.TaskProgress) { stream.Send(&rpc.PlatformUninstallResp{TaskProgress: p}) },
97+
)
10698
if err != nil {
10799
return err
108100
}
109101
return stream.Send(resp)
110102
}
111103

112104
func (s *ArduinoCoreServerImpl) PlatformUpgrade(req *rpc.PlatformUpgradeReq, stream rpc.ArduinoCore_PlatformUpgradeServer) error {
113-
resp, err := core.PlatformUpgrade(stream.Context(), req, func(progress *rpc.DownloadProgress) {
114-
stream.Send(&rpc.PlatformUpgradeResp{Progress: progress})
115-
}, func(taskProgress *rpc.TaskProgress) {
116-
stream.Send(&rpc.PlatformUpgradeResp{TaskProgress: taskProgress})
117-
})
105+
resp, err := core.PlatformUpgrade(
106+
stream.Context(), req,
107+
func(p *rpc.DownloadProgress) { stream.Send(&rpc.PlatformUpgradeResp{Progress: p}) },
108+
func(p *rpc.TaskProgress) { stream.Send(&rpc.PlatformUpgradeResp{TaskProgress: p}) },
109+
)
118110
if err != nil {
119111
return err
120112
}
121113
return stream.Send(resp)
122114
}
123115

124116
func (s *ArduinoCoreServerImpl) Upload(req *rpc.UploadReq, stream rpc.ArduinoCore_UploadServer) error {
125-
r, w := io.Pipe()
126-
r2, w2 := io.Pipe()
127-
128-
feedStream(r, func(data []byte) { stream.Send(&rpc.UploadResp{OutStream: data}) })
129-
feedStream(r2, func(data []byte) { stream.Send(&rpc.UploadResp{ErrStream: data}) })
130-
131-
resp, err := upload.Upload(stream.Context(), req, w, w2)
132-
stream.Send(resp)
133-
return err
117+
resp, err := upload.Upload(
118+
stream.Context(), req,
119+
feedStream(func(data []byte) { stream.Send(&rpc.UploadResp{OutStream: data}) }),
120+
feedStream(func(data []byte) { stream.Send(&rpc.UploadResp{ErrStream: data}) }),
121+
)
122+
if err != nil {
123+
return err
124+
}
125+
return stream.Send(resp)
134126
}
135127

136-
func feedStream(out io.Reader, streamer func(data []byte)) {
128+
func feedStream(streamer func(data []byte)) io.Writer {
129+
r, w := io.Pipe()
137130
go func() {
138131
data := make([]byte, 1024)
139132
for {
140-
if n, err := out.Read(data); err != nil {
133+
if n, err := r.Read(data); err != nil {
141134
return
142135
} else {
143136
streamer(data[:n])
144137
}
145138
}
146139
}()
140+
return w
147141
}

0 commit comments

Comments
 (0)