Commit 64e0f0b

committed Jul 6, 2024
⚠️ Add TypedReconciler
This change adds a TypedReconciler, which allows customizing the type used in the workqueue. There are a number of situations where a custom type can be a better fit than the default `reconcile.Request`:

* Multi-cluster controllers may want to include the cluster name in the request
* Some controllers do not reconcile individual resources of a given type but all of them at once; IngressControllers might do this
* Some controllers do not operate on Kubernetes resources at all

A minimal sketch of the resulting pattern is shown after the change stats below.
1 parent e28a842 · commit 64e0f0b

29 files changed: +954 -567 lines
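The core of the pattern: any comparable Go type can serve as the workqueue item, and a `reconcile.TypedFunc` over that type satisfies the new `reconcile.TypedReconciler` interface. A minimal sketch, using only names visible in this commit; the `clusterRequest` type is illustrative, not part of the change:

```go
package main

import (
	"context"

	"k8s.io/apimachinery/pkg/types"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// clusterRequest is an illustrative custom workqueue item: a namespaced name
// plus the name of the cluster it belongs to. Any comparable type works.
type clusterRequest struct {
	types.NamespacedName
	clusterName string
}

// reconcile.TypedFunc implements reconcile.TypedReconciler, so a plain
// function can act as the reconciler for the custom request type.
var _ reconcile.TypedReconciler[clusterRequest] = reconcile.TypedFunc[clusterRequest](
	func(ctx context.Context, req clusterRequest) (reconcile.Result, error) {
		// A real reconciler would pick a client based on req.clusterName here;
		// see examples/multiclustersync below for a complete controller.
		return reconcile.Result{}, nil
	},
)

func main() {}
```

The two new examples below wire such reconcilers into controllers via `builder.TypedControllerManagedBy`, `source.TypedKind`, and `handler.TypedEnqueueRequestsFromMapFunc`.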
 

.golangci.yml

4 lines removed

```diff
@@ -122,10 +122,6 @@ issues:
   - linters:
     - staticcheck
     text: "SA1019: .*The component config package has been deprecated and will be removed in a future release."
-  - linters:
-    - staticcheck
-    # Will be addressed separately.
-    text: "SA1019: workqueue.(RateLimitingInterface|DefaultControllerRateLimiter|New|NewItemExponentialFailureRateLimiter|NewRateLimitingQueueWithConfig|DefaultItemBasedRateLimiter|RateLimitingQueueConfig) is deprecated:"
   # With Go 1.16, the new embed directive can be used with an un-named import,
   # revive (previously, golint) only allows these to be imported in a main.go, which wouldn't work for us.
   # This directive allows the embed package to be imported with an underscore everywhere.
```
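The removed rule silenced staticcheck's SA1019 findings for the deprecated untyped workqueue constructors, presumably because this change moves the codebase onto the typed workqueue, making the exclusion unnecessary. A minimal sketch of the typed counterparts, assuming a client-go release that ships them (v0.31+); the `request` type here is illustrative:

```go
package main

import "k8s.io/client-go/util/workqueue"

// request is an illustrative custom workqueue item.
type request struct {
	clusterName string
	name        string
}

func main() {
	// Typed replacement for the deprecated workqueue.RateLimitingInterface:
	// the queue carries the request type directly instead of interface{}.
	var q workqueue.TypedRateLimitingInterface[request] = workqueue.NewTypedRateLimitingQueue(
		workqueue.DefaultTypedControllerRateLimiter[request](),
	)
	defer q.ShutDown()

	q.Add(request{clusterName: "self", name: "example"})
	item, shutdown := q.Get()
	if shutdown {
		return
	}
	// Mark the item as processed so future events can re-queue it.
	q.Done(item)
}
```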

examples/multiclustersync/main.go

178 lines added (new file)

```go
package main

import (
	"context"
	"fmt"
	"os"

	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/cluster"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/log"
	"sigs.k8s.io/controller-runtime/pkg/log/zap"
	"sigs.k8s.io/controller-runtime/pkg/manager/signals"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

func main() {
	if err := run(); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}

const (
	sourceNamespace = "namespace-to-sync-all-secrets-from"
	targetNamespace = "namespace-to-sync-all-secrets-to"
)

func run() error {
	log.SetLogger(zap.New())

	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
	if err != nil {
		return fmt.Errorf("failed to construct manager: %w", err)
	}

	allTargets := map[string]cluster.Cluster{}

	// The local cluster also serves as a sync target. Naming the variable
	// selfCluster avoids shadowing the cluster package.
	selfCluster, err := cluster.New(ctrl.GetConfigOrDie())
	if err != nil {
		return fmt.Errorf("failed to construct cluster: %w", err)
	}
	if err := mgr.Add(selfCluster); err != nil {
		return fmt.Errorf("failed to add cluster to manager: %w", err)
	}

	// Add more target clusters here as needed.
	allTargets["self"] = selfCluster

	b := builder.TypedControllerManagedBy[request](mgr).
		Named("secret-sync").
		// Watch secrets in the source namespace of the source cluster and
		// create requests for each target cluster.
		WatchesRawSource(source.TypedKind(
			mgr.GetCache(),
			&corev1.Secret{},
			handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, s *corev1.Secret) []request {
				if s.Namespace != sourceNamespace {
					return nil
				}

				result := make([]request, 0, len(allTargets))
				for targetCluster := range allTargets {
					result = append(result, request{
						NamespacedName: types.NamespacedName{Namespace: s.Namespace, Name: s.Name},
						clusterName:    targetCluster,
					})
				}

				return result
			}),
		)).
		WithOptions(controller.TypedOptions[request]{MaxConcurrentReconciles: 10})

	for targetClusterName, targetCluster := range allTargets {
		// Watch secrets in the target namespace of each target cluster
		// and create a request for itself.
		b = b.WatchesRawSource(source.TypedKind(
			targetCluster.GetCache(),
			&corev1.Secret{},
			handler.TypedEnqueueRequestsFromMapFunc(func(ctx context.Context, s *corev1.Secret) []request {
				if s.Namespace != targetNamespace {
					return nil
				}

				return []request{{
					NamespacedName: types.NamespacedName{Namespace: sourceNamespace, Name: s.Name},
					clusterName:    targetClusterName,
				}}
			}),
		))
	}

	clients := make(map[string]client.Client, len(allTargets))
	for targetClusterName, targetCluster := range allTargets {
		clients[targetClusterName] = targetCluster.GetClient()
	}

	if err := b.Complete(&secretSyncReconciler{
		source:  mgr.GetClient(),
		targets: clients,
	}); err != nil {
		return fmt.Errorf("failed to build reconciler: %w", err)
	}

	ctx := signals.SetupSignalHandler()
	if err := mgr.Start(ctx); err != nil {
		return fmt.Errorf("failed to start manager: %w", err)
	}

	return nil
}

// request is the custom workqueue item: the secret to sync plus the name of
// the target cluster to sync it to.
type request struct {
	types.NamespacedName
	clusterName string
}

// secretSyncReconciler is a simple reconciler that keeps all secrets in the source namespace of a given
// source cluster in sync with the secrets in the target namespace of all target clusters.
type secretSyncReconciler struct {
	source  client.Client
	targets map[string]client.Client
}

func (s *secretSyncReconciler) Reconcile(ctx context.Context, req request) (reconcile.Result, error) {
	targetClient, found := s.targets[req.clusterName]
	if !found {
		return reconcile.Result{}, reconcile.TerminalError(fmt.Errorf("target cluster %s not found", req.clusterName))
	}

	var reference corev1.Secret
	if err := s.source.Get(ctx, req.NamespacedName, &reference); err != nil {
		if !apierrors.IsNotFound(err) {
			return reconcile.Result{}, fmt.Errorf("failed to get secret %s from reference cluster: %w", req.String(), err)
		}
		// The source secret is gone; delete its copy from the target cluster.
		if err := targetClient.Delete(ctx, &corev1.Secret{ObjectMeta: metav1.ObjectMeta{
			Name:      req.Name,
			Namespace: targetNamespace,
		}}); err != nil {
			if !apierrors.IsNotFound(err) {
				return reconcile.Result{}, fmt.Errorf("failed to delete secret %s/%s in cluster %s: %w", targetNamespace, req.Name, req.clusterName, err)
			}

			return reconcile.Result{}, nil
		}

		log.FromContext(ctx).Info("Deleted secret", "cluster", req.clusterName, "namespace", targetNamespace, "name", req.Name)
		return reconcile.Result{}, nil
	}

	target := &corev1.Secret{ObjectMeta: metav1.ObjectMeta{
		Name:      reference.Name,
		Namespace: targetNamespace,
	}}
	result, err := controllerutil.CreateOrUpdate(ctx, targetClient, target, func() error {
		target.Data = reference.Data
		return nil
	})
	if err != nil {
		return reconcile.Result{}, fmt.Errorf("failed to upsert target secret %s/%s: %w", target.Namespace, target.Name, err)
	}

	if result != controllerutil.OperationResultNone {
		log.FromContext(ctx).Info("Upserted secret", "cluster", req.clusterName, "namespace", targetNamespace, "name", req.Name, "result", result)
	}

	return reconcile.Result{}, nil
}
```
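The example registers only the local cluster as a target, with a comment marking where more can be added. Below is an illustrative helper, not part of the commit, that loads a kubeconfig with client-go's clientcmd and registers the resulting cluster.Cluster as an additional target; the name `addRemoteTarget` and its parameters are assumptions, and it is meant to sit alongside run() in the example's package:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/tools/clientcmd"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/cluster"
)

// addRemoteTarget builds a cluster.Cluster from a kubeconfig file, registers
// it with the manager (so its caches start and stop with it), and records it
// as a sync target.
func addRemoteTarget(mgr ctrl.Manager, allTargets map[string]cluster.Cluster, name, kubeconfig string) error {
	cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		return fmt.Errorf("failed to load kubeconfig %s: %w", kubeconfig, err)
	}

	remote, err := cluster.New(cfg)
	if err != nil {
		return fmt.Errorf("failed to construct cluster %s: %w", name, err)
	}
	if err := mgr.Add(remote); err != nil {
		return fmt.Errorf("failed to add cluster %s to manager: %w", name, err)
	}

	allTargets[name] = remote
	return nil
}
```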

examples/typed/main.go

66 lines added (new file)

```go
package main

import (
	"context"
	"fmt"
	"os"

	networkingv1 "k8s.io/api/networking/v1"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/manager/signals"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"
	"sigs.k8s.io/controller-runtime/pkg/source"
)

func main() {
	if err := run(); err != nil {
		fmt.Fprintf(os.Stderr, "%v\n", err)
		os.Exit(1)
	}
}

func run() error {
	mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{})
	if err != nil {
		return fmt.Errorf("failed to construct manager: %w", err)
	}

	// Use a request type that is always equal to itself so the workqueue
	// de-duplicates all events.
	// This can, for example, be useful for an ingress controller that
	// generates a config from all ingresses, rather than individual ones.
	type request struct{}

	r := reconcile.TypedFunc[request](func(ctx context.Context, _ request) (reconcile.Result, error) {
		ingressList := &networkingv1.IngressList{}
		if err := mgr.GetClient().List(ctx, ingressList); err != nil {
			return reconcile.Result{}, fmt.Errorf("failed to list ingresses: %w", err)
		}

		buildIngressConfig(ingressList)
		return reconcile.Result{}, nil
	})
	if err := builder.TypedControllerManagedBy[request](mgr).
		WatchesRawSource(source.TypedKind(
			mgr.GetCache(),
			&networkingv1.Ingress{},
			handler.TypedEnqueueRequestsFromMapFunc(func(context.Context, *networkingv1.Ingress) []request {
				return []request{{}}
			})),
		).
		Named("ingress_controller").
		Complete(r); err != nil {
		return fmt.Errorf("failed to construct ingress-controller: %w", err)
	}

	ctx := signals.SetupSignalHandler()
	if err := mgr.Start(ctx); err != nil {
		return fmt.Errorf("failed to start manager: %w", err)
	}

	return nil
}

func buildIngressConfig(*networkingv1.IngressList) {}
```
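Because `request` has no fields, every value equals every other, so the workqueue's built-in de-duplication collapses any burst of ingress events into a single pending reconcile. A small standalone demonstration of that property, not part of this commit, assuming a client-go version that ships the typed workqueue (`workqueue.NewTyped`, v0.31+):

```go
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

type request struct{}

func main() {
	q := workqueue.NewTyped[request]()
	defer q.ShutDown()

	// Every event maps to the same comparable value...
	for i := 0; i < 100; i++ {
		q.Add(request{})
	}

	// ...so the queue holds a single item and the reconciler runs once.
	fmt.Println(q.Len()) // prints 1
}
```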
