updated vendor
This commit is contained in:
-181
@@ -1,181 +0,0 @@
|
||||
/*
|
||||
Copyright 2023 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package consistencydetector
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"reflect"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/diff"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
var dataConsistencyDetectionForWatchListEnabled = false
|
||||
|
||||
func init() {
|
||||
dataConsistencyDetectionForWatchListEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_WATCHLIST_INCONSISTENCY_DETECTOR"))
|
||||
}
|
||||
|
||||
// IsDataConsistencyDetectionForWatchListEnabled returns true when
|
||||
// the KUBE_WATCHLIST_INCONSISTENCY_DETECTOR environment variable was set during a binary startup.
|
||||
func IsDataConsistencyDetectionForWatchListEnabled() bool {
|
||||
return dataConsistencyDetectionForWatchListEnabled
|
||||
}
|
||||
|
||||
// SetDataConsistencyDetectionForWatchListEnabledForTest allows to enable/disable data consistency detection for testing purposes.
|
||||
// It returns a function that restores the original value.
|
||||
func SetDataConsistencyDetectionForWatchListEnabledForTest(enabled bool) func() {
|
||||
original := dataConsistencyDetectionForWatchListEnabled
|
||||
dataConsistencyDetectionForWatchListEnabled = enabled
|
||||
return func() {
|
||||
dataConsistencyDetectionForWatchListEnabled = original
|
||||
}
|
||||
}
|
||||
|
||||
type RetrieveItemsFunc[U any] func() []U
|
||||
|
||||
type ListFunc[T runtime.Object] func(ctx context.Context, options metav1.ListOptions) (T, error)
|
||||
|
||||
type TransformFunc func(interface{}) (interface{}, error)
|
||||
|
||||
// CheckDataConsistency exists solely for testing purposes.
|
||||
// we cannot use checkWatchListDataConsistencyIfRequested because
|
||||
// it is guarded by an environmental variable.
|
||||
// we cannot manipulate the environmental variable because
|
||||
// it will affect other tests in this package.
|
||||
func CheckDataConsistency[T runtime.Object, U any](ctx context.Context, identity string, lastSyncedResourceVersion string, listFn ListFunc[T], listItemTransformFunc TransformFunc, listOptions metav1.ListOptions, retrieveItemsFn RetrieveItemsFunc[U]) {
|
||||
if !canFormAdditionalListCall(lastSyncedResourceVersion, listOptions) {
|
||||
klog.V(4).Infof("data consistency check for %s is enabled but the parameters (RV, ListOptions) doesn't allow for creating a valid LIST request. Skipping the data consistency check.", identity)
|
||||
return
|
||||
}
|
||||
klog.Warningf("data consistency check for %s is enabled, this will result in an additional call to the API server.", identity)
|
||||
|
||||
retrievedItems := toMetaObjectSliceOrDie(retrieveItemsFn())
|
||||
listOptions = prepareListCallOptions(lastSyncedResourceVersion, listOptions, len(retrievedItems))
|
||||
var list runtime.Object
|
||||
err := wait.PollUntilContextCancel(ctx, time.Second, true, func(_ context.Context) (done bool, err error) {
|
||||
list, err = listFn(ctx, listOptions)
|
||||
if err != nil {
|
||||
// the consistency check will only be enabled in the CI
|
||||
// and LIST calls in general will be retired by the client-go library
|
||||
// if we fail simply log and retry
|
||||
klog.Errorf("failed to list data from the server, retrying until stopCh is closed, err: %v", err)
|
||||
return false, nil
|
||||
}
|
||||
return true, nil
|
||||
})
|
||||
if err != nil {
|
||||
klog.Errorf("failed to list data from the server, the data consistency check for %s won't be performed, stopCh was closed, err: %v", identity, err)
|
||||
return
|
||||
}
|
||||
|
||||
rawListItems, err := meta.ExtractListWithAlloc(list)
|
||||
if err != nil {
|
||||
panic(err) // this should never happen
|
||||
}
|
||||
if listItemTransformFunc != nil {
|
||||
for i := range rawListItems {
|
||||
obj, err := listItemTransformFunc(rawListItems[i])
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
rawListItems[i] = obj.(runtime.Object)
|
||||
}
|
||||
}
|
||||
listItems := toMetaObjectSliceOrDie(rawListItems)
|
||||
|
||||
sort.Sort(byUID(listItems))
|
||||
sort.Sort(byUID(retrievedItems))
|
||||
|
||||
if !reflect.DeepEqual(listItems, retrievedItems) {
|
||||
klog.Infof("previously received data for %s is different than received by the standard list api call against etcd, diff: %v", identity, diff.Diff(listItems, retrievedItems))
|
||||
msg := fmt.Sprintf("data inconsistency detected for %s, panicking!", identity)
|
||||
panic(msg)
|
||||
}
|
||||
}
|
||||
|
||||
// canFormAdditionalListCall ensures that we can form a valid LIST requests
|
||||
// for checking data consistency.
|
||||
func canFormAdditionalListCall(lastSyncedResourceVersion string, listOptions metav1.ListOptions) bool {
|
||||
// since we are setting ResourceVersionMatch to metav1.ResourceVersionMatchExact
|
||||
// we need to make sure that the continuation hasn't been set
|
||||
// https://github.com/kubernetes/kubernetes/blob/be4afb9ef90b19ccb6f7e595cbdb247e088b2347/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation.go#L38
|
||||
if len(listOptions.Continue) > 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
// since we are setting ResourceVersionMatch to metav1.ResourceVersionMatchExact
|
||||
// we need to make sure that the RV is valid because the validation code forbids RV == "0"
|
||||
// https://github.com/kubernetes/kubernetes/blob/be4afb9ef90b19ccb6f7e595cbdb247e088b2347/staging/src/k8s.io/apimachinery/pkg/apis/meta/internalversion/validation/validation.go#L44
|
||||
if lastSyncedResourceVersion == "0" {
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// prepareListCallOptions changes the input list options so that
|
||||
// the list call goes directly to etcd
|
||||
func prepareListCallOptions(lastSyncedResourceVersion string, listOptions metav1.ListOptions, retrievedItemsCount int) metav1.ListOptions {
|
||||
// this is our legacy case:
|
||||
//
|
||||
// the watch cache skips the Limit if the ResourceVersion was set to "0"
|
||||
// thus, to compare with data retrieved directly from etcd
|
||||
// we need to skip the limit to for the list call as well.
|
||||
//
|
||||
// note that when the number of retrieved items is less than the request limit,
|
||||
// it means either the watch cache is disabled, or there is not enough data.
|
||||
// in both cases, we can use the limit because we will be able to compare
|
||||
// the data with the items retrieved from etcd.
|
||||
if listOptions.ResourceVersion == "0" && listOptions.Limit > 0 && int64(retrievedItemsCount) > listOptions.Limit {
|
||||
listOptions.Limit = 0
|
||||
}
|
||||
|
||||
// set the RV and RVM so that we get the snapshot of data
|
||||
// directly from etcd.
|
||||
listOptions.ResourceVersion = lastSyncedResourceVersion
|
||||
listOptions.ResourceVersionMatch = metav1.ResourceVersionMatchExact
|
||||
|
||||
return listOptions
|
||||
}
|
||||
|
||||
type byUID []metav1.Object
|
||||
|
||||
func (a byUID) Len() int { return len(a) }
|
||||
func (a byUID) Less(i, j int) bool { return a[i].GetUID() < a[j].GetUID() }
|
||||
func (a byUID) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
|
||||
|
||||
func toMetaObjectSliceOrDie[T any](s []T) []metav1.Object {
|
||||
result := make([]metav1.Object, len(s))
|
||||
for i, v := range s {
|
||||
m, err := meta.Accessor(v)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
result[i] = m
|
||||
}
|
||||
return result
|
||||
}
|
||||
-10
@@ -1,10 +0,0 @@
|
||||
# See the OWNERS docs at https://go.k8s.io/owners
|
||||
approvers:
|
||||
- apelisse
|
||||
- alexzielenski
|
||||
reviewers:
|
||||
- apelisse
|
||||
- alexzielenski
|
||||
- KnVerey
|
||||
labels:
|
||||
- sig/api-machinery
|
||||
-30
@@ -1,30 +0,0 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package csaupgrade
|
||||
|
||||
type Option func(*options)
|
||||
|
||||
// Subresource set the subresource to upgrade from CSA to SSA.
|
||||
func Subresource(s string) Option {
|
||||
return func(opts *options) {
|
||||
opts.subresource = s
|
||||
}
|
||||
}
|
||||
|
||||
type options struct {
|
||||
subresource string
|
||||
}
|
||||
-334
@@ -1,334 +0,0 @@
|
||||
/*
|
||||
Copyright 2022 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package csaupgrade
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"reflect"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"sigs.k8s.io/structured-merge-diff/v6/fieldpath"
|
||||
)
|
||||
|
||||
// Finds all managed fields owners of the given operation type which owns all of
|
||||
// the fields in the given set
|
||||
//
|
||||
// If there is an error decoding one of the fieldsets for any reason, it is ignored
|
||||
// and assumed not to match the query.
|
||||
func FindFieldsOwners(
|
||||
managedFields []metav1.ManagedFieldsEntry,
|
||||
operation metav1.ManagedFieldsOperationType,
|
||||
fields *fieldpath.Set,
|
||||
) []metav1.ManagedFieldsEntry {
|
||||
var result []metav1.ManagedFieldsEntry
|
||||
for _, entry := range managedFields {
|
||||
if entry.Operation != operation {
|
||||
continue
|
||||
}
|
||||
|
||||
fieldSet, err := decodeManagedFieldsEntrySet(entry)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
if fields.Difference(&fieldSet).Empty() {
|
||||
result = append(result, entry)
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Upgrades the Manager information for fields managed with client-side-apply (CSA)
|
||||
// Prepares fields owned by `csaManager` for 'Update' operations for use now
|
||||
// with the given `ssaManager` for `Apply` operations.
|
||||
//
|
||||
// This transformation should be performed on an object if it has been previously
|
||||
// managed using client-side-apply to prepare it for future use with
|
||||
// server-side-apply.
|
||||
//
|
||||
// Caveats:
|
||||
// 1. This operation is not reversible. Information about which fields the client
|
||||
// owned will be lost in this operation.
|
||||
// 2. Supports being performed either before or after initial server-side apply.
|
||||
// 3. Client-side apply tends to own more fields (including fields that are defaulted),
|
||||
// this will possibly remove this defaults, they will be re-defaulted, that's fine.
|
||||
// 4. Care must be taken to not overwrite the managed fields on the server if they
|
||||
// have changed before sending a patch.
|
||||
//
|
||||
// obj - Target of the operation which has been managed with CSA in the past
|
||||
// csaManagerNames - Names of FieldManagers to merge into ssaManagerName
|
||||
// ssaManagerName - Name of FieldManager to be used for `Apply` operations
|
||||
func UpgradeManagedFields(
|
||||
obj runtime.Object,
|
||||
csaManagerNames sets.Set[string],
|
||||
ssaManagerName string,
|
||||
opts ...Option,
|
||||
) error {
|
||||
o := options{}
|
||||
for _, opt := range opts {
|
||||
opt(&o)
|
||||
}
|
||||
|
||||
accessor, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
filteredManagers := accessor.GetManagedFields()
|
||||
|
||||
for csaManagerName := range csaManagerNames {
|
||||
filteredManagers, err = upgradedManagedFields(
|
||||
filteredManagers, csaManagerName, ssaManagerName, o)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Commit changes to object
|
||||
accessor.SetManagedFields(filteredManagers)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Calculates a minimal JSON Patch to send to upgrade managed fields
|
||||
// See `UpgradeManagedFields` for more information.
|
||||
//
|
||||
// obj - Target of the operation which has been managed with CSA in the past
|
||||
// csaManagerNames - Names of FieldManagers to merge into ssaManagerName
|
||||
// ssaManagerName - Name of FieldManager to be used for `Apply` operations
|
||||
//
|
||||
// Returns non-nil error if there was an error, a JSON patch, or nil bytes if
|
||||
// there is no work to be done.
|
||||
func UpgradeManagedFieldsPatch(
|
||||
obj runtime.Object,
|
||||
csaManagerNames sets.Set[string],
|
||||
ssaManagerName string,
|
||||
opts ...Option,
|
||||
) ([]byte, error) {
|
||||
o := options{}
|
||||
for _, opt := range opts {
|
||||
opt(&o)
|
||||
}
|
||||
|
||||
accessor, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
managedFields := accessor.GetManagedFields()
|
||||
filteredManagers := accessor.GetManagedFields()
|
||||
for csaManagerName := range csaManagerNames {
|
||||
filteredManagers, err = upgradedManagedFields(
|
||||
filteredManagers, csaManagerName, ssaManagerName, o)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if reflect.DeepEqual(managedFields, filteredManagers) {
|
||||
// If the managed fields have not changed from the transformed version,
|
||||
// there is no patch to perform
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Create a patch with a diff between old and new objects.
|
||||
// Just include all managed fields since that is only thing that will change
|
||||
//
|
||||
// Also include test for RV to avoid race condition
|
||||
jsonPatch := []map[string]interface{}{
|
||||
{
|
||||
"op": "replace",
|
||||
"path": "/metadata/managedFields",
|
||||
"value": filteredManagers,
|
||||
},
|
||||
{
|
||||
// Use "replace" instead of "test" operation so that etcd rejects with
|
||||
// 409 conflict instead of apiserver with an invalid request
|
||||
"op": "replace",
|
||||
"path": "/metadata/resourceVersion",
|
||||
"value": accessor.GetResourceVersion(),
|
||||
},
|
||||
}
|
||||
|
||||
return json.Marshal(jsonPatch)
|
||||
}
|
||||
|
||||
// Returns a copy of the provided managed fields that has been migrated from
|
||||
// client-side-apply to server-side-apply, or an error if there was an issue
|
||||
func upgradedManagedFields(
|
||||
managedFields []metav1.ManagedFieldsEntry,
|
||||
csaManagerName string,
|
||||
ssaManagerName string,
|
||||
opts options,
|
||||
) ([]metav1.ManagedFieldsEntry, error) {
|
||||
if managedFields == nil {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Create managed fields clone since we modify the values
|
||||
managedFieldsCopy := make([]metav1.ManagedFieldsEntry, len(managedFields))
|
||||
if copy(managedFieldsCopy, managedFields) != len(managedFields) {
|
||||
return nil, errors.New("failed to copy managed fields")
|
||||
}
|
||||
managedFields = managedFieldsCopy
|
||||
|
||||
// Locate SSA manager
|
||||
replaceIndex, managerExists := findFirstIndex(managedFields,
|
||||
func(entry metav1.ManagedFieldsEntry) bool {
|
||||
return entry.Manager == ssaManagerName &&
|
||||
entry.Operation == metav1.ManagedFieldsOperationApply &&
|
||||
entry.Subresource == opts.subresource
|
||||
})
|
||||
|
||||
if !managerExists {
|
||||
// SSA manager does not exist. Find the most recent matching CSA manager,
|
||||
// convert it to an SSA manager.
|
||||
//
|
||||
// (find first index, since managed fields are sorted so that most recent is
|
||||
// first in the list)
|
||||
replaceIndex, managerExists = findFirstIndex(managedFields,
|
||||
func(entry metav1.ManagedFieldsEntry) bool {
|
||||
return entry.Manager == csaManagerName &&
|
||||
entry.Operation == metav1.ManagedFieldsOperationUpdate &&
|
||||
entry.Subresource == opts.subresource
|
||||
})
|
||||
|
||||
if !managerExists {
|
||||
// There are no CSA managers that need to be converted. Nothing to do
|
||||
// Return early
|
||||
return managedFields, nil
|
||||
}
|
||||
|
||||
// Convert CSA manager into SSA manager
|
||||
managedFields[replaceIndex].Operation = metav1.ManagedFieldsOperationApply
|
||||
managedFields[replaceIndex].Manager = ssaManagerName
|
||||
}
|
||||
err := unionManagerIntoIndex(managedFields, replaceIndex, csaManagerName, opts)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Create version of managed fields which has no CSA managers with the given name
|
||||
filteredManagers := filter(managedFields, func(entry metav1.ManagedFieldsEntry) bool {
|
||||
return !(entry.Manager == csaManagerName &&
|
||||
entry.Operation == metav1.ManagedFieldsOperationUpdate &&
|
||||
entry.Subresource == opts.subresource)
|
||||
})
|
||||
|
||||
return filteredManagers, nil
|
||||
}
|
||||
|
||||
// Locates an Update manager entry named `csaManagerName` with the same APIVersion
|
||||
// as the manager at the targetIndex. Unions both manager's fields together
|
||||
// into the manager specified by `targetIndex`. No other managers are modified.
|
||||
func unionManagerIntoIndex(
|
||||
entries []metav1.ManagedFieldsEntry,
|
||||
targetIndex int,
|
||||
csaManagerName string,
|
||||
opts options,
|
||||
) error {
|
||||
ssaManager := entries[targetIndex]
|
||||
|
||||
// find Update manager of same APIVersion, union ssa fields with it.
|
||||
// discard all other Update managers of the same name
|
||||
csaManagerIndex, csaManagerExists := findFirstIndex(entries,
|
||||
func(entry metav1.ManagedFieldsEntry) bool {
|
||||
return entry.Manager == csaManagerName &&
|
||||
entry.Operation == metav1.ManagedFieldsOperationUpdate &&
|
||||
entry.Subresource == opts.subresource &&
|
||||
entry.APIVersion == ssaManager.APIVersion
|
||||
})
|
||||
|
||||
targetFieldSet, err := decodeManagedFieldsEntrySet(ssaManager)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to convert fields to set: %w", err)
|
||||
}
|
||||
|
||||
combinedFieldSet := &targetFieldSet
|
||||
|
||||
// Union the csa manager with the existing SSA manager. Do nothing if
|
||||
// there was no good candidate found
|
||||
if csaManagerExists {
|
||||
csaManager := entries[csaManagerIndex]
|
||||
|
||||
csaFieldSet, err := decodeManagedFieldsEntrySet(csaManager)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to convert fields to set: %w", err)
|
||||
}
|
||||
|
||||
combinedFieldSet = combinedFieldSet.Union(&csaFieldSet)
|
||||
}
|
||||
|
||||
// Encode the fields back to the serialized format
|
||||
err = encodeManagedFieldsEntrySet(&entries[targetIndex], *combinedFieldSet)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to encode field set: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func findFirstIndex[T any](
|
||||
collection []T,
|
||||
predicate func(T) bool,
|
||||
) (int, bool) {
|
||||
for idx, entry := range collection {
|
||||
if predicate(entry) {
|
||||
return idx, true
|
||||
}
|
||||
}
|
||||
|
||||
return -1, false
|
||||
}
|
||||
|
||||
func filter[T any](
|
||||
collection []T,
|
||||
predicate func(T) bool,
|
||||
) []T {
|
||||
result := make([]T, 0, len(collection))
|
||||
|
||||
for _, value := range collection {
|
||||
if predicate(value) {
|
||||
result = append(result, value)
|
||||
}
|
||||
}
|
||||
|
||||
if len(result) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// Included from fieldmanager.internal to avoid dependency cycle
|
||||
// FieldsToSet creates a set paths from an input trie of fields
|
||||
func decodeManagedFieldsEntrySet(f metav1.ManagedFieldsEntry) (s fieldpath.Set, err error) {
|
||||
err = s.FromJSON(bytes.NewReader(f.FieldsV1.Raw))
|
||||
return s, err
|
||||
}
|
||||
|
||||
// SetToFields creates a trie of fields from an input set of paths
|
||||
func encodeManagedFieldsEntrySet(f *metav1.ManagedFieldsEntry, s fieldpath.Set) (err error) {
|
||||
f.FieldsV1.Raw, err = s.ToJSON()
|
||||
return err
|
||||
}
|
||||
-4
@@ -1,4 +0,0 @@
|
||||
# See the OWNERS docs at https://go.k8s.io/owners
|
||||
|
||||
reviewers:
|
||||
- caesarxuchao
|
||||
-105
@@ -1,105 +0,0 @@
|
||||
/*
|
||||
Copyright 2016 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package retry
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
)
|
||||
|
||||
// DefaultRetry is the recommended retry for a conflict where multiple clients
|
||||
// are making changes to the same resource.
|
||||
var DefaultRetry = wait.Backoff{
|
||||
Steps: 5,
|
||||
Duration: 10 * time.Millisecond,
|
||||
Factor: 1.0,
|
||||
Jitter: 0.1,
|
||||
}
|
||||
|
||||
// DefaultBackoff is the recommended backoff for a conflict where a client
|
||||
// may be attempting to make an unrelated modification to a resource under
|
||||
// active management by one or more controllers.
|
||||
var DefaultBackoff = wait.Backoff{
|
||||
Steps: 4,
|
||||
Duration: 10 * time.Millisecond,
|
||||
Factor: 5.0,
|
||||
Jitter: 0.1,
|
||||
}
|
||||
|
||||
// OnError allows the caller to retry fn in case the error returned by fn is retriable
|
||||
// according to the provided function. backoff defines the maximum retries and the wait
|
||||
// interval between two retries.
|
||||
func OnError(backoff wait.Backoff, retriable func(error) bool, fn func() error) error {
|
||||
var lastErr error
|
||||
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
|
||||
err := fn()
|
||||
switch {
|
||||
case err == nil:
|
||||
return true, nil
|
||||
case retriable(err):
|
||||
lastErr = err
|
||||
return false, nil
|
||||
default:
|
||||
return false, err
|
||||
}
|
||||
})
|
||||
if wait.Interrupted(err) {
|
||||
err = lastErr
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// RetryOnConflict is used to make an update to a resource when you have to worry about
|
||||
// conflicts caused by other code making unrelated updates to the resource at the same
|
||||
// time. fn should fetch the resource to be modified, make appropriate changes to it, try
|
||||
// to update it, and return (unmodified) the error from the update function. On a
|
||||
// successful update, RetryOnConflict will return nil. If the update function returns a
|
||||
// "Conflict" error, RetryOnConflict will wait some amount of time as described by
|
||||
// backoff, and then try again. On a non-"Conflict" error, or if it retries too many times
|
||||
// and gives up, RetryOnConflict will return an error to the caller.
|
||||
//
|
||||
// err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
|
||||
// // Fetch the resource here; you need to refetch it on every try, since
|
||||
// // if you got a conflict on the last update attempt then you need to get
|
||||
// // the current version before making your own changes.
|
||||
// pod, err := c.Pods("mynamespace").Get(name, metav1.GetOptions{})
|
||||
// if err != nil {
|
||||
// return err
|
||||
// }
|
||||
//
|
||||
// // Make whatever updates to the resource are needed
|
||||
// pod.Status.Phase = v1.PodFailed
|
||||
//
|
||||
// // Try to update
|
||||
// _, err = c.Pods("mynamespace").UpdateStatus(pod)
|
||||
// // You have to return err itself here (not wrapped inside another error)
|
||||
// // so that RetryOnConflict can identify it correctly.
|
||||
// return err
|
||||
// })
|
||||
// if err != nil {
|
||||
// // May be conflict if max retries were hit, or may be something unrelated
|
||||
// // like permissions or a network error
|
||||
// return err
|
||||
// }
|
||||
// ...
|
||||
//
|
||||
// TODO: Make Backoff an interface?
|
||||
func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
|
||||
return OnError(backoff, errors.IsConflict, fn)
|
||||
}
|
||||
-99
@@ -1,99 +0,0 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package watchlist
|
||||
|
||||
import (
|
||||
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
|
||||
metainternalversionvalidation "k8s.io/apimachinery/pkg/apis/meta/internalversion/validation"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
clientfeatures "k8s.io/client-go/features"
|
||||
"k8s.io/utils/ptr"
|
||||
)
|
||||
|
||||
var scheme = runtime.NewScheme()
|
||||
|
||||
func init() {
|
||||
utilruntime.Must(metainternalversion.AddToScheme(scheme))
|
||||
}
|
||||
|
||||
// PrepareWatchListOptionsFromListOptions creates a new ListOptions
|
||||
// that can be used for a watch-list request from the given listOptions.
|
||||
//
|
||||
// This function also determines if the given listOptions can be used to form a watch-list request,
|
||||
// which would result in streaming semantically equivalent data from the server.
|
||||
func PrepareWatchListOptionsFromListOptions(listOptions metav1.ListOptions) (metav1.ListOptions, bool, error) {
|
||||
if !clientfeatures.FeatureGates().Enabled(clientfeatures.WatchListClient) {
|
||||
return metav1.ListOptions{}, false, nil
|
||||
}
|
||||
|
||||
internalListOptions := &metainternalversion.ListOptions{}
|
||||
if err := scheme.Convert(&listOptions, internalListOptions, nil); err != nil {
|
||||
return metav1.ListOptions{}, false, err
|
||||
}
|
||||
if errs := metainternalversionvalidation.ValidateListOptions(internalListOptions, true); len(errs) > 0 {
|
||||
return metav1.ListOptions{}, false, nil
|
||||
}
|
||||
|
||||
watchListOptions := listOptions
|
||||
// this is our legacy case, the cache ignores LIMIT for
|
||||
// ResourceVersion == 0 and RVM=unset|NotOlderThan
|
||||
if listOptions.Limit > 0 && listOptions.ResourceVersion != "0" {
|
||||
return metav1.ListOptions{}, false, nil
|
||||
}
|
||||
watchListOptions.Limit = 0
|
||||
|
||||
// to ensure that we can create a watch-list request that returns
|
||||
// semantically equivalent data for the given listOptions,
|
||||
// we need to validate that the RVM for the list is supported by watch-list requests.
|
||||
if listOptions.ResourceVersionMatch == metav1.ResourceVersionMatchExact {
|
||||
return metav1.ListOptions{}, false, nil
|
||||
}
|
||||
watchListOptions.ResourceVersionMatch = metav1.ResourceVersionMatchNotOlderThan
|
||||
|
||||
watchListOptions.Watch = true
|
||||
watchListOptions.AllowWatchBookmarks = true
|
||||
watchListOptions.SendInitialEvents = ptr.To(true)
|
||||
|
||||
internalWatchListOptions := &metainternalversion.ListOptions{}
|
||||
if err := scheme.Convert(&watchListOptions, internalWatchListOptions, nil); err != nil {
|
||||
return metav1.ListOptions{}, false, err
|
||||
}
|
||||
if errs := metainternalversionvalidation.ValidateListOptions(internalWatchListOptions, true); len(errs) > 0 {
|
||||
return metav1.ListOptions{}, false, nil
|
||||
}
|
||||
|
||||
return watchListOptions, true, nil
|
||||
}
|
||||
|
||||
type unSupportedWatchListSemantics interface {
|
||||
IsWatchListSemanticsUnSupported() bool
|
||||
}
|
||||
|
||||
// DoesClientNotSupportWatchListSemantics reports whether the given client
|
||||
// does NOT support WatchList semantics.
|
||||
//
|
||||
// A client does NOT support WatchList only if
|
||||
// it implements `IsWatchListSemanticsUnSupported` and that returns true.
|
||||
func DoesClientNotSupportWatchListSemantics(client any) bool {
|
||||
lw, ok := client.(unSupportedWatchListSemantics)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
return lw.IsWatchListSemanticsUnSupported()
|
||||
}
|
||||
Reference in New Issue
Block a user