Merged 7 commits on Mar 19, 2024
internal/controllers/storage/init.go (134 changes: 79 additions & 55 deletions)
@@ -2,7 +2,6 @@ package storage

 import (
     "context"
-    "errors"
     "fmt"
     "regexp"
     "strings"
@@ -164,71 +163,84 @@ func (r *Reconciler) initializeStorage(

     if initJob.Status.Succeeded > 0 {
         r.Log.Info("Init Job status succeeded")
-        podLogs, err := r.getSucceededJobLogs(ctx, storage, initJob)
+        r.Recorder.Event(
+            storage,
+            corev1.EventTypeNormal,
+            "InitializingStorage",
+            "Storage initialized successfully",
+        )
+        return r.setInitStorageCompleted(ctx, storage, "Storage initialized successfully")
+    }
+
+    var conditionFailed bool
+    for _, condition := range initJob.Status.Conditions {
+        if condition.Type == batchv1.JobFailed {
+            conditionFailed = true
+            break
+        }
+    }
+
+    //nolint:nestif
+    if initJob.Status.Failed > 0 {
+        initialized, err := r.checkFailedJob(ctx, storage, initJob)
         if err != nil {
             r.Recorder.Event(
                 storage,
                 corev1.EventTypeWarning,
                 "ControllerError",
-                fmt.Sprintf("Failed to get succeeded Pod for Job: %s", err),
+                fmt.Sprintf("Failed to check logs from failed Pod for Job: %s", err),
             )
             return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err
         }

-        if mismatchItemConfigGenerationRegexp.MatchString(podLogs) {
+        if initialized {
             r.Log.Info("Storage is already initialized, continuing...")
             r.Recorder.Event(
                 storage,
                 corev1.EventTypeNormal,
                 "InitializingStorage",
                 "Storage initialization attempted and skipped, storage already initialized",
             )
+            if err := r.Delete(ctx, initJob, client.PropagationPolicy(metav1.DeletePropagationForeground)); err != nil {
+                r.Recorder.Event(
+                    storage,
+                    corev1.EventTypeWarning,
+                    "ControllerError",
+                    fmt.Sprintf("Failed to delete Job: %s", err),
+                )
+                return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err
+            }
             return r.setInitStorageCompleted(ctx, storage, "Storage already initialized")
         }

-        r.Recorder.Event(
-            storage,
-            corev1.EventTypeNormal,
-            "InitializingStorage",
-            "Storage initialized successfully",
-        )
-        return r.setInitStorageCompleted(ctx, storage, "Storage initialized successfully")
-    }
-
-    var conditionFailed bool
-    for _, condition := range initJob.Status.Conditions {
-        if condition.Type == batchv1.JobFailed {
-            conditionFailed = true
-            break
-        }
-    }
-    if initJob.Status.Failed == *initJob.Spec.BackoffLimit || conditionFailed {
-        r.Log.Info("Init Job status failed")
-        r.Recorder.Event(
-            storage,
-            corev1.EventTypeWarning,
-            "InitializingStorage",
-            "Failed to initializing Storage",
-        )
-        if err := r.Delete(ctx, initJob); err != nil {
+        if initJob.Status.Failed == *initJob.Spec.BackoffLimit || conditionFailed {
+            r.Log.Info("Init Job status failed")
             r.Recorder.Event(
                 storage,
                 corev1.EventTypeWarning,
-                "ControllerError",
-                fmt.Sprintf("Failed to delete Job: %s", err),
+                "InitializingStorage",
+                "Failed to initializing Storage",
             )
-            return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err
+            if err := r.Delete(ctx, initJob, client.PropagationPolicy(metav1.DeletePropagationForeground)); err != nil {
+                r.Recorder.Event(
+                    storage,
+                    corev1.EventTypeWarning,
+                    "ControllerError",
+                    fmt.Sprintf("Failed to delete Job: %s", err),
+                )
+                return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err
+            }
         }
     }

     return Stop, ctrl.Result{RequeueAfter: StorageInitializationRequeueDelay}, nil
 }
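The substance of this hunk: a succeeded init Job is now trusted as-is, while a failed Job has its pod logs scanned for YDB's config-generation mismatch message, which signals that storage was already initialized by an earlier run. Below is a minimal, self-contained sketch of that detection idea. The pattern is a placeholder: the operator's real mismatchItemConfigGenerationRegexp is declared in a part of init.go this diff does not show, so the exact regexp here is an assumption.

package main

import (
    "fmt"
    "regexp"
)

// Placeholder pattern, assumed for illustration; the operator's actual
// mismatchItemConfigGenerationRegexp may differ.
var alreadyInitializedRe = regexp.MustCompile(`ItemConfigGenerationProvided# 0.*ItemConfigGenerationExpected# [1-9]`)

func main() {
    // Illustrative log line in the shape of a YDB config-generation mismatch.
    failedPodLogs := "Status# ERROR: ItemConfigGenerationProvided# 0, ItemConfigGenerationExpected# 1"
    fmt.Println(alreadyInitializedRe.MatchString(failedPodLogs)) // prints true: treat storage as initialized
}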

-func (r *Reconciler) getSucceededJobLogs(
+func (r *Reconciler) checkFailedJob(
     ctx context.Context,
     storage *resources.StorageClusterBuilder,
     job *batchv1.Job,
-) (string, error) {
+) (bool, error) {
     podList := &corev1.PodList{}
     opts := []client.ListOption{
         client.InNamespace(storage.Namespace),
@@ -243,41 +255,53 @@ func (r *Reconciler) getSucceededJobLogs(
"ControllerError",
fmt.Sprintf("Failed to list pods for Job: %s", err),
)
return "", fmt.Errorf("failed to list pods for getSucceededJobLogs, error: %w", err)
return false, fmt.Errorf("failed to list pods for checkFailedJob, error: %w", err)
}

// Assuming there is only one succeeded pod, you can adjust the logic if needed
for _, pod := range podList.Items {
if pod.Status.Phase == corev1.PodSucceeded {
if pod.Status.Phase == corev1.PodFailed {
clientset, err := kubernetes.NewForConfig(r.Config)
if err != nil {
return "", fmt.Errorf("failed to initialize clientset for getSucceededJobLogs, error: %w", err)
return false, fmt.Errorf("failed to initialize clientset for checkFailedJob, error: %w", err)
}

podLogs, err := clientset.CoreV1().
Pods(storage.Namespace).
GetLogs(pod.Name, &corev1.PodLogOptions{}).
Stream(context.TODO())
podLogs, err := getPodLogs(ctx, clientset, storage.Namespace, pod.Name)
if err != nil {
return "", fmt.Errorf("failed to stream logs from pod for getSucceededJobLogs, error: %w", err)
return false, fmt.Errorf("failed to get pod logs for checkFailedJob, error: %w", err)
}
defer podLogs.Close()

var logsBuilder strings.Builder
buf := make([]byte, 4096)
for {
numBytes, err := podLogs.Read(buf)
if numBytes == 0 && err != nil {
break
}
logsBuilder.Write(buf[:numBytes])

if mismatchItemConfigGenerationRegexp.MatchString(podLogs) {
return true, nil
}
}
}
return false, nil
}

func getPodLogs(ctx context.Context, clientset *kubernetes.Clientset, namespace, name string) (string, error) {
var logsBuilder strings.Builder

streamCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
podLogs, err := clientset.CoreV1().
Pods(namespace).
GetLogs(name, &corev1.PodLogOptions{}).
Stream(streamCtx)
if err != nil {
return "", fmt.Errorf("failed to stream GetLogs from pod %s/%s, error: %w", namespace, name, err)
}
defer podLogs.Close()

return logsBuilder.String(), nil
buf := make([]byte, 4096)
for {
numBytes, err := podLogs.Read(buf)
if numBytes == 0 && err != nil {
break
}
logsBuilder.Write(buf[:numBytes])
}

return "", errors.New("failed to get succeeded Pod for getSucceededJobLogs")
return logsBuilder.String(), nil
}
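For reference, a hedged sketch of how the new helper can be driven end to end. In the controller, the rest.Config comes from r.Config; the in-cluster config, namespace, and pod name below are placeholder assumptions, and the sketch assumes getPodLogs from the diff above has been copied into the same package.

package main

import (
    "context"
    "fmt"
    "log"

    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
)

func main() {
    // Stand-in for r.Config; only resolves when running inside a cluster.
    cfg, err := rest.InClusterConfig()
    if err != nil {
        log.Fatal(err)
    }
    clientset, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        log.Fatal(err)
    }
    // Placeholder namespace and pod name for an init Job pod.
    logs, err := getPodLogs(context.Background(), clientset, "ydb", "storage-init-xyz")
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(logs)
}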

 func shouldIgnoreJobUpdate() resources.IgnoreChangesFunction {
internal/resources/storage_init_job.go (16 changes: 15 additions & 1 deletion)
@@ -40,7 +40,7 @@ func (b *StorageInitJobBuilder) Build(obj client.Object) error {
         Parallelism:           ptr.Int32(1),
         Completions:           ptr.Int32(1),
         ActiveDeadlineSeconds: ptr.Int64(300),
-        BackoffLimit:          ptr.Int32(3),
+        BackoffLimit:          ptr.Int32(6),
         Template:              b.buildInitJobPodTemplateSpec(),
     }
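Raising BackoffLimit from 3 to 6 gives the new failed-pod log check more chances to run before the Job is written off. Kubernetes re-runs failed Job pods with exponential back-off (10s, 20s, 40s, and so on, capped at six minutes), so a rough lower bound on the accumulated waiting time can be sketched as below; treat it as an estimate, since the back-off can reset after a period of stability.

package main

import (
    "fmt"
    "time"
)

func main() {
    // Sums the documented back-off delays (10s, doubling, capped at 6m) for
    // a given backoffLimit; real-world timing varies.
    const maxDelay = 6 * time.Minute
    for _, limit := range []int{3, 6} {
        total, delay := time.Duration(0), 10*time.Second
        for i := 0; i < limit; i++ {
            total += delay
            if delay *= 2; delay > maxDelay {
                delay = maxDelay
            }
        }
        fmt.Printf("backoffLimit=%d -> about %v of back-off\n", limit, total)
    }
}

Note that ActiveDeadlineSeconds stays at 300 and counts wall-clock time from Job start, so in practice the deadline can fire before all six retries are spent.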

@@ -121,10 +121,24 @@ func (b *StorageInitJobBuilder) buildInitJobPodTemplateSpec() corev1.PodTemplate
         )
     }

+    if b.Spec.HostNetwork {
+        podTemplate.Spec.HostNetwork = true
+    }
+
     if b.Spec.Image.PullSecret != nil {
         podTemplate.Spec.ImagePullSecrets = []corev1.LocalObjectReference{{Name: *b.Spec.Image.PullSecret}}
     }

+    if value, ok := b.ObjectMeta.Annotations[api.AnnotationUpdateDNSPolicy]; ok {
+        switch value {
+        case string(corev1.DNSClusterFirstWithHostNet), string(corev1.DNSClusterFirst), string(corev1.DNSDefault), string(corev1.DNSNone):
+            podTemplate.Spec.DNSPolicy = corev1.DNSPolicy(value)
+        case "":
+            podTemplate.Spec.DNSPolicy = corev1.DNSClusterFirst
+        default:
+        }
+    }
+
     return podTemplate
 }
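The new annotation handler accepts only the four valid corev1.DNSPolicy values, maps an empty string to the ClusterFirst default, and silently ignores anything else via the empty default case. A self-contained sketch of that mapping follows; resolveDNSPolicy is our name for illustration, not a function in the operator.

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
)

// resolveDNSPolicy mirrors the switch in buildInitJobPodTemplateSpec: known
// policy names pass through, an empty value falls back to ClusterFirst, and
// anything else is ignored (ok=false means "leave the pod template alone").
func resolveDNSPolicy(value string) (corev1.DNSPolicy, bool) {
    switch value {
    case string(corev1.DNSClusterFirstWithHostNet),
        string(corev1.DNSClusterFirst),
        string(corev1.DNSDefault),
        string(corev1.DNSNone):
        return corev1.DNSPolicy(value), true
    case "":
        return corev1.DNSClusterFirst, true
    default:
        return "", false
    }
}

func main() {
    for _, v := range []string{"ClusterFirstWithHostNet", "", "NotAPolicy"} {
        policy, ok := resolveDNSPolicy(v)
        fmt.Printf("%q -> %q (applied=%v)\n", v, policy, ok)
    }
}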
