-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathshift.go
388 lines (332 loc) · 11.4 KB
/
shift.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
// Copyright 2020 Mustafa Turan. All rights reserved.
// Use of this source code is governed by a Apache License 2.0 license that can
// be found in the LICENSE file.
package shift
import (
"context"
"sync"
"time"
"github.com/mustafaturan/shift/counter"
"github.com/mustafaturan/shift/timer"
)
const (
// Version is the current version of the package.
Version = "1.0.0"
)
// Shift is a circuit breaker implementation configured through options
// (see New and the With* option builders).
type Shift struct {
// mutex is a read/write lock owned by the circuit breaker.
// NOTE(review): presumably guards state changes — confirm against the
// methods that acquire it.
mutex sync.RWMutex
// name is an identity for the circuit breaker to increase observability
// on failures.
name string
// state is the current state of the circuit breaker. It can have open,
// half-open, close values.
state State
// counter holds the circuit breaker metrics; it supports the basic
// increment and reset operations.
counter Counter
// resetTimer is a duration builder for resetting the state.
resetTimer Timer
// resetter holds the timer which resets the circuit breaker state.
resetter *time.Timer
// invokers holds one invoker per state; New fills it for the close,
// half-open and open states.
invokers map[State]invoker
// Trippers: the built-in handlers that trip the circuit breaker.
// halfOpenCloser is the success handler built by WithCloser.
halfOpenCloser SuccessHandler
// halfOpenOpener is the failure handler built by WithOpener for the
// half-open state.
halfOpenOpener FailureHandler
// closeOpener is the failure handler built by WithOpener for the close
// state.
closeOpener FailureHandler
// successHandlers holds the success handlers registered per state.
successHandlers map[State][]SuccessHandler
// failureHandlers holds the failure handlers registered per state.
failureHandlers map[State][]FailureHandler
// restrictors are pre-callback actions which apply right before the
// invocations. The restrictors can block the invocation with error returns.
restrictors []Restrictor
// stateChangeHandlers are callbacks which are called on every state change.
stateChangeHandlers []StateChangeHandler
}
// Defaults used by New for everything the caller does not configure through
// options. Success ratios are percentages: the option builders accept values
// greater than 0.0 and up to 100.0.
const (
// optionDefaultInitialState is the default initial state.
optionDefaultInitialState = StateClose
// optionDefaultResetTimer is the default wait time handed to the constant
// reset timer.
optionDefaultResetTimer = 15 * time.Second
// optionDefaultInvocationTimeout is the default invocation timeout duration
// for the close and half-open invokers.
optionDefaultInvocationTimeout = 5 * time.Second
// optionDefaultCounterCapacity is the default capacity for the counter.
optionDefaultCounterCapacity = 10
// optionDefaultCounterBucketDuration is the default duration for counter
// buckets.
optionDefaultCounterBucketDuration = time.Second
// optionDefaultMinSuccessRatioForCloseOpener is the minimum success ratio
// required to keep the 'close' state as is.
optionDefaultMinSuccessRatioForCloseOpener = 90.0
// optionDefaultMinSuccessRatioForHalfOpenOpener is the minimum success ratio
// required to keep the 'half-open' state as is.
optionDefaultMinSuccessRatioForHalfOpenOpener = 70.0
// optionDefaultMinSuccessRatioForHalfOpenCloser is the minimum success ratio
// required to trip to 'close' state.
optionDefaultMinSuccessRatioForHalfOpenCloser = 85.0
// optionDefaultMinRequests is the default minimum number of requests that
// must be observed before a success ratio is checked by the default
// openers and closer.
optionDefaultMinRequests = 10
)
// New initializes a new circuit breaker with the given name and options.
//
// The breaker is first built from the package defaults (initial state,
// invocation timeouts, empty handler lists). The options are then applied in
// the given order; the first option that reports an error aborts construction.
// Whatever the options left unset (counter, reset timer, openers, closer) is
// filled in with the package defaults afterwards.
//
// The built-in trippers are always placed ahead of the user supplied handlers:
// the close opener leads the 'close' failure handlers, the half-open opener
// leads the 'half-open' failure handlers and the half-open closer leads the
// 'half-open' success handlers.
//
// It returns the configured *Shift, or nil together with the error returned by
// the failing option.
func New(name string, opts ...Option) (*Shift, error) {
	s := &Shift{
		name:  name,
		state: optionDefaultInitialState,
		// A no-op timer gives resetter a non-nil value from the start.
		resetter: time.AfterFunc(time.Microsecond, func() {}),
		invokers: map[State]invoker{
			StateClose: &onCloseInvoker{
				timeout: optionDefaultInvocationTimeout,
			},
			StateHalfOpen: &onHalfOpenInvoker{
				timeout: optionDefaultInvocationTimeout,
			},
			StateOpen: &onOpenInvoker{},
		},
		failureHandlers: map[State][]FailureHandler{
			StateClose:    make([]FailureHandler, 0),
			StateHalfOpen: make([]FailureHandler, 0),
			StateOpen:     make([]FailureHandler, 0),
		},
		successHandlers: map[State][]SuccessHandler{
			StateClose:    make([]SuccessHandler, 0),
			StateHalfOpen: make([]SuccessHandler, 0),
			StateOpen:     make([]SuccessHandler, 0),
		},
		stateChangeHandlers: make([]StateChangeHandler, 0),
		restrictors:         make([]Restrictor, 0),
	}

	for _, opt := range opts {
		if err := opt(s); err != nil {
			return nil, err
		}
	}

	// Init the default counter if not specified. The error is ignored because
	// the arguments are the package default constants.
	if s.counter == nil {
		s.counter, _ = counter.NewTimeBucketCounter(
			optionDefaultCounterCapacity,
			optionDefaultCounterBucketDuration,
		)
	}

	// Init the default reset timer if not specified. The error is ignored
	// because the argument is the package default constant.
	if s.resetTimer == nil {
		s.resetTimer, _ = timer.NewConstantTimer(optionDefaultResetTimer)
	}

	// Wire the invokers to the counter only now, so the callbacks use the
	// counter that was finally selected (custom or default).
	s.invokers[StateClose].(*onCloseInvoker).timeoutCallback = func() {
		s.counter.Increment(metricTimeout)
	}
	s.invokers[StateHalfOpen].(*onHalfOpenInvoker).timeoutCallback = func() {
		s.counter.Increment(metricTimeout)
	}
	s.invokers[StateOpen].(*onOpenInvoker).rejectCallback = func() {
		s.counter.Increment(metricReject)
	}

	// The errors of the default trippers below are ignored because they are
	// built from the package default constants.
	if s.closeOpener == nil {
		_ = WithOpener(StateClose, optionDefaultMinSuccessRatioForCloseOpener, optionDefaultMinRequests)(s)
	}
	s.failureHandlers[StateClose] = append([]FailureHandler{s.closeOpener}, s.failureHandlers[StateClose]...)

	if s.halfOpenOpener == nil {
		_ = WithOpener(StateHalfOpen, optionDefaultMinSuccessRatioForHalfOpenOpener, optionDefaultMinRequests)(s)
	}
	// The half-open state must be guarded by the half-open opener, not by the
	// close opener: they carry different thresholds.
	s.failureHandlers[StateHalfOpen] = append([]FailureHandler{s.halfOpenOpener}, s.failureHandlers[StateHalfOpen]...)

	if s.halfOpenCloser == nil {
		_ = WithCloser(optionDefaultMinSuccessRatioForHalfOpenCloser, optionDefaultMinRequests)(s)
	}
	s.successHandlers[StateHalfOpen] = append([]SuccessHandler{s.halfOpenCloser}, s.successHandlers[StateHalfOpen]...)

	return s, nil
}
// WithInitialState returns an option which makes the circuit breaker start in
// the given state instead of the default one. The option never fails.
func WithInitialState(state State) Option {
	setState := func(cb *Shift) error {
		cb.state = state
		return nil
	}
	return setState
}
// WithInvocationTimeout returns an option which sets the invocation timeout
// duration of both the 'close' and the 'half-open' invokers. The option never
// returns an error.
func WithInvocationTimeout(duration time.Duration) Option {
	return func(cb *Shift) error {
		onClose := cb.invokers[StateClose].(*onCloseInvoker)
		onClose.timeout = duration

		onHalfOpen := cb.invokers[StateHalfOpen].(*onHalfOpenInvoker)
		onHalfOpen.timeout = duration

		return nil
	}
}
// WithResetTimer returns an option which installs t as the reset timer of the
// circuit breaker. The option never fails.
func WithResetTimer(t Timer) Option {
	install := func(cb *Shift) error {
		cb.resetTimer = t
		return nil
	}
	return install
}
// WithCounter returns an option which installs c as the stats counter of the
// circuit breaker. The option never fails.
func WithCounter(c Counter) Option {
	install := func(cb *Shift) error {
		cb.counter = c
		return nil
	}
	return install
}
// WithRestrictors returns an option which sets the restrictors that can
// restrict the invocations. Restrictors do not affect the current state, but
// they can block an invocation depending on their own internal state values.
// If a restrictor blocks an invocation then it returns an error and the
// `On Failure Handlers` get executed in order.
//
// The given list replaces any previously configured restrictors. The option
// fails with an *InvalidOptionError, leaving the circuit breaker untouched,
// when any of the restrictors is nil.
func WithRestrictors(restrictors ...Restrictor) Option {
	return func(cb *Shift) error {
		for i := range restrictors {
			if restrictors[i] != nil {
				continue
			}
			return &InvalidOptionError{
				Name:    "restrictor",
				Message: "can't be nil",
			}
		}

		cb.restrictors = restrictors
		return nil
	}
}
// WithStateChangeHandlers returns an option which sets the state change
// handlers; the provided handlers will be evaluated in the order they are
// given to the option.
//
// The given list replaces any previously configured state change handlers.
// The option fails with an *InvalidOptionError, leaving the circuit breaker
// untouched, when any of the handlers is nil.
func WithStateChangeHandlers(handlers ...StateChangeHandler) Option {
	return func(cb *Shift) error {
		for i := range handlers {
			if handlers[i] != nil {
				continue
			}
			return &InvalidOptionError{
				Name:    "on state change handler",
				Message: "can't be nil",
			}
		}

		cb.stateChangeHandlers = handlers
		return nil
	}
}
// WithSuccessHandlers returns an option which adds on success handlers for the
// given state; the provided handlers will be evaluated in the order they are
// given to the option.
//
// The handlers are appended to the ones already registered for the state. The
// option fails with an *InvalidOptionError, registering nothing, when any of
// the handlers is nil.
func WithSuccessHandlers(state State, handlers ...SuccessHandler) Option {
	return func(cb *Shift) error {
		for i := range handlers {
			if handlers[i] != nil {
				continue
			}
			return &InvalidOptionError{
				Name:    "on success handler",
				Message: "can't be nil",
			}
		}

		registered := cb.successHandlers[state]
		cb.successHandlers[state] = append(registered, handlers...)
		return nil
	}
}
// WithFailureHandlers returns an option which adds on failure handlers for the
// given state; the provided handlers will be evaluated in the order they are
// given to the option.
//
// The handlers are appended to the ones already registered for the state. The
// option fails with an *InvalidOptionError, registering nothing, when any of
// the handlers is nil.
func WithFailureHandlers(state State, handlers ...FailureHandler) Option {
	return func(cb *Shift) error {
		for i := range handlers {
			if handlers[i] != nil {
				continue
			}
			return &InvalidOptionError{
				Name:    "failure handler",
				Message: "can't be nil",
			}
		}

		registered := cb.failureHandlers[state]
		cb.failureHandlers[state] = append(registered, handlers...)
		return nil
	}
}
// WithOpener returns an option which sets the default failure criteria to trip
// to the 'open' state. (If the failure criteria matches then the circuit
// breaker trips to the 'open' state.)
//
// The option builds a failure handler and stores it as the opener of the given
// state. On every failure the handler reads the Stats from the context and
// trips the circuit breaker to the 'open' state once the thresholds are
// reached.
//
// Definitions of the params are
// state: StateClose, StateHalfOpen
// minSuccessRatio: min success ratio (percentage, greater than 0.0 and up to
// 100.0) to keep the circuit breaker as is
// minRequests: min number of requests (at least 1) before checking the ratio
//
// The number of requests is calculated as
// SuccessCount + FailureCount - RejectCount.
//
// Params with example:
// state: StateClose, minSuccessRatio: 95%, minRequests: 10
// The above configuration means that:
// On 'close' state, once at least 10 requests are counted, if the calculated
// success ratio is less than 95% then it will trip to 'open' state
//
// The option fails with an *InvalidOptionError when the state, the ratio or
// the request count is out of the ranges above.
func WithOpener(state State, minSuccessRatio float32, minRequests uint32) Option {
	return func(cb *Shift) error {
		// Validation order matters: the first violated rule is reported.
		switch {
		case !(state.isClose() || state.isHalfOpen()):
			return &InvalidOptionError{
				Name:    "state for failure criteria",
				Message: "can only be applied to 'close' and 'half open' states",
			}
		case minSuccessRatio <= 0.0 || minSuccessRatio > 100.0:
			return &InvalidOptionError{
				Name:    "min success ratio to trip to 'open' state",
				Message: "can be greater than 0.0 and less than equal to 100.0",
			}
		case minRequests < 1:
			return &InvalidOptionError{
				Name:    "min requests to check success ratio",
				Message: "must be positive int",
			}
		}

		var opener OnFailure = func(ctx context.Context, _ error) {
			stats := ctx.Value(CtxStats).(Stats)
			total := stats.SuccessCount + stats.FailureCount - stats.RejectCount
			// Not enough traffic yet to judge the success ratio.
			if total < minRequests {
				return
			}

			successRatio := float32(stats.SuccessCount) / float32(total) * 100
			if successRatio < minSuccessRatio {
				_ = cb.Trip(StateOpen, &FailureThresholdReachedError{})
			}
		}

		switch {
		case state.isHalfOpen():
			cb.halfOpenOpener = opener
		default:
			cb.closeOpener = opener
		}
		return nil
	}
}
// WithCloser returns an option which sets the default success criteria to trip
// to the 'close' state. (If the success criteria matches then the circuit
// breaker trips to the 'close' state.)
//
// The option builds a success handler and stores it as the half-open closer.
// On every success the handler reads the Stats from the context and trips the
// circuit breaker to the 'close' state once the thresholds are reached.
//
// Definitions of the params are
// state: StateHalfOpen (always half-open, it is a hidden param)
// minSuccessRatio: min success ratio (percentage, greater than 0.0 and up to
// 100.0) to trip the circuit breaker to close state
// minRequests: min number of requests (at least 1) before checking the ratio
//
// The number of requests is calculated as
// SuccessCount + FailureCount - RejectCount.
//
// Params with example:
// state: StateHalfOpen, minSuccessRatio: 99.5%, minRequests: 1000
// The above configuration means that:
// On 'half-open' state, once at least 1000 requests are counted, if the
// calculated success ratio is at least 99.5% then it will trip to 'close'
// state
//
// The option fails with an *InvalidOptionError when the ratio or the request
// count is out of the ranges above.
func WithCloser(minSuccessRatio float32, minRequests uint32) Option {
	return func(cb *Shift) error {
		// Validation order matters: the first violated rule is reported.
		switch {
		case minSuccessRatio <= 0.0 || minSuccessRatio > 100.0:
			return &InvalidOptionError{
				Name:    "min success ratio to trip to 'close' state",
				Message: "can be greater than 0.0 and less than equal to 100.0",
			}
		case minRequests < 1:
			return &InvalidOptionError{
				Name:    "min requests to check success ratio",
				Message: "must be positive int",
			}
		}

		var closer OnSuccess = func(ctx context.Context, _ interface{}) {
			stats := ctx.Value(CtxStats).(Stats)
			total := stats.SuccessCount + stats.FailureCount - stats.RejectCount
			// Not enough traffic yet to judge the success ratio.
			if total < minRequests {
				return
			}

			successRatio := float32(stats.SuccessCount) / float32(total) * 100
			if successRatio >= minSuccessRatio {
				_ = cb.Trip(StateClose)
			}
		}

		cb.halfOpenCloser = closer
		return nil
	}
}