-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpublisher.go
176 lines (151 loc) · 5.32 KB
/
publisher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package pubsub
import (
"context"
"errors"
"fmt"
)
var (
// ErrResourceDoesNotExist indicates that the resource (topic, queue...) does not exist.
ErrResourceDoesNotExist = errors.New("name does not exist")
)
// Publisher describes the top level method to publish messages.
type Publisher interface {
Publish(ctx context.Context, topic string, messages ...*Message) error
}
// PublisherFunc is a function that can publish messages.
type PublisherFunc func(ctx context.Context, topic string, envelopes ...*Message) error
// Publish publishes messages invoking the function.
func (f PublisherFunc) Publish(ctx context.Context, topic string, envelopes ...*Message) error {
return f(ctx, topic, envelopes...)
}
// EnvelopePublisher publish envelopes where the data has already been
// marshalled.
//
// You can also use this interface directly is you want to handle the
// marshalling yourself, combined with the NoOpMarshaller for the
// router.
type EnvelopePublisher interface {
Publish(ctx context.Context, topic string, envelopes ...*Envelope) error
}
// EnvelopePublisherFunc is a function that can publish envelopes.
type EnvelopePublisherFunc func(ctx context.Context, topic string, envelopes ...*Envelope) error
// Publish publishes envelopes invoking the function.
func (f EnvelopePublisherFunc) Publish(ctx context.Context, topic string, envelopes ...*Envelope) error {
return f(ctx, topic, envelopes...)
}
// Envelope holds the data that need to be transmitted.
type Envelope struct {
ID MessageID
Name string
Key string
Body []byte
Version string
Attributes Attributes
}
// MarshallerPublisher will marshall a message and publish a message
// delegate the publishing of the envelope.
type MarshallerPublisher struct {
publisher EnvelopePublisher
marshaller Marshaller
}
// NewPublisher creates a new marshaller publisher.
func NewPublisher(publisher EnvelopePublisher, marshaller Marshaller) *MarshallerPublisher {
return &MarshallerPublisher{
publisher: publisher,
marshaller: marshaller,
}
}
// Publish a message to the given topic.
func (p *MarshallerPublisher) Publish(ctx context.Context, topic string, messages ...*Message) error {
envelopes, err := marshallMessages(p.marshaller, messages...)
if err != nil {
return err
}
return p.publisher.Publish(ctx, topic, envelopes...)
}
// PublisherHandler handles events and generates new
// messages that should be published.
type PublisherHandler interface {
HandleMessage(ctx context.Context, message *Message) ([]*Message, error)
}
// PublisherHandlerFunc function that can handle a message.
type PublisherHandlerFunc func(ctx context.Context, message *Message) ([]*Message, error)
// HandleMessage handles the message with the function.
func (f PublisherHandlerFunc) HandleMessage(ctx context.Context, message *Message) ([]*Message, error) {
return f(ctx, message)
}
// Handler is a helper that publishes messages generated by other handlers.
func (p *MarshallerPublisher) Handler(topic string, handler PublisherHandler) Handler {
return HandlerFunc(func(ctx context.Context, message *Message) error {
messages, err := handler.HandleMessage(ctx, message)
if err != nil {
return err
}
return p.Publish(ctx, topic, messages...)
})
}
type PublisherMiddleware = func(Publisher) Publisher
// CtxExtractorFunc is a function that given a
// context returns a value. Return empty string
// if not present.
type CtxExtractorFunc func(ctx context.Context) string
// CtxAttributeInjector is a publisher middleware that can set message
// attributes based on value present in the context.
func CtxAttributeInjector(attributeName string, extract CtxExtractorFunc) PublisherMiddleware {
return func(publisher Publisher) Publisher {
return PublisherFunc(func(ctx context.Context, topic string, envelopes ...*Message) error {
v := extract(ctx)
if v != "" {
for _, e := range envelopes {
e.SetAttribute(attributeName, v)
}
}
return publisher.Publish(ctx, topic, envelopes...)
})
}
}
// TopicAsEventName is a publisher middleware that will set the event name as the topic
// if the event name is empty.
func TopicAsEventName() PublisherMiddleware {
return func(publisher Publisher) Publisher {
return PublisherFunc(func(ctx context.Context, topic string, envelopes ...*Message) error {
for _, env := range envelopes {
if env.Name == "" {
env.Name = topic
}
}
return publisher.Publish(ctx, topic, envelopes...)
})
}
}
// WrapPublisher will wrap the publisher in the given middlewares.
func WrapPublisher(publisher Publisher, middlewares ...PublisherMiddleware) Publisher {
for _, mw := range middlewares {
publisher = mw(publisher)
}
return PublisherFunc(func(ctx context.Context, topic string, envelopes ...*Message) error {
return publisher.Publish(ctx, topic, envelopes...)
})
}
func marshallMessages(marshaller Marshaller, messages ...*Message) ([]*Envelope, error) {
envelopes := make([]*Envelope, len(messages))
for i, m := range messages {
body, version, err := marshaller.Marshal(m.Data)
if err != nil {
return nil, fmt.Errorf("marshaller error (%d): %w", i, err)
}
id := m.ID
if len(m.ID) == 0 {
id = NewID()
}
envelopes[i] = &Envelope{
ID: id,
Name: m.Name,
Key: m.Key,
Attributes: m.Attributes,
Body: body,
Version: version,
}
}
return envelopes, nil
}