Merge pull request #453 from NilFoundation/sc/fix/storage-capacity-limit
Sync Committee: BlockStorage Capacity Limit
zadykian authored Mar 4, 2025
2 parents 74855fb + 98fad98 commit 6dfe558
Showing 15 changed files with 292 additions and 55 deletions.
2 changes: 1 addition & 1 deletion nil/cmd/sync_committee/main.go
@@ -52,7 +52,7 @@ func execute() error {
 func addFlags(cmd *cobra.Command, cfg *cmdConfig) {
 	cmd.Flags().StringVar(&cfg.RpcEndpoint, "endpoint", cfg.RpcEndpoint, "rpc endpoint")
 	cmd.Flags().StringVar(&cfg.TaskListenerRpcEndpoint, "own-endpoint", cfg.TaskListenerRpcEndpoint, "own rpc server endpoint")
-	cmd.Flags().DurationVar(&cfg.PollingDelay, "polling-delay", cfg.PollingDelay, "delay between new block polling")
+	cmd.Flags().DurationVar(&cfg.AggregatorConfig.RpcPollingInterval, "polling-delay", cfg.AggregatorConfig.RpcPollingInterval, "delay between new block polling")
 	cmd.Flags().StringVar(&cfg.DbPath, "db-path", "sync_committee.db", "path to database")
 	cmd.Flags().StringVar(&cfg.ProposerParams.Endpoint, "l1-endpoint", cfg.ProposerParams.Endpoint, "L1 endpoint")
 	cmd.Flags().StringVar(&cfg.ProposerParams.PrivateKey, "l1-private-key", cfg.ProposerParams.PrivateKey, "L1 account private key")
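Note: the user-facing flag name stays "polling-delay"; only its destination moved into the nested AggregatorConfig. A minimal, self-contained sketch of the same cobra wiring (the binary name and default here are illustrative):

package main

import (
	"fmt"
	"time"

	"github.com/spf13/cobra"
)

func main() {
	var pollingInterval time.Duration

	cmd := &cobra.Command{
		Use: "sync_committee",
		RunE: func(*cobra.Command, []string) error {
			fmt.Println("polling every", pollingInterval)
			return nil
		},
	}
	// Same shape as the hunk above: DurationVar binds the flag to a duration field.
	cmd.Flags().DurationVar(&pollingInterval, "polling-delay", time.Second, "delay between new block polling")

	cmd.SetArgs([]string{"--polling-delay", "2s"})
	_ = cmd.Execute() // prints: polling every 2s
}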
31 changes: 25 additions & 6 deletions nil/services/synccommittee/core/aggregator.go
@@ -18,6 +18,7 @@ import (
 	"github.com/NilFoundation/nil/nil/services/synccommittee/core/reset"
 	"github.com/NilFoundation/nil/nil/services/synccommittee/internal/metrics"
 	"github.com/NilFoundation/nil/nil/services/synccommittee/internal/srv"
+	"github.com/NilFoundation/nil/nil/services/synccommittee/internal/storage"
 	"github.com/NilFoundation/nil/nil/services/synccommittee/internal/types"
 	"github.com/rs/zerolog"
 )
@@ -38,6 +39,20 @@ type AggregatorBlockStorage interface {
 	SetBlockBatch(ctx context.Context, batch *types.BlockBatch) error
 }
 
+type AggregatorConfig struct {
+	RpcPollingInterval time.Duration
+}
+
+func NewAggregatorConfig(rpcPollingInterval time.Duration) AggregatorConfig {
+	return AggregatorConfig{
+		RpcPollingInterval: rpcPollingInterval,
+	}
+}
+
+func NewDefaultAggregatorConfig() AggregatorConfig {
+	return NewAggregatorConfig(time.Second)
+}
+
 type aggregator struct {
 	logger    zerolog.Logger
 	rpcClient client.Client
@@ -58,7 +73,7 @@ func NewAggregator(
 	timer common.Timer,
 	logger zerolog.Logger,
 	metrics AggregatorMetrics,
-	pollingDelay time.Duration,
+	config AggregatorConfig,
 ) *aggregator {
 	agg := &aggregator{
 		rpcClient: rpcClient,
@@ -76,7 +91,7 @@ func NewAggregator(
 		metrics: metrics,
 	}
 
-	agg.workerAction = concurrent.NewSuspendable(agg.runIteration, pollingDelay)
+	agg.workerAction = concurrent.NewSuspendable(agg.runIteration, config.RpcPollingInterval)
 	agg.logger = srv.WorkerLogger(logger, agg)
 	return agg
 }
@@ -155,6 +170,10 @@ func (agg *aggregator) processNewBlocks(ctx context.Context) error {
 		}
 		return nil
 
+	case errors.Is(err, storage.ErrCapacityLimitReached):
+		agg.logger.Info().Err(err).Msg("storage capacity limit reached, skipping")
+		return nil
+
 	case err != nil && !errors.Is(err, context.Canceled):
 		return fmt.Errorf("error processing blocks: %w", err)
 
@@ -290,6 +309,10 @@ func (agg *aggregator) handleBlockBatch(ctx context.Context, batch *types.BlockBatch) error {
 		return err
 	}
 
+	if err := agg.blockStorage.SetBlockBatch(ctx, batch); err != nil {
+		return fmt.Errorf("error storing block batch, mainHash=%s: %w", batch.MainShardBlock.Hash, err)
+	}
+
 	prunedBatch := types.NewPrunedBatch(batch)
 	if err := agg.batchCommitter.Commit(ctx, prunedBatch); err != nil {
 		return err
@@ -299,10 +322,6 @@ func (agg *aggregator) handleBlockBatch(ctx context.Context, batch *types.BlockBatch) error {
 		return fmt.Errorf("error creating proof tasks, mainHash=%s: %w", batch.MainShardBlock.Hash, err)
 	}
 
-	if err := agg.blockStorage.SetBlockBatch(ctx, batch); err != nil {
-		return fmt.Errorf("error storing block batch, mainHash=%s: %w", batch.MainShardBlock.Hash, err)
-	}
-
 	agg.metrics.RecordMainBlockFetched(ctx)
 	return nil
 }
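The aggregator.go hunks above change behavior in two ways: ErrCapacityLimitReached from SetBlockBatch is now treated as a benign skip (the polling loop simply tries again on a later tick), and the batch is persisted before it is committed and proof tasks are created, so tasks can no longer exist for a batch that was never stored. A runnable sketch of the skip-path pattern — the capacity accounting inside BlockStorage is an assumption here; only the error name and the non-fatal handling come from this diff:

package main

import (
	"errors"
	"fmt"
)

// Stand-in for storage.ErrCapacityLimitReached.
var ErrCapacityLimitReached = errors.New("storage capacity limit reached")

// boundedStorage mimics the new BlockStorage behavior at the interface level:
// writes past the configured capacity are refused with a sentinel error.
type boundedStorage struct {
	capacity int
	stored   int
}

func (s *boundedStorage) SetBlockBatch(batchSize int) error {
	if s.stored+batchSize > s.capacity {
		return ErrCapacityLimitReached
	}
	s.stored += batchSize
	return nil
}

// processNewBlocks mirrors the aggregator's switch: capacity exhaustion is
// logged and swallowed, so the poller retries once space is freed downstream.
func processNewBlocks(s *boundedStorage) error {
	err := s.SetBlockBatch(4)
	switch {
	case errors.Is(err, ErrCapacityLimitReached):
		fmt.Println("storage capacity limit reached, skipping")
		return nil
	case err != nil:
		return fmt.Errorf("error processing blocks: %w", err)
	}
	return nil
}

func main() {
	s := &boundedStorage{capacity: 4}
	fmt.Println(processNewBlocks(s)) // <nil> — first batch fits and is stored
	fmt.Println(processNewBlocks(s)) // <nil> — second batch is skipped, not an error
}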
99 changes: 76 additions & 23 deletions nil/services/synccommittee/core/aggregator_test.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"errors"
 	"testing"
-	"time"
 
 	"github.com/NilFoundation/nil/nil/client"
 	"github.com/NilFoundation/nil/nil/common"
@@ -26,6 +25,7 @@ type AggregatorTestSuite struct {
 	ctx          context.Context
 	cancellation context.CancelFunc
 
+	metrics      *metrics.SyncCommitteeMetricsHandler
 	db           db.DB
 	blockStorage *storage.BlockStorage
 	taskStorage  *storage.TaskStorage
@@ -45,28 +45,45 @@ func (s *AggregatorTestSuite) SetupSuite() {
 	logger := logging.NewLogger("aggregator_test")
 	metricsHandler, err := metrics.NewSyncCommitteeMetrics()
 	s.Require().NoError(err)
+	s.metrics = metricsHandler
 
 	s.db, err = db.NewBadgerDbInMemory()
 	s.Require().NoError(err)
 	timer := common.NewTimer()
-	s.blockStorage = storage.NewBlockStorage(s.db, timer, metricsHandler, logger)
-	s.taskStorage = storage.NewTaskStorage(s.db, timer, metricsHandler, logger)
+	s.blockStorage = s.newTestBlockStorage(storage.DefaultBlockStorageConfig())
+	s.taskStorage = storage.NewTaskStorage(s.db, timer, s.metrics, logger)
 	s.rpcClientMock = &client.ClientMock{}
 
+	s.aggregator = s.newTestAggregator(s.blockStorage)
+}
+
+func (s *AggregatorTestSuite) newTestAggregator(
+	blockStorage AggregatorBlockStorage,
+) *aggregator {
+	s.T().Helper()
+
+	logger := logging.NewLogger("aggregator_test")
 	stateResetter := reset.NewStateResetter(logger, s.blockStorage)
+	timer := common.NewTimer()
 
-	s.aggregator = NewAggregator(
+	return NewAggregator(
 		s.rpcClientMock,
-		s.blockStorage,
+		blockStorage,
 		s.taskStorage,
 		stateResetter,
 		timer,
 		logger,
-		metricsHandler,
-		time.Second,
+		s.metrics,
+		NewDefaultAggregatorConfig(),
 	)
 }
+
+func (s *AggregatorTestSuite) newTestBlockStorage(config storage.BlockStorageConfig) *storage.BlockStorage {
+	s.T().Helper()
+	timer := common.NewTimer()
+	return storage.NewBlockStorage(s.db, config, timer, s.metrics, logging.NewLogger("aggregator_test"))
+}
 
 func (s *AggregatorTestSuite) SetupTest() {
 	err := s.db.DropAll()
 	s.Require().NoError(err, "failed to clear database in SetUpTest")
@@ -102,18 +119,10 @@ func (s *AggregatorTestSuite) Test_No_New_Block_To_Fetch() {
 }
 
 func (s *AggregatorTestSuite) Test_Fetched_Not_Ready_Batch() {
-	mainBlock := testaide.NewMainShardBlock()
-	mainBlock.ChildBlocks[1] = common.EmptyHash
-
-	s.rpcClientMock.GetBlockFunc = blockGenerator(mainBlock)
+	nextMainBlock := testaide.NewMainShardBlock()
+	nextMainBlock.ChildBlocks[1] = common.EmptyHash
 
-	s.rpcClientMock.GetBlocksRangeFunc = func(_ context.Context, _ types.ShardId, from types.BlockNumber, to types.BlockNumber, _ bool, _ int) ([]*jsonrpc.RPCBlock, error) {
-		if from == mainBlock.Number && to == mainBlock.Number+1 {
-			return []*jsonrpc.RPCBlock{mainBlock}, nil
-		}
-
-		return nil, errors.New("unexpected call of GetBlocksRange")
-	}
+	s.setBlockGeneratorTo(nextMainBlock)
 
 	err := s.aggregator.processNewBlocks(s.ctx)
 	s.Require().NoError(err)
@@ -190,6 +199,54 @@ func (s *AggregatorTestSuite) Test_Fetch_Next_Valid() {
 	s.Require().NoError(err)
 	nextMainBlock := batches[1].MainShardBlock
 
+	s.setBlockGeneratorTo(nextMainBlock)
+
+	err = s.aggregator.processNewBlocks(s.ctx)
+	s.Require().NoError(err)
+	s.requireMainBlockHandled(nextMainBlock)
+}
+
+func (s *AggregatorTestSuite) Test_Block_Storage_Capacity_Exceeded() {
+	// only one test batch can fit in the storage
+	storageConfig := storage.NewBlockStorageConfig(testaide.BatchSize)
+	blockStorage := s.newTestBlockStorage(storageConfig)
+
+	batches := testaide.NewBatchesSequence(2)
+	nextMainBlock := batches[0].MainShardBlock
+
+	s.setBlockGeneratorTo(nextMainBlock)
+
+	agg := s.newTestAggregator(blockStorage)
+
+	err := agg.processNewBlocks(s.ctx)
+	s.Require().NoError(err)
+	s.requireMainBlockHandled(nextMainBlock)
+
+	latestFetchedBeforeNext, err := blockStorage.TryGetLatestFetched(s.ctx)
+	s.Require().NoError(err)
+
+	// nextBatch should not be handled by Aggregator due to storage capacity limit
+	nextBatch := batches[1]
+	s.setBlockGeneratorTo(nextBatch.MainShardBlock)
+	err = agg.processNewBlocks(s.ctx)
+	s.Require().NoError(err)
+
+	latestFetchedAfterNext, err := blockStorage.TryGetLatestFetched(s.ctx)
+	s.Require().NoError(err)
+	s.Equal(latestFetchedBeforeNext, latestFetchedAfterNext)
+
+	for _, block := range nextBatch.AllBlocks() {
+		storedBlock, err := s.blockStorage.TryGetBlock(s.ctx, scTypes.IdFromBlock(block))
+		s.Require().NoError(err)
+		s.Require().Nil(storedBlock)
+	}
+
+	s.requireNoNewTasks()
+}
+
+func (s *AggregatorTestSuite) setBlockGeneratorTo(nextMainBlock *jsonrpc.RPCBlock) {
+	s.T().Helper()
+
 	s.rpcClientMock.GetBlockFunc = blockGenerator(nextMainBlock)
 
 	s.rpcClientMock.GetBlocksRangeFunc = func(_ context.Context, _ types.ShardId, from types.BlockNumber, to types.BlockNumber, _ bool, _ int) ([]*jsonrpc.RPCBlock, error) {
@@ -199,10 +256,6 @@ func (s *AggregatorTestSuite) Test_Fetch_Next_Valid() {
 
 		return nil, errors.New("unexpected call of GetBlocksRange")
 	}
-
-	err = s.aggregator.processNewBlocks(s.ctx)
-	s.Require().NoError(err)
-	s.requireMainBlockHandled(nextMainBlock)
 }
 
 func blockGenerator(mainBlock *jsonrpc.RPCBlock) func(context.Context, types.ShardId, any, bool) (*jsonrpc.RPCBlock, error) {
@@ -232,7 +285,7 @@ func (s *AggregatorTestSuite) requireNoNewTasks() {
 	s.T().Helper()
 	task, err := s.taskStorage.RequestTaskToExecute(s.ctx, testaide.RandomExecutorId())
 	s.Require().NoError(err)
-	s.Require().Nil(task)
+	s.Require().Nil(task, "expected no new tasks available for execution, but got one")
 }
 
 func (s *AggregatorTestSuite) requireMainBlockHandled(mainBlock *jsonrpc.RPCBlock) {
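The suite drives the aggregator through client.ClientMock, a moq-style mock whose methods delegate to swappable function fields (GetBlockFunc, GetBlocksRangeFunc); the new setBlockGeneratorTo helper just repoints those fields per test. An illustrative stand-in for that pattern (types simplified; this is not the generated mock):

package main

import "fmt"

// Stand-in for a moq-generated mock: each method delegates to a function
// field that individual tests overwrite.
type ClientMock struct {
	GetBlockFunc func(number uint64) (string, error)
}

func (m *ClientMock) GetBlock(number uint64) (string, error) {
	return m.GetBlockFunc(number)
}

func main() {
	mock := &ClientMock{}

	// Equivalent of setBlockGeneratorTo: point the field at the behavior
	// the current test needs.
	mock.GetBlockFunc = func(number uint64) (string, error) {
		return fmt.Sprintf("block-%d", number), nil
	}

	block, err := mock.GetBlock(7)
	fmt.Println(block, err) // block-7 <nil>
}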
@@ -49,7 +49,7 @@ func (s *BlockTasksIntegrationTestSuite) SetupSuite() {
 
 	s.timer = testaide.NewTestTimer()
 	s.taskStorage = storage.NewTaskStorage(s.db, s.timer, metricsHandler, logger)
-	s.blockStorage = storage.NewBlockStorage(s.db, s.timer, metricsHandler, logger)
+	s.blockStorage = storage.NewBlockStorage(s.db, storage.DefaultBlockStorageConfig(), s.timer, metricsHandler, logger)
 
 	s.scheduler = scheduler.New(
 		s.taskStorage,
8 changes: 3 additions & 5 deletions nil/services/synccommittee/core/config.go
@@ -1,8 +1,6 @@
 package core
 
 import (
-	"time"
-
 	"github.com/NilFoundation/nil/nil/internal/telemetry"
 )
 
@@ -13,16 +11,16 @@ const (
 type Config struct {
 	RpcEndpoint             string
 	TaskListenerRpcEndpoint string
-	PollingDelay            time.Duration
-	ProposerParams          *ProposerParams
+	AggregatorConfig        AggregatorConfig
+	ProposerParams          ProposerParams
 	Telemetry               *telemetry.Config
 }
 
 func NewDefaultConfig() *Config {
 	return &Config{
 		RpcEndpoint:             "tcp://127.0.0.1:8529",
 		TaskListenerRpcEndpoint: DefaultTaskRpcEndpoint,
-		PollingDelay:            time.Second,
+		AggregatorConfig:        NewDefaultAggregatorConfig(),
 		ProposerParams:          NewDefaultProposerParams(),
 		Telemetry: &telemetry.Config{
 			ServiceName: "sync_committee",
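With PollingDelay folded into AggregatorConfig, callers override the interval through the nested config. A sketch of the intended use, assuming the exported names shown in this diff (the 500ms override is arbitrary):

package main

import (
	"fmt"
	"time"

	"github.com/NilFoundation/nil/nil/services/synccommittee/core"
)

func main() {
	// Defaults give a 1s polling interval (NewDefaultAggregatorConfig).
	cfg := core.NewDefaultConfig()

	// Override just the aggregator's polling interval.
	cfg.AggregatorConfig = core.NewAggregatorConfig(500 * time.Millisecond)

	fmt.Println(cfg.AggregatorConfig.RpcPollingInterval) // 500ms
}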
8 changes: 4 additions & 4 deletions nil/services/synccommittee/core/proposer.go
@@ -39,7 +39,7 @@ type proposer struct {
 	ethClient rollupcontract.EthClient
 
 	rollupContractWrapper *rollupcontract.Wrapper
-	params                *ProposerParams
+	params                ProposerParams
 
 	metrics ProposerMetrics
 	logger  zerolog.Logger
@@ -53,8 +53,8 @@ type ProposerParams struct {
 	EthClientTimeout time.Duration
 }
 
-func NewDefaultProposerParams() *ProposerParams {
-	return &ProposerParams{
+func NewDefaultProposerParams() ProposerParams {
+	return ProposerParams{
 		Endpoint:        "http://rpc2.sepolia.org",
 		PrivateKey:      "0000000000000000000000000000000000000000000000000000000000000001",
 		ContractAddress: "0x796baf7E572948CD0cbC374f345963bA433b47a2",
@@ -65,7 +65,7 @@ func NewDefaultProposerParams() *ProposerParams {
 
 func NewProposer(
 	ctx context.Context,
-	params *ProposerParams,
+	params ProposerParams,
 	storage ProposerStorage,
 	ethClient rollupcontract.EthClient,
 	metrics ProposerMetrics,
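ProposerParams moves from pointer to value semantics: callees now receive their own copy, a zero value can never be a nil dereference, and defaults cannot be mutated at a distance. A tiny sketch of the difference (illustrative types, not the real struct):

package main

import "fmt"

type params struct{ Endpoint string }

func mutateViaPointer(p *params) { p.Endpoint = "changed" } // caller observes this
func mutateViaValue(p params)    { p.Endpoint = "changed" } // local copy only

func main() {
	byPtr := &params{Endpoint: "default"}
	mutateViaPointer(byPtr)
	fmt.Println(byPtr.Endpoint) // changed — shared state, as before this PR

	byVal := params{Endpoint: "default"}
	mutateViaValue(byVal)
	fmt.Println(byVal.Endpoint) // default — callees can no longer mutate it
}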
4 changes: 2 additions & 2 deletions nil/services/synccommittee/core/proposer_test.go
@@ -27,7 +27,7 @@ type ProposerTestSuite struct {
 	ctx          context.Context
 	cancellation context.CancelFunc
 
-	params  *ProposerParams
+	params  ProposerParams
 	db      db.DB
 	timer   common.Timer
 	storage *storage.BlockStorage
@@ -114,7 +114,7 @@ func (s *ProposerTestSuite) SetupSuite() {
 	s.Require().NoError(err)
 
 	s.timer = testaide.NewTestTimer()
-	s.storage = storage.NewBlockStorage(s.db, s.timer, metricsHandler, logger)
+	s.storage = storage.NewBlockStorage(s.db, storage.DefaultBlockStorageConfig(), s.timer, metricsHandler, logger)
 	s.params = NewDefaultProposerParams()
 	s.testData = testaide.NewProposalData(3, s.timer.NowTime())
 	s.callContractMock = newCallContractMock()
4 changes: 2 additions & 2 deletions nil/services/synccommittee/core/service.go
@@ -38,7 +38,7 @@ func New(cfg *Config, database db.DB, ethClient rollupcontract.EthClient) (*Sync
 	client := nilrpc.NewClient(cfg.RpcEndpoint, logger)
 
 	timer := common.NewTimer()
-	blockStorage := storage.NewBlockStorage(database, timer, metricsHandler, logger)
+	blockStorage := storage.NewBlockStorage(database, storage.DefaultBlockStorageConfig(), timer, metricsHandler, logger)
 	taskStorage := storage.NewTaskStorage(database, timer, metricsHandler, logger)
 
 	// todo: add reset logic to TaskStorage (implement StateResetter interface) and pass it here in https://github.com/NilFoundation/nil/pull/419
@@ -52,7 +52,7 @@ func New(cfg *Config, database db.DB, ethClient rollupcontract.EthClient) (*Sync
 		timer,
 		logger,
 		metricsHandler,
-		cfg.PollingDelay,
+		cfg.AggregatorConfig,
 	)
 
 	ctx := context.Background()
9 changes: 7 additions & 2 deletions nil/services/synccommittee/core/service_test.go
@@ -53,8 +53,13 @@ func (s *SyncCommitteeTestSuite) SetupSuite() {
 
 	syncCommitteeMetrics, err := metrics.NewSyncCommitteeMetrics()
 	s.Require().NoError(err)
-	s.blockStorage = storage.NewBlockStorage(s.scDb, common.NewTimer(), syncCommitteeMetrics, logging.NewLogger("sync_committee_srv_test"))
-	s.Require().NoError(err)
+	s.blockStorage = storage.NewBlockStorage(
+		s.scDb,
+		storage.DefaultBlockStorageConfig(),
+		common.NewTimer(),
+		syncCommitteeMetrics,
+		logging.NewLogger("sync_committee_srv_test"),
+	)
 }
 
 func (s *SyncCommitteeTestSuite) TearDownSuite() {
@@ -4,6 +4,7 @@ import (
 	"time"
 
 	"github.com/NilFoundation/nil/nil/common"
+	"github.com/dgraph-io/badger/v4"
 	"github.com/rs/zerolog"
 )
 
@@ -18,7 +19,9 @@ func badgerRetryRunner(
 	retryPolicy := common.ComposeRetryPolicies(
 		append(
 			[]common.RetryPolicyFunc{
-				common.DoNotRetryIf(ErrSerializationFailed),
+				common.DoNotRetryIf(
+					ErrSerializationFailed, ErrCapacityLimitReached, badger.ErrTxnTooBig,
+				),
 				common.LimitRetries(badgerDefaultRetryLimit),
 			},
 			additionalPolicies...,
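DoNotRetryIf now also short-circuits on ErrCapacityLimitReached and badger.ErrTxnTooBig — retrying a write that cannot fit will never succeed, so failing fast hands control back to the aggregator's polling loop instead of burning the retry budget. The real signatures live in nil/common; a self-contained sketch of how this kind of policy composition typically works (all types here are illustrative stand-ins, not the actual API):

package main

import (
	"errors"
	"fmt"
)

// Illustrative stand-in for common.RetryPolicyFunc: report whether the
// operation should be retried given the attempt number and last error.
type RetryPolicyFunc func(attempt int, err error) bool

// DoNotRetryIf makes the listed errors terminal: retrying cannot help.
func DoNotRetryIf(terminal ...error) RetryPolicyFunc {
	return func(_ int, err error) bool {
		for _, t := range terminal {
			if errors.Is(err, t) {
				return false
			}
		}
		return true
	}
}

// LimitRetries caps the number of attempts.
func LimitRetries(limit int) RetryPolicyFunc {
	return func(attempt int, _ error) bool { return attempt < limit }
}

// ComposeRetryPolicies retries only when every policy agrees.
func ComposeRetryPolicies(policies ...RetryPolicyFunc) RetryPolicyFunc {
	return func(attempt int, err error) bool {
		for _, p := range policies {
			if !p(attempt, err) {
				return false
			}
		}
		return true
	}
}

var ErrCapacityLimitReached = errors.New("capacity limit reached")

func main() {
	policy := ComposeRetryPolicies(
		DoNotRetryIf(ErrCapacityLimitReached),
		LimitRetries(10),
	)
	fmt.Println(policy(1, errors.New("transient i/o error")))  // true: retry
	fmt.Println(policy(1, ErrCapacityLimitReached))            // false: fail fast
	fmt.Println(policy(10, errors.New("transient i/o error"))) // false: attempts exhausted
}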