Skip to content

Commit cdd34ba

Browse files
[#583][Delta] Ensure resources are properly sent again if envoy unsubscribes then subscribes again to a resource
1 parent af7a06d commit cdd34ba

File tree

2 files changed

+45
-2
lines changed

2 files changed

+45
-2
lines changed

pkg/server/delta/v3/server.go

+5
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,11 @@ func (s *server) unsubscribe(resources []string, streamState *stream.StreamState
245245
// * detect the version change, and return the resource (as an update)
246246
// * detect the resource deletion, and set it as removed in the response
247247
streamState.GetKnownResources()[resource] = ""
248+
} else {
249+
// Clean-up the state version for this resource.
250+
// This addresses https://github.com/envoyproxy/go-control-plane/issues/583, where a resource unsubscribed then subscribed again
251+
// is not sent again while envoy expects it.
252+
delete(streamState.GetKnownResources(), resource)
248253
}
249254
delete(sv, resource)
250255
}

pkg/server/v3/delta_test.go

+40-2
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"google.golang.org/grpc"
1111

1212
"github.com/stretchr/testify/assert"
13+
"github.com/stretchr/testify/require"
1314

1415
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
1516
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
@@ -467,7 +468,7 @@ func TestDeltaWildcardSubscriptions(t *testing.T) {
467468
},
468469
}
469470

470-
validateResponse := func(t *testing.T, replies <-chan *discovery.DeltaDiscoveryResponse, expectedResources []string, expectedRemovedResources []string) {
471+
validateResponse := func(t *testing.T, replies <-chan *discovery.DeltaDiscoveryResponse, expectedResources []string, expectedRemovedResources []string) string {
471472
t.Helper()
472473
select {
473474
case response := <-replies:
@@ -480,8 +481,10 @@ func TestDeltaWildcardSubscriptions(t *testing.T) {
480481
assert.ElementsMatch(t, names, expectedResources)
481482
assert.ElementsMatch(t, response.RemovedResources, expectedRemovedResources)
482483
}
484+
return response.Nonce
483485
case <-time.After(1 * time.Second):
484-
t.Fatalf("got no response")
486+
require.Fail(t, "got no response")
487+
return ""
485488
}
486489
}
487490

@@ -530,6 +533,41 @@ func TestDeltaWildcardSubscriptions(t *testing.T) {
530533

531534
})
532535

536+
t.Run("unsubscribed resources are sent again if re-subscribed later on and the version has not changed", func(t *testing.T) {
537+
resp := makeMockDeltaStream(t)
538+
defer close(resp.recv)
539+
s := server.NewServer(context.Background(), config, server.CallbackFuncs{})
540+
go func() {
541+
err := s.DeltaAggregatedResources(resp)
542+
assert.NoError(t, err)
543+
}()
544+
545+
resp.recv <- &discovery.DeltaDiscoveryRequest{
546+
Node: node,
547+
TypeUrl: rsrc.EndpointType,
548+
ResourceNamesSubscribe: []string{"endpoints0", "endpoints1"},
549+
}
550+
nonce := validateResponse(t, resp.sent, []string{"endpoints0", "endpoints1"}, nil)
551+
552+
// Unsubscribe from endpoints0
553+
resp.recv <- &discovery.DeltaDiscoveryRequest{
554+
Node: node,
555+
TypeUrl: rsrc.EndpointType,
556+
ResponseNonce: nonce,
557+
ResourceNamesUnsubscribe: []string{"endpoints0"},
558+
}
559+
// No reply is expected here
560+
561+
// Subscribe again to endpoints0
562+
resp.recv <- &discovery.DeltaDiscoveryRequest{
563+
Node: node,
564+
TypeUrl: rsrc.EndpointType,
565+
ResponseNonce: nonce,
566+
ResourceNamesSubscribe: []string{"endpoints0"},
567+
}
568+
validateResponse(t, resp.sent, []string{"endpoints0"}, nil)
569+
})
570+
533571
t.Run("* subscribtion/unsubscription support", func(t *testing.T) {
534572
resp := makeMockDeltaStream(t)
535573
defer close(resp.recv)

0 commit comments

Comments
 (0)