@@ -2,7 +2,6 @@ package storage
2
2
3
3
import (
4
4
"context"
5
- "errors"
6
5
"fmt"
7
6
"regexp"
8
7
"strings"
@@ -164,71 +163,84 @@ func (r *Reconciler) initializeStorage(
164
163
165
164
if initJob .Status .Succeeded > 0 {
166
165
r .Log .Info ("Init Job status succeeded" )
167
- podLogs , err := r .getSucceededJobLogs (ctx , storage , initJob )
166
+ r .Recorder .Event (
167
+ storage ,
168
+ corev1 .EventTypeNormal ,
169
+ "InitializingStorage" ,
170
+ "Storage initialized successfully" ,
171
+ )
172
+ return r .setInitStorageCompleted (ctx , storage , "Storage initialized successfully" )
173
+ }
174
+
175
+ var conditionFailed bool
176
+ for _ , condition := range initJob .Status .Conditions {
177
+ if condition .Type == batchv1 .JobFailed {
178
+ conditionFailed = true
179
+ break
180
+ }
181
+ }
182
+
183
+ //nolint:nestif
184
+ if initJob .Status .Failed > 0 {
185
+ initialized , err := r .checkFailedJob (ctx , storage , initJob )
168
186
if err != nil {
169
187
r .Recorder .Event (
170
188
storage ,
171
189
corev1 .EventTypeWarning ,
172
190
"ControllerError" ,
173
- fmt .Sprintf ("Failed to get succeeded Pod for Job: %s" , err ),
191
+ fmt .Sprintf ("Failed to check logs from failed Pod for Job: %s" , err ),
174
192
)
175
193
return Stop , ctrl.Result {RequeueAfter : DefaultRequeueDelay }, err
176
194
}
177
195
178
- if mismatchItemConfigGenerationRegexp . MatchString ( podLogs ) {
196
+ if initialized {
179
197
r .Log .Info ("Storage is already initialized, continuing..." )
180
198
r .Recorder .Event (
181
199
storage ,
182
200
corev1 .EventTypeNormal ,
183
201
"InitializingStorage" ,
184
202
"Storage initialization attempted and skipped, storage already initialized" ,
185
203
)
204
+ if err := r .Delete (ctx , initJob , client .PropagationPolicy (metav1 .DeletePropagationForeground )); err != nil {
205
+ r .Recorder .Event (
206
+ storage ,
207
+ corev1 .EventTypeWarning ,
208
+ "ControllerError" ,
209
+ fmt .Sprintf ("Failed to delete Job: %s" , err ),
210
+ )
211
+ return Stop , ctrl.Result {RequeueAfter : DefaultRequeueDelay }, err
212
+ }
186
213
return r .setInitStorageCompleted (ctx , storage , "Storage already initialized" )
187
214
}
188
215
189
- r .Recorder .Event (
190
- storage ,
191
- corev1 .EventTypeNormal ,
192
- "InitializingStorage" ,
193
- "Storage initialized successfully" ,
194
- )
195
- return r .setInitStorageCompleted (ctx , storage , "Storage initialized successfully" )
196
- }
197
-
198
- var conditionFailed bool
199
- for _ , condition := range initJob .Status .Conditions {
200
- if condition .Type == batchv1 .JobFailed {
201
- conditionFailed = true
202
- break
203
- }
204
- }
205
- if initJob .Status .Failed == * initJob .Spec .BackoffLimit || conditionFailed {
206
- r .Log .Info ("Init Job status failed" )
207
- r .Recorder .Event (
208
- storage ,
209
- corev1 .EventTypeWarning ,
210
- "InitializingStorage" ,
211
- "Failed to initializing Storage" ,
212
- )
213
- if err := r .Delete (ctx , initJob ); err != nil {
216
+ if initJob .Status .Failed == * initJob .Spec .BackoffLimit || conditionFailed {
217
+ r .Log .Info ("Init Job status failed" )
214
218
r .Recorder .Event (
215
219
storage ,
216
220
corev1 .EventTypeWarning ,
217
- "ControllerError " ,
218
- fmt . Sprintf ( "Failed to delete Job: %s" , err ) ,
221
+ "InitializingStorage " ,
222
+ "Failed to initializing Storage" ,
219
223
)
220
- return Stop , ctrl.Result {RequeueAfter : DefaultRequeueDelay }, err
224
+ if err := r .Delete (ctx , initJob , client .PropagationPolicy (metav1 .DeletePropagationForeground )); err != nil {
225
+ r .Recorder .Event (
226
+ storage ,
227
+ corev1 .EventTypeWarning ,
228
+ "ControllerError" ,
229
+ fmt .Sprintf ("Failed to delete Job: %s" , err ),
230
+ )
231
+ return Stop , ctrl.Result {RequeueAfter : DefaultRequeueDelay }, err
232
+ }
221
233
}
222
234
}
223
235
224
236
return Stop , ctrl.Result {RequeueAfter : StorageInitializationRequeueDelay }, nil
225
237
}
226
238
227
- func (r * Reconciler ) getSucceededJobLogs (
239
+ func (r * Reconciler ) checkFailedJob (
228
240
ctx context.Context ,
229
241
storage * resources.StorageClusterBuilder ,
230
242
job * batchv1.Job ,
231
- ) (string , error ) {
243
+ ) (bool , error ) {
232
244
podList := & corev1.PodList {}
233
245
opts := []client.ListOption {
234
246
client .InNamespace (storage .Namespace ),
@@ -243,41 +255,53 @@ func (r *Reconciler) getSucceededJobLogs(
243
255
"ControllerError" ,
244
256
fmt .Sprintf ("Failed to list pods for Job: %s" , err ),
245
257
)
246
- return "" , fmt .Errorf ("failed to list pods for getSucceededJobLogs , error: %w" , err )
258
+ return false , fmt .Errorf ("failed to list pods for checkFailedJob , error: %w" , err )
247
259
}
248
260
249
- // Assuming there is only one succeeded pod, you can adjust the logic if needed
250
261
for _ , pod := range podList .Items {
251
- if pod .Status .Phase == corev1 .PodSucceeded {
262
+ if pod .Status .Phase == corev1 .PodFailed {
252
263
clientset , err := kubernetes .NewForConfig (r .Config )
253
264
if err != nil {
254
- return "" , fmt .Errorf ("failed to initialize clientset for getSucceededJobLogs , error: %w" , err )
265
+ return false , fmt .Errorf ("failed to initialize clientset for checkFailedJob , error: %w" , err )
255
266
}
256
267
257
- podLogs , err := clientset .CoreV1 ().
258
- Pods (storage .Namespace ).
259
- GetLogs (pod .Name , & corev1.PodLogOptions {}).
260
- Stream (context .TODO ())
268
+ podLogs , err := getPodLogs (ctx , clientset , storage .Namespace , pod .Name )
261
269
if err != nil {
262
- return "" , fmt .Errorf ("failed to stream logs from pod for getSucceededJobLogs , error: %w" , err )
270
+ return false , fmt .Errorf ("failed to get pod logs for checkFailedJob , error: %w" , err )
263
271
}
264
- defer podLogs .Close ()
265
-
266
- var logsBuilder strings.Builder
267
- buf := make ([]byte , 4096 )
268
- for {
269
- numBytes , err := podLogs .Read (buf )
270
- if numBytes == 0 && err != nil {
271
- break
272
- }
273
- logsBuilder .Write (buf [:numBytes ])
272
+
273
+ if mismatchItemConfigGenerationRegexp .MatchString (podLogs ) {
274
+ return true , nil
274
275
}
276
+ }
277
+ }
278
+ return false , nil
279
+ }
280
+
281
+ func getPodLogs (ctx context.Context , clientset * kubernetes.Clientset , namespace , name string ) (string , error ) {
282
+ var logsBuilder strings.Builder
283
+
284
+ streamCtx , cancel := context .WithTimeout (ctx , 10 * time .Second )
285
+ defer cancel ()
286
+ podLogs , err := clientset .CoreV1 ().
287
+ Pods (namespace ).
288
+ GetLogs (name , & corev1.PodLogOptions {}).
289
+ Stream (streamCtx )
290
+ if err != nil {
291
+ return "" , fmt .Errorf ("failed to stream GetLogs from pod %s/%s, error: %w" , namespace , name , err )
292
+ }
293
+ defer podLogs .Close ()
275
294
276
- return logsBuilder .String (), nil
295
+ buf := make ([]byte , 4096 )
296
+ for {
297
+ numBytes , err := podLogs .Read (buf )
298
+ if numBytes == 0 && err != nil {
299
+ break
277
300
}
301
+ logsBuilder .Write (buf [:numBytes ])
278
302
}
279
303
280
- return "" , errors . New ( "failed to get succeeded Pod for getSucceededJobLogs" )
304
+ return logsBuilder . String (), nil
281
305
}
282
306
283
307
func shouldIgnoreJobUpdate () resources.IgnoreChangesFunction {
0 commit comments