Skip to content

Commit

Permalink
add success/fail handler options
Browse files Browse the repository at this point in the history
  • Loading branch information
ahab94 committed Oct 14, 2020
1 parent c5e954e commit 7e2da35
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 10 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,6 @@

# IDE
.idea

# vendor
vendor/
48 changes: 44 additions & 4 deletions concurrent.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,44 @@ import (
// Concurrent is an executor for concurrent executions
type Concurrent struct {
executor
engine *engine.Engine
block bool
engine *engine.Engine
block bool
successHandler func()
failHandler func(err error)
}

type ConcurrentOption func(*Concurrent)

// NewConcurrent - initializes concurrent executor; if completionBlock=true, it will block main routine until all tasks completed
func NewConcurrent(ctx context.Context, engine *engine.Engine, completionBlock bool) *Concurrent {
return &Concurrent{
func NewConcurrent(ctx context.Context, engine *engine.Engine, completionBlock bool, opts ...ConcurrentOption) *Concurrent {
con := &Concurrent{
executor: executor{
id: fmt.Sprintf("%s-%s", "concurrent", uuid.NewV4().String()),
ctx: ctx,
},
engine: engine,
block: completionBlock,
}

for _, opt := range opts {
opt(con)
}

return con
}

// ConcurrentFailHandler - inits fail handler
func ConcurrentFailHandler(fail func(err error)) ConcurrentOption {
return func(s *Concurrent) {
s.failHandler = fail
}
}

// ConcurrentSuccessHandler - inits success handler
func ConcurrentSuccessHandler(success func()) ConcurrentOption {
return func(c *Concurrent) {
c.successHandler = success
}
}

// Execute - executes all executables concurrently
Expand Down Expand Up @@ -52,3 +76,19 @@ func (c *Concurrent) executeDispatch() {
}
}
}

// OnSuccess - handles completion callback
func (c *Concurrent) OnSuccess() {
c.executor.OnSuccess()
if c.successHandler != nil {
c.successHandler()
}
}

// OnFailure - handles failure callback
func (c *Concurrent) OnFailure(err error) {
c.executor.OnFailure(err)
if c.failHandler != nil {
c.failHandler(err)
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ go 1.12
require (
github.com/ahab94/engine v0.1.0
github.com/kr/pretty v0.1.0 // indirect
github.com/pkg/errors v0.8.1
github.com/pkg/errors v0.8.1 // indirect
github.com/satori/go.uuid v1.2.0
github.com/sirupsen/logrus v1.4.2
)
46 changes: 43 additions & 3 deletions parallel.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,42 @@ import (
// Parallel is an executor for parallel executions
type Parallel struct {
executor
wg *sync.WaitGroup
wg *sync.WaitGroup
successHandler func()
failHandler func(err error)
}

type ParallelOption func(parallel *Parallel)

// NewParallel - initializes a parallel executor
func NewParallel(ctx context.Context) *Parallel {
return &Parallel{
func NewParallel(ctx context.Context, opts ...ParallelOption) *Parallel {
par := &Parallel{
executor: executor{
id: fmt.Sprintf("%s-%s", "parallel", uuid.NewV4().String()),
ctx: ctx,
},
wg: &sync.WaitGroup{},
}

for _, opt := range opts {
opt(par)
}

return par
}

// ParallelFailHandler - inits fail handler
func ParallelFailHandler(fail func(err error)) ParallelOption {
return func(p *Parallel) {
p.failHandler = fail
}
}

// ParallelSuccessHandler - inits success handler
func ParallelSuccessHandler(success func()) ParallelOption {
return func(p *Parallel) {
p.successHandler = success
}
}

// Execute - executes all executables In parallel
Expand Down Expand Up @@ -53,3 +77,19 @@ func (p *Parallel) executeWg() {
}
p.wg.Wait()
}

// OnSuccess - handles completion callback
func (p *Parallel) OnSuccess() {
p.executor.OnSuccess()
if p.successHandler != nil {
p.successHandler()
}
}

// OnFailure - handles failure callback
func (p *Parallel) OnFailure(err error) {
p.executor.OnFailure(err)
if p.failHandler != nil {
p.failHandler(err)
}
}
44 changes: 42 additions & 2 deletions sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,40 @@ import (
// Sequence is an executor for sequential executions
type Sequence struct {
executor
successHandler func()
failHandler func(err error)
}

type SequenceOption func(*Sequence)

// NewSequence - initializes a sequence executor
func NewSequence(ctx context.Context) *Sequence {
return &Sequence{
func NewSequence(ctx context.Context, opts ...SequenceOption) *Sequence {
seq := &Sequence{
executor: executor{
id: fmt.Sprintf("%s-%s", "sequence", uuid.NewV4().String()),
ctx: ctx,
},
}

for _, opt := range opts {
opt(seq)
}

return seq
}

// SequenceFailHandler - inits fail handler
func SequenceFailHandler(fail func(err error)) SequenceOption {
return func(s *Sequence) {
s.failHandler = fail
}
}

// SequenceSuccessHandler - inits success handler
func SequenceSuccessHandler(success func()) SequenceOption {
return func(s *Sequence) {
s.successHandler = success
}
}

// Execute - executes all executables sequentially
Expand All @@ -41,3 +65,19 @@ func (s *Sequence) Execute() error {
}
return nil
}

// OnSuccess - handles completion callback
func (s *Sequence) OnSuccess() {
s.executor.OnSuccess()
if s.successHandler != nil {
s.successHandler()
}
}

// OnFailure - handles failure callback
func (s *Sequence) OnFailure(err error) {
s.executor.OnFailure(err)
if s.failHandler != nil {
s.failHandler(err)
}
}

0 comments on commit 7e2da35

Please sign in to comment.