Skip to content

Commit

Permalink
Merge pull request #68 from askuy/feature/cron20210311
Browse files Browse the repository at this point in the history
document and cron
  • Loading branch information
askuy authored Mar 11, 2021
2 parents 45c4543 + 3fa1581 commit c6980d9
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 88 deletions.
57 changes: 24 additions & 33 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,39 +62,30 @@ EGO是一个集成里各种工程实践的框架。通过组件化的设计模


## 4 功能
* server HTTP
* [例子](https://github.com/gotomicro/ego/tree/master/examples/server/http)
* [使用方式](https://ego.gocn.vip/frame/server/http.html)
* server gRPC
* [例子](https://github.com/gotomicro/ego/tree/master/examples/server/grpc)
* [使用方式](https://ego.gocn.vip/frame/server/grpc.html#example)
* task job
* [例子](https://github.com/gotomicro/ego/tree/master/examples/task/job)
* [使用方式](https://ego.gocn.vip/frame/task/job.html)
* task cron
* [例子](https://github.com/gotomicro/ego/tree/master/examples/task/cron)
* [使用方式](https://ego.gocn.vip/frame/task/cron.html)
* client HTTP
* [例子](https://github.com/gotomicro/ego/tree/master/examples/http/client)
* [使用方式](https://ego.gocn.vip/frame/client/http.html#example)
* client gRPC
* [直连例子](https://github.com/gotomicro/ego/tree/master/examples/grpc/direct)
* [ETCD例子](https://github.com/gotomicro/ego-component/tree/master/eetcd/examples)
* [使用方式](https://ego.gocn.vip/frame/client/grpc.html#example)
* client mysql
* [例子](https://github.com/gotomicro/ego-component/tree/master/egorm/examples/gorm)
* [使用方式](https://ego.gocn.vip/frame/client/gorm.html#example)
* client redis
* [例子](https://github.com/gotomicro/ego-component/tree/master/eredis/examples/redis)
* [使用方式](https://ego.gocn.vip/frame/client/redis.html#example)
* client mongo
* [例子](https://github.com/gotomicro/ego-component/tree/master/emongo)
* client kafka
* [例子](https://github.com/gotomicro/ego-component/tree/master/ekafka/examples)
* client wechat
* [例子](https://github.com/gotomicro/ego-component/tree/master/ewechat)
* client dingding
* [例子](https://github.com/gotomicro/ego-component/tree/master/edingtalk/examples)
|组件名称|代码|例子|文档|
| --- | --- | --- | --- |
|HTTP服务|[代码](./server/egin)|[例子](./examples/server/http)|[文档](https://ego.gocn.vip/frame/server/http.html)|
|gRPC服务|[代码](./server/egrpc)|[例子](./examples/server/grpc)|[文档](https://ego.gocn.vip/frame/server/grpc.html#example)|
|治理服务|[代码](./server/egovernor)|[例子](./examples/server/governor)|[文档](https://ego.gocn.vip/frame/server/governor.html)|
|短时任务|[代码](./task/ejob)|[例子](./examples/task/job)|[文档](https://ego.gocn.vip/frame/task/job.html)|
|常规定时任务|[代码](./task/ecron)|[例子](./examples/task/job)|[文档](https://ego.gocn.vip/frame/task/cron.html#_3-%E5%B8%B8%E8%A7%84%E5%AE%9A%E6%97%B6%E4%BB%BB%E5%8A%A1)|
|分布式定时任务|||[文档](https://ego.gocn.vip/frame/task/cron.html#_4-%E5%88%86%E5%B8%83%E5%BC%8F%E5%AE%9A%E6%97%B6%E4%BB%BB%E5%8A%A1)|
|调用HTTP|[代码](./client/ehttp)|[例子](./examples/http/client)|[文档](https://ego.gocn.vip/frame/client/http.html#example)|
|直连调用gRPC|[代码](./client/egrpc)|[例子](./examples/grpc/direct)|[文档](https://ego.gocn.vip/frame/client/grpc.html#_4-%E7%9B%B4%E8%BF%9Egrpc)|
|通过etcd调用gRPC||[例子](https://github.com/gotomicro/ego-component/tree/master/eetcd/examples)|[文档](https://ego.gocn.vip/frame/client/grpc.html#_5-%E4%BD%BF%E7%94%A8etcd%E7%9A%84grpc)|
|通过k8s调用gRPC||[例子](https://github.com/gotomicro/ego-component/tree/master/eetcd/examples)|[文档](https://ego.gocn.vip/frame/client/grpc.html#_6-%E4%BD%BF%E7%94%A8k8s%E7%9A%84grpc)|
|调用MySQL|[代码](https://github.com/gotomicro/ego-component/tree/master/egorm)|[例子](https://github.com/gotomicro/ego-component/tree/master/egorm/examples/gorm)|[文档](https://ego.gocn.vip/frame/client/gorm.html#example)|
|调用Redis|[代码](https://github.com/gotomicro/ego-component/tree/master/eredis)|[例子](https://github.com/gotomicro/ego-component/tree/master/eredis/examples/redis)|[文档](https://ego.gocn.vip/frame/client/redis.html#example)|
|调用Redis分布式锁|[代码](https://github.com/gotomicro/ego-component/tree/master/eredis)|||
|调用Mongo|[代码](https://github.com/gotomicro/ego-component/tree/master/emongo)|||
|调用Kafka|[代码](https://github.com/gotomicro/ego-component/tree/master/ekafka)|||
|调用ETCD|[代码](https://github.com/gotomicro/ego-component/tree/master/eetcd)|||
|调用K8S|[代码](https://github.com/gotomicro/ego-component/tree/master/ek8s)|||
|调用Oauth2|[代码](https://github.com/gotomicro/ego-component/tree/master/eoauth2)|||
|调用Wechat|[代码](https://github.com/gotomicro/ego-component/tree/master/ewechat)|||
|调用Dingtalk|[代码](https://github.com/gotomicro/ego-component/tree/master/edingtalk)|||
|调用Jira|[代码](https://github.com/gotomicro/ego-component/tree/master/ejira)|||

* 更多组件请查看:[https://github.com/gotomicro/ego-component](https://github.com/gotomicro/ego-component)


Expand Down
1 change: 1 addition & 0 deletions client/egrpc/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func Load(key string) *Container {
c.logger = c.logger.With(elog.FieldComponentName(key))
c.logger = c.logger.With(elog.FieldAddr(c.config.Addr))
c.name = key

return c
}

Expand Down
2 changes: 1 addition & 1 deletion client/ehttp/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func newComponent(name string, config *Config, logger *elog.Component) *Componen
elog.FieldAddr(rr.URL.Host),
)

if config.EnableAccessInterceptorReply {
if config.EnableAccessInterceptorRes {
fields = append(fields, elog.FieldValueAny(respBody))
}

Expand Down
14 changes: 7 additions & 7 deletions client/ehttp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ import (
)

type Config struct {
Addr string // 连接地址
Debug bool // 是否开启调试,默认不开启,开启后并加上export EGO_DEBUG=true,可以看到每次请求,配置名、地址、耗时、请求数据、响应数据
RawDebug bool // 是否开启原生调试,默认不开启
ReadTimeout time.Duration // 读超时,默认2s
SlowLogThreshold time.Duration // 慢日志记录的阈值,默认500ms
EnableAccessInterceptor bool // 是否开启记录请求数据,默认不开启
EnableAccessInterceptorReply bool // 是否开启记录响应参数,默认不开启
Addr string // 连接地址
Debug bool // 是否开启调试,默认不开启,开启后并加上export EGO_DEBUG=true,可以看到每次请求,配置名、地址、耗时、请求数据、响应数据
RawDebug bool // 是否开启原生调试,默认不开启
ReadTimeout time.Duration // 读超时,默认2s
SlowLogThreshold time.Duration // 慢日志记录的阈值,默认500ms
EnableAccessInterceptor bool // 是否开启记录请求数据,默认不开启
EnableAccessInterceptorRes bool // 是否开启记录响应参数,默认不开启
}

// DefaultConfig ...
Expand Down
4 changes: 2 additions & 2 deletions client/ehttp/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ func WithEnableAccessInterceptor(enableAccessInterceptor bool) Option {
}
}

func WithEnableAccessInterceptorReply(enableAccessInterceptorReply bool) Option {
func WithEnableAccessInterceptorRes(enableAccessInterceptorRes bool) Option {
return func(c *Container) {
c.config.EnableAccessInterceptorReply = enableAccessInterceptorReply
c.config.EnableAccessInterceptorRes = enableAccessInterceptorRes
}
}
4 changes: 3 additions & 1 deletion examples/grpc/direct/client/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@
debug = true # 开启后并加上export EGO_DEBUG=true,可以看到每次grpc请求,配置名、地址、耗时、请求数据、响应数据
addr = "127.0.0.1:9002"
enableAccessInterceptorReq = true
enableAccessInterceptorRes = true
enableAccessInterceptorRes = true


8 changes: 4 additions & 4 deletions examples/task/cron/config.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[cron.test]
withSeconds = false
concurrentDelay= 1
immediatelyRun = false
distributedTask = false
enableDistributedTask = false # 是否分布式任务,默认否,如果存在分布式任务,会只执行该定时人物
enableImmediatelyRun = false # 是否立刻执行,默认否
enableWithSeconds = false # 是否使用秒作解析器,默认否
delayExecType = "skip" # skip,queue,concurrent,如果上一个任务执行较慢,到达了新任务执行时间,那么新任务选择跳过,排队,并发执行的策略,新任务默认选择skip策略
6 changes: 3 additions & 3 deletions task/ecron/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func newComponent(name string, config *Config, logger *elog.Component) *Componen

// Schedule ...
func (c *Component) Schedule(schedule Schedule, job NamedJob) EntryID {
if c.config.ImmediatelyRun {
if c.config.EnableImmediatelyRun {
schedule = &immediatelyScheduler{
Schedule: schedule,
}
Expand Down Expand Up @@ -132,7 +132,7 @@ func (c *Component) AddFunc(spec string, cmd func() error) (EntryID, error) {

// Start ...
func (c *Component) Start() error {
if c.config.DistributedTask {
if c.config.EnableDistributedTask {
// 如果分布式的定时任务,那么就需要抢占锁
go func() {
var err error
Expand Down Expand Up @@ -172,7 +172,7 @@ func (c *Component) Start() error {
// Stop ...
func (c *Component) Stop() error {
_ = c.Cron.Stop()
if c.config.DistributedTask {
if c.config.EnableDistributedTask {
ctx, cancel := context.WithTimeout(context.Background(), c.config.WaitUnlockTime)
defer cancel()
err := c.config.locker.Unlock(ctx, c.lockerName())
Expand Down
50 changes: 23 additions & 27 deletions task/ecron/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,38 +15,34 @@ import (

// Config ...
type Config struct {
WithSeconds bool // 是否使用秒作解析器,默认否
ConcurrentDelay int // 并发延迟,默认是执行超过定时时间后,下次执行的任务会跳过
ImmediatelyRun bool // 是否立刻执行,默认否
DistributedTask bool // 是否分布式任务,默认否,如果存在分布式任务,则会解析嵌入的etcd配置
WaitLockTime time.Duration // 抢锁等待时间,默认60s
LockTTL time.Duration // 租期,默认60s
LockDir string // 定时任务锁目录
RefreshTTL time.Duration // 刷新ttl,默认60s
WaitUnlockTime time.Duration // 抢锁等待时间,默认1s
Endpoints []string // etcd地址
ConnectTimeout time.Duration // 连接超时时间,默认5s
Secure bool // 是否安全通信,默认false
AutoSyncInterval time.Duration // 自动同步member list的间隔
wrappers []JobWrapper
parser cron.Parser
locker Locker
WaitLockTime time.Duration // 抢锁等待时间,默认60s
LockTTL time.Duration // 租期,默认60s
LockDir string // 定时任务锁目录
RefreshTTL time.Duration // 刷新ttl,默认60s
WaitUnlockTime time.Duration // 抢锁等待时间,默认1s
DelayExecType string // skip,queue,concurrent,如果上一个任务执行较慢,到达了新任务执行时间,那么新任务选择跳过,排队,并发执行的策略,新任务默认选择skip策略
EnableDistributedTask bool // 是否分布式任务,默认否,如果存在分布式任务,会只执行该定时人物
EnableImmediatelyRun bool // 是否立刻执行,默认否
EnableWithSeconds bool // 是否使用秒作解析器,默认否
wrappers []JobWrapper
parser cron.Parser
locker Locker
}

// DefaultConfig ...
func DefaultConfig() *Config {
return &Config{
WithSeconds: false,
ConcurrentDelay: -1, // skip
ImmediatelyRun: false,
DistributedTask: false,
WaitLockTime: xtime.Duration("60s"),
LockTTL: xtime.Duration("60s"),
RefreshTTL: xtime.Duration("50s"),
WaitUnlockTime: xtime.Duration("1s"),
LockDir: "/ecron/lock/%s/%s",
wrappers: []JobWrapper{},
parser: cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor),
EnableWithSeconds: false,
EnableImmediatelyRun: false,
EnableDistributedTask: false,
DelayExecType: "skip",
WaitLockTime: xtime.Duration("60s"),
LockTTL: xtime.Duration("60s"),
RefreshTTL: xtime.Duration("50s"),
WaitUnlockTime: xtime.Duration("1s"),
LockDir: "/ecron/lock/%s/%s",
wrappers: []JobWrapper{},
parser: cron.NewParser(cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor),
}
}

Expand Down
16 changes: 9 additions & 7 deletions task/ecron/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,21 @@ func (c *Container) Build(options ...Option) *Component {
option(c)
}

if c.config.WithSeconds {
if c.config.EnableWithSeconds {
c.config.parser = cron.NewParser(cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor)
}

if c.config.ConcurrentDelay > 0 { // 延迟
c.config.wrappers = append(c.config.wrappers, delayIfStillRunning(c.logger))
} else if c.config.ConcurrentDelay < 0 { // 跳过
switch c.config.DelayExecType {
case "skip":
c.config.wrappers = append(c.config.wrappers, skipIfStillRunning(c.logger))
case "queue":
c.config.wrappers = append(c.config.wrappers, queueIfStillRunning(c.logger))
case "concurrent":
default:
c.config.wrappers = append(c.config.wrappers, skipIfStillRunning(c.logger))
} else {
// 默认不延迟也不跳过
}

if c.config.DistributedTask && c.config.locker == nil {
if c.config.EnableDistributedTask && c.config.locker == nil {
c.logger.Panic("client locker nil", elog.FieldKey("use WithLocker method"))
}

Expand Down
6 changes: 3 additions & 3 deletions task/ecron/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,18 @@ import (
"github.com/robfig/cron/v3"
)

// delayIfStillRunning serializes jobs, delaying subsequent runs until the
// queueIfStillRunning serializes jobs, delaying subsequent runs until the
// previous one is complete. Jobs running after a delay of more than a minute
// have the delay logged at Info.
func delayIfStillRunning(logger *elog.Component) JobWrapper {
func queueIfStillRunning(logger *elog.Component) JobWrapper {
return func(j Job) Job {
var mu sync.Mutex
return cron.FuncJob(func() {
start := time.Now()
mu.Lock()
defer mu.Unlock()
if dur := time.Since(start); dur > time.Minute {
logger.Info("cron delay", elog.String("duration", dur.String()))
logger.Info("cron queue", elog.String("duration", dur.String()))
}
j.Run()
})
Expand Down

0 comments on commit c6980d9

Please sign in to comment.