Skip to content

Commit

Permalink
bug fixes (#1343)
Browse files Browse the repository at this point in the history
* chore: logging level changes in sink

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

* fix(conf): be compatible with the previous kuiper.yaml

Signed-off-by: Jiyong Huang <huangjy@emqx.io>

* chore: fix comment error

Signed-off-by: Jiyong Huang <huangjy@emqx.io>
  • Loading branch information
ngjaying authored Jul 28, 2022
1 parent 620db5a commit 27fe029
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 9 deletions.
4 changes: 2 additions & 2 deletions etc/kuiper.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ rule:
sendError: true

sink:
# Control to disable cache or not. If it's set to true, then the cache will be disabled, otherwise, it will be enabled.
# Control to enable cache or not. If it's set to true, then the cache will be enabled, otherwise, it will be disabled.
enableCache: false

# The maximum number of messages to be cached in memory.
Expand All @@ -52,7 +52,7 @@ sink:
# The maximum number of messages to be cached in the disk.
maxDiskCache: 1024000

# The number of messages for a buffer page which is the unit to read/write to disk batchly to prevent frequent IO
# The number of messages for a buffer page which is the unit to read/write to disk in batch to prevent frequent IO
bufferPageSize: 256

# The interval in millisecond to resend the cached messages
Expand Down
4 changes: 2 additions & 2 deletions internal/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ func (sc SinkConf) Validate() error {
Log.Warnf("maxDiskCache is less than 0, set to 1024000")
e["maxDiskCache"] = fmt.Errorf("maxDiskCache must be positive")
}
if sc.BufferPageSize < 0 {
if sc.BufferPageSize <= 0 {
sc.BufferPageSize = 256
Log.Warnf("bufferPageSize is less than 0, set to 256")
Log.Warnf("bufferPageSize is less than or equal to 0, set to 256")
e["bufferPageSize"] = fmt.Errorf("bufferPageSize must be positive")
}
if sc.ResendInterval < 0 {
Expand Down
6 changes: 3 additions & 3 deletions internal/topo/node/cache/sync_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (c *SyncCache) addCache(ctx api.StreamContext, item []map[string]interface{
ctx.GetLogger().Debugf("added cache to mem cache %v", c.memCache)
}
c.cacheLength++
ctx.GetLogger().Infof("added cache %d", c.cacheLength)
ctx.GetLogger().Debugf("added cache %d", c.cacheLength)
}

// deleteCache not thread safe!
Expand All @@ -277,12 +277,12 @@ func (c *SyncCache) deleteCache(ctx api.StreamContext) {
c.diskBufferPage = nil
}
}
ctx.GetLogger().Infof("deleted cache. cacheLength: %d, diskSize: %d, memCache: %v", c.cacheLength, c.diskSize, c.memCache)
ctx.GetLogger().Debugf("deleted cache. cacheLength: %d, diskSize: %d, memCache: %v", c.cacheLength, c.diskSize, c.memCache)
}

func (c *SyncCache) loadFromDisk(ctx api.StreamContext) {
// load page from the disk
ctx.GetLogger().Infof("loading from disk %d. cacheLength: %d, diskSize: %d", c.diskPageTail, c.cacheLength, c.diskSize)
ctx.GetLogger().Debugf("loading from disk %d. cacheLength: %d, diskSize: %d", c.diskPageTail, c.cacheLength, c.diskSize)
hotPage := newPage(c.cacheConf.BufferPageSize)
ok, err := c.store.Get(strconv.Itoa(c.diskPageHead), hotPage)
if err != nil {
Expand Down
10 changes: 8 additions & 2 deletions internal/topo/node/sink_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,15 +165,21 @@ func (m *SinkNode) Open(ctx api.StreamContext, result chan<- error) {
if sconf.RunAsync {
go func() {
p := infra.SafeRun(func() error {
_ = doCollect(ctx, sink, data, stats, sconf)
err := doCollect(ctx, sink, data, stats, sconf)
if err != nil {
logger.Warnf("sink collect error: %v", err)
}
return nil
})
if p != nil {
infra.DrainError(ctx, p, result)
}
}()
} else {
_ = doCollect(ctx, sink, data, stats, sconf)
err := doCollect(ctx, sink, data, stats, sconf)
if err != nil {
logger.Warnf("sink collect error: %v", err)
}
}
case <-ctx.Done():
logger.Infof("sink node %s instance %d done", m.name, instance)
Expand Down

0 comments on commit 27fe029

Please sign in to comment.