|
| 1 | +package client |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "fmt" |
| 6 | + "reflect" |
| 7 | + |
| 8 | + "k8s.io/apimachinery/pkg/api/errors" |
| 9 | + apimeta "k8s.io/apimachinery/pkg/api/meta" |
| 10 | + "k8s.io/apimachinery/pkg/fields" |
| 11 | + "k8s.io/apimachinery/pkg/labels" |
| 12 | + "k8s.io/apimachinery/pkg/runtime" |
| 13 | + "k8s.io/apimachinery/pkg/runtime/schema" |
| 14 | + "k8s.io/apimachinery/pkg/selection" |
| 15 | + "k8s.io/client-go/tools/cache" |
| 16 | + |
| 17 | + "github.com/kubernetes-sigs/kubebuilder/pkg/informer" |
| 18 | + logf "github.com/kubernetes-sigs/kubebuilder/pkg/log" |
| 19 | +) |
| 20 | + |
| 21 | +var log = logf.KBLog.WithName("object-cache") |
| 22 | + |
| 23 | +// ObjectCache is a ReadInterface |
| 24 | +var _ ReadInterface = &ObjectCache{} |
| 25 | + |
| 26 | +type ObjectCache struct { |
| 27 | + cachesByType map[reflect.Type]*SingleObjectCache |
| 28 | + scheme *runtime.Scheme |
| 29 | +} |
| 30 | + |
| 31 | +func ObjectCacheFromInformers(informers map[schema.GroupVersionKind]cache.SharedIndexInformer, scheme *runtime.Scheme) *ObjectCache { |
| 32 | + res := NewObjectCache(scheme) |
| 33 | + res.AddInformers(informers) |
| 34 | + return res |
| 35 | +} |
| 36 | + |
| 37 | +func (o *ObjectCache) AddInformers(informers map[schema.GroupVersionKind]cache.SharedIndexInformer) { |
| 38 | + for gvk, informer := range informers { |
| 39 | + o.AddInformer(gvk, informer) |
| 40 | + } |
| 41 | +} |
| 42 | + |
| 43 | +func (o *ObjectCache) Call(gvk schema.GroupVersionKind, c cache.SharedIndexInformer) { |
| 44 | + o.AddInformer(gvk, c) |
| 45 | +} |
| 46 | + |
| 47 | +func (o *ObjectCache) AddInformer(gvk schema.GroupVersionKind, c cache.SharedIndexInformer) { |
| 48 | + obj, err := o.scheme.New(gvk) |
| 49 | + if err != nil { |
| 50 | + log.Error(err, "could not register informer in ObjectCache for GVK", "GroupVersionKind", gvk) |
| 51 | + return |
| 52 | + } |
| 53 | + if _, found := o.CacheFor(obj); found { |
| 54 | + return |
| 55 | + } |
| 56 | + o.RegisterCache(obj, gvk, c.GetIndexer()) |
| 57 | +} |
| 58 | + |
| 59 | +func NewObjectCache(scheme *runtime.Scheme) *ObjectCache { |
| 60 | + return &ObjectCache{ |
| 61 | + cachesByType: make(map[reflect.Type]*SingleObjectCache), |
| 62 | + scheme: scheme, |
| 63 | + } |
| 64 | +} |
| 65 | + |
| 66 | +func (c *ObjectCache) RegisterCache(obj runtime.Object, gvk schema.GroupVersionKind, store cache.Indexer) { |
| 67 | + objType := reflect.TypeOf(obj) |
| 68 | + c.cachesByType[objType] = &SingleObjectCache{ |
| 69 | + Indexer: store, |
| 70 | + GroupVersionKind: gvk, |
| 71 | + } |
| 72 | +} |
| 73 | + |
| 74 | +func (c *ObjectCache) CacheFor(obj runtime.Object) (*SingleObjectCache, bool) { |
| 75 | + objType := reflect.TypeOf(obj) |
| 76 | + cache, isKnown := c.cachesByType[objType] |
| 77 | + return cache, isKnown |
| 78 | +} |
| 79 | + |
| 80 | +func (c *ObjectCache) Get(ctx context.Context, key ObjectKey, out runtime.Object) error { |
| 81 | + cache, isKnown := c.CacheFor(out) |
| 82 | + if !isKnown { |
| 83 | + return fmt.Errorf("no cache for objects of type %T, must have asked for an watch/informer first", out) |
| 84 | + } |
| 85 | + return cache.Get(ctx, key, out) |
| 86 | +} |
| 87 | + |
| 88 | +func (c *ObjectCache) List(ctx context.Context, opts *ListOptions, out runtime.Object) error { |
| 89 | + itemsPtr, err := apimeta.GetItemsPtr(out) |
| 90 | + if err != nil { |
| 91 | + return nil |
| 92 | + } |
| 93 | + // http://knowyourmeme.com/memes/this-is-fine |
| 94 | + outType := reflect.Indirect(reflect.ValueOf(itemsPtr)).Type().Elem() |
| 95 | + cache, isKnown := c.cachesByType[outType] |
| 96 | + if !isKnown { |
| 97 | + return fmt.Errorf("no cache for objects of type %T", out) |
| 98 | + } |
| 99 | + return cache.List(ctx, opts, out) |
| 100 | +} |
| 101 | + |
| 102 | +// SingleObjectCache is a ReadInterface |
| 103 | +var _ ReadInterface = &SingleObjectCache{} |
| 104 | + |
| 105 | +// SingleObjectCache is a ReadInterface that retrieves objects |
| 106 | +// from a single local cache populated by a watch. |
| 107 | +type SingleObjectCache struct { |
| 108 | + // Indexer is the underlying indexer wrapped by this cache. |
| 109 | + Indexer cache.Indexer |
| 110 | + // GroupVersionKind is the group-version-kind of the resource. |
| 111 | + GroupVersionKind schema.GroupVersionKind |
| 112 | +} |
| 113 | + |
| 114 | +func (c *SingleObjectCache) Get(_ context.Context, key ObjectKey, out runtime.Object) error { |
| 115 | + storeKey := objectKeyToStoreKey(key) |
| 116 | + obj, exists, err := c.Indexer.GetByKey(storeKey) |
| 117 | + if err != nil { |
| 118 | + return err |
| 119 | + } |
| 120 | + if !exists { |
| 121 | + // Resource gets transformed into Kind in the error anyway, so this is fine |
| 122 | + return errors.NewNotFound(schema.GroupResource{ |
| 123 | + Group: c.GroupVersionKind.Group, |
| 124 | + Resource: c.GroupVersionKind.Kind, |
| 125 | + }, key.Name) |
| 126 | + } |
| 127 | + if _, isObj := obj.(runtime.Object); !isObj { |
| 128 | + return fmt.Errorf("cache contained %T, which is not an Object", obj) |
| 129 | + } |
| 130 | + |
| 131 | + // deep copy to avoid mutating cache |
| 132 | + // TODO(directxman12): revisit the decision to always deepcopy |
| 133 | + obj = obj.(runtime.Object).DeepCopyObject() |
| 134 | + |
| 135 | + // TODO(directxman12): this is a terrible hack, pls fix |
| 136 | + // (we should have deepcopyinto) |
| 137 | + outVal := reflect.ValueOf(out) |
| 138 | + objVal := reflect.ValueOf(obj) |
| 139 | + if !objVal.Type().AssignableTo(outVal.Type()) { |
| 140 | + return fmt.Errorf("cache had type %s, but %s was asked for", objVal.Type(), outVal.Type()) |
| 141 | + } |
| 142 | + reflect.Indirect(outVal).Set(reflect.Indirect(objVal)) |
| 143 | + return nil |
| 144 | +} |
| 145 | + |
| 146 | +func (c *SingleObjectCache) List(ctx context.Context, opts *ListOptions, out runtime.Object) error { |
| 147 | + var objs []interface{} |
| 148 | + var err error |
| 149 | + |
| 150 | + if opts != nil && opts.FieldSelector != nil { |
| 151 | + // TODO(directxman12): support more complicated field selectors by |
| 152 | + // combining multiple indicies, GetIndexers, etc |
| 153 | + field, val, requiresExact := requiresExactMatch(opts.FieldSelector) |
| 154 | + if !requiresExact { |
| 155 | + return fmt.Errorf("non-exact field matches are not supported by the cache") |
| 156 | + } |
| 157 | + // list all objects by the field selector. If this is namespaced and we have one, ask for the |
| 158 | + // namespaced index key. Otherwise, ask for the non-namespaced variant by using the fake "all namespaces" |
| 159 | + // namespace. |
| 160 | + objs, err = c.Indexer.ByIndex(fieldIndexName(field), keyToNamespacedKey(opts.Namespace, val)) |
| 161 | + } else if opts != nil && opts.Namespace != "" { |
| 162 | + objs, err = c.Indexer.ByIndex(cache.NamespaceIndex, opts.Namespace) |
| 163 | + } else { |
| 164 | + objs = c.Indexer.List() |
| 165 | + } |
| 166 | + if err != nil { |
| 167 | + return err |
| 168 | + } |
| 169 | + var labelSel labels.Selector |
| 170 | + if opts != nil && opts.LabelSelector != nil { |
| 171 | + labelSel = opts.LabelSelector |
| 172 | + } |
| 173 | + |
| 174 | + outItems := make([]runtime.Object, 0, len(objs)) |
| 175 | + for _, item := range objs { |
| 176 | + obj, isObj := item.(runtime.Object) |
| 177 | + if !isObj { |
| 178 | + return fmt.Errorf("cache contained %T, which is not an Object", obj) |
| 179 | + } |
| 180 | + meta, err := apimeta.Accessor(obj) |
| 181 | + if err != nil { |
| 182 | + return err |
| 183 | + } |
| 184 | + if labelSel != nil { |
| 185 | + lbls := labels.Set(meta.GetLabels()) |
| 186 | + if !labelSel.Matches(lbls) { |
| 187 | + continue |
| 188 | + } |
| 189 | + } |
| 190 | + outItems = append(outItems, obj.DeepCopyObject()) |
| 191 | + } |
| 192 | + if err := apimeta.SetList(out, outItems); err != nil { |
| 193 | + return err |
| 194 | + } |
| 195 | + return nil |
| 196 | +} |
| 197 | + |
| 198 | +// TODO: Make an interface with this function that has an Informers as an object on the struct |
| 199 | +// that automatically calls InformerFor and passes in the Indexer into IndexByField |
| 200 | + |
| 201 | +// noNamespaceNamespace is used as the "namespace" when we want to list across all namespaces |
| 202 | +const allNamespacesNamespace = "__all_namespaces" |
| 203 | + |
| 204 | +type InformerFieldIndexer struct { |
| 205 | + Informers informer.Informers |
| 206 | +} |
| 207 | + |
| 208 | +func (i *InformerFieldIndexer) IndexField(obj runtime.Object, field string, extractValue IndexerFunc) error { |
| 209 | + informer, err := i.Informers.InformerFor(obj) |
| 210 | + if err != nil { |
| 211 | + return err |
| 212 | + } |
| 213 | + return IndexByField(informer.GetIndexer(), field, extractValue) |
| 214 | +} |
| 215 | + |
| 216 | +// IndexByField adds an indexer to the underlying cache, using extraction function to get |
| 217 | +// value(s) from the given field. This index can then be used by passing a field selector |
| 218 | +// to List. For one-to-one compatibility with "normal" field selectors, only return one value. |
| 219 | +// The values may be anything. They will automatically be prefixed with the namespace of the |
| 220 | +// given object, if present. The objects passed are guaranteed to be objects of the correct type. |
| 221 | +func IndexByField(indexer cache.Indexer, field string, extractor IndexerFunc) error { |
| 222 | + indexFunc := func(objRaw interface{}) ([]string, error) { |
| 223 | + // TODO(directxman12): check if this is the correct type? |
| 224 | + obj, isObj := objRaw.(runtime.Object) |
| 225 | + if !isObj { |
| 226 | + return nil, fmt.Errorf("object of type %T is not an Object", objRaw) |
| 227 | + } |
| 228 | + meta, err := apimeta.Accessor(obj) |
| 229 | + if err != nil { |
| 230 | + return nil, err |
| 231 | + } |
| 232 | + ns := meta.GetNamespace() |
| 233 | + |
| 234 | + rawVals := extractor(obj) |
| 235 | + var vals []string |
| 236 | + if ns == "" { |
| 237 | + // if we're not doubling the keys for the namespaced case, just re-use what was returned to us |
| 238 | + vals = rawVals |
| 239 | + } else { |
| 240 | + // if we need to add non-namespaced versions too, double the length |
| 241 | + vals = make([]string, len(rawVals)*2) |
| 242 | + } |
| 243 | + for i, rawVal := range rawVals { |
| 244 | + // save a namespaced variant, so that we can ask |
| 245 | + // "what are all the object matching a given index *in a given namespace*" |
| 246 | + vals[i] = keyToNamespacedKey(ns, rawVal) |
| 247 | + if ns != "" { |
| 248 | + // if we have a namespace, also inject a special index key for listing |
| 249 | + // regardless of the object namespace |
| 250 | + vals[i+len(rawVals)] = keyToNamespacedKey("", rawVal) |
| 251 | + } |
| 252 | + } |
| 253 | + |
| 254 | + return vals, nil |
| 255 | + } |
| 256 | + |
| 257 | + if err := indexer.AddIndexers(cache.Indexers{fieldIndexName(field): indexFunc}); err != nil { |
| 258 | + return err |
| 259 | + } |
| 260 | + return nil |
| 261 | +} |
| 262 | + |
| 263 | +// fieldIndexName constructs the name of the index over the given field, |
| 264 | +// for use with an Indexer. |
| 265 | +func fieldIndexName(field string) string { |
| 266 | + return "field:" + field |
| 267 | +} |
| 268 | + |
| 269 | +// keyToNamespacedKey prefixes the given index key with a namespace |
| 270 | +// for use in field selector indexes. |
| 271 | +func keyToNamespacedKey(ns string, baseKey string) string { |
| 272 | + if ns != "" { |
| 273 | + return ns + "/" + baseKey |
| 274 | + } |
| 275 | + return allNamespacesNamespace + "/" + baseKey |
| 276 | +} |
| 277 | + |
| 278 | +// objectKeyToStorageKey converts an object key to store key. |
| 279 | +// It's akin to MetaNamespaceKeyFunc. It's seperate from |
| 280 | +// String to allow keeping the key format easily in sync with |
| 281 | +// MetaNamespaceKeyFunc. |
| 282 | +func objectKeyToStoreKey(k ObjectKey) string { |
| 283 | + if k.Namespace == "" { |
| 284 | + return k.Name |
| 285 | + } |
| 286 | + return k.Namespace + "/" + k.Name |
| 287 | +} |
| 288 | + |
| 289 | +// requiresExactMatch checks if the given field selector is of the form `k=v` or `k==v`. |
| 290 | +func requiresExactMatch(sel fields.Selector) (field, val string, required bool) { |
| 291 | + reqs := sel.Requirements() |
| 292 | + if len(reqs) != 1 { |
| 293 | + return "", "", false |
| 294 | + } |
| 295 | + req := reqs[0] |
| 296 | + if req.Operator != selection.Equals && req.Operator != selection.DoubleEquals { |
| 297 | + return "", "", false |
| 298 | + } |
| 299 | + return req.Field, req.Value, true |
| 300 | +} |
| 301 | + |
| 302 | +// SplitReaderWriter forms an interface Interface by composing separate |
| 303 | +// read and write interfaces. This way, you can have an Interface that |
| 304 | +// reads from a cache and writes to the API server. |
| 305 | +type SplitReaderWriter struct { |
| 306 | + ReadInterface |
| 307 | + WriteInterface |
| 308 | +} |
0 commit comments