Skip to content

Commit

Permalink
Optimized batch resolving process
Browse files Browse the repository at this point in the history
  • Loading branch information
begmaroman committed Mar 6, 2024
1 parent d5853ea commit 5b2bbc7
Show file tree
Hide file tree
Showing 7 changed files with 302 additions and 94 deletions.
160 changes: 120 additions & 40 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,16 @@ var (
// DB defines functions that a DB instance should implement
type DB interface {
BeginStateTransaction(ctx context.Context) (Tx, error)
Exists(ctx context.Context, key common.Hash) bool

StoreLastProcessedBlock(ctx context.Context, task string, block uint64, dbTx sqlx.ExecerContext) error
GetLastProcessedBlock(ctx context.Context, task string) (uint64, error)

StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey, dbTx sqlx.ExecerContext) error
GetUnresolvedBatchKeys(ctx context.Context) ([]types.BatchKey, error)
DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey, dbTx sqlx.ExecerContext) error

Exists(ctx context.Context, key common.Hash) bool
GetOffChainData(ctx context.Context, key common.Hash, dbTx sqlx.QueryerContext) (types.ArgBytes, error)
StoreLastProcessedBlock(ctx context.Context, task string, block uint64, dbTx sqlx.ExecerContext) error
StoreOffChainData(ctx context.Context, od []types.OffChainData, dbTx sqlx.ExecerContext) error
}

Expand Down Expand Up @@ -50,19 +56,51 @@ func (db *pgDB) BeginStateTransaction(ctx context.Context) (Tx, error) {
return db.pg.BeginTxx(ctx, nil)
}

// StoreOffChainData stores and array of key values in the Db
func (db *pgDB) StoreOffChainData(ctx context.Context, od []types.OffChainData, dbTx sqlx.ExecerContext) error {
const storeOffChainDataSQL = `
INSERT INTO data_node.offchain_data (key, value)
// StoreLastProcessedBlock stores a record of a block processed by the synchronizer for named task
func (db *pgDB) StoreLastProcessedBlock(ctx context.Context, task string, block uint64, dbTx sqlx.ExecerContext) error {
const storeLastProcessedBlockSQL = `
INSERT INTO data_node.sync_tasks (task, block)
VALUES ($1, $2)
ON CONFLICT (key) DO NOTHING;
ON CONFLICT (task) DO UPDATE
SET block = EXCLUDED.block, processed = NOW();
`

for _, d := range od {
if _, err := dbTx.ExecContext(
ctx, storeOffChainDataSQL,
d.Key.Hex(),
common.Bytes2Hex(d.Value),
if _, err := db.execer(dbTx).ExecContext(ctx, storeLastProcessedBlockSQL, task, block); err != nil {
return err
}

return nil
}

// GetLastProcessedBlock returns the latest block successfully processed by the synchronizer for named task
func (db *pgDB) GetLastProcessedBlock(ctx context.Context, task string) (uint64, error) {
const getLastProcessedBlockSQL = "SELECT block FROM data_node.sync_tasks WHERE task = $1;"

var (
lastBlock uint64
)

if err := db.pg.QueryRowContext(ctx, getLastProcessedBlockSQL, task).Scan(&lastBlock); err != nil {
return 0, err
}

return lastBlock, nil
}

// StoreUnresolvedBatchKeys stores unresolved batch keys in the database
func (db *pgDB) StoreUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey, dbTx sqlx.ExecerContext) error {
const storeUnresolvedBatchesSQL = `
INSERT INTO data_node.unresolved_batches (num, hash)
VALUES ($1, $2)
ON CONFLICT (num, hash) DO NOTHING;
`

execer := db.execer(dbTx)
for _, bk := range bks {
if _, err := execer.ExecContext(
ctx, storeUnresolvedBatchesSQL,
bk.Number,
bk.Hash.Hex(),
); err != nil {
return err
}
Expand All @@ -71,26 +109,39 @@ func (db *pgDB) StoreOffChainData(ctx context.Context, od []types.OffChainData,
return nil
}

// GetOffChainData returns the value identified by the key
func (db *pgDB) GetOffChainData(ctx context.Context, key common.Hash, dbTx sqlx.QueryerContext) (types.ArgBytes, error) {
const getOffchainDataSQL = `
SELECT value
FROM data_node.offchain_data
WHERE key = $1 LIMIT 1;
`
// GetUnresolvedBatchKeys returns the unresolved batch keys from the database
func (db *pgDB) GetUnresolvedBatchKeys(ctx context.Context) ([]types.BatchKey, error) {
const getUnresolvedBatchKeysSQL = "SELECT num, hash FROM data_node.unresolved_batches;"

var (
hexValue string
bks []types.BatchKey
)

if err := dbTx.QueryRowxContext(ctx, getOffchainDataSQL, key.Hex()).Scan(&hexValue); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrStateNotSynchronized
}
if err := db.pg.SelectContext(ctx, &bks, getUnresolvedBatchKeysSQL); err != nil {
return nil, err
}

return common.FromHex(hexValue), nil
return bks, nil
}

// DeleteUnresolvedBatchKeys deletes the unresolved batch keys from the database
func (db *pgDB) DeleteUnresolvedBatchKeys(ctx context.Context, bks []types.BatchKey, dbTx sqlx.ExecerContext) error {
const deleteUnresolvedBatchKeysSQL = `
DELETE FROM data_node.unresolved_batches
WHERE num = $1 AND hash = $2;
`

for _, bk := range bks {
if _, err := db.execer(dbTx).ExecContext(
ctx, deleteUnresolvedBatchKeysSQL,
bk.Number,
bk.Hash.Hex(),
); err != nil {
return err
}
}

return nil
}

// Exists checks if a key exists in offchain data table
Expand All @@ -108,33 +159,62 @@ func (db *pgDB) Exists(ctx context.Context, key common.Hash) bool {
return count > 0
}

// StoreLastProcessedBlock stores a record of a block processed by the synchronizer for named task
func (db *pgDB) StoreLastProcessedBlock(ctx context.Context, task string, block uint64, dbTx sqlx.ExecerContext) error {
const storeLastProcessedBlockSQL = `
INSERT INTO data_node.sync_tasks (task, block)
// StoreOffChainData stores and array of key values in the Db
func (db *pgDB) StoreOffChainData(ctx context.Context, od []types.OffChainData, dbTx sqlx.ExecerContext) error {
const storeOffChainDataSQL = `
INSERT INTO data_node.offchain_data (key, value)
VALUES ($1, $2)
ON CONFLICT (task) DO UPDATE
SET block = EXCLUDED.block, processed = NOW();
ON CONFLICT (key) DO NOTHING;
`

if _, err := dbTx.ExecContext(ctx, storeLastProcessedBlockSQL, task, block); err != nil {
return err
execer := db.execer(dbTx)
for _, d := range od {
if _, err := execer.ExecContext(
ctx, storeOffChainDataSQL,
d.Key.Hex(),
common.Bytes2Hex(d.Value),
); err != nil {
return err
}
}

return nil
}

// GetLastProcessedBlock returns the latest block successfully processed by the synchronizer for named task
func (db *pgDB) GetLastProcessedBlock(ctx context.Context, task string) (uint64, error) {
const getLastProcessedBlockSQL = "SELECT block FROM data_node.sync_tasks WHERE task = $1;"
// GetOffChainData returns the value identified by the key
func (db *pgDB) GetOffChainData(ctx context.Context, key common.Hash, dbTx sqlx.QueryerContext) (types.ArgBytes, error) {
const getOffchainDataSQL = `
SELECT value
FROM data_node.offchain_data
WHERE key = $1 LIMIT 1;
`

var (
lastBlock uint64
hexValue string
)

if err := db.pg.QueryRowContext(ctx, getLastProcessedBlockSQL, task).Scan(&lastBlock); err != nil {
return 0, err
if err := db.querier(dbTx).QueryRowxContext(ctx, getOffchainDataSQL, key.Hex()).Scan(&hexValue); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, ErrStateNotSynchronized
}
return nil, err
}

return lastBlock, nil
return common.FromHex(hexValue), nil
}

func (db *pgDB) execer(dbTx sqlx.ExecerContext) sqlx.ExecerContext {
if dbTx != nil {
return dbTx
}

return db.pg
}

func (db *pgDB) querier(dbTx sqlx.QueryerContext) sqlx.QueryerContext {
if dbTx != nil {
return dbTx
}

return db.pg
}
12 changes: 12 additions & 0 deletions db/migrations/0004.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- +migrate Down
DROP TABLE IF EXISTS data_node.unresolved_batches CASCADE;

-- +migrate Up
CREATE TABLE data_node.unresolved_batches
(
num BIGINT NOT NULL,
hash VARCHAR NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),

PRIMARY KEY (num, hash)
);
Loading

0 comments on commit 5b2bbc7

Please sign in to comment.