diff --git a/Makefile b/Makefile index af6ac8b..3fe615a 100644 --- a/Makefile +++ b/Makefile @@ -35,7 +35,6 @@ install-goreleaser: ## check license if not exist install go-lint tools build: mkdir -p bin $(GOBUILD) -v -o ./bin/xtest ./cmd - cp ./xtest.toml ./bin clean: rm -rf bin dist diff --git a/cmd/xtest.go b/cmd/xtest.go index d303240..a85216f 100644 --- a/cmd/xtest.go +++ b/cmd/xtest.go @@ -1,5 +1,4 @@ //go:build !linux -// +build !linux package main diff --git a/request/fileSession.go b/request/fileSession.go index eef8e44..8182450 100644 --- a/request/fileSession.go +++ b/request/fileSession.go @@ -63,10 +63,15 @@ func (r *Request) FileSessionCall(cli *xsfcli.Client, index int64) (info analy.E } select { - case r.C.AsyncDrop <- _var.OutputMeta{reqSid, outType, v.Meta.Name, v.Meta.Attribute, index, v.Data}: + case r.C.AsyncDrop <- _var.OutputMeta{Name: v.Meta.Name, Sid: reqSid, Type: outType, Desc: v.Meta.Attribute, Seq: index, Data: v.Data}: default: // 异步channel满, 同步写; key: sid-type-format-encoding, value: data - key := reqSid + "-" + outType + "-" + v.Meta.Name + key := reqSid + "-" + outType + "-" + v.Meta.Name + "-" + strconv.FormatInt(index, 10) + if outType == "image" { + key += ".jpg" + } else if outType == "text" { + key += ".txt" + } r.downOutput(key, v.Data, cli.Log) } } diff --git a/request/oneShot.go b/request/oneShot.go index f79f56e..a70c318 100644 --- a/request/oneShot.go +++ b/request/oneShot.go @@ -110,11 +110,15 @@ func (r *Request) OneShotCall(cli *xsfcli.Client, index int64) (info analy.ErrIn outType = "video" } select { - case r.C.AsyncDrop <- _var.OutputMeta{v.Meta.Name, sessId, - outType, v.Meta.Attribute, index, v.Data}: + case r.C.AsyncDrop <- _var.OutputMeta{Name: v.Meta.Name, Sid: sessId, Type: outType, Desc: v.Meta.Attribute, Seq: index, Data: v.Data}: default: // 异步channel满, 同步写; key: sid-type-name, value: data - key := sessId + "-" + outType + "-" + v.Meta.Name + key := sessId + "-" + outType + "-" + v.Meta.Name + "-" + strconv.FormatInt(index, 10) + if outType == "image" { + key += ".jpg" + } else if outType == "text" { + key += ".txt" + } r.downOutput(key, v.Data, cli.Log) } } diff --git a/request/output.go b/request/output.go index 9058c06..e5840da 100644 --- a/request/output.go +++ b/request/output.go @@ -3,15 +3,14 @@ package request import ( "fmt" "github.com/xfyun/xsf/utils" - "io/ioutil" "os" + "path/filepath" "strconv" "sync" ) // 下行数据异步落盘或打印 func (r *Request) DownStreamWrite(wg *sync.WaitGroup, log *utils.Logger) { - for { output, alive := <-r.C.AsyncDrop if !alive { @@ -24,6 +23,7 @@ func (r *Request) DownStreamWrite(wg *sync.WaitGroup, log *utils.Logger) { } else if output.Type == "text" { key += ".txt" } + r.downOutput(key, output.Data, log) } wg.Done() @@ -38,9 +38,6 @@ func (r *Request) downOutput(key string, data []byte, log *utils.Logger) { return } - //tmp := []byte(key + ":") - //tmp = append(tmp, data...) - //tmp = append(tmp, byte('\n')) wlen, err := fi.Write(data) if err != nil || wlen != len(data) { log.Errorw("downOutput Sync AppendFile fail", "err", err.Error(), "wlen", wlen, "key", key) @@ -49,7 +46,7 @@ func (r *Request) downOutput(key string, data []byte, log *utils.Logger) { } _ = fi.Close() case 1: // 输出至目录OutputDst - err := ioutil.WriteFile(r.C.OutputDst+"/"+key, data, os.ModePerm) + err := os.WriteFile(filepath.Join(r.C.OutputDst, key), data, os.ModePerm) if err != nil { log.Errorw("downOutput Sync WriteFile fail", "err", err.Error(), "key", key) return diff --git a/request/session.go b/request/session.go index a9602ad..c314773 100644 --- a/request/session.go +++ b/request/session.go @@ -1,7 +1,6 @@ package request import ( - "encoding/json" "errors" "github.com/golang/protobuf/proto" xsfcli "github.com/xfyun/xsf/client" @@ -18,9 +17,6 @@ import ( func (r *Request) SessionCall(cli *xsfcli.Client, index int64) (info analy.ErrInfo) { // 下行结果缓存 - - data, _ := json.Marshal(r.C.UpStreams) - println(string(data)) var indexs []int = make([]int, 0, len(r.C.UpStreams)) for _, v := range r.C.UpStreams { streamIndex := index % int64(len(v.DataList)) @@ -45,21 +41,21 @@ func (r *Request) SessionCall(cli *xsfcli.Client, index int64) (info analy.ErrIn } _ = r.sessAIExcp(cli, hdl, reqSid) // 结果落盘 - tmpMerge := make(map[int] /*streamId*/ *protocol.Payload) + tmpMerge := make(map[string] /*streamId*/ *protocol.Payload) cli.Log.Debugw("length of thrRslt", "length", len(thrRslt)) - for k, _ := range thrRslt { + for k := range thrRslt { for _, d := range thrRslt[k].Pl { - meta, exist := tmpMerge[k] + meta, exist := tmpMerge[d.Meta.Name] if exist { - tmpMerge[k].Data = append(meta.Data, d.Data...) + meta.Data = append(meta.Data, d.Data...) } else { - tmpMerge[k] = d + tmpMerge[d.Meta.Name] = d } } } - for seq, v := range tmpMerge { - var outType string = "invalidType" + for _, v := range tmpMerge { + var outType = "invalidType" switch v.Meta.DataType { case protocol.MetaDesc_TEXT: outType = "text" @@ -72,10 +68,15 @@ func (r *Request) SessionCall(cli *xsfcli.Client, index int64) (info analy.ErrIn } select { - case r.C.AsyncDrop <- _var.OutputMeta{v.Meta.Name, reqSid, outType, v.Meta.Attribute, int64(seq), v.Data}: + case r.C.AsyncDrop <- _var.OutputMeta{Name: v.Meta.Name, Sid: reqSid, Type: outType, Desc: v.Meta.Attribute, Seq: index, Data: v.Data}: default: // 异步channel满, 同步写; key: sid-type-format-encoding, value: data - key := reqSid + "-" + outType + "-" + v.Meta.Name + strconv.FormatInt(index, 10) + key := reqSid + "-" + outType + "-" + v.Meta.Name + "-" + strconv.FormatInt(index, 10) + if outType == "image" { + key += ".jpg" + } else if outType == "text" { + key += ".txt" + } r.downOutput(key, v.Data, cli.Log) } } @@ -130,7 +131,7 @@ func (r *Request) sessAIIn(cli *xsfcli.Client, indexs []int, thrRslt *[]protocol resp, ecode, err := caller.SessionCall(xsfcli.CREATE, r.C.SvcName, "AIIn", req, time.Duration(r.C.TimeOut+r.C.LossDeviation)*time.Millisecond) if err != nil { cli.Log.Errorw("sessAIIn Create request fail", "err", err.Error(), "code", ecode, "params", dataIn.Params) - analy.Perf.Record(reqSid, resp.Handle(), analy.DataBegin, analy.SessBegin, analy.DOWN, int(ecode), err.Error()) + analy.Perf.Record(reqSid, "", analy.DataBegin, analy.SessBegin, analy.DOWN, int(ecode), err.Error()) return hdl, status, analy.ErrInfo{ErrCode: int(ecode), ErrStr: err} } hdl = resp.Session() @@ -215,9 +216,7 @@ func (r *Request) multiUpStream(cli *xsfcli.Client, swg *sync.WaitGroup, session if dataSendLen[streamId] >= len(r.C.UpStreams[streamId].DataList[fileId]) { continue // 该上行数据流已发送完毕 } - //if dataSendLen[streamId] == 0 { - // upStatus = protocol.EngInputData_BEGIN - //} + if dataSendLen[streamId]+size >= len(r.C.UpStreams[streamId].DataList[fileId]) { size = len(r.C.UpStreams[streamId].DataList[fileId]) - dataSendLen[streamId] upStatus = protocol.EngInputData_END diff --git a/var/conf.go b/var/conf.go index 99689eb..1851c3e 100644 --- a/var/conf.go +++ b/var/conf.go @@ -350,7 +350,7 @@ func (c *Conf) secParseSvc(conf *utils.Configure) error { } c.DropThr = c.MultiThr if cnt, err := conf.GetInt64(secTmp, "loopCnt"); err == nil { - c.LoopCnt.Store(int64(cnt)) + c.LoopCnt.Store(cnt) } if perfOn, err := conf.GetBool(secTmp, "perfOn"); err == nil { @@ -539,7 +539,7 @@ func (c *Conf) secParseDStream(conf *utils.Configure) error { return err } } - err = os.MkdirAll(c.OutputDst, os.ModeDir) + err = os.MkdirAll(c.OutputDst, os.ModePerm) if err != nil { return err }