Skip to content

Commit 15cf0c6

Browse files
authored
[Refactor] DCR Monitor Refactoring (3/3) (#29)
3/3 PR for a major refactoring of DCR Monitor. See #15. 1/3: #16 2/3: #19 This pretty much concludes the major refactoring, resolving #15 # Changes * Added `Created` job status, because the reconciler needs to handle newly created jobs. * Move KanikoService functionality into the monitor (=reconciler) * Remove unused config `Cluster.PodServiceAccount` * Clean up ARG/ENV of TEE dockerfile * In-memory Dockerfile injection in API. See `addDockerfileToTarGz`. This resolves #3. Also Kaniko can just refer to the remote object path. * Removed most of shallow functions in `Config` * Completely removed `CloudProvider` * Removed shallow / no-longer used modules in `pkg`: `pkg/utils/constant.go`, `pkg/utils/file.go`, and `pkg/utils/k8s.go`. # Testing Manual end-to-end testing.
1 parent 12deb21 commit 15cf0c6

File tree

24 files changed

+308
-816
lines changed

24 files changed

+308
-816
lines changed

BUILD

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,7 @@ echo 'CloudProvider:
1717
Zone: "{zone}"
1818
Region: "{region}"
1919
Debug: false
20-
Env: {env}
21-
Cluster:
22-
PodServiceAccount: "dcr-k8s-pod-sa"' > $@
20+
Env: {env}' > $@
2321
""".format(
2422
env = env,
2523
project_id = project_id,

app/dcr_api/Dockerfile

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,18 @@
11
ARG BASE_IMAGE
22
FROM $BASE_IMAGE
33
ARG OUTPUTPATH
4-
ARG ENCRYPTED_FILENAME
5-
ARG ENCRYPTED_CLOUDSTORAGE_PATH
6-
ARG CREATOR
7-
ARG IMPERSONATION_SERVICE_ACCOUNT
84
ARG JUPYTER_FILENAME
95
ARG USER_WORKSPACE
106
ARG CUSTOMTOKEN_CLOUDSTORAGE_PATH
117

128
ENV OUTPUTPATH=$OUTPUTPATH
13-
ENV ENCRYPTED_FILENAME=$ENCRYPTED_FILENAME
14-
ENV ENCRYPTED_CLOUDSTORAGE_PATH=$ENCRYPTED_CLOUDSTORAGE_PATH
15-
ENV IMPERSONATION_SERVICE_ACCOUNT=$IMPERSONATION_SERVICE_ACCOUNT
16-
ENV CREATOR=$CREATOR
179
ENV JUPYTER_FILENAME=$JUPYTER_FILENAME
1810
ENV CUSTOMTOKEN_CLOUDSTORAGE_PATH=$CUSTOMTOKEN_CLOUDSTORAGE_PATH
1911

2012
WORKDIR /home/jovyan
2113
COPY $USER_WORKSPACE/* ./
2214

23-
LABEL "tee.launch_policy.allow_env_override"="USER_TOKEN,EXECUTION_STAGE,DEPLOYMENT_ENV,PROJECT_ID,KEY_LOCATION"
15+
LABEL "tee.launch_policy.allow_env_override"="USER_TOKEN,EXECUTION_STAGE,DEPLOYMENT_ENV,PROJECT_ID"
2416

2517
ENTRYPOINT jupyter nbconvert --execute --to notebook --inplace $JUPYTER_FILENAME --ExecutePreprocessor.timeout=-1 --allow-errors \
2618
&& hash=$(md5sum $JUPYTER_FILENAME | awk '{ print $1 }') \

app/dcr_api/biz/dal/db/job.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ type Job struct {
2727
UUID string `gorm:"uuid" json:"uuid"`
2828
Creator string `gorm:"creator" json:"creator"`
2929
JupyterFileName string `gorm:"jupyter_file_name" json:"jupyter_file_name"`
30+
BuildContextPath string `gorm:"build_context_path" json:"build_context_path"`
3031
DockerImage string `gorm:"docker_image" json:"docker_image"`
3132
DockerImageDigest string `gorm:"docker_image_digest" json:"docker_image_digest"`
3233
AttestationReport string `gorm:"attestation_report" json:"attestation_report"`
@@ -50,7 +51,7 @@ func CreateJob(job *Job) error {
5051
}
5152

5253
func UpdateJob(j *Job) error {
53-
result := DB.Model(j).Updates(Job{JobStatus: j.JobStatus, DockerImageDigest: j.DockerImageDigest, DockerImage: j.DockerImage, AttestationReport: j.AttestationReport, InstanceName: j.InstanceName})
54+
result := DB.Model(j).Updates(Job{JobStatus: j.JobStatus, BuildContextPath: j.BuildContextPath, DockerImageDigest: j.DockerImageDigest, DockerImage: j.DockerImage, AttestationReport: j.AttestationReport, InstanceName: j.InstanceName})
5455
if result.Error != nil {
5556
return errors.Wrap(result.Error, "failed to update job %v")
5657
}
@@ -98,15 +99,15 @@ func QueryJobByUUIDAndCreator(creator string, uuid string) (*Job, error) {
9899

99100
func GetInProgressJobs(creator string) ([]*Job, error) {
100101
var res []*Job
101-
if err := DB.Model(Job{}).Where("creator = ? AND job_status in (1, 3, 4)", creator).Find(&res).Error; err != nil {
102+
if err := DB.Model(Job{}).Where("creator = ? AND job_status in (0, 1, 3, 4)", creator).Find(&res).Error; err != nil {
102103
return nil, errors.Wrap(err, "failed to find finished job status")
103104
}
104105
return res, nil
105106
}
106107

107108
func GetAllInProgressJobs() ([]*Job, error) {
108109
var res []*Job
109-
if err := DB.Model(Job{}).Where("job_status in (1, 3, 4)").Find(&res).Error; err != nil {
110+
if err := DB.Model(Job{}).Where("job_status in (0, 1, 3, 4)").Find(&res).Error; err != nil {
110111
return nil, errors.Wrap(err, "failed to find finished job status")
111112
}
112113
return res, nil

app/dcr_api/biz/model/job/job.go

Lines changed: 5 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

app/dcr_api/biz/service/BUILD.bazel

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,17 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
22

33
go_library(
44
name = "service",
5-
srcs = [
6-
"build_service.go",
7-
"job_service.go",
8-
"kaniko_service.go",
9-
],
5+
srcs = ["job_service.go"],
106
importpath = "github.com/manatee-project/manatee/app/dcr_api/biz/service",
117
visibility = ["//visibility:public"],
128
deps = [
139
"//app/dcr_api/biz/dal/db",
1410
"//app/dcr_api/biz/model/job",
15-
"//pkg/cloud",
1611
"//pkg/config",
1712
"//pkg/errno",
18-
"//pkg/utils",
1913
"@com_github_cloudwego_hertz//pkg/common/hlog",
20-
"@com_github_docker_docker//pkg/archive",
2114
"@com_github_google_uuid//:uuid",
2215
"@com_github_pkg_errors//:errors",
23-
"@io_k8s_api//batch/v1:batch",
24-
"@io_k8s_api//core/v1:core",
25-
"@io_k8s_apimachinery//pkg/api/resource",
26-
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
27-
"@io_k8s_client_go//kubernetes",
28-
"@io_k8s_client_go//rest",
16+
"@com_google_cloud_go_storage//:storage",
2917
],
3018
)

app/dcr_api/biz/service/build_service.go

Lines changed: 0 additions & 24 deletions
This file was deleted.

app/dcr_api/biz/service/job_service.go

Lines changed: 150 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,19 +14,22 @@
1414
package service
1515

1616
import (
17+
"archive/tar"
18+
"bytes"
19+
"compress/gzip"
1720
"context"
1821
"encoding/base64"
1922
"fmt"
2023
"io"
24+
"os"
2125

26+
"cloud.google.com/go/storage"
2227
"github.com/cloudwego/hertz/pkg/common/hlog"
2328
"github.com/google/uuid"
2429
"github.com/manatee-project/manatee/app/dcr_api/biz/dal/db"
2530
"github.com/manatee-project/manatee/app/dcr_api/biz/model/job"
26-
"github.com/manatee-project/manatee/pkg/cloud"
2731
"github.com/manatee-project/manatee/pkg/config"
2832
"github.com/manatee-project/manatee/pkg/errno"
29-
"github.com/manatee-project/manatee/pkg/utils"
3033
"github.com/pkg/errors"
3134
)
3235

@@ -49,23 +52,33 @@ func (js *JobService) SubmitJob(req *job.SubmitJobRequest, userWorkspace io.Read
4952
return "", errors.Wrap(fmt.Errorf(errno.ReachJobLimitErrMsg), "")
5053
}
5154

52-
provider := cloud.GetCloudProvider(js.ctx)
53-
err = provider.UploadFile(userWorkspace, config.GetUserWorkSpacePath(creator), false)
55+
// inject Dockerfile in /usr/local/dcr_conf/Dockerfile into the build context
56+
// FIXME: avoid reading the file system, use a string instead in the future.
57+
dockerFileContent, err := os.ReadFile("/usr/local/dcr_conf/Dockerfile")
5458
if err != nil {
5559
return "", err
5660
}
61+
buildctx, err := js.addDockerfileToTarGz(userWorkspace, string(dockerFileContent))
62+
if err != nil {
63+
return "", err
64+
}
65+
err = js.uploadFile(buildctx, fmt.Sprintf("%s/%s-workspace.tar.gz", creator, creator), false)
66+
if err != nil {
67+
return "", err
68+
}
69+
buildctxpath := fmt.Sprintf("gs://%s/%s/%s-workspace.tar.gz", config.GetBucket(), creator, creator)
5770

5871
uuidStr, err := uuid.NewUUID()
5972
if err != nil {
6073
return "", errors.Wrap(err, "failed to generate uuid")
6174
}
6275
t := db.Job{
63-
UUID: uuidStr.String(),
64-
Creator: req.Creator,
65-
JupyterFileName: req.JupyterFileName,
66-
JobStatus: int(job.JobStatus_ImageBuilding),
76+
UUID: uuidStr.String(),
77+
Creator: req.Creator,
78+
JupyterFileName: req.JupyterFileName,
79+
JobStatus: int(job.JobStatus_Created),
80+
BuildContextPath: buildctxpath,
6781
}
68-
err = BuildImage(js.ctx, t, req.AccessToken)
6982
if err != nil {
7083
return "", err
7184
}
@@ -78,15 +91,69 @@ func (js *JobService) SubmitJob(req *job.SubmitJobRequest, userWorkspace io.Read
7891
return uuidStr.String(), nil
7992
}
8093

94+
func (js *JobService) addDockerfileToTarGz(input io.Reader, dockerfileContent string) (io.Reader, error) {
95+
gzReader, err := gzip.NewReader(input)
96+
if err != nil {
97+
return nil, fmt.Errorf("failed to create gzip reader: %w", err)
98+
}
99+
defer gzReader.Close()
100+
101+
var buffer bytes.Buffer
102+
gzWriter := gzip.NewWriter(&buffer)
103+
defer gzWriter.Close()
104+
105+
tarReader := tar.NewReader(gzReader)
106+
tarWriter := tar.NewWriter(gzWriter)
107+
defer tarWriter.Close()
108+
for {
109+
header, err := tarReader.Next()
110+
if err == io.EOF {
111+
break
112+
}
113+
if err != nil {
114+
return nil, fmt.Errorf("failed to read tar entry: %w", err)
115+
}
116+
117+
// Write the existing header and file content to the new tar archive
118+
if err := tarWriter.WriteHeader(header); err != nil {
119+
return nil, fmt.Errorf("failed to write tar header: %w", err)
120+
}
121+
122+
if _, err := io.Copy(tarWriter, tarReader); err != nil {
123+
return nil, fmt.Errorf("failed to write tar entry: %w", err)
124+
}
125+
}
126+
// Add the Dockerfile as a new entry
127+
dockerfileHeader := &tar.Header{
128+
Name: "Dockerfile",
129+
Size: int64(len(dockerfileContent)),
130+
Mode: 0600,
131+
}
132+
if err := tarWriter.WriteHeader(dockerfileHeader); err != nil {
133+
return nil, fmt.Errorf("failed to write Dockerfile header: %w", err)
134+
}
135+
if _, err := tarWriter.Write([]byte(dockerfileContent)); err != nil {
136+
return nil, fmt.Errorf("failed to write Dockerfile content: %w", err)
137+
}
138+
// Close the tar and gzip writers
139+
if err := tarWriter.Close(); err != nil {
140+
return nil, fmt.Errorf("failed to close tar writer: %w", err)
141+
}
142+
if err := gzWriter.Close(); err != nil {
143+
return nil, fmt.Errorf("failed to close gzip writer: %w", err)
144+
}
145+
return &buffer, nil
146+
}
147+
81148
func convertEntityToModel(j *db.Job) *job.Job {
82149
return &job.Job{
83150
ID: int64(j.ID),
84151
UUID: j.UUID,
85152
Creator: j.Creator,
86153
JobStatus: job.JobStatus(j.JobStatus),
87154
JupyterFileName: j.JupyterFileName,
88-
CreatedAt: j.CreatedAt.Format(utils.Layout),
89-
UpdatedAt: j.UpdatedAt.Format(utils.Layout),
155+
CreatedAt: j.CreatedAt.Format("2006-01-02 15:04:05"),
156+
UpdatedAt: j.UpdatedAt.Format("2006-01-02 15:04:05"),
90157
}
91158
}
92159

@@ -107,24 +174,22 @@ func (js *JobService) GetJobOutputAttrs(req *job.QueryJobOutputRequest) (string,
107174
if err != nil {
108175
return "", 0, err
109176
}
110-
outputPath := config.GetJobOutputPath(j.Creator, j.UUID, j.JupyterFileName)
177+
outputPath := js.getJobOutputPath(j.Creator, j.UUID, j.JupyterFileName)
111178

112-
provider := cloud.GetCloudProvider(js.ctx)
113-
size, err := provider.GetFileSize(outputPath)
179+
size, err := js.getFileSize(outputPath)
114180
if err != nil {
115181
return "", 0, err
116182
}
117-
return config.GetJobOutputFilename(fmt.Sprintf("%v", j.ID), j.JupyterFileName), size, nil
183+
return js.getJobOutputFilename(fmt.Sprintf("%v", j.ID), j.JupyterFileName), size, nil
118184
}
119185

120186
func (js *JobService) DownloadJobOutput(req *job.DownloadJobOutputRequest) (string, error) {
121187
j, err := db.QueryJobByIdAndCreator(req.ID, req.Creator)
122188
if err != nil {
123189
return "", err
124190
}
125-
outputPath := config.GetJobOutputPath(j.Creator, j.UUID, j.JupyterFileName)
126-
provider := cloud.GetCloudProvider(js.ctx)
127-
datg, err := provider.GetFilebyChunk(outputPath, req.Offset, req.Chunk)
191+
outputPath := js.getJobOutputPath(j.Creator, j.UUID, j.JupyterFileName)
192+
datg, err := js.getFilebyChunk(outputPath, req.Offset, req.Chunk)
128193
if err != nil {
129194
return "", err
130195
}
@@ -146,3 +211,70 @@ func (js *JobService) GetJobAttestationReport(req *job.QueryJobAttestationReques
146211
}
147212
return j.AttestationReport, nil
148213
}
214+
215+
func (js *JobService) getJobOutputFilename(UUID string, originName string) string {
216+
return fmt.Sprintf("out-%s-%s", UUID, originName)
217+
}
218+
219+
func (js *JobService) getJobOutputPath(creator string, UUID string, originName string) string {
220+
return fmt.Sprintf("%s/output/%s", creator, js.getJobOutputFilename(UUID, originName))
221+
}
222+
223+
func (g *JobService) getFileSize(remotePath string) (int64, error) {
224+
client, err := storage.NewClient(g.ctx)
225+
if err != nil {
226+
return 0, errors.Wrap(err, "failed to create gcp storage client")
227+
}
228+
defer client.Close()
229+
bucket := config.GetBucket()
230+
attr, err := client.Bucket(bucket).Object(remotePath).Attrs(g.ctx)
231+
if err != nil {
232+
return 0, errors.Wrap(err, "failed to get file attributes, or it doesn't exist")
233+
}
234+
return attr.Size, nil
235+
}
236+
237+
func (g *JobService) getFilebyChunk(remotePath string, offset int64, chunkSize int64) ([]byte, error) {
238+
client, err := storage.NewClient(g.ctx)
239+
if err != nil {
240+
return nil, errors.Wrap(err, "failed to create gcp storage client")
241+
}
242+
defer client.Close()
243+
bucket := config.GetBucket()
244+
objectHandle := client.Bucket(bucket).Object(remotePath)
245+
objectReader, err := objectHandle.NewRangeReader(g.ctx, offset, chunkSize)
246+
if err != nil {
247+
return nil, errors.Wrap(err, fmt.Sprintf("failed to create reader on %s", remotePath))
248+
}
249+
defer objectReader.Close()
250+
data := make([]byte, chunkSize)
251+
n, err := objectReader.Read(data)
252+
if err != nil {
253+
return nil, errors.Wrap(err, "failed to read cloud storage object")
254+
}
255+
data = data[:n]
256+
return data, nil
257+
}
258+
259+
func (g *JobService) uploadFile(reader io.Reader, remotePath string, compress bool) error {
260+
client, err := storage.NewClient(g.ctx)
261+
if err != nil {
262+
return errors.Wrap(err, "failed to create storage client")
263+
}
264+
defer client.Close()
265+
bucket := config.GetBucket()
266+
writer := client.Bucket(bucket).Object(remotePath).NewWriter(g.ctx)
267+
defer writer.Close()
268+
if compress {
269+
gzipWriter := gzip.NewWriter(writer)
270+
if _, err = io.Copy(gzipWriter, reader); err != nil {
271+
return errors.Wrap(err, "failed to copy content to gzip writer")
272+
}
273+
defer gzipWriter.Close()
274+
} else {
275+
if _, err = io.Copy(writer, reader); err != nil {
276+
return errors.Wrap(err, "failed to copy content to writer")
277+
}
278+
}
279+
return nil
280+
}

0 commit comments

Comments
 (0)