Skip to content

Commit

Permalink
run building files in background after node startup (#13682)
Browse files Browse the repository at this point in the history
Closes #13360.
  • Loading branch information
awskii authored Feb 10, 2025
1 parent 0e25f80 commit 325afd7
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 20 deletions.
30 changes: 11 additions & 19 deletions erigon-lib/state/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -667,11 +667,7 @@ func (a *Aggregator) BuildFiles2(ctx context.Context, fromStep, toStep uint64) e
}
}

if ok := a.mergingFiles.CompareAndSwap(false, true); !ok {
return
}
go func() {
defer a.mergingFiles.Store(false)
if err := a.MergeLoop(ctx); err != nil {
panic(err)
}
Expand Down Expand Up @@ -724,7 +720,16 @@ func (a *Aggregator) mergeLoopStep(ctx context.Context, toTxNum uint64) (somethi
return true, nil
}

// TODO: merge must have own semphore
func (a *Aggregator) MergeLoop(ctx context.Context) error {
if dbg.NoMerge() || !a.mergingFiles.CompareAndSwap(false, true) {
return nil // currently merging or merge is prohibited
}

a.wg.Add(1)
defer a.wg.Done()
defer a.mergingFiles.Store(false)

for {
somethingMerged, err := a.mergeLoopStep(ctx, a.visibleFilesMinimaxTxNum.Load())
if err != nil {
Expand Down Expand Up @@ -1489,6 +1494,7 @@ func (a *Aggregator) BuildFilesInBackground(txNum uint64) chan struct{} {
//we are inside own goroutine - it's fine to block here
if err := a.snapshotBuildSema.Acquire(a.ctx, 1); err != nil {
a.logger.Warn("[snapshots] buildFilesInBackground", "err", err)
close(fin)
return //nolint
}
defer a.snapshotBuildSema.Release(1)
Expand Down Expand Up @@ -1523,23 +1529,9 @@ func (a *Aggregator) BuildFilesInBackground(txNum uint64) chan struct{} {
break
}
}

if dbg.NoMerge() {
close(fin)
return
}
if ok := a.mergingFiles.CompareAndSwap(false, true); !ok {
close(fin)
return
}
a.wg.Add(1)
go func() {
defer a.wg.Done()
defer a.mergingFiles.Store(false)

//TODO: merge must have own semphore
defer close(fin)

defer func() { close(fin) }()
if err := a.MergeLoop(a.ctx); err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, common2.ErrStopped) {
return
Expand Down
7 changes: 6 additions & 1 deletion eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -346,7 +346,6 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
if err != nil {
return nil, err
}

backend.blockSnapshots, backend.blockReader, backend.blockWriter = allSnapshots, blockReader, blockWriter

backend.chainDB, err = temporal.New(rawChainDB, agg)
Expand Down Expand Up @@ -1086,6 +1085,12 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
}
}

go func() {
if err := agg.MergeLoop(ctx); err != nil {
logger.Error("snapashot merge loop error", "err", err)
}
}()

return backend, nil
}

Expand Down

0 comments on commit 325afd7

Please sign in to comment.