Skip to content

Commit 04817aa

Browse files
committed
Merge branch 'master' into propagate-annotations
2 parents 62e8736 + 36ee6cf commit 04817aa

22 files changed

+333
-146
lines changed

deploy/ydb-operator/Chart.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ type: application
1515
# This is the chart version. This version number should be incremented each time you make changes
1616
# to the chart and its templates, including the app version.
1717
# Versions are expected to follow Semantic Versioning (https://semver.org/)
18-
version: 0.5.15
18+
version: 0.5.16
1919

2020
# This is the version number of the application being deployed. This version number should be
2121
# incremented each time you make changes to the application. Versions are not expected to
2222
# follow Semantic Versioning. They should reflect the version the application is using.
2323
# It is recommended to use it with quotes.
24-
appVersion: "0.5.15"
24+
appVersion: "0.5.16"

e2e/tests/smoke_test.go

+13-9
Original file line numberDiff line numberDiff line change
@@ -474,15 +474,19 @@ var _ = Describe("Operator smoke test", func() {
474474
By("checking that all the storage pods are running and ready...")
475475
checkPodsRunningAndReady(ctx, "ydb-cluster", "kind-storage", storageSample.Spec.Nodes)
476476

477-
By("database can be healthily created after Frozen storage...")
478-
Expect(k8sClient.Create(ctx, databaseSample)).Should(Succeed())
479-
defer func() {
480-
Expect(k8sClient.Delete(ctx, databaseSample)).Should(Succeed())
481-
}()
482-
By("waiting until database is ready...")
483-
waitUntilDatabaseReady(ctx, databaseSample.Name, testobjects.YdbNamespace)
484-
By("checking that all the database pods are running and ready...")
485-
checkPodsRunningAndReady(ctx, "ydb-cluster", "kind-database", databaseSample.Spec.Nodes)
477+
/*
478+
// This test suite attempts to create a database on uninitialised storage
479+
480+
By("database can be healthily created after Frozen storage...")
481+
Expect(k8sClient.Create(ctx, databaseSample)).Should(Succeed())
482+
defer func() {
483+
Expect(k8sClient.Delete(ctx, databaseSample)).Should(Succeed())
484+
}()
485+
By("waiting until database is ready...")
486+
waitUntilDatabaseReady(ctx, databaseSample.Name, testobjects.YdbNamespace)
487+
By("checking that all the database pods are running and ready...")
488+
checkPodsRunningAndReady(ctx, "ydb-cluster", "kind-database", databaseSample.Spec.Nodes)
489+
*/
486490
})
487491

488492
It("create storage and database with nodeSets", func() {

internal/cms/tenant.go

+50-34
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,22 @@ import (
66
"fmt"
77

88
"github.com/ydb-platform/ydb-go-genproto/Ydb_Cms_V1"
9+
"github.com/ydb-platform/ydb-go-genproto/Ydb_Operation_V1"
910
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb"
1011
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Cms"
12+
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Operations"
1113
ydb "github.com/ydb-platform/ydb-go-sdk/v3"
12-
ydbCredentials "github.com/ydb-platform/ydb-go-sdk/v3/credentials"
1314
"sigs.k8s.io/controller-runtime/pkg/log"
1415

1516
ydbv1alpha1 "github.com/ydb-platform/ydb-kubernetes-operator/api/v1alpha1"
1617
"github.com/ydb-platform/ydb-kubernetes-operator/internal/connection"
17-
"github.com/ydb-platform/ydb-kubernetes-operator/internal/resources"
1818
)
1919

2020
var ErrEmptyReplyFromStorage = errors.New("empty reply from storage")
2121

2222
type Tenant struct {
2323
StorageEndpoint string
24+
Domain string
2425
Path string
2526
StorageUnits []ydbv1alpha1.StorageUnit
2627
Shared bool
@@ -29,43 +30,29 @@ type Tenant struct {
2930

3031
func (t *Tenant) Create(
3132
ctx context.Context,
32-
database *resources.DatabaseBuilder,
33-
creds ydbCredentials.Credentials,
3433
opts ...ydb.Option,
35-
) error {
34+
) (string, error) {
3635
logger := log.FromContext(ctx)
37-
createDatabaseURL := fmt.Sprintf(
38-
"%s/%s",
39-
t.StorageEndpoint,
40-
database.Spec.Domain,
41-
)
42-
43-
db, err := connection.Open(ctx,
44-
createDatabaseURL,
45-
ydb.WithCredentials(creds),
46-
ydb.MergeOptions(opts...),
47-
)
36+
url := fmt.Sprintf("%s/%s", t.StorageEndpoint, t.Domain)
37+
conn, err := connection.Open(ctx, url, opts...)
4838
if err != nil {
4939
logger.Error(err, "Error connecting to YDB storage")
50-
return err
40+
return "", err
5141
}
5242
defer func() {
53-
connection.Close(ctx, db)
43+
connection.Close(ctx, conn)
5444
}()
5545

56-
client := Ydb_Cms_V1.NewCmsServiceClient(ydb.GRPCConn(db))
57-
logger.Info(fmt.Sprintf("creating tenant, url: %s", createDatabaseURL))
46+
client := Ydb_Cms_V1.NewCmsServiceClient(ydb.GRPCConn(conn))
47+
logger.Info(fmt.Sprintf("creating tenant, url: %s", url))
5848
request := t.makeCreateDatabaseRequest()
5949
logger.Info(fmt.Sprintf("creating tenant, request: %s", request))
6050
response, err := client.CreateDatabase(ctx, request)
6151
if err != nil {
62-
return err
63-
}
64-
if _, err := processDatabaseCreationResponse(response); err != nil {
65-
return err
52+
return "", err
6653
}
6754
logger.Info(fmt.Sprintf("creating tenant, response: %s", response))
68-
return nil
55+
return processDatabaseCreationOperation(response.Operation)
6956
}
7057

7158
func (t *Tenant) makeCreateDatabaseRequest() *Ydb_Cms.CreateDatabaseRequest {
@@ -101,17 +88,46 @@ func (t *Tenant) makeCreateDatabaseRequest() *Ydb_Cms.CreateDatabaseRequest {
10188
return request
10289
}
10390

104-
func processDatabaseCreationResponse(response *Ydb_Cms.CreateDatabaseResponse) (bool, error) {
105-
if response.Operation == nil {
106-
return false, ErrEmptyReplyFromStorage
91+
func processDatabaseCreationOperation(operation *Ydb_Operations.Operation) (string, error) {
92+
if operation == nil {
93+
return "", ErrEmptyReplyFromStorage
10794
}
108-
109-
if response.Operation.Status == Ydb.StatusIds_ALREADY_EXISTS || response.Operation.Status == Ydb.StatusIds_SUCCESS {
110-
return true, nil
95+
if !operation.Ready {
96+
return operation.Id, nil
97+
}
98+
if operation.Status == Ydb.StatusIds_ALREADY_EXISTS || operation.Status == Ydb.StatusIds_SUCCESS {
99+
return "", nil
111100
}
112-
if response.Operation.Status == Ydb.StatusIds_STATUS_CODE_UNSPECIFIED && len(response.Operation.Issues) == 0 {
113-
return true, nil
101+
return "", fmt.Errorf("YDB response error: %v %v", operation.Status, operation.Issues)
102+
}
103+
104+
func (t *Tenant) CheckCreateOperation(
105+
ctx context.Context,
106+
operationID string,
107+
opts ...ydb.Option,
108+
) (bool, error, error) {
109+
logger := log.FromContext(ctx)
110+
url := fmt.Sprintf("%s/%s", t.StorageEndpoint, t.Domain)
111+
conn, err := connection.Open(ctx, url, opts...)
112+
if err != nil {
113+
logger.Error(err, "Error connecting to YDB storage")
114+
return false, nil, err
114115
}
116+
defer func() {
117+
connection.Close(ctx, conn)
118+
}()
115119

116-
return false, fmt.Errorf("YDB response error: %v %v", response.Operation.Status, response.Operation.Issues)
120+
client := Ydb_Operation_V1.NewOperationServiceClient(ydb.GRPCConn(conn))
121+
request := &Ydb_Operations.GetOperationRequest{Id: operationID}
122+
logger.Info(fmt.Sprintf("checking operation, url: %s, operationId: %s, request: %s", url, operationID, request))
123+
response, err := client.GetOperation(ctx, request)
124+
if err != nil {
125+
return false, nil, err
126+
}
127+
logger.Info(fmt.Sprintf("checking operation, response: %s", response))
128+
if response.Operation == nil {
129+
return false, nil, ErrEmptyReplyFromStorage
130+
}
131+
oid, err := processDatabaseCreationOperation(response.Operation)
132+
return len(oid) == 0, err, nil
117133
}

internal/controllers/constants/constants.go

+6-5
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,12 @@ const (
2525
StoragePausedCondition = "StoragePaused"
2626
StorageReadyCondition = "StorageReady"
2727

28-
DatabasePreparedCondition = "DatabasePrepared"
29-
DatabaseInitializedCondition = "DatabaseInitialized"
30-
DatabaseProvisionedCondition = "DatabaseProvisioned"
31-
DatabasePausedCondition = "DatabasePaused"
32-
DatabaseReadyCondition = "DatabaseReady"
28+
DatabasePreparedCondition = "DatabasePrepared"
29+
DatabaseInitializedCondition = "DatabaseInitialized"
30+
DatabaseProvisionedCondition = "DatabaseProvisioned"
31+
DatabasePausedCondition = "DatabasePaused"
32+
DatabaseReadyCondition = "DatabaseReady"
33+
CreateDatabaseOperationCondition = "CreateDatabaseOperation"
3334

3435
NodeSetPreparedCondition = "NodeSetPrepared"
3536
NodeSetProvisionedCondition = "NodeSetProvisioned"

internal/controllers/database/controller.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -164,11 +164,11 @@ func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
164164
).
165165
WithEventFilter(predicate.Or(
166166
predicate.GenerationChangedPredicate{},
167-
resources.IgnoreDeletetionPredicate(),
168167
resources.LastAppliedAnnotationPredicate(),
169168
resources.IsServicePredicate(),
170169
resources.IsSecretPredicate(),
171170
)).
171+
WithEventFilter(resources.IgnoreDeletetionPredicate()).
172172
Complete(r)
173173
}
174174

internal/controllers/database/init.go

+88-5
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"strconv"
77

8+
"github.com/ydb-platform/ydb-go-sdk/v3"
89
corev1 "k8s.io/api/core/v1"
910
apierrors "k8s.io/apimachinery/pkg/api/errors"
1011
"k8s.io/apimachinery/pkg/api/meta"
@@ -66,9 +67,76 @@ func (r *Reconciler) setInitDatabaseCompleted(
6667
Reason: ReasonCompleted,
6768
Message: message,
6869
})
70+
meta.SetStatusCondition(&database.Status.Conditions, metav1.Condition{
71+
Type: CreateDatabaseOperationCondition,
72+
Status: metav1.ConditionTrue,
73+
Reason: ReasonCompleted,
74+
Message: "Tenant creation operation is completed",
75+
})
6976
return r.updateStatus(ctx, database, StatusUpdateRequeueDelay)
7077
}
7178

79+
func (r *Reconciler) checkCreateTenantOperation(
80+
ctx context.Context,
81+
database *resources.DatabaseBuilder,
82+
tenant *cms.Tenant,
83+
ydbOptions ydb.Option,
84+
) (bool, ctrl.Result, error) {
85+
condition := meta.FindStatusCondition(database.Status.Conditions, CreateDatabaseOperationCondition)
86+
if condition == nil || len(condition.Message) == 0 {
87+
// Something is wrong with the condition where we save operation id
88+
// retry create tenant
89+
meta.SetStatusCondition(&database.Status.Conditions, metav1.Condition{
90+
Type: CreateDatabaseOperationCondition,
91+
Status: metav1.ConditionTrue,
92+
Reason: ReasonNotRequired,
93+
})
94+
return r.updateStatus(ctx, database, DatabaseInitializationRequeueDelay)
95+
}
96+
operationID := condition.Message
97+
finished, operationErr, err := tenant.CheckCreateOperation(ctx, operationID, ydbOptions)
98+
if err != nil {
99+
r.Recorder.Event(
100+
database,
101+
corev1.EventTypeWarning,
102+
"InitializingFailed",
103+
fmt.Sprintf("Error creating tenant %s: %s", tenant.Path, err),
104+
)
105+
return Stop, ctrl.Result{RequeueAfter: DatabaseInitializationRequeueDelay}, err
106+
}
107+
if operationErr != nil {
108+
// Creation operation failed - retry Create Tenant
109+
r.Recorder.Event(
110+
database,
111+
corev1.EventTypeWarning,
112+
"InitializingFailed",
113+
fmt.Sprintf("Error creating tenant %s: %s", tenant.Path, operationErr),
114+
)
115+
meta.SetStatusCondition(&database.Status.Conditions, metav1.Condition{
116+
Type: CreateDatabaseOperationCondition,
117+
Status: metav1.ConditionTrue,
118+
Reason: ReasonNotRequired,
119+
})
120+
return r.updateStatus(ctx, database, DatabaseInitializationRequeueDelay)
121+
}
122+
if !finished {
123+
r.Recorder.Event(
124+
database,
125+
corev1.EventTypeWarning,
126+
"Pending",
127+
fmt.Sprintf("Tenant creation operation is not completed, operationID: %s", operationID),
128+
)
129+
return Stop, ctrl.Result{RequeueAfter: DatabaseInitializationRequeueDelay}, nil
130+
}
131+
r.Recorder.Event(
132+
database,
133+
corev1.EventTypeNormal,
134+
"Initialized",
135+
fmt.Sprintf("Tenant %s created", tenant.Path),
136+
)
137+
return r.setInitDatabaseCompleted(ctx, database, "Database initialized successfully")
138+
}
139+
72140
func (r *Reconciler) initializeTenant(
73141
ctx context.Context,
74142
database *resources.DatabaseBuilder,
@@ -143,8 +211,9 @@ func (r *Reconciler) initializeTenant(
143211
return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, ErrIncorrectDatabaseResourcesConfiguration
144212
}
145213

146-
tenant := cms.Tenant{
214+
tenant := &cms.Tenant{
147215
StorageEndpoint: database.Spec.StorageEndpoint,
216+
Domain: database.Spec.Domain,
148217
Path: path,
149218
StorageUnits: storageUnits,
150219
Shared: shared,
@@ -171,19 +240,33 @@ func (r *Reconciler) initializeTenant(
171240
)
172241
return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err
173242
}
243+
ydbOpts := ydb.MergeOptions(ydb.WithCredentials(creds), tlsOptions)
174244

175-
err = tenant.Create(ctx, database, creds, tlsOptions)
245+
if meta.IsStatusConditionFalse(database.Status.Conditions, CreateDatabaseOperationCondition) {
246+
return r.checkCreateTenantOperation(ctx, database, tenant, ydbOpts)
247+
}
248+
operationID, err := tenant.Create(ctx, ydb.WithCredentials(creds), tlsOptions)
176249
if err != nil {
177250
r.Recorder.Event(
178251
database,
179252
corev1.EventTypeWarning,
180253
"InitializingFailed",
181254
fmt.Sprintf("Error creating tenant %s: %s", tenant.Path, err),
182255
)
256+
return Stop, ctrl.Result{RequeueAfter: DatabaseInitializationRequeueDelay}, err
257+
}
258+
if len(operationID) > 0 {
259+
r.Recorder.Event(
260+
database,
261+
corev1.EventTypeWarning,
262+
"Pending",
263+
fmt.Sprintf("Tenant creation operation in progress, operationID: %s", operationID),
264+
)
183265
meta.SetStatusCondition(&database.Status.Conditions, metav1.Condition{
184-
Type: DatabaseInitializedCondition,
185-
Status: metav1.ConditionFalse,
186-
Reason: ReasonInProgress,
266+
Type: CreateDatabaseOperationCondition,
267+
Status: metav1.ConditionFalse,
268+
Reason: ReasonInProgress,
269+
Message: operationID,
187270
})
188271
return r.updateStatus(ctx, database, DatabaseInitializationRequeueDelay)
189272
}

0 commit comments

Comments
 (0)