diff --git a/erigon-lib/state/aggregator.go b/erigon-lib/state/aggregator.go index 740456dd0f2..1b9cfeeccc3 100644 --- a/erigon-lib/state/aggregator.go +++ b/erigon-lib/state/aggregator.go @@ -692,7 +692,7 @@ func (a *Aggregator) mergeLoopStep(ctx context.Context, toTxNum uint64) (somethi return false, nil } - outs, err := aggTx.staticFilesInRange(r) + outs, err := aggTx.StaticFilesInRange(r) defer func() { if closeAll { outs.Close() @@ -1244,6 +1244,10 @@ type RangesV3 struct { invertedIndex []*MergeRange } +func NewRangesV3(domain [kv.DomainLen]DomainRanges, invertedIndex []*MergeRange) RangesV3 { + return RangesV3{domain: domain, invertedIndex: invertedIndex} +} + func (r RangesV3) String() string { ss := []string{} for _, d := range &r.domain { diff --git a/erigon-lib/state/aggregator_files.go b/erigon-lib/state/aggregator_files.go index b129d87cac1..c086d35d604 100644 --- a/erigon-lib/state/aggregator_files.go +++ b/erigon-lib/state/aggregator_files.go @@ -17,6 +17,7 @@ package state import ( + "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/kv" ) @@ -27,7 +28,23 @@ type SelectedStaticFilesV3 struct { ii [][]*filesItem } -func (sf SelectedStaticFilesV3) Close() { +func (sf *SelectedStaticFilesV3) DomainFiles(name kv.Domain) []FilesItem { + return common.SliceMap(sf.d[name], func(item *filesItem) FilesItem { return item }) +} + +func (sf *SelectedStaticFilesV3) DomainHistoryFiles(name kv.Domain) []FilesItem { + return common.SliceMap(sf.dHist[name], func(item *filesItem) FilesItem { return item }) +} + +func (sf *SelectedStaticFilesV3) DomainInvertedIndexFiles(name kv.Domain) []FilesItem { + return common.SliceMap(sf.dIdx[name], func(item *filesItem) FilesItem { return item }) +} + +func (sf *SelectedStaticFilesV3) InvertedIndexFiles(id int) []FilesItem { + return common.SliceMap(sf.ii[id], func(item *filesItem) FilesItem { return item }) +} + +func (sf *SelectedStaticFilesV3) Close() { clist := make([][]*filesItem, 0, int(kv.DomainLen)+len(sf.ii)) for id := range sf.d { clist = append(clist, sf.d[id], sf.dIdx[id], sf.dHist[id]) @@ -48,7 +65,7 @@ func (sf SelectedStaticFilesV3) Close() { } } -func (ac *AggregatorRoTx) staticFilesInRange(r *RangesV3) (*SelectedStaticFilesV3, error) { +func (ac *AggregatorRoTx) StaticFilesInRange(r *RangesV3) (*SelectedStaticFilesV3, error) { sf := &SelectedStaticFilesV3{ii: make([][]*filesItem, len(r.invertedIndex))} for id := range ac.d { if !r.domain[id].any() { @@ -65,6 +82,14 @@ func (ac *AggregatorRoTx) staticFilesInRange(r *RangesV3) (*SelectedStaticFilesV return sf, nil } +func (ac *AggregatorRoTx) InvertedIndicesLen() int { + return len(ac.iis) +} + +func (ac *AggregatorRoTx) InvertedIndexName(id int) kv.InvertedIdx { + return ac.iis[id].name +} + type MergedFilesV3 struct { d [kv.DomainLen]*filesItem dHist [kv.DomainLen]*filesItem diff --git a/erigon-lib/state/btree_index.go b/erigon-lib/state/btree_index.go index 72906690bfa..4e359af10e6 100644 --- a/erigon-lib/state/btree_index.go +++ b/erigon-lib/state/btree_index.go @@ -30,6 +30,7 @@ import ( "strings" "sync" "time" + "unsafe" "github.com/c2h5oh/datasize" "github.com/edsrzf/mmap-go" @@ -994,6 +995,10 @@ func (b *BtIndex) newCursor(k, v []byte, d uint64, g *seg.Reader) *Cursor { return c } +func (b *BtIndex) DataHandle() unsafe.Pointer { + return unsafe.Pointer(&b.data[0]) +} + func (b *BtIndex) Size() int64 { return b.size } func (b *BtIndex) ModTime() time.Time { return b.modTime } diff --git a/erigon-lib/state/files_item.go b/erigon-lib/state/files_item.go index 093836f609b..0c81513781a 100644 --- a/erigon-lib/state/files_item.go +++ b/erigon-lib/state/files_item.go @@ -62,6 +62,15 @@ type filesItem struct { canDelete atomic.Bool } +type FilesItem interface { + Segment() *seg.Decompressor + AccessorIndex() *recsplit.Index + BtIndex() *BtIndex + ExistenceFilter() *ExistenceFilter +} + +var _ FilesItem = (*filesItem)(nil) + func newFilesItem(startTxNum, endTxNum, stepSize uint64) *filesItem { startStep := startTxNum / stepSize endStep := endTxNum / stepSize @@ -69,6 +78,14 @@ func newFilesItem(startTxNum, endTxNum, stepSize uint64) *filesItem { return &filesItem{startTxNum: startTxNum, endTxNum: endTxNum, frozen: frozen} } +func (i *filesItem) Segment() *seg.Decompressor { return i.decompressor } + +func (i *filesItem) AccessorIndex() *recsplit.Index { return i.index } + +func (i *filesItem) BtIndex() *BtIndex { return i.bindex } + +func (i *filesItem) ExistenceFilter() *ExistenceFilter { return i.existence } + // isSubsetOf - when `j` covers `i` but not equal `i` func (i *filesItem) isSubsetOf(j *filesItem) bool { return (j.startTxNum <= i.startTxNum && i.endTxNum <= j.endTxNum) && (j.startTxNum != i.startTxNum || i.endTxNum != j.endTxNum) diff --git a/erigon-lib/state/inverted_index.go b/erigon-lib/state/inverted_index.go index 6310755f77c..592f1665726 100644 --- a/erigon-lib/state/inverted_index.go +++ b/erigon-lib/state/inverted_index.go @@ -506,6 +506,10 @@ type MergeRange struct { to uint64 } +func NewMergeRange(name string, needMerge bool, from uint64, to uint64) *MergeRange { + return &MergeRange{name: name, needMerge: needMerge, from: from, to: to} +} + func (mr *MergeRange) FromTo() (uint64, uint64) { return mr.from, mr.to } diff --git a/erigon-lib/state/merge.go b/erigon-lib/state/merge.go index 6d5c0f06f8b..e73564c4521 100644 --- a/erigon-lib/state/merge.go +++ b/erigon-lib/state/merge.go @@ -81,6 +81,10 @@ type DomainRanges struct { aggStep uint64 } +func NewDomainRanges(name kv.Domain, values MergeRange, history HistoryRanges, aggStep uint64) DomainRanges { + return DomainRanges{name: name, values: values, history: history, aggStep: aggStep} +} + func (r DomainRanges) String() string { var b strings.Builder if r.values.needMerge { @@ -219,6 +223,10 @@ type HistoryRanges struct { index MergeRange } +func NewHistoryRanges(history MergeRange, index MergeRange) HistoryRanges { + return HistoryRanges{history: history, index: index} +} + func (r HistoryRanges) String(aggStep uint64) string { var str string if r.history.needMerge { diff --git a/erigon-lib/state/squeeze.go b/erigon-lib/state/squeeze.go index 73b11e34f63..69500751115 100644 --- a/erigon-lib/state/squeeze.go +++ b/erigon-lib/state/squeeze.go @@ -13,6 +13,7 @@ import ( "time" "github.com/c2h5oh/datasize" + "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/common/datadir" "github.com/erigontech/erigon-lib/common/dir" @@ -132,7 +133,7 @@ func (ac *AggregatorRoTx) SqueezeCommitmentFiles() error { }, }, } - sf, err := ac.staticFilesInRange(rng) + sf, err := ac.StaticFilesInRange(rng) if err != nil { return err } @@ -331,7 +332,7 @@ func (a *Aggregator) RebuildCommitmentFiles(ctx context.Context, rwDb kv.RwDB, t }, }, } - sf, err := acRo.staticFilesInRange(rng) + sf, err := acRo.StaticFilesInRange(rng) if err != nil { return nil, err } diff --git a/eth/stagedsync/stage_snapshots.go b/eth/stagedsync/stage_snapshots.go index f6c91ce99b5..26175a45283 100644 --- a/eth/stagedsync/stage_snapshots.go +++ b/eth/stagedsync/stage_snapshots.go @@ -300,12 +300,6 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R return err } - if cfg.silkworm != nil { - if err := cfg.blockReader.Snapshots().(silkworm.CanAddSnapshotsToSilkwarm).AddSnapshotsToSilkworm(cfg.silkworm); err != nil { - return err - } - } - indexWorkers := estimate.IndexSnapshot.Workers() diagnostics.Send(diagnostics.CurrentSyncSubStage{SubStage: "E3 Indexing"}) if err := agg.BuildMissedIndices(ctx, indexWorkers); err != nil { @@ -321,6 +315,18 @@ func DownloadAndIndexSnapshotsIfNeed(s *StageState, ctx context.Context, tx kv.R cfg.notifier.Events.OnNewSnapshot() } + if cfg.silkworm != nil { + repository := silkworm.NewSnapshotsRepository( + cfg.silkworm, + cfg.blockReader.Snapshots().(*freezeblocks.RoSnapshots), + agg, + logger, + ) + if err := repository.Update(); err != nil { + return err + } + } + frozenBlocks := cfg.blockReader.FrozenBlocks() if s.BlockNumber < frozenBlocks { // allow genesis if err := s.Update(tx, frozenBlocks); err != nil { diff --git a/go.mod b/go.mod index 7ff9ca2d42b..96d126ca531 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/erigontech/erigonwatch v0.0.0-20240718131902-b6576bde1116 github.com/erigontech/mdbx-go v0.38.6-0.20250205222432-e4dd01978d7f github.com/erigontech/secp256k1 v1.1.0 - github.com/erigontech/silkworm-go v0.18.0 + github.com/erigontech/silkworm-go v0.24.0 ) require ( diff --git a/go.sum b/go.sum index 39dbe483f81..93519c3131e 100644 --- a/go.sum +++ b/go.sum @@ -275,8 +275,8 @@ github.com/erigontech/mdbx-go v0.38.6-0.20250205222432-e4dd01978d7f h1:2uZv1SGd7 github.com/erigontech/mdbx-go v0.38.6-0.20250205222432-e4dd01978d7f/go.mod h1:lkqHAZqXtFaIPlvTaGAx3VUDuGYZcuhve1l4JVVN1Z0= github.com/erigontech/secp256k1 v1.1.0 h1:mO3YJMUSoASE15Ya//SoHiisptUhdXExuMUN1M0X9qY= github.com/erigontech/secp256k1 v1.1.0/go.mod h1:GokhPepsMB+EYDs7I5JZCprxHW6+yfOcJKaKtoZ+Fls= -github.com/erigontech/silkworm-go v0.18.0 h1:j56p61xZHBFhZGH1OixlGU8KcfjHzcw9pjAfjmVsOZA= -github.com/erigontech/silkworm-go v0.18.0/go.mod h1:O50ux0apICEVEGyRWiE488K8qz8lc3PA/SXbQQAc8SU= +github.com/erigontech/silkworm-go v0.24.0 h1:fFe74CjQM5LI7ouMYjmqfFaqIFzQTpMrt+ls+a5PxpE= +github.com/erigontech/silkworm-go v0.24.0/go.mod h1:O50ux0apICEVEGyRWiE488K8qz8lc3PA/SXbQQAc8SU= github.com/erigontech/speedtest v0.0.2 h1:W9Cvky/8AMUtUONwkLA/dZjeQ2XfkBdYfJzvhMZUO+U= github.com/erigontech/speedtest v0.0.2/go.mod h1:vulsRNiM51BmSTbVtch4FWxKxx53pS2D35lZTtao0bw= github.com/erigontech/torrent v1.54.3-alpha-1 h1:oyT9YpMr82g566v0STVKW0ZTdX/eun03cW2mKmKuTAQ= diff --git a/turbo/silkworm/silkworm.go b/turbo/silkworm/silkworm.go index 385da6ae36f..85427047bbe 100644 --- a/turbo/silkworm/silkworm.go +++ b/turbo/silkworm/silkworm.go @@ -33,16 +33,18 @@ type SilkwormLogLevel = silkworm_go.SilkwormLogLevel type SentrySettings = silkworm_go.SentrySettings type RpcDaemonSettings = silkworm_go.RpcDaemonSettings type RpcInterfaceLogSettings = silkworm_go.RpcInterfaceLogSettings -type MappedHeaderSnapshot = silkworm_go.MappedHeaderSnapshot -type MappedBodySnapshot = silkworm_go.MappedBodySnapshot -type MappedTxnSnapshot = silkworm_go.MappedTxnSnapshot -type MappedChainSnapshot = silkworm_go.MappedChainSnapshot - -var NewMemoryMappedRegion = silkworm_go.NewMemoryMappedRegion -var NewMappedHeaderSnapshot = silkworm_go.NewMappedHeaderSnapshot -var NewMappedBodySnapshot = silkworm_go.NewMappedBodySnapshot -var NewMappedTxnSnapshot = silkworm_go.NewMappedTxnSnapshot +type HeadersSnapshot = silkworm_go.HeadersSnapshot +type BodiesSnapshot = silkworm_go.BodiesSnapshot +type TransactionsSnapshot = silkworm_go.TransactionsSnapshot +type BlocksSnapshotBundle = silkworm_go.BlocksSnapshotBundle +type InvertedIndexSnapshot = silkworm_go.InvertedIndexSnapshot +type HistorySnapshot = silkworm_go.HistorySnapshot +type DomainSnapshot = silkworm_go.DomainSnapshot +type StateSnapshotBundleLatest = silkworm_go.StateSnapshotBundleLatest +type StateSnapshotBundleHistorical = silkworm_go.StateSnapshotBundleHistorical + +var NewFilePath = silkworm_go.NewFilePath var ErrInterrupted = silkworm_go.ErrInterrupted func New(dataDirPath string, libMdbxVersion string, numIOContexts uint32, logLevel log.Lvl) (*Silkworm, error) { @@ -125,7 +127,3 @@ func ExecuteBlocksPerpetual(s *Silkworm, db kv.RwDB, chainID *big.Int, startBloc } return lastExecutedBlock, err } - -type CanAddSnapshotsToSilkwarm interface { - AddSnapshotsToSilkworm(*Silkworm) error -} diff --git a/turbo/silkworm/snapshots_repository.go b/turbo/silkworm/snapshots_repository.go new file mode 100644 index 00000000000..9c06850ea0e --- /dev/null +++ b/turbo/silkworm/snapshots_repository.go @@ -0,0 +1,267 @@ +package silkworm + +import ( + "errors" + "math" + "time" + "unsafe" + + silkworm_go "github.com/erigontech/silkworm-go" + + "github.com/erigontech/erigon-lib/kv" + "github.com/erigontech/erigon-lib/log/v3" + "github.com/erigontech/erigon-lib/recsplit" + "github.com/erigontech/erigon-lib/seg" + "github.com/erigontech/erigon-lib/state" + coresnaptype "github.com/erigontech/erigon/core/snaptype" + "github.com/erigontech/erigon/turbo/snapshotsync/freezeblocks" +) + +type SnapshotsRepository struct { + silkworm *Silkworm + + blockSnapshots *freezeblocks.RoSnapshots + stateSnapshots *state.Aggregator + + logger log.Logger +} + +func NewSnapshotsRepository( + silkworm *Silkworm, + blockSnapshots *freezeblocks.RoSnapshots, + stateSnapshots *state.Aggregator, + logger log.Logger, +) *SnapshotsRepository { + return &SnapshotsRepository{ + silkworm, + blockSnapshots, + stateSnapshots, + logger, + } +} + +type MemoryMappedFile interface { + FilePath() string + DataHandle() unsafe.Pointer + Size() int64 +} + +var _ MemoryMappedFile = (*seg.Decompressor)(nil) +var _ MemoryMappedFile = (*recsplit.Index)(nil) +var _ MemoryMappedFile = (*state.BtIndex)(nil) + +func memoryMappedFile(file MemoryMappedFile) silkworm_go.MemoryMappedFile { + return silkworm_go.MemoryMappedFile{ + FilePath: NewFilePath(file.FilePath()), + DataHandle: file.DataHandle(), + Size: file.Size(), + } +} + +func (r *SnapshotsRepository) Update() error { + startTime := time.Now() + r.logger.Debug("[silkworm] snapshots updating...") + + blocksView := r.blockSnapshots.View() + defer blocksView.Close() + err := r.updateBlocks(blocksView) + if err != nil { + return err + } + + stateTx := r.stateSnapshots.BeginFilesRo() + defer stateTx.Close() + err = r.updateState(stateTx) + + if err == nil { + duration := time.Since(startTime) + if duration > 10*time.Second { + r.logger.Info("[silkworm] snapshots updated", "duration", duration) + } else { + r.logger.Debug("[silkworm] snapshots updated", "duration", duration) + } + } + return err +} + +func (r *SnapshotsRepository) updateBlocks(view *freezeblocks.View) error { + segmentsHeaders := view.Headers() + segmentsBodies := view.Bodies() + segmentsTransactions := view.Txs() + + count := len(segmentsHeaders) + if (len(segmentsBodies) != count) || (len(segmentsTransactions) != count) { + return errors.New("silkworm.SnapshotsRepository.updateBlocks: the number of headers/bodies/transactions segments must be the same") + } + + startTime := time.Now() + for i := 0; i < count; i++ { + r.logger.Trace("[silkworm] snapshots updating blocks", "i", i, "count", count) + segmentHeaders := segmentsHeaders[i].Src() + segmentBodies := segmentsBodies[i].Src() + segmentTransactions := segmentsTransactions[i].Src() + + err := r.silkworm.AddBlocksSnapshotBundle(BlocksSnapshotBundle{ + Headers: HeadersSnapshot{ + Segment: memoryMappedFile(segmentHeaders), + HeaderHashIndex: memoryMappedFile(segmentHeaders.Index()), + }, + Bodies: BodiesSnapshot{ + Segment: memoryMappedFile(segmentBodies), + BlockNumIndex: memoryMappedFile(segmentBodies.Index()), + }, + Transactions: TransactionsSnapshot{ + Segment: memoryMappedFile(segmentTransactions), + TxnHashIndex: memoryMappedFile(segmentTransactions.Index(coresnaptype.Indexes.TxnHash)), + TxnHash2BlockIndex: memoryMappedFile(segmentTransactions.Index(coresnaptype.Indexes.TxnHash2BlockNum)), + }, + }) + if err != nil { + return err + } + } + r.logger.Debug("[silkworm] snapshots updated blocks", "count", count, "duration", time.Since(startTime)) + + return nil +} + +func makeInvertedIndexSnapshot(item state.FilesItem) InvertedIndexSnapshot { + return InvertedIndexSnapshot{ + Segment: memoryMappedFile(item.Segment()), + AccessorIndex: memoryMappedFile(item.AccessorIndex()), + } +} + +func makeHistorySnapshot(historyItem state.FilesItem, iiItem state.FilesItem) HistorySnapshot { + return HistorySnapshot{ + Segment: memoryMappedFile(historyItem.Segment()), + AccessorIndex: memoryMappedFile(historyItem.AccessorIndex()), + InvertedIndex: makeInvertedIndexSnapshot(iiItem), + } +} + +func makeDomainSnapshot(item state.FilesItem) DomainSnapshot { + var accessorIndexOpt *silkworm_go.MemoryMappedFile + if item.AccessorIndex() != nil { + accessorIndex := memoryMappedFile(item.AccessorIndex()) + accessorIndexOpt = &accessorIndex + } + return DomainSnapshot{ + Segment: memoryMappedFile(item.Segment()), + ExistenceIndex: silkworm_go.MemoryMappedFile{ + FilePath: NewFilePath(item.ExistenceFilter().FilePath), + DataHandle: nil, + Size: 0, + }, + BTreeIndex: memoryMappedFile(item.BtIndex()), + AccessorIndex: accessorIndexOpt, + } +} + +func (r *SnapshotsRepository) updateState(stateTx *state.AggregatorRoTx) error { + mergeRange := state.NewMergeRange("", true, 0, math.MaxUint64) + domainRanges := func(name kv.Domain) state.DomainRanges { + return state.NewDomainRanges( + name, + *mergeRange, + state.NewHistoryRanges(*mergeRange, *mergeRange), + 0, + ) + } + var allDomainRanges [kv.DomainLen]state.DomainRanges + for i := 0; i < len(allDomainRanges); i++ { + allDomainRanges[i] = domainRanges(kv.Domain(i)) + } + iiRanges := make([]*state.MergeRange, stateTx.InvertedIndicesLen()) + for i := 0; i < len(iiRanges); i++ { + iiRanges[i] = mergeRange + } + ranges := state.NewRangesV3(allDomainRanges, iiRanges) + + allFiles, err := stateTx.StaticFilesInRange(&ranges) + if err != nil { + return err + } + + iiNames := make(map[kv.InvertedIdx]int, len(iiRanges)) + for i := 0; i < len(iiRanges); i++ { + iiNames[stateTx.InvertedIndexName(i)] = i + } + + iiFilesLogAddresses := allFiles.InvertedIndexFiles(iiNames[kv.LogAddrIdx]) + iiFilesLogTopics := allFiles.InvertedIndexFiles(iiNames[kv.LogTopicIdx]) + iiFilesTracesFrom := allFiles.InvertedIndexFiles(iiNames[kv.TracesFromIdx]) + iiFilesTracesTo := allFiles.InvertedIndexFiles(iiNames[kv.TracesToIdx]) + + historyFilesAccounts := allFiles.DomainHistoryFiles(kv.AccountsDomain) + historyFilesStorage := allFiles.DomainHistoryFiles(kv.StorageDomain) + historyFilesCode := allFiles.DomainHistoryFiles(kv.CodeDomain) + historyFilesReceipts := allFiles.DomainHistoryFiles(kv.ReceiptDomain) + + historyIIFilesAccounts := allFiles.DomainInvertedIndexFiles(kv.AccountsDomain) + historyIIFilesStorage := allFiles.DomainInvertedIndexFiles(kv.StorageDomain) + historyIIFilesCode := allFiles.DomainInvertedIndexFiles(kv.CodeDomain) + historyIIFilesReceipts := allFiles.DomainInvertedIndexFiles(kv.ReceiptDomain) + + domainFilesAccounts := allFiles.DomainFiles(kv.AccountsDomain) + domainFilesStorage := allFiles.DomainFiles(kv.StorageDomain) + domainFilesCode := allFiles.DomainFiles(kv.CodeDomain) + // TODO: enable after fixing .kvi configuration + // domainFilesCommitment := allFiles.DomainFiles(kv.CommitmentDomain) + domainFilesReceipts := allFiles.DomainFiles(kv.ReceiptDomain) + + countHistorical := len(iiFilesLogAddresses) + if (len(iiFilesLogTopics) != countHistorical) || (len(iiFilesTracesFrom) != countHistorical) || (len(iiFilesTracesTo) != countHistorical) || + (len(historyFilesAccounts) != countHistorical) || (len(historyFilesStorage) != countHistorical) || (len(historyFilesCode) != countHistorical) || (len(historyFilesReceipts) != countHistorical) || + (len(historyIIFilesAccounts) != countHistorical) || (len(historyIIFilesStorage) != countHistorical) || (len(historyIIFilesCode) != countHistorical) || (len(historyIIFilesReceipts) != countHistorical) { + return errors.New("silkworm.SnapshotsRepository.updateState: the number of historical files must be the same") + } + + startTimeHistorical := time.Now() + for i := 0; i < countHistorical; i++ { + r.logger.Trace("[silkworm] snapshots updating historical", "i", i, "count", countHistorical) + err := r.silkworm.AddStateSnapshotBundleHistorical(StateSnapshotBundleHistorical{ + Accounts: makeHistorySnapshot(historyFilesAccounts[i], historyIIFilesAccounts[i]), + Storage: makeHistorySnapshot(historyFilesStorage[i], historyIIFilesStorage[i]), + Code: makeHistorySnapshot(historyFilesCode[i], historyIIFilesCode[i]), + Receipts: makeHistorySnapshot(historyFilesReceipts[i], historyIIFilesReceipts[i]), + + LogAddresses: makeInvertedIndexSnapshot(iiFilesLogAddresses[i]), + LogTopics: makeInvertedIndexSnapshot(iiFilesLogTopics[i]), + TracesFrom: makeInvertedIndexSnapshot(iiFilesTracesFrom[i]), + TracesTo: makeInvertedIndexSnapshot(iiFilesTracesTo[i]), + }) + if err != nil { + return err + } + } + r.logger.Debug("[silkworm] snapshots updated historical", "count", countHistorical, "duration", time.Since(startTimeHistorical)) + + countLatest := len(domainFilesAccounts) + if (len(domainFilesStorage) != countLatest) || (len(domainFilesCode) != countLatest) || + /* TODO: enable after fixing .kvi configuration */ + /* (len(domainFilesCommitment) != countLatest) || */ + (len(domainFilesReceipts) != countLatest) { + return errors.New("silkworm.SnapshotsRepository.updateState: the number of latest files must be the same") + } + + startTimeLatest := time.Now() + for i := 0; i < countLatest; i++ { + r.logger.Trace("[silkworm] snapshots updating latest", "i", i, "count", countLatest) + err := r.silkworm.AddStateSnapshotBundleLatest(StateSnapshotBundleLatest{ + Accounts: makeDomainSnapshot(domainFilesAccounts[i]), + Storage: makeDomainSnapshot(domainFilesStorage[i]), + Code: makeDomainSnapshot(domainFilesCode[i]), + /* TODO: enable after fixing .kvi configuration */ + /* Commitment: makeDomainSnapshot(domainFilesCommitment[i]), */ + Commitment: makeDomainSnapshot(domainFilesReceipts[i]), + Receipts: makeDomainSnapshot(domainFilesReceipts[i]), + }) + if err != nil { + return err + } + } + r.logger.Debug("[silkworm] snapshots updated latest", "count", countLatest, "duration", time.Since(startTimeLatest)) + + return nil +} diff --git a/turbo/snapshotsync/snapshots.go b/turbo/snapshotsync/snapshots.go index 0d701468ff3..fd522c5f0e6 100644 --- a/turbo/snapshotsync/snapshots.go +++ b/turbo/snapshotsync/snapshots.go @@ -29,6 +29,9 @@ import ( "sync/atomic" "time" + "github.com/tidwall/btree" + "golang.org/x/sync/errgroup" + "github.com/erigontech/erigon-lib/chain" "github.com/erigontech/erigon-lib/chain/snapcfg" common2 "github.com/erigontech/erigon-lib/common" @@ -43,9 +46,6 @@ import ( coresnaptype "github.com/erigontech/erigon/core/snaptype" "github.com/erigontech/erigon/eth/ethconfig" "github.com/erigontech/erigon/eth/ethconfig/estimate" - "github.com/erigontech/erigon/turbo/silkworm" - "github.com/tidwall/btree" - "golang.org/x/sync/errgroup" ) type SortedRange interface { @@ -1450,76 +1450,6 @@ func (s *RoSnapshots) PrintDebug() { } } -func mappedHeaderSnapshot(sn *DirtySegment) *silkworm.MappedHeaderSnapshot { - segmentRegion := silkworm.NewMemoryMappedRegion(sn.FilePath(), sn.DataHandle(), sn.Size()) - idxRegion := silkworm.NewMemoryMappedRegion(sn.Index().FilePath(), sn.Index().DataHandle(), sn.Index().Size()) - return silkworm.NewMappedHeaderSnapshot(segmentRegion, idxRegion) -} - -func mappedBodySnapshot(sn *DirtySegment) *silkworm.MappedBodySnapshot { - segmentRegion := silkworm.NewMemoryMappedRegion(sn.FilePath(), sn.DataHandle(), sn.Size()) - idxRegion := silkworm.NewMemoryMappedRegion(sn.Index().FilePath(), sn.Index().DataHandle(), sn.Index().Size()) - return silkworm.NewMappedBodySnapshot(segmentRegion, idxRegion) -} - -func mappedTxnSnapshot(sn *DirtySegment) *silkworm.MappedTxnSnapshot { - segmentRegion := silkworm.NewMemoryMappedRegion(sn.FilePath(), sn.DataHandle(), sn.Size()) - idxTxnHash := sn.Index(coresnaptype.Indexes.TxnHash) - idxTxnHashRegion := silkworm.NewMemoryMappedRegion(idxTxnHash.FilePath(), idxTxnHash.DataHandle(), idxTxnHash.Size()) - idxTxnHash2BlockNum := sn.Index(coresnaptype.Indexes.TxnHash2BlockNum) - idxTxnHash2BlockRegion := silkworm.NewMemoryMappedRegion(idxTxnHash2BlockNum.FilePath(), idxTxnHash2BlockNum.DataHandle(), idxTxnHash2BlockNum.Size()) - return silkworm.NewMappedTxnSnapshot(segmentRegion, idxTxnHashRegion, idxTxnHash2BlockRegion) -} - -func (s *RoSnapshots) AddSnapshotsToSilkworm(silkwormInstance *silkworm.Silkworm) error { - v := s.View() - defer v.Close() - - s.visibleLock.RLock() - defer s.visibleLock.RUnlock() - - mappedHeaderSnapshots := make([]*silkworm.MappedHeaderSnapshot, 0) - if vis := v.segments[coresnaptype.Enums.Headers]; vis != nil { - for _, headerSegment := range vis.Segments { - mappedHeaderSnapshots = append(mappedHeaderSnapshots, mappedHeaderSnapshot(headerSegment.src)) - } - } - - mappedBodySnapshots := make([]*silkworm.MappedBodySnapshot, 0) - if vis := v.segments[coresnaptype.Enums.Bodies]; vis != nil { - for _, bodySegment := range vis.Segments { - mappedBodySnapshots = append(mappedBodySnapshots, mappedBodySnapshot(bodySegment.src)) - } - return nil - - } - - mappedTxnSnapshots := make([]*silkworm.MappedTxnSnapshot, 0) - if txs := v.segments[coresnaptype.Enums.Transactions]; txs != nil { - for _, txnSegment := range txs.Segments { - mappedTxnSnapshots = append(mappedTxnSnapshots, mappedTxnSnapshot(txnSegment.src)) - } - } - - if len(mappedHeaderSnapshots) != len(mappedBodySnapshots) || len(mappedBodySnapshots) != len(mappedTxnSnapshots) { - return errors.New("addSnapshots: the number of headers/bodies/txs snapshots must be the same") - } - - for i := 0; i < len(mappedHeaderSnapshots); i++ { - mappedSnapshot := &silkworm.MappedChainSnapshot{ - Headers: mappedHeaderSnapshots[i], - Bodies: mappedBodySnapshots[i], - Txs: mappedTxnSnapshots[i], - } - err := silkwormInstance.AddSnapshot(mappedSnapshot) - if err != nil { - return err - } - } - - return nil -} - type View struct { s *RoSnapshots segments []*RoTx