diff --git a/server/etcdserver/api/v3rpc/watch.go b/server/etcdserver/api/v3rpc/watch.go index 3b82fe8d848b..9b28319806ba 100644 --- a/server/etcdserver/api/v3rpc/watch.go +++ b/server/etcdserver/api/v3rpc/watch.go @@ -145,10 +145,6 @@ type serverWatchStream struct { // records fragmented watch IDs fragment map[mvcc.WatchID]bool - // indicates whether we have an outstanding global progress - // notification to send - deferredProgress bool - // closec indicates the stream is closed. closec chan struct{} @@ -178,8 +174,6 @@ func (ws *watchServer) Watch(stream pb.Watch_WatchServer) (err error) { prevKV: make(map[mvcc.WatchID]bool), fragment: make(map[mvcc.WatchID]bool), - deferredProgress: false, - closec: make(chan struct{}), } @@ -367,14 +361,7 @@ func (sws *serverWatchStream) recvLoop() error { case *pb.WatchRequest_ProgressRequest: if uv.ProgressRequest != nil { sws.mu.Lock() - // Ignore if deferred progress notification is already in progress - if !sws.deferredProgress { - // Request progress for all watchers, - // force generation of a response - if !sws.watchStream.RequestProgressAll() { - sws.deferredProgress = true - } - } + sws.watchStream.RequestProgressAll() sws.mu.Unlock() } default: @@ -483,11 +470,6 @@ func (sws *serverWatchStream) sendLoop() { // elide next progress update if sent a key update sws.progress[wresp.WatchID] = false } - if sws.deferredProgress { - if sws.watchStream.RequestProgressAll() { - sws.deferredProgress = false - } - } sws.mu.Unlock() case c, ok := <-sws.ctrlStream: diff --git a/tests/integration/v3_watch_test.go b/tests/integration/v3_watch_test.go index 3d094c572814..6a934c41c0c3 100644 --- a/tests/integration/v3_watch_test.go +++ b/tests/integration/v3_watch_test.go @@ -1467,3 +1467,48 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) { t.Fatal("Wrong revision in progress notification!") } } + +func TestV3WatchProgressForWatchOnCurrentRevision(t *testing.T) { + if integration.ThroughProxy { + t.Skip("grpc proxy currently does not support requesting progress notifications") + } + integration.BeforeTest(t) + + clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + client := clus.RandClient() + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + resp, err := client.Put(ctx, "bar", "1") + require.NoError(t, err) + + wch := client.Watch(ctx, "foo", clientv3.WithRev(resp.Header.Revision)) + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + + err = client.RequestProgress(ctx) + require.NoError(t, err) + gotProgressNotification := false + for { + select { + case <-ticker.C: + err := client.RequestProgress(ctx) + require.NoError(t, err) + case resp := <-wch: + if resp.Err() != nil { + t.Fatal(fmt.Errorf("watch error: %w", resp.Err())) + } + if resp.IsProgressNotify() { + gotProgressNotification = true + } + } + if gotProgressNotification { + break + } + } + if !gotProgressNotification { + t.Fatal("Expected to get progress notification") + } +}