Skip to content

Commit d6a053f

Browse files
committed
Refactor source/handler/predicate packages to remove dep injection
Signed-off-by: Vince Prignano <vince@prigna.com>
1 parent 16f7965 commit d6a053f

28 files changed

+217
-698
lines changed

examples/builtins/main.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,14 @@ func main() {
5959
}
6060

6161
// Watch ReplicaSets and enqueue ReplicaSet object key
62-
if err := c.Watch(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForObject{}); err != nil {
62+
if err := c.Watch(source.Kind(mgr.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}); err != nil {
6363
entryLog.Error(err, "unable to watch ReplicaSets")
6464
os.Exit(1)
6565
}
6666

6767
// Watch Pods and enqueue owning ReplicaSet key
68-
if err := c.Watch(&source.Kind{Type: &corev1.Pod{}},
69-
&handler.EnqueueRequestForOwner{OwnerType: &appsv1.ReplicaSet{}, IsController: true}); err != nil {
68+
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}),
69+
handler.EnqueueRequestForOwner(mgr.GetScheme(), mgr.GetRESTMapper(), &appsv1.ReplicaSet{}, handler.OnlyControllerOwner())); err != nil {
7070
entryLog.Error(err, "unable to watch Pods")
7171
os.Exit(1)
7272
}

hack/test-all.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ if [[ -n ${ARTIFACTS:-} ]]; then
2525
fi
2626

2727
result=0
28-
go test -race ${P_FLAG} ${MOD_OPT} ./... ${GINKGO_ARGS} || result=$?
28+
go test -v -race ${P_FLAG} ${MOD_OPT} ./... --ginkgo.fail-fast ${GINKGO_ARGS} || result=$?
2929

3030
if [[ -n ${ARTIFACTS:-} ]]; then
3131
mkdir -p ${ARTIFACTS}

pkg/builder/controller.go

+18-22
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import (
2222
"strings"
2323

2424
"github.com/go-logr/logr"
25-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2625
"k8s.io/apimachinery/pkg/runtime/schema"
2726
"k8s.io/klog/v2"
2827

@@ -197,18 +196,16 @@ func (blder *Builder) Build(r reconcile.Reconciler) (controller.Controller, erro
197196
return blder.ctrl, nil
198197
}
199198

200-
func (blder *Builder) project(obj client.Object, proj objectProjection) (client.Object, error) {
199+
func (blder *Builder) project(obj client.Object, proj objectProjection) (source.Source, error) {
200+
src := source.Kind(blder.mgr.GetCache(), obj)
201201
switch proj {
202202
case projectAsNormal:
203-
return obj, nil
203+
return src, nil
204204
case projectAsMetadata:
205-
metaObj := &metav1.PartialObjectMetadata{}
206-
gvk, err := getGvk(obj, blder.mgr.GetScheme())
207-
if err != nil {
208-
return nil, fmt.Errorf("unable to determine GVK of %T for a metadata-only watch: %w", obj, err)
205+
if err := source.KindAsPartialMetadata(src, blder.mgr.GetScheme()); err != nil {
206+
return nil, err
209207
}
210-
metaObj.SetGroupVersionKind(gvk)
211-
return metaObj, nil
208+
return src, nil
212209
default:
213210
panic(fmt.Sprintf("unexpected projection type %v on type %T, should not be possible since this is an internal field", proj, obj))
214211
}
@@ -217,11 +214,10 @@ func (blder *Builder) project(obj client.Object, proj objectProjection) (client.
217214
func (blder *Builder) doWatch() error {
218215
// Reconcile type
219216
if blder.forInput.object != nil {
220-
typeForSrc, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
217+
src, err := blder.project(blder.forInput.object, blder.forInput.objectProjection)
221218
if err != nil {
222219
return err
223220
}
224-
src := &source.Kind{Type: typeForSrc}
225221
hdler := &handler.EnqueueRequestForObject{}
226222
allPredicates := append(blder.globalPredicates, blder.forInput.predicates...)
227223
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
@@ -234,15 +230,15 @@ func (blder *Builder) doWatch() error {
234230
return errors.New("Owns() can only be used together with For()")
235231
}
236232
for _, own := range blder.ownsInput {
237-
typeForSrc, err := blder.project(own.object, own.objectProjection)
233+
src, err := blder.project(own.object, own.objectProjection)
238234
if err != nil {
239235
return err
240236
}
241-
src := &source.Kind{Type: typeForSrc}
242-
hdler := &handler.EnqueueRequestForOwner{
243-
OwnerType: blder.forInput.object,
244-
IsController: true,
245-
}
237+
hdler := handler.EnqueueRequestForOwner(
238+
blder.mgr.GetScheme(), blder.mgr.GetRESTMapper(),
239+
blder.forInput.object,
240+
handler.OnlyControllerOwner(),
241+
)
246242
allPredicates := append([]predicate.Predicate(nil), blder.globalPredicates...)
247243
allPredicates = append(allPredicates, own.predicates...)
248244
if err := blder.ctrl.Watch(src, hdler, allPredicates...); err != nil {
@@ -259,12 +255,12 @@ func (blder *Builder) doWatch() error {
259255
allPredicates = append(allPredicates, w.predicates...)
260256

261257
// If the source of this watch is of type *source.Kind, project it.
262-
if srckind, ok := w.src.(*source.Kind); ok {
263-
typeForSrc, err := blder.project(srckind.Type, w.objectProjection)
264-
if err != nil {
265-
return err
258+
if srckind, ok := w.src.(source.SyncingSource); ok {
259+
if w.objectProjection == projectAsMetadata {
260+
if err := source.KindAsPartialMetadata(srckind, blder.mgr.GetScheme()); err != nil {
261+
return err
262+
}
266263
}
267-
srckind.Type = typeForSrc
268264
}
269265

270266
if err := blder.ctrl.Watch(w.src, w.eventhandler, allPredicates...); err != nil {

pkg/builder/controller_test.go

+10-8
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ var _ = Describe("application", func() {
118118
Expect(err).NotTo(HaveOccurred())
119119

120120
instance, err := ControllerManagedBy(m).
121-
Watches(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForObject{}).
121+
Watches(source.Kind(m.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}).
122122
Build(noop)
123123
Expect(err).To(MatchError(ContainSubstring("one of For() or Named() must be called")))
124124
Expect(instance).To(BeNil())
@@ -157,7 +157,7 @@ var _ = Describe("application", func() {
157157

158158
instance, err := ControllerManagedBy(m).
159159
Named("my_controller").
160-
Watches(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForObject{}).
160+
Watches(source.Kind(m.GetCache(), &appsv1.ReplicaSet{}), &handler.EnqueueRequestForObject{}).
161161
Build(noop)
162162
Expect(err).NotTo(HaveOccurred())
163163
Expect(instance).NotTo(BeNil())
@@ -369,8 +369,9 @@ var _ = Describe("application", func() {
369369
bldr := ControllerManagedBy(m).
370370
For(&appsv1.Deployment{}).
371371
Watches( // Equivalent of Owns
372-
&source.Kind{Type: &appsv1.ReplicaSet{}},
373-
&handler.EnqueueRequestForOwner{OwnerType: &appsv1.Deployment{}, IsController: true})
372+
source.Kind(m.GetCache(), &appsv1.ReplicaSet{}),
373+
handler.EnqueueRequestForOwner(m.GetScheme(), m.GetRESTMapper(), &appsv1.Deployment{}, handler.OnlyControllerOwner()),
374+
)
374375

375376
ctx, cancel := context.WithCancel(context.Background())
376377
defer cancel()
@@ -384,10 +385,11 @@ var _ = Describe("application", func() {
384385
bldr := ControllerManagedBy(m).
385386
Named("Deployment").
386387
Watches( // Equivalent of For
387-
&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForObject{}).
388+
source.Kind(m.GetCache(), &appsv1.Deployment{}), &handler.EnqueueRequestForObject{}).
388389
Watches( // Equivalent of Owns
389-
&source.Kind{Type: &appsv1.ReplicaSet{}},
390-
&handler.EnqueueRequestForOwner{OwnerType: &appsv1.Deployment{}, IsController: true})
390+
source.Kind(m.GetCache(), &appsv1.ReplicaSet{}),
391+
handler.EnqueueRequestForOwner(m.GetScheme(), m.GetRESTMapper(), &appsv1.Deployment{}, handler.OnlyControllerOwner()),
392+
)
391393

392394
ctx, cancel := context.WithCancel(context.Background())
393395
defer cancel()
@@ -481,7 +483,7 @@ var _ = Describe("application", func() {
481483
bldr := ControllerManagedBy(mgr).
482484
For(&appsv1.Deployment{}, OnlyMetadata).
483485
Owns(&appsv1.ReplicaSet{}, OnlyMetadata).
484-
Watches(&source.Kind{Type: &appsv1.StatefulSet{}},
486+
Watches(source.Kind(mgr.GetCache(), &appsv1.StatefulSet{}),
485487
handler.EnqueueRequestsFromMapFunc(func(o client.Object) []reconcile.Request {
486488
defer GinkgoRecover()
487489

pkg/cluster/cluster.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ type Cluster interface {
4545
// GetConfig returns an initialized Config
4646
GetConfig() *rest.Config
4747

48+
// GetCache returns a cache.Cache
49+
GetCache() cache.Cache
50+
4851
// GetScheme returns an initialized Scheme
4952
GetScheme() *runtime.Scheme
5053

@@ -57,9 +60,6 @@ type Cluster interface {
5760
// GetFieldIndexer returns a client.FieldIndexer configured with the client
5861
GetFieldIndexer() client.FieldIndexer
5962

60-
// GetCache returns a cache.Cache
61-
GetCache() cache.Cache
62-
6363
// GetEventRecorderFor returns a new EventRecorder for the provided name
6464
GetEventRecorderFor(name string) record.EventRecorder
6565

pkg/controller/controller.go

-1
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,6 @@ func NewUnmanaged(name string, mgr manager.Manager, options Options) (Controller
156156
},
157157
MaxConcurrentReconciles: options.MaxConcurrentReconciles,
158158
CacheSyncTimeout: options.CacheSyncTimeout,
159-
SetFields: mgr.SetFields,
160159
Name: name,
161160
LogConstructor: options.LogConstructor,
162161
RecoverPanic: options.RecoverPanic,

pkg/controller/controller_integration_test.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,13 @@ var _ = Describe("controller", func() {
6464
Expect(err).NotTo(HaveOccurred())
6565

6666
By("Watching Resources")
67-
err = instance.Watch(&source.Kind{Type: &appsv1.ReplicaSet{}}, &handler.EnqueueRequestForOwner{
68-
OwnerType: &appsv1.Deployment{},
69-
})
67+
err = instance.Watch(
68+
source.Kind(cm.GetCache(), &appsv1.ReplicaSet{}),
69+
handler.EnqueueRequestForOwner(cm.GetScheme(), cm.GetRESTMapper(), &appsv1.Deployment{}),
70+
)
7071
Expect(err).NotTo(HaveOccurred())
7172

72-
err = instance.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForObject{})
73+
err = instance.Watch(source.Kind(cm.GetCache(), &appsv1.Deployment{}), &handler.EnqueueRequestForObject{})
7374
Expect(err).NotTo(HaveOccurred())
7475

7576
err = cm.GetClient().Get(ctx, types.NamespacedName{Name: "foo"}, &corev1.Namespace{})

pkg/controller/controller_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ var _ = Describe("controller.Controller", func() {
8888
It("should not leak goroutines when stopped", func() {
8989
currentGRs := goleak.IgnoreCurrent()
9090

91+
ctx, cancel := context.WithCancel(context.Background())
9192
watchChan := make(chan event.GenericEvent, 1)
9293
watch := &source.Channel{Source: watchChan}
9394
watchChan <- event.GenericEvent{Object: &corev1.Pod{}}
@@ -114,7 +115,6 @@ var _ = Describe("controller.Controller", func() {
114115
Expect(c.Watch(watch, &handler.EnqueueRequestForObject{})).To(Succeed())
115116
Expect(err).NotTo(HaveOccurred())
116117

117-
ctx, cancel := context.WithCancel(context.Background())
118118
go func() {
119119
defer GinkgoRecover()
120120
Expect(m.Start(ctx)).To(Succeed())

pkg/controller/example_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ func ExampleController() {
7171
}
7272

7373
// Watch for Pod create / update / delete events and call Reconcile
74-
err = c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{})
74+
err = c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{})
7575
if err != nil {
7676
log.Error(err, "unable to watch pods")
7777
os.Exit(1)
@@ -108,7 +108,7 @@ func ExampleController_unstructured() {
108108
Version: "v1",
109109
})
110110
// Watch for Pod create / update / delete events and call Reconcile
111-
err = c.Watch(&source.Kind{Type: u}, &handler.EnqueueRequestForObject{})
111+
err = c.Watch(source.Kind(mgr.GetCache(), u), &handler.EnqueueRequestForObject{})
112112
if err != nil {
113113
log.Error(err, "unable to watch pods")
114114
os.Exit(1)
@@ -139,7 +139,7 @@ func ExampleNewUnmanaged() {
139139
os.Exit(1)
140140
}
141141

142-
if err := c.Watch(&source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestForObject{}); err != nil {
142+
if err := c.Watch(source.Kind(mgr.GetCache(), &corev1.Pod{}), &handler.EnqueueRequestForObject{}); err != nil {
143143
log.Error(err, "unable to watch pods")
144144
os.Exit(1)
145145
}

pkg/handler/enqueue_mapped.go

-11
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"sigs.k8s.io/controller-runtime/pkg/client"
2222
"sigs.k8s.io/controller-runtime/pkg/event"
2323
"sigs.k8s.io/controller-runtime/pkg/reconcile"
24-
"sigs.k8s.io/controller-runtime/pkg/runtime/inject"
2524
)
2625

2726
// MapFunc is the signature required for enqueueing requests from a generic function.
@@ -85,13 +84,3 @@ func (e *enqueueRequestsFromMapFunc) mapAndEnqueue(q workqueue.RateLimitingInter
8584
}
8685
}
8786
}
88-
89-
// EnqueueRequestsFromMapFunc can inject fields into the mapper.
90-
91-
// InjectFunc implements inject.Injector.
92-
func (e *enqueueRequestsFromMapFunc) InjectFunc(f inject.Func) error {
93-
if f == nil {
94-
return nil
95-
}
96-
return f(e.toRequests)
97-
}

0 commit comments

Comments
 (0)