Skip to content

Commit

Permalink
Enable all integration tests on the gRPC broker.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <lcao@redhat.com>
  • Loading branch information
morvencao committed Jan 9, 2025
1 parent e28d043 commit e68a3ad
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 71 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ test-integration-mqtt:
.PHONY: test-integration-mqtt

test-integration-grpc:
BROKER=grpc OCM_ENV=testing gotestsum --jsonfile-timing-events=$(grpc_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h -run TestController \
BROKER=grpc OCM_ENV=testing gotestsum --jsonfile-timing-events=$(grpc_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
./test/integration
.PHONY: test-integration-grpc

Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -892,8 +892,6 @@ open-cluster-management.io/api v0.15.1-0.20241120090202-cb7ce98ab874 h1:WgkuYXTb
open-cluster-management.io/api v0.15.1-0.20241120090202-cb7ce98ab874/go.mod h1:9erZEWEn4bEqh0nIX2wA7f/s3KCuFycQdBrPrRzi0QM=
open-cluster-management.io/ocm v0.15.1-0.20241125065026-7a190f1a2b18 h1:sLg+JNnjMRE7WwLbzxURYRPD3ZZr/WHydcGC5/jOE/A=
open-cluster-management.io/ocm v0.15.1-0.20241125065026-7a190f1a2b18/go.mod h1:083SWgAjjvkc0TcOwf8wI8HCrCYaUWP860YTs+y8zXY=
open-cluster-management.io/sdk-go v0.15.1-0.20241125015855-1536c3970f8f h1:zeC7QrFNarfK2zY6jGtd+mX+yDrQQmnH/J8A7n5Nh38=
open-cluster-management.io/sdk-go v0.15.1-0.20241125015855-1536c3970f8f/go.mod h1:fi5WBsbC5K3txKb8eRLuP0Sim/Oqz/PHX18skAEyjiA=
open-cluster-management.io/sdk-go v0.15.1-0.20241224013925-71378a533f22 h1:w15NHc6cBfYxKHtF6zGLeQ1iTUqtN53sdONi9XXy5Xc=
open-cluster-management.io/sdk-go v0.15.1-0.20241224013925-71378a533f22/go.mod h1:fi5WBsbC5K3txKb8eRLuP0Sim/Oqz/PHX18skAEyjiA=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3 h1:2770sDpzrjjsAtVhSeUFseziht227YAWYHLGNM8QPwY=
Expand Down
124 changes: 61 additions & 63 deletions test/integration/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"testing"
"time"

jsonpatch "github.com/evanphx/json-patch"
"github.com/google/uuid"
. "github.com/onsi/gomega"
prommodel "github.com/prometheus/client_model/go"
Expand All @@ -20,14 +21,14 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/rand"
workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"

"github.com/openshift-online/maestro/pkg/api"
"github.com/openshift-online/maestro/pkg/api/openapi"
"github.com/openshift-online/maestro/pkg/client/cloudevents"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/test"
)
Expand Down Expand Up @@ -78,9 +79,7 @@ func TestResourcePost(t *testing.T) {
h.StartControllerManager(ctx)
h.StartWorkAgent(ctx, consumer.Name, false)
clientHolder := h.WorkAgentHolder
informer := h.WorkAgentInformer
agentWorkClient := clientHolder.ManifestWorks(consumer.Name)
sourceClient := h.Env().Clients.CloudEventsSource

// POST responses per openapi spec: 201, 400, 409, 500

Expand Down Expand Up @@ -120,9 +119,8 @@ func TestResourcePost(t *testing.T) {
Expect(json.Unmarshal(work.Spec.Workload.Manifests[0].Raw, &manifest)).NotTo(HaveOccurred(), "Error unmarshalling manifest: %v", err)
Expect(manifest).To(Equal(res.Manifest))

newWork := work.DeepCopy()
statusFeedbackValue := `{"replicas":1,"availableReplicas":1,"readyReplicas":1,"updatedReplicas":1}`
newWork.Status = workv1.ManifestWorkStatus{
newWorkStatus := workv1.ManifestWorkStatus{
ResourceStatus: workv1.ManifestResourceStatus{
Manifests: []workv1.ManifestCondition{
{
Expand All @@ -148,12 +146,8 @@ func TestResourcePost(t *testing.T) {
},
}

// only update the status on the agent local part
Expect(informer.Informer().GetStore().Update(newWork)).NotTo(HaveOccurred())
// Resync the resource status
ceSourceClient, ok := sourceClient.(*cloudevents.SourceClientImpl)
Expect(ok).To(BeTrue())
Expect(ceSourceClient.CloudEventSourceClient.Resync(ctx, consumer.Name)).NotTo(HaveOccurred())
// update the work status
Expect(updateWorkStatus(ctx, agentWorkClient, work, newWorkStatus)).NotTo(HaveOccurred())

var newRes *openapi.Resource
Eventually(func() error {
Expand Down Expand Up @@ -187,16 +181,17 @@ func TestResourcePost(t *testing.T) {
Expect(contentStatus["readyReplicas"]).To(Equal(float64(1)))
Expect(contentStatus["updatedReplicas"]).To(Equal(float64(1)))

// check the metrics
time.Sleep(1 * time.Second)
families := getServerMetrics(t, "http://localhost:8080/metrics")
labels := []*prommodel.LabelPair{
{Name: strPtr("source"), Value: strPtr("maestro")},
{Name: strPtr("cluster"), Value: strPtr(clusterName)},
{Name: strPtr("type"), Value: strPtr("io.open-cluster-management.works.v1alpha1.manifests")},
if h.Broker != "grpc" {
time.Sleep(1 * time.Second)
families := getServerMetrics(t, "http://localhost:8080/metrics")
labels := []*prommodel.LabelPair{
{Name: strPtr("source"), Value: strPtr("maestro")},
{Name: strPtr("cluster"), Value: strPtr(clusterName)},
{Name: strPtr("type"), Value: strPtr("io.open-cluster-management.works.v1alpha1.manifests")},
}
checkServerCounterMetric(t, families, "cloudevents_sent_total", labels, 2.0)
checkServerCounterMetric(t, families, "cloudevents_received_total", labels, 2.0)
}
checkServerCounterMetric(t, families, "cloudevents_sent_total", labels, 3.0)
checkServerCounterMetric(t, families, "cloudevents_received_total", labels, 3.0)
}

func TestResourcePostWithoutName(t *testing.T) {
Expand Down Expand Up @@ -380,14 +375,6 @@ func TestResourcePatch(t *testing.T) {
openapi.ResourcePatchRequest{Version: &res.Version, Manifest: newRes.Manifest}).Execute()
Expect(err).To(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusConflict))

// check the metrics
families := getServerMetrics(t, "http://localhost:8080/metrics")
labels := []*prommodel.LabelPair{
{Name: strPtr("id"), Value: resource.Id},
{Name: strPtr("action"), Value: strPtr("update")},
}
checkServerCounterMetric(t, families, "resource_processed_total", labels, 1.0)
}

func TestResourcePaging(t *testing.T) {
Expand Down Expand Up @@ -467,7 +454,6 @@ func TestResourceBundleGet(t *testing.T) {
Expect(*resBundle.UpdatedAt).To(BeTemporally("~", resourceBundle.UpdatedAt))
Expect(*resBundle.Version).To(Equal(resourceBundle.Version))

// check the metrics
families := getServerMetrics(t, "http://localhost:8080/metrics")
labels := []*prommodel.LabelPair{
{Name: strPtr("method"), Value: strPtr("GET")},
Expand Down Expand Up @@ -503,7 +489,6 @@ func TestResourceBundleListSearch(t *testing.T) {
Expect(len(list.Items)).To(Equal(20))
Expect(list.Total).To(Equal(int32(20)))

// check the metrics
families := getServerMetrics(t, "http://localhost:8080/metrics")
labels := []*prommodel.LabelPair{
{Name: strPtr("method"), Value: strPtr("GET")},
Expand Down Expand Up @@ -612,7 +597,6 @@ func TestResourceFromGRPC(t *testing.T) {
h.StartControllerManager(ctx)
h.StartWorkAgent(ctx, consumer.Name, false)
clientHolder := h.WorkAgentHolder
informer := h.WorkAgentInformer
agentWorkClient := clientHolder.ManifestWorks(consumer.Name)

// use grpc client to create resource
Expand Down Expand Up @@ -659,9 +643,8 @@ func TestResourceFromGRPC(t *testing.T) {
Expect(json.Unmarshal(work.Spec.Workload.Manifests[0].Raw, &manifest)).NotTo(HaveOccurred(), "Error unmarshalling manifest: %v", err)

// update the resource
newWork := work.DeepCopy()
statusFeedbackValue := `{"observedGeneration":1,"replicas":1,"availableReplicas":1,"readyReplicas":1,"updatedReplicas":1}`
newWork.Status = workv1.ManifestWorkStatus{
newWorkStatus := workv1.ManifestWorkStatus{
ResourceStatus: workv1.ManifestResourceStatus{
Manifests: []workv1.ManifestCondition{
{
Expand All @@ -687,13 +670,8 @@ func TestResourceFromGRPC(t *testing.T) {
},
}

// only update the status on the agent local part
Expect(informer.Informer().GetStore().Update(newWork)).NotTo(HaveOccurred())

// Resync the resource status
ceSourceClient, ok := h.Env().Clients.CloudEventsSource.(*cloudevents.SourceClientImpl)
Expect(ok).To(BeTrue())
Expect(ceSourceClient.CloudEventSourceClient.Resync(ctx, consumer.Name)).NotTo(HaveOccurred())
// update the work status
Expect(updateWorkStatus(ctx, agentWorkClient, work, newWorkStatus)).NotTo(HaveOccurred())

Eventually(func() error {
newRes, err := h.Store.Get(res.ID)
Expand Down Expand Up @@ -782,19 +760,17 @@ func TestResourceFromGRPC(t *testing.T) {
}, 10*time.Second, 1*time.Second).Should(Succeed())

// no real kubernetes environment, so we need to update the resource status manually
deletingWork := work.DeepCopy()
deletingWork.Status = workv1.ManifestWorkStatus{
deletingWorkStatus := workv1.ManifestWorkStatus{
Conditions: []metav1.Condition{
{
Type: common.ManifestsDeleted,
Status: metav1.ConditionTrue,
},
},
}
// only update the status on the agent local part
Expect(informer.Informer().GetStore().Update(deletingWork)).NotTo(HaveOccurred())
// Resync the resource status
Expect(ceSourceClient.CloudEventSourceClient.Resync(ctx, consumer.Name)).NotTo(HaveOccurred())

// update the work status
Expect(updateWorkStatus(ctx, agentWorkClient, work, deletingWorkStatus)).NotTo(HaveOccurred())

Eventually(func() error {
resource, _, err = client.DefaultApi.ApiMaestroV1ResourcesIdGet(ctx, newRes.ID).Execute()
Expand All @@ -804,7 +780,6 @@ func TestResourceFromGRPC(t *testing.T) {
return nil
}, 10*time.Second, 1*time.Second).Should(Succeed())

// check the metrics
time.Sleep(1 * time.Second)
families := getServerMetrics(t, "http://localhost:8080/metrics")
labels := []*prommodel.LabelPair{
Expand Down Expand Up @@ -837,13 +812,15 @@ func TestResourceFromGRPC(t *testing.T) {
}
checkServerCounterMetric(t, families, "grpc_server_processed_total", labels, 0.0)

labels = []*prommodel.LabelPair{
{Name: strPtr("source"), Value: strPtr("maestro")},
{Name: strPtr("cluster"), Value: strPtr(clusterName)},
{Name: strPtr("type"), Value: strPtr("io.open-cluster-management.works.v1alpha1.manifestbundles")},
if h.Broker != "grpc" {
labels = []*prommodel.LabelPair{
{Name: strPtr("source"), Value: strPtr("maestro")},
{Name: strPtr("cluster"), Value: strPtr(clusterName)},
{Name: strPtr("type"), Value: strPtr("io.open-cluster-management.works.v1alpha1.manifestbundles")},
}
checkServerCounterMetric(t, families, "cloudevents_sent_total", labels, 1.0)
checkServerCounterMetric(t, families, "cloudevents_received_total", labels, 1.0)
}
checkServerCounterMetric(t, families, "cloudevents_sent_total", labels, 3.0)
checkServerCounterMetric(t, families, "cloudevents_received_total", labels, 3.0)
}

func TestResourceBundleFromGRPC(t *testing.T) {
Expand All @@ -863,7 +840,6 @@ func TestResourceBundleFromGRPC(t *testing.T) {
h.StartControllerManager(ctx)
h.StartWorkAgent(ctx, consumer.Name, true)
clientHolder := h.WorkAgentHolder
informer := h.WorkAgentInformer
agentWorkClient := clientHolder.ManifestWorks(consumer.Name)

// use grpc client to create resource bundle
Expand Down Expand Up @@ -897,9 +873,8 @@ func TestResourceBundleFromGRPC(t *testing.T) {
Expect(json.Unmarshal(work.Spec.Workload.Manifests[0].Raw, &manifest)).NotTo(HaveOccurred(), "Error unmarshalling manifest: %v", err)

// update the resource
newWork := work.DeepCopy()
statusFeedbackValue := `{"observedGeneration":1,"replicas":1,"availableReplicas":1,"readyReplicas":1,"updatedReplicas":1}`
newWork.Status = workv1.ManifestWorkStatus{
newWorkStatus := workv1.ManifestWorkStatus{
ResourceStatus: workv1.ManifestResourceStatus{
Manifests: []workv1.ManifestCondition{
{
Expand All @@ -925,13 +900,8 @@ func TestResourceBundleFromGRPC(t *testing.T) {
},
}

// only update the status on the agent local part
Expect(informer.Informer().GetStore().Update(newWork)).NotTo(HaveOccurred())

// Resync the resource status
ceSourceClient, ok := h.Env().Clients.CloudEventsSource.(*cloudevents.SourceClientImpl)
Expect(ok).To(BeTrue())
Expect(ceSourceClient.CloudEventSourceClient.Resync(ctx, consumer.Name)).NotTo(HaveOccurred())
// update the work status
Expect(updateWorkStatus(ctx, agentWorkClient, work, newWorkStatus)).NotTo(HaveOccurred())

Eventually(func() error {
newRes, err := h.Store.Get(res.ID)
Expand Down Expand Up @@ -1011,6 +981,34 @@ func TestResourceBundleFromGRPC(t *testing.T) {
Expect(list.Total).To(Equal(int32(1)))
}

// updateWorkStatus patches the status subresource of the given ManifestWork
// so that it carries newStatus, leaving the spec untouched. The patch is a
// JSON merge patch computed between the current object and a copy with the
// desired status, so only the changed fields are sent to the server.
func updateWorkStatus(ctx context.Context, workClient workv1client.ManifestWorkInterface, work *workv1.ManifestWork, newStatus workv1.ManifestWorkStatus) error {
	// Serialize the current work as the patch base.
	oldData, err := json.Marshal(work)
	if err != nil {
		return err
	}

	// Build a copy carrying the desired status and serialize it.
	updated := work.DeepCopy()
	updated.Status = newStatus
	newData, err := json.Marshal(updated)
	if err != nil {
		return err
	}

	// Compute the minimal merge patch between the two representations.
	patch, err := jsonpatch.CreateMergePatch(oldData, newData)
	if err != nil {
		return err
	}

	// Apply the patch against the "status" subresource only.
	_, err = workClient.Patch(ctx, work.Name, k8stypes.MergePatchType, patch, metav1.PatchOptions{}, "status")
	return err
}

func contains(et api.EventType, events api.EventList) bool {
for _, e := range events {
if e.EventType == et {
Expand Down
8 changes: 3 additions & 5 deletions test/integration/status_hash_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func TestEventServer(t *testing.T) {
h.StartControllerManager(ctx)
h.StartWorkAgent(ctx, consumer.Name, false)
clientHolder := h.WorkAgentHolder
informer := h.WorkAgentInformer
agentWorkClient := clientHolder.ManifestWorks(consumer.Name)
resourceService := h.Env().Services.Resources()

Expand All @@ -109,8 +108,7 @@ func TestEventServer(t *testing.T) {
Expect(work.Spec.Workload).NotTo(BeNil())
Expect(len(work.Spec.Workload.Manifests)).To(Equal(1))

newWork := work.DeepCopy()
newWork.Status = workv1.ManifestWorkStatus{
newWorkStatus := workv1.ManifestWorkStatus{
ResourceStatus: workv1.ManifestResourceStatus{
Manifests: []workv1.ManifestCondition{
{
Expand All @@ -125,8 +123,8 @@ func TestEventServer(t *testing.T) {
},
}

// only update the status on the agent local part
Expect(informer.Informer().GetStore().Update(newWork)).NotTo(HaveOccurred())
// update the work status to make sure the resource status is updated
Expect(updateWorkStatus(ctx, agentWorkClient, work, newWorkStatus)).To(Succeed())

// after the instance ("cluster") is stale, the current instance ("maestro") will take over the consumer
// and resync status, then the resource status will be updated finally
Expand Down

0 comments on commit e68a3ad

Please sign in to comment.