diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go
index e07d00d310418..9883136dbae10 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cache_watcher.go
@@ -124,15 +124,20 @@ func (c *cacheWatcher) Stop() {
 // we rely on the fact that stopLocked is actually protected by Cacher.Lock()
 func (c *cacheWatcher) stopLocked() {
+	fmt.Printf("#### 4a groupResource=%v \n", c.groupResource)
 	if !c.stopped {
+		fmt.Printf("#### 4b groupResource=%v \n", c.groupResource)
 		c.stopped = true
 		// stop without draining the input channel was requested.
 		if !c.drainInputBuffer {
+			fmt.Printf("#### 4c groupResource=%v \n", c.groupResource)
 			close(c.done)
 		}
+		fmt.Printf("#### 4d groupResource=%v \n", c.groupResource)
 		close(c.input)
 	}
+	fmt.Printf("#### 4e groupResource=%v \n", c.groupResource)
 
 	// Even if the watcher was already stopped, if it previously was
 	// using draining mode and it's not using it now we need to
 	// close the done channel now. Otherwise we could leak the
@@ -140,8 +145,10 @@ func (c *cacheWatcher) stopLocked() {
 	// into result channel, the channel will be full and there will
 	// already be noone on the processing the events on the receiving end.
 	if !c.drainInputBuffer && !c.isDoneChannelClosedLocked() {
+		fmt.Printf("#### 4f groupResource=%v \n", c.groupResource)
 		close(c.done)
 	}
+	fmt.Printf("#### 4g groupResource=%v \n", c.groupResource)
 }
 
 func (c *cacheWatcher) nonblockingAdd(event *watchCacheEvent) bool {
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
index 48791bd7b63b9..b6b6897daa5a2 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/cacher.go
@@ -450,6 +450,8 @@ func NewCacherFromConfig(config Config) (*Cacher, error) {
 }
 
 func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
+	fmt.Printf("#### 2a groupResource=%v startCaching\n", c.groupResource)
+
 	// The 'usable' lock is always 'RLock'able when it is safe to use the cache.
 	// It is safe to use the cache after a successful list until a disconnection.
 	// We start with usable (write) locked. The below OnReplace function will
@@ -465,6 +467,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
 	})
 	defer func() {
 		if successfulList {
+			fmt.Printf("#### 2b groupResource=%v setting to false\n", c.groupResource)
 			c.ready.set(false)
 		}
 	}()
@@ -477,6 +480,7 @@ func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
 	if err := c.reflector.ListAndWatch(stopChannel); err != nil {
 		klog.Errorf("cacher (%v): unexpected ListAndWatch error: %v; reinitializing...", c.groupResource.String(), err)
 	}
+	fmt.Printf("#### 2e groupResource=%v exiting\n", c.groupResource)
 }
 
 // Versioner implements storage.Interface.
@@ -536,17 +540,20 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 	var readyGeneration int
 	if utilfeature.DefaultFeatureGate.Enabled(features.ResilientWatchCacheInitialization) {
+		fmt.Printf("#### 1a groupResource=%v\n", c.groupResource)
 		var ok bool
 		readyGeneration, ok = c.ready.checkAndReadGeneration()
 		if !ok {
 			return nil, errors.NewTooManyRequests("storage is (re)initializing", 1)
 		}
 	} else {
+		fmt.Printf("#### 1b groupResource=%v\n", c.groupResource)
 		readyGeneration, err = c.ready.waitAndReadGeneration(ctx)
 		if err != nil {
 			return nil, errors.NewServiceUnavailable(err.Error())
 		}
 	}
+	fmt.Printf("#### 1c groupResource=%v, originalReadyGeneration=%d\n", c.groupResource, readyGeneration)
 
 	// determine the namespace and name scope of the watch, first from the request, secondarily from the field selector
 	scope := namespacedName{}
@@ -654,17 +661,20 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 	}
 	addedWatcher := false
+	fmt.Printf("#### 1n groupResource=%v \n", c.groupResource)
 	func() {
 		c.Lock()
 		defer c.Unlock()
 
 		if generation, ok := c.ready.checkAndReadGeneration(); generation != readyGeneration || !ok {
+			fmt.Printf("#### 1o groupResource=%v currentReadyGeneration=%d, originalReadyGeneration=%d, ok=%v\n", c.groupResource, generation, readyGeneration, ok)
 			// We went unready or are already on a different generation.
 			// Avoid registering and starting the watch as it will have to be
 			// terminated immediately anyway.
 			return
 		}
+		fmt.Printf("#### 1p groupResource=%v \n", c.groupResource)
 		// Update watcher.forget function once we can compute it.
 		watcher.forget = forgetWatcher(c, watcher, c.watcherIdx, scope, triggerValue, triggerSupported)
 		// Update the bookMarkAfterResourceVersion
@@ -672,14 +682,19 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 		c.watchers.addWatcher(watcher, c.watcherIdx, scope, triggerValue, triggerSupported)
 		addedWatcher = true
+		fmt.Printf("#### 1q groupResource=%v \n", c.groupResource)
 		// Add it to the queue only when the client support watch bookmarks.
 		if watcher.allowWatchBookmarks {
 			c.bookmarkWatchers.addWatcherThreadUnsafe(watcher)
 		}
 		c.watcherIdx++
+
+		fmt.Printf("#### 1r groupResource=%v \n", c.groupResource)
 	}()
+	fmt.Printf("#### 1s groupResource=%v \n", c.groupResource)
 
 	if !addedWatcher {
+		fmt.Printf("#### 1x groupResource=%v returning the immediate closer thing\n", c.groupResource)
 		// Watcher isn't really started at this point, so it's safe to just drop it.
 		//
 		// We're simulating the immediate watch termination, which boils down to simply
@@ -687,6 +702,7 @@ func (c *Cacher) Watch(ctx context.Context, key string, opts storage.ListOptions
 		return newImmediateCloseWatcher(), nil
 	}
 
+	fmt.Printf("#### 1y groupResource=%v \n", c.groupResource)
 	go watcher.processInterval(ctx, cacheInterval, requiredResourceVersion)
 	return watcher, nil
 }
@@ -1333,13 +1349,18 @@ func forgetWatcher(c *Cacher, w *cacheWatcher, index int, scope namespacedName,
 		c.Lock()
 		defer c.Unlock()
 
+		fmt.Printf("#### 3a groupResource=%v \n", c.groupResource)
+
 		w.setDrainInputBufferLocked(drainWatcher)
+		fmt.Printf("#### 3b groupResource=%v \n", c.groupResource)
 
 		// It's possible that the watcher is already not in the structure (e.g. in case of
 		// simultaneous Stop() and terminateAllWatchers(), but it is safe to call stopLocked()
 		// on a watcher multiple times.
 		c.watchers.deleteWatcher(index, scope, triggerValue, triggerSupported)
+		fmt.Printf("#### 3c groupResource=%v \n", c.groupResource)
 		c.stopWatcherLocked(w)
+		fmt.Printf("#### 3d groupResource=%v \n", c.groupResource)
 	}
 }
diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go
index 012d6d585c9f1..10b9fd0e27d1f 100644
--- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go
+++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/ready.go
@@ -117,6 +117,7 @@ func (r *ready) checkAndReadGeneration() (int, bool) {
 func (r *ready) set(ok bool) {
 	r.lock.Lock()
 	defer r.lock.Unlock()
+
 	if r.state == Stopped {
 		return
 	}
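
For reference, a minimal sketch of how the same numbered breadcrumbs could be routed through klog instead of raw fmt.Printf, so each marker line carries a timestamp and caller information in the apiserver log rather than going to stdout. The tracef helper below is not part of the patch above; it is a hypothetical addition to the cacher package, and only klog.InfoS and schema.GroupResource are existing APIs.

// trace.go (hypothetical helper, not in the patch): klog-based variant of the
// "#### <marker> groupResource=..." instrumentation added in the diff above.
package cacher

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/klog/v2"
)

// tracef emits one breadcrumb per call site, identified by a short marker
// ("1a", "4g", ...) plus the groupResource, so interleavings between Watch(),
// startCaching(), forgetWatcher() and stopLocked() can be ordered from the log.
func tracef(marker string, gr schema.GroupResource, format string, args ...interface{}) {
	detail := ""
	if format != "" {
		detail = fmt.Sprintf(format, args...)
	}
	// klog.InfoS adds timestamp and source location, which fmt.Printf to stdout lacks.
	klog.InfoS("#### cacher trace", "marker", marker, "groupResource", gr.String(), "detail", detail)
}

With such a helper, the "#### 1c" line above would become, for example, tracef("1c", c.groupResource, "originalReadyGeneration=%d", readyGeneration), and markers without extra detail (e.g. "4a") would pass an empty format string.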