diff --git a/db/db.go b/db/db.go index db1aad18..2b95ca3a 100644 --- a/db/db.go +++ b/db/db.go @@ -19,10 +19,16 @@ var ( // DB defines functions that a DB instance should implement type DB interface { BeginStateTransaction(ctx context.Context) (Tx, error) - Exists(ctx context.Context, key common.Hash) bool + + StoreLastProcessedBlock(ctx context.Context, task string, block uint64, dbTx sqlx.ExecerContext) error GetLastProcessedBlock(ctx context.Context, task string) (uint64, error) + + StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey, dbTx sqlx.ExecerContext) error + GetUnresolvedBatchKeys(ctx context.Context) ([]types.BatchKey, error) + DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey, dbTx sqlx.ExecerContext) error + + Exists(ctx context.Context, key common.Hash) bool GetOffChainData(ctx context.Context, key common.Hash, dbTx sqlx.QueryerContext) (types.ArgBytes, error) - StoreLastProcessedBlock(ctx context.Context, task string, block uint64, dbTx sqlx.ExecerContext) error StoreOffChainData(ctx context.Context, od []types.OffChainData, dbTx sqlx.ExecerContext) error } @@ -50,19 +56,51 @@ func (db *pgDB) BeginStateTransaction(ctx context.Context) (Tx, error) { return db.pg.BeginTxx(ctx, nil) } -// StoreOffChainData stores and array of key values in the Db -func (db *pgDB) StoreOffChainData(ctx context.Context, od []types.OffChainData, dbTx sqlx.ExecerContext) error { - const storeOffChainDataSQL = ` - INSERT INTO data_node.offchain_data (key, value) +// StoreLastProcessedBlock stores a record of a block processed by the synchronizer for named task +func (db *pgDB) StoreLastProcessedBlock(ctx context.Context, task string, block uint64, dbTx sqlx.ExecerContext) error { + const storeLastProcessedBlockSQL = ` + INSERT INTO data_node.sync_tasks (task, block) VALUES ($1, $2) - ON CONFLICT (key) DO NOTHING; + ON CONFLICT (task) DO UPDATE + SET block = EXCLUDED.block, processed = NOW(); ` - for _, d := range od { - if _, err := dbTx.ExecContext( - ctx, storeOffChainDataSQL, - d.Key.Hex(), - common.Bytes2Hex(d.Value), + if _, err := db.execer(dbTx).ExecContext(ctx, storeLastProcessedBlockSQL, task, block); err != nil { + return err + } + + return nil +} + +// GetLastProcessedBlock returns the latest block successfully processed by the synchronizer for named task +func (db *pgDB) GetLastProcessedBlock(ctx context.Context, task string) (uint64, error) { + const getLastProcessedBlockSQL = "SELECT block FROM data_node.sync_tasks WHERE task = $1;" + + var ( + lastBlock uint64 + ) + + if err := db.pg.QueryRowContext(ctx, getLastProcessedBlockSQL, task).Scan(&lastBlock); err != nil { + return 0, err + } + + return lastBlock, nil +} + +// StoreUnresolvedBatchKeys stores unresolved batch keys in the database +func (db *pgDB) StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey, dbTx sqlx.ExecerContext) error { + const storeUnresolvedBatchesSQL = ` + INSERT INTO data_node.unresolved_batches (num, hash) + VALUES ($1, $2) + ON CONFLICT (num, hash) DO NOTHING; + ` + + execer := db.execer(dbTx) + for _, bk := range bks { + if _, err := execer.ExecContext( + ctx, storeUnresolvedBatchesSQL, + bk.Number, + bk.Hash.Hex(), ); err != nil { return err } @@ -71,26 +109,39 @@ func (db *pgDB) StoreOffChainData(ctx context.Context, od []types.OffChainData, return nil } -// GetOffChainData returns the value identified by the key -func (db *pgDB) GetOffChainData(ctx context.Context, key common.Hash, dbTx sqlx.QueryerContext) (types.ArgBytes, error) { - const getOffchainDataSQL = ` - SELECT value - FROM data_node.offchain_data - WHERE key = $1 LIMIT 1; - ` +// GetUnresolvedBatchKeys returns the unresolved batch keys from the database +func (db *pgDB) GetUnresolvedBatchKeys(ctx context.Context) ([]types.BatchKey, error) { + const getUnresolvedBatchKeysSQL = "SELECT num, hash FROM data_node.unresolved_batches;" var ( - hexValue string + bks []types.BatchKey ) - if err := dbTx.QueryRowxContext(ctx, getOffchainDataSQL, key.Hex()).Scan(&hexValue); err != nil { - if errors.Is(err, sql.ErrNoRows) { - return nil, ErrStateNotSynchronized - } + if err := db.pg.SelectContext(ctx, &bks, getUnresolvedBatchKeysSQL); err != nil { return nil, err } - return common.FromHex(hexValue), nil + return bks, nil +} + +// DeleteUnresolvedBatchKeys deletes the unresolved batch keys from the database +func (db *pgDB) DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey, dbTx sqlx.ExecerContext) error { + const deleteUnresolvedBatchKeysSQL = ` + DELETE FROM data_node.unresolved_batches + WHERE num = $1 AND hash = $2; + ` + + for _, bk := range bks { + if _, err := db.execer(dbTx).ExecContext( + ctx, deleteUnresolvedBatchKeysSQL, + bk.Number, + bk.Hash.Hex(), + ); err != nil { + return err + } + } + + return nil } // Exists checks if a key exists in offchain data table @@ -108,33 +159,62 @@ func (db *pgDB) Exists(ctx context.Context, key common.Hash) bool { return count > 0 } -// StoreLastProcessedBlock stores a record of a block processed by the synchronizer for named task -func (db *pgDB) StoreLastProcessedBlock(ctx context.Context, task string, block uint64, dbTx sqlx.ExecerContext) error { - const storeLastProcessedBlockSQL = ` - INSERT INTO data_node.sync_tasks (task, block) +// StoreOffChainData stores and array of key values in the Db +func (db *pgDB) StoreOffChainData(ctx context.Context, od []types.OffChainData, dbTx sqlx.ExecerContext) error { + const storeOffChainDataSQL = ` + INSERT INTO data_node.offchain_data (key, value) VALUES ($1, $2) - ON CONFLICT (task) DO UPDATE - SET block = EXCLUDED.block, processed = NOW(); + ON CONFLICT (key) DO NOTHING; ` - if _, err := dbTx.ExecContext(ctx, storeLastProcessedBlockSQL, task, block); err != nil { - return err + execer := db.execer(dbTx) + for _, d := range od { + if _, err := execer.ExecContext( + ctx, storeOffChainDataSQL, + d.Key.Hex(), + common.Bytes2Hex(d.Value), + ); err != nil { + return err + } } return nil } -// GetLastProcessedBlock returns the latest block successfully processed by the synchronizer for named task -func (db *pgDB) GetLastProcessedBlock(ctx context.Context, task string) (uint64, error) { - const getLastProcessedBlockSQL = "SELECT block FROM data_node.sync_tasks WHERE task = $1;" +// GetOffChainData returns the value identified by the key +func (db *pgDB) GetOffChainData(ctx context.Context, key common.Hash, dbTx sqlx.QueryerContext) (types.ArgBytes, error) { + const getOffchainDataSQL = ` + SELECT value + FROM data_node.offchain_data + WHERE key = $1 LIMIT 1; + ` var ( - lastBlock uint64 + hexValue string ) - if err := db.pg.QueryRowContext(ctx, getLastProcessedBlockSQL, task).Scan(&lastBlock); err != nil { - return 0, err + if err := db.querier(dbTx).QueryRowxContext(ctx, getOffchainDataSQL, key.Hex()).Scan(&hexValue); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, ErrStateNotSynchronized + } + return nil, err } - return lastBlock, nil + return common.FromHex(hexValue), nil +} + +func (db *pgDB) execer(dbTx sqlx.ExecerContext) sqlx.ExecerContext { + if dbTx != nil { + return dbTx + } + + return db.pg +} + +func (db *pgDB) querier(dbTx sqlx.QueryerContext) sqlx.QueryerContext { + if dbTx != nil { + return dbTx + } + + return db.pg } diff --git a/db/migrations/0004.sql b/db/migrations/0004.sql new file mode 100644 index 00000000..f3f4c0fa --- /dev/null +++ b/db/migrations/0004.sql @@ -0,0 +1,12 @@ +-- +migrate Down +DROP TABLE IF EXISTS data_node.unresolved_batches CASCADE; + +-- +migrate Up +CREATE TABLE data_node.unresolved_batches +( + num BIGINT NOT NULL, + hash VARCHAR NOT NULL, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + + PRIMARY KEY (num, hash) +); diff --git a/synchronizer/batches.go b/synchronizer/batches.go index 216ebb7a..018efa5c 100644 --- a/synchronizer/batches.go +++ b/synchronizer/batches.go @@ -96,6 +96,7 @@ func (bs *BatchSynchronizer) resolveCommittee() error { // Start starts the synchronizer func (bs *BatchSynchronizer) Start() { log.Infof("starting batch synchronizer, DAC addr: %v", bs.self) + go bs.handleUnresolvedBatches() go bs.consumeEvents() go bs.produceEvents() go bs.handleReorgs() @@ -108,6 +109,7 @@ func (bs *BatchSynchronizer) Stop() { } func (bs *BatchSynchronizer) handleReorgs() { + log.Info("starting reorgs handler") for { select { case r := <-bs.reorgs: @@ -203,12 +205,6 @@ func (bs *BatchSynchronizer) consumeEvents() { } } -// batchKey is the pairing of batch number and data hash of a batch -type batchKey struct { - number uint64 - hash common.Hash -} - func (bs *BatchSynchronizer) handleEvent(event *polygonvalidium.PolygonvalidiumSequenceBatches) error { ctx, cancel := context.WithTimeout(context.Background(), bs.rpcTimeout) defer cancel() @@ -217,45 +213,102 @@ func (bs *BatchSynchronizer) handleEvent(event *polygonvalidium.PolygonvalidiumS if err != nil { return err } + txData := tx.Data() keys, err := UnpackTxData(txData) if err != nil { return err } + // The event has the _last_ batch number & list of hashes. Each hash is // in order, so the batch number can be computed from position in array - var batchKeys []batchKey + var batchKeys []types.BatchKey for i, j := 0, len(keys)-1; i < len(keys); i, j = i+1, j-1 { - batchKeys = append(batchKeys, batchKey{ - number: event.NumBatch - uint64(i), - hash: keys[j], + batchKeys = append(batchKeys, types.BatchKey{ + Number: event.NumBatch - uint64(i), + Hash: keys[j], }) } - // Pick out any batches that are missing from storage - var missing []batchKey - for _, key := range batchKeys { - if !exists(bs.db, key.hash) { - missing = append(missing, key) - } - } - if len(missing) == 0 { - return nil - } - // Resolve the missing data - var data []types.OffChainData - for _, key := range missing { - var value *types.OffChainData - value, err = bs.resolve(key) - if err != nil { - return err + + // Store batch keys. Already handled batch keys are going to be ignored based on the DB logic. + return storeUnresolvedBatchKeys(bs.db, batchKeys) +} + +// handleUnresolvedBatches handles unresolved batches that were collected by the event consumer +func (bs *BatchSynchronizer) handleUnresolvedBatches() { + log.Info("starting handling unresolved batches") + for { + delay := time.NewTimer(bs.retry) + select { + case <-delay.C: + // Get unresolved batches + batchKeys, err := getUnresolvedBatchKeys(bs.db) + if err != nil { + log.Errorf("failed to get unresolved batch keys: %v", err) + continue + } + + if len(batchKeys) == 0 { + continue + } + + // Resolve the unresolved data + // Pick out any batches that are missing from storage + var existing []types.BatchKey + var unresolved []types.BatchKey + for _, key := range batchKeys { + if exists(bs.db, key.Hash) { + existing = append(existing, key) + } else { + unresolved = append(unresolved, key) + } + } + + // Delete existing keys from unresolved batches + if len(existing) > 0 { + if err = deleteUnresolvedBatchKeys(bs.db, existing); err != nil { + log.Errorf("failed to delete unresolved batch keys: %v", err) + } + } + + // Break the process if there are no unresolved batches + if len(unresolved) == 0 { + continue + } + + // Resolve the missing data + var data []types.OffChainData + var resolved []types.BatchKey + for _, key := range unresolved { + var value *types.OffChainData + if value, err = bs.resolve(key); err != nil { + log.Errorf("failed to resolve batch %s: %v", key.Hash.Hex(), err) + continue + } + + resolved = append(resolved, key) + data = append(data, *value) + } + + // Finally, store the data + if err = storeOffchainData(bs.db, data); err != nil { + log.Errorf("failed to store offchain data: %v", err) + continue + } + + // Delete keys from unresolved batches + if len(resolved) > 0 { + if err = deleteUnresolvedBatchKeys(bs.db, resolved); err != nil { + log.Errorf("failed to delete successfully resolved batch keys: %v", err) + } + } + case <-bs.stop: + return } - data = append(data, *value) } - // Finally, store the data - return store(bs.db, data) } -func (bs *BatchSynchronizer) resolve(batch batchKey) (*types.OffChainData, error) { +func (bs *BatchSynchronizer) resolve(batch types.BatchKey) (*types.OffChainData, error) { // First try to get the data from the trusted sequencer data := bs.trySequencer(batch) if data != nil { @@ -284,7 +337,7 @@ func (bs *BatchSynchronizer) resolve(batch batchKey) (*types.OffChainData, error delete(bs.committee, member.Addr) continue // malformed committee, skip what is known to be wrong } - value, err := bs.resolveWithMember(batch.hash, member) + value, err := bs.resolveWithMember(batch.Hash, member) if err != nil { log.Warnf("error resolving, continuing: %v", err) delete(bs.committee, member.Addr) @@ -293,24 +346,24 @@ func (bs *BatchSynchronizer) resolve(batch batchKey) (*types.OffChainData, error return value, nil } - return nil, rpc.NewRPCError(rpc.NotFoundErrorCode, "no data found for number %d, key %v", batch.number, batch.hash.Hex()) + return nil, rpc.NewRPCError(rpc.NotFoundErrorCode, "no data found for number %d, key %v", batch.Number, batch.Hash.Hex()) } // trySequencer returns L2Data from the trusted sequencer, but does not return errors, only logs warnings if not found. -func (bs *BatchSynchronizer) trySequencer(batch batchKey) *types.OffChainData { - seqBatch, err := bs.sequencer.GetSequenceBatch(batch.number) +func (bs *BatchSynchronizer) trySequencer(batch types.BatchKey) *types.OffChainData { + seqBatch, err := bs.sequencer.GetSequenceBatch(batch.Number) if err != nil { log.Warnf("failed to get data from sequencer: %v", err) return nil } expectKey := crypto.Keccak256Hash(seqBatch.BatchL2Data) - if batch.hash != expectKey { - log.Warnf("number %d: sequencer gave wrong data for key: %s", batch.number, batch.hash.Hex()) + if batch.Hash != expectKey { + log.Warnf("number %d: sequencer gave wrong data for key: %s", batch.Number, batch.Hash.Hex()) return nil } return &types.OffChainData{ - Key: batch.hash, + Key: batch.Hash, Value: seqBatch.BatchL2Data, } } diff --git a/synchronizer/batches_test.go b/synchronizer/batches_test.go index d904348d..b76eff29 100644 --- a/synchronizer/batches_test.go +++ b/synchronizer/batches_test.go @@ -91,9 +91,9 @@ func TestBatchSynchronizer_Resolve(t *testing.T) { } data := common.HexToHash("0xFFFF").Bytes() - batchKey := batchKey{ - number: 1, - hash: crypto.Keccak256Hash(data), + batchKey := types.BatchKey{ + Number: 1, + Hash: crypto.Keccak256Hash(data), } testFn := func(config testConfig) { @@ -139,7 +139,7 @@ func TestBatchSynchronizer_Resolve(t *testing.T) { } } else { require.NoError(t, err) - require.Equal(t, batchKey.hash, offChainData.Key) + require.Equal(t, batchKey.Hash, offChainData.Key) require.Equal(t, data, offChainData.Value) } @@ -153,9 +153,9 @@ func TestBatchSynchronizer_Resolve(t *testing.T) { t.Parallel() testFn(testConfig{ - getSequenceBatchArgs: []interface{}{batchKey.number}, + getSequenceBatchArgs: []interface{}{batchKey.Number}, getSequenceBatchReturns: []interface{}{&sequencer.SeqBatch{ - Number: types.ArgUint64(batchKey.number), + Number: types.ArgUint64(batchKey.Number), BatchL2Data: types.ArgBytes(data), }, nil}, }) @@ -179,9 +179,9 @@ func TestBatchSynchronizer_Resolve(t *testing.T) { testFn(testConfig{ isErrorExpected: false, - getOffChainDataArgs: [][]interface{}{{mock.Anything, batchKey.hash}}, + getOffChainDataArgs: [][]interface{}{{mock.Anything, batchKey.Hash}}, getOffChainDataReturns: [][]interface{}{{data, nil}}, - getSequenceBatchArgs: []interface{}{batchKey.number}, + getSequenceBatchArgs: []interface{}{batchKey.Number}, getSequenceBatchReturns: []interface{}{nil, errors.New("error")}, getCurrentDataCommitteeReturns: []interface{}{committee, nil}, newArgs: [][]interface{}{{committee.Members[0].URL}}, @@ -205,15 +205,15 @@ func TestBatchSynchronizer_Resolve(t *testing.T) { } testFn(testConfig{ - getSequenceBatchArgs: []interface{}{batchKey.number}, + getSequenceBatchArgs: []interface{}{batchKey.Number}, getSequenceBatchReturns: []interface{}{nil, errors.New("error")}, getCurrentDataCommitteeReturns: []interface{}{committee, nil}, newArgs: [][]interface{}{ {committee.Members[0].URL}, {committee.Members[1].URL}}, getOffChainDataArgs: [][]interface{}{ - {mock.Anything, batchKey.hash}, - {mock.Anything, batchKey.hash}, + {mock.Anything, batchKey.Hash}, + {mock.Anything, batchKey.Hash}, }, getOffChainDataReturns: [][]interface{}{ {nil, errors.New("error")}, // member doesn't have batch @@ -247,14 +247,14 @@ func TestBatchSynchronizer_Resolve(t *testing.T) { {committee.Members[0].URL}, {committee.Members[1].URL}}, getOffChainDataArgs: [][]interface{}{ - {mock.Anything, batchKey.hash}, - {mock.Anything, batchKey.hash}, + {mock.Anything, batchKey.Hash}, + {mock.Anything, batchKey.Hash}, }, getOffChainDataReturns: [][]interface{}{ {[]byte{0, 0, 0, 1}, nil}, // member doesn't have batch {[]byte{0, 0, 0, 1}, nil}, // member doesn't have batch }, - getSequenceBatchArgs: []interface{}{batchKey.number}, + getSequenceBatchArgs: []interface{}{batchKey.Number}, getSequenceBatchReturns: []interface{}{nil, errors.New("error")}, getCurrentDataCommitteeReturns: []interface{}{committee, nil}, }) diff --git a/synchronizer/store.go b/synchronizer/store.go index d75355ef..31eca3ff 100644 --- a/synchronizer/store.go +++ b/synchronizer/store.go @@ -59,7 +59,64 @@ func exists(db dbTypes.DB, key common.Hash) bool { return db.Exists(ctx, key) } -func store(db dbTypes.DB, data []types.OffChainData) error { +func storeUnresolvedBatchKeys(db dbTypes.DB, keys []types.BatchKey) error { + ctx, cancel := context.WithTimeout(context.Background(), dbTimeout) + defer cancel() + + var ( + dbTx dbTypes.Tx + err error + ) + + if dbTx, err = db.BeginStateTransaction(ctx); err != nil { + return err + } + + if err = db.StoreUnresolvedBatchKeys(ctx, keys, dbTx); err != nil { + rollback(err, dbTx) + return err + } + + if err = dbTx.Commit(); err != nil { + return err + } + + return nil +} + +func getUnresolvedBatchKeys(db dbTypes.DB) ([]types.BatchKey, error) { + ctx, cancel := context.WithTimeout(context.Background(), dbTimeout) + defer cancel() + + return db.GetUnresolvedBatchKeys(ctx) +} + +func deleteUnresolvedBatchKeys(db dbTypes.DB, keys []types.BatchKey) error { + ctx, cancel := context.WithTimeout(context.Background(), dbTimeout) + defer cancel() + + var ( + dbTx dbTypes.Tx + err error + ) + + if dbTx, err = db.BeginStateTransaction(ctx); err != nil { + return err + } + + if err = db.DeleteUnresolvedBatchKeys(ctx, keys, dbTx); err != nil { + rollback(err, dbTx) + return err + } + + if err = dbTx.Commit(); err != nil { + return err + } + + return nil +} + +func storeOffchainData(db dbTypes.DB, data []types.OffChainData) error { ctx, cancel := context.WithTimeout(context.Background(), dbTimeout) defer cancel() diff --git a/synchronizer/store_test.go b/synchronizer/store_test.go index 5a72bfa4..6a15cf56 100644 --- a/synchronizer/store_test.go +++ b/synchronizer/store_test.go @@ -233,7 +233,7 @@ func Test_store(t *testing.T) { t.Run(tt.name, func(t *testing.T) { testDB := tt.db(t) - if err := store(testDB, tt.data); tt.wantErr { + if err := storeOffchainData(testDB, tt.data); tt.wantErr { require.ErrorIs(t, err, testError) } else { require.NoError(t, err) diff --git a/types/types.go b/types/types.go index 7474bdb7..2d97e253 100644 --- a/types/types.go +++ b/types/types.go @@ -15,6 +15,12 @@ const ( hexBitSize64 = 64 ) +// BatchKey is the pairing of batch number and data hash of a batch +type BatchKey struct { + Number uint64 + Hash common.Hash +} + // OffChainData represents some data that is not stored on chain and should be preserved type OffChainData struct { Key common.Hash