Skip to content

Commit baabb14

Browse files
committed
oversight: improve handling of tree termination by trying to wait for children goroutines to stop before returning control
1 parent 2e4c6e6 commit baabb14

File tree

3 files changed

+62
-11
lines changed

3 files changed

+62
-11
lines changed

child_process.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,15 @@ func Transient() Restart { return func(err error) bool { return err != nil } }
133133
// are signaled to stop.
134134
type Shutdown func() (context.Context, context.CancelFunc)
135135

136+
type shutdownContextValue string
137+
138+
var detachableContext = shutdownContextValue("detachable")
139+
136140
// Infinity will wait until the process naturally dies.
137141
func Infinity() Shutdown {
138142
return func() (context.Context, context.CancelFunc) {
139-
return context.WithCancel(context.Background())
143+
ctx := context.WithValue(context.Background(), detachableContext, false)
144+
return context.WithCancel(ctx)
140145
}
141146
}
142147

@@ -148,6 +153,7 @@ const DefaultChildProcessTimeout = 5 * time.Second
148153
// detaching from the winding process.
149154
func Timeout(d time.Duration) Shutdown {
150155
return func() (context.Context, context.CancelFunc) {
151-
return context.WithTimeout(context.Background(), d)
156+
ctx := context.WithValue(context.Background(), detachableContext, true)
157+
return context.WithTimeout(ctx, d)
152158
}
153159
}

tree.go

+21-9
Original file line numberDiff line numberDiff line change
@@ -66,13 +66,14 @@ type Tree struct {
6666
stopped chan struct{}
6767

6868
// semaphore must be held when adding/deleting dynamic processes
69-
semaphore sync.Mutex
70-
strategy Strategy
71-
maxR int
72-
maxT time.Duration
73-
children map[string]childProcess // map of children name to child process
74-
childrenOrder []string
75-
processChanged chan struct{} // indicates that some change to process slice has been made
69+
semaphore sync.Mutex
70+
strategy Strategy
71+
maxR int
72+
maxT time.Duration
73+
children map[string]childProcess // map of children name to child process
74+
childrenOrder []string
75+
childrenWaitGroup sync.WaitGroup
76+
processChanged chan struct{} // indicates that some change to process slice has been made
7677

7778
logger Logger
7879

@@ -251,6 +252,7 @@ func (t *Tree) Start(rootCtx context.Context) error {
251252
if err := t.err(); err != nil {
252253
return err
253254
}
255+
defer t.childrenWaitGroup.Wait()
254256
ctx, cancel := context.WithCancel(rootCtx)
255257
defer cancel()
256258
t.gracefulCancel = cancel
@@ -348,7 +350,14 @@ func (t *Tree) handleTreeChanges(ctx context.Context, cancel context.CancelFunc)
348350

349351
func (t *Tree) startChildProcess(ctx context.Context, processID int, p *ChildProcessSpecification, startSemaphore <-chan struct{}) {
350352
childCtx, childWg, procState := t.plugStop(ctx, processID, p)
353+
detachable := childCtx.Value(detachableContext) == true
354+
if !detachable {
355+
t.childrenWaitGroup.Add(1)
356+
}
351357
go func() {
358+
if !detachable {
359+
defer t.childrenWaitGroup.Done()
360+
}
352361
defer childWg.Done()
353362
<-startSemaphore
354363
t.logger.Println(p.Name, "child started")
@@ -369,13 +378,16 @@ func (t *Tree) startChildProcess(ctx context.Context, processID int, p *ChildPro
369378
type oversightValue string
370379

371380
func (t *Tree) plugStop(ctx context.Context, processID int, p *ChildProcessSpecification) (context.Context, *sync.WaitGroup, *state) {
372-
childCtx, childCancel := context.WithCancel(context.WithValue(ctx, oversightValue("name"), p.Name))
381+
stopCtx, stopCancel := p.Shutdown()
382+
baseCtx := ctx
383+
baseCtx = context.WithValue(baseCtx, oversightValue("name"), p.Name)
384+
baseCtx = context.WithValue(baseCtx, detachableContext, stopCtx.Value(detachableContext))
385+
childCtx, childCancel := context.WithCancel(baseCtx)
373386
var childWg sync.WaitGroup
374387
childWg.Add(1)
375388
childProc := t.children[p.Name]
376389
childProc.state.setRunning(func() {
377390
t.logger.Println(p.Name, "stopping")
378-
stopCtx, stopCancel := p.Shutdown()
379391
defer stopCancel()
380392
wgComplete := make(chan struct{})
381393
childCancel()

tree_test.go

+33
Original file line numberDiff line numberDiff line change
@@ -1063,10 +1063,43 @@ func TestTree_shutdownOrder(t *testing.T) {
10631063
}
10641064

10651065
func TestPanicDoubleStart(t *testing.T) {
1066+
t.Parallel()
10661067
var tree oversight.Tree
10671068
oversight.NeverHalt()(&tree)
10681069
ctx, cancel := context.WithCancel(context.Background())
10691070
tree.Add(oversight.ChildProcessSpecification{Name: "child", Start: func(context.Context) error { cancel(); return nil }})
10701071
tree.Start(ctx)
10711072
tree.Start(ctx)
10721073
}
1074+
1075+
func TestWaitAfterStart(t *testing.T) {
1076+
t.Parallel()
1077+
tree := oversight.New(oversight.NeverHalt())
1078+
var (
1079+
mu sync.Mutex
1080+
count int
1081+
)
1082+
ctx, cancel := context.WithCancel(context.Background())
1083+
for i := 0; i < 10; i++ {
1084+
mu.Lock()
1085+
count++
1086+
mu.Unlock()
1087+
tree.Add(oversight.ChildProcessSpecification{
1088+
Start: func(ctx context.Context) error {
1089+
<-ctx.Done()
1090+
mu.Lock()
1091+
count--
1092+
mu.Unlock()
1093+
return nil
1094+
},
1095+
Restart: oversight.Temporary(),
1096+
})
1097+
}
1098+
time.AfterFunc(1*time.Second, cancel)
1099+
if err := tree.Start(ctx); err != nil {
1100+
t.Fatal(err)
1101+
}
1102+
if count != 0 {
1103+
t.Fatal("run returned before all children returned")
1104+
}
1105+
}

0 commit comments

Comments
 (0)