Skip to content

Commit

Permalink
UPSTREAM: <carry>: add audit annotations to track etcd state
Browse files Browse the repository at this point in the history
  • Loading branch information
deads2k committed Jan 13, 2025
1 parent 309f240 commit cccaf3c
Showing 1 changed file with 22 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package etcd3retry

import (
"context"
"k8s.io/apiserver/pkg/audit"
"strings"
"time"

etcdrpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
Expand Down Expand Up @@ -32,11 +34,18 @@ func NewRetryingEtcdStorage(delegate storage.Interface) storage.Interface {
return &retryClient{Interface: delegate}
}

func addEtcdAccessAuditAnnotation(ctx context.Context) {
// add an audit annotation indicating we reached out to etcd. This allows our post-processing to exclude requests
// that don't attempt to access etcd from, "how reliably is etcd" calculations.
audit.AddAuditAnnotation(ctx, "apiserver.internal.openshift.io/etcd-access", time.Now().Format(time.RFC3339))
}

// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
// set to the read value from database.
func (c *retryClient) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
return OnError(ctx, DefaultRetry, IsRetriableEtcdError, func() error {
addEtcdAccessAuditAnnotation(ctx)
return c.Interface.Create(ctx, key, obj, out, ttl)
})
}
Expand All @@ -45,6 +54,7 @@ func (c *retryClient) Create(ctx context.Context, key string, obj, out runtime.O
// If key didn't exist, it will return NotFound storage error.
func (c *retryClient) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error {
return OnError(ctx, DefaultRetry, IsRetriableEtcdError, func() error {
addEtcdAccessAuditAnnotation(ctx)
return c.Interface.Delete(ctx, key, out, preconditions, validateDeletion, cachedExistingObject)
})
}
Expand All @@ -59,6 +69,7 @@ func (c *retryClient) Delete(ctx context.Context, key string, out runtime.Object
func (c *retryClient) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) {
var ret watch.Interface
err := OnError(ctx, DefaultRetry, IsRetriableEtcdError, func() error {
addEtcdAccessAuditAnnotation(ctx)
var innerErr error
ret, innerErr = c.Interface.Watch(ctx, key, opts)
return innerErr
Expand All @@ -73,6 +84,7 @@ func (c *retryClient) Watch(ctx context.Context, key string, opts storage.ListOp
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
func (c *retryClient) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error {
return OnError(ctx, DefaultRetry, IsRetriableEtcdError, func() error {
addEtcdAccessAuditAnnotation(ctx)
return c.Interface.Get(ctx, key, opts, objPtr)
})
}
Expand All @@ -85,6 +97,7 @@ func (c *retryClient) Get(ctx context.Context, key string, opts storage.GetOptio
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
func (c *retryClient) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
return OnError(ctx, DefaultRetry, IsRetriableEtcdError, func() error {
addEtcdAccessAuditAnnotation(ctx)
return c.Interface.GetList(ctx, key, opts, listObj)
})
}
Expand Down Expand Up @@ -126,6 +139,7 @@ func (c *retryClient) GetList(ctx context.Context, key string, opts storage.List
func (c *retryClient) GuaranteedUpdate(ctx context.Context, key string, destination runtime.Object, ignoreNotFound bool,
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, cachedExistingObject runtime.Object) error {
return OnError(ctx, DefaultRetry, IsRetriableEtcdError, func() error {
addEtcdAccessAuditAnnotation(ctx)
return c.Interface.GuaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, cachedExistingObject)
})
}
Expand Down Expand Up @@ -153,6 +167,8 @@ func OnError(ctx context.Context, backoff wait.Backoff, retriable func(error) (s
var retry bool
var retryCounter int
err := backoffWithRequestContext(ctx, backoff, func() (bool, error) {
startTime := time.Now()

err := fn()
if retry {
klog.V(1).Infof("etcd retry - counter: %v, lastErrLabel: %s lastError: %v, error: %v", retryCounter, lastErrLabel, lastErr, err)
Expand All @@ -162,6 +178,12 @@ func OnError(ctx context.Context, backoff wait.Backoff, retriable func(error) (s
return true, nil
}

// add an audit annotation if we hit a no leader condition so we can track this failure in post-processing CI steps.
// We only mark the first time through. Hopefully there's enough traffic that it doesn't matter
if strings.Contains(err.Error(), "no leader") {
audit.AddAuditAnnotation(ctx, "apiserver.internal.openshift.io/no-leader", startTime.Format(time.RFC3339))
}

lastErrLabel, retry = retriable(err)
if retry {
lastErr = err
Expand Down

0 comments on commit cccaf3c

Please sign in to comment.