-
Notifications
You must be signed in to change notification settings - Fork 60
/
Copy pathcheck.go
153 lines (130 loc) · 4.17 KB
/
check.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package rabbitmq
import (
"fmt"
"os"
"time"
"github.com/streadway/amqp"
)
const (
defaultExchange = "health_check"
)
var (
defaultConsumeTimeout = time.Second * 3
)
type (
// Config is the RabbitMQ checker configuration settings container.
Config struct {
// DSN is the RabbitMQ instance connection DSN. Required.
DSN string
// Exchange is the application health check exchange. If not set - "health_check" is used.
Exchange string
// RoutingKey is the application health check routing key within health check exchange.
// Can be an application or host name, for example.
// If not set - host name is used.
RoutingKey string
// Queue is the application health check queue, that binds to the exchange with the routing key.
// If not set - "<exchange>.<routing-key>" is used.
Queue string
// ConsumeTimeout is the duration that health check will try to consume published test message.
// If not set - 3 seconds
ConsumeTimeout time.Duration
// LogFunc is the callback function for errors logging during check.
// If not set logging is skipped.
LogFunc func(err error, details string, extra ...interface{})
}
)
// New creates new RabbitMQ health check that verifies the following:
// - connection establishing
// - getting channel from the connection
// - declaring topic exchange
// - declaring queue
// - binding a queue to the exchange with the defined routing key
// - publishing a message to the exchange with the defined routing key
// - consuming published message
func New(config Config) func() error {
(&config).defaults()
return func() error {
conn, err := amqp.Dial(config.DSN)
if err != nil {
config.LogFunc(err, "RabbitMQ health check failed on dial phase")
return err
}
defer func() {
if err := conn.Close(); err != nil {
config.LogFunc(err, "RabbitMQ health check failed to close connection")
}
}()
ch, err := conn.Channel()
if err != nil {
config.LogFunc(err, "RabbitMQ health check failed on getting channel phase")
return err
}
defer func() {
if err := ch.Close(); err != nil {
config.LogFunc(err, "RabbitMQ health check failed to close channel")
}
}()
if err := ch.ExchangeDeclare(config.Exchange, "topic", true, false, false, false, nil); err != nil {
config.LogFunc(err, "RabbitMQ health check failed during declaring exchange")
return err
}
if _, err := ch.QueueDeclare(config.Queue, false, false, false, false, nil); err != nil {
config.LogFunc(err, "RabbitMQ health check failed during declaring queue")
return err
}
if err := ch.QueueBind(config.Queue, config.RoutingKey, config.Exchange, false, nil); err != nil {
config.LogFunc(err, "RabbitMQ health check failed during binding")
return err
}
messages, err := ch.Consume(config.Queue, "", true, false, false, false, nil)
if err != nil {
config.LogFunc(err, "RabbitMQ health check failed during consuming")
return err
}
done := make(chan struct{})
go func() {
// block until: a message is received, or message channel is closed (consume timeout)
<-messages
// release the channel resources, and unblock the receive on done below
close(done)
// now drain any incidental remaining messages
for range messages {
}
}()
p := amqp.Publishing{Body: []byte(time.Now().Format(time.RFC3339Nano))}
if err := ch.Publish(config.Exchange, config.RoutingKey, false, false, p); err != nil {
config.LogFunc(err, "RabbitMQ health check failed during publishing")
return err
}
for {
select {
case <-time.After(config.ConsumeTimeout):
config.LogFunc(nil, "RabbitMQ health check failed due to consume timeout")
return fmt.Errorf("RabbitMQ health check failed due to consume timeout")
case <-done:
return nil
}
}
}
}
func (c *Config) defaults() {
if c.LogFunc == nil {
c.LogFunc = func(err error, details string, extra ...interface{}) {}
}
if c.Exchange == "" {
c.Exchange = defaultExchange
}
if c.RoutingKey == "" {
host, err := os.Hostname()
if nil != err {
c.RoutingKey = "-unknown-"
}
c.RoutingKey = host
}
if c.Queue == "" {
c.Queue = fmt.Sprintf("%s.%s", c.Exchange, c.RoutingKey)
}
if c.ConsumeTimeout == 0 {
c.ConsumeTimeout = defaultConsumeTimeout
}
}