From 3fa1581ba50599ca58402df425b1b4ae4a483fc0 Mon Sep 17 00:00:00 2001 From: askuy Date: Thu, 11 Mar 2021 13:27:52 +0800 Subject: [PATCH] document and cron --- README.md | 57 +++++++++++-------------- client/egrpc/container.go | 1 + client/ehttp/component.go | 2 +- client/ehttp/config.go | 14 +++--- client/ehttp/options.go | 4 +- examples/grpc/direct/client/config.toml | 4 +- examples/task/cron/config.toml | 8 ++-- task/ecron/component.go | 6 +-- task/ecron/config.go | 50 ++++++++++------------ task/ecron/container.go | 16 ++++--- task/ecron/options.go | 6 +-- 11 files changed, 80 insertions(+), 88 deletions(-) diff --git a/README.md b/README.md index 2d71932a..298add14 100644 --- a/README.md +++ b/README.md @@ -62,39 +62,30 @@ EGO是一个集成里各种工程实践的框架。通过组件化的设计模 ## 4 功能 -* server HTTP - * [例子](https://github.com/gotomicro/ego/tree/master/examples/server/http) - * [使用方式](https://ego.gocn.vip/frame/server/http.html) -* server gRPC - * [例子](https://github.com/gotomicro/ego/tree/master/examples/server/grpc) - * [使用方式](https://ego.gocn.vip/frame/server/grpc.html#example) -* task job - * [例子](https://github.com/gotomicro/ego/tree/master/examples/task/job) - * [使用方式](https://ego.gocn.vip/frame/task/job.html) -* task cron - * [例子](https://github.com/gotomicro/ego/tree/master/examples/task/cron) - * [使用方式](https://ego.gocn.vip/frame/task/cron.html) -* client HTTP - * [例子](https://github.com/gotomicro/ego/tree/master/examples/http/client) - * [使用方式](https://ego.gocn.vip/frame/client/http.html#example) -* client gRPC - * [直连例子](https://github.com/gotomicro/ego/tree/master/examples/grpc/direct) - * [ETCD例子](https://github.com/gotomicro/ego-component/tree/master/eetcd/examples) - * [使用方式](https://ego.gocn.vip/frame/client/grpc.html#example) -* client mysql - * [例子](https://github.com/gotomicro/ego-component/tree/master/egorm/examples/gorm) - * [使用方式](https://ego.gocn.vip/frame/client/gorm.html#example) -* client redis - * [例子](https://github.com/gotomicro/ego-component/tree/master/eredis/examples/redis) - * [使用方式](https://ego.gocn.vip/frame/client/redis.html#example) -* client mongo - * [例子](https://github.com/gotomicro/ego-component/tree/master/emongo) -* client kafka - * [例子](https://github.com/gotomicro/ego-component/tree/master/ekafka/examples) -* client wechat - * [例子](https://github.com/gotomicro/ego-component/tree/master/ewechat) -* client dingding - * [例子](https://github.com/gotomicro/ego-component/tree/master/edingtalk/examples) +|组件名称|代码|例子|文档| +| --- | --- | --- | --- | +|HTTP服务|[代码](./server/egin)|[例子](./examples/server/http)|[文档](https://ego.gocn.vip/frame/server/http.html)| +|gRPC服务|[代码](./server/egrpc)|[例子](./examples/server/grpc)|[文档](https://ego.gocn.vip/frame/server/grpc.html#example)| +|治理服务|[代码](./server/egovernor)|[例子](./examples/server/governor)|[文档](https://ego.gocn.vip/frame/server/governor.html)| +|短时任务|[代码](./task/ejob)|[例子](./examples/task/job)|[文档](https://ego.gocn.vip/frame/task/job.html)| +|常规定时任务|[代码](./task/ecron)|[例子](./examples/task/job)|[文档](https://ego.gocn.vip/frame/task/cron.html#_3-%E5%B8%B8%E8%A7%84%E5%AE%9A%E6%97%B6%E4%BB%BB%E5%8A%A1)| +|分布式定时任务|||[文档](https://ego.gocn.vip/frame/task/cron.html#_4-%E5%88%86%E5%B8%83%E5%BC%8F%E5%AE%9A%E6%97%B6%E4%BB%BB%E5%8A%A1)| +|调用HTTP|[代码](./client/ehttp)|[例子](./examples/http/client)|[文档](https://ego.gocn.vip/frame/client/http.html#example)| +|直连调用gRPC|[代码](./client/egrpc)|[例子](./examples/grpc/direct)|[文档](https://ego.gocn.vip/frame/client/grpc.html#_4-%E7%9B%B4%E8%BF%9Egrpc)| +|通过etcd调用gRPC||[例子](https://github.com/gotomicro/ego-component/tree/master/eetcd/examples)|[文档](https://ego.gocn.vip/frame/client/grpc.html#_5-%E4%BD%BF%E7%94%A8etcd%E7%9A%84grpc)| +|通过k8s调用gRPC||[例子](https://github.com/gotomicro/ego-component/tree/master/eetcd/examples)|[文档](https://ego.gocn.vip/frame/client/grpc.html#_6-%E4%BD%BF%E7%94%A8k8s%E7%9A%84grpc)| +|调用MySQL|[代码](https://github.com/gotomicro/ego-component/tree/master/egorm)|[例子](https://github.com/gotomicro/ego-component/tree/master/egorm/examples/gorm)|[文档](https://ego.gocn.vip/frame/client/gorm.html#example)| +|调用Redis|[代码](https://github.com/gotomicro/ego-component/tree/master/eredis)|[例子](https://github.com/gotomicro/ego-component/tree/master/eredis/examples/redis)|[文档](https://ego.gocn.vip/frame/client/redis.html#example)| +|调用Redis分布式锁|[代码](https://github.com/gotomicro/ego-component/tree/master/eredis)||| +|调用Mongo|[代码](https://github.com/gotomicro/ego-component/tree/master/emongo)||| +|调用Kafka|[代码](https://github.com/gotomicro/ego-component/tree/master/ekafka)||| +|调用ETCD|[代码](https://github.com/gotomicro/ego-component/tree/master/eetcd)||| +|调用K8S|[代码](https://github.com/gotomicro/ego-component/tree/master/ek8s)||| +|调用Oauth2|[代码](https://github.com/gotomicro/ego-component/tree/master/eoauth2)||| +|调用Wechat|[代码](https://github.com/gotomicro/ego-component/tree/master/ewechat)||| +|调用Dingtalk|[代码](https://github.com/gotomicro/ego-component/tree/master/edingtalk)||| +|调用Jira|[代码](https://github.com/gotomicro/ego-component/tree/master/ejira)||| + * 更多组件请查看:[https://github.com/gotomicro/ego-component](https://github.com/gotomicro/ego-component) diff --git a/client/egrpc/container.go b/client/egrpc/container.go index 500ace50..67b4ea38 100644 --- a/client/egrpc/container.go +++ b/client/egrpc/container.go @@ -30,6 +30,7 @@ func Load(key string) *Container { c.logger = c.logger.With(elog.FieldComponentName(key)) c.logger = c.logger.With(elog.FieldAddr(c.config.Addr)) c.name = key + return c } diff --git a/client/ehttp/component.go b/client/ehttp/component.go index 4c30da1e..cf785095 100644 --- a/client/ehttp/component.go +++ b/client/ehttp/component.go @@ -50,7 +50,7 @@ func newComponent(name string, config *Config, logger *elog.Component) *Componen elog.FieldAddr(rr.URL.Host), ) - if config.EnableAccessInterceptorReply { + if config.EnableAccessInterceptorRes { fields = append(fields, elog.FieldValueAny(respBody)) } diff --git a/client/ehttp/config.go b/client/ehttp/config.go index caca7a37..4a7a2201 100644 --- a/client/ehttp/config.go +++ b/client/ehttp/config.go @@ -6,13 +6,13 @@ import ( ) type Config struct { - Addr string // 连接地址 - Debug bool // 是否开启调试,默认不开启,开启后并加上export EGO_DEBUG=true,可以看到每次请求,配置名、地址、耗时、请求数据、响应数据 - RawDebug bool // 是否开启原生调试,默认不开启 - ReadTimeout time.Duration // 读超时,默认2s - SlowLogThreshold time.Duration // 慢日志记录的阈值,默认500ms - EnableAccessInterceptor bool // 是否开启记录请求数据,默认不开启 - EnableAccessInterceptorReply bool // 是否开启记录响应参数,默认不开启 + Addr string // 连接地址 + Debug bool // 是否开启调试,默认不开启,开启后并加上export EGO_DEBUG=true,可以看到每次请求,配置名、地址、耗时、请求数据、响应数据 + RawDebug bool // 是否开启原生调试,默认不开启 + ReadTimeout time.Duration // 读超时,默认2s + SlowLogThreshold time.Duration // 慢日志记录的阈值,默认500ms + EnableAccessInterceptor bool // 是否开启记录请求数据,默认不开启 + EnableAccessInterceptorRes bool // 是否开启记录响应参数,默认不开启 } // DefaultConfig ... diff --git a/client/ehttp/options.go b/client/ehttp/options.go index 6a6d320d..0718dc12 100644 --- a/client/ehttp/options.go +++ b/client/ehttp/options.go @@ -38,8 +38,8 @@ func WithEnableAccessInterceptor(enableAccessInterceptor bool) Option { } } -func WithEnableAccessInterceptorReply(enableAccessInterceptorReply bool) Option { +func WithEnableAccessInterceptorRes(enableAccessInterceptorRes bool) Option { return func(c *Container) { - c.config.EnableAccessInterceptorReply = enableAccessInterceptorReply + c.config.EnableAccessInterceptorRes = enableAccessInterceptorRes } } diff --git a/examples/grpc/direct/client/config.toml b/examples/grpc/direct/client/config.toml index 8233e06e..3c62982e 100644 --- a/examples/grpc/direct/client/config.toml +++ b/examples/grpc/direct/client/config.toml @@ -2,4 +2,6 @@ debug = true # 开启后并加上export EGO_DEBUG=true,可以看到每次grpc请求,配置名、地址、耗时、请求数据、响应数据 addr = "127.0.0.1:9002" enableAccessInterceptorReq = true - enableAccessInterceptorRes = true \ No newline at end of file + enableAccessInterceptorRes = true + + diff --git a/examples/task/cron/config.toml b/examples/task/cron/config.toml index 0af98656..d6c6fa29 100644 --- a/examples/task/cron/config.toml +++ b/examples/task/cron/config.toml @@ -1,5 +1,5 @@ [cron.test] - withSeconds = false - concurrentDelay= 1 - immediatelyRun = false - distributedTask = false +enableDistributedTask = false # 是否分布式任务,默认否,如果存在分布式任务,会只执行该定时人物 +enableImmediatelyRun = false # 是否立刻执行,默认否 +enableWithSeconds = false # 是否使用秒作解析器,默认否 +delayExecType = "skip" # skip,queue,concurrent,如果上一个任务执行较慢,到达了新任务执行时间,那么新任务选择跳过,排队,并发执行的策略,新任务默认选择skip策略 diff --git a/task/ecron/component.go b/task/ecron/component.go index 41187405..6366a6e1 100644 --- a/task/ecron/component.go +++ b/task/ecron/component.go @@ -86,7 +86,7 @@ func newComponent(name string, config *Config, logger *elog.Component) *Componen // Schedule ... func (c *Component) Schedule(schedule Schedule, job NamedJob) EntryID { - if c.config.ImmediatelyRun { + if c.config.EnableImmediatelyRun { schedule = &immediatelyScheduler{ Schedule: schedule, } @@ -132,7 +132,7 @@ func (c *Component) AddFunc(spec string, cmd func() error) (EntryID, error) { // Start ... func (c *Component) Start() error { - if c.config.DistributedTask { + if c.config.EnableDistributedTask { // 如果分布式的定时任务,那么就需要抢占锁 go func() { var err error @@ -172,7 +172,7 @@ func (c *Component) Start() error { // Stop ... func (c *Component) Stop() error { _ = c.Cron.Stop() - if c.config.DistributedTask { + if c.config.EnableDistributedTask { ctx, cancel := context.WithTimeout(context.Background(), c.config.WaitUnlockTime) defer cancel() err := c.config.locker.Unlock(ctx, c.lockerName()) diff --git a/task/ecron/config.go b/task/ecron/config.go index 4e679805..9fa4988e 100644 --- a/task/ecron/config.go +++ b/task/ecron/config.go @@ -15,38 +15,34 @@ import ( // Config ... type Config struct { - WithSeconds bool // 是否使用秒作解析器,默认否 - ConcurrentDelay int // 并发延迟,默认是执行超过定时时间后,下次执行的任务会跳过 - ImmediatelyRun bool // 是否立刻执行,默认否 - DistributedTask bool // 是否分布式任务,默认否,如果存在分布式任务,则会解析嵌入的etcd配置 - WaitLockTime time.Duration // 抢锁等待时间,默认60s - LockTTL time.Duration // 租期,默认60s - LockDir string // 定时任务锁目录 - RefreshTTL time.Duration // 刷新ttl,默认60s - WaitUnlockTime time.Duration // 抢锁等待时间,默认1s - Endpoints []string // etcd地址 - ConnectTimeout time.Duration // 连接超时时间,默认5s - Secure bool // 是否安全通信,默认false - AutoSyncInterval time.Duration // 自动同步member list的间隔 - wrappers []JobWrapper - parser cron.Parser - locker Locker + WaitLockTime time.Duration // 抢锁等待时间,默认60s + LockTTL time.Duration // 租期,默认60s + LockDir string // 定时任务锁目录 + RefreshTTL time.Duration // 刷新ttl,默认60s + WaitUnlockTime time.Duration // 抢锁等待时间,默认1s + DelayExecType string // skip,queue,concurrent,如果上一个任务执行较慢,到达了新任务执行时间,那么新任务选择跳过,排队,并发执行的策略,新任务默认选择skip策略 + EnableDistributedTask bool // 是否分布式任务,默认否,如果存在分布式任务,会只执行该定时人物 + EnableImmediatelyRun bool // 是否立刻执行,默认否 + EnableWithSeconds bool // 是否使用秒作解析器,默认否 + wrappers []JobWrapper + parser cron.Parser + locker Locker } // DefaultConfig ... func DefaultConfig() *Config { return &Config{ - WithSeconds: false, - ConcurrentDelay: -1, // skip - ImmediatelyRun: false, - DistributedTask: false, - WaitLockTime: xtime.Duration("60s"), - LockTTL: xtime.Duration("60s"), - RefreshTTL: xtime.Duration("50s"), - WaitUnlockTime: xtime.Duration("1s"), - LockDir: "/ecron/lock/%s/%s", - wrappers: []JobWrapper{}, - parser: cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor), + EnableWithSeconds: false, + EnableImmediatelyRun: false, + EnableDistributedTask: false, + DelayExecType: "skip", + WaitLockTime: xtime.Duration("60s"), + LockTTL: xtime.Duration("60s"), + RefreshTTL: xtime.Duration("50s"), + WaitUnlockTime: xtime.Duration("1s"), + LockDir: "/ecron/lock/%s/%s", + wrappers: []JobWrapper{}, + parser: cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor), } } diff --git a/task/ecron/container.go b/task/ecron/container.go index 37fb02d1..ad96e901 100644 --- a/task/ecron/container.go +++ b/task/ecron/container.go @@ -39,19 +39,21 @@ func (c *Container) Build(options ...Option) *Component { option(c) } - if c.config.WithSeconds { + if c.config.EnableWithSeconds { c.config.parser = cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor) } - if c.config.ConcurrentDelay > 0 { // 延迟 - c.config.wrappers = append(c.config.wrappers, delayIfStillRunning(c.logger)) - } else if c.config.ConcurrentDelay < 0 { // 跳过 + switch c.config.DelayExecType { + case "skip": + c.config.wrappers = append(c.config.wrappers, skipIfStillRunning(c.logger)) + case "queue": + c.config.wrappers = append(c.config.wrappers, queueIfStillRunning(c.logger)) + case "concurrent": + default: c.config.wrappers = append(c.config.wrappers, skipIfStillRunning(c.logger)) - } else { - // 默认不延迟也不跳过 } - if c.config.DistributedTask && c.config.locker == nil { + if c.config.EnableDistributedTask && c.config.locker == nil { c.logger.Panic("client locker nil", elog.FieldKey("use WithLocker method")) } diff --git a/task/ecron/options.go b/task/ecron/options.go index 0bf7a569..44427a42 100644 --- a/task/ecron/options.go +++ b/task/ecron/options.go @@ -8,10 +8,10 @@ import ( "github.com/robfig/cron/v3" ) -// delayIfStillRunning serializes jobs, delaying subsequent runs until the +// queueIfStillRunning serializes jobs, delaying subsequent runs until the // previous one is complete. Jobs running after a delay of more than a minute // have the delay logged at Info. -func delayIfStillRunning(logger *elog.Component) JobWrapper { +func queueIfStillRunning(logger *elog.Component) JobWrapper { return func(j Job) Job { var mu sync.Mutex return cron.FuncJob(func() { @@ -19,7 +19,7 @@ func delayIfStillRunning(logger *elog.Component) JobWrapper { mu.Lock() defer mu.Unlock() if dur := time.Since(start); dur > time.Minute { - logger.Info("cron delay", elog.String("duration", dur.String())) + logger.Info("cron queue", elog.String("duration", dur.String())) } j.Run() })