-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathevents.go
109 lines (84 loc) · 2.92 KB
/
events.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package frame
import (
"context"
"encoding/json"
"errors"
)
// eventPayload is the envelope that is JSON-encoded and published to the
// events queue by Emit, and decoded again by the queue handler.
type eventPayload struct {
	// ID is an optional identifier for the message; it is omitted from the
	// JSON output when empty.
	ID string `json:",omitempty"`

	// Name is the registry key of the event that should process the message.
	Name string `json:",omitempty"`

	// Payload holds the event body as a JSON document stored in a string.
	Payload string `json:",omitempty"`
}
// EventI is the contract every system event implements. The work of an event
// lives in Execute, which may in turn emit further events into the system;
// when it emits none, that chain of processing is considered complete.
type EventI interface {
	// Name returns the unique, human-readable identifier of the event. It is
	// the key under which the event is stored in the registry and by which
	// queued messages are routed back to it.
	Name() string

	// PayloadType returns the value that queued payload data is decoded into.
	// The queue handler hands it straight to json.Unmarshal, so it should be
	// a non-nil pointer to the payload type.
	PayloadType() any

	// Validate checks a decoded payload before Execute runs, so that Execute
	// does not have to perform validation itself.
	Validate(ctx context.Context, payload any) error

	// Execute performs the logic required to action this step in the
	// sequence of events that leads to the end goal.
	Execute(ctx context.Context, payload any) error
}
// RegisterEvents returns an Option that adds the given events to the service
// event registry, creating the registry first if it does not exist yet.
// Events are keyed by Name, so names should be unique: when two events share
// a name, the one registered last takes precedence.
func RegisterEvents(events ...EventI) Option {
	return func(svc *Service) {
		registry := svc.eventRegistry
		if registry == nil {
			registry = make(map[string]EventI)
			svc.eventRegistry = registry
		}

		for i := range events {
			registry[events[i].Name()] = events[i]
		}
	}
}
// Emit queues the event identified by name, together with its payload, for
// further processing.
//
// The payload is JSON-encoded and wrapped in an eventPayload envelope, which
// is published to the queue returned by the service configuration's
// GetEventsQueueName method.
//
// An error is returned when the payload cannot be marshalled, when the
// service configuration is not a ConfigurationEvents, or when publishing to
// the queue fails.
func (s *Service) Emit(ctx context.Context, name string, payload any) error {
	payloadBytes, err := json.Marshal(payload)
	if err != nil {
		return err
	}

	config, ok := s.Config().(ConfigurationEvents)
	if !ok {
		// The logged type name matches the assertion above and the error
		// returned below, so the log points at the actual requirement.
		s.L(ctx).Warn("configuration object not of type : ConfigurationEvents")
		return errors.New("could not cast config to ConfigurationEvents")
	}

	// The encoded payload travels as a string inside the envelope.
	e := eventPayload{Name: name, Payload: string(payloadBytes)}

	// Queue event message for further processing.
	err = s.Publish(ctx, config.GetEventsQueueName(), e)
	if err != nil {
		s.L(ctx).WithError(err).WithField("name", name).Error("Could not emit event")
		return err
	}

	return nil
}
// eventQueueHandler processes messages taken from the events queue by
// dispatching each one to the matching event in the service registry.
type eventQueueHandler struct {
	// service supplies the event registry and the logger used by Handle.
	service *Service
}
// Handle processes a single message taken from the events queue.
//
// payload must be the JSON encoding of an eventPayload envelope. The event
// named in the envelope is looked up in the service registry, the envelope's
// Payload string is decoded into the value returned by the event's
// PayloadType, and the result is passed to Validate and then Execute.
//
// header is accepted but not used here.
//
// An error is returned when the envelope or the inner payload cannot be
// decoded, when no event is registered under the envelope's name, or when
// Validate or Execute fails.
func (eq *eventQueueHandler) Handle(ctx context.Context, header map[string]string, payload []byte) error {
	evtPyl := &eventPayload{}
	if err := json.Unmarshal(payload, evtPyl); err != nil {
		return err
	}

	eventHandler, ok := eq.service.eventRegistry[evtPyl.Name]
	if !ok || eventHandler == nil {
		// Stop here: continuing would call a method on a nil interface value
		// and panic.
		eq.service.L(ctx).WithField("event", evtPyl.Name).Error("Could not get event from registry")
		return errors.New("event not found in registry: " + evtPyl.Name)
	}

	// json.Unmarshal fills the value in place, so the same value is handed to
	// Validate and Execute afterwards.
	payLType := eventHandler.PayloadType()
	if err := json.Unmarshal([]byte(evtPyl.Payload), payLType); err != nil {
		return err
	}

	if err := eventHandler.Validate(ctx, payLType); err != nil {
		return err
	}

	return eventHandler.Execute(ctx, payLType)
}