Skip to content

File tree

9 files changed

+163
-59
lines changed

9 files changed

+163
-59
lines changed
 

‎deploy/ydb-operator/Chart.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ type: application
1515
# This is the chart version. This version number should be incremented each time you make changes
1616
# to the chart and its templates, including the app version.
1717
# Versions are expected to follow Semantic Versioning (https://semver.org/)
18-
version: 0.5.15
18+
version: 0.5.16
1919

2020
# This is the version number of the application being deployed. This version number should be
2121
# incremented each time you make changes to the application. Versions are not expected to
2222
# follow Semantic Versioning. They should reflect the version the application is using.
2323
# It is recommended to use it with quotes.
24-
appVersion: "0.5.15"
24+
appVersion: "0.5.16"

‎e2e/tests/smoke_test.go

+13-9
Original file line numberDiff line numberDiff line change
@@ -474,15 +474,19 @@ var _ = Describe("Operator smoke test", func() {
474474
By("checking that all the storage pods are running and ready...")
475475
checkPodsRunningAndReady(ctx, "ydb-cluster", "kind-storage", storageSample.Spec.Nodes)
476476

477-
By("database can be healthily created after Frozen storage...")
478-
Expect(k8sClient.Create(ctx, databaseSample)).Should(Succeed())
479-
defer func() {
480-
Expect(k8sClient.Delete(ctx, databaseSample)).Should(Succeed())
481-
}()
482-
By("waiting until database is ready...")
483-
waitUntilDatabaseReady(ctx, databaseSample.Name, testobjects.YdbNamespace)
484-
By("checking that all the database pods are running and ready...")
485-
checkPodsRunningAndReady(ctx, "ydb-cluster", "kind-database", databaseSample.Spec.Nodes)
477+
/*
478+
// This test suite attempts to create a database on uninitialised storage
479+
480+
By("database can be healthily created after Frozen storage...")
481+
Expect(k8sClient.Create(ctx, databaseSample)).Should(Succeed())
482+
defer func() {
483+
Expect(k8sClient.Delete(ctx, databaseSample)).Should(Succeed())
484+
}()
485+
By("waiting until database is ready...")
486+
waitUntilDatabaseReady(ctx, databaseSample.Name, testobjects.YdbNamespace)
487+
By("checking that all the database pods are running and ready...")
488+
checkPodsRunningAndReady(ctx, "ydb-cluster", "kind-database", databaseSample.Spec.Nodes)
489+
*/
486490
})
487491

488492
It("create storage and database with nodeSets", func() {

‎internal/cms/tenant.go

+50-34
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,22 @@ import (
66
"fmt"
77

88
"github.com/ydb-platform/ydb-go-genproto/Ydb_Cms_V1"
9+
"github.com/ydb-platform/ydb-go-genproto/Ydb_Operation_V1"
910
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
1011
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Cms"
12+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
1113
ydb "github.com/ydb-platform/ydb-go-sdk/v3"
12-
ydbCredentials "github.com/ydb-platform/ydb-go-sdk/v3/credentials"
1314
"sigs.k8s.io/controller-runtime/pkg/log"
1415

1516
ydbv1alpha1 "github.com/ydb-platform/ydb-kubernetes-operator/api/v1alpha1"
1617
"github.com/ydb-platform/ydb-kubernetes-operator/internal/connection"
17-
"github.com/ydb-platform/ydb-kubernetes-operator/internal/resources"
1818
)
1919

2020
var ErrEmptyReplyFromStorage = errors.New("empty reply from storage")
2121

2222
type Tenant struct {
2323
StorageEndpoint string
24+
Domain string
2425
Path string
2526
StorageUnits []ydbv1alpha1.StorageUnit
2627
Shared bool
@@ -29,43 +30,29 @@ type Tenant struct {
2930

3031
func (t *Tenant) Create(
3132
ctx context.Context,
32-
database *resources.DatabaseBuilder,
33-
creds ydbCredentials.Credentials,
3433
opts ...ydb.Option,
35-
) error {
34+
) (string, error) {
3635
logger := log.FromContext(ctx)
37-
createDatabaseURL := fmt.Sprintf(
38-
"%s/%s",
39-
t.StorageEndpoint,
40-
database.Spec.Domain,
41-
)
42-
43-
db, err := connection.Open(ctx,
44-
createDatabaseURL,
45-
ydb.WithCredentials(creds),
46-
ydb.MergeOptions(opts...),
47-
)
36+
url := fmt.Sprintf("%s/%s", t.StorageEndpoint, t.Domain)
37+
conn, err := connection.Open(ctx, url, opts...)
4838
if err != nil {
4939
logger.Error(err, "Error connecting to YDB storage")
50-
return err
40+
return "", err
5141
}
5242
defer func() {
53-
connection.Close(ctx, db)
43+
connection.Close(ctx, conn)
5444
}()
5545

56-
client := Ydb_Cms_V1.NewCmsServiceClient(ydb.GRPCConn(db))
57-
logger.Info(fmt.Sprintf("creating tenant, url: %s", createDatabaseURL))
46+
client := Ydb_Cms_V1.NewCmsServiceClient(ydb.GRPCConn(conn))
47+
logger.Info(fmt.Sprintf("creating tenant, url: %s", url))
5848
request := t.makeCreateDatabaseRequest()
5949
logger.Info(fmt.Sprintf("creating tenant, request: %s", request))
6050
response, err := client.CreateDatabase(ctx, request)
6151
if err != nil {
62-
return err
63-
}
64-
if _, err := processDatabaseCreationResponse(response); err != nil {
65-
return err
52+
return "", err
6653
}
6754
logger.Info(fmt.Sprintf("creating tenant, response: %s", response))
68-
return nil
55+
return processDatabaseCreationOperation(response.Operation)
6956
}
7057

7158
func (t *Tenant) makeCreateDatabaseRequest() *Ydb_Cms.CreateDatabaseRequest {
@@ -101,17 +88,46 @@ func (t *Tenant) makeCreateDatabaseRequest() *Ydb_Cms.CreateDatabaseRequest {
10188
return request
10289
}
10390

104-
func processDatabaseCreationResponse(response *Ydb_Cms.CreateDatabaseResponse) (bool, error) {
105-
if response.Operation == nil {
106-
return false, ErrEmptyReplyFromStorage
91+
func processDatabaseCreationOperation(operation *Ydb_Operations.Operation) (string, error) {
92+
if operation == nil {
93+
return "", ErrEmptyReplyFromStorage
10794
}
108-
109-
if response.Operation.Status == Ydb.StatusIds_ALREADY_EXISTS || response.Operation.Status == Ydb.StatusIds_SUCCESS {
110-
return true, nil
95+
if !operation.Ready {
96+
return operation.Id, nil
97+
}
98+
if operation.Status == Ydb.StatusIds_ALREADY_EXISTS || operation.Status == Ydb.StatusIds_SUCCESS {
99+
return "", nil
111100
}
112-
if response.Operation.Status == Ydb.StatusIds_STATUS_CODE_UNSPECIFIED && len(response.Operation.Issues) == 0 {
113-
return true, nil
101+
return "", fmt.Errorf("YDB response error: %v %v", operation.Status, operation.Issues)
102+
}
103+
104+
func (t *Tenant) CheckCreateOperation(
105+
ctx context.Context,
106+
operationID string,
107+
opts ...ydb.Option,
108+
) (bool, error, error) {
109+
logger := log.FromContext(ctx)
110+
url := fmt.Sprintf("%s/%s", t.StorageEndpoint, t.Domain)
111+
conn, err := connection.Open(ctx, url, opts...)
112+
if err != nil {
113+
logger.Error(err, "Error connecting to YDB storage")
114+
return false, nil, err
114115
}
116+
defer func() {
117+
connection.Close(ctx, conn)
118+
}()
115119

116-
return false, fmt.Errorf("YDB response error: %v %v", response.Operation.Status, response.Operation.Issues)
120+
client := Ydb_Operation_V1.NewOperationServiceClient(ydb.GRPCConn(conn))
121+
request := &Ydb_Operations.GetOperationRequest{Id: operationID}
122+
logger.Info(fmt.Sprintf("checking operation, url: %s, operationId: %s, request: %s", url, operationID, request))
123+
response, err := client.GetOperation(ctx, request)
124+
if err != nil {
125+
return false, nil, err
126+
}
127+
logger.Info(fmt.Sprintf("checking operation, response: %s", response))
128+
if response.Operation == nil {
129+
return false, nil, ErrEmptyReplyFromStorage
130+
}
131+
oid, err := processDatabaseCreationOperation(response.Operation)
132+
return len(oid) == 0, err, nil
117133
}

‎internal/controllers/constants/constants.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@ const (
2525
StoragePausedCondition = "StoragePaused"
2626
StorageReadyCondition = "StorageReady"
2727

28-
DatabasePreparedCondition = "DatabasePrepared"
29-
DatabaseInitializedCondition = "DatabaseInitialized"
30-
DatabaseProvisionedCondition = "DatabaseProvisioned"
31-
DatabasePausedCondition = "DatabasePaused"
32-
DatabaseReadyCondition = "DatabaseReady"
28+
DatabasePreparedCondition = "DatabasePrepared"
29+
DatabaseInitializedCondition = "DatabaseInitialized"
30+
DatabaseProvisionedCondition = "DatabaseProvisioned"
31+
DatabasePausedCondition = "DatabasePaused"
32+
DatabaseReadyCondition = "DatabaseReady"
33+
CreateDatabaseOperationCondition = "CreateDatabaseOperation"
3334

3435
NodeSetPreparedCondition = "NodeSetPrepared"
3536
NodeSetProvisionedCondition = "NodeSetProvisioned"

‎internal/controllers/database/controller.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -164,11 +164,11 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
164164
).
165165
WithEventFilter(predicate.Or(
166166
predicate.GenerationChangedPredicate{},
167-
resources.IgnoreDeletetionPredicate(),
168167
resources.LastAppliedAnnotationPredicate(),
169168
resources.IsServicePredicate(),
170169
resources.IsSecretPredicate(),
171170
)).
171+
WithEventFilter(resources.IgnoreDeletetionPredicate()).
172172
Complete(r)
173173
}
174174

‎internal/controllers/database/init.go

+88-5
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66

7+
"github.com/ydb-platform/ydb-go-sdk/v3"
78
corev1 "k8s.io/api/core/v1"
89
apierrors "k8s.io/apimachinery/pkg/api/errors"
910
"k8s.io/apimachinery/pkg/api/meta"
@@ -62,9 +63,76 @@ func (r *Reconciler) setInitDatabaseCompleted(
6263
Reason: ReasonCompleted,
6364
Message: message,
6465
})
66+
meta.SetStatusCondition(&database.Status.Conditions, metav1.Condition{
67+
Type: CreateDatabaseOperationCondition,
68+
Status: metav1.ConditionTrue,
69+
Reason: ReasonCompleted,
70+
Message: "Tenant creation operation is completed",
71+
})
6572
return r.updateStatus(ctx, database, StatusUpdateRequeueDelay)
6673
}
6774

75+
func (r *Reconciler) checkCreateTenantOperation(
76+
ctx context.Context,
77+
database *resources.DatabaseBuilder,
78+
tenant *cms.Tenant,
79+
ydbOptions ydb.Option,
80+
) (bool, ctrl.Result, error) {
81+
condition := meta.FindStatusCondition(database.Status.Conditions, CreateDatabaseOperationCondition)
82+
if condition == nil || len(condition.Message) == 0 {
83+
// Something is wrong with the condition where we save operation id
84+
// retry create tenant
85+
meta.SetStatusCondition(&database.Status.Conditions, metav1.Condition{
86+
Type: CreateDatabaseOperationCondition,
87+
Status: metav1.ConditionTrue,
88+
Reason: ReasonNotRequired,
89+
})
90+
return r.updateStatus(ctx, database, DatabaseInitializationRequeueDelay)
91+
}
92+
operationID := condition.Message
93+
finished, operationErr, err := tenant.CheckCreateOperation(ctx, operationID, ydbOptions)
94+
if err != nil {
95+
r.Recorder.Event(
96+
database,
97+
corev1.EventTypeWarning,
98+
"InitializingFailed",
99+
fmt.Sprintf("Error creating tenant %s: %s", tenant.Path, err),
100+
)
101+
return Stop, ctrl.Result{RequeueAfter: DatabaseInitializationRequeueDelay}, err
102+
}
103+
if operationErr != nil {
104+
// Creation operation failed - retry Create Tenant
105+
r.Recorder.Event(
106+
database,
107+
corev1.EventTypeWarning,
108+
"InitializingFailed",
109+
fmt.Sprintf("Error creating tenant %s: %s", tenant.Path, operationErr),
110+
)
111+
meta.SetStatusCondition(&database.Status.Conditions, metav1.Condition{
112+
Type: CreateDatabaseOperationCondition,
113+
Status: metav1.ConditionTrue,
114+
Reason: ReasonNotRequired,
115+
})
116+
return r.updateStatus(ctx, database, DatabaseInitializationRequeueDelay)
117+
}
118+
if !finished {
119+
r.Recorder.Event(
120+
database,
121+
corev1.EventTypeWarning,
122+
"Pending",
123+
fmt.Sprintf("Tenant creation operation is not completed, operationID: %s", operationID),
124+
)
125+
return Stop, ctrl.Result{RequeueAfter: DatabaseInitializationRequeueDelay}, nil
126+
}
127+
r.Recorder.Event(
128+
database,
129+
corev1.EventTypeNormal,
130+
"Initialized",
131+
fmt.Sprintf("Tenant %s created", tenant.Path),
132+
)
133+
return r.setInitDatabaseCompleted(ctx, database, "Database initialized successfully")
134+
}
135+
68136
func (r *Reconciler) initializeTenant(
69137
ctx context.Context,
70138
database *resources.DatabaseBuilder,
@@ -139,8 +207,9 @@ func (r *Reconciler) initializeTenant(
139207
return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, ErrIncorrectDatabaseResourcesConfiguration
140208
}
141209

142-
tenant := cms.Tenant{
210+
tenant := &cms.Tenant{
143211
StorageEndpoint: database.Spec.StorageEndpoint,
212+
Domain: database.Spec.Domain,
144213
Path: path,
145214
StorageUnits: storageUnits,
146215
Shared: shared,
@@ -167,19 +236,33 @@ func (r *Reconciler) initializeTenant(
167236
)
168237
return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err
169238
}
239+
ydbOpts := ydb.MergeOptions(ydb.WithCredentials(creds), tlsOptions)
170240

171-
err = tenant.Create(ctx, database, creds, tlsOptions)
241+
if meta.IsStatusConditionFalse(database.Status.Conditions, CreateDatabaseOperationCondition) {
242+
return r.checkCreateTenantOperation(ctx, database, tenant, ydbOpts)
243+
}
244+
operationID, err := tenant.Create(ctx, ydb.WithCredentials(creds), tlsOptions)
172245
if err != nil {
173246
r.Recorder.Event(
174247
database,
175248
corev1.EventTypeWarning,
176249
"InitializingFailed",
177250
fmt.Sprintf("Error creating tenant %s: %s", tenant.Path, err),
178251
)
252+
return Stop, ctrl.Result{RequeueAfter: DatabaseInitializationRequeueDelay}, err
253+
}
254+
if len(operationID) > 0 {
255+
r.Recorder.Event(
256+
database,
257+
corev1.EventTypeWarning,
258+
"Pending",
259+
fmt.Sprintf("Tenant creation operation in progress, operationID: %s", operationID),
260+
)
179261
meta.SetStatusCondition(&database.Status.Conditions, metav1.Condition{
180-
Type: DatabaseInitializedCondition,
181-
Status: metav1.ConditionFalse,
182-
Reason: ReasonInProgress,
262+
Type: CreateDatabaseOperationCondition,
263+
Status: metav1.ConditionFalse,
264+
Reason: ReasonInProgress,
265+
Message: operationID,
183266
})
184267
return r.updateStatus(ctx, database, DatabaseInitializationRequeueDelay)
185268
}

‎internal/controllers/databasenodeset/controller.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,8 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
7070
Owns(&appsv1.StatefulSet{}).
7171
WithEventFilter(predicate.Or(
7272
predicate.GenerationChangedPredicate{},
73-
resources.IgnoreDeletetionPredicate(),
7473
resources.LastAppliedAnnotationPredicate()),
7574
).
75+
WithEventFilter(resources.IgnoreDeletetionPredicate()).
7676
Complete(r)
7777
}

‎internal/controllers/storage/controller.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,11 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
172172
).
173173
WithEventFilter(predicate.Or(
174174
predicate.GenerationChangedPredicate{},
175-
resources.IgnoreDeletetionPredicate(),
176175
resources.LastAppliedAnnotationPredicate(),
177176
resources.IsServicePredicate(),
178177
resources.IsSecretPredicate(),
179178
)).
179+
WithEventFilter(resources.IgnoreDeletetionPredicate()).
180180
Complete(r)
181181
}
182182

‎internal/controllers/storagenodeset/controller.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,8 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
6969
Owns(&appsv1.StatefulSet{}).
7070
WithEventFilter(predicate.Or(
7171
predicate.GenerationChangedPredicate{},
72-
resources.IgnoreDeletetionPredicate(),
7372
resources.LastAppliedAnnotationPredicate()),
7473
).
74+
WithEventFilter(resources.IgnoreDeletetionPredicate()).
7575
Complete(r)
7676
}

0 commit comments

Comments
 (0)
Please sign in to comment.