Skip to content

Commit

Permalink
fix healthcheck server repeatedly update instance ready/unready.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <lcao@redhat.com>
  • Loading branch information
morvencao committed Jan 13, 2025
1 parent c9a36e1 commit b160af1
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 2 deletions.
4 changes: 2 additions & 2 deletions cmd/maestro/server/healthcheck_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,15 @@ func (s *HealthCheckServer) checkInstances(ctx context.Context) {
inactiveInstanceIDs := []string{}
for _, instance := range instances {
// Instances pulsing within the last three check intervals are considered as active.
if instance.LastHeartbeat.After(time.Now().Add(time.Duration(int(-3*time.Second) * s.heartbeatInterval))) {
if instance.LastHeartbeat.After(time.Now().Add(time.Duration(int(-3*time.Second)*s.heartbeatInterval))) && !instance.Ready {
if s.brokerType == "mqtt" {
if err := s.statusDispatcher.OnInstanceUp(instance.ID); err != nil {
klog.Errorf("Error to call OnInstanceUp handler for maestro instance %s: %s", instance.ID, err.Error())
}
}
// mark the instance as active after it is added to the status dispatcher
activeInstanceIDs = append(activeInstanceIDs, instance.ID)
} else {
} else if instance.LastHeartbeat.Before(time.Now().Add(time.Duration(int(-3*time.Second)*s.heartbeatInterval))) && instance.Ready {
if s.brokerType == "mqtt" {
if err := s.statusDispatcher.OnInstanceDown(instance.ID); err != nil {
klog.Errorf("Error to call OnInstanceDown handler for maestro instance %s: %s", instance.ID, err.Error())
Expand Down
89 changes: 89 additions & 0 deletions test/integration/healthcheck_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package integration

import (
"context"
"fmt"
"testing"
"time"

. "github.com/onsi/gomega"
"github.com/openshift-online/maestro/pkg/api"
"github.com/openshift-online/maestro/pkg/dao"
"github.com/openshift-online/maestro/test"
prommodel "github.com/prometheus/client_model/go"
"k8s.io/apimachinery/pkg/util/rand"
)

func TestHealthCheckServer(t *testing.T) {
h, _ := test.RegisterIntegration(t)
ctx, cancel := context.WithCancel(context.Background())
defer func() {
cancel()
}()

instanceDao := dao.NewInstanceDao(&h.Env().Database.SessionFactory)
// insert one existing instances
_, err := instanceDao.Create(ctx, &api.ServerInstance{
Meta: api.Meta{
ID: "instance1",
},
LastHeartbeat: time.Now(),
Ready: true,
})
Expect(err).NotTo(HaveOccurred())

// create a consumer
clusterName := "cluster-" + rand.String(5)
_ = h.CreateConsumer(clusterName)

instanceID := &h.Env().Config.MessageBroker.ClientID
Eventually(func() error {
instances, err := instanceDao.All(ctx)
if err != nil {
return err
}

if len(instances) != 2 {
return fmt.Errorf("expected 1 instance, got %d", len(instances))
}

var instance *api.ServerInstance
for _, i := range instances {
if i.ID == *instanceID {
instance = i
}
}

if instance.LastHeartbeat.IsZero() {
return fmt.Errorf("expected instance.LastHeartbeat to be non-zero")
}

if !instance.Ready {
return fmt.Errorf("expected instance.Ready to be true")
}

if instance.ID != *instanceID {
return fmt.Errorf("expected instance.ID to be %s, got %s", *instanceID, instance.ID)
}

return nil
}, 10*time.Second, 1*time.Second).Should(Succeed())

if h.Broker != "grpc" {
// check the metrics to ensure only status resync request is sent for manifets and manifestbundles
time.Sleep(2 * time.Second)
families := getServerMetrics(t, "http://localhost:8080/metrics")
labels := []*prommodel.LabelPair{
{Name: strPtr("source"), Value: strPtr("maestro")},
{Name: strPtr("cluster"), Value: strPtr(clusterName)},
{Name: strPtr("type"), Value: strPtr("io.open-cluster-management.works.v1alpha1.manifests")},
}
checkServerCounterMetric(t, families, "cloudevents_sent_total", labels, 1.0)
labels = []*prommodel.LabelPair{
{Name: strPtr("source"), Value: strPtr("maestro")},
{Name: strPtr("cluster"), Value: strPtr(clusterName)},
{Name: strPtr("type"), Value: strPtr("io.open-cluster-management.works.v1alpha1.manifestbundles")},
}
checkServerCounterMetric(t, families, "cloudevents_sent_total", labels, 1.0)
}
}

0 comments on commit b160af1

Please sign in to comment.