diff --git a/Makefile b/Makefile index 763236e1..5c590350 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,7 @@ IMG ?= cr.yandex/yc/ydb-operator:latest # Produce CRDs that work back to Kubernetes 1.11 (no version conversion) CRD_OPTIONS ?= "crd:trivialVersions=true,preserveUnknownFields=false" # ENVTEST_K8S_VERSION refers to the version of kubebuilder assets to be downloaded by envtest binary. -ENVTEST_K8S_VERSION = 1.21 +ENVTEST_K8S_VERSION = 1.26 # Get the currently used golang install path (in GOPATH/bin, unless GOBIN is set) ifeq (,$(shell go env GOBIN)) diff --git a/api/v1alpha1/remotestoragenodeset_types.go b/api/v1alpha1/remotestoragenodeset_types.go index c31395eb..2728bf24 100644 --- a/api/v1alpha1/remotestoragenodeset_types.go +++ b/api/v1alpha1/remotestoragenodeset_types.go @@ -23,7 +23,7 @@ type RemoteStorageNodeSet struct { Status RemoteStorageNodeSetStatus `json:"status,omitempty"` } -// DatabaseNodeSetStatus defines the observed state +// StorageNodeSetStatus defines the observed state type RemoteStorageNodeSetStatus struct { State constants.ClusterState `json:"state"` Conditions []metav1.Condition `json:"conditions,omitempty"` diff --git a/deploy/ydb-operator/Chart.yaml b/deploy/ydb-operator/Chart.yaml index eb351b4f..f31c7d12 100644 --- a/deploy/ydb-operator/Chart.yaml +++ b/deploy/ydb-operator/Chart.yaml @@ -15,10 +15,10 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.5.5 +version: 0.5.6 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. -appVersion: "0.5.5" +appVersion: "0.5.6" diff --git a/deploy/ydb-operator/crds/remotestoragenodeset.yaml b/deploy/ydb-operator/crds/remotestoragenodeset.yaml index 78bcae7d..00577e09 100644 --- a/deploy/ydb-operator/crds/remotestoragenodeset.yaml +++ b/deploy/ydb-operator/crds/remotestoragenodeset.yaml @@ -4498,7 +4498,7 @@ spec: status: default: state: Pending - description: DatabaseNodeSetStatus defines the observed state + description: StorageNodeSetStatus defines the observed state properties: conditions: items: diff --git a/internal/controllers/databasenodeset/controller.go b/internal/controllers/databasenodeset/controller.go index 545e7277..bfd278e5 100644 --- a/internal/controllers/databasenodeset/controller.go +++ b/internal/controllers/databasenodeset/controller.go @@ -39,16 +39,16 @@ type Reconciler struct { // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx) + r.Log = log.FromContext(ctx) crDatabaseNodeSet := &v1alpha1.DatabaseNodeSet{} err := r.Get(ctx, req.NamespacedName, crDatabaseNodeSet) if err != nil { if apierrors.IsNotFound(err) { - logger.Info("DatabaseNodeSet resource not found") + r.Log.Info("DatabaseNodeSet resource not found") return ctrl.Result{Requeue: false}, nil } - logger.Error(err, "unable to get DatabaseNodeSet") + r.Log.Error(err, "unable to get DatabaseNodeSet") return ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err } diff --git a/internal/controllers/remotedatabasenodeset/controller.go b/internal/controllers/remotedatabasenodeset/controller.go index c6cf3e9c..c9479c25 100644 --- a/internal/controllers/remotedatabasenodeset/controller.go +++ b/internal/controllers/remotedatabasenodeset/controller.go @@ -54,15 +54,15 @@ type Reconciler struct { //+kubebuilder:rbac:groups=core,resources=secrets/status,verbs=get;update;patch func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx) + r.Log = log.FromContext(ctx) remoteDatabaseNodeSet := &v1alpha1.RemoteDatabaseNodeSet{} if err := r.RemoteClient.Get(ctx, req.NamespacedName, remoteDatabaseNodeSet); err != nil { if apierrors.IsNotFound(err) { - logger.Info("RemoteDatabaseNodeSet resource not found on remote cluster") + r.Log.Info("RemoteDatabaseNodeSet resource not found on remote cluster") return ctrl.Result{Requeue: false}, nil } - logger.Error(err, "unable to get RemoteDatabaseNodeSet on remote cluster") + r.Log.Error(err, "unable to get RemoteDatabaseNodeSet on remote cluster") return ctrl.Result{RequeueAfter: DefaultRequeueDelay}, nil } @@ -225,29 +225,27 @@ func (r *Reconciler) deleteExternalResources( ctx context.Context, crRemoteDatabaseNodeSet *v1alpha1.RemoteDatabaseNodeSet, ) error { - logger := log.FromContext(ctx) - databaseNodeSet := &v1alpha1.DatabaseNodeSet{} if err := r.Client.Get(ctx, types.NamespacedName{ Name: crRemoteDatabaseNodeSet.Name, Namespace: crRemoteDatabaseNodeSet.Namespace, }, databaseNodeSet); err != nil { if apierrors.IsNotFound(err) { - logger.Info("DatabaseNodeSet not found") + r.Log.Info("DatabaseNodeSet not found") } else { - logger.Error(err, "unable to get DatabaseNodeSet") + r.Log.Error(err, "unable to get DatabaseNodeSet") return err } } else { if err := r.Client.Delete(ctx, databaseNodeSet); err != nil { - logger.Error(err, "unable to delete DatabaseNodeSet") + r.Log.Error(err, "unable to delete DatabaseNodeSet") return err } } remoteDatabaseNodeSet := resources.NewRemoteDatabaseNodeSet(crRemoteDatabaseNodeSet) if _, _, err := r.removeUnusedRemoteObjects(ctx, &remoteDatabaseNodeSet, []client.Object{}); err != nil { - logger.Error(err, "unable to delete unused remote resources") + r.Log.Error(err, "unable to delete unused remote resources") return err } diff --git a/internal/controllers/remotedatabasenodeset/controller_test.go b/internal/controllers/remotedatabasenodeset/controller_test.go index eb1064fb..48cd00a4 100644 --- a/internal/controllers/remotedatabasenodeset/controller_test.go +++ b/internal/controllers/remotedatabasenodeset/controller_test.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "path/filepath" + "reflect" "strings" "testing" "time" @@ -22,7 +23,6 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/envtest" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -35,7 +35,9 @@ import ( "github.com/ydb-platform/ydb-kubernetes-operator/internal/controllers/database" "github.com/ydb-platform/ydb-kubernetes-operator/internal/controllers/databasenodeset" "github.com/ydb-platform/ydb-kubernetes-operator/internal/controllers/remotedatabasenodeset" + "github.com/ydb-platform/ydb-kubernetes-operator/internal/controllers/remotestoragenodeset" "github.com/ydb-platform/ydb-kubernetes-operator/internal/controllers/storage" + "github.com/ydb-platform/ydb-kubernetes-operator/internal/controllers/storagenodeset" "github.com/ydb-platform/ydb-kubernetes-operator/internal/resources" "github.com/ydb-platform/ydb-kubernetes-operator/internal/test" ) @@ -128,7 +130,6 @@ var _ = BeforeSuite(func() { Client: localManager.GetClient(), Scheme: localManager.GetScheme(), Config: localManager.GetConfig(), - Log: logf.Log, }).SetupWithManager(localManager) Expect(err).ShouldNot(HaveOccurred()) @@ -136,7 +137,6 @@ var _ = BeforeSuite(func() { Client: localManager.GetClient(), Scheme: localManager.GetScheme(), Config: localManager.GetConfig(), - Log: logf.Log, }).SetupWithManager(localManager) Expect(err).ShouldNot(HaveOccurred()) @@ -144,22 +144,32 @@ var _ = BeforeSuite(func() { Client: localManager.GetClient(), Scheme: localManager.GetScheme(), Config: localManager.GetConfig(), - Log: logf.Log, }).SetupWithManager(localManager) Expect(err).ShouldNot(HaveOccurred()) + err = (&storagenodeset.Reconciler{ + Client: remoteManager.GetClient(), + Scheme: remoteManager.GetScheme(), + Config: remoteManager.GetConfig(), + }).SetupWithManager(remoteManager) + Expect(err).ShouldNot(HaveOccurred()) + err = (&databasenodeset.Reconciler{ Client: remoteManager.GetClient(), Scheme: remoteManager.GetScheme(), Config: remoteManager.GetConfig(), - Log: logf.Log, }).SetupWithManager(remoteManager) Expect(err).ShouldNot(HaveOccurred()) + err = (&remotestoragenodeset.Reconciler{ + Client: remoteManager.GetClient(), + Scheme: remoteManager.GetScheme(), + }).SetupWithManager(remoteManager, &remoteCluster) + Expect(err).ShouldNot(HaveOccurred()) + err = (&remotedatabasenodeset.Reconciler{ Client: remoteManager.GetClient(), Scheme: remoteManager.GetScheme(), - Log: logf.Log, }).SetupWithManager(remoteManager, &remoteCluster) Expect(err).ShouldNot(HaveOccurred()) @@ -414,7 +424,7 @@ var _ = Describe("RemoteDatabaseNodeSet controller tests", func() { When("Created RemoteDatabaseNodeSet with Secrets in k8s-mgmt-cluster", func() { It("Should sync Secrets into k8s-data-cluster", func() { - By("create simple Secret in Database namespace") + By("create simple Secret in remote namespace") simpleSecret := &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: testSecretName, @@ -426,6 +436,58 @@ var _ = Describe("RemoteDatabaseNodeSet controller tests", func() { } Expect(localClient.Create(ctx, simpleSecret)) + By("checking that Storage updated on local cluster...") + Eventually(func() error { + foundStorage := &v1alpha1.Storage{} + Expect(localClient.Get(ctx, types.NamespacedName{ + Name: storageSample.Name, + Namespace: testobjects.YdbNamespace, + }, foundStorage)) + + foundStorage.Spec.NodeSets = append(foundStorage.Spec.NodeSets, v1alpha1.StorageNodeSetSpecInline{ + Name: testNodeSetName + "-remote", + Remote: &v1alpha1.RemoteSpec{ + Cluster: testRemoteCluster, + }, + StorageNodeSpec: v1alpha1.StorageNodeSpec{ + Nodes: 4, + }, + }) + + foundStorage.Spec.Secrets = append( + foundStorage.Spec.Secrets, + &corev1.LocalObjectReference{ + Name: testSecretName, + }, + ) + return localClient.Update(ctx, foundStorage) + }, test.Timeout, test.Interval).ShouldNot(HaveOccurred()) + + By("checking that RemoteStorageNodeSet created on local cluster...") + Eventually(func() error { + foundRemoteStorageNodeSet := &v1alpha1.RemoteStorageNodeSet{} + return localClient.Get(ctx, types.NamespacedName{ + Name: storageSample.Name + "-" + testNodeSetName + "-remote", + Namespace: testobjects.YdbNamespace, + }, foundRemoteStorageNodeSet) + }, test.Timeout, test.Interval).ShouldNot(HaveOccurred()) + + By("checking that StorageNodeSet created on remote cluster...") + Eventually(func() bool { + foundStorageNodeSetOnRemote := v1alpha1.StorageNodeSetList{} + + Expect(remoteClient.List(ctx, &foundStorageNodeSetOnRemote, client.InNamespace( + testobjects.YdbNamespace, + ))).Should(Succeed()) + + for _, nodeset := range foundStorageNodeSetOnRemote.Items { + if nodeset.Name == storageSample.Name+"-"+testNodeSetName+"-remote" { + return true + } + } + return false + }, test.Timeout, test.Interval).Should(BeTrue()) + By("checking that Database updated on local cluster...") Eventually(func() error { foundDatabase := &v1alpha1.Database{} @@ -445,6 +507,12 @@ var _ = Describe("RemoteDatabaseNodeSet controller tests", func() { By("checking that Secrets are synced...") Eventually(func() error { + foundRemoteStorageNodeSet := &v1alpha1.RemoteStorageNodeSet{} + Expect(localClient.Get(ctx, types.NamespacedName{ + Name: storageSample.Name + "-" + testNodeSetName + "-remote", + Namespace: testobjects.YdbNamespace, + }, foundRemoteStorageNodeSet)).Should(Succeed()) + foundRemoteDatabaseNodeSet := &v1alpha1.RemoteDatabaseNodeSet{} Expect(localClient.Get(ctx, types.NamespacedName{ Name: databaseSample.Name + "-" + testNodeSetName + "-remote", @@ -469,12 +537,20 @@ var _ = Describe("RemoteDatabaseNodeSet controller tests", func() { return err } - primaryResourceName, exist := remoteSecret.Annotations[ydbannotations.PrimaryResourceDatabaseAnnotation] + primaryResourceStorage, exist := remoteSecret.Annotations[ydbannotations.PrimaryResourceStorageAnnotation] + if !exist { + return fmt.Errorf("annotation %s does not exist on remoteSecret %s", ydbannotations.PrimaryResourceStorageAnnotation, remoteSecret.Name) + } + if primaryResourceStorage != foundRemoteStorageNodeSet.Spec.StorageRef.Name { + return fmt.Errorf("primaryResourceName %s does not equal storageRef name %s", primaryResourceStorage, foundRemoteDatabaseNodeSet.Spec.DatabaseRef.Name) + } + + primaryResourceDatabase, exist := remoteSecret.Annotations[ydbannotations.PrimaryResourceDatabaseAnnotation] if !exist { return fmt.Errorf("annotation %s does not exist on remoteSecret %s", ydbannotations.PrimaryResourceDatabaseAnnotation, remoteSecret.Name) } - if primaryResourceName != foundRemoteDatabaseNodeSet.Spec.DatabaseRef.Name { - return fmt.Errorf("primaryResourceName %s does not equal databaseRef name %s", primaryResourceName, foundRemoteDatabaseNodeSet.Spec.DatabaseRef.Name) + if primaryResourceDatabase != foundRemoteDatabaseNodeSet.Spec.DatabaseRef.Name { + return fmt.Errorf("primaryResourceName %s does not equal databaseRef name %s", primaryResourceDatabase, foundRemoteDatabaseNodeSet.Spec.DatabaseRef.Name) } remoteRV, exist := remoteSecret.Annotations[ydbannotations.RemoteResourceVersionAnnotation] @@ -487,6 +563,159 @@ var _ = Describe("RemoteDatabaseNodeSet controller tests", func() { return nil }, test.Timeout, test.Interval).ShouldNot(HaveOccurred()) + + By("delete RemoteDatabaseNodeSet on local cluster...") + Eventually(func() error { + foundDatabase := v1alpha1.Database{} + Expect(localClient.Get(ctx, types.NamespacedName{ + Name: databaseSample.Name, + Namespace: testobjects.YdbNamespace, + }, &foundDatabase)).Should(Succeed()) + foundDatabase.Spec.NodeSets = []v1alpha1.DatabaseNodeSetSpecInline{ + { + Name: testNodeSetName + "-local", + DatabaseNodeSpec: v1alpha1.DatabaseNodeSpec{ + Nodes: 4, + }, + }, + { + Name: testNodeSetName + "-remote-dedicated", + Remote: &v1alpha1.RemoteSpec{ + Cluster: testRemoteCluster, + }, + DatabaseNodeSpec: v1alpha1.DatabaseNodeSpec{ + Nodes: 4, + }, + }, + } + return localClient.Update(ctx, &foundDatabase) + }, test.Timeout, test.Interval).Should(Succeed()) + + By("delete RemoteStorageNodeSet on local cluster...") + Eventually(func() error { + foundStorage := v1alpha1.Storage{} + Expect(localClient.Get(ctx, types.NamespacedName{ + Name: storageSample.Name, + Namespace: testobjects.YdbNamespace, + }, &foundStorage)).Should(Succeed()) + foundStorage.Spec.NodeSets = []v1alpha1.StorageNodeSetSpecInline{} + return localClient.Update(ctx, &foundStorage) + }, test.Timeout, test.Interval).Should(Succeed()) + + By("checking that Secrets are synced after RemoteStorageNodeSet and RemoteDatabaseNodeSet delete...") + Eventually(func() error { + foundRemoteDatabaseNodeSet := &v1alpha1.RemoteDatabaseNodeSet{} + Expect(localClient.Get(ctx, types.NamespacedName{ + Name: databaseSample.Name + "-" + testNodeSetName + "-remote-dedicated", + Namespace: testobjects.YdbNamespace, + }, foundRemoteDatabaseNodeSet)).Should(Succeed()) + + localSecret := &corev1.Secret{} + err := localClient.Get(ctx, types.NamespacedName{ + Name: testSecretName, + Namespace: testobjects.YdbNamespace, + }, localSecret) + if err != nil { + return err + } + + remoteSecret := &corev1.Secret{} + err = remoteClient.Get(ctx, types.NamespacedName{ + Name: testSecretName, + Namespace: testobjects.YdbNamespace, + }, remoteSecret) + if err != nil { + return err + } + + _, exist := remoteSecret.Annotations[ydbannotations.PrimaryResourceStorageAnnotation] + if exist { + return fmt.Errorf("annotation %s still exist on remoteSecret %s", ydbannotations.PrimaryResourceStorageAnnotation, remoteSecret.Name) + } + + primaryResourceDatabase, exist := remoteSecret.Annotations[ydbannotations.PrimaryResourceDatabaseAnnotation] + if !exist { + return fmt.Errorf("annotation %s does not exist on remoteSecret %s", ydbannotations.PrimaryResourceDatabaseAnnotation, remoteSecret.Name) + } + if primaryResourceDatabase != foundRemoteDatabaseNodeSet.Spec.DatabaseRef.Name { + return fmt.Errorf("primaryResourceName %s does not equal databaseRef name %s", primaryResourceDatabase, foundRemoteDatabaseNodeSet.Spec.DatabaseRef.Name) + } + + remoteRV, exist := remoteSecret.Annotations[ydbannotations.RemoteResourceVersionAnnotation] + if !exist { + return fmt.Errorf("annotation %s does not exist on remoteSecret %s", ydbannotations.RemoteResourceVersionAnnotation, remoteSecret.Name) + } + if localSecret.GetResourceVersion() != remoteRV { + return fmt.Errorf("localRV %s does not equal remoteRV %s", localSecret.GetResourceVersion(), remoteRV) + } + + return nil + }, test.Timeout, test.Interval).ShouldNot(HaveOccurred()) + + By("update Secret in remote namespace") + Eventually(func() error { + localSecret := &corev1.Secret{} + err := localClient.Get(ctx, types.NamespacedName{ + Name: testSecretName, + Namespace: testobjects.YdbNamespace, + }, localSecret) + if err != nil { + return err + } + localSecret.StringData = map[string]string{ + "message": "Updated message from k8s-mgmt-cluster", + } + return localClient.Update(ctx, localSecret) + }, test.Timeout, test.Interval).Should(Succeed()) + + By("checking that Secrets are synced after update...") + Eventually(func() error { + foundRemoteDatabaseNodeSet := &v1alpha1.RemoteDatabaseNodeSet{} + Expect(localClient.Get(ctx, types.NamespacedName{ + Name: databaseSample.Name + "-" + testNodeSetName + "-remote-dedicated", + Namespace: testobjects.YdbNamespace, + }, foundRemoteDatabaseNodeSet)).Should(Succeed()) + + localSecret := &corev1.Secret{} + err := localClient.Get(ctx, types.NamespacedName{ + Name: testSecretName, + Namespace: testobjects.YdbNamespace, + }, localSecret) + if err != nil { + return err + } + + remoteSecret := &corev1.Secret{} + err = remoteClient.Get(ctx, types.NamespacedName{ + Name: testSecretName, + Namespace: testobjects.YdbNamespace, + }, remoteSecret) + if err != nil { + return err + } + + primaryResourceDatabase, exist := remoteSecret.Annotations[ydbannotations.PrimaryResourceDatabaseAnnotation] + if !exist { + return fmt.Errorf("annotation %s does not exist on remoteSecret %s", ydbannotations.PrimaryResourceDatabaseAnnotation, remoteSecret.Name) + } + if primaryResourceDatabase != foundRemoteDatabaseNodeSet.Spec.DatabaseRef.Name { + return fmt.Errorf("primaryResourceName %s does not equal databaseRef name %s", primaryResourceDatabase, foundRemoteDatabaseNodeSet.Spec.DatabaseRef.Name) + } + + remoteRV, exist := remoteSecret.Annotations[ydbannotations.RemoteResourceVersionAnnotation] + if !exist { + return fmt.Errorf("annotation %s does not exist on remoteSecret %s", ydbannotations.RemoteResourceVersionAnnotation, remoteSecret.Name) + } + if localSecret.GetResourceVersion() != remoteRV { + return fmt.Errorf("localRV %s does not equal remoteRV %s", localSecret.GetResourceVersion(), remoteRV) + } + + if !reflect.DeepEqual(localSecret.StringData, remoteSecret.StringData) { + return fmt.Errorf("localSecret StringData %s does not equal with remoteSecret %s", localSecret.StringData, remoteSecret.StringData) + } + + return nil + }, test.Timeout, test.Interval).ShouldNot(HaveOccurred()) }) }) @@ -534,16 +763,11 @@ var _ = Describe("RemoteDatabaseNodeSet controller tests", func() { Namespace: testobjects.YdbNamespace, }, &foundConfigMap)).Should(Succeed()) - gvk, err := apiutil.GVKForObject(foundConfigMap.DeepCopy(), scheme.Scheme) - Expect(err).ShouldNot(HaveOccurred()) - for idx := range foundRemoteDatabaseNodeSet.Status.RemoteResources { remoteResource := foundRemoteDatabaseNodeSet.Status.RemoteResources[idx] if resources.EqualRemoteResourceWithObject( &remoteResource, - testobjects.YdbNamespace, foundConfigMap.DeepCopy(), - gvk, ) { if meta.IsStatusConditionPresentAndEqual( remoteResource.Conditions, @@ -571,16 +795,11 @@ var _ = Describe("RemoteDatabaseNodeSet controller tests", func() { Namespace: testobjects.YdbNamespace, }, &foundConfigMap)).Should(Succeed()) - gvk, err := apiutil.GVKForObject(foundConfigMap.DeepCopy(), scheme.Scheme) - Expect(err).ShouldNot(HaveOccurred()) - for idx := range foundRemoteDatabaseNodeSet.Status.RemoteResources { remoteResource := foundRemoteDatabaseNodeSet.Status.RemoteResources[idx] if resources.EqualRemoteResourceWithObject( &remoteResource, - testobjects.YdbNamespace, foundConfigMap.DeepCopy(), - gvk, ) { if meta.IsStatusConditionPresentAndEqual( remoteResource.Conditions, @@ -733,6 +952,16 @@ func deleteAll(env *envtest.Environment, k8sClient client.Client, objs ...client Expect(client.IgnoreNotFound(ignoreMethodNotAllowed(err))).ShouldNot(HaveOccurred()) } + // Delete all Services in this namespace + serviceList := corev1.ServiceList{} + err = k8sClient.List(ctx, &serviceList, client.InNamespace(ns.Name)) + Expect(err).ShouldNot(HaveOccurred()) + for idx := range serviceList.Items { + policy := metav1.DeletePropagationForeground + err = k8sClient.Delete(ctx, &serviceList.Items[idx], &client.DeleteOptions{PropagationPolicy: &policy}) + Expect(err).ShouldNot(HaveOccurred()) + } + Eventually(func() error { key := client.ObjectKeyFromObject(ns) if err := k8sClient.Get(ctx, key, ns); err != nil { diff --git a/internal/controllers/remotedatabasenodeset/remote_objects.go b/internal/controllers/remotedatabasenodeset/remote_objects.go index bcecef09..d600e93d 100644 --- a/internal/controllers/remotedatabasenodeset/remote_objects.go +++ b/internal/controllers/remotedatabasenodeset/remote_objects.go @@ -3,18 +3,17 @@ package remotedatabasenodeset import ( "context" "fmt" - "reflect" + "time" - "github.com/banzaicloud/k8s-objectmatcher/patch" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "github.com/ydb-platform/ydb-kubernetes-operator/api/v1alpha1" ydbannotations "github.com/ydb-platform/ydb-kubernetes-operator/internal/annotations" @@ -23,67 +22,32 @@ import ( "github.com/ydb-platform/ydb-kubernetes-operator/internal/resources" ) -var ( - annotator = patch.NewAnnotator(ydbannotations.LastAppliedAnnotation) - patchMaker = patch.NewPatchMaker(annotator) -) - -func (r *Reconciler) initRemoteResourcesStatus( +func (r *Reconciler) initRemoteObjectsStatus( ctx context.Context, remoteDatabaseNodeSet *resources.RemoteDatabaseNodeSetResource, remoteObjects []client.Object, ) (bool, ctrl.Result, error) { - r.Log.Info("running step initRemoteResourcesStatus") - - syncedResources := []v1alpha1.RemoteResource{} - // copy actual slice to local variable - if remoteDatabaseNodeSet.Status.RemoteResources != nil { - syncedResources = append(syncedResources, remoteDatabaseNodeSet.Status.RemoteResources...) - } - - for idx := range remoteObjects { - remoteObj := remoteObjects[idx] - remoteObjGVK, err := apiutil.GVKForObject(remoteObj, r.Scheme) - if err != nil { - r.Recorder.Event( - remoteDatabaseNodeSet, - corev1.EventTypeWarning, - "ControllerError", - fmt.Sprintf("Failed to recognize GVK for remote object %s with name %s: %s", remoteObjGVK.Kind, remoteObj.GetName(), err), - ) - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err - } + r.Log.Info("running step initRemoteObjectsStatus") + for _, remoteObj := range remoteObjects { existInStatus := false - for i := range syncedResources { - syncedResource := syncedResources[i] + for idx := range remoteDatabaseNodeSet.Status.RemoteResources { if resources.EqualRemoteResourceWithObject( - &syncedResource, - remoteDatabaseNodeSet.Namespace, + &remoteDatabaseNodeSet.Status.RemoteResources[idx], remoteObj, - remoteObjGVK, ) { existInStatus = true break } } - if !existInStatus { - remoteDatabaseNodeSet.Status.RemoteResources = append( - remoteDatabaseNodeSet.Status.RemoteResources, - v1alpha1.RemoteResource{ - Group: remoteObjGVK.Group, - Version: remoteObjGVK.Version, - Kind: remoteObjGVK.Kind, - Name: remoteObj.GetName(), - State: ResourceSyncPending, - Conditions: []metav1.Condition{}, - }, - ) + remoteDatabaseNodeSet.CreateRemoteResourceStatus(remoteObj) + return r.updateStatusRemoteObjects(ctx, remoteDatabaseNodeSet, StatusUpdateRequeueDelay) } } - return r.updateRemoteResourcesStatus(ctx, remoteDatabaseNodeSet) + r.Log.Info("complete step initRemoteObjectsStatus") + return Continue, ctrl.Result{}, nil } func (r *Reconciler) syncRemoteObjects( @@ -94,144 +58,142 @@ func (r *Reconciler) syncRemoteObjects( r.Log.Info("running step syncRemoteObjects") for _, remoteObj := range remoteObjects { - // Determine actual GVK for generic client.Object - remoteObjGVK, err := apiutil.GVKForObject(remoteObj, r.Scheme) - if err != nil { - r.Recorder.Event( - remoteDatabaseNodeSet, - corev1.EventTypeWarning, - "ControllerError", - fmt.Sprintf("Failed to recognize GVK for remote object %s with name %s: %s", remoteObjGVK.Kind, remoteObj.GetName(), err), - ) - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err + remoteObjName := remoteObj.GetName() + remoteObjKind := remoteObj.GetObjectKind().GroupVersionKind().Kind + remoteObjRV := remoteObj.GetResourceVersion() + var remoteResource *v1alpha1.RemoteResource + for idx := range remoteDatabaseNodeSet.Status.RemoteResources { + if resources.EqualRemoteResourceWithObject(&remoteDatabaseNodeSet.Status.RemoteResources[idx], remoteObj) { + remoteResource = &remoteDatabaseNodeSet.Status.RemoteResources[idx] + break + } } // Get object to sync from remote cluster - err = r.RemoteClient.Get(ctx, types.NamespacedName{ + remoteGetErr := r.RemoteClient.Get(ctx, types.NamespacedName{ Name: remoteObj.GetName(), Namespace: remoteObj.GetNamespace(), }, remoteObj) - if err != nil { - // Resource not found on remote cluster but we should retry - if apierrors.IsNotFound(err) { + // Resource not found on remote cluster or internal kubernetes error + if remoteGetErr != nil { + if apierrors.IsNotFound(remoteGetErr) { r.Recorder.Event( remoteDatabaseNodeSet, corev1.EventTypeWarning, "ProvisioningFailed", - fmt.Sprintf("Resource %s with name %s was not found on remote cluster: %s", remoteObjGVK.Kind, remoteObj.GetName(), err), + fmt.Sprintf("Resource %s with name %s was not found on remote cluster: %s", remoteObjKind, remoteObjName, remoteGetErr), ) r.RemoteRecorder.Event( remoteDatabaseNodeSet, corev1.EventTypeWarning, "ProvisioningFailed", - fmt.Sprintf("Resource %s with name %s was not found: %s", remoteObjGVK.Kind, remoteObj.GetName(), err), + fmt.Sprintf("Resource %s with name %s was not found: %s", remoteObjKind, remoteObjName, remoteGetErr), ) } else { r.Recorder.Event( remoteDatabaseNodeSet, corev1.EventTypeWarning, "ControllerError", - fmt.Sprintf("Failed to get resource %s with name %s on remote cluster: %s", remoteObjGVK.Kind, remoteObj.GetName(), err), + fmt.Sprintf("Failed to get resource %s with name %s on remote cluster: %s", remoteObjKind, remoteObjName, remoteGetErr), ) r.RemoteRecorder.Event( remoteDatabaseNodeSet, corev1.EventTypeWarning, "ControllerError", - fmt.Sprintf("Failed to get resource %s with name %s: %s", remoteObjGVK.Kind, remoteObj.GetName(), err), + fmt.Sprintf("Failed to get resource %s with name %s: %s", remoteObjKind, remoteObjName, remoteGetErr), ) } - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err + remoteDatabaseNodeSet.UpdateRemoteResourceStatus(remoteResource, metav1.ConditionFalse, remoteObjRV) + return r.updateStatusRemoteObjects(ctx, remoteDatabaseNodeSet, DefaultRequeueDelay) } - // Create client.Object from api.RemoteResource struct - localObj := resources.CreateResource(remoteObj) - remoteDatabaseNodeSet.SetPrimaryResourceAnnotations(localObj) // Check object existence in local cluster - err = r.Client.Get(ctx, types.NamespacedName{ - Name: remoteObj.GetName(), - Namespace: remoteObj.GetNamespace(), + localObj := resources.CreateResource(remoteObj) + getErr := r.Client.Get(ctx, types.NamespacedName{ + Name: localObj.GetName(), + Namespace: localObj.GetNamespace(), }, localObj) - //nolint:nestif - if err != nil { - if !apierrors.IsNotFound(err) { - r.Recorder.Event( - remoteDatabaseNodeSet, - corev1.EventTypeWarning, - "ControllerError", - fmt.Sprintf("Failed to get resource %s with name %s: %s", remoteObjGVK.Kind, remoteObj.GetName(), err), - ) - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err - } - // Object does not exist in local cluster - // Try to create resource in remote cluster - if err := r.Client.Create(ctx, localObj); err != nil { + + // Handler for kubernetes internal error + if getErr != nil && !apierrors.IsNotFound(getErr) { + r.Recorder.Event( + remoteDatabaseNodeSet, + corev1.EventTypeWarning, + "ControllerError", + fmt.Sprintf("Failed to get resource %s with name %s: %s", remoteObjKind, remoteObjName, getErr), + ) + remoteDatabaseNodeSet.UpdateRemoteResourceStatus(remoteResource, metav1.ConditionFalse, remoteObjRV) + return r.updateStatusRemoteObjects(ctx, remoteDatabaseNodeSet, DefaultRequeueDelay) + } + + // Try to create non-existing remote object in local cluster + if apierrors.IsNotFound(getErr) { + createErr := r.Client.Create(ctx, localObj) + if createErr != nil { r.Recorder.Event( remoteDatabaseNodeSet, corev1.EventTypeWarning, "ControllerError", - fmt.Sprintf("Failed to create resource %s with name %s: %s", remoteObjGVK.Kind, remoteObj.GetName(), err), + fmt.Sprintf("Failed to create resource %s with name %s: %s", remoteObjKind, remoteObjName, getErr), ) - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, nil + remoteDatabaseNodeSet.UpdateRemoteResourceStatus(remoteResource, metav1.ConditionFalse, remoteObjRV) + return r.updateStatusRemoteObjects(ctx, remoteDatabaseNodeSet, DefaultRequeueDelay) } r.Recorder.Event( remoteDatabaseNodeSet, corev1.EventTypeNormal, "Provisioning", - fmt.Sprintf("RemoteSync CREATE resource %s with name %s", remoteObjGVK.Kind, remoteObj.GetName()), + fmt.Sprintf("RemoteSync CREATE resource %s with name %s", remoteObjKind, remoteObjName), ) - } else { - // Update client.Object for local object with spec from remote object - updatedObj := resources.UpdateResource(localObj, remoteObj) - remoteDatabaseNodeSet.SetPrimaryResourceAnnotations(updatedObj) - // Remote object existing in local cluster, сheck the need for an update - // Get diff resources and compare bytes by k8s-objectmatcher PatchMaker - patchResult, err := patchMaker.Calculate(localObj, updatedObj, - []patch.CalculateOption{ - patch.IgnoreStatusFields(), - }..., + remoteDatabaseNodeSet.UpdateRemoteResourceStatus(remoteResource, metav1.ConditionFalse, remoteObjRV) + return r.updateStatusRemoteObjects(ctx, remoteDatabaseNodeSet, StatusUpdateRequeueDelay) + } + + // Get patch diff between remote object and existing object + remoteDatabaseNodeSet.SetPrimaryResourceAnnotations(remoteObj) + patchResult, patchErr := resources.GetPatchResult(localObj, remoteObj) + if patchErr != nil { + r.Recorder.Event( + remoteDatabaseNodeSet, + corev1.EventTypeWarning, + "ControllerError", + fmt.Sprintf("Failed to get diff for remote resource %s with name %s: %s", remoteObjKind, remoteObjName, patchErr), ) - if err != nil { + remoteDatabaseNodeSet.UpdateRemoteResourceStatus(remoteResource, metav1.ConditionFalse, remoteObjRV) + return r.updateStatusRemoteObjects(ctx, remoteDatabaseNodeSet, DefaultRequeueDelay) + } + + // Try to update existing object in local cluster by rawPatch + if !patchResult.IsEmpty() { + updateErr := r.Client.Patch(ctx, localObj, client.RawPatch(types.StrategicMergePatchType, patchResult.Patch)) + if updateErr != nil { r.Recorder.Event( remoteDatabaseNodeSet, corev1.EventTypeWarning, "ControllerError", - fmt.Sprintf("Failed to get diff for remote resource %s with name %s: %s", remoteObjGVK.Kind, remoteObj.GetName(), err), - ) - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, nil - } - // We need to check patchResult by k8s-objectmatcher and resourceVersion from annotation - // And update if localObj does not match updatedObj from remote cluster - if !patchResult.IsEmpty() || - remoteObj.GetResourceVersion() != localObj.GetAnnotations()[ydbannotations.RemoteResourceVersionAnnotation] { - r.Recorder.Event( - remoteDatabaseNodeSet, - corev1.EventTypeNormal, - "Provisioning", - fmt.Sprintf("Patch for resource %s with name %s: %s", remoteObjGVK.Kind, remoteObj.GetName(), string(patchResult.Patch)), - ) - // Try to update resource in local cluster - if err := r.Client.Update(ctx, updatedObj); err != nil { - r.Recorder.Event( - remoteDatabaseNodeSet, - corev1.EventTypeWarning, - "ControllerError", - fmt.Sprintf("Failed to update resource %s with name %s: %s", remoteObjGVK.Kind, remoteObj.GetName(), err), - ) - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, nil - } - r.Recorder.Event( - remoteDatabaseNodeSet, - corev1.EventTypeNormal, - "Provisioning", - fmt.Sprintf("RemoteSync UPDATE resource %s with name %s resourceVersion %s", remoteObjGVK.Kind, remoteObj.GetName(), remoteObj.GetResourceVersion()), + fmt.Sprintf("Failed to update resource %s with name %s: %v", remoteObjKind, remoteObjName, updateErr), ) + remoteDatabaseNodeSet.UpdateRemoteResourceStatus(remoteResource, metav1.ConditionFalse, remoteObjRV) + return r.updateStatusRemoteObjects(ctx, remoteDatabaseNodeSet, DefaultRequeueDelay) } + r.Recorder.Event( + remoteDatabaseNodeSet, + corev1.EventTypeNormal, + "Provisioning", + fmt.Sprintf("RemoteSync UPDATE resource %s with name %s resourceVersion %s", remoteObjKind, remoteObjName, remoteObj.GetResourceVersion()), + ) + remoteDatabaseNodeSet.UpdateRemoteResourceStatus(remoteResource, metav1.ConditionTrue, remoteObjRV) + return r.updateStatusRemoteObjects(ctx, remoteDatabaseNodeSet, StatusUpdateRequeueDelay) + } + + if !meta.IsStatusConditionTrue(remoteResource.Conditions, RemoteResourceSyncedCondition) { + remoteDatabaseNodeSet.UpdateRemoteResourceStatus(remoteResource, metav1.ConditionTrue, remoteObjRV) + return r.updateStatusRemoteObjects(ctx, remoteDatabaseNodeSet, StatusUpdateRequeueDelay) } - // Set status for remote resource in RemoteDatabaseNodeSet object - remoteDatabaseNodeSet.SetRemoteResourceStatus(localObj, remoteObjGVK) } - return r.updateRemoteResourcesStatus(ctx, remoteDatabaseNodeSet) + r.Log.Info("complete step syncRemoteObjects") + return Continue, ctrl.Result{}, nil } func (r *Reconciler) removeUnusedRemoteObjects( @@ -239,156 +201,195 @@ func (r *Reconciler) removeUnusedRemoteObjects( remoteDatabaseNodeSet *resources.RemoteDatabaseNodeSetResource, remoteObjects []client.Object, ) (bool, ctrl.Result, error) { - r.Log.Info("running step removeUnusedRemoteObjects") + r.Log.Info("running step removeUnusedRemoteResources") + // We should check every remote resource to need existence in cluster // Get processed remote resources from object Status candidatesToDelete := []v1alpha1.RemoteResource{} - - // Remove remote resource from candidates to delete if it declared - // to using in current RemoteDatabaseNodeSet spec for idx := range remoteDatabaseNodeSet.Status.RemoteResources { - remoteResource := remoteDatabaseNodeSet.Status.RemoteResources[idx] + // Remove remote resource from candidates to delete if it declared + // to using in current RemoteDatabaseNodeSet spec existInSpec := false - for i := range remoteObjects { - declaredObj := remoteObjects[i] - declaredObjGVK, err := apiutil.GVKForObject(declaredObj, r.Scheme) - if err != nil { - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err - } + for _, remoteObj := range remoteObjects { if resources.EqualRemoteResourceWithObject( - &remoteResource, - remoteDatabaseNodeSet.Namespace, - declaredObj, - declaredObjGVK, + &remoteDatabaseNodeSet.Status.RemoteResources[idx], + remoteObj, ) { existInSpec = true break } } if !existInSpec { - candidatesToDelete = append(candidatesToDelete, remoteResource) + candidatesToDelete = append(candidatesToDelete, remoteDatabaseNodeSet.Status.RemoteResources[idx]) } } - // Check resources usage in another DatabaseNodeSet and make List request - // only if we have candidates to Delete - resourcesToDelete := []v1alpha1.RemoteResource{} - if len(candidatesToDelete) > 0 { - resourcesUsedInAnotherObject, err := r.getRemoteObjectsUsedInNamespace(ctx, remoteDatabaseNodeSet, remoteObjects) + if len(candidatesToDelete) == 0 { + r.Log.Info("complete step removeUnusedRemoteObjects") + return Continue, ctrl.Result{}, nil + } + + existInDatabase := false + anotherDatabaseNodeSets, err := r.getAnotherDatabaseNodeSets(ctx, remoteDatabaseNodeSet) + if err != nil { + r.Recorder.Event( + remoteDatabaseNodeSet, + corev1.EventTypeWarning, + "ControllerError", + fmt.Sprintf("Failed to check remote resource usage in another: %v", err), + ) + return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err + } + + if len(anotherDatabaseNodeSets) > 0 { + existInDatabase = true + } + + // Check RemoteResource usage in DatabaseNodeSet + for _, remoteResource := range candidatesToDelete { + remoteObj, err := resources.ConvertRemoteResourceToObject(remoteResource, remoteDatabaseNodeSet.Namespace) if err != nil { r.Recorder.Event( remoteDatabaseNodeSet, corev1.EventTypeWarning, - "ProvisioningFailed", - fmt.Sprintf("Failed to get resources used in another object: %s", err), + "ControllerError", + fmt.Sprintf("Failed to convert RemoteResource %s with name %s to object: %v", remoteResource.Kind, remoteResource.Name, err), ) return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err } - for idx := range candidatesToDelete { - remoteResource := candidatesToDelete[idx] - isCandidateExistInANotherObject := false - // Remove resource from cadidates to Delete if another object using it now - for i := range resourcesUsedInAnotherObject { - usedObj := resourcesUsedInAnotherObject[i] - usedObjGVK, err := apiutil.GVKForObject(usedObj, r.Scheme) - if err != nil { - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err - } - if resources.EqualRemoteResourceWithObject( - &remoteResource, - remoteDatabaseNodeSet.Namespace, - usedObj, - usedObjGVK, - ) { - isCandidateExistInANotherObject = true - break - } - } - if !isCandidateExistInANotherObject { - resourcesToDelete = append(resourcesToDelete, remoteResource) - } - } - } - // Remove unused remote resource from cluster and make API call DELETE - // for every candidate to Delete - for _, recourceToDelete := range resourcesToDelete { - // Convert RemoteResource struct from Status to client.Object - remoteObj, err := resources.ConvertRemoteResourceToObject( - recourceToDelete, - remoteDatabaseNodeSet.Namespace, - ) - if err != nil { - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err - } - - // Determine actual GVK for generic client.Object - remoteResourceGVK, err := apiutil.GVKForObject(remoteObj, r.Scheme) - if err != nil { + localObj := resources.CreateResource(remoteObj) + if err := r.Client.Get(ctx, types.NamespacedName{ + Name: localObj.GetName(), + Namespace: localObj.GetNamespace(), + }, localObj); err != nil { + if apierrors.IsNotFound(err) { + continue + } r.Recorder.Event( remoteDatabaseNodeSet, corev1.EventTypeWarning, "ControllerError", - fmt.Sprintf("Failed to recognize GVK for remote object %v: %s", remoteObj, err), + fmt.Sprintf("Failed to get RemoteResource %s with name %s as object: %v", remoteResource.Kind, remoteResource.Name, err), ) return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err } - // Try to get resource in local cluster - if err := r.Client.Get(ctx, types.NamespacedName{ - Name: remoteObj.GetName(), - Namespace: remoteObj.GetNamespace(), - }, remoteObj); err != nil { - if !apierrors.IsNotFound(err) { + // Remove annotation if no one another DatabaseNodeSet + if !existInDatabase { + // Try to update existing object in local cluster by rawPatch + patch := []byte(fmt.Sprintf(`{"metadata": {"annotations": {"%s": null}}}`, ydbannotations.PrimaryResourceStorageAnnotation)) + updateErr := r.Client.Patch(ctx, localObj, client.RawPatch(types.StrategicMergePatchType, patch)) + if updateErr != nil { r.Recorder.Event( remoteDatabaseNodeSet, corev1.EventTypeWarning, "ControllerError", - fmt.Sprintf("Failed to get resource %s with name %s: %s", remoteResourceGVK.Kind, remoteObj.GetName(), err), + fmt.Sprintf("Failed to update resource %s with name %s: %s", remoteResource.Kind, remoteResource.Name, err), + ) + } else { + r.Recorder.Event( + remoteDatabaseNodeSet, + corev1.EventTypeNormal, + "Provisioning", + fmt.Sprintf("RemoteSync UPDATE resource %s with name %s unset primaryResource annotation", remoteResource.Kind, remoteResource.Name), ) - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err } } - // Skip resource deletion because it using in some Storage - // check by existence of annotation `ydb.tech/primary-resource-storage` - if _, exist := remoteObj.GetAnnotations()[ydbannotations.PrimaryResourceStorageAnnotation]; exist { - continue - } - - // Try to delete unused resource from local cluster - if err := r.Client.Delete(ctx, remoteObj); err != nil { - if !apierrors.IsNotFound(err) { + // Delete resource if annotation `ydb.tech/primary-resource-storage` does not exist + _, existInStorage := localObj.GetAnnotations()[ydbannotations.PrimaryResourceStorageAnnotation] + if !existInStorage { + // Try to delete unused resource from local cluster + deleteErr := r.Client.Delete(ctx, localObj) + if deleteErr != nil { + if !apierrors.IsNotFound(deleteErr) { + r.Recorder.Event( + remoteDatabaseNodeSet, + corev1.EventTypeWarning, + "ControllerError", + fmt.Sprintf("Failed to delete resource %s with name %s: %s", remoteResource.Kind, remoteResource.Name, err), + ) + } + } else { r.Recorder.Event( remoteDatabaseNodeSet, - corev1.EventTypeWarning, - "ControllerError", - fmt.Sprintf("Failed to delete resource %s with name %s: %s", remoteResourceGVK.Kind, remoteObj.GetName(), err), + corev1.EventTypeNormal, + "Provisioning", + fmt.Sprintf("RemoteSync DELETE resource %s with name %s", remoteResource.Kind, remoteResource.Name), ) - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err } } + remoteDatabaseNodeSet.RemoveRemoteResourceStatus(remoteObj) + } + + return r.updateStatusRemoteObjects(ctx, remoteDatabaseNodeSet, StatusUpdateRequeueDelay) +} +func (r *Reconciler) updateStatusRemoteObjects( + ctx context.Context, + remoteDatabaseNodeSet *resources.RemoteDatabaseNodeSetResource, + requeueAfter time.Duration, +) (bool, ctrl.Result, error) { + crRemoteDatabaseNodeSet := &v1alpha1.RemoteDatabaseNodeSet{} + getErr := r.RemoteClient.Get(ctx, types.NamespacedName{ + Name: remoteDatabaseNodeSet.Name, + Namespace: remoteDatabaseNodeSet.Namespace, + }, crRemoteDatabaseNodeSet) + if getErr != nil { r.Recorder.Event( remoteDatabaseNodeSet, - corev1.EventTypeNormal, - "Provisioning", - fmt.Sprintf("RemoteSync DELETE resource %s with name %s", remoteResourceGVK.Kind, remoteObj.GetName()), + corev1.EventTypeWarning, + "ControllerError", + fmt.Sprintf("Failed fetching CR before status update for remote resources on remote cluster: %v", getErr), + ) + r.RemoteRecorder.Event( + remoteDatabaseNodeSet, + corev1.EventTypeWarning, + "ControllerError", + fmt.Sprintf("Failed fetching CR before status update for remote resources: %v", getErr), + ) + return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, getErr + } + + crRemoteDatabaseNodeSet.Status.RemoteResources = append([]v1alpha1.RemoteResource{}, remoteDatabaseNodeSet.Status.RemoteResources...) + updateErr := r.RemoteClient.Status().Update(ctx, crRemoteDatabaseNodeSet) + if updateErr != nil { + r.Recorder.Event( + remoteDatabaseNodeSet, + corev1.EventTypeWarning, + "ControllerError", + fmt.Sprintf("Failed to update status for remote resources on remote cluster: %s", updateErr), + ) + r.RemoteRecorder.Event( + remoteDatabaseNodeSet, + corev1.EventTypeWarning, + "ControllerError", + fmt.Sprintf("Failed to update status for remote resources: %s", updateErr), ) - // Remove status for remote resource from RemoteDatabaseNodeSet object - remoteDatabaseNodeSet.RemoveRemoteResourceStatus(remoteObj, remoteResourceGVK) + return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, updateErr } - return r.updateRemoteResourcesStatus(ctx, remoteDatabaseNodeSet) + r.Recorder.Event( + remoteDatabaseNodeSet, + corev1.EventTypeNormal, + "StatusChanged", + "Status for remote resources updated on remote cluster", + ) + r.RemoteRecorder.Event( + remoteDatabaseNodeSet, + corev1.EventTypeNormal, + "StatusChanged", + "Status for remote resources updated", + ) + + return Stop, ctrl.Result{RequeueAfter: requeueAfter}, nil } -func (r *Reconciler) getRemoteObjectsUsedInNamespace( +func (r *Reconciler) getAnotherDatabaseNodeSets( ctx context.Context, remoteDatabaseNodeSet *resources.RemoteDatabaseNodeSetResource, - remoteObjs []client.Object, -) ([]client.Object, error) { - remoteObjectsUsedInNamespace := []client.Object{} - +) ([]v1alpha1.DatabaseNodeSet, error) { // Create label requirement that label `ydb.tech/database-nodeset` which not equal // to current DatabaseNodeSet object for exclude current nodeSet from List result labelRequirement, err := labels.NewRequirement( @@ -402,10 +403,10 @@ func (r *Reconciler) getRemoteObjectsUsedInNamespace( // Search another DatabaseNodeSets in current namespace with the same DatabaseRef // but exclude current nodeSet from result - databaseNodeSets := &v1alpha1.DatabaseNodeSetList{} + databaseNodeSets := v1alpha1.DatabaseNodeSetList{} if err := r.Client.List( ctx, - databaseNodeSets, + &databaseNodeSets, client.InNamespace(remoteDatabaseNodeSet.Namespace), client.MatchingLabelsSelector{ Selector: labels.NewSelector().Add(*labelRequirement), @@ -417,95 +418,5 @@ func (r *Reconciler) getRemoteObjectsUsedInNamespace( return nil, err } - // We found some DatabaseNodeSet and should check objects usage - if len(databaseNodeSets.Items) > 0 { - for _, remoteObj := range remoteObjs { - switch obj := remoteObj.(type) { - // If client.Object typed by Secret search existence - // in another DatabaseNodeSet spec.secrets - case *corev1.Secret: - for _, databaseNodeSet := range databaseNodeSets.Items { - for _, secret := range databaseNodeSet.Spec.Secrets { - if obj.GetName() == secret.Name { - remoteObjectsUsedInNamespace = append( - remoteObjectsUsedInNamespace, - obj, - ) - } - } - } - // Else client.Object typed by ConfigMap or Service - // which always used in another DatabaseNodeSet - default: - remoteObjectsUsedInNamespace = append( - remoteObjectsUsedInNamespace, - obj, - ) - } - } - } - - return remoteObjectsUsedInNamespace, nil -} - -func (r *Reconciler) updateRemoteResourcesStatus( - ctx context.Context, - remoteDatabaseNodeSet *resources.RemoteDatabaseNodeSetResource, -) (bool, ctrl.Result, error) { - crRemoteDatabaseNodeSet := &v1alpha1.RemoteDatabaseNodeSet{} - err := r.RemoteClient.Get(ctx, types.NamespacedName{ - Name: remoteDatabaseNodeSet.Name, - Namespace: remoteDatabaseNodeSet.Namespace, - }, crRemoteDatabaseNodeSet) - if err != nil { - r.Recorder.Event( - remoteDatabaseNodeSet, - corev1.EventTypeWarning, - "ControllerError", - "Failed fetching RemoteDatabaseNodeSet on remote cluster before remote status update", - ) - r.RemoteRecorder.Event( - remoteDatabaseNodeSet, - corev1.EventTypeWarning, - "ControllerError", - "Failed fetching RemoteDatabaseNodeSet before remote status update", - ) - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err - } - - oldSyncResources := append([]v1alpha1.RemoteResource{}, crRemoteDatabaseNodeSet.Status.RemoteResources...) - crRemoteDatabaseNodeSet.Status.RemoteResources = append([]v1alpha1.RemoteResource{}, remoteDatabaseNodeSet.Status.RemoteResources...) - - if !reflect.DeepEqual(oldSyncResources, remoteDatabaseNodeSet.Status.RemoteResources) { - if err = r.RemoteClient.Status().Update(ctx, crRemoteDatabaseNodeSet); err != nil { - r.Recorder.Event( - remoteDatabaseNodeSet, - corev1.EventTypeWarning, - "ControllerError", - fmt.Sprintf("Failed to update status for remote resources on remote cluster: %s", err), - ) - r.RemoteRecorder.Event( - remoteDatabaseNodeSet, - corev1.EventTypeWarning, - "ControllerError", - fmt.Sprintf("Failed to update status for remote resources: %s", err), - ) - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err - } - r.Recorder.Event( - remoteDatabaseNodeSet, - corev1.EventTypeNormal, - "StatusChanged", - "Status updated for remote resources on remote cluster", - ) - r.RemoteRecorder.Event( - remoteDatabaseNodeSet, - corev1.EventTypeNormal, - "StatusChanged", - "Status updated for remote resources", - ) - return Stop, ctrl.Result{RequeueAfter: StatusUpdateRequeueDelay}, nil - } - - return Continue, ctrl.Result{Requeue: false}, nil + return databaseNodeSets.Items, nil } diff --git a/internal/controllers/remotedatabasenodeset/sync.go b/internal/controllers/remotedatabasenodeset/sync.go index 05741319..403c10ea 100644 --- a/internal/controllers/remotedatabasenodeset/sync.go +++ b/internal/controllers/remotedatabasenodeset/sync.go @@ -22,9 +22,9 @@ func (r *Reconciler) Sync(ctx context.Context, crRemoteDatabaseNodeSet *v1alpha1 var err error remoteDatabaseNodeSet := resources.NewRemoteDatabaseNodeSet(crRemoteDatabaseNodeSet) - remoteObjects := remoteDatabaseNodeSet.GetRemoteObjects() + remoteObjects := remoteDatabaseNodeSet.GetRemoteObjects(r.Scheme) - stop, result, err = r.initRemoteResourcesStatus(ctx, &remoteDatabaseNodeSet, remoteObjects) + stop, result, err = r.initRemoteObjectsStatus(ctx, &remoteDatabaseNodeSet, remoteObjects) if stop { return result, err } diff --git a/internal/controllers/remotestoragenodeset/controller.go b/internal/controllers/remotestoragenodeset/controller.go index cb2664b8..aafe961a 100644 --- a/internal/controllers/remotestoragenodeset/controller.go +++ b/internal/controllers/remotestoragenodeset/controller.go @@ -54,15 +54,15 @@ type Reconciler struct { //+kubebuilder:rbac:groups=core,resources=secrets/status,verbs=get;update;patch func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx) + r.Log = log.FromContext(ctx) remoteStorageNodeSet := &v1alpha1.RemoteStorageNodeSet{} if err := r.RemoteClient.Get(ctx, req.NamespacedName, remoteStorageNodeSet); err != nil { if apierrors.IsNotFound(err) { - logger.Info("RemoteStorageNodeSet resource not found on remote cluster") + r.Log.Info("RemoteStorageNodeSet resource not found on remote cluster") return ctrl.Result{Requeue: false}, nil } - logger.Error(err, "unable to get RemoteStorageNodeSet on remote cluster") + r.Log.Error(err, "unable to get RemoteStorageNodeSet on remote cluster") return ctrl.Result{RequeueAfter: DefaultRequeueDelay}, nil } @@ -225,29 +225,27 @@ func (r *Reconciler) deleteExternalResources( ctx context.Context, crRemoteStorageNodeSet *v1alpha1.RemoteStorageNodeSet, ) error { - logger := log.FromContext(ctx) - storageNodeSet := &v1alpha1.StorageNodeSet{} if err := r.Client.Get(ctx, types.NamespacedName{ Name: crRemoteStorageNodeSet.Name, Namespace: crRemoteStorageNodeSet.Namespace, }, storageNodeSet); err != nil { if apierrors.IsNotFound(err) { - logger.Info("StorageNodeSet not found") + r.Log.Info("StorageNodeSet not found") } else { - logger.Error(err, "unable to get StorageNodeSet") + r.Log.Error(err, "unable to get StorageNodeSet") return err } } else { if err := r.Client.Delete(ctx, storageNodeSet); err != nil { - logger.Error(err, "unable to delete StorageNodeSet") + r.Log.Error(err, "unable to delete StorageNodeSet") return err } } remoteStorageNodeSet := resources.NewRemoteStorageNodeSet(crRemoteStorageNodeSet) if _, _, err := r.removeUnusedRemoteObjects(ctx, &remoteStorageNodeSet, []client.Object{}); err != nil { - logger.Error(err, "unable to delete unused remote resources") + r.Log.Error(err, "unable to delete unused remote resources") return err } diff --git a/internal/controllers/remotestoragenodeset/controller_test.go b/internal/controllers/remotestoragenodeset/controller_test.go index 87d94a83..0ec90731 100644 --- a/internal/controllers/remotestoragenodeset/controller_test.go +++ b/internal/controllers/remotestoragenodeset/controller_test.go @@ -22,7 +22,6 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/cluster" "sigs.k8s.io/controller-runtime/pkg/envtest" logf "sigs.k8s.io/controller-runtime/pkg/log" @@ -124,7 +123,6 @@ var _ = BeforeSuite(func() { Client: localManager.GetClient(), Scheme: localManager.GetScheme(), Config: localManager.GetConfig(), - Log: logf.Log, }).SetupWithManager(localManager) Expect(err).ShouldNot(HaveOccurred()) @@ -132,7 +130,6 @@ var _ = BeforeSuite(func() { Client: localManager.GetClient(), Scheme: localManager.GetScheme(), Config: localManager.GetConfig(), - Log: logf.Log, }).SetupWithManager(localManager) Expect(err).ShouldNot(HaveOccurred()) @@ -140,14 +137,12 @@ var _ = BeforeSuite(func() { Client: remoteManager.GetClient(), Scheme: remoteManager.GetScheme(), Config: remoteManager.GetConfig(), - Log: logf.Log, }).SetupWithManager(remoteManager) Expect(err).ShouldNot(HaveOccurred()) err = (&remotestoragenodeset.Reconciler{ Client: remoteManager.GetClient(), Scheme: remoteManager.GetScheme(), - Log: logf.Log, }).SetupWithManager(remoteManager, &remoteCluster) Expect(err).ShouldNot(HaveOccurred()) @@ -481,16 +476,12 @@ var _ = Describe("RemoteStorageNodeSet controller tests", func() { Namespace: testobjects.YdbNamespace, }, &foundConfigMap)).Should(Succeed()) - gvk, err := apiutil.GVKForObject(foundConfigMap.DeepCopy(), scheme.Scheme) - Expect(err).ShouldNot(HaveOccurred()) - + logf.Log.Info("remoteResources", "status", foundRemoteStorageNodeSet.Status.RemoteResources) for idx := range foundRemoteStorageNodeSet.Status.RemoteResources { remoteResource := foundRemoteStorageNodeSet.Status.RemoteResources[idx] if resources.EqualRemoteResourceWithObject( &remoteResource, - testobjects.YdbNamespace, foundConfigMap.DeepCopy(), - gvk, ) { if meta.IsStatusConditionPresentAndEqual( remoteResource.Conditions, @@ -518,16 +509,11 @@ var _ = Describe("RemoteStorageNodeSet controller tests", func() { Namespace: testobjects.YdbNamespace, }, &foundConfigMap)).Should(Succeed()) - gvk, err := apiutil.GVKForObject(foundConfigMap.DeepCopy(), scheme.Scheme) - Expect(err).ShouldNot(HaveOccurred()) - for idx := range foundRemoteStorageNodeSet.Status.RemoteResources { remoteResource := foundRemoteStorageNodeSet.Status.RemoteResources[idx] if resources.EqualRemoteResourceWithObject( &remoteResource, - testobjects.YdbNamespace, foundConfigMap.DeepCopy(), - gvk, ) { if meta.IsStatusConditionPresentAndEqual( remoteResource.Conditions, @@ -676,6 +662,16 @@ func deleteAll(env *envtest.Environment, k8sClient client.Client, objs ...client Expect(client.IgnoreNotFound(ignoreMethodNotAllowed(err))).ShouldNot(HaveOccurred()) } + // Delete all Services in this namespace + serviceList := corev1.ServiceList{} + err = k8sClient.List(ctx, &serviceList, client.InNamespace(ns.Name)) + Expect(err).ShouldNot(HaveOccurred()) + for idx := range serviceList.Items { + policy := metav1.DeletePropagationForeground + err = k8sClient.Delete(ctx, &serviceList.Items[idx], &client.DeleteOptions{PropagationPolicy: &policy}) + Expect(err).ShouldNot(HaveOccurred()) + } + Eventually(func() error { key := client.ObjectKeyFromObject(ns) if err := k8sClient.Get(ctx, key, ns); err != nil { diff --git a/internal/controllers/remotestoragenodeset/remote_objects.go b/internal/controllers/remotestoragenodeset/remote_objects.go index 4fae4866..a54f6a39 100644 --- a/internal/controllers/remotestoragenodeset/remote_objects.go +++ b/internal/controllers/remotestoragenodeset/remote_objects.go @@ -3,18 +3,17 @@ package remotestoragenodeset import ( "context" "fmt" - "reflect" + "time" - "github.com/banzaicloud/k8s-objectmatcher/patch" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/types" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "github.com/ydb-platform/ydb-kubernetes-operator/api/v1alpha1" ydbannotations "github.com/ydb-platform/ydb-kubernetes-operator/internal/annotations" @@ -23,66 +22,32 @@ import ( "github.com/ydb-platform/ydb-kubernetes-operator/internal/resources" ) -var ( - annotator = patch.NewAnnotator(ydbannotations.LastAppliedAnnotation) - patchMaker = patch.NewPatchMaker(annotator) -) - -func (r *Reconciler) initRemoteResourcesStatus( +func (r *Reconciler) initRemoteObjectsStatus( ctx context.Context, remoteStorageNodeSet *resources.RemoteStorageNodeSetResource, remoteObjects []client.Object, ) (bool, ctrl.Result, error) { - r.Log.Info("running step initRemoteResourcesStatus") - syncedResources := []v1alpha1.RemoteResource{} - // copy actual slice to local variable - if remoteStorageNodeSet.Status.RemoteResources != nil { - syncedResources = append(syncedResources, remoteStorageNodeSet.Status.RemoteResources...) - } - - for idx := range remoteObjects { - remoteObj := remoteObjects[idx] - remoteObjGVK, err := apiutil.GVKForObject(remoteObj, r.Scheme) - if err != nil { - r.Recorder.Event( - remoteStorageNodeSet, - corev1.EventTypeWarning, - "ControllerError", - fmt.Sprintf("Failed to recognize GVK for remote object %s with name %s: %s", remoteObjGVK.Kind, remoteObj.GetName(), err), - ) - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err - } + r.Log.Info("running step initRemoteObjectsStatus") + for _, remoteObj := range remoteObjects { existInStatus := false - for i := range syncedResources { - remoteResource := syncedResources[i] + for idx := range remoteStorageNodeSet.Status.RemoteResources { if resources.EqualRemoteResourceWithObject( - &remoteResource, - remoteStorageNodeSet.Namespace, + &remoteStorageNodeSet.Status.RemoteResources[idx], remoteObj, - remoteObjGVK, ) { existInStatus = true break } } - if !existInStatus { - remoteStorageNodeSet.Status.RemoteResources = append( - remoteStorageNodeSet.Status.RemoteResources, - v1alpha1.RemoteResource{ - Group: remoteObjGVK.Group, - Version: remoteObjGVK.Version, - Kind: remoteObjGVK.Kind, - Name: remoteObj.GetName(), - State: ResourceSyncPending, - Conditions: []metav1.Condition{}, - }, - ) + remoteStorageNodeSet.CreateRemoteResourceStatus(remoteObj) + return r.updateStatusRemoteObjects(ctx, remoteStorageNodeSet, StatusUpdateRequeueDelay) } } - return r.updateRemoteResourcesStatus(ctx, remoteStorageNodeSet) + r.Log.Info("complete step initRemoteObjectsStatus") + return Continue, ctrl.Result{}, nil } func (r *Reconciler) syncRemoteObjects( @@ -93,144 +58,142 @@ func (r *Reconciler) syncRemoteObjects( r.Log.Info("running step syncRemoteObjects") for _, remoteObj := range remoteObjects { - // Determine actual GVK for generic client.Object - remoteObjGVK, err := apiutil.GVKForObject(remoteObj, r.Scheme) - if err != nil { - r.Recorder.Event( - remoteStorageNodeSet, - corev1.EventTypeWarning, - "ControllerError", - fmt.Sprintf("Failed to recognize GVK for remote object %s with name %s: %s", remoteObjGVK.Kind, remoteObj.GetName(), err), - ) - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err + remoteObjName := remoteObj.GetName() + remoteObjKind := remoteObj.GetObjectKind().GroupVersionKind().Kind + remoteObjRV := remoteObj.GetResourceVersion() + var remoteResource *v1alpha1.RemoteResource + for idx := range remoteStorageNodeSet.Status.RemoteResources { + if resources.EqualRemoteResourceWithObject(&remoteStorageNodeSet.Status.RemoteResources[idx], remoteObj) { + remoteResource = &remoteStorageNodeSet.Status.RemoteResources[idx] + break + } } // Get object to sync from remote cluster - err = r.RemoteClient.Get(ctx, types.NamespacedName{ + remoteGetErr := r.RemoteClient.Get(ctx, types.NamespacedName{ Name: remoteObj.GetName(), Namespace: remoteObj.GetNamespace(), }, remoteObj) - if err != nil { - // Resource not found on remote cluster but we should retry - if apierrors.IsNotFound(err) { + // Resource not found on remote cluster or internal kubernetes error + if remoteGetErr != nil { + if apierrors.IsNotFound(remoteGetErr) { r.Recorder.Event( remoteStorageNodeSet, corev1.EventTypeWarning, "ProvisioningFailed", - fmt.Sprintf("Resource %s with name %s was not found on remote cluster: %s", remoteObjGVK.Kind, remoteObj.GetName(), err), + fmt.Sprintf("Resource %s with name %s was not found on remote cluster: %s", remoteObjKind, remoteObjName, remoteGetErr), ) r.RemoteRecorder.Event( remoteStorageNodeSet, corev1.EventTypeWarning, "ProvisioningFailed", - fmt.Sprintf("Resource %s with name %s was not found: %s", remoteObjGVK.Kind, remoteObj.GetName(), err), + fmt.Sprintf("Resource %s with name %s was not found: %s", remoteObjKind, remoteObjName, remoteGetErr), ) } else { r.Recorder.Event( remoteStorageNodeSet, corev1.EventTypeWarning, "ControllerError", - fmt.Sprintf("Failed to get resource %s with name %s on remote cluster: %s", remoteObjGVK.Kind, remoteObj.GetName(), err), + fmt.Sprintf("Failed to get resource %s with name %s on remote cluster: %s", remoteObjKind, remoteObjName, remoteGetErr), ) r.RemoteRecorder.Event( remoteStorageNodeSet, corev1.EventTypeWarning, "ControllerError", - fmt.Sprintf("Failed to get resource %s with name %s: %s", remoteObjGVK.Kind, remoteObj.GetName(), err), + fmt.Sprintf("Failed to get resource %s with name %s: %s", remoteObjKind, remoteObjName, remoteGetErr), ) } - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err + remoteStorageNodeSet.UpdateRemoteResourceStatus(remoteResource, metav1.ConditionFalse, remoteObjRV) + return r.updateStatusRemoteObjects(ctx, remoteStorageNodeSet, DefaultRequeueDelay) } - // Create client.Object from api.RemoteResource struct - localObj := resources.CreateResource(remoteObj) - remoteStorageNodeSet.SetPrimaryResourceAnnotations(localObj) // Check object existence in local cluster - err = r.Client.Get(ctx, types.NamespacedName{ - Name: remoteObj.GetName(), - Namespace: remoteObj.GetNamespace(), + localObj := resources.CreateResource(remoteObj) + getErr := r.Client.Get(ctx, types.NamespacedName{ + Name: localObj.GetName(), + Namespace: localObj.GetNamespace(), }, localObj) - //nolint:nestif - if err != nil { - if !apierrors.IsNotFound(err) { - r.Recorder.Event( - remoteStorageNodeSet, - corev1.EventTypeWarning, - "ControllerError", - fmt.Sprintf("Failed to get resource %s with name %s: %s", remoteObjGVK.Kind, remoteObj.GetName(), err), - ) - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err - } - // Object does not exist in local cluster - // Try to create resource in remote cluster - if err := r.Client.Create(ctx, localObj); err != nil { + + // Handler for kubernetes internal error + if getErr != nil && !apierrors.IsNotFound(getErr) { + r.Recorder.Event( + remoteStorageNodeSet, + corev1.EventTypeWarning, + "ControllerError", + fmt.Sprintf("Failed to get resource %s with name %s: %s", remoteObjKind, remoteObjName, getErr), + ) + remoteStorageNodeSet.UpdateRemoteResourceStatus(remoteResource, metav1.ConditionFalse, remoteObjRV) + return r.updateStatusRemoteObjects(ctx, remoteStorageNodeSet, DefaultRequeueDelay) + } + + // Try to create non-existing remote object in local cluster + if apierrors.IsNotFound(getErr) { + createErr := r.Client.Create(ctx, localObj) + if createErr != nil { r.Recorder.Event( remoteStorageNodeSet, corev1.EventTypeWarning, "ControllerError", - fmt.Sprintf("Failed to create resource %s with name %s: %s", remoteObjGVK.Kind, remoteObj.GetName(), err), + fmt.Sprintf("Failed to create resource %s with name %s: %s", remoteObjKind, remoteObjName, getErr), ) - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, nil + remoteStorageNodeSet.UpdateRemoteResourceStatus(remoteResource, metav1.ConditionFalse, remoteObjRV) + return r.updateStatusRemoteObjects(ctx, remoteStorageNodeSet, DefaultRequeueDelay) } r.Recorder.Event( remoteStorageNodeSet, corev1.EventTypeNormal, "Provisioning", - fmt.Sprintf("RemoteSync CREATE resource %s with name %s", remoteObjGVK.Kind, remoteObj.GetName()), + fmt.Sprintf("RemoteSync CREATE resource %s with name %s", remoteObjKind, remoteObjName), ) - } else { - // Update client.Object for local object with spec from remote object - updatedObj := resources.UpdateResource(localObj, remoteObj) - remoteStorageNodeSet.SetPrimaryResourceAnnotations(updatedObj) - // Remote object existing in local cluster, сheck the need for an update - // Get diff resources and compare bytes by k8s-objectmatcher PatchMaker - patchResult, err := patchMaker.Calculate(localObj, updatedObj, - []patch.CalculateOption{ - patch.IgnoreStatusFields(), - }..., + remoteStorageNodeSet.UpdateRemoteResourceStatus(remoteResource, metav1.ConditionFalse, remoteObjRV) + return r.updateStatusRemoteObjects(ctx, remoteStorageNodeSet, StatusUpdateRequeueDelay) + } + + // Get patch diff between remote object and existing object + remoteStorageNodeSet.SetPrimaryResourceAnnotations(remoteObj) + patchResult, patchErr := resources.GetPatchResult(localObj, remoteObj) + if patchErr != nil { + r.Recorder.Event( + remoteStorageNodeSet, + corev1.EventTypeWarning, + "ControllerError", + fmt.Sprintf("Failed to get diff for remote resource %s with name %s: %s", remoteObjKind, remoteObjName, patchErr), ) - if err != nil { + remoteStorageNodeSet.UpdateRemoteResourceStatus(remoteResource, metav1.ConditionFalse, remoteObjRV) + return r.updateStatusRemoteObjects(ctx, remoteStorageNodeSet, DefaultRequeueDelay) + } + + // Try to update existing object in local cluster by rawPatch + if !patchResult.IsEmpty() { + updateErr := r.Client.Patch(ctx, localObj, client.RawPatch(types.StrategicMergePatchType, patchResult.Patch)) + if updateErr != nil { r.Recorder.Event( remoteStorageNodeSet, corev1.EventTypeWarning, "ControllerError", - fmt.Sprintf("Failed to get diff for remote resource %s with name %s: %s", remoteObjGVK.Kind, remoteObj.GetName(), err), - ) - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, nil - } - // We need to check patchResult by k8s-objectmatcherand and resourceVersion from annotation - // And update if localObj does not match updatedObj from remote cluster - if !patchResult.IsEmpty() || - remoteObj.GetResourceVersion() != localObj.GetAnnotations()[ydbannotations.RemoteResourceVersionAnnotation] { - r.Recorder.Event( - remoteStorageNodeSet, - corev1.EventTypeNormal, - "Provisioning", - fmt.Sprintf("Patch for resource %s with name %s: %s", remoteObjGVK.Kind, remoteObj.GetName(), string(patchResult.Patch)), - ) - // Try to update resource in local cluster - if err := r.Client.Update(ctx, updatedObj); err != nil { - r.Recorder.Event( - remoteStorageNodeSet, - corev1.EventTypeWarning, - "ControllerError", - fmt.Sprintf("Failed to update resource %s with name %s: %s", remoteObjGVK.Kind, remoteObj.GetName(), err), - ) - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, nil - } - r.Recorder.Event( - remoteStorageNodeSet, - corev1.EventTypeNormal, - "Provisioning", - fmt.Sprintf("RemoteSync UPDATE resource %s with name %s resourceVersion %s", remoteObjGVK.Kind, remoteObj.GetName(), remoteObj.GetResourceVersion()), + fmt.Sprintf("Failed to update resource %s with name %s: %v", remoteObjKind, remoteObjName, updateErr), ) + remoteStorageNodeSet.UpdateRemoteResourceStatus(remoteResource, metav1.ConditionFalse, remoteObjRV) + return r.updateStatusRemoteObjects(ctx, remoteStorageNodeSet, DefaultRequeueDelay) } + r.Recorder.Event( + remoteStorageNodeSet, + corev1.EventTypeNormal, + "Provisioning", + fmt.Sprintf("RemoteSync UPDATE resource %s with name %s resourceVersion %s", remoteObjKind, remoteObjName, remoteObjRV), + ) + remoteStorageNodeSet.UpdateRemoteResourceStatus(remoteResource, metav1.ConditionTrue, remoteObjRV) + return r.updateStatusRemoteObjects(ctx, remoteStorageNodeSet, StatusUpdateRequeueDelay) + } + + if !meta.IsStatusConditionTrue(remoteResource.Conditions, RemoteResourceSyncedCondition) { + remoteStorageNodeSet.UpdateRemoteResourceStatus(remoteResource, metav1.ConditionTrue, remoteObjRV) + return r.updateStatusRemoteObjects(ctx, remoteStorageNodeSet, StatusUpdateRequeueDelay) } - // Update status for remote resource in RemoteStorageNodeSet object - remoteStorageNodeSet.SetRemoteResourceStatus(localObj, remoteObjGVK) } - return r.updateRemoteResourcesStatus(ctx, remoteStorageNodeSet) + r.Log.Info("complete step syncRemoteObjects") + return Continue, ctrl.Result{}, nil } func (r *Reconciler) removeUnusedRemoteObjects( @@ -238,271 +201,221 @@ func (r *Reconciler) removeUnusedRemoteObjects( remoteStorageNodeSet *resources.RemoteStorageNodeSetResource, remoteObjects []client.Object, ) (bool, ctrl.Result, error) { - r.Log.Info("running step removeUnusedRemoteObjects") + r.Log.Info("running step removeUnusedRemoteResources") + // We should check every remote resource to need existence in cluster // Get processed remote resources from object Status candidatesToDelete := []v1alpha1.RemoteResource{} - - // Remove remote resource from candidates to delete if it declared - // to using in current RemoteStorageNodeSet spec for idx := range remoteStorageNodeSet.Status.RemoteResources { - remoteResource := remoteStorageNodeSet.Status.RemoteResources[idx] + // Remove remote resource from candidates to delete if it declared + // to using in current RemoteStorageNodeSet spec existInSpec := false - for i := range remoteObjects { - declaredObj := remoteObjects[i] - declaredObjGVK, err := apiutil.GVKForObject(declaredObj, r.Scheme) - if err != nil { - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err - } + for _, remoteObj := range remoteObjects { if resources.EqualRemoteResourceWithObject( - &remoteResource, - remoteStorageNodeSet.Namespace, - declaredObj, - declaredObjGVK, + &remoteStorageNodeSet.Status.RemoteResources[idx], + remoteObj, ) { existInSpec = true break } } if !existInSpec { - candidatesToDelete = append(candidatesToDelete, remoteResource) + candidatesToDelete = append(candidatesToDelete, remoteStorageNodeSet.Status.RemoteResources[idx]) } } - // Check resources usage in another StorageNodeSet and make List request - // only if we have candidates to Delete - resourcesToDelete := []v1alpha1.RemoteResource{} - if len(candidatesToDelete) > 0 { - remoteObjectsUsed, err := r.getRemoteObjectsUsedInNamespace(ctx, remoteStorageNodeSet, remoteObjects) + if len(candidatesToDelete) == 0 { + r.Log.Info("complete step removeUnusedRemoteObjects") + return Continue, ctrl.Result{}, nil + } + + existInStorage := false + anotherStorageNodeSets, err := r.getAnotherStorageNodeSets(ctx, remoteStorageNodeSet) + if err != nil { + r.Recorder.Event( + remoteStorageNodeSet, + corev1.EventTypeWarning, + "ControllerError", + fmt.Sprintf("Failed to check remote resource usage in another: %v", err), + ) + return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err + } + + if len(anotherStorageNodeSets) > 0 { + existInStorage = true + } + + // Check RemoteResource usage in StorageNodeSet + for _, remoteResource := range candidatesToDelete { + remoteObj, err := resources.ConvertRemoteResourceToObject(remoteResource, remoteStorageNodeSet.Namespace) if err != nil { r.Recorder.Event( remoteStorageNodeSet, corev1.EventTypeWarning, - "ProvisioningFailed", - fmt.Sprintf("Failed to get resources used in another object: %s", err), + "ControllerError", + fmt.Sprintf("Failed to convert RemoteResource %s with name %s to object: %v", remoteResource.Kind, remoteResource.Name, err), ) return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err } - for idx := range candidatesToDelete { - remoteResource := candidatesToDelete[idx] - isCandidateExistInANotherObject := false - // Remove resource from cadidates to Delete if another object using it now - for i := range remoteObjectsUsed { - usedObj := remoteObjectsUsed[i] - usedObjGVK, err := apiutil.GVKForObject(usedObj, r.Scheme) - if err != nil { - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err - } - if resources.EqualRemoteResourceWithObject( - &remoteResource, - remoteStorageNodeSet.Namespace, - usedObj, - usedObjGVK, - ) { - isCandidateExistInANotherObject = true - break - } - } - if !isCandidateExistInANotherObject { - resourcesToDelete = append(resourcesToDelete, remoteResource) - } - } - } - // Remove unused remote resource from cluster and make API call DELETE - // for every candidate to Delete - for _, recourceToDelete := range resourcesToDelete { - // Convert RemoteResource struct from Status to client.Object - remoteObj, err := resources.ConvertRemoteResourceToObject( - recourceToDelete, - remoteStorageNodeSet.Namespace, - ) - if err != nil { - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err - } - - // Determine actual GVK for generic client.Object - remoteResourceGVK, err := apiutil.GVKForObject(remoteObj, r.Scheme) - if err != nil { + localObj := resources.CreateResource(remoteObj) + if err := r.Client.Get(ctx, types.NamespacedName{ + Name: localObj.GetName(), + Namespace: localObj.GetNamespace(), + }, localObj); err != nil { + if apierrors.IsNotFound(err) { + continue + } r.Recorder.Event( remoteStorageNodeSet, corev1.EventTypeWarning, "ControllerError", - fmt.Sprintf("Failed to recognize GVK for remote object %v: %s", remoteObj, err), + fmt.Sprintf("Failed to get RemoteResource %s with name %s as object: %v", remoteResource.Kind, remoteResource.Name, err), ) return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err } - // Try to get resource in local cluster - if err := r.Client.Get(ctx, types.NamespacedName{ - Name: remoteObj.GetName(), - Namespace: remoteObj.GetNamespace(), - }, remoteObj); err != nil { - if !apierrors.IsNotFound(err) { + // Remove annotation if no one another StorageNodeSet + if !existInStorage { + patch := []byte(fmt.Sprintf(`{"metadata": {"annotations": {"%s": null}}}`, ydbannotations.PrimaryResourceStorageAnnotation)) + updateErr := r.Client.Patch(ctx, localObj, client.RawPatch(types.StrategicMergePatchType, patch)) + if updateErr != nil { r.Recorder.Event( remoteStorageNodeSet, corev1.EventTypeWarning, "ControllerError", - fmt.Sprintf("Failed to get resource %s with name %s: %s", remoteResourceGVK.Kind, remoteObj.GetName(), err), + fmt.Sprintf("Failed to update resource %s with name %s: %s", remoteResource.Kind, remoteResource.Name, err), ) - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err - } - } - - // Skip resource deletion because it using in some Database - // check by existence of annotation `ydb.tech/primary-resource-database` - if _, exist := remoteObj.GetAnnotations()[ydbannotations.PrimaryResourceDatabaseAnnotation]; exist { - continue - } - - // Try to delete unused resource from local cluster - if err := r.Client.Delete(ctx, remoteObj); err != nil { - if !apierrors.IsNotFound(err) { + } else { r.Recorder.Event( remoteStorageNodeSet, - corev1.EventTypeWarning, - "ControllerError", - fmt.Sprintf("Failed to delete resource %s with name %s: %s", remoteResourceGVK.Kind, remoteObj.GetName(), err), + corev1.EventTypeNormal, + "Provisioning", + fmt.Sprintf("RemoteSync UPDATE resource %s with name %s unset primaryResource annotation", remoteResource.Kind, remoteResource.Name), ) - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err } } - r.Recorder.Event( - remoteStorageNodeSet, - corev1.EventTypeNormal, - "Provisioning", - fmt.Sprintf("RemoteSync DELETE resource %s with name %s", remoteResourceGVK.Kind, remoteObj.GetName()), - ) - // Remove status for remote resource from RemoteStorageNodeSet object - remoteStorageNodeSet.RemoveRemoteResourceStatus(remoteObj, remoteResourceGVK) - } - - return r.updateRemoteResourcesStatus(ctx, remoteStorageNodeSet) -} - -func (r *Reconciler) getRemoteObjectsUsedInNamespace( - ctx context.Context, - remoteStorageNodeSet *resources.RemoteStorageNodeSetResource, - remoteObjects []client.Object, -) ([]client.Object, error) { - remoteObjectsUsedInNamespace := []client.Object{} - - // Create label requirement that label `ydb.tech/storage-nodeset` which not equal - // to current StorageNodeSet object for exclude current nodeSet from List result - labelRequirement, err := labels.NewRequirement( - ydblabels.StorageNodeSetComponent, - selection.NotEquals, - []string{remoteStorageNodeSet.Labels[ydblabels.StorageNodeSetComponent]}, - ) - if err != nil { - return nil, err - } - - // Search another StorageNodeSets in current namespace with the same StorageRef - storageNodeSets := &v1alpha1.StorageNodeSetList{} - if err := r.Client.List( - ctx, - storageNodeSets, - client.InNamespace(remoteStorageNodeSet.Namespace), - client.MatchingLabelsSelector{ - Selector: labels.NewSelector().Add(*labelRequirement), - }, - client.MatchingFields{ - StorageRefField: remoteStorageNodeSet.Spec.StorageRef.Name, - }, - ); err != nil { - return nil, err - } - // We found some StorageNodeSet and should check objects usage - if len(storageNodeSets.Items) > 0 { - for _, remoteObj := range remoteObjects { - switch obj := remoteObj.(type) { - // If client.Object typed by Secret search existence - // in another StorageNodeSet spec.secrets - case *corev1.Secret: - for _, storageNodeSet := range storageNodeSets.Items { - for _, secret := range storageNodeSet.Spec.Secrets { - if obj.GetName() == secret.Name { - remoteObjectsUsedInNamespace = append( - remoteObjectsUsedInNamespace, - obj, - ) - } - } + // Delete resource if annotation `ydb.tech/primary-resource-database` does not exist + _, existInDatabase := localObj.GetAnnotations()[ydbannotations.PrimaryResourceDatabaseAnnotation] + if !existInDatabase { + // Try to delete unused resource from local cluster + deleteErr := r.Client.Delete(ctx, localObj) + if deleteErr != nil { + if !apierrors.IsNotFound(deleteErr) { + r.Recorder.Event( + remoteStorageNodeSet, + corev1.EventTypeWarning, + "ControllerError", + fmt.Sprintf("Failed to delete resource %s with name %s: %s", remoteResource.Kind, remoteResource.Name, err), + ) } - // Else client.Object typed by ConfigMap or Service - // which always used in another StorageNodeSet - default: - remoteObjectsUsedInNamespace = append( - remoteObjectsUsedInNamespace, - obj, + } else { + r.Recorder.Event( + remoteStorageNodeSet, + corev1.EventTypeNormal, + "Provisioning", + fmt.Sprintf("RemoteSync DELETE resource %s with name %s", remoteResource.Kind, remoteResource.Name), ) } } + remoteStorageNodeSet.RemoveRemoteResourceStatus(remoteObj) } - return remoteObjectsUsedInNamespace, nil + return r.updateStatusRemoteObjects(ctx, remoteStorageNodeSet, StatusUpdateRequeueDelay) } -func (r *Reconciler) updateRemoteResourcesStatus( +func (r *Reconciler) updateStatusRemoteObjects( ctx context.Context, remoteStorageNodeSet *resources.RemoteStorageNodeSetResource, + requeueAfter time.Duration, ) (bool, ctrl.Result, error) { crRemoteStorageNodeSet := &v1alpha1.RemoteStorageNodeSet{} - err := r.RemoteClient.Get(ctx, types.NamespacedName{ + getErr := r.RemoteClient.Get(ctx, types.NamespacedName{ Name: remoteStorageNodeSet.Name, Namespace: remoteStorageNodeSet.Namespace, }, crRemoteStorageNodeSet) - if err != nil { + if getErr != nil { r.Recorder.Event( remoteStorageNodeSet, corev1.EventTypeWarning, "ControllerError", - "Failed fetching RemoteStorageNodeSet on remote cluster before remote status update", + fmt.Sprintf("Failed fetching CR before status update for remote resources on remote cluster: %v", getErr), ) r.RemoteRecorder.Event( remoteStorageNodeSet, corev1.EventTypeWarning, "ControllerError", - "Failed fetching RemoteStorageNodeSet before status update", + fmt.Sprintf("Failed fetching CR before status update for remote resources: %v", getErr), ) - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err + return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, getErr } - oldSyncResources := append([]v1alpha1.RemoteResource{}, crRemoteStorageNodeSet.Status.RemoteResources...) crRemoteStorageNodeSet.Status.RemoteResources = append([]v1alpha1.RemoteResource{}, remoteStorageNodeSet.Status.RemoteResources...) - - if !reflect.DeepEqual(oldSyncResources, remoteStorageNodeSet.Status.RemoteResources) { - if err = r.RemoteClient.Status().Update(ctx, crRemoteStorageNodeSet); err != nil { - r.Recorder.Event( - remoteStorageNodeSet, - corev1.EventTypeWarning, - "ControllerError", - fmt.Sprintf("Failed to update status for remote resources on remote cluster: %s", err), - ) - r.RemoteRecorder.Event( - remoteStorageNodeSet, - corev1.EventTypeWarning, - "ControllerError", - fmt.Sprintf("Failed to update status for remote resources: %s", err), - ) - return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err - } + updateErr := r.RemoteClient.Status().Update(ctx, crRemoteStorageNodeSet) + if updateErr != nil { r.Recorder.Event( remoteStorageNodeSet, - corev1.EventTypeNormal, - "StatusChanged", - "Status updated for remote resources on remote cluster", + corev1.EventTypeWarning, + "ControllerError", + fmt.Sprintf("Failed to update status for remote resources on remote cluster: %s", updateErr), ) r.RemoteRecorder.Event( remoteStorageNodeSet, - corev1.EventTypeNormal, - "StatusChanged", - "Status updated for remote resources", + corev1.EventTypeWarning, + "ControllerError", + fmt.Sprintf("Failed to update status for remote resources: %s", updateErr), ) - return Stop, ctrl.Result{RequeueAfter: StatusUpdateRequeueDelay}, nil + return Stop, ctrl.Result{RequeueAfter: DefaultRequeueDelay}, updateErr + } + + r.Recorder.Event( + remoteStorageNodeSet, + corev1.EventTypeNormal, + "StatusChanged", + "Status for remote resources updated on remote cluster", + ) + r.RemoteRecorder.Event( + remoteStorageNodeSet, + corev1.EventTypeNormal, + "StatusChanged", + "Status for remote resources updated", + ) + + return Stop, ctrl.Result{RequeueAfter: requeueAfter}, nil +} + +func (r *Reconciler) getAnotherStorageNodeSets( + ctx context.Context, + remoteStorageNodeSet *resources.RemoteStorageNodeSetResource, +) ([]v1alpha1.StorageNodeSet, error) { + // Create label requirement that label `ydb.tech/storage-nodeset` which not equal + // to current StorageNodeSet object for exclude current nodeSet from List result + labelRequirement, err := labels.NewRequirement( + ydblabels.StorageNodeSetComponent, + selection.NotEquals, + []string{remoteStorageNodeSet.Labels[ydblabels.StorageNodeSetComponent]}, + ) + if err != nil { + return nil, err + } + + // Search another StorageNodeSets in current namespace with the same StorageRef + // but exclude current nodeSet from result + storageNodeSets := v1alpha1.StorageNodeSetList{} + if err := r.Client.List( + ctx, + &storageNodeSets, + client.InNamespace(remoteStorageNodeSet.Namespace), + client.MatchingLabelsSelector{ + Selector: labels.NewSelector().Add(*labelRequirement), + }, + client.MatchingFields{ + StorageRefField: remoteStorageNodeSet.Spec.StorageRef.Name, + }, + ); err != nil { + return nil, err } - return Continue, ctrl.Result{Requeue: false}, nil + return storageNodeSets.Items, nil } diff --git a/internal/controllers/remotestoragenodeset/sync.go b/internal/controllers/remotestoragenodeset/sync.go index 22da20e9..d217985e 100644 --- a/internal/controllers/remotestoragenodeset/sync.go +++ b/internal/controllers/remotestoragenodeset/sync.go @@ -22,9 +22,9 @@ func (r *Reconciler) Sync(ctx context.Context, crRemoteStorageNodeSet *v1alpha1. var err error remoteStorageNodeSet := resources.NewRemoteStorageNodeSet(crRemoteStorageNodeSet) - remoteObjects := remoteStorageNodeSet.GetRemoteObjects() + remoteObjects := remoteStorageNodeSet.GetRemoteObjects(r.Scheme) - stop, result, err = r.initRemoteResourcesStatus(ctx, &remoteStorageNodeSet, remoteObjects) + stop, result, err = r.initRemoteObjectsStatus(ctx, &remoteStorageNodeSet, remoteObjects) if stop { return result, err } @@ -44,7 +44,7 @@ func (r *Reconciler) Sync(ctx context.Context, crRemoteStorageNodeSet *v1alpha1. return result, err } - return result, err + return ctrl.Result{}, nil } func (r *Reconciler) handleResourcesSync( @@ -98,6 +98,7 @@ func (r *Reconciler) handleResourcesSync( } } + r.Log.Info("complete step handleResourcesSync") return r.updateRemoteStatus(ctx, remoteStorageNodeSet) } @@ -164,13 +165,13 @@ func (r *Reconciler) updateRemoteStatus( remoteStorageNodeSet, corev1.EventTypeNormal, "StatusChanged", - "StorageNodeSet status updated on remote cluster", + "Status updated on remote cluster", ) r.RemoteRecorder.Event( remoteStorageNodeSet, corev1.EventTypeNormal, "StatusChanged", - "RemoteStorageNodeSet status updated", + "Status updated", ) r.Log.Info("step updateRemoteStatus requeue reconcile") return Stop, ctrl.Result{RequeueAfter: StatusUpdateRequeueDelay}, nil diff --git a/internal/controllers/storagenodeset/controller.go b/internal/controllers/storagenodeset/controller.go index 79b9fabb..81b3da48 100644 --- a/internal/controllers/storagenodeset/controller.go +++ b/internal/controllers/storagenodeset/controller.go @@ -38,16 +38,16 @@ type Reconciler struct { // Reconcile is part of the main kubernetes reconciliation loop which aims to // move the current state of the cluster closer to the desired state. func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx) + r.Log = log.FromContext(ctx) crStorageNodeSet := &v1alpha1.StorageNodeSet{} err := r.Get(ctx, req.NamespacedName, crStorageNodeSet) if err != nil { if apierrors.IsNotFound(err) { - logger.Info("StorageNodeSet resource not found") + r.Log.Info("StorageNodeSet resource not found") return ctrl.Result{Requeue: false}, nil } - logger.Error(err, "unable to get StorageNodeSet") + r.Log.Error(err, "unable to get StorageNodeSet") return ctrl.Result{RequeueAfter: DefaultRequeueDelay}, err } diff --git a/internal/resources/remotedatabasenodeset.go b/internal/resources/remotedatabasenodeset.go index c5762f59..99877652 100644 --- a/internal/resources/remotedatabasenodeset.go +++ b/internal/resources/remotedatabasenodeset.go @@ -7,8 +7,9 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" api "github.com/ydb-platform/ydb-kubernetes-operator/api/v1alpha1" ydbannotations "github.com/ydb-platform/ydb-kubernetes-operator/internal/annotations" @@ -84,12 +85,14 @@ func NewRemoteDatabaseNodeSet(remoteDatabaseNodeSet *api.RemoteDatabaseNodeSet) return RemoteDatabaseNodeSetResource{RemoteDatabaseNodeSet: crRemoteDatabaseNodeSet} } -func (b *RemoteDatabaseNodeSetResource) GetRemoteObjects() []client.Object { - objects := []client.Object{} +func (b *RemoteDatabaseNodeSetResource) GetRemoteObjects( + scheme *runtime.Scheme, +) []client.Object { + remoteObjects := []client.Object{} // sync Secrets for _, secret := range b.Spec.Secrets { - objects = append(objects, + remoteObjects = append(remoteObjects, &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: secret.Name, @@ -99,7 +102,7 @@ func (b *RemoteDatabaseNodeSetResource) GetRemoteObjects() []client.Object { } // sync ConfigMap - objects = append(objects, + remoteObjects = append(remoteObjects, &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: b.Spec.DatabaseRef.Name, @@ -108,7 +111,7 @@ func (b *RemoteDatabaseNodeSetResource) GetRemoteObjects() []client.Object { }) // sync Services - objects = append(objects, + remoteObjects = append(remoteObjects, &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(GRPCServiceNameFormat, b.Spec.DatabaseRef.Name), @@ -129,7 +132,7 @@ func (b *RemoteDatabaseNodeSetResource) GetRemoteObjects() []client.Object { }, ) if b.Spec.Datastreams != nil && b.Spec.Datastreams.Enabled { - objects = append(objects, + remoteObjects = append(remoteObjects, &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(DatastreamsServiceNameFormat, b.Spec.DatabaseRef.Name), @@ -138,7 +141,12 @@ func (b *RemoteDatabaseNodeSetResource) GetRemoteObjects() []client.Object { }) } - return objects + for _, remoteObj := range remoteObjects { + remoteObjGVK, _ := apiutil.GVKForObject(remoteObj, scheme) + remoteObj.GetObjectKind().SetGroupVersionKind(remoteObjGVK) + } + + return remoteObjects } func (b *RemoteDatabaseNodeSetResource) SetPrimaryResourceAnnotations(obj client.Object) { @@ -154,30 +162,78 @@ func (b *RemoteDatabaseNodeSetResource) SetPrimaryResourceAnnotations(obj client obj.SetAnnotations(annotations) } -func (b *RemoteDatabaseNodeSetResource) SetRemoteResourceStatus(remoteObj client.Object, remoteObjGVK schema.GroupVersionKind) { - for idx := range b.Status.RemoteResources { - if EqualRemoteResourceWithObject(&b.Status.RemoteResources[idx], b.Namespace, remoteObj, remoteObjGVK) { - meta.SetStatusCondition(&b.Status.RemoteResources[idx].Conditions, - metav1.Condition{ - Type: RemoteResourceSyncedCondition, - Status: "True", - Reason: ReasonCompleted, - Message: fmt.Sprintf("Resource updated with resourceVersion %s", remoteObj.GetResourceVersion()), - }) - b.Status.RemoteResources[idx].State = ResourceSyncSuccess +func (b *RemoteDatabaseNodeSetResource) UnsetPrimaryResourceAnnotations(obj client.Object) { + annotations := make(map[string]string) + for key, value := range obj.GetAnnotations() { + if key != annotations[ydbannotations.PrimaryResourceDatabaseAnnotation] { + annotations[key] = value } } + obj.SetAnnotations(annotations) } -func (b *RemoteDatabaseNodeSetResource) RemoveRemoteResourceStatus(remoteObj client.Object, remoteObjGVK schema.GroupVersionKind) { - syncedResources := append([]api.RemoteResource{}, b.Status.RemoteResources...) - for idx := range syncedResources { - if EqualRemoteResourceWithObject(&syncedResources[idx], b.Namespace, remoteObj, remoteObjGVK) { - b.Status.RemoteResources = append( - b.Status.RemoteResources[:idx], - b.Status.RemoteResources[idx+1:]..., - ) +func (b *RemoteDatabaseNodeSetResource) CreateRemoteResourceStatus( + remoteObj client.Object, +) { + b.Status.RemoteResources = append( + b.Status.RemoteResources, + api.RemoteResource{ + Group: remoteObj.GetObjectKind().GroupVersionKind().Group, + Version: remoteObj.GetObjectKind().GroupVersionKind().Version, + Kind: remoteObj.GetObjectKind().GroupVersionKind().Kind, + Name: remoteObj.GetName(), + State: ResourceSyncPending, + Conditions: []metav1.Condition{}, + }, + ) + meta.SetStatusCondition( + &b.Status.RemoteResources[len(b.Status.RemoteResources)-1].Conditions, + metav1.Condition{ + Type: RemoteResourceSyncedCondition, + Status: "Unknown", + Reason: ReasonInProgress, + }, + ) +} + +func (b *RemoteDatabaseNodeSetResource) UpdateRemoteResourceStatus( + remoteResource *api.RemoteResource, + status metav1.ConditionStatus, + resourceVersion string, +) { + if status == metav1.ConditionFalse { + meta.SetStatusCondition(&remoteResource.Conditions, + metav1.Condition{ + Type: RemoteResourceSyncedCondition, + Status: metav1.ConditionFalse, + Reason: ReasonInProgress, + Message: fmt.Sprintf("Failed to sync remoteObject to resourceVersion %s", resourceVersion), + }) + remoteResource.State = ResourceSyncPending + } + + if status == metav1.ConditionTrue { + meta.SetStatusCondition(&remoteResource.Conditions, + metav1.Condition{ + Type: RemoteResourceSyncedCondition, + Status: metav1.ConditionTrue, + Reason: ReasonCompleted, + Message: fmt.Sprintf("Successfully synced remoteObject to resourceVersion %s", resourceVersion), + }) + remoteResource.State = ResourceSyncSuccess + } +} + +func (b *RemoteDatabaseNodeSetResource) RemoveRemoteResourceStatus(remoteObj client.Object) { + var idxRemoteObj int + for idx := range b.Status.RemoteResources { + if EqualRemoteResourceWithObject(&b.Status.RemoteResources[idx], remoteObj) { + idxRemoteObj = idx break } } + b.Status.RemoteResources = append( + b.Status.RemoteResources[:idxRemoteObj], + b.Status.RemoteResources[idxRemoteObj+1:]..., + ) } diff --git a/internal/resources/remotestoragenodeset.go b/internal/resources/remotestoragenodeset.go index c710c411..635ed591 100644 --- a/internal/resources/remotestoragenodeset.go +++ b/internal/resources/remotestoragenodeset.go @@ -7,8 +7,9 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" api "github.com/ydb-platform/ydb-kubernetes-operator/api/v1alpha1" ydbannotations "github.com/ydb-platform/ydb-kubernetes-operator/internal/annotations" @@ -84,12 +85,14 @@ func NewRemoteStorageNodeSet(remoteStorageNodeSet *api.RemoteStorageNodeSet) Rem return RemoteStorageNodeSetResource{RemoteStorageNodeSet: crRemoteStorageNodeSet} } -func (b *RemoteStorageNodeSetResource) GetRemoteObjects() []client.Object { - objects := []client.Object{} +func (b *RemoteStorageNodeSetResource) GetRemoteObjects( + scheme *runtime.Scheme, +) []client.Object { + remoteObjects := []client.Object{} // sync Secrets for _, secret := range b.Spec.Secrets { - objects = append(objects, + remoteObjects = append(remoteObjects, &corev1.Secret{ ObjectMeta: metav1.ObjectMeta{ Name: secret.Name, @@ -99,7 +102,7 @@ func (b *RemoteStorageNodeSetResource) GetRemoteObjects() []client.Object { } // sync ConfigMap - objects = append(objects, + remoteObjects = append(remoteObjects, &corev1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Name: b.Spec.StorageRef.Name, @@ -108,7 +111,7 @@ func (b *RemoteStorageNodeSetResource) GetRemoteObjects() []client.Object { }) // sync Services - objects = append(objects, + remoteObjects = append(remoteObjects, &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: fmt.Sprintf(GRPCServiceNameFormat, b.Spec.StorageRef.Name), @@ -129,7 +132,12 @@ func (b *RemoteStorageNodeSetResource) GetRemoteObjects() []client.Object { }, ) - return objects + for _, remoteObj := range remoteObjects { + remoteObjGVK, _ := apiutil.GVKForObject(remoteObj, scheme) + remoteObj.GetObjectKind().SetGroupVersionKind(remoteObjGVK) + } + + return remoteObjects } func (b *RemoteStorageNodeSetResource) SetPrimaryResourceAnnotations(obj client.Object) { @@ -145,30 +153,76 @@ func (b *RemoteStorageNodeSetResource) SetPrimaryResourceAnnotations(obj client. obj.SetAnnotations(annotations) } -func (b *RemoteStorageNodeSetResource) SetRemoteResourceStatus(remoteObj client.Object, remoteObjGVK schema.GroupVersionKind) { - for idx := range b.Status.RemoteResources { - if EqualRemoteResourceWithObject(&b.Status.RemoteResources[idx], b.Namespace, remoteObj, remoteObjGVK) { - meta.SetStatusCondition(&b.Status.RemoteResources[idx].Conditions, - metav1.Condition{ - Type: RemoteResourceSyncedCondition, - Status: "True", - Reason: ReasonCompleted, - Message: fmt.Sprintf("Resource updated with resourceVersion %s", remoteObj.GetResourceVersion()), - }) - b.Status.RemoteResources[idx].State = ResourceSyncSuccess +func (b *RemoteStorageNodeSetResource) UnsetPrimaryResourceAnnotations(obj client.Object) { + annotations := make(map[string]string) + for key, value := range obj.GetAnnotations() { + if key != annotations[ydbannotations.PrimaryResourceStorageAnnotation] { + annotations[key] = value } } + obj.SetAnnotations(annotations) } -func (b *RemoteStorageNodeSetResource) RemoveRemoteResourceStatus(remoteObj client.Object, remoteObjGVK schema.GroupVersionKind) { - syncedResources := append([]api.RemoteResource{}, b.Status.RemoteResources...) - for idx := range syncedResources { - if EqualRemoteResourceWithObject(&syncedResources[idx], b.Namespace, remoteObj, remoteObjGVK) { - b.Status.RemoteResources = append( - b.Status.RemoteResources[:idx], - b.Status.RemoteResources[idx+1:]..., - ) +func (b *RemoteStorageNodeSetResource) CreateRemoteResourceStatus(remoteObj client.Object) { + b.Status.RemoteResources = append( + b.Status.RemoteResources, + api.RemoteResource{ + Group: remoteObj.GetObjectKind().GroupVersionKind().Group, + Version: remoteObj.GetObjectKind().GroupVersionKind().Version, + Kind: remoteObj.GetObjectKind().GroupVersionKind().Kind, + Name: remoteObj.GetName(), + State: ResourceSyncPending, + Conditions: []metav1.Condition{}, + }, + ) + meta.SetStatusCondition( + &b.Status.RemoteResources[len(b.Status.RemoteResources)-1].Conditions, + metav1.Condition{ + Type: RemoteResourceSyncedCondition, + Status: "Unknown", + Reason: ReasonInProgress, + }, + ) +} + +func (b *RemoteStorageNodeSetResource) UpdateRemoteResourceStatus( + remoteResource *api.RemoteResource, + status metav1.ConditionStatus, + resourceVersion string, +) { + if status == metav1.ConditionFalse { + meta.SetStatusCondition(&remoteResource.Conditions, + metav1.Condition{ + Type: RemoteResourceSyncedCondition, + Status: metav1.ConditionFalse, + Reason: ReasonInProgress, + Message: fmt.Sprintf("Failed to sync remoteObject to resourceVersion %s", resourceVersion), + }) + remoteResource.State = ResourceSyncPending + } + + if status == metav1.ConditionTrue { + meta.SetStatusCondition(&remoteResource.Conditions, + metav1.Condition{ + Type: RemoteResourceSyncedCondition, + Status: metav1.ConditionTrue, + Reason: ReasonCompleted, + Message: fmt.Sprintf("Successfully synced remoteObject to resourceVersion %s", resourceVersion), + }) + remoteResource.State = ResourceSyncSuccess + } +} + +func (b *RemoteStorageNodeSetResource) RemoveRemoteResourceStatus(remoteObj client.Object) { + var idxRemoteObj int + for idx := range b.Status.RemoteResources { + if EqualRemoteResourceWithObject(&b.Status.RemoteResources[idx], remoteObj) { + idxRemoteObj = idx break } } + b.Status.RemoteResources = append( + b.Status.RemoteResources[:idxRemoteObj], + b.Status.RemoteResources[idxRemoteObj+1:]..., + ) } diff --git a/internal/resources/resource.go b/internal/resources/resource.go index 03fe495b..29ccd4dd 100644 --- a/internal/resources/resource.go +++ b/internal/resources/resource.go @@ -207,13 +207,15 @@ func CreateResource(obj client.Object) client.Object { createdObj.SetUID("") createdObj.SetOwnerReferences([]metav1.OwnerReference{}) createdObj.SetFinalizers([]string{}) + createdObj.SetManagedFields([]metav1.ManagedFieldsEntry{}) if svc, ok := createdObj.(*corev1.Service); ok { svc.Spec.ClusterIP = "" svc.Spec.ClusterIPs = nil } - setRemoteResourceVersionAnnotation(createdObj, obj.GetResourceVersion()) + // Set remoteResourceVersion annotation + SetRemoteResourceVersionAnnotation(createdObj, obj.GetResourceVersion()) return createdObj } @@ -227,18 +229,35 @@ func UpdateResource(oldObj, newObj client.Object) client.Object { updatedObj.SetUID(oldObj.GetUID()) updatedObj.SetOwnerReferences(oldObj.GetOwnerReferences()) updatedObj.SetFinalizers(oldObj.GetFinalizers()) + updatedObj.SetManagedFields(oldObj.GetManagedFields()) + // Specific fields to save for Service object if svc, ok := updatedObj.(*corev1.Service); ok { svc.Spec.ClusterIP = oldObj.(*corev1.Service).Spec.ClusterIP svc.Spec.ClusterIPs = append([]string{}, oldObj.(*corev1.Service).Spec.ClusterIPs...) } - setRemoteResourceVersionAnnotation(updatedObj, newObj.GetResourceVersion()) + // Copy primaryResource annotations + CopyPrimaryResourceObjectAnnotation(updatedObj, oldObj.GetAnnotations()) + + // Set remoteResourceVersion annotation + SetRemoteResourceVersionAnnotation(updatedObj, newObj.GetResourceVersion()) return updatedObj } -func setRemoteResourceVersionAnnotation(obj client.Object, resourceVersion string) { +func CopyPrimaryResourceObjectAnnotation(obj client.Object, oldAnnotations map[string]string) { + annotations := CopyDict(obj.GetAnnotations()) + for key, value := range oldAnnotations { + if key == ydbannotations.PrimaryResourceDatabaseAnnotation || + key == ydbannotations.PrimaryResourceStorageAnnotation { + annotations[key] = value + } + } + obj.SetAnnotations(annotations) +} + +func SetRemoteResourceVersionAnnotation(obj client.Object, resourceVersion string) { annotations := make(map[string]string) for key, value := range obj.GetAnnotations() { annotations[key] = value @@ -284,17 +303,32 @@ func ConvertRemoteResourceToObject(remoteResource api.RemoteResource, namespace return runtimeObj.(client.Object), nil } +func GetPatchResult( + localObj client.Object, + remoteObj client.Object, +) (*patch.PatchResult, error) { + // Get diff resources and compare bytes by k8s-objectmatcher PatchMaker + updatedObj := UpdateResource(localObj, remoteObj) + patchResult, err := patchMaker.Calculate(localObj, updatedObj, + []patch.CalculateOption{ + patch.IgnoreStatusFields(), + }..., + ) + if err != nil { + return nil, err + } + + return patchResult, nil +} + func EqualRemoteResourceWithObject( remoteResource *api.RemoteResource, - namespace string, remoteObj client.Object, - remoteObjGVK schema.GroupVersionKind, ) bool { if remoteObj.GetName() == remoteResource.Name && - remoteObj.GetNamespace() == namespace && - remoteObjGVK.Kind == remoteResource.Kind && - remoteObjGVK.Group == remoteResource.Group && - remoteObjGVK.Version == remoteResource.Version { + remoteObj.GetObjectKind().GroupVersionKind().Kind == remoteResource.Kind && + remoteObj.GetObjectKind().GroupVersionKind().Group == remoteResource.Group && + remoteObj.GetObjectKind().GroupVersionKind().Version == remoteResource.Version { return true } return false