Skip to content

Commit

Permalink
mv cmd/* .
Browse files Browse the repository at this point in the history
  • Loading branch information
penglin2 committed Oct 25, 2024
1 parent 30267ef commit 7095185
Show file tree
Hide file tree
Showing 3 changed files with 279 additions and 0 deletions.
33 changes: 33 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package main

import (
"fmt"
xsfcli "github.com/xfyun/xsf/client"
"github.com/xfyun/xsf/utils"
_var "xtest/var"
)

func main() {
f := _var.NewFlag()
f.Parse()
// xrpc框架初始化;
cli, e := xsfcli.InitClient(_var.CliName, utils.CfgMode(0), utils.WithCfgName(*f.CmdCfg),
utils.WithCfgURL(""), utils.WithCfgPrj(""), utils.WithCfgGroup(""),
utils.WithCfgService(""), utils.WithCfgVersion(""))
if e != nil {
fmt.Println("cli xsf init fail with ", e.Error())
return
}

// cli配置初始化;
conf := _var.NewConf()
e = conf.ConfInit(cli.Cfg())
if e != nil {
fmt.Println("cli conf init fail with ", e.Error())
return
}
//fmt.Printf("%+v\n", conf)
x := NewXtest(cli, conf)
x.Run()
return
}
114 changes: 114 additions & 0 deletions xtest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
//go:build !linux

package main

import (
"fmt"
"github.com/pterm/pterm"
xsfcli "github.com/xfyun/xsf/client"
"sync"
"time"
"xtest/analy"
"xtest/request"
"xtest/resources"
"xtest/util"
"xtest/var"
)

type Xtest struct {
r request.Request
cli *xsfcli.Client
}

func NewXtest(cli *xsfcli.Client, conf _var.Conf) Xtest {
return Xtest{r: request.Request{C: conf}, cli: cli}
}

func (x *Xtest) Run() {
// 数据分析初始化、性能数据
analy.ErrAnalyser.Start(x.r.C.MultiThr, x.cli.Log, x.r.C.ErrAnaDst)
if x.r.C.PerfConfigOn {
analy.Perf = new(analy.PerfModule)
analy.Perf.Log = x.cli.Log
startErr := analy.Perf.Start()
if startErr != nil {
fmt.Println("failed to open req record file.", startErr.Error())
return
}
defer analy.Perf.Stop()
}
// 启动异步输出打印&落盘
var rwg sync.WaitGroup
x.cli.Log.Debugw("dropThr", "length of dropThr", x.r.C.DropThr)
for i := 0; i < x.r.C.DropThr; i++ {
rwg.Add(1)
go x.r.DownStreamWrite(&rwg, x.cli.Log)
}

var wg sync.WaitGroup

// jbzhou5
r := resources.NewResources() // 开启资源监听实例
stp := util.NewScheduledTaskPool() // 开启一个定时任务池
if x.r.C.PrometheusSwitch {
go r.Serve(x.r.C.PrometheusPort) // jbzhou5 启动一个协程写入Prometheus
}

if x.r.C.Plot {
r.GenerateData()
}

go util.ProgressShow(x.r.C.LoopCnt, x.r.C.LoopCnt.Load())

x.cli.Log.Debugw("multiThr", "length of multiThr", x.r.C.MultiThr)
for i := 0; i < x.r.C.MultiThr; i++ {
wg.Add(1)
go func() {
for {
loopIndex := x.r.C.LoopCnt.Load()
x.r.C.LoopCnt.Dec()
if x.r.C.LoopCnt.Load() < 0 {
break
}

switch x.r.C.ReqMode {
case 0:
info := x.r.OneShotCall(x.cli, loopIndex)
analy.ErrAnalyser.PushErr(info)
case 1:
info := x.r.SessionCall(x.cli, loopIndex) // loopIndex % len(stream.dataList)
analy.ErrAnalyser.PushErr(info)
case 2:
info := x.r.TextCall(x.cli, loopIndex) // loopIndex % len(stream.dataList)
analy.ErrAnalyser.PushErr(info)
case 3:
info := x.r.FileSessionCall(x.cli, loopIndex) // loopIndex % len(stream.dataList)
analy.ErrAnalyser.PushErr(info)
default:
println("Unsupported Mode!")
}
}
wg.Done()
}()
x.linearCtl() // 并发线性增长控制,防止瞬时并发请求冲击
}
wg.Wait()
// 关闭异步落盘协程&wait
close(x.r.C.AsyncDrop)
analy.ErrAnalyser.Stop()
rwg.Wait()
xsfcli.DestroyClient(x.cli)
stp.Stop() // 关闭定时任务
r.Stop() // 关闭资源收集
r.Dump() // 持久化资源日志
if x.r.C.Plot {
r.Draw(x.r.C.PlotFile)
}
pterm.DefaultBasicText.Println(pterm.LightGreen("\ncli finish "))
}

func (x *Xtest) linearCtl() {
if x.r.C.LinearNs > 0 {
time.Sleep(time.Duration(time.Nanosecond) * time.Duration(x.r.C.LinearNs))
}
}
132 changes: 132 additions & 0 deletions xtest_linux.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
//go:build linux
// +build linux

package main

import (
"fmt"
"github.com/NVIDIA/gpu-monitoring-tools/bindings/go/nvml"
"github.com/pterm/pterm"
xsfcli "github.com/xfyun/xsf/client"
"log"
"sync"
"time"
"xtest/analy"
"xtest/request"
"xtest/resources"
"xtest/util"
"xtest/var"
)

type Xtest struct {
r request.Request
cli *xsfcli.Client
}

func NewXtest(cli *xsfcli.Client, conf _var.Conf) Xtest {
return Xtest{r: request.Request{C: conf}, cli: cli}
}

func (x *Xtest) Run() {
// 数据分析初始化、性能数据
analy.ErrAnalyser.Start(x.r.C.MultiThr, x.cli.Log, x.r.C.ErrAnaDst)
if x.r.C.PerfConfigOn {
analy.Perf = new(analy.PerfModule)
analy.Perf.Log = x.cli.Log
startErr := analy.Perf.Start()
if startErr != nil {
fmt.Println("failed to open req record file.", startErr.Error())
return
}
defer analy.Perf.Stop()
}
// 启动异步输出打印&落盘
var rwg sync.WaitGroup
for i := 0; i < x.r.C.DropThr; i++ {
rwg.Add(1)
go x.r.DownStreamWrite(&rwg, x.cli.Log)
}

var wg sync.WaitGroup

// jbzhou5
r := resources.NewResources() // 开启资源监听实例
stp := util.NewScheduledTaskPool() // 开启一个定时任务池
if x.r.C.PrometheusSwitch {
go r.Serve(x.r.C.PrometheusPort) // jbzhou5 启动一个协程写入Prometheus
}

if x.r.C.Plot {
r.GenerateData()
}

err := nvml.Init()
if err != nil {
log.Printf("can't get nvml lib..\n %s", err.Error())
x.r.C.GpuMon = false
} else {
defer nvml.Shutdown()

}

// 启动一个系统资源定时任务
stp.Start(time.Microsecond*100, func() {
err := r.ReadMem(&x.r.C)
if err != nil {
return
}
})

go util.ProgressShow(x.r.C.LoopCnt, x.r.C.LoopCnt.Load())

for i := 0; i < x.r.C.MultiThr; i++ {
wg.Add(1)
go func() {
for {
loopIndex := x.r.C.LoopCnt.Load()
x.r.C.LoopCnt.Dec()
if x.r.C.LoopCnt.Load() < 0 {
break
}

switch x.r.C.ReqMode {
case 0:
info := x.r.OneShotCall(x.cli, loopIndex)
analy.ErrAnalyser.PushErr(info)
case 1:
info := x.r.SessionCall(x.cli, loopIndex) // loopIndex % len(stream.dataList)
analy.ErrAnalyser.PushErr(info)
case 2:
info := x.r.TextCall(x.cli, loopIndex) // loopIndex % len(stream.dataList)
analy.ErrAnalyser.PushErr(info)
case 3:
info := x.r.FileSessionCall(x.cli, loopIndex) // loopIndex % len(stream.dataList)
analy.ErrAnalyser.PushErr(info)
default:
println("Unsupported Mode!")
}
}
wg.Done()
}()
x.linearCtl() // 并发线性增长控制,防止瞬时并发请求冲击
}
wg.Wait()
// 关闭异步落盘协程&wait
close(x.r.C.AsyncDrop)
analy.ErrAnalyser.Stop()
rwg.Wait()
xsfcli.DestroyClient(x.cli)
stp.Stop() // 关闭定时任务
r.Stop() // 关闭资源收集
r.Dump() // 持久化资源日志
if x.r.C.Plot {
r.Draw(x.r.C.PlotFile)
}
pterm.DefaultBasicText.Println(pterm.LightGreen("\ncli finish "))
}

func (x *Xtest) linearCtl() {
if x.r.C.LinearNs > 0 {
time.Sleep(time.Duration(time.Nanosecond) * time.Duration(x.r.C.LinearNs))
}
}

0 comments on commit 7095185

Please sign in to comment.