From 82dceaba75fd507c3a4fc71da43e316f30c1b5d0 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Fri, 8 Mar 2024 11:11:40 +0100 Subject: [PATCH] Fix progress notification on watch opened on the current revision that doesn't match watch range. When implementing the fix for progress notifications (https://github.com/etcd-io/etcd/pull/15237) we made a incorrect assumption that that unsynched watches will always get at least one event. Unsynched watches include not only slow watchers, but also newly created watches that requested old revision. In case that non of the events match watch filter, those newly created watches might become synched without any event going through. Signed-off-by: Marek Siarkowicz --- server/etcdserver/api/v3rpc/watch.go | 20 +------------ tests/integration/v3_watch_test.go | 45 ++++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 19 deletions(-) diff --git a/server/etcdserver/api/v3rpc/watch.go b/server/etcdserver/api/v3rpc/watch.go index 3b82fe8d848b..9b28319806ba 100644 --- a/server/etcdserver/api/v3rpc/watch.go +++ b/server/etcdserver/api/v3rpc/watch.go @@ -145,10 +145,6 @@ type serverWatchStream struct { // records fragmented watch IDs fragment map[mvcc.WatchID]bool - // indicates whether we have an outstanding global progress - // notification to send - deferredProgress bool - // closec indicates the stream is closed. closec chan struct{} @@ -178,8 +174,6 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) { prevKV: make(map[mvcc.WatchID]bool), fragment: make(map[mvcc.WatchID]bool), - deferredProgress: false, - closec: make(chan struct{}), } @@ -367,14 +361,7 @@ func (sws *serverWatchStream) recvLoop() error { case *pb.WatchRequest_ProgressRequest: if uv.ProgressRequest != nil { sws.mu.Lock() - // Ignore if deferred progress notification is already in progress - if !sws.deferredProgress { - // Request progress for all watchers, - // force generation of a response - if !sws.watchStream.RequestProgressAll() { - sws.deferredProgress = true - } - } + sws.watchStream.RequestProgressAll() sws.mu.Unlock() } default: @@ -483,11 +470,6 @@ func (sws *serverWatchStream) sendLoop() { // elide next progress update if sent a key update sws.progress[wresp.WatchID] = false } - if sws.deferredProgress { - if sws.watchStream.RequestProgressAll() { - sws.deferredProgress = false - } - } sws.mu.Unlock() case c, ok := <-sws.ctrlStream: diff --git a/tests/integration/v3_watch_test.go b/tests/integration/v3_watch_test.go index 3d094c572814..6a934c41c0c3 100644 --- a/tests/integration/v3_watch_test.go +++ b/tests/integration/v3_watch_test.go @@ -1467,3 +1467,48 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) { t.Fatal("Wrong revision in progress notification!") } } + +func TestV3WatchProgressForWatchOnCurrentRevision(t *testing.T) { + if integration.ThroughProxy { + t.Skip("grpc proxy currently does not support requesting progress notifications") + } + integration.BeforeTest(t) + + clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + client := clus.RandClient() + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + resp, err := client.Put(ctx, "bar", "1") + require.NoError(t, err) + + wch := client.Watch(ctx, "foo", clientv3.WithRev(resp.Header.Revision)) + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + err = client.RequestProgress(ctx) + require.NoError(t, err) + gotProgressNotification := false + for { + select { + case <-ticker.C: + err := client.RequestProgress(ctx) + require.NoError(t, err) + case resp := <-wch: + if resp.Err() != nil { + t.Fatal(fmt.Errorf("watch error: %w", resp.Err())) + } + if resp.IsProgressNotify() { + gotProgressNotification = true + } + } + if gotProgressNotification { + break + } + } + if !gotProgressNotification { + t.Fatal("Expected to get progress notification") + } +}