fix(meshservice): use Protocol from the MeshService (#12709)
## Motivation

We were retrieving the protocol for a MeshService incorrectly, which caused the configuration not to be applied.

## Implementation information

Instead of using `GetServiceProtocol`, I changed the implementation to retrieve the port protocol directly from the MeshService and use it. Additionally, I added an end-to-end (E2E) test to verify that `idleTimeout` is applied correctly. The test sets the timeout to 2 seconds and checks that the connection is closed shortly after, given that the default timeout is 1 hour.
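
For context, the sketch below shows the shape of the new lookup: the protocol is read from the port declared on the MeshService itself rather than looked up by service name. The `Port` and `MeshService` types here are simplified stand-ins rather than the real Kuma API types; `FindPortByName` and `AppProtocol` mirror the calls visible in the diff.

```go
// A minimal, self-contained sketch of the new lookup. Port and MeshService
// are simplified stand-ins for the real Kuma API types.
package example

// Port is a stand-in for a MeshService port definition.
type Port struct {
	Name        string
	AppProtocol string // e.g. "http", "tcp", "grpc"
}

// MeshService is a stand-in for the MeshService resource.
type MeshService struct {
	Ports []Port
}

// FindPortByName returns the port whose name matches the referenced
// section name, if any.
func (ms MeshService) FindPortByName(name string) (Port, bool) {
	for _, p := range ms.Ports {
		if p.Name == name {
			return p, true
		}
	}
	return Port{}, false
}

// ProtocolForSection resolves the protocol from the MeshService port itself
// (the fix) instead of a name-based GetServiceProtocol lookup; when no port
// matches, the caller keeps its current protocol.
func ProtocolForSection(ms MeshService, sectionName, current string) string {
	if port, found := ms.FindPortByName(sectionName); found {
		return port.AppProtocol
	}
	return current
}
```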

## Supporting documentation

Fix #12702

---------

Signed-off-by: Lukasz Dziedziak <lukidzi@gmail.com>
lukidzi authored Feb 7, 2025
1 parent 72107d3 commit 74af069
Showing 5 changed files with 262 additions and 5 deletions.
pkg/plugins/policies/core/xds/meshroute/clusters.go (3 additions, 0 deletions)
@@ -102,6 +102,9 @@ func GenerateClusters(
// we only check TLS status for local service
// services that are synced can be accessed only with TLS through ZoneIngress
tlsReady = !ms.IsLocalMeshService() || ms.Status.TLS.Status == meshservice_api.TLSReady
if port, found := ms.FindPortByName(realResourceRef.Resource.SectionName); found {
protocol = port.AppProtocol
}
}
}
edsClusterBuilder.Configure(envoy_clusters.ClientSideMultiIdentitiesMTLS(
pkg/plugins/policies/core/xds/meshroute/listeners.go (1 addition, 1 deletion)
@@ -198,7 +198,7 @@ func GetServiceProtocolPortFromRef(
}
port := uint32(mes.Spec.Match.Port)
service := mes.DestinationName(port)
protocol := meshCtx.GetServiceProtocol(service)
protocol := mes.Spec.Match.Protocol
return service, protocol, port, true
case common_api.MeshMultiZoneService:
ms, ok := meshCtx.MeshMultiZoneServiceByIdentifier[pointer.Deref(ref.Resource).ResourceIdentifier]
test/e2e_env/kubernetes/gateway/gateway.go (1 addition, 1 deletion)
@@ -844,7 +844,7 @@ apiVersion: kuma.io/v1alpha1
kind: MeshFaultInjection
metadata:
namespace: %s
name: mesh-fault-injecton
name: mesh-fault-injecton-gw
labels:
kuma.io/mesh: %s
spec:
test/e2e_env/multizone/meshservice/policies.go (201 additions, 3 deletions)
@@ -6,12 +6,21 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1"
core_mesh "github.com/kumahq/kuma/pkg/core/resources/apis/mesh"
"github.com/kumahq/kuma/pkg/plugins/policies/meshhttproute/api/v1alpha1"
meshfaultinjection_api "github.com/kumahq/kuma/pkg/plugins/policies/meshfaultinjection/api/v1alpha1"
meshhealthcheck_api "github.com/kumahq/kuma/pkg/plugins/policies/meshhealthcheck/api/v1alpha1"
meshhttproute_api "github.com/kumahq/kuma/pkg/plugins/policies/meshhttproute/api/v1alpha1"
meshretry_api "github.com/kumahq/kuma/pkg/plugins/policies/meshretry/api/v1alpha1"
meshtcproute_api "github.com/kumahq/kuma/pkg/plugins/policies/meshtcproute/api/v1alpha1"
"github.com/kumahq/kuma/pkg/test/resources/samples"
"github.com/kumahq/kuma/test/framework"
. "github.com/kumahq/kuma/test/framework"
"github.com/kumahq/kuma/test/framework/client"
"github.com/kumahq/kuma/test/framework/deployments/testserver"
"github.com/kumahq/kuma/test/framework/envoy_admin"
"github.com/kumahq/kuma/test/framework/envoy_admin/stats"
"github.com/kumahq/kuma/test/framework/envoy_admin/tunnel"
"github.com/kumahq/kuma/test/framework/envs/multizone"
)

@@ -25,7 +34,8 @@ func MeshServiceTargeting() {

BeforeAll(func() {
Expect(NewClusterSetup().
Install(MeshWithMeshServicesUniversal(meshName, "Everywhere")).
Install(ResourceUniversal(samples.MeshMTLSBuilder().WithName(meshName).WithMeshServicesEnabled(mesh_proto.Mesh_MeshServices_Everywhere).Build())).
Install(MeshTrafficPermissionAllowAllUniversal(meshName)).
Setup(multizone.Global)).To(Succeed())
Expect(WaitForMesh(meshName, multizone.Zones())).To(Succeed())

@@ -71,6 +81,19 @@ spec:
`, Config.KumaNamespace, addressSuffix))).
Setup(multizone.KubeZone1)
Expect(err).ToNot(HaveOccurred())

err = NewClusterSetup().
Install(NamespaceWithSidecarInjection(namespace)).
Install(testserver.Install(
testserver.WithName("test-server"),
testserver.WithMesh(meshName),
testserver.WithNamespace(namespace),
testserver.WithEchoArgs("echo", "--instance", "kube-test-server-2"),
)).Setup(multizone.KubeZone2)
Expect(err).ToNot(HaveOccurred())

// remove default retry policy
Expect(DeleteMeshResources(multizone.Global, meshName, meshretry_api.MeshRetryResourceTypeDescriptor)).To(Succeed())
})

AfterEachFailure(func() {
@@ -79,16 +102,37 @@ spec:
})

E2EAfterEach(func() {
Expect(DeleteMeshResources(multizone.KubeZone1, meshName, v1alpha1.MeshHTTPRouteResourceTypeDescriptor)).To(Succeed())
Expect(DeleteMeshResources(multizone.KubeZone1, meshName, meshhttproute_api.MeshHTTPRouteResourceTypeDescriptor)).To(Succeed())
Expect(DeleteMeshResources(multizone.KubeZone1, meshName, meshtcproute_api.MeshTCPRouteResourceTypeDescriptor)).To(Succeed())
Expect(DeleteMeshResources(multizone.KubeZone1, meshName, meshhealthcheck_api.MeshHealthCheckResourceTypeDescriptor)).To(Succeed())
Expect(DeleteMeshResources(multizone.KubeZone1, meshName, core_mesh.ExternalServiceResourceTypeDescriptor)).To(Succeed())
Expect(DeleteMeshResources(multizone.KubeZone2, meshName, meshfaultinjection_api.MeshFaultInjectionResourceTypeDescriptor)).To(Succeed())
})

E2EAfterAll(func() {
Expect(multizone.KubeZone1.TriggerDeleteNamespace(namespace)).To(Succeed())
Expect(multizone.KubeZone2.TriggerDeleteNamespace(namespace)).To(Succeed())
Expect(multizone.Global.DeleteMesh(meshName)).To(Succeed())
})

retryStat := func(admin envoy_admin.Tunnel) *stats.Stats {
s, err := admin.GetStats("cluster.real-resource-mesh_test-server_real-resource-ns_kuma-2_msvc_80.upstream_rq_retry_success")
Expect(err).ToNot(HaveOccurred())
return s
}

countResponseCodes := func(statusCode int) func(responses []client.FailureResponse) int {
return func(responses []client.FailureResponse) int {
count := 0
for _, r := range responses {
if r.ResponseCode == statusCode {
count++
}
}
return count
}
}

It("should configure URLRewrite", func() {
// when
Expect(YamlK8s(fmt.Sprintf(`
@@ -225,4 +269,158 @@ spec:
g.Expect(response.Instance).To(HavePrefix("second-test-server"))
}, "30s", "1s").Should(Succeed())
})

It("should configure MeshRetry for MeshService in another zone", func() {
Expect(YamlK8s(fmt.Sprintf(`
apiVersion: kuma.io/v1alpha1
kind: MeshFaultInjection
metadata:
name: mesh-fi-ms-another-zone
namespace: %s
labels:
kuma.io/mesh: %s
kuma.io/origin: zone
spec:
targetRef:
kind: Mesh
proxyTypes: ["Sidecar"]
from:
- targetRef:
kind: Mesh
default:
http:
- abort:
httpStatus: 503
percentage: "50"
`, Config.KumaNamespace, meshName))(multizone.KubeZone2)).To(Succeed())
// given
// create a tunnel to test-client admin
Expect(multizone.KubeZone1.PortForwardService("test-client", namespace, 9901)).To(Succeed())
portFwd := multizone.KubeZone1.GetPortForward("test-client")
tnl := tunnel.NewK8sEnvoyAdminTunnel(multizone.Global.GetTesting(), portFwd.ApiServerEndpoint)

// then
Eventually(func() ([]client.FailureResponse, error) {
return client.CollectResponsesAndFailures(
multizone.KubeZone1,
"test-client",
fmt.Sprintf("test-server.%s.svc.%s.mesh.local", namespace, Kuma2),
client.FromKubernetesPod(namespace, "test-client"),
client.WithNumberOfRequests(100),
)
}, "30s", "5s").Should(And(
HaveLen(100),
WithTransform(countResponseCodes(503), BeNumerically("~", 50, 15)),
WithTransform(countResponseCodes(200), BeNumerically("~", 50, 15)),
))

Eventually(func(g Gomega) {
g.Expect(retryStat(tnl)).To(stats.BeEqualZero())
}, "5s", "1s").Should(Succeed())

meshRetryPolicy := fmt.Sprintf(`
apiVersion: kuma.io/v1alpha1
kind: MeshRetry
metadata:
name: mr-for-ms-in-zone-2
namespace: %s
labels:
kuma.io/mesh: %s
kuma.io/origin: zone
spec:
targetRef:
kind: Mesh
to:
- targetRef:
kind: MeshService
labels:
kuma.io/display-name: test-server
kuma.io/zone: %s
default:
http:
numRetries: 6
retryOn: ["503"]
`, Config.KumaNamespace, meshName, Kuma2)
// and when a MeshRetry policy is applied
Expect(framework.YamlK8s(meshRetryPolicy)(multizone.KubeZone1)).To(Succeed())

// then
Eventually(func() ([]client.FailureResponse, error) {
return client.CollectResponsesAndFailures(
multizone.KubeZone1,
"test-client",
fmt.Sprintf("test-server.%s.svc.%s.mesh.local", namespace, Kuma2),
client.FromKubernetesPod(namespace, "test-client"),
client.WithNumberOfRequests(100),
)
}, "30s", "5s").Should(And(
HaveLen(100),
ContainElements(HaveField("ResponseCode", 200)),
))

// and
Expect(retryStat(tnl)).To(stats.BeGreaterThanZero())

// remove Policies
Expect(DeleteMeshPolicyOrError(multizone.KubeZone1, meshretry_api.MeshRetryResourceTypeDescriptor, "mr-for-ms-in-zone-2")).To(Succeed())
})

It("should mark MeshService from another zone as unhealthy if it doesn't reply on health checks", func() {
// check that test-server is healthy
Eventually(func(g Gomega) {
_, err := client.CollectEchoResponse(
multizone.KubeZone1,
"test-client",
fmt.Sprintf("test-server.%s.svc.%s.mesh.local", namespace, Kuma2),
client.FromKubernetesPod(namespace, "test-client"),
)
g.Expect(err).ToNot(HaveOccurred())
}, "30s", "1s").MustPassRepeatedly(3).Should(Succeed())

healthCheck := fmt.Sprintf(`
apiVersion: kuma.io/v1alpha1
kind: MeshHealthCheck
metadata:
name: mhc-for-ms-in-kuma-2
namespace: %s
labels:
kuma.io/mesh: %s
kuma.io/origin: zone
spec:
targetRef:
kind: Mesh
to:
- targetRef:
kind: MeshService
labels:
kuma.io/display-name: test-server
kuma.io/zone: %s
default:
interval: 3s
timeout: 2s
unhealthyThreshold: 3
healthyThreshold: 1
failTrafficOnPanic: true
noTrafficInterval: 1s
healthyPanicThreshold: 0
reuseConnection: true
http:
path: /are-you-healthy
expectedStatuses:
- 500`, Config.KumaNamespace, meshName, Kuma2)
// update HealthCheck policy to check for another status code
Expect(framework.YamlK8s(healthCheck)(multizone.KubeZone1)).To(Succeed())

// check that test-server is unhealthy
Eventually(func(g Gomega) {
response, err := client.CollectFailure(
multizone.KubeZone1,
"test-client",
fmt.Sprintf("test-server.%s.svc.%s.mesh.local", namespace, Kuma2),
client.FromKubernetesPod(namespace, "test-client"),
)
g.Expect(err).ToNot(HaveOccurred())
g.Expect(response.ResponseCode).To(Equal(503))
}, "30s", "1s").MustPassRepeatedly(3).Should(Succeed())
})
}
test/e2e_env/multizone/meshtimeout/meshtimeout.go (56 additions, 0 deletions)
@@ -16,6 +16,9 @@ import (
. "github.com/kumahq/kuma/test/framework"
framework_client "github.com/kumahq/kuma/test/framework/client"
"github.com/kumahq/kuma/test/framework/deployments/testserver"
"github.com/kumahq/kuma/test/framework/envoy_admin"
"github.com/kumahq/kuma/test/framework/envoy_admin/stats"
"github.com/kumahq/kuma/test/framework/envoy_admin/tunnel"
"github.com/kumahq/kuma/test/framework/envs/multizone"
)

@@ -83,6 +86,7 @@ spec:
Expect(group.Wait()).To(Succeed())

Expect(DeleteMeshResources(multizone.Global, mesh, meshretry_api.MeshRetryResourceTypeDescriptor)).To(Succeed())
Expect(DeleteMeshResources(multizone.Global, mesh, meshtimeout_api.MeshTimeoutResourceTypeDescriptor)).To(Succeed())
})

AfterEachFailure(func() {
@@ -101,6 +105,12 @@ spec:
Expect(multizone.Global.DeleteMesh(mesh)).To(Succeed())
})

activeCxStat := func(admin envoy_admin.Tunnel) *stats.Stats {
s, err := admin.GetStats("cluster.multizone-meshtimeout_test-server_multizone-meshtimeout-ns_kuma-2_msvc_80.upstream_cx_active")
Expect(err).ToNot(HaveOccurred())
return s
}

It("should apply MeshTimeout policy to MeshHTTPRoute", func() {
// when
Expect(YamlUniversal(fmt.Sprintf(`
@@ -259,4 +269,50 @@ spec:
g.Expect(response.ResponseCode).To(Equal(504))
}, "30s", "1s").Should(Succeed())
})

It("should apply MeshTimeout policy on MeshService from other zone", func() {
// given
// create a tunnel to test-client admin
Expect(multizone.KubeZone1.PortForwardService("test-client", k8sZoneNamespace, 9901)).To(Succeed())
portFwd := multizone.KubeZone1.GetPortForward("test-client")
tnl := tunnel.NewK8sEnvoyAdminTunnel(multizone.Global.GetTesting(), portFwd.ApiServerEndpoint)

Eventually(func(g Gomega) {
_, err := framework_client.CollectEchoResponse(
multizone.KubeZone1, "test-client", "http://test-server.multizone-meshtimeout-ns.svc.kuma-2.mesh.local:80",
framework_client.FromKubernetesPod(k8sZoneNamespace, "test-client"),
)
g.Expect(err).ToNot(HaveOccurred())
}, "30s", "1s").MustPassRepeatedly(5).Should(Succeed())
// should have active connection
Consistently(func(g Gomega) {
g.Expect(activeCxStat(tnl)).To(stats.BeGreaterThanZero())
}, "5s", "1s").Should(Succeed())

Expect(YamlUniversal(fmt.Sprintf(`
type: MeshTimeout
name: mesh-timeout-ms-kuma-2
mesh: %s
spec:
targetRef:
kind: Mesh
to:
- targetRef:
kind: Mesh
default:
idleTimeout: 2s
`, mesh))(multizone.Global)).To(Succeed())

Eventually(func(g Gomega) {
_, err := framework_client.CollectEchoResponse(
multizone.KubeZone1, "test-client", "http://test-server.multizone-meshtimeout-ns.svc.kuma-2.mesh.local:80",
framework_client.FromKubernetesPod(k8sZoneNamespace, "test-client"),
)
g.Expect(err).ToNot(HaveOccurred())
}, "30s", "1s").MustPassRepeatedly(5).Should(Succeed())
// should close the connection shortly after
Eventually(func(g Gomega) {
g.Expect(activeCxStat(tnl)).To(stats.BeEqualZero())
}, "30s", "1s").Should(Succeed())
})
}
