Skip to content

Commit

Permalink
broadcast after resource status updated in db (#88)
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <liuweixa@redhat.com>
  • Loading branch information
skeeey authored May 9, 2024
1 parent 2386c56 commit a35d649
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 29 deletions.
4 changes: 4 additions & 0 deletions cmd/maestro/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ func (svr *GRPCServer) Publish(ctx context.Context, pubReq *pbv1.PublishRequest)
return nil, fmt.Errorf("failed to parse cloud event type %s, %v", evt.Type(), err)
}

glog.V(4).Infof("receive the event with grpc, %s", evt)

// handler resync request
if eventType.Action == types.ResyncRequestAction {
err := svr.respondResyncStatusRequest(ctx, eventType.CloudEventsDataType, evt)
Expand Down Expand Up @@ -161,6 +163,8 @@ func (svr *GRPCServer) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
return fmt.Errorf("failed to encode resource %s to cloudevent: %v", res.ID, err)
}

glog.V(4).Infof("send the event with grpc, %s", evt)

// WARNING: don't use "pbEvt, err := pb.ToProto(evt)" to convert cloudevent to protobuf
pbEvt := &pbv1.CloudEvent{}
if err = grpcprotocol.WritePBMessage(context.TODO(), binding.ToMessage(evt), pbEvt); err != nil {
Expand Down
29 changes: 18 additions & 11 deletions cmd/maestro/server/pulse_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,16 +165,15 @@ func (s *PulseServer) startSubscription(ctx context.Context) {
return fmt.Errorf("unmatched consumer name %s for resource %s", resource.ConsumerName, resource.ID)
}

// set the resource source back for broadcast
resource.Source = found.Source

if !s.statusDispatcher.Dispatch(resource.ConsumerName) {
// the resource is not owned by the current instance, skip
log.V(4).Infof("skipping resource status update %s as it is not owned by the current instance", resource.ID)
return nil
}

// broadcast the resource status update event
resource.Source = found.Source
s.eventBroadcaster.Broadcast(resource)

// convert the resource status to cloudevent
evt, err := api.JSONMAPToCloudEvent(resource.Status)
if err != nil {
Expand All @@ -189,14 +188,22 @@ func (s *PulseServer) startSubscription(ctx context.Context) {

// if the resource has been deleted from agent, delete it from maestro
if meta.IsStatusConditionTrue(statusPayload.Conditions, common.ManifestsDeleted) {
if err := s.resourceService.Delete(ctx, resource.ID); err != nil {
return err
}
} else {
// update the resource status
if _, err := s.resourceService.UpdateStatus(ctx, resource); err != nil {
return err
if svcErr := s.resourceService.Delete(ctx, resource.ID); svcErr != nil {
return svcErr
}

s.eventBroadcaster.Broadcast(resource)
return nil
}
// update the resource status
_, updated, svcErr := s.resourceService.UpdateStatus(ctx, resource)
if svcErr != nil {
return svcErr
}

// broadcast the resource status updated only when the resource is updated
if updated {
s.eventBroadcaster.Broadcast(resource)
}
default:
return fmt.Errorf("unsupported action %s", action)
Expand Down
4 changes: 2 additions & 2 deletions pkg/client/cloudevents/source_client_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func (s *SourceClientMock) OnCreate(ctx context.Context, id string) error {
return fmt.Errorf("failed to unmarshal resource status: %v", err)
}

newResource, serviceErr := s.ResourceService.UpdateStatus(ctx, resource)
newResource, _, serviceErr := s.ResourceService.UpdateStatus(ctx, resource)
if serviceErr != nil {
return fmt.Errorf("failed to update resource status: %v", serviceErr)
}
Expand Down Expand Up @@ -119,7 +119,7 @@ func (s *SourceClientMock) OnUpdate(ctx context.Context, id string) error {
return fmt.Errorf("failed to unmarshal resource status: %v", err)
}

newResource, serviceErr := s.ResourceService.UpdateStatus(ctx, resource)
newResource, _, serviceErr := s.ResourceService.UpdateStatus(ctx, resource)
if serviceErr != nil {
return fmt.Errorf("failed to update resource status: %v", serviceErr)
}
Expand Down
36 changes: 20 additions & 16 deletions pkg/services/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ type ResourceService interface {
Get(ctx context.Context, id string) (*api.Resource, *errors.ServiceError)
Create(ctx context.Context, resource *api.Resource) (*api.Resource, *errors.ServiceError)
Update(ctx context.Context, resource *api.Resource) (*api.Resource, *errors.ServiceError)
UpdateStatus(ctx context.Context, resource *api.Resource) (*api.Resource, *errors.ServiceError)
UpdateStatus(ctx context.Context, resource *api.Resource) (*api.Resource, bool, *errors.ServiceError)
MarkAsDeleting(ctx context.Context, id string) *errors.ServiceError
Delete(ctx context.Context, id string) *errors.ServiceError
All(ctx context.Context) (api.ResourceList, *errors.ServiceError)
Expand Down Expand Up @@ -132,7 +132,7 @@ func (s *sqlResourceService) Update(ctx context.Context, resource *api.Resource)
return updated, nil
}

func (s *sqlResourceService) UpdateStatus(ctx context.Context, resource *api.Resource) (*api.Resource, *errors.ServiceError) {
func (s *sqlResourceService) UpdateStatus(ctx context.Context, resource *api.Resource) (*api.Resource, bool, *errors.ServiceError) {
logger := logger.NewOCMLogger(ctx)
// Updates the resource status only when its status changes.
// If there are multiple requests at the same time, it will cause the race conditions among these
Expand All @@ -141,64 +141,68 @@ func (s *sqlResourceService) UpdateStatus(ctx context.Context, resource *api.Res
// Ensure that the transaction related to this lock always end.
defer s.lockFactory.Unlock(ctx, lockOwnerID)
if err != nil {
return nil, errors.DatabaseAdvisoryLock(err)
return nil, false, errors.DatabaseAdvisoryLock(err)
}

found, err := s.resourceDao.Get(ctx, resource.ID)
if err != nil {
return nil, handleGetError("Resource", "id", resource.ID, err)
return nil, false, handleGetError("Resource", "id", resource.ID, err)
}

// Make sure the requested resource version is consistent with its database version.
if found.Version != resource.Version {
logger.Warning(fmt.Sprintf("Updating status for stale resource; disregard as the latest version is: %d", found.Version))
return found, nil
logger.Warning(fmt.Sprintf("Updating status for stale resource; disregard it: id=%s, foundVersion=%d, wantedVersion=%d",
resource.ID, found.Version, resource.Version))
return found, false, nil
}

// New status is not changed, the update status action is not needed.
if reflect.DeepEqual(resource.Status, found.Status) {
return found, nil
return found, false, nil
}

resourceStatusEvent, err := api.JSONMAPToCloudEvent(resource.Status)
if err != nil {
return nil, errors.GeneralError("Unable to convert resource status to cloudevent: %s", err)
return nil, false, errors.GeneralError("Unable to convert resource status to cloudevent: %s", err)
}

logger.V(4).Info(fmt.Sprintf("Updating resource status with event %s", resourceStatusEvent))

sequenceID, err := cloudeventstypes.ToString(resourceStatusEvent.Context.GetExtensions()[cetypes.ExtensionStatusUpdateSequenceID])
if err != nil {
return nil, errors.GeneralError("Unable to get sequence ID from resource status: %s", err)
return nil, false, errors.GeneralError("Unable to get sequence ID from resource status: %s", err)
}

foundSequenceID := ""
if len(found.Status) != 0 {
foundStatusEvent, err := api.JSONMAPToCloudEvent(found.Status)
if err != nil {
return nil, errors.GeneralError("Unable to convert resource status to cloudevent: %s", err)
return nil, false, errors.GeneralError("Unable to convert resource status to cloudevent: %s", err)
}

foundSequenceID, err = cloudeventstypes.ToString(foundStatusEvent.Context.GetExtensions()[cetypes.ExtensionStatusUpdateSequenceID])
if err != nil {
return nil, errors.GeneralError("Unable to get sequence ID from found resource status: %s", err)
return nil, false, errors.GeneralError("Unable to get sequence ID from found resource status: %s", err)
}
}

newer, err := compareSequenceIDs(sequenceID, foundSequenceID)
if err != nil {
return nil, errors.GeneralError("Unable to compare sequence IDs: %s", err)
return nil, false, errors.GeneralError("Unable to compare sequence IDs: %s", err)
}
if !newer {
logger.Warning(fmt.Sprintf("Updating status for stale resource; disregard as the latest sequence ID is: %s", foundSequenceID))
return found, nil
logger.Warning(fmt.Sprintf("Updating status for stale resource; disregard it: id=%s, foundSequenceID=%s, wantedSequenceID=%s",
resource.ID, foundSequenceID, sequenceID))
return found, false, nil
}

found.Status = resource.Status
updated, err := s.resourceDao.Update(ctx, found)
if err != nil {
return nil, handleUpdateError("Resource", err)
return nil, false, handleUpdateError("Resource", err)
}

return updated, nil
return updated, true, nil
}

// MarkAsDeleting marks the resource as deleting by setting the deleted_at timestamp.
Expand Down

0 comments on commit a35d649

Please sign in to comment.