Skip to content

Commit

Permalink
add grpc broker integration test.
Browse files Browse the repository at this point in the history
Signed-off-by: morvencao <lcao@redhat.com>
  • Loading branch information
morvencao committed Dec 20, 2024
1 parent 2d0074d commit 1cf6559
Show file tree
Hide file tree
Showing 13 changed files with 300 additions and 117 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,5 @@ test/e2e/.consumer_name
test/e2e/.external_host_ip
test/e2e/report/*
unit-test-results.json
integration-test-results.json
*integration-test-results.json
test/e2e/setup/aro/aro-hcp
17 changes: 13 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ MQTT_IMAGE ?= docker.io/library/eclipse-mosquitto:2.0.18

# Test output files
unit_test_json_output ?= ${PWD}/unit-test-results.json
integration_test_json_output ?= ${PWD}/integration-test-results.json
mqtt_integration_test_json_output ?= ${PWD}/mqtt-integration-test-results.json
grpc_integration_test_json_output ?= ${PWD}/grpc-integration-test-results.json

# Prints a list of useful targets.
help:
Expand Down Expand Up @@ -218,11 +219,19 @@ test:
# make test-integration TESTFLAGS="-run TestAccounts" acts as TestAccounts* and run TestAccountsGet, TestAccountsPost, etc.
# make test-integration TESTFLAGS="-run TestAccountsGet" runs TestAccountsGet
# make test-integration TESTFLAGS="-short" skips long-run tests
test-integration:
OCM_ENV=testing gotestsum --jsonfile-timing-events=$(integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
./test/integration
test-integration: test-integration-mqtt test-integration-grpc
.PHONY: test-integration

test-integration-mqtt:
BROKER=mqtt OCM_ENV=testing gotestsum --jsonfile-timing-events=$(mqtt_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h $(TESTFLAGS) \
./test/integration
.PHONY: test-integration-mqtt

test-integration-grpc:
BROKER=grpc OCM_ENV=testing gotestsum --jsonfile-timing-events=$(grpc_integration_test_json_output) --format $(TEST_SUMMARY_FORMAT) -- -p 1 -ldflags -s -v -timeout 1h -run TestController \
./test/integration
.PHONY: test-integration-grpc

# Regenerate openapi client and models
generate:
rm -rf pkg/api/openapi
Expand Down
32 changes: 25 additions & 7 deletions cmd/maestro/server/grpc_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
// TODO: error handling to address errors beyond network issues.
if err := subServer.Send(pbEvt); err != nil {
klog.Errorf("failed to send grpc event, %v", err)
return err // return error to requeue the spec event
}

return nil
Expand Down Expand Up @@ -397,8 +398,8 @@ func (bkr *GRPCBroker) handleResEvent(ctx context.Context, eventID string, resou

// add the event instance record to mark the event has been processed by the current instance
if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: bkr.instanceID,
SpecEventID: eventID,
InstanceID: bkr.instanceID,
}); err != nil {
return fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error())
}
Expand Down Expand Up @@ -461,9 +462,26 @@ func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool
if err != nil {
return false, fmt.Errorf("failed to get event %s: %s", eventID, err.Error())
}
resource, err := bkr.resourceService.Get(ctx, evt.SourceID)
if err != nil {
return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, err.Error())

// fast return if the event is already reconciled
if evt.ReconciledDate != nil {
return false, nil
}

resource, svcErr := bkr.resourceService.Get(ctx, evt.SourceID)
if svcErr != nil {
// if the resource is not found, it indicates the resource has been handled by
// other instances, so we can mark the event as reconciled and ignore it.
if svcErr.Is404() {
klog.V(10).Infof("The resource %s has been deleted, mark the event as reconciled", evt.SourceID)
now := time.Now()
evt.ReconciledDate = &now
if _, svcErr := bkr.eventService.Replace(ctx, evt); svcErr != nil {
return false, fmt.Errorf("failed to update event %s: %s", evt.ID, svcErr.Error())
}
return false, nil
}
return false, fmt.Errorf("failed to get resource %s: %s", evt.SourceID, svcErr.Error())
}

if bkr.IsConsumerSubscribed(resource.ConsumerName) {
Expand All @@ -473,8 +491,8 @@ func (bkr *GRPCBroker) PredicateEvent(ctx context.Context, eventID string) (bool
// if the consumer is not subscribed to the broker, then add the event instance record
// to indicate the event has been processed by the instance
if _, err := bkr.eventInstanceDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: bkr.instanceID,
SpecEventID: eventID,
InstanceID: bkr.instanceID,
}); err != nil {
return false, fmt.Errorf("failed to create event instance record %s: %s", eventID, err.Error())
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/api/event_instances.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package api

type EventInstance struct {
EventID string
InstanceID string
EventID string `gorm:"default:null"`
SpecEventID string `gorm:"default:null"`
InstanceID string
}

type EventInstanceList []*EventInstance
29 changes: 18 additions & 11 deletions pkg/controllers/event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,15 +126,26 @@ func (h *PredicatedEventHandler) PostProcess(ctx context.Context, event *api.Eve
return fmt.Errorf("error finding ready instances: %v", err)
}

eventInstances, err := h.eventInstanceDao.GetInstancesByEventID(ctx, event.ID)
processedInstances, err := h.eventInstanceDao.GetInstancesBySpecEventID(ctx, event.ID)
if err != nil {
return fmt.Errorf("error finding processed server instances for event %s: %v", event.ID, err)
return fmt.Errorf("error finding processed instances for event %s: %v", event.ID, err)
}

// should never happen. If the event is not processed by any instance, return an error
if len(processedInstances) == 0 {
klog.V(10).Infof("Event %s is not processed by any instance", event.ID)
return fmt.Errorf("event %s is not processed by any instance", event.ID)
}

// check if all instances have processed the event
if !compareStrings(activeInstances, eventInstances) {
klog.V(10).Infof("Event %s is not processed by all instances, handled by %v, active instances %v", event.ID, eventInstances, activeInstances)
return fmt.Errorf("event %s is not processed by all instances", event.ID)
// 1. In the normal case, activeInstances == processedInstances, so mark the event as reconciled.
// 2. If a maestro server instance is up but hasn't been marked as ready yet, then activeInstances < processedInstances;
// it's ok to mark the event as reconciled, as the instance is not ready to serve requests and has no connected agents.
// 3. If a maestro server instance is down but hasn't been marked as unready yet, it may still have connected agents, but
// the instance has stopped handling events, so activeInstances > processedInstances and the event should be requeued.
if !isSubSet(activeInstances, processedInstances) {
klog.V(10).Infof("Event %s is not processed by all active instances %v, handled by %v", event.ID, activeInstances, processedInstances)
return fmt.Errorf("event %s is not processed by all active instances", event.ID)
}

// update the event with the reconciled date
Expand All @@ -147,12 +158,8 @@ func (h *PredicatedEventHandler) PostProcess(ctx context.Context, event *api.Eve
return nil
}

// compareStrings compares two string slices and returns true if they are equal
func compareStrings(a, b []string) bool {
if len(a) != len(b) {
return false
}

// isSubSet checks if slice a is a subset of slice b
func isSubSet(a, b []string) bool {
for _, v := range a {
found := false
for _, vv := range b {
Expand Down
44 changes: 39 additions & 5 deletions pkg/controllers/event_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func TestPredicatedEventHandler(t *testing.T) {
RegisterTestingT(t)

currentInstanceID := "test-instance"
anotherInstanceID := "another-instance"
source := "my-event-source"
ctx := context.Background()
eventsDao := mocks.NewEventDao()
Expand All @@ -87,13 +88,15 @@ func TestPredicatedEventHandler(t *testing.T) {
eventServer := &exampleEventServer{eventDao: eventsDao}
eventHandler := NewPredicatedEventHandler(eventServer.PredicateEvent, events, eventInstancesDao, instancesDao)

// current instance is ready
_, _ = instancesDao.Create(ctx, &api.ServerInstance{
Meta: api.Meta{ID: currentInstanceID},
Ready: true,
})

// second instance is not ready
_, _ = instancesDao.Create(ctx, &api.ServerInstance{
Meta: api.Meta{ID: "another-instance"},
Meta: api.Meta{ID: anotherInstanceID},
Ready: false,
})

Expand All @@ -111,40 +114,71 @@ func TestPredicatedEventHandler(t *testing.T) {
EventType: api.CreateEventType,
})

// handle event 1
shouldProcess, err := eventHandler.ShouldHandleEvent(ctx, "1")
Expect(err).To(BeNil())
Expect(shouldProcess).To(BeTrue())

_, err = eventInstancesDao.Create(ctx, &api.EventInstance{
EventID: "1",
InstanceID: currentInstanceID,
SpecEventID: "1",
InstanceID: currentInstanceID,
})
Expect(err).To(BeNil())

eventHandler.DeferredAction(ctx, "1")

// simulate the second instance handled the event, although it has not been marked as ready
_, err = eventInstancesDao.Create(ctx, &api.EventInstance{
SpecEventID: "1",
InstanceID: anotherInstanceID,
})
Expect(err).To(BeNil())

event, err := eventsDao.Get(ctx, "1")
Expect(err).To(BeNil())
Expect(event.ReconciledDate).To(BeNil())

// should post-process the event successfully, because the second instance is not ready
err = eventHandler.PostProcess(ctx, event)
Expect(err).To(BeNil())
Expect(event.ReconciledDate).NotTo(BeNil())

event, err = eventsDao.Get(ctx, "1")
// mark the second instance as ready
err = instancesDao.MarkReadyByIDs(ctx, []string{anotherInstanceID})
Expect(err).To(BeNil())
Expect(event.ReconciledDate).NotTo(BeNil())

// handle event 2
shouldProcess, err = eventHandler.ShouldHandleEvent(ctx, "2")
Expect(err).To(BeNil())
Expect(shouldProcess).To(BeTrue())

// simulate the current instance handled the event, the second instance is shutting down
// before it handled the event
_, err = eventInstancesDao.Create(ctx, &api.EventInstance{
SpecEventID: "2",
InstanceID: currentInstanceID,
})
Expect(err).To(BeNil())

eventHandler.DeferredAction(ctx, "2")

event, err = eventsDao.Get(ctx, "2")
Expect(err).To(BeNil())
Expect(event.ReconciledDate).To(BeNil())

err = eventHandler.PostProcess(ctx, event)
Expect(err).NotTo(BeNil())
Expect(event.ReconciledDate).To(BeNil())

// mark the second instance as unready
err = instancesDao.MarkUnreadyByIDs(ctx, []string{anotherInstanceID})
Expect(err).To(BeNil())

// simulate requeueing the event
err = eventHandler.PostProcess(ctx, event)
Expect(err).To(BeNil())
Expect(event.ReconciledDate).NotTo(BeNil())

shouldProcess, err = eventHandler.ShouldHandleEvent(ctx, "3")
Expect(err).NotTo(BeNil())
Expect(shouldProcess).To(BeFalse())
Expand Down
21 changes: 14 additions & 7 deletions pkg/controllers/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,26 @@ type exampleController struct {
func (d *exampleController) OnAdd(ctx context.Context, eventID, resourceID string) error {
d.addCounter++
_, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: d.instanceID,
SpecEventID: eventID,
InstanceID: d.instanceID,
})
return err
}

func (d *exampleController) OnUpdate(ctx context.Context, eventID, resourceID string) error {
d.updateCounter++
_, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: d.instanceID,
SpecEventID: eventID,
InstanceID: d.instanceID,
})
return err
}

func (d *exampleController) OnDelete(ctx context.Context, eventID, resourceID string) error {
d.deleteCounter++
_, err := d.eventInstancesDao.Create(ctx, &api.EventInstance{
EventID: eventID,
InstanceID: d.instanceID,
SpecEventID: eventID,
InstanceID: d.instanceID,
})
return err
}
Expand Down Expand Up @@ -105,6 +105,12 @@ func TestControllerFrameworkWithLockBasedEventHandler(t *testing.T) {

eve, _ := eventsDao.Get(ctx, "1")
Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set")

eve, _ = eventsDao.Get(ctx, "2")
Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set")

eve, _ = eventsDao.Get(ctx, "3")
Expect(eve.ReconciledDate).ToNot(BeNil(), "event reconcile date should be set")
}

type exampleEventServer struct {
Expand All @@ -123,6 +129,7 @@ func TestControllerFrameworkWithPredicatedEventHandler(t *testing.T) {
RegisterTestingT(t)

currentInstanceID := "test-instance"
anotherInstanceID := "another-instance"
ctx := context.Background()
eventsDao := mocks.NewEventDao()
events := services.NewEventService(eventsDao)
Expand All @@ -145,7 +152,7 @@ func TestControllerFrameworkWithPredicatedEventHandler(t *testing.T) {
})

_, _ = instancesDao.Create(ctx, &api.ServerInstance{
Meta: api.Meta{ID: "another-instance"},
Meta: api.Meta{ID: anotherInstanceID},
Ready: false,
})

Expand Down
31 changes: 15 additions & 16 deletions pkg/dao/event_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,9 @@ import (

type EventInstanceDao interface {
Get(ctx context.Context, eventID, instanceID string) (*api.EventInstance, error)
GetInstancesByEventID(ctx context.Context, eventID string) ([]string, error)
Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error)

FindStatusEvents(ctx context.Context, ids []string) (api.EventInstanceList, error)
GetInstancesBySpecEventID(ctx context.Context, eventID string) ([]string, error)
FindEventInstancesByEventIDs(ctx context.Context, ids []string) (api.EventInstanceList, error)
GetEventsAssociatedWithInstances(ctx context.Context, instanceIDs []string) ([]string, error)
}

Expand All @@ -39,10 +38,19 @@ func (d *sqlEventInstanceDao) Get(ctx context.Context, eventID, instanceID strin
return &eventInstance, nil
}

func (d *sqlEventInstanceDao) GetInstancesByEventID(ctx context.Context, eventID string) ([]string, error) {
// Create inserts the given event instance record using a new session obtained
// from the DAO's session factory. Associations are omitted from the insert.
// On failure it calls db.MarkForRollback with the error and returns (nil, err);
// on success it returns the same eventInstance pointer that was passed in.
func (d *sqlEventInstanceDao) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) {
g2 := (*d.sessionFactory).New(ctx)
// Omit(clause.Associations) keeps the insert limited to the record itself.
if err := g2.Omit(clause.Associations).Create(eventInstance).Error; err != nil {
// Flag the failure via the db package before surfacing the error to the caller.
db.MarkForRollback(ctx, err)
return nil, err
}
return eventInstance, nil
}

func (d *sqlEventInstanceDao) GetInstancesBySpecEventID(ctx context.Context, specEventID string) ([]string, error) {
g2 := (*d.sessionFactory).New(ctx)
var eventInstances []api.EventInstance
if err := g2.Model(&api.EventInstance{}).Where("event_id = ?", eventID).Find(&eventInstances).Error; err != nil {
if err := g2.Model(&api.EventInstance{}).Where("spec_event_id = ?", specEventID).Find(&eventInstances).Error; err != nil {
return nil, err
}
instanceIDs := make([]string, len(eventInstances))
Expand All @@ -52,16 +60,7 @@ func (d *sqlEventInstanceDao) GetInstancesByEventID(ctx context.Context, eventID
return instanceIDs, nil
}

func (d *sqlEventInstanceDao) Create(ctx context.Context, eventInstance *api.EventInstance) (*api.EventInstance, error) {
g2 := (*d.sessionFactory).New(ctx)
if err := g2.Omit(clause.Associations).Create(eventInstance).Error; err != nil {
db.MarkForRollback(ctx, err)
return nil, err
}
return eventInstance, nil
}

func (d *sqlEventInstanceDao) FindStatusEvents(ctx context.Context, ids []string) (api.EventInstanceList, error) {
func (d *sqlEventInstanceDao) FindEventInstancesByEventIDs(ctx context.Context, ids []string) (api.EventInstanceList, error) {
g2 := (*d.sessionFactory).New(ctx)
eventInstances := api.EventInstanceList{}
if err := g2.Where("event_id in (?)", ids).Find(&eventInstances).Error; err != nil {
Expand All @@ -84,7 +83,7 @@ func (d *sqlEventInstanceDao) GetEventsAssociatedWithInstances(ctx context.Conte
// consider using join to optimize
if err := g2.Table("event_instances").
Select("event_id").
Where("instance_id IN ?", instanceIDs).
Where("instance_id IN (?) AND event_id IS NOT NULL", instanceIDs).
Group("event_id").
Having("COUNT(DISTINCT instance_id) = ?", instanceCount).
Scan(&eventIDs).Error; err != nil {
Expand Down
Loading

0 comments on commit 1cf6559

Please sign in to comment.