forked from rhysd/actionlint
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprocess.go
117 lines (103 loc) · 3.58 KB
/
process.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package actionlint
import (
"context"
"fmt"
"io"
"os/exec"
"sync"
"golang.org/x/sync/errgroup"
"golang.org/x/sync/semaphore"
"golang.org/x/sys/execabs"
)
// concurrentProcess is a manager to run process concurrently. Since running process consumes OS
// resources, running too many processes concurrently causes some issues. On macOS, making new
// process hangs (see issue #3). And also running processes which opens files causes an error
// "pipe: too many files to open". To avoid it, this class manages how many processes are run at
// the same time.
type concurrentProcess struct {
ctx context.Context
sema *semaphore.Weighted
wg sync.WaitGroup
}
func newConcurrentProcess(par int) *concurrentProcess {
return &concurrentProcess{
ctx: context.Background(),
sema: semaphore.NewWeighted(int64(par)),
}
}
func runProcessWithStdin(exe string, args []string, stdin string) ([]byte, error) {
cmd := exec.Command(exe, args...)
cmd.Stderr = nil
p, err := cmd.StdinPipe()
if err != nil {
return nil, fmt.Errorf("could not make stdin pipe for %s process: %w", exe, err)
}
if _, err := io.WriteString(p, stdin); err != nil {
p.Close()
return nil, fmt.Errorf("could not write to stdin of %s process: %w", exe, err)
}
p.Close()
stdout, err := cmd.Output()
if err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
code := exitErr.ExitCode()
if code < 0 {
return nil, fmt.Errorf("%s was terminated. stderr: %q", exe, exitErr.Stderr)
}
if len(stdout) == 0 {
return nil, fmt.Errorf("%s exited with status %d but stdout was empty. stderr: %q", exe, code, exitErr.Stderr)
}
// Reaches here when exit status is non-zero and stdout is not empty, shellcheck successfully found some errors
} else {
return nil, err
}
}
return stdout, nil
}
func (proc *concurrentProcess) run(eg *errgroup.Group, exe string, args []string, stdin string, callback func([]byte, error) error) {
proc.sema.Acquire(proc.ctx, 1)
proc.wg.Add(1)
eg.Go(func() error {
defer proc.wg.Done()
stdout, err := runProcessWithStdin(exe, args, stdin)
proc.sema.Release(1)
return callback(stdout, err)
})
}
// wait waits all goroutines started by this concurrentProcess instance finish.
func (proc *concurrentProcess) wait() {
proc.wg.Wait() // Wait for all gorotines completing to shutdown
}
// newCommandRunner creates new external command runner for given executable. The executable path
// is resolved in this function.
func (proc *concurrentProcess) newCommandRunner(exe string) (*externalCommand, error) {
p, err := execabs.LookPath(exe)
if err != nil {
return nil, err
}
cmd := &externalCommand{
proc: proc,
exe: p,
}
return cmd, nil
}
// externalCommand is struct to run specific command concurrently with concurrentProcess bounding
// number of processes at the same time. This type manages fatal errors while running the command
// by using errgroup.Group. The wait() method must be called at the end for checking if some fatal
// error occurred.
type externalCommand struct {
proc *concurrentProcess
eg errgroup.Group
exe string
}
// run runs the command with given arguments and stdin. The callback function is called after the
// process runs. First argument is stdout and the second argument is an error while running the
// process.
func (cmd *externalCommand) run(args []string, stdin string, callback func([]byte, error) error) {
cmd.proc.run(&cmd.eg, cmd.exe, args, stdin, callback)
}
// wait waits until all goroutines for this command finish. Note that it does not wait for
// goroutines for other commands.
func (cmd *externalCommand) wait() error {
return cmd.eg.Wait()
}