working commit

This commit is contained in:
2026-03-13 19:02:42 +02:00
parent bebbf79c7a
commit 5c1da77f4c
1329 changed files with 314708 additions and 39 deletions
@@ -0,0 +1,34 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package watcher
import (
"context"
"github.com/fluxcd/cli-utils/pkg/kstatus/polling/event"
"github.com/fluxcd/cli-utils/pkg/object"
)
// BlindStatusWatcher is a StatusWatcher that sees nothing.
// It sends a single SyncEvent, and no resource update or error events.
// It then waits patiently to be cancelled before closing its event channel.
// BlindStatusWatcher implements the StatusWatcher interface.
type BlindStatusWatcher struct{}

// Compile-time check that BlindStatusWatcher satisfies StatusWatcher.
var _ StatusWatcher = BlindStatusWatcher{}
// Watch ignores the requested objects and options. The returned channel
// delivers one SyncEvent and is closed after ctx is cancelled.
func (w BlindStatusWatcher) Watch(ctx context.Context, _ object.ObjMetadataSet, _ Options) <-chan event.Event {
	cancelled := ctx.Done()
	out := make(chan event.Event)
	go func() {
		// Closing the channel tells the caller that no more events will arrive.
		defer close(out)
		// Report the sync right away; there is nothing to wait for.
		out <- event.Event{Type: event.SyncEvent}
		// Park until the context is cancelled.
		<-cancelled
	}()
	return out
}
@@ -0,0 +1,176 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package watcher
import (
"context"
"fmt"
"time"
"github.com/fluxcd/cli-utils/pkg/kstatus/polling/clusterreader"
"github.com/fluxcd/cli-utils/pkg/kstatus/polling/engine"
"github.com/fluxcd/cli-utils/pkg/kstatus/polling/event"
"github.com/fluxcd/cli-utils/pkg/kstatus/polling/statusreaders"
"github.com/fluxcd/cli-utils/pkg/object"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
)
// DefaultStatusWatcher reports on status updates to a set of objects.
//
// Use NewDefaultStatusWatcher to build a DefaultStatusWatcher with default settings.
type DefaultStatusWatcher struct {
// DynamicClient is used to list and watch resource objects.
DynamicClient dynamic.Interface
// Mapper is used to map from GroupKind to GroupVersionKind.
Mapper meta.RESTMapper
// ResyncPeriod is how often the objects are retrieved to re-synchronize,
// in case any events were missed.
ResyncPeriod time.Duration
// StatusReader specifies a custom implementation of the
// engine.StatusReader interface that will be used to compute reconcile
// status for resource objects.
StatusReader engine.StatusReader
// ClusterReader is used to look up generated objects on-demand.
// Generated objects (ex: Deployment > ReplicaSet > Pod) are sometimes
// required for computing parent object status, to compensate for
// controllers that aren't following status conventions.
ClusterReader engine.ClusterReader
}

// Compile-time check that *DefaultStatusWatcher satisfies StatusWatcher.
var _ StatusWatcher = &DefaultStatusWatcher{}
// NewDefaultStatusWatcher builds a DefaultStatusWatcher with defaults chosen
// for general use: a one hour resync period, the default status reader for
// the given mapper, and a DynamicClusterReader backed by the given client.
// If you need different settings, build a DefaultStatusWatcher directly.
func NewDefaultStatusWatcher(dynamicClient dynamic.Interface, mapper meta.RESTMapper) *DefaultStatusWatcher {
	clusterReader := &clusterreader.DynamicClusterReader{
		DynamicClient: dynamicClient,
		Mapper:        mapper,
	}
	statusReader := statusreaders.NewDefaultStatusReader(mapper)

	sw := new(DefaultStatusWatcher)
	sw.DynamicClient = dynamicClient
	sw.Mapper = mapper
	sw.ResyncPeriod = time.Hour
	sw.StatusReader = statusReader
	sw.ClusterReader = clusterReader
	return sw
}
// Watch starts watching the cluster for changes to the objects in ids.
//
// The REST scope strategy comes from opts; RESTScopeAutomatic is resolved
// from the namespaces used by ids. The returned channel carries the status
// updates and errors produced by a new ObjectStatusReporter. An unrecognised
// strategy yields a channel that delivers a single error event.
func (w *DefaultStatusWatcher) Watch(ctx context.Context, ids object.ObjMetadataSet, opts Options) <-chan event.Event {
	strategy := opts.RESTScopeStrategy
	if strategy == RESTScopeAutomatic {
		strategy = autoSelectRESTScopeStrategy(ids)
	}

	var (
		restScope meta.RESTScope
		gkns      []GroupKindNamespace
	)
	switch strategy {
	case RESTScopeNamespace:
		restScope, gkns = meta.RESTScopeNamespace, namespaceScopeGKNs(ids)
		klog.V(3).Infof("DynamicStatusWatcher starting in namespace-scoped mode (targets: %d)", len(gkns))
	case RESTScopeRoot:
		restScope, gkns = meta.RESTScopeRoot, rootScopeGKNs(ids)
		klog.V(3).Infof("DynamicStatusWatcher starting in root-scoped mode (targets: %d)", len(gkns))
	default:
		return handleFatalError(fmt.Errorf("invalid RESTScopeStrategy: %v", strategy))
	}

	// Only objects in ids are reported; everything else the informers see is
	// filtered out.
	reporter := &ObjectStatusReporter{
		InformerFactory: NewDynamicInformerFactory(w.DynamicClient, w.ResyncPeriod),
		Mapper:          w.Mapper,
		StatusReader:    w.StatusReader,
		ClusterReader:   w.ClusterReader,
		Targets:         gkns,
		ObjectFilter:    &AllowListObjectFilter{AllowList: ids},
		RESTScope:       restScope,
	}
	return reporter.Start(ctx)
}
// handleFatalError returns a channel that delivers err as a single ErrorEvent
// and is closed once that event has been received.
func handleFatalError(err error) <-chan event.Event {
	out := make(chan event.Event)
	fatal := event.Event{
		Type:  event.ErrorEvent,
		Error: err,
	}
	go func() {
		defer close(out)
		out <- fatal
	}()
	return out
}
// autoSelectRESTScopeStrategy picks RESTScopeNamespace when the objects use at
// most one distinct namespace value, and RESTScopeRoot otherwise.
func autoSelectRESTScopeStrategy(ids object.ObjMetadataSet) RESTScopeStrategy {
	if namespaces := uniqueNamespaces(ids); len(namespaces) <= 1 {
		return RESTScopeNamespace
	}
	return RESTScopeRoot
}
// rootScopeGKNs returns one target per unique GroupKind in ids, each with an
// empty namespace.
func rootScopeGKNs(ids object.ObjMetadataSet) []GroupKindNamespace {
	kinds := uniqueGKs(ids)
	out := make([]GroupKindNamespace, 0, len(kinds))
	for _, kind := range kinds {
		out = append(out, GroupKindNamespace{
			Group:     kind.Group,
			Kind:      kind.Kind,
			Namespace: "",
		})
	}
	return out
}
// namespaceScopeGKNs returns the targets for namespace-scoped mode: one per
// unique Group, Kind and Namespace combination in ids.
func namespaceScopeGKNs(ids object.ObjMetadataSet) []GroupKindNamespace {
return uniqueGKNs(ids)
}
// uniqueGKNs collects the distinct GroupKindNamespaces referenced by a set of
// object identifiers. The order of the result is not defined.
func uniqueGKNs(ids object.ObjMetadataSet) []GroupKindNamespace {
	seen := make(map[GroupKindNamespace]struct{})
	for _, id := range ids {
		key := GroupKindNamespace{
			Group:     id.GroupKind.Group,
			Kind:      id.GroupKind.Kind,
			Namespace: id.Namespace,
		}
		seen[key] = struct{}{}
	}

	out := make([]GroupKindNamespace, 0, len(seen))
	for key := range seen {
		out = append(out, key)
	}
	return out
}
// uniqueGKs collects the distinct GroupKinds referenced by a set of object
// identifiers. The order of the result is not defined.
func uniqueGKs(ids object.ObjMetadataSet) []schema.GroupKind {
	seen := make(map[schema.GroupKind]struct{})
	for _, id := range ids {
		key := schema.GroupKind{
			Group: id.GroupKind.Group,
			Kind:  id.GroupKind.Kind,
		}
		seen[key] = struct{}{}
	}

	out := make([]schema.GroupKind, 0, len(seen))
	for key := range seen {
		out = append(out, key)
	}
	return out
}
// uniqueNamespaces collects the distinct namespace values used by a set of
// object identifiers. The order of the result is not defined.
func uniqueNamespaces(ids object.ObjMetadataSet) []string {
	seen := make(map[string]struct{})
	for _, id := range ids {
		seen[id.Namespace] = struct{}{}
	}

	out := make([]string, 0, len(seen))
	for namespace := range seen {
		out = append(out, namespace)
	}
	return out
}
+39
View File
@@ -0,0 +1,39 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
// Package watcher is a library for computing the status of kubernetes resource
// objects based on watching object state from a cluster. It keeps watching
// until it is cancelled through the provided context. Updates on the status of
// objects are streamed back to the caller through a channel.
//
// # Watching Resources
//
// In order to watch a set of resource objects, create a StatusWatcher
// and pass in the list of object identifiers to the Watch function.
//
// import (
// "github.com/fluxcd/cli-utils/pkg/kstatus/watcher"
// )
//
// ids := object.ObjMetadataSet{
// {
// GroupKind: schema.GroupKind{
// Group: "apps",
// Kind: "Deployment",
// },
// Name: "dep",
// Namespace: "default",
// },
// }
//
// statusWatcher := watcher.NewDefaultStatusWatcher(dynamicClient, mapper)
// ctx, cancelFunc := context.WithCancel(context.Background())
// eventCh := statusWatcher.Watch(ctx, ids, watcher.Options{})
// for e := range eventCh {
// // Handle event
// if e.Type == event.ErrorEvent {
// cancelFunc()
// return e.Error
// }
// }
package watcher
@@ -0,0 +1,64 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package watcher
import (
"context"
"time"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
)
// DynamicInformerFactory builds shared index informers that list and watch
// resources through a dynamic client.
type DynamicInformerFactory struct {
// Client performs the List and Watch calls made by the informers.
Client dynamic.Interface
// ResyncPeriod is passed to each new informer as its resync period.
ResyncPeriod time.Duration
// Indexers is passed to each new informer as its set of indexers.
Indexers cache.Indexers
}
// NewDynamicInformerFactory returns a DynamicInformerFactory that uses the
// given client and resync period, and indexes objects by namespace.
func NewDynamicInformerFactory(client dynamic.Interface, resyncPeriod time.Duration) *DynamicInformerFactory {
	byNamespace := cache.Indexers{
		cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
	}
	factory := new(DynamicInformerFactory)
	factory.Client = client
	factory.ResyncPeriod = resyncPeriod
	factory.Indexers = byNamespace
	return factory
}
// NewInformer builds a shared index informer for the resource described by
// mapping, restricted to namespace. The List and Watch calls are made with
// the factory's dynamic client using ctx.
func (f *DynamicInformerFactory) NewInformer(ctx context.Context, mapping *meta.RESTMapping, namespace string) cache.SharedIndexInformer {
	listFn := func(options metav1.ListOptions) (runtime.Object, error) {
		return f.Client.Resource(mapping.Resource).Namespace(namespace).List(ctx, options)
	}
	watchFn := func(options metav1.ListOptions) (watch.Interface, error) {
		return f.Client.Resource(mapping.Resource).Namespace(namespace).Watch(ctx, options)
	}

	// Wrapping the ListWatch with the client lets the reflector detect
	// whether the client supports WatchList semantics. This is important for
	// fake clients used in tests, which do not support WatchList.
	listWatcher := cache.ToListWatcherWithWatchListSemantics(
		&cache.ListWatch{ListFunc: listFn, WatchFunc: watchFn},
		f.Client,
	)

	// The example object needs `"apiVersion"` and `"kind"` set.
	exampleObj := &unstructured.Unstructured{}
	exampleObj.SetGroupVersionKind(mapping.GroupVersionKind)

	return cache.NewSharedIndexInformer(listWatcher, exampleObj, f.ResyncPeriod, f.Indexers)
}
+122
View File
@@ -0,0 +1,122 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package watcher
import (
"context"
"fmt"
"github.com/fluxcd/cli-utils/pkg/kstatus/polling/event"
"k8s.io/klog/v2"
)
// eventFunnel wraps a list of event channels and multiplexes them down to a
// single event channel. New input channels can be added at runtime, and the
// output channel will remain open until the context is done and all input
// channels are closed.
type eventFunnel struct {
// ctx closure triggers shutdown
ctx context.Context
// outCh is the funnel that consumes all events from input channels
outCh chan event.Event
// doneCh is closed after outCh is closed.
// This allows blocking until done without consuming events.
doneCh chan struct{}
// counterCh is used to track the number of open input channels.
// It carries +1 when an input is added and -1 when an input is drained.
counterCh chan int
}
// newEventFunnel creates an eventFunnel bound to ctx and starts the goroutine
// that counts its open input channels. That goroutine closes the output and
// done channels once ctx is done and no input channels remain open.
func newEventFunnel(ctx context.Context) *eventFunnel {
	f := &eventFunnel{
		ctx:       ctx,
		outCh:     make(chan event.Event),
		doneCh:    make(chan struct{}),
		counterCh: make(chan int),
	}

	go func() {
		defer func() {
			// counterCh is deliberately left open, otherwise
			// AddInputChannel may panic.
			klog.V(5).Info("Closing funnel")
			close(f.outCh)
			close(f.doneCh)
		}()

		// Number of input channels added and not yet closed.
		open := 0
		cancelled := ctx.Done()
		for {
			select {
			case <-cancelled:
				// Cancellation has been seen. A nil channel never becomes
				// ready, which avoids busy waiting on the closed channel.
				cancelled = nil
			case delta := <-f.counterCh:
				open += delta
				klog.V(5).Infof("Funnel input channels (%+d): %d", delta, open)
			}
			if cancelled != nil || open > 0 {
				// Still waiting for cancellation or for inputs to close.
				continue
			}
			return
		}
	}()

	return f
}
// AddInputChannel registers a new input channel with the funnel and starts
// draining it into the output channel. It returns an EventFunnelClosedError,
// without registering the channel, if the funnel's context is done.
func (m *eventFunnel) AddInputChannel(inCh <-chan event.Event) error {
	select {
	case m.counterCh <- 1:
		// Counted. Each input channel gets its own draining goroutine.
		go m.drain(inCh, m.outCh)
		return nil
	case <-m.ctx.Done():
		return &EventFunnelClosedError{ContextError: m.ctx.Err()}
	}
}
// OutputChannel returns the channel that receives all events sent to the
// input channels. It is closed after the funnel's context is done and all
// input channels are closed.
func (m *eventFunnel) OutputChannel() <-chan event.Event {
return m.outCh
}
// Done returns a channel that is closed after the Output channel is closed.
// This allows blocking until done without consuming events.
// The channel is created by newEventFunnel, so it is non-nil for any funnel
// built with that constructor.
func (m *eventFunnel) Done() <-chan struct{} {
return m.doneCh
}
// drain forwards every event from inCh to outCh until inCh is closed, then
// tells the funnel that one input channel has gone away.
func (m *eventFunnel) drain(inCh <-chan event.Event, outCh chan<- event.Event) {
	defer func() {
		// decrement the open input counter
		m.counterCh <- -1
	}()
	for {
		next, open := <-inCh
		if !open {
			return
		}
		outCh <- next
	}
}
// EventFunnelClosedError is returned by eventFunnel.AddInputChannel when the
// funnel's context is already done.
type EventFunnelClosedError struct {
	// ContextError is the error reported by the funnel's context.
	ContextError error
}

// Error implements the error interface.
func (e *EventFunnelClosedError) Error() string {
	const format = "event funnel closed: %v"
	return fmt.Sprintf(format, e.ContextError)
}

// Is reports whether err is an *EventFunnelClosedError carrying the same
// ContextError value as e.
func (e *EventFunnelClosedError) Is(err error) bool {
	if other, ok := err.(*EventFunnelClosedError); ok {
		return other.ContextError == e.ContextError
	}
	return false
}

// Unwrap returns the context error so that it can be matched with errors.Is.
func (e *EventFunnelClosedError) Unwrap() error {
	return e.ContextError
}
@@ -0,0 +1,30 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package watcher
import (
"github.com/fluxcd/cli-utils/pkg/object"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
// ObjectFilter allows for filtering objects.
type ObjectFilter interface {
// Filter returns true if the object should be skipped, and false if it
// should be processed.
Filter(obj *unstructured.Unstructured) bool
}
// AllowListObjectFilter filters objects not in the allow list.
// AllowListObjectFilter implements ObjectFilter.
type AllowListObjectFilter struct {
// AllowList is the set of object identifiers that are NOT skipped.
AllowList object.ObjMetadataSet
}

// Compile-time check that *AllowListObjectFilter satisfies ObjectFilter.
var _ ObjectFilter = &AllowListObjectFilter{}
// Filter reports whether obj should be skipped. An object is skipped when its
// identifier is NOT present in the AllowList.
func (f *AllowListObjectFilter) Filter(obj *unstructured.Unstructured) bool {
	allowed := f.AllowList.Contains(object.UnstructuredToObjMetadata(obj))
	return !allowed
}
@@ -0,0 +1,895 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package watcher
import (
"context"
"errors"
"fmt"
"io"
"sync"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
"github.com/fluxcd/cli-utils/pkg/kstatus/polling/engine"
"github.com/fluxcd/cli-utils/pkg/kstatus/polling/event"
"github.com/fluxcd/cli-utils/pkg/kstatus/status"
"github.com/fluxcd/cli-utils/pkg/object"
)
// GroupKindNamespace identifies an informer target.
// The namespace is optional: when it is empty for a namespaced resource, all
// namespaces are watched.
type GroupKindNamespace struct {
	Group     string
	Kind      string
	Namespace string
}

// String renders the target as "<group>/<kind>/namespaces/<namespace>", a
// form suitable for logging.
func (gkn GroupKindNamespace) String() string {
	const format = "%s/%s/namespaces/%s"
	return fmt.Sprintf(format, gkn.Group, gkn.Kind, gkn.Namespace)
}
// GroupKind returns the Group and Kind of the target as a schema.GroupKind,
// dropping the namespace.
func (gkn GroupKindNamespace) GroupKind() schema.GroupKind {
return schema.GroupKind{Group: gkn.Group, Kind: gkn.Kind}
}
// ObjectStatusReporter reports on updates to objects (instances) using a
// network of informers to watch one or more resources (types).
//
// Unlike SharedIndexInformer, ObjectStatusReporter...
// - Reports object status.
// - Can watch multiple resource types simultaneously.
// - Specific objects can be ignored for efficiency by specifying an ObjectFilter.
// - Resolves GroupKinds into Resources at runtime, to pick up newly added
// resources.
// - Starts and Stops individual watches automatically to reduce errors when a
// CRD or Namespace is deleted.
// - Resources can be watched in root-scope mode or namespace-scope mode,
// allowing the caller to optimize for efficiency or least-privilege.
// - Gives unschedulable Pods (and objects that generate them) a 15s grace
// period before reporting them as Failed.
// - Resets the RESTMapper cache automatically when CRDs are modified.
//
// ObjectStatusReporter is NOT repeatable. It will panic if started more than
// once. If you need a repeatable factory, use DefaultStatusWatcher.
//
// TODO: support detection of added/removed api extensions at runtime
// TODO: Watch CRDs & Namespaces, even if not in the set of IDs.
// TODO: Retry with backoff if in namespace-scoped mode, to allow CRDs & namespaces to be created asynchronously
type ObjectStatusReporter struct {
// InformerFactory is used to build informers
InformerFactory *DynamicInformerFactory
// Mapper is used to map from GroupKind to GroupVersionKind.
Mapper meta.RESTMapper
// StatusReader specifies a custom implementation of the
// engine.StatusReader interface that will be used to compute reconcile
// status for resource objects.
StatusReader engine.StatusReader
// ClusterReader is used to look up generated objects on-demand.
// Generated objects (ex: Deployment > ReplicaSet > Pod) are sometimes
// required for computing parent object status, to compensate for
// controllers that aren't following status conventions.
ClusterReader engine.ClusterReader
// Targets is the list of GroupKindNamespaces to watch.
Targets []GroupKindNamespace
// ObjectFilter is used to decide which objects to ignore.
ObjectFilter ObjectFilter
// RESTScope specifies whether to ListAndWatch resources at the namespace
// or cluster (root) level. Using root scope is more efficient, but
// namespace scope may require fewer permissions.
RESTScope meta.RESTScope
// lock guards modification of the subsequent stateful fields
lock sync.Mutex
// gk2gkn maps GKs to GKNs to make it easy/cheap to look up.
gk2gkn map[schema.GroupKind]map[GroupKindNamespace]struct{}
// ns2gkn maps Namespaces to GKNs to make it easy/cheap to look up.
ns2gkn map[string]map[GroupKindNamespace]struct{}
// informerRefs tracks which informers have been started and stopped
informerRefs map[GroupKindNamespace]*informerReference
// context will be cancelled when the reporter should stop.
context context.Context
// cancel function that stops the context.
// This should only be called after the terminal error event has been sent.
cancel context.CancelFunc
// funnel multiplexes multiple input channels into one output channel,
// allowing input channels to be added and removed at runtime.
funnel *eventFunnel
// taskManager makes it possible to cancel scheduled tasks.
taskManager *taskManager
// started is set by Start once the informers have been asked to start.
started bool
// stopped is set after the event funnel has finished.
stopped bool
}
// Start builds the lookup tables for the configured Targets, starts an
// informer for each target, and returns the channel on which status events
// are delivered.
//
// A SyncEvent is sent once every started informer reports that it has synced.
// If the sync channel cannot be registered because the reporter's context is
// already done, the returned channel instead delivers a single ErrorEvent
// wrapping the EventFunnelClosedError, and is then closed.
//
// Start panics if it is called more than once on the same reporter.
func (w *ObjectStatusReporter) Start(ctx context.Context) <-chan event.Event {
	w.lock.Lock()
	defer w.lock.Unlock()

	if w.started {
		panic("ObjectStatusInformer cannot be restarted")
	}

	w.taskManager = &taskManager{}

	// Map GroupKinds to sets of GroupKindNamespaces for fast lookups.
	// This is the only time we modify the map.
	// So it should be safe to read from multiple threads after this.
	w.gk2gkn = make(map[schema.GroupKind]map[GroupKindNamespace]struct{})
	for _, gkn := range w.Targets {
		gk := gkn.GroupKind()
		m, found := w.gk2gkn[gk]
		if !found {
			m = make(map[GroupKindNamespace]struct{})
			w.gk2gkn[gk] = m
		}
		m[gkn] = struct{}{}
	}

	// Map namespaces to sets of GroupKindNamespaces for fast lookups.
	// This is the only time we modify the map.
	// So it should be safe to read from multiple threads after this.
	w.ns2gkn = make(map[string]map[GroupKindNamespace]struct{})
	for _, gkn := range w.Targets {
		ns := gkn.Namespace
		m, found := w.ns2gkn[ns]
		if !found {
			m = make(map[GroupKindNamespace]struct{})
			w.ns2gkn[ns] = m
		}
		m[gkn] = struct{}{}
	}

	// Initialize the informer map with references to track their start/stop.
	// This is the only time we modify the map.
	// So it should be safe to read from multiple threads after this.
	w.informerRefs = make(map[GroupKindNamespace]*informerReference, len(w.Targets))
	for _, gkn := range w.Targets {
		w.informerRefs[gkn] = &informerReference{}
	}

	ctx, cancel := context.WithCancel(ctx)
	w.context = ctx
	w.cancel = cancel

	// Use an event funnel to multiplex events through multiple input channels
	// into one output channel. We can't use the normal fan-in pattern, because
	// we need to be able to add and remove new input channels at runtime, as
	// new informers are created and destroyed.
	w.funnel = newEventFunnel(ctx)

	// Send start requests.
	for _, gkn := range w.Targets {
		w.startInformer(gkn)
	}

	w.started = true

	// Block until the event funnel is closed.
	// The event funnel will close after all the informer channels are closed.
	// The informer channels will close after the informers have stopped.
	// The informers will stop after their context is cancelled.
	go func() {
		<-w.funnel.Done()

		w.lock.Lock()
		defer w.lock.Unlock()
		w.stopped = true
	}()

	// Wait until all informers are synced or stopped, then send a SyncEvent.
	syncEventCh := make(chan event.Event)
	err := w.funnel.AddInputChannel(syncEventCh)
	if err != nil {
		// Reporter already stopped.
		// Wrap with %w so callers can still match the funnel/context error
		// with errors.Is and errors.As.
		return handleFatalError(fmt.Errorf("reporter failed to start: %w", err))
	}
	go func() {
		defer close(syncEventCh)
		// TODO: should we use something less aggressive, like wait.BackoffUntil?
		if cache.WaitForCacheSync(ctx.Done(), w.HasSynced) {
			syncEventCh <- event.Event{
				Type: event.SyncEvent,
			}
		}
	}()

	return w.funnel.OutputChannel()
}
// Stop triggers the cancellation of the reporter context, and closure of the
// event channel without sending an error event.
//
// Stop calls w.cancel, which is only assigned by Start, so Stop must not be
// called on a reporter that has not been started.
func (w *ObjectStatusReporter) Stop() {
klog.V(4).Info("Stopping reporter")
w.cancel()
}
// HasSynced reports whether every informer that has been started has also
// synced. It returns false if the reporter has not been started or has
// already stopped.
//
// Use the following to block waiting for synchronization:
// synced := cache.WaitForCacheSync(stopCh, informer.HasSynced)
func (w *ObjectStatusReporter) HasSynced() bool {
	w.lock.Lock()
	defer w.lock.Unlock()

	if !w.started || w.stopped {
		return false
	}

	waiting := make([]GroupKindNamespace, 0, len(w.informerRefs))
	for gkn, ref := range w.informerRefs {
		// Informers that were never started are not waited for.
		if !ref.HasStarted() || ref.HasSynced() {
			continue
		}
		waiting = append(waiting, gkn)
	}

	if len(waiting) == 0 {
		return true
	}
	klog.V(5).Infof("Informers pending synchronization: %v", waiting)
	return false
}
// startInformer marks the informer reference for gkn as started and launches
// a goroutine that starts the informer with retries. It does nothing but log
// if the reference reports that it was already started.
func (w *ObjectStatusReporter) startInformer(gkn GroupKindNamespace) {
	if informerCtx, started := w.informerRefs[gkn].Start(w.context); started {
		go w.startInformerWithRetry(informerCtx, gkn)
		return
	}
	// already started
	klog.V(5).Infof("Watch start skipped (already started): %v", gkn)
}
// stopInformer stops the informer watching the specified GroupKindNamespace,
// by calling Stop on its informer reference.
func (w *ObjectStatusReporter) stopInformer(gkn GroupKindNamespace) {
w.informerRefs[gkn].Stop()
}
// startInformerWithRetry starts an informer for gkn, retrying with exponential
// backoff until startInformerNow succeeds or the retry context is cancelled.
//
// If the resource type is not known to the RESTMapper (a NoMatch error), the
// informer reference is stopped and no error is reported. Any other start
// error is sent to the caller as a fatal error event through a temporary
// funnel input channel.
func (w *ObjectStatusReporter) startInformerWithRetry(ctx context.Context, gkn GroupKindNamespace) {
	realClock := &clock.RealClock{}
	// TODO nolint can be removed once https://github.com/kubernetes/kubernetes/issues/118638 is resolved
	backoffManager := wait.NewExponentialBackoffManager(800*time.Millisecond, 30*time.Second, 2*time.Minute, 2.0, 1.0, realClock) //nolint:staticcheck
	retryCtx, retryCancel := context.WithCancel(ctx)
	// BackoffUntil only returns after retryCtx is done, so this is a no-op in
	// practice; it just guarantees the cancel func is always released.
	defer retryCancel()

	wait.BackoffUntil(func() {
		startErr := w.startInformerNow(ctx, gkn)
		if startErr == nil {
			// Success! - Stop retrying
			retryCancel()
			return
		}

		if meta.IsNoMatchError(startErr) {
			// CRD (or api extension) not installed
			// TODO: retry if CRDs are not being watched
			klog.V(3).Infof("Watch start error (blocking until CRD is added): %v: %v", gkn, startErr)
			// Cancel the parent context, which will stop the retries too.
			w.stopInformer(gkn)
			return
		}

		// Create a temporary input channel to send the error event.
		eventCh := make(chan event.Event)
		defer close(eventCh)
		// Use a distinct name for this error: shadowing the start error here
		// would cause a nil error to be reported as the fatal error below.
		if addErr := w.funnel.AddInputChannel(eventCh); addErr != nil {
			// Reporter already stopped.
			// This is fine. 🔥
			klog.V(5).Infof("Informer failed to start: %v", addErr)
			return
		}
		// Send error event and stop the reporter!
		w.handleFatalError(eventCh, startErr)
	}, backoffManager, true, retryCtx.Done())
}
// startInformerNow starts an informer to watch for changes to a
// GroupKindNamespace. Changes are filtered and passed by event channel into the
// funnel. Each update event includes the computed status of the object.
//
// An error is returned if the GroupKind cannot be mapped to a resource, if
// the funnel no longer accepts input channels, or if the handlers cannot be
// installed on the new informer. On success the informer runs in the
// background until ctx is cancelled, after which its event channel is closed.
func (w *ObjectStatusReporter) startInformerNow(
	ctx context.Context,
	gkn GroupKindNamespace,
) error {
	// Look up the mapping for this GroupKind.
	// If it doesn't exist, either delay watching or emit an error.
	gk := gkn.GroupKind()
	mapping, err := w.Mapper.RESTMapping(gk)
	if err != nil {
		// Might be a NoResourceMatchError/NoKindMatchError
		return err
	}

	informer := w.InformerFactory.NewInformer(ctx, mapping, gkn.Namespace)

	w.informerRefs[gkn].SetInformer(informer)

	eventCh := make(chan event.Event)

	// Add this event channel to the output multiplexer
	err = w.funnel.AddInputChannel(eventCh)
	if err != nil {
		// Reporter already stopped.
		return fmt.Errorf("informer failed to build event handler: %w", err)
	}

	// Handler called when ListAndWatch errors.
	// Custom handler stops the informer if the resource is NotFound (CRD deleted).
	err = informer.SetWatchErrorHandler(func(r *cache.Reflector, err error) {
		w.watchErrorHandler(gkn, eventCh, err)
	})
	if err != nil {
		// Should never happen.
		// Informer can't have started yet. We just created it.
		// The channel is already registered with the funnel, so close it;
		// otherwise the funnel would wait on this input forever.
		close(eventCh)
		return fmt.Errorf("failed to set error handler on new informer for %v: %w", mapping.Resource, err)
	}

	_, err = informer.AddEventHandler(w.eventHandler(ctx, eventCh))
	if err != nil {
		// Should never happen.
		// The informer is never run on this path, so nothing can send on the
		// channel after it is closed.
		close(eventCh)
		return fmt.Errorf("failed add event handler on new informer for %v: %w", mapping.Resource, err)
	}

	// Start the informer in the background.
	// Informer will be stopped when the context is cancelled.
	go func() {
		klog.V(3).Infof("Watch starting: %v", gkn)
		informer.Run(ctx.Done())
		klog.V(3).Infof("Watch stopped: %v", gkn)
		// Signal to the caller there will be no more events for this GroupKind.
		close(eventCh)
	}()

	return nil
}
// forEachTargetWithGroupKind calls fn once for every target whose GroupKind
// is gk. The iteration order is not defined.
func (w *ObjectStatusReporter) forEachTargetWithGroupKind(gk schema.GroupKind, fn func(GroupKindNamespace)) {
	targets := w.gk2gkn[gk]
	for target := range targets {
		fn(target)
	}
}
// forEachTargetWithNamespace calls fn once for every target whose Namespace
// is ns. The iteration order is not defined.
func (w *ObjectStatusReporter) forEachTargetWithNamespace(ns string, fn func(GroupKindNamespace)) {
	targets := w.ns2gkn[ns]
	for target := range targets {
		fn(target)
	}
}
// readStatusFromObject is a convenience function to read object status with a
// StatusReader using a ClusterReader to retrieve generated objects.
// It returns the result of StatusReader.ReadStatusForObject unchanged.
func (w *ObjectStatusReporter) readStatusFromObject(
ctx context.Context,
obj *unstructured.Unstructured,
) (*event.ResourceStatus, error) {
return w.StatusReader.ReadStatusForObject(ctx, w.ClusterReader, obj)
}
// readStatusFromCluster is a convenience function to read object status with a
// StatusReader using a ClusterReader to retrieve the object and its generated
// objects.
// It returns the result of StatusReader.ReadStatus unchanged.
func (w *ObjectStatusReporter) readStatusFromCluster(
ctx context.Context,
id object.ObjMetadata,
) (*event.ResourceStatus, error) {
return w.StatusReader.ReadStatus(ctx, w.ClusterReader, id)
}
// deletedStatus builds the ResourceStatus reported for a deleted object.
//
// StatusReader.ReadStatusForObject doesn't handle nil objects as input, so
// the status is assembled by hand here.
// TODO: find a way to delegate this back to the status package.
func deletedStatus(id object.ObjMetadata) *event.ResourceStatus {
	rs := new(event.ResourceStatus)
	rs.Identifier = id
	// The status is always NotFound after deletion. Any object passed to the
	// delete handler is the last known state, not the current state.
	rs.Status = status.NotFoundStatus
	rs.Message = "Resource not found"
	// A deleted object has no resource to attach.
	rs.Resource = nil
	// If deleted with foreground deletion, a finalizer will have blocked
	// deletion until all the generated resources are deleted.
	// TODO: Handle lookup of generated resources when not using foreground deletion.
	rs.GeneratedResources = nil
	return rs
}
// eventHandler builds an informer event handler that computes object status
// and sends it on eventCh as ResourceUpdateEvents.
//
// All three callbacks return immediately if ctx is already done, panic if the
// informer hands them anything other than an *unstructured.Unstructured, and
// skip objects rejected by the ObjectFilter. Each callback also cancels any
// status check previously scheduled for the object.
//
// Sends on eventCh are plain blocking sends.
func (w *ObjectStatusReporter) eventHandler(
ctx context.Context,
eventCh chan<- event.Event,
) cache.ResourceEventHandler {
var handler cache.ResourceEventHandlerFuncs
// AddFunc reports the status of a newly observed object.
handler.AddFunc = func(iobj interface{}) {
// Bail early if the context is cancelled, to avoid unnecessary work.
if ctx.Err() != nil {
return
}
obj, ok := iobj.(*unstructured.Unstructured)
if !ok {
panic(fmt.Sprintf("AddFunc received unexpected object type %T", iobj))
}
id := object.UnstructuredToObjMetadata(obj)
if w.ObjectFilter.Filter(obj) {
klog.V(7).Infof("Watch Event Skipped: AddFunc: %s", id)
return
}
klog.V(5).Infof("AddFunc: Computing status for object: %s", id)
// cancel any scheduled status update for this object
w.taskManager.Cancel(id)
rs, err := w.readStatusFromObject(ctx, obj)
if err != nil {
// Send error event and stop the reporter!
w.handleFatalError(eventCh, fmt.Errorf("failed to compute object status: %s: %w", id, err))
return
}
// Namespaces and CRDs get extra handling via the reporter's hooks.
if object.IsNamespace(obj) {
klog.V(5).Infof("AddFunc: Namespace added: %v", id)
w.onNamespaceAdd(obj)
} else if object.IsCRD(obj) {
klog.V(5).Infof("AddFunc: CRD added: %v", id)
w.onCRDAdd(obj)
}
if isObjectUnschedulable(rs) {
klog.V(5).Infof("AddFunc: object unschedulable: %v", id)
// schedule delayed status update
w.taskManager.Schedule(ctx, id, status.ScheduleWindow,
w.newStatusCheckTaskFunc(ctx, eventCh, id))
}
klog.V(7).Infof("AddFunc: sending update event: %v", rs)
eventCh <- event.Event{
Type: event.ResourceUpdateEvent,
Resource: rs,
}
}
// UpdateFunc reports the status of the new version of an updated object.
// The old version is ignored.
handler.UpdateFunc = func(_, iobj interface{}) {
// Bail early if the context is cancelled, to avoid unnecessary work.
if ctx.Err() != nil {
return
}
obj, ok := iobj.(*unstructured.Unstructured)
if !ok {
panic(fmt.Sprintf("UpdateFunc received unexpected object type %T", iobj))
}
id := object.UnstructuredToObjMetadata(obj)
if w.ObjectFilter.Filter(obj) {
klog.V(7).Infof("UpdateFunc: Watch Event Skipped: %s", id)
return
}
klog.V(5).Infof("UpdateFunc: Computing status for object: %s", id)
// cancel any scheduled status update for this object
w.taskManager.Cancel(id)
rs, err := w.readStatusFromObject(ctx, obj)
if err != nil {
// Send error event and stop the reporter!
w.handleFatalError(eventCh, fmt.Errorf("failed to compute object status: %s: %w", id, err))
return
}
// Namespaces and CRDs get extra handling via the reporter's hooks.
if object.IsNamespace(obj) {
klog.V(5).Infof("UpdateFunc: Namespace updated: %v", id)
w.onNamespaceUpdate(obj)
} else if object.IsCRD(obj) {
klog.V(5).Infof("UpdateFunc: CRD updated: %v", id)
w.onCRDUpdate(obj)
}
if isObjectUnschedulable(rs) {
klog.V(5).Infof("UpdateFunc: object unschedulable: %v", id)
// schedule delayed status update
w.taskManager.Schedule(ctx, id, status.ScheduleWindow,
w.newStatusCheckTaskFunc(ctx, eventCh, id))
}
klog.V(7).Infof("UpdateFunc: sending update event: %v", rs)
eventCh <- event.Event{
Type: event.ResourceUpdateEvent,
Resource: rs,
}
}
// DeleteFunc reports a deleted object using the status built by
// deletedStatus; no status is read from the cluster.
handler.DeleteFunc = func(iobj interface{}) {
// Bail early if the context is cancelled, to avoid unnecessary work.
if ctx.Err() != nil {
return
}
// Unwrap the tombstone the informer may deliver in place of the object.
if tombstone, ok := iobj.(cache.DeletedFinalStateUnknown); ok {
// Last state unknown. Possibly stale.
// TODO: Should we propagate this uncertainty to the caller?
iobj = tombstone.Obj
}
obj, ok := iobj.(*unstructured.Unstructured)
if !ok {
panic(fmt.Sprintf("DeleteFunc received unexpected object type %T", iobj))
}
id := object.UnstructuredToObjMetadata(obj)
if w.ObjectFilter.Filter(obj) {
klog.V(7).Infof("DeleteFunc: Watch Event Skipped: %s", id)
return
}
klog.V(5).Infof("DeleteFunc: Computing status for object: %s", id)
// cancel any scheduled status update for this object
w.taskManager.Cancel(id)
// Namespaces and CRDs get extra handling via the reporter's hooks.
if object.IsNamespace(obj) {
klog.V(5).Infof("DeleteFunc: Namespace deleted: %v", id)
w.onNamespaceDelete(obj)
} else if object.IsCRD(obj) {
klog.V(5).Infof("DeleteFunc: CRD deleted: %v", id)
w.onCRDDelete(obj)
}
rs := deletedStatus(id)
klog.V(7).Infof("DeleteFunc: sending update event: %v", rs)
eventCh <- event.Event{
Type: event.ResourceUpdateEvent,
Resource: rs,
}
}
return handler
}
// onCRDAdd reacts to a newly added CRD: it resets the RESTMapper and starts
// an informer for every target that uses the CRD's GroupKind.
func (w *ObjectStatusReporter) onCRDAdd(obj *unstructured.Unstructured) {
	gk, ok := object.GetCRDGroupKind(obj)
	if !ok {
		// Only warn: an invalid CRD should not interrupt the task queue.
		// TODO: Allow non-fatal errors to be reported using a specific error type.
		klog.Warningf("Invalid CRD added: missing group and/or kind: %v", object.UnstructuredToObjMetadata(obj))
		return
	}
	klog.V(3).Infof("CRD added for %s", gk)

	// Resetting the mapper invalidates its cache.
	klog.V(3).Info("Resetting RESTMapper")
	meta.MaybeResetRESTMapper(w.Mapper)

	w.forEachTargetWithGroupKind(gk, w.startInformer)
}
// onCRDUpdate reacts to an updated CRD. It resets the RESTMapper to
// invalidate its cache and then calls startInformer for every watch target
// that uses the CRD's GroupKind.
//
// A CRD without a group and/or kind is only logged as a warning.
func (w *ObjectStatusReporter) onCRDUpdate(newObj *unstructured.Unstructured) {
	gk, ok := object.GetCRDGroupKind(newObj)
	if !ok {
		// Warn instead of failing: this must not interrupt the task queue.
		// TODO: Allow non-fatal errors to be reported using a specific error type.
		klog.Warningf("Invalid CRD updated: missing group and/or kind: %v",
			object.UnstructuredToObjMetadata(newObj))
		return
	}

	klog.V(3).Infof("CRD updated for %s", gk)

	// Invalidate the mapper cache before starting informers.
	klog.V(3).Info("Resetting RESTMapper")
	meta.MaybeResetRESTMapper(w.Mapper)

	w.forEachTargetWithGroupKind(gk, func(target GroupKindNamespace) {
		w.startInformer(target)
	})
}
// onCRDDelete reacts to a deleted CRD. It calls stopInformer for every watch
// target that uses the CRD's GroupKind and afterwards resets the RESTMapper
// to invalidate its cache.
//
// A CRD without a group and/or kind is only logged as a warning.
func (w *ObjectStatusReporter) onCRDDelete(oldObj *unstructured.Unstructured) {
	gk, ok := object.GetCRDGroupKind(oldObj)
	if !ok {
		// Warn instead of failing: this must not interrupt the task queue.
		// TODO: Allow non-fatal errors to be reported using a specific error type.
		klog.Warningf("Invalid CRD deleted: missing group and/or kind: %v",
			object.UnstructuredToObjMetadata(oldObj))
		return
	}

	klog.V(3).Infof("CRD deleted for %s", gk)

	// Informers are stopped first; the mapper is reset only afterwards.
	w.forEachTargetWithGroupKind(gk, func(target GroupKindNamespace) {
		w.stopInformer(target)
	})

	// Invalidate the mapper cache.
	klog.V(3).Info("Resetting RESTMapper")
	meta.MaybeResetRESTMapper(w.Mapper)
}
// onNamespaceAdd calls startInformer for every watch target in the added
// namespace (the namespace name is the object's name).
func (w *ObjectStatusReporter) onNamespaceAdd(obj *unstructured.Unstructured) {
	// When watching resources across all namespaces there are no
	// namespace-specific informers to start or stop.
	if w.RESTScope != meta.RESTScopeRoot {
		w.forEachTargetWithNamespace(obj.GetName(), func(target GroupKindNamespace) {
			w.startInformer(target)
		})
	}
}
// onNamespaceUpdate calls startInformer for every watch target in the updated
// namespace (the namespace name is the object's name).
func (w *ObjectStatusReporter) onNamespaceUpdate(obj *unstructured.Unstructured) {
	// When watching resources across all namespaces there are no
	// namespace-specific informers to start or stop.
	if w.RESTScope != meta.RESTScopeRoot {
		w.forEachTargetWithNamespace(obj.GetName(), func(target GroupKindNamespace) {
			w.startInformer(target)
		})
	}
}
// onNamespaceDelete calls stopInformer for every watch target in the deleted
// namespace (the namespace name is the object's name).
func (w *ObjectStatusReporter) onNamespaceDelete(obj *unstructured.Unstructured) {
	// When watching resources across all namespaces there are no
	// namespace-specific informers to start or stop.
	if w.RESTScope != meta.RESTScopeRoot {
		w.forEachTargetWithNamespace(obj.GetName(), func(target GroupKindNamespace) {
			w.stopInformer(target)
		})
	}
}
// newStatusCheckTaskFunc returns a taskFunc that re-reads the status of the
// object identified by id from the cluster and sends it over the event
// channel as a ResourceUpdateEvent.
//
// If reading the status fails, the error is passed to handleFatalError
// instead and no update event is sent.
//
// This method should only be used for generated resource objects, as it's much
// slower at scale than watching the resource for updates.
func (w *ObjectStatusReporter) newStatusCheckTaskFunc(
	ctx context.Context,
	eventCh chan<- event.Event,
	id object.ObjMetadata,
) taskFunc {
	return func() {
		klog.V(5).Infof("Re-reading object status: %s", id)

		resourceStatus, err := w.readStatusFromCluster(ctx, id)
		if err == nil {
			eventCh <- event.Event{
				Type:     event.ResourceUpdateEvent,
				Resource: resourceStatus,
			}
			return
		}

		// Send error event and stop the reporter!
		// TODO: retry N times before terminating
		w.handleFatalError(eventCh, err)
	}
}
// handleFatalError logs err and, unless err is a context cancellation or
// deadline error, sends it on eventCh as an ErrorEvent and then stops the
// reporter.
func (w *ObjectStatusReporter) handleFatalError(eventCh chan<- event.Event, err error) {
	klog.V(5).Infof("Reporter error: %v", err)

	// Context errors are logged only: no event is sent and Stop is not called.
	contextDone := errors.Is(err, context.Canceled) ||
		errors.Is(err, context.DeadlineExceeded)
	if contextDone {
		return
	}

	eventCh <- event.Event{Type: event.ErrorEvent, Error: err}
	w.Stop()
}
// watchErrorHandler logs ListAndWatch errors for the given target and reacts
// to the two error classes that cannot be retried:
//
//   - NotFound: stops all informers for this GroupKind, which usually means
//     the CRD was deleted.
//   - Forbidden: reports the error through handleFatalError.
//
// Every other error is only logged; the messages mark whether a retry or a
// termination is expected. Sentinel errors are matched with errors.Is, so
// wrapped io.EOF / io.ErrUnexpectedEOF values are classified the same way as
// bare ones.
//
// Based on DefaultWatchErrorHandler from k8s.io/client-go@v0.23.2/tools/cache/reflector.go
func (w *ObjectStatusReporter) watchErrorHandler(gkn GroupKindNamespace, eventCh chan<- event.Event, err error) {
	switch {
	// Stop channel closed
	case errors.Is(err, io.EOF):
		klog.V(5).Infof("ListAndWatch error (termination expected): %v: %v", gkn, err)
	// Watch connection closed
	case errors.Is(err, io.ErrUnexpectedEOF):
		klog.V(1).Infof("ListAndWatch error (retry expected): %v: %v", gkn, err)
	// Context done
	case errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded):
		klog.V(5).Infof("ListAndWatch error (termination expected): %v: %v", gkn, err)
	// resourceVersion too old
	case apierrors.IsResourceExpired(err):
		// Keep retrying
		klog.V(5).Infof("ListAndWatch error (retry expected): %v: %v", gkn, err)
	// Resource unregistered (DEPRECATED, see NotFound)
	case apierrors.IsGone(err):
		klog.V(5).Infof("ListAndWatch error (retry expected): %v: %v", gkn, err)
	// Resource not registered
	case apierrors.IsNotFound(err):
		klog.V(3).Infof("ListAndWatch error (termination expected): %v: stopping all informers for this GroupKind: %v", gkn, err)
		// The callback parameter is named differently from gkn so that it
		// does not shadow the target the error was reported for.
		w.forEachTargetWithGroupKind(gkn.GroupKind(), func(target GroupKindNamespace) {
			w.stopInformer(target)
		})
	// Insufficient permissions
	case apierrors.IsForbidden(err):
		klog.V(3).Infof("ListAndWatch error (termination expected): %v: stopping all informers: %v", gkn, err)
		w.handleFatalError(eventCh, err)
	// Unexpected error
	default:
		klog.Warningf("ListAndWatch error (retry expected): %v: %v", gkn, err)
	}
}
// informerReference tracks informer lifecycle.
//
// It records whether the informer has been started, the cancellable context
// it was started with, and the informer itself once SetInformer provides it.
type informerReference struct {
	// lock guards the subsequent stateful fields
	lock sync.Mutex

	// informer is the value stored by SetInformer. HasSynced reports false
	// while it is nil.
	informer cache.SharedIndexInformer

	// context is the cancellable context created by Start. Stop resets it
	// to nil.
	context context.Context

	// cancel is the cancel function created together with context by Start.
	// Stop calls it.
	cancel context.CancelFunc

	// started is set to true by a successful Start and back to false by Stop.
	started bool
}
// Start marks the reference as started and returns a cancellable child of
// ctx together with true. The child context and its cancel function are
// remembered so that Stop can cancel them later.
//
// If the reference is already started, Start changes nothing and returns
// nil and false.
func (ir *informerReference) Start(ctx context.Context) (context.Context, bool) {
	ir.lock.Lock()
	defer ir.lock.Unlock()

	if ir.started {
		return nil, false
	}

	childCtx, cancel := context.WithCancel(ctx)
	ir.context, ir.cancel, ir.started = childCtx, cancel, true
	return childCtx, true
}
// SetInformer stores the informer under the lock, replacing any previously
// stored value.
func (ir *informerReference) SetInformer(informer cache.SharedIndexInformer) {
	ir.lock.Lock()
	ir.informer = informer
	ir.lock.Unlock()
}
// HasSynced reports whether the stored informer has synced. It returns false
// when the reference has not been started or no informer has been set.
func (ir *informerReference) HasSynced() bool {
	ir.lock.Lock()
	defer ir.lock.Unlock()

	// Short-circuit evaluation keeps the informer from being consulted
	// unless the reference is started and an informer is present.
	return ir.started && ir.informer != nil && ir.informer.HasSynced()
}
// HasStarted reports whether the reference is currently marked as started.
func (ir *informerReference) HasStarted() bool {
	ir.lock.Lock()
	started := ir.started
	ir.lock.Unlock()
	return started
}
// Stop cancels the context created by Start and marks the reference as not
// started. It does nothing if the reference is not started.
func (ir *informerReference) Stop() {
	ir.lock.Lock()
	defer ir.lock.Unlock()

	if ir.started {
		ir.cancel()
		ir.started, ir.context = false, nil
	}
}
// taskFunc is a unit of work without arguments or results. It is the type of
// task that taskManager.Schedule runs after its delay.
type taskFunc func()
// taskManager manages a set of tasks with object identifiers.
// This makes starting and stopping the tasks thread-safe.
//
// The zero value is ready to use: Schedule allocates the map on demand.
type taskManager struct {
	// lock is held by Schedule and Cancel while they access cancelFuncs.
	lock sync.Mutex

	// cancelFuncs maps an object identifier to the cancel function of the
	// task most recently scheduled for it. Cancel sets the map back to nil
	// once it becomes empty.
	cancelFuncs map[object.ObjMetadata]context.CancelFunc
}
// Schedule arranges for task to run once, in its own goroutine, after delay.
//
// If a task is already registered for id, its cancel function is called and
// the new task takes its place. The task does not run if parentCtx is done
// before the delay elapses, or if the task is cancelled first (by Cancel or
// by a later Schedule for the same id).
//
// NOTE: the cancel function stays registered under id after the task has
// run; only Cancel removes it.
func (tm *taskManager) Schedule(parentCtx context.Context, id object.ObjMetadata, delay time.Duration, task taskFunc) {
	tm.lock.Lock()
	defer tm.lock.Unlock()

	if tm.cancelFuncs == nil {
		tm.cancelFuncs = make(map[object.ObjMetadata]context.CancelFunc)
	}
	if cancelPrevious, scheduled := tm.cancelFuncs[id]; scheduled {
		// Cancel the existing scheduled task and replace it.
		cancelPrevious()
	}

	// The timer context is deliberately not derived from parentCtx, so its
	// error tells a deadline expiry apart from an explicit cancel.
	timerCtx, cancelTimer := context.WithTimeout(context.Background(), delay)
	tm.cancelFuncs[id] = cancelTimer

	go func() {
		klog.V(5).Infof("Task scheduled (%v) for object (%s)", delay, id)
		select {
		case <-parentCtx.Done():
			// stop waiting
			cancelTimer()
		case <-timerCtx.Done():
			if timerCtx.Err() != context.DeadlineExceeded {
				// cancelled before the delay elapsed: stop waiting
				return
			}
			klog.V(5).Infof("Task executing (after %v) for object (%v)", delay, id)
			task()
		}
	}()
}
// Cancel removes the task registered for id, if any, and calls its cancel
// function. When no tasks remain registered, the map is released.
func (tm *taskManager) Cancel(id object.ObjMetadata) {
	tm.lock.Lock()
	defer tm.lock.Unlock()

	// A missing entry means the task was already cancelled or never added.
	if cancelTask, scheduled := tm.cancelFuncs[id]; scheduled {
		delete(tm.cancelFuncs, id)
		cancelTask()
		if len(tm.cancelFuncs) == 0 {
			tm.cancelFuncs = nil
		}
	}
}
@@ -0,0 +1,25 @@
// Code generated by "stringer -type=RESTScopeStrategy -linecomment"; DO NOT EDIT.
package watcher
import "strconv"
// _ is never called. It exists so that compilation fails if the
// RESTScopeStrategy constants no longer have the values this generated file
// was built from.
func _() {
	// An "invalid array index" compiler error signifies that the constant values have changed.
	// Re-run the stringer command to generate them again.
	var x [1]struct{}
	_ = x[RESTScopeAutomatic-0]
	_ = x[RESTScopeRoot-1]
	_ = x[RESTScopeNamespace-2]
}
// _RESTScopeStrategy_name is the concatenation of all strategy names:
// "automatic", "root" and "namespace".
const _RESTScopeStrategy_name = "automaticrootnamespace"

// _RESTScopeStrategy_index holds the start offset of each name within
// _RESTScopeStrategy_name, followed by the end offset of the last name.
var _RESTScopeStrategy_index = [...]uint8{0, 9, 13, 22}
// String returns the name of the strategy, sliced out of
// _RESTScopeStrategy_name using the offsets in _RESTScopeStrategy_index.
// Values outside the known range are rendered as "RESTScopeStrategy(N)",
// where N is the decimal value.
func (i RESTScopeStrategy) String() string {
	if i < 0 || i >= RESTScopeStrategy(len(_RESTScopeStrategy_index)-1) {
		return "RESTScopeStrategy(" + strconv.FormatInt(int64(i), 10) + ")"
	}
	return _RESTScopeStrategy_name[_RESTScopeStrategy_index[i]:_RESTScopeStrategy_index[i+1]]
}
@@ -0,0 +1,69 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package watcher
import (
"github.com/fluxcd/cli-utils/pkg/kstatus/polling/event"
"github.com/fluxcd/cli-utils/pkg/kstatus/status"
"github.com/fluxcd/cli-utils/pkg/object"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
)
// isObjectUnschedulable reports whether the object, or any of its generated
// resources, is an unschedulable pod.
//
// A status that carries an error or is not InProgress is never reported as
// unschedulable, and its generated resources are not inspected.
//
// The check recurses through GeneratedResources, so it covers objects that
// generate objects that generate pods, provided the input ResourceStatus has
// those GeneratedResources computed.
func isObjectUnschedulable(rs *event.ResourceStatus) bool {
	if rs.Error != nil || rs.Status != status.InProgressStatus {
		return false
	}
	if isPodUnschedulable(rs.Resource) {
		return true
	}
	// recurse through generated resources
	for i := range rs.GeneratedResources {
		if isObjectUnschedulable(rs.GeneratedResources[i]) {
			return true
		}
	}
	return false
}
// isPodUnschedulable reports whether obj is a Pod (empty group, kind "Pod")
// whose status.conditions contain a PodScheduled condition with status
// "False" and reason "Unschedulable".
//
// It returns false for a nil object, for any other kind, and when
// status.conditions is missing or is not a list.
func isPodUnschedulable(obj *unstructured.Unstructured) bool {
	if obj == nil || obj.GroupVersionKind().GroupKind() != (schema.GroupKind{Kind: "Pod"}) {
		return false
	}

	rawConditions, found, err := object.NestedField(obj.Object, "status", "conditions")
	if !found || err != nil {
		return false
	}
	conditions, isList := rawConditions.([]interface{})
	if !isList {
		return false
	}

	for i := range conditions {
		condition, isMap := conditions[i].(map[string]interface{})
		if !isMap {
			// An entry that is not a map ends the scan with a negative result.
			return false
		}
		unschedulable := condition["type"] == "PodScheduled" &&
			condition["status"] == "False" &&
			condition["reason"] == "Unschedulable"
		if unschedulable {
			return true
		}
	}
	return false
}
+38
View File
@@ -0,0 +1,38 @@
// Copyright 2022 The Kubernetes Authors.
// SPDX-License-Identifier: Apache-2.0
package watcher
import (
"context"
"github.com/fluxcd/cli-utils/pkg/kstatus/polling/event"
"github.com/fluxcd/cli-utils/pkg/object"
)
// StatusWatcher watches a set of objects for status updates.
//
// Updates are delivered as event.Event values on the channel returned by
// Watch.
type StatusWatcher interface {
	// Watch a set of objects for status updates.
	// Watching should stop if the context is cancelled.
	// Events should only be sent for the specified objects.
	// The event channel should be closed when the watching stops.
	//
	// The returned channel is receive-only for the caller.
	Watch(context.Context, object.ObjMetadataSet, Options) <-chan event.Event
}
// Options can be provided when creating a new StatusWatcher to customize the
// behavior.
//
// Options is also the type of the third parameter of StatusWatcher.Watch.
type Options struct {
	// RESTScopeStrategy specifies which strategy to use when listing and
	// watching resources. By default, the strategy is selected automatically.
	// The zero value of this field is RESTScopeAutomatic.
	RESTScopeStrategy RESTScopeStrategy
}
// RESTScopeStrategy identifies the strategy to use when listing and watching
// resources (see Options.RESTScopeStrategy).
//
// NOTE(review): what each strategy does is decided by the watcher
// implementation, not by this declaration — confirm there before relying on
// the names alone.
//
//go:generate stringer -type=RESTScopeStrategy -linecomment
type RESTScopeStrategy int

// The trailing line comments below are the names returned by the generated
// String method (stringer -linecomment); changing them changes that output.
const (
	// RESTScopeAutomatic is the zero value of RESTScopeStrategy.
	RESTScopeAutomatic RESTScopeStrategy = iota // automatic
	RESTScopeRoot // root
	RESTScopeNamespace // namespace
)