From 9c8fa40048e96f55591f907b9ce8f5f79c41d12d Mon Sep 17 00:00:00 2001 From: galih rivanto Date: Tue, 9 Feb 2021 19:01:55 +0700 Subject: [PATCH 1/4] restructure command queue --- action.go | 19 +++-- proxy.go | 217 ++++++++++++++++++++++++++++++++---------------------- 2 files changed, 140 insertions(+), 96 deletions(-) diff --git a/action.go b/action.go index 533e4ca..e2c4db5 100644 --- a/action.go +++ b/action.go @@ -110,7 +110,7 @@ func FileOpen(filePath string) string { // FileClose . func FileClose() string { - return "file-close:" + return "file-close" } // SelectAll . @@ -144,15 +144,15 @@ func SelectClear() string { } // InvertOption define option when invert selection -type InvertOption string +type InvertOption string // invert selection option const ( - InvertOptionAll InvertOption = "all" - InvertOptionLayers = "layers" - InvertOptionNoLayers = "no-layers" - InvertOptionGroup = "group" - InvertOptionNoGroup = "no-group" + InvertOptionAll InvertOption = "all" + InvertOptionLayers = "layers" + InvertOptionNoLayers = "no-layers" + InvertOptionGroup = "group" + InvertOptionNoGroup = "no-group" ) // SelectInvert . @@ -164,3 +164,8 @@ func SelectInvert(option InvertOption) string { func SelectList() string { return "select-list" } + +// Version print inksscape version and return +func Version() string { + return "inkscape-version" +} diff --git a/proxy.go b/proxy.go index c4f06e8..df461d4 100644 --- a/proxy.go +++ b/proxy.go @@ -1,16 +1,14 @@ package inkscape import ( + "bufio" "bytes" "context" "errors" "fmt" - "io" - "io/ioutil" "log" "os/exec" "strings" - "sync" "time" "github.com/galihrivanto/runner" @@ -19,6 +17,7 @@ import ( const ( defaultCmdName = "inkscape" shellModeBanner = "Inkscape interactive shell mode" + quitCommand = "quit" ) // defines common errors in library @@ -37,8 +36,7 @@ func debug(v ...interface{}) { return } - log.Print("proxy:") - log.Println(v...) + log.Print(append([]interface{}{"proxy:"}, v...)...) } type chanWriter struct { @@ -61,11 +59,11 @@ type Proxy struct { ctx context.Context cancel context.CancelFunc - cmd *exec.Cmd + // limiter to allow one command processed at time + requestLimiter chan struct{} - // input - lock sync.RWMutex - stdin io.WriteCloser + // queue of request + requestQueue chan []byte // output stdout chan []byte @@ -83,31 +81,87 @@ func (p *Proxy) runBackground(ctx context.Context, commandPath string, vars ...s } cmd := exec.CommandContext(ctx, commandPath, args...) - cmd.Stdout = &chanWriter{p.stdout} - cmd.Stderr = &chanWriter{p.stderr} - stdin, err := cmd.StdinPipe() + // // pipe stderr + // stderrC := make(chan []byte) + // defer close(stderrC) + + // cmd.Stderr = &chanWriter{out: stderrC} + + // pipe stdout + stdoutC := make(chan []byte) + defer close(stdoutC) + + stdout, err := cmd.StdoutPipe() if err != nil { return err } - p.lock.Lock() - p.stdin = stdin - p.lock.Unlock() + stdoutReader := bufio.NewReader(stdout) + go func() { + for { - defer func() { - // only close channel when command closes - close(p.stdout) - close(p.stderr) + } }() - if err := cmd.Start(); err != nil { + // pipe stdin + stdin, err := cmd.StdinPipe() + if err != nil { return err } + defer stdin.Close() + // start command and wait it close debug("run in background") + if err := cmd.Start(); err != nil { + return err + } + + // make first command available + select { + case p.requestLimiter <- struct{}{}: + default: + // discard + } + + // handle command and output + for { + select { + case <-ctx.Done(): + return cmd.Wait() - return cmd.Wait() + case command := <-p.requestQueue: + debug("write command ", string(command)) + if _, err := stdin.Write(command); err != nil { + p.stderr <- []byte(err.Error()) + } + + // case byteErr := <-stderrC: + // if len(byteErr) == 0 { + // break + // } + + // if bytes.Contains(byteErr, []byte("WARNING")) { + // continue + // } + + // p.stderr <- byteErr + + case byteOut := <-stdoutC: + if len(byteOut) == 0 { + debug("new line, why?") + break + } + + // check if shell mode banner + if bytes.Contains(byteOut, []byte(shellModeBanner)) { + debug(string(byteOut)) + break + } + + p.stdout <- byteOut + } + } } // Run start inkscape proxy @@ -136,90 +190,55 @@ func (p *Proxy) Run(args ...string) error { // Close satisfy io.Closer interface func (p *Proxy) Close() error { - p.cancel() - p.stdin.Close() - - return nil -} + // send quit command + _, err := p.RawCommands(quitCommand) -// waitReady wait until background process -// ready accepting command -func (p *Proxy) waitReady(timeout time.Duration) error { - ready := make(chan struct{}) - go func() { - for { - // query stdin availability every second - p.lock.RLock() - if p.stdin != nil { - p.lock.RUnlock() - close(ready) - return - } - p.lock.RUnlock() - - <-time.After(time.Second) - } - }() + p.cancel() + close(p.requestLimiter) + close(p.requestQueue) + close(p.stderr) + close(p.stdout) - select { - case <-time.After(timeout): - return ErrCommandNotReady - case <-ready: - return nil - } + return err } func (p *Proxy) sendCommand(b []byte) ([]byte, error) { - debug("wait ready") - err := p.waitReady(30 * time.Second) - if err != nil { - return nil, err - } - debug("send command to stdin", string(b)) + // drain old err and out + drain(p.stderr) + drain(p.stdout) + // append new line if !bytes.HasSuffix(b, []byte{'\n'}) { b = append(b, '\n') } - _, err = p.stdin.Write(b) - if err != nil { - return nil, err - } + p.requestQueue <- b - // wait output - var output []byte + var ( + output []byte + err error + ) waitLoop: for { select { case bytesErr := <-p.stderr: - // for now, we can only check error message pattern - // ignore WARNING - if bytes.Contains(output, []byte("WARNING")) { - debug(string(bytesErr)) - break - } err = fmt.Errorf("%s", string(bytesErr)) break waitLoop - case output = <-p.stdout: - if len(output) == 0 { - break - } + case bytesOut := <-p.stdout: + fmt.Println(string(bytesOut)) - // check if shell mode banner - if bytes.Contains(output, []byte(shellModeBanner)) { - debug(string(output)) - break - } + // TODO: use sentinel + output = bytesOut break waitLoop } } - return output, nil + return output, err } // RawCommands send inkscape shell commands @@ -227,10 +246,19 @@ func (p *Proxy) RawCommands(args ...string) ([]byte, error) { buffer := bufferPool.Get() defer bufferPool.Put(buffer) + // wait available + debug("wait available") + <-p.requestLimiter + // construct command buffer buffer.WriteString(strings.Join(args, ";")) - return p.sendCommand(buffer.Bytes()) + res, err := p.sendCommand(buffer.Bytes()) + + // make it available again + p.requestLimiter <- struct{}{} + + return res, err } // Svg2Pdf convert svg input file to output pdf file @@ -247,32 +275,43 @@ func (p *Proxy) Svg2Pdf(svgIn, pdfOut string) error { debug("result", string(res)) + <-time.After(30 * time.Second) + return nil } // NewProxy create new inkscape proxy instance func NewProxy(opts ...Option) *Proxy { // default value - init := Options{ + options := Options{ commandName: defaultCmdName, maxRetry: 5, verbose: false, } // merge options - options := mergeOptions(init, opts...) + options = mergeOptions(options, opts...) // check verbosity - if !options.verbose { - log.SetOutput(ioutil.Discard) - } - - stdout := make(chan []byte) - stderr := make(chan []byte) + verbose = options.verbose return &Proxy{ options: options, - stdout: stdout, - stderr: stderr, + stdout: make(chan []byte, 100), + stderr: make(chan []byte, 100), + + // limit request to one request at time + requestLimiter: make(chan struct{}, 1), + requestQueue: make(chan []byte, 100), + } +} + +func drain(c chan []byte) { + for { + select { + case <-c: + default: + return + } } } From 0c307bbd963ab226420da4eea90eaabf4f7b090a Mon Sep 17 00:00:00 2001 From: galih rivanto Date: Fri, 12 Feb 2021 10:07:35 +0700 Subject: [PATCH 2/4] better synchronization between command execution --- proxy.go | 113 ++++++++++++++++++++++++++++++------------------------- 1 file changed, 61 insertions(+), 52 deletions(-) diff --git a/proxy.go b/proxy.go index df461d4..3211d88 100644 --- a/proxy.go +++ b/proxy.go @@ -1,7 +1,6 @@ package inkscape import ( - "bufio" "bytes" "context" "errors" @@ -82,27 +81,17 @@ func (p *Proxy) runBackground(ctx context.Context, commandPath string, vars ...s cmd := exec.CommandContext(ctx, commandPath, args...) - // // pipe stderr - // stderrC := make(chan []byte) - // defer close(stderrC) + // pipe stderr + stderrC := make(chan []byte) + defer close(stderrC) - // cmd.Stderr = &chanWriter{out: stderrC} + cmd.Stderr = &chanWriter{out: stderrC} // pipe stdout stdoutC := make(chan []byte) defer close(stdoutC) - stdout, err := cmd.StdoutPipe() - if err != nil { - return err - } - - stdoutReader := bufio.NewReader(stdout) - go func() { - for { - - } - }() + cmd.Stdout = &chanWriter{out: stdoutC} // pipe stdin stdin, err := cmd.StdinPipe() @@ -118,6 +107,15 @@ func (p *Proxy) runBackground(ctx context.Context, commandPath string, vars ...s } // make first command available + // after received prompt + for { + bytesOut := <-stdoutC + bytesOut = bytes.TrimSpace(bytesOut) + if isPrompt(bytesOut[len(bytesOut)-1:]) { + break + } + } + select { case p.requestLimiter <- struct{}{}: default: @@ -136,30 +134,23 @@ func (p *Proxy) runBackground(ctx context.Context, commandPath string, vars ...s p.stderr <- []byte(err.Error()) } - // case byteErr := <-stderrC: - // if len(byteErr) == 0 { - // break - // } - - // if bytes.Contains(byteErr, []byte("WARNING")) { - // continue - // } - - // p.stderr <- byteErr + case bytesErr := <-stderrC: + if len(bytesErr) == 0 { + break + } - case byteOut := <-stdoutC: - if len(byteOut) == 0 { - debug("new line, why?") + if bytes.Contains(bytesErr, []byte("WARNING")) { break } - // check if shell mode banner - if bytes.Contains(byteOut, []byte(shellModeBanner)) { - debug(string(byteOut)) + p.stderr <- bytes.TrimSpace(bytesErr) + + case bytesOut := <-stdoutC: + if len(bytesOut) == 0 { break } - p.stdout <- byteOut + p.stdout <- bytes.TrimSpace(bytesOut) } } } @@ -185,13 +176,17 @@ func (p *Proxy) Run(args ...string) error { ) }() - return nil + // print inkscape version + res, err := p.RawCommands(Version()) + fmt.Println(string(res)) + + return err } // Close satisfy io.Closer interface func (p *Proxy) Close() error { // send quit command - _, err := p.RawCommands(quitCommand) + _, err := p.sendCommand([]byte(quitCommand), false) p.cancel() close(p.requestLimiter) @@ -202,8 +197,21 @@ func (p *Proxy) Close() error { return err } -func (p *Proxy) sendCommand(b []byte) ([]byte, error) { - debug("send command to stdin", string(b)) +func (p *Proxy) sendCommand(b []byte, waitPrompt ...bool) ([]byte, error) { + wait := true + if len(waitPrompt) > 0 { + wait = waitPrompt[0] + } + + // wait available + debug("wait prompt available") + <-p.requestLimiter + defer func() { + // make it available again + p.requestLimiter <- struct{}{} + }() + + debug("send command to stdin ", string(b)) // drain old err and out drain(p.stderr) @@ -221,20 +229,26 @@ func (p *Proxy) sendCommand(b []byte) ([]byte, error) { err error ) + // immediate return + if !wait { + <-time.After(time.Second) + return []byte{}, nil + } + waitLoop: for { select { case bytesErr := <-p.stderr: - + debug(string(bytesErr)) err = fmt.Errorf("%s", string(bytesErr)) break waitLoop case bytesOut := <-p.stdout: - fmt.Println(string(bytesOut)) - - // TODO: use sentinel - output = bytesOut + debug(string(bytesOut)) + if isPrompt(bytesOut) { + break waitLoop + } - break waitLoop + output = append(output, bytesOut...) } } @@ -246,18 +260,11 @@ func (p *Proxy) RawCommands(args ...string) ([]byte, error) { buffer := bufferPool.Get() defer bufferPool.Put(buffer) - // wait available - debug("wait available") - <-p.requestLimiter - // construct command buffer buffer.WriteString(strings.Join(args, ";")) res, err := p.sendCommand(buffer.Bytes()) - // make it available again - p.requestLimiter <- struct{}{} - return res, err } @@ -275,8 +282,6 @@ func (p *Proxy) Svg2Pdf(svgIn, pdfOut string) error { debug("result", string(res)) - <-time.After(30 * time.Second) - return nil } @@ -306,6 +311,10 @@ func NewProxy(opts ...Option) *Proxy { } } +func isPrompt(data []byte) bool { + return bytes.Equal(data, []byte(">")) +} + func drain(c chan []byte) { for { select { From 087429d542e4cccdb1ff6bc738843742a5c0cd69 Mon Sep 17 00:00:00 2001 From: galih rivanto Date: Fri, 12 Feb 2021 10:09:12 +0700 Subject: [PATCH 3/4] add concurrent unit testing --- circle.svg | 3 +++ cmd/svg2pdf/main.go | 8 +++++++- option.go | 10 ++++++++++ proxy_test.go | 29 +++++++++++++++++++++++++++++ 4 files changed, 49 insertions(+), 1 deletion(-) create mode 100644 circle.svg create mode 100644 proxy_test.go diff --git a/circle.svg b/circle.svg new file mode 100644 index 0000000..2ae5da0 --- /dev/null +++ b/circle.svg @@ -0,0 +1,3 @@ + + + \ No newline at end of file diff --git a/cmd/svg2pdf/main.go b/cmd/svg2pdf/main.go index 395a4c5..9eb5cb3 100644 --- a/cmd/svg2pdf/main.go +++ b/cmd/svg2pdf/main.go @@ -3,6 +3,7 @@ package main import ( "flag" "fmt" + "log" "os" "github.com/galihrivanto/go-inkscape" @@ -11,6 +12,7 @@ import ( var ( svgInput string pdfOutput string + verbose bool ) func handleErr(err error) { @@ -23,6 +25,7 @@ func handleErr(err error) { func main() { flag.StringVar(&svgInput, "input", "", "svg input") flag.StringVar(&pdfOutput, "output", "result.pdf", "pdf output") + flag.BoolVar(&verbose, "verbose", false, "verbose output") flag.Parse() if svgInput == "" { @@ -30,7 +33,10 @@ func main() { os.Exit(1) } - proxy := inkscape.NewProxy(inkscape.Verbose(true)) + proxy := inkscape.NewProxy(inkscape.Verbose(verbose)) + + log.Println("run command") + err := proxy.Run() handleErr(err) defer proxy.Close() diff --git a/option.go b/option.go index adfaf1d..ebdd108 100644 --- a/option.go +++ b/option.go @@ -13,6 +13,9 @@ type Options struct { // maximum retry attempt maxRetry int + // maximum command queue size + commandQueueLength int + // set verbosity verbose bool } @@ -33,6 +36,13 @@ func MaxRetry(retry int) Option { } } +// CommandQueueLength override maximum command queue size +func CommandQueueLength(length int) Option { + return func(o *Options) { + o.commandQueueLength = length + } +} + // Verbose override log verbosity // useful for debugging func Verbose(verbose bool) Option { diff --git a/proxy_test.go b/proxy_test.go new file mode 100644 index 0000000..325d879 --- /dev/null +++ b/proxy_test.go @@ -0,0 +1,29 @@ +package inkscape + +import ( + "fmt" + "testing" +) + +func TestConcurrent(t *testing.T) { + tempFiles := make([]string, 0) + // defer func() { + // for _, t := range tempFiles { + // os.Remove(t) + // } + // }() + + proxy := NewProxy(Verbose(true)) + proxy.Run() + + for i := 0; i < 10; i++ { + temp := fmt.Sprintf("%d.pdf", i) + tempFiles = append(tempFiles, temp) + + go func() { + if err := proxy.Svg2Pdf("circle.svg", temp); err != nil { + t.Error(err) + } + }() + } +} From 9ef38ed4be732f4901ca10fec22a28281df13a70 Mon Sep 17 00:00:00 2001 From: galih rivanto Date: Mon, 15 Feb 2021 14:45:50 +0700 Subject: [PATCH 4/4] ignore error when querying version --- cmd/svg2pdf/main.go | 3 - cmd/svg2pdf/maincon.svg | 790 ++++++++++++++++++++++++++++++++++++++++ cmd/svg2pdf/xxx.svg | 1 + proxy.go | 19 +- 4 files changed, 804 insertions(+), 9 deletions(-) create mode 100644 cmd/svg2pdf/maincon.svg create mode 100644 cmd/svg2pdf/xxx.svg diff --git a/cmd/svg2pdf/main.go b/cmd/svg2pdf/main.go index 9eb5cb3..9210db0 100644 --- a/cmd/svg2pdf/main.go +++ b/cmd/svg2pdf/main.go @@ -3,7 +3,6 @@ package main import ( "flag" "fmt" - "log" "os" "github.com/galihrivanto/go-inkscape" @@ -35,8 +34,6 @@ func main() { proxy := inkscape.NewProxy(inkscape.Verbose(verbose)) - log.Println("run command") - err := proxy.Run() handleErr(err) defer proxy.Close() diff --git a/cmd/svg2pdf/maincon.svg b/cmd/svg2pdf/maincon.svg new file mode 100644 index 0000000..609057a --- /dev/null +++ b/cmd/svg2pdf/maincon.svg @@ -0,0 +1,790 @@ + + + + + + + image/svg+xml + + + + + + + + + + + CHANGI EAST + + Alan Xu Kefeng + SEMBCORP SPECIALISEDCONSTRUCTION + + + Serial No : + CON-0001 + + + diff --git a/cmd/svg2pdf/xxx.svg b/cmd/svg2pdf/xxx.svg new file mode 100644 index 0000000..0a35231 --- /dev/null +++ b/cmd/svg2pdf/xxx.svg @@ -0,0 +1 @@ +image/svg+xmlKAWI DEVLalalalaxcorpSerial No :SSC-9999 \ No newline at end of file diff --git a/proxy.go b/proxy.go index 9267cab..5c9a598 100644 --- a/proxy.go +++ b/proxy.go @@ -115,11 +115,15 @@ func (p *Proxy) runBackground(ctx context.Context, commandPath string, vars ...s // make first command available // after received prompt +wait: for { bytesOut := <-stdoutC bytesOut = bytes.TrimSpace(bytesOut) - if isPrompt(bytesOut[len(bytesOut)-1:]) { - break + parts := bytes.Split(bytesOut, []byte("\n")) + for _, part := range parts { + if isPrompt(part) { + break wait + } } } @@ -184,10 +188,10 @@ func (p *Proxy) Run(args ...string) error { }() // print inkscape version - res, err := p.RawCommands(Version()) + res, _ := p.RawCommands(Version()) fmt.Println(string(res)) - return err + return nil } // Close satisfy io.Closer interface @@ -251,8 +255,11 @@ waitLoop: break waitLoop case bytesOut := <-p.stdout: debug(string(bytesOut)) - if isPrompt(bytesOut) { - break waitLoop + parts := bytes.Split(bytesOut, []byte("\n")) + for _, part := range parts { + if isPrompt(part) { + break waitLoop + } } output = append(output, bytesOut...)