Skip to content

Commit

Permalink
Merge pull request #9270 from starius/goroutine-manager-bool
Browse files Browse the repository at this point in the history
fn: improvements for GoroutineManager
  • Loading branch information
guggero authored Nov 19, 2024
2 parents f4a1299 + 891e962 commit c136901
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 15 deletions.
16 changes: 7 additions & 9 deletions fn/goroutine_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@ package fn

import (
"context"
"errors"
"sync"
)

// ErrStopping is returned when trying to add a new goroutine while stopping.
var ErrStopping = errors.New("can not add goroutine, stopping")

// GoroutineManager is used to launch goroutines until context expires or the
// manager is stopped. The Stop method blocks until all started goroutines stop.
type GoroutineManager struct {
Expand All @@ -29,8 +25,10 @@ func NewGoroutineManager(ctx context.Context) *GoroutineManager {
}
}

// Go starts a new goroutine if the manager is not stopping.
func (g *GoroutineManager) Go(f func(ctx context.Context)) error {
// Go tries to start a new goroutine and returns a boolean indicating its
// success. It fails iff the goroutine manager is stopping or its context passed
// to NewGoroutineManager has expired.
func (g *GoroutineManager) Go(f func(ctx context.Context)) bool {
// Calling wg.Add(1) and wg.Wait() when wg's counter is 0 is a race
// condition, since it is not clear should Wait() block or not. This
// kind of race condition is detected by Go runtime and results in a
Expand All @@ -43,7 +41,7 @@ func (g *GoroutineManager) Go(f func(ctx context.Context)) error {
defer g.mu.Unlock()

if g.ctx.Err() != nil {
return ErrStopping
return false
}

g.wg.Add(1)
Expand All @@ -52,7 +50,7 @@ func (g *GoroutineManager) Go(f func(ctx context.Context)) error {
f(g.ctx)
}()

return nil
return true
}

// Stop prevents new goroutines from being added and waits for all running
Expand All @@ -66,7 +64,7 @@ func (g *GoroutineManager) Stop() {
// safe, since it can't run in parallel with wg.Add(1) call in Go, since
// we just cancelled the context and even if Go call starts running here
// after acquiring the mutex, it would see that the context has expired
// and return ErrStopping instead of calling wg.Add(1).
// and return false instead of calling wg.Add(1).
g.wg.Wait()
}

Expand Down
48 changes: 42 additions & 6 deletions fn/goroutine_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package fn

import (
"context"
"sync"
"testing"
"time"

Expand All @@ -19,7 +20,7 @@ func TestGoroutineManager(t *testing.T) {

taskChan := make(chan struct{})

require.NoError(t, m.Go(func(ctx context.Context) {
require.True(t, m.Go(func(ctx context.Context) {
<-taskChan
}))

Expand All @@ -37,7 +38,7 @@ func TestGoroutineManager(t *testing.T) {
require.Greater(t, stopDelay, time.Second)

// Make sure new goroutines do not start after Stop.
require.ErrorIs(t, m.Go(func(ctx context.Context) {}), ErrStopping)
require.False(t, m.Go(func(ctx context.Context) {}))

// When Stop() is called, the internal context expires and m.Done() is
// closed. Test this.
Expand All @@ -56,7 +57,7 @@ func TestGoroutineManagerContextExpires(t *testing.T) {

m := NewGoroutineManager(ctx)

require.NoError(t, m.Go(func(ctx context.Context) {
require.True(t, m.Go(func(ctx context.Context) {
<-ctx.Done()
}))

Expand All @@ -79,7 +80,7 @@ func TestGoroutineManagerContextExpires(t *testing.T) {
}

// Make sure new goroutines do not start after context expiry.
require.ErrorIs(t, m.Go(func(ctx context.Context) {}), ErrStopping)
require.False(t, m.Go(func(ctx context.Context) {}))

// Stop will wait for all goroutines to stop.
m.Stop()
Expand Down Expand Up @@ -107,15 +108,50 @@ func TestGoroutineManagerStress(t *testing.T) {
// implementation, this test crashes under `-race`.
for i := 0; i < 100; i++ {
taskChan := make(chan struct{})
err := m.Go(func(ctx context.Context) {
ok := m.Go(func(ctx context.Context) {
close(taskChan)
})
// If goroutine was started, wait for its completion.
if err == nil {
if ok {
<-taskChan
}
}

// Wait for Stop to complete.
<-stopChan
}

// TestGoroutineManagerStopsStress launches many Stop() calls in parallel with a
// task exiting. It attempts to catch a race condition between wg.Done() and
// wg.Wait() calls. According to documentation of wg.Wait() this is acceptable,
// therefore this test passes even with -race.
func TestGoroutineManagerStopsStress(t *testing.T) {
t.Parallel()

m := NewGoroutineManager(context.Background())

// jobChan is used to make the task to finish.
jobChan := make(chan struct{})

// Start a task and wait inside it until we start calling Stop() method.
ok := m.Go(func(ctx context.Context) {
<-jobChan
})
require.True(t, ok)

// Now launch many gorotines calling Stop() method in parallel.
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer wg.Done()
m.Stop()
}()
}

// Exit the task in parallel with Stop() calls.
close(jobChan)

// Wait until all the Stop() calls complete.
wg.Wait()
}

0 comments on commit c136901

Please sign in to comment.