Skip to content

Commit

Permalink
fix go-control-plane instantiation order (#5972)
Browse files Browse the repository at this point in the history
Previously, the go-control-plane snapshot cache
was not instantiated and wired up to the snapshot
handler (via a snapshotter) until after the first
DAG run was completed and during the xDS server
startup. As a result, the first DAG run never
populated the go-control plane snapshot cache, so
the go-control-plane xDS server had nothing to
respond to initial discovery requests with.

This moves the wiring of the snapshot handler
to the snapshot cache up to where the snapshot
handler itself is instantiated, before any
runnables are started. It also removes the unneeded
snapshotter layer of indirection. The snapshot
handler now directly generates a go-control-plane
snapshot and stores it in the snapshot cache.

Signed-off-by: Steve Kriss <krisss@vmware.com>
  • Loading branch information
skriss authored Nov 17, 2023
1 parent 935a455 commit e3e0f34
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 106 deletions.
1 change: 1 addition & 0 deletions changelogs/unreleased/5972-skriss-small.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fixes a bug with the `envoy` xDS server where at startup, xDS configuration would not be generated and served until a subsequent configuration change.
21 changes: 12 additions & 9 deletions cmd/contour/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,12 @@ import (
"time"

"github.com/alecthomas/kingpin/v2"
envoy_cache_v3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
envoy_server_v3 "github.com/envoyproxy/go-control-plane/pkg/server/v3"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
networking_v1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
Expand All @@ -39,9 +41,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/manager/signals"
controller_runtime_metrics "sigs.k8s.io/controller-runtime/pkg/metrics"
gatewayapi_v1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

controller_runtime_metrics_server "sigs.k8s.io/controller-runtime/pkg/metrics/server"
gatewayapi_v1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1"

contour_api_v1 "github.com/projectcontour/contour/apis/projectcontour/v1"
contour_api_v1alpha1 "github.com/projectcontour/contour/apis/projectcontour/v1alpha1"
Expand All @@ -64,7 +65,6 @@ import (
"github.com/projectcontour/contour/internal/xdscache"
xdscache_v3 "github.com/projectcontour/contour/internal/xdscache/v3"
"github.com/projectcontour/contour/pkg/config"
discoveryv1 "k8s.io/api/discovery/v1"
)

const (
Expand Down Expand Up @@ -482,8 +482,13 @@ func (s *Server) doServe() error {
}),
}

// snapshotHandler is used to produce new snapshots when the internal state changes for any xDS resource.
snapshotHandler := xdscache.NewSnapshotHandler(resources, s.log.WithField("context", "snapshotHandler"))
// snapshotHandler triggers go-control-plane Snapshots based on
// the contents of the Contour xDS caches after the DAG is built.
snapshotHandler := xdscache_v3.NewSnapshotHandler(
resources,
envoy_cache_v3.NewSnapshotCache(false, &contour_xds_v3.Hash, s.log.WithField("context", "snapshotCache")),
s.log.WithField("context", "snapshotHandler"),
)

// register observer for endpoints updates.
endpointHandler.SetObserver(contour.ComposeObservers(snapshotHandler))
Expand Down Expand Up @@ -872,7 +877,7 @@ type xdsServer struct {
log logrus.FieldLogger
registry *prometheus.Registry
config contour_api_v1alpha1.XDSServerConfig
snapshotHandler *xdscache.SnapshotHandler
snapshotHandler *xdscache_v3.SnapshotHandler
resources []xdscache.ResourceCache
initialDagBuilt func() bool
}
Expand All @@ -896,9 +901,7 @@ func (x *xdsServer) Start(ctx context.Context) error {

switch x.config.Type {
case contour_api_v1alpha1.EnvoyServerType:
v3cache := contour_xds_v3.NewSnapshotCache(false, log)
x.snapshotHandler.AddSnapshotter(v3cache)
contour_xds_v3.RegisterServer(envoy_server_v3.NewServer(ctx, v3cache, contour_xds_v3.NewRequestLoggingCallbacks(log)), grpcServer)
contour_xds_v3.RegisterServer(envoy_server_v3.NewServer(ctx, x.snapshotHandler.SnapshotCache, contour_xds_v3.NewRequestLoggingCallbacks(log)), grpcServer)
case contour_api_v1alpha1.ContourServerType:
contour_xds_v3.RegisterServer(contour_xds_v3.NewContourServer(log, xdscache.ResourcesOf(x.resources)...), grpcServer)
default:
Expand Down
12 changes: 7 additions & 5 deletions internal/xds/hash.go → internal/xds/v3/hash.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package xds
package v3

import (
envoy_config_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
Expand All @@ -20,15 +20,17 @@ import (
// nolint:revive
const CONSTANT_HASH_VALUE = "contour"

// ConstantHashV3 is a specialized node ID hasher used to allow
// ConstantHash is a specialized node ID hasher used to allow
// any instance of Envoy to connect to Contour regardless of the
// service-node flag configured on Envoy.
type ConstantHashV3 struct{}
type ConstantHash struct{}

func (c ConstantHashV3) ID(*envoy_config_v3.Node) string {
func (c ConstantHash) ID(*envoy_config_v3.Node) string {
return CONSTANT_HASH_VALUE
}

func (c ConstantHashV3) String() string {
func (c ConstantHash) String() string {
return CONSTANT_HASH_VALUE
}

var Hash = ConstantHash{}
56 changes: 0 additions & 56 deletions internal/xds/v3/snapshotter.go

This file was deleted.

66 changes: 30 additions & 36 deletions internal/xdscache/snapshot.go → internal/xdscache/v3/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,64 +11,56 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package xdscache
package v3

import (
"context"
"reflect"
"sync"

envoy_types "github.com/envoyproxy/go-control-plane/pkg/cache/types"
envoy_cache_v3 "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
envoy_resource_v3 "github.com/envoyproxy/go-control-plane/pkg/resource/v3"
"github.com/google/uuid"
"github.com/projectcontour/contour/internal/dag"
contour_xds_v3 "github.com/projectcontour/contour/internal/xds/v3"
"github.com/projectcontour/contour/internal/xdscache"
"github.com/sirupsen/logrus"
)

type Snapshotter interface {
Generate(version string, resources map[envoy_resource_v3.Type][]envoy_types.Resource) error
}

// SnapshotHandler implements the xDS snapshot cache
// by responding to the OnChange() event causing a new
// snapshot to be created.
// SnapshotHandler responds to DAG builds via the OnChange()
// event and generates and caches go-control-plane Snapshots.
type SnapshotHandler struct {
// resources holds the cache of xDS contents.
resources map[envoy_resource_v3.Type]ResourceCache
// SnapshotCache contains go-control-plane Snapshots
// and is used by the go-control-plane xDS server.
SnapshotCache envoy_cache_v3.SnapshotCache

snapshotters []Snapshotter
snapLock sync.Mutex

logrus.FieldLogger
// resources contains the Contour xDS resource caches.
resources map[envoy_resource_v3.Type]xdscache.ResourceCache
log logrus.FieldLogger
}

// NewSnapshotHandler returns an instance of SnapshotHandler.
func NewSnapshotHandler(resources []ResourceCache, logger logrus.FieldLogger) *SnapshotHandler {
func NewSnapshotHandler(resources []xdscache.ResourceCache, snapshotCache envoy_cache_v3.SnapshotCache, logger logrus.FieldLogger) *SnapshotHandler {
return &SnapshotHandler{
resources: parseResources(resources),
FieldLogger: logger,
resources: parseResources(resources),
SnapshotCache: snapshotCache,
log: logger,
}
}

func (s *SnapshotHandler) AddSnapshotter(snap Snapshotter) {
s.snapLock.Lock()
defer s.snapLock.Unlock()

s.snapshotters = append(s.snapshotters, snap)
}

// Refresh is called when the EndpointsTranslator updates values
// in its cache.
func (s *SnapshotHandler) Refresh() {
s.generateNewSnapshot()
}

// OnChange is called when the DAG is rebuilt and a new snapshot is needed.
func (s *SnapshotHandler) OnChange(_ *dag.DAG) {
func (s *SnapshotHandler) OnChange(*dag.DAG) {
s.generateNewSnapshot()
}

// generateNewSnapshot creates a new snapshot against
// the Contour XDS caches.
// generateNewSnapshot creates and caches a new go-control-plane
// Snapshot based on the contents of the Contour xDS resource caches.
func (s *SnapshotHandler) generateNewSnapshot() {
// Generate new snapshot version.
version := uuid.NewString()
Expand All @@ -83,13 +75,15 @@ func (s *SnapshotHandler) generateNewSnapshot() {
envoy_resource_v3.RuntimeType: asResources(s.resources[envoy_resource_v3.RuntimeType].Contents()),
}

s.snapLock.Lock()
defer s.snapLock.Unlock()
snapshot, err := envoy_cache_v3.NewSnapshot(version, resources)
if err != nil {
s.log.Errorf("failed to generate snapshot version %q: %s", version, err)
return
}

for _, snap := range s.snapshotters {
if err := snap.Generate(version, resources); err != nil {
s.Errorf("failed to generate snapshot version %q: %s", version, err)
}
if err := s.SnapshotCache.SetSnapshot(context.Background(), contour_xds_v3.Hash.String(), snapshot); err != nil {
s.log.Errorf("failed to store snapshot version %q: %s", version, err)
return
}
}

Expand All @@ -113,8 +107,8 @@ func asResources(messages any) []envoy_types.Resource {

// parseResources converts an []ResourceCache to a map[envoy_types.ResponseType]ResourceCache
// for faster indexing when creating new snapshots.
func parseResources(resources []ResourceCache) map[envoy_resource_v3.Type]ResourceCache {
resourceMap := make(map[envoy_resource_v3.Type]ResourceCache, len(resources))
func parseResources(resources []xdscache.ResourceCache) map[envoy_resource_v3.Type]xdscache.ResourceCache {
resourceMap := make(map[envoy_resource_v3.Type]xdscache.ResourceCache, len(resources))

for _, r := range resources {
resourceMap[r.TypeURL()] = r
Expand Down

0 comments on commit e3e0f34

Please sign in to comment.