-
Notifications
You must be signed in to change notification settings - Fork 5
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add checks to validate referential integrity in the data (#7)
* Add checks to validate referential integrity * Add unit tests for referential integrity validation * Use EXISTS in referential integrity validation queries
- Loading branch information
1 parent
effbdc3
commit cc935dc
Showing
7 changed files
with
707 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,224 @@ | ||
package validator | ||
|
||
import ( | ||
"fmt" | ||
|
||
"github.com/jmoiron/sqlx" | ||
) | ||
|
||
// ValidateReferentialIntegrity validates referential integrity at the given height | ||
func ValidateReferentialIntegrity(db *sqlx.DB, blockNumber uint64) error { | ||
|
||
err := ValidateHeaderCIDsRef(db, blockNumber) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err = ValidateUncleCIDsRef(db, blockNumber) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err = ValidateTransactionCIDsRef(db, blockNumber) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err = ValidateReceiptCIDsRef(db, blockNumber) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err = ValidateStateCIDsRef(db, blockNumber) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err = ValidateStorageCIDsRef(db, blockNumber) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err = ValidateStateAccountsRef(db, blockNumber) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err = ValidateAccessListElementsRef(db, blockNumber) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
err = ValidateLogCIDsRef(db, blockNumber) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// ValidateHeaderCIDsRef does a reference integrity check on references in eth.header_cids table | ||
func ValidateHeaderCIDsRef(db *sqlx.DB, blockNumber uint64) error { | ||
err := ValidateIPFSBlocks(db, blockNumber, "eth.header_cids", "mh_key") | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// ValidateUncleCIDsRef does a reference integrity check on references in eth.uncle_cids table | ||
func ValidateUncleCIDsRef(db *sqlx.DB, blockNumber uint64) error { | ||
var exists bool | ||
err := db.Get(&exists, UncleCIDsRefHeaderCIDs, blockNumber) | ||
if err != nil { | ||
return err | ||
} | ||
if exists { | ||
return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.header_cids") | ||
} | ||
|
||
err = ValidateIPFSBlocks(db, blockNumber, "eth.uncle_cids", "mh_key") | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// ValidateTransactionCIDsRef does a reference integrity check on references in eth.header_cids table | ||
func ValidateTransactionCIDsRef(db *sqlx.DB, blockNumber uint64) error { | ||
var exists bool | ||
err := db.Get(&exists, TransactionCIDsRefHeaderCIDs, blockNumber) | ||
if err != nil { | ||
return err | ||
} | ||
if exists { | ||
return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.header_cids") | ||
} | ||
|
||
err = ValidateIPFSBlocks(db, blockNumber, "eth.transaction_cids", "mh_key") | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// ValidateReceiptCIDsRef does a reference integrity check on references in eth.receipt_cids table | ||
func ValidateReceiptCIDsRef(db *sqlx.DB, blockNumber uint64) error { | ||
var exists bool | ||
err := db.Get(&exists, ReceiptCIDsRefTransactionCIDs, blockNumber) | ||
if err != nil { | ||
return err | ||
} | ||
if exists { | ||
return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.transaction_cids") | ||
} | ||
|
||
err = ValidateIPFSBlocks(db, blockNumber, "eth.receipt_cids", "leaf_mh_key") | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// ValidateStateCIDsRef does a reference integrity check on references in eth.state_cids table | ||
func ValidateStateCIDsRef(db *sqlx.DB, blockNumber uint64) error { | ||
var exists bool | ||
err := db.Get(&exists, StateCIDsRefHeaderCIDs, blockNumber) | ||
if err != nil { | ||
return err | ||
} | ||
if exists { | ||
return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.header_cids") | ||
} | ||
|
||
err = ValidateIPFSBlocks(db, blockNumber, "eth.state_cids", "mh_key") | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// ValidateStorageCIDsRef does a reference integrity check on references in eth.storage_cids table | ||
func ValidateStorageCIDsRef(db *sqlx.DB, blockNumber uint64) error { | ||
var exists bool | ||
err := db.Get(&exists, StorageCIDsRefStateCIDs, blockNumber) | ||
if err != nil { | ||
return err | ||
} | ||
if exists { | ||
return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.state_cids") | ||
} | ||
|
||
err = ValidateIPFSBlocks(db, blockNumber, "eth.storage_cids", "mh_key") | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// ValidateStateAccountsRef does a reference integrity check on references in eth.state_accounts table | ||
func ValidateStateAccountsRef(db *sqlx.DB, blockNumber uint64) error { | ||
var exists bool | ||
err := db.Get(&exists, StateAccountsRefStateCIDs, blockNumber) | ||
if err != nil { | ||
return err | ||
} | ||
if exists { | ||
return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.state_cids") | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// ValidateAccessListElementsRef does a reference integrity check on references in eth.access_list_elements table | ||
func ValidateAccessListElementsRef(db *sqlx.DB, blockNumber uint64) error { | ||
var exists bool | ||
err := db.Get(&exists, AccessListElementsRefTransactionCIDs, blockNumber) | ||
if err != nil { | ||
return err | ||
} | ||
if exists { | ||
return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.transaction_cids") | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// ValidateLogCIDsRef does a reference integrity check on references in eth.log_cids table | ||
func ValidateLogCIDsRef(db *sqlx.DB, blockNumber uint64) error { | ||
var exists bool | ||
err := db.Get(&exists, LogCIDsRefReceiptCIDs, blockNumber) | ||
if err != nil { | ||
return err | ||
} | ||
if exists { | ||
return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "eth.receipt_cids") | ||
} | ||
|
||
err = ValidateIPFSBlocks(db, blockNumber, "eth.log_cids", "leaf_mh_key") | ||
if err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// ValidateIPFSBlocks does a reference integrity check between the given CID table and IPFS blocks table on MHKey and block number | ||
func ValidateIPFSBlocks(db *sqlx.DB, blockNumber uint64, CIDTable string, mhKeyField string) error { | ||
var exists bool | ||
err := db.Get(&exists, fmt.Sprintf(CIDsRefIPLDBlocks, CIDTable, mhKeyField), blockNumber) | ||
if err != nil { | ||
return err | ||
} | ||
if exists { | ||
return fmt.Errorf(ReferentialIntegrityErr, blockNumber, "public.blocks") | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,119 @@ | ||
package validator | ||
|
||
// Queries to validate referential integrity in the indexed data: | ||
// At the given block number, | ||
// In each table, for each (would be) foreign key reference, perform left join with the referenced table on the foreign key fields. | ||
// Select rows where there are no matching rows in the referenced table. | ||
// If any such rows exist, there are missing entries in the referenced table. | ||
|
||
const ( | ||
// CIDsRefIPLDBlocks is a fmt template: %[1]s is the table to check and %[2]s
// is the column of that table compared against blocks.key. The resulting query
// yields true when any row of that table at block number $1 has no
// public.blocks row with the same key and block_number.
CIDsRefIPLDBlocks = `SELECT EXISTS ( | ||
SELECT * | ||
FROM %[1]s | ||
LEFT JOIN public.blocks ON ( | ||
%[1]s.%[2]s = blocks.key | ||
AND %[1]s.block_number = blocks.block_number | ||
) | ||
WHERE | ||
%[1]s.block_number = $1 | ||
AND blocks.key IS NULL | ||
)` | ||
|
||
// UncleCIDsRefHeaderCIDs yields true when any eth.uncle_cids row at block
// number $1 has no eth.header_cids row matching on
// (header_id = block_hash, block_number).
UncleCIDsRefHeaderCIDs = `SELECT EXISTS ( | ||
SELECT * | ||
FROM eth.uncle_cids | ||
LEFT JOIN eth.header_cids ON ( | ||
uncle_cids.header_id = header_cids.block_hash | ||
AND uncle_cids.block_number = header_cids.block_number | ||
) | ||
WHERE | ||
uncle_cids.block_number = $1 | ||
AND header_cids.block_hash IS NULL | ||
)` | ||
|
||
// TransactionCIDsRefHeaderCIDs yields true when any eth.transaction_cids row
// at block number $1 has no eth.header_cids row matching on
// (header_id = block_hash, block_number).
TransactionCIDsRefHeaderCIDs = `SELECT EXISTS ( | ||
SELECT * | ||
FROM eth.transaction_cids | ||
LEFT JOIN eth.header_cids ON ( | ||
transaction_cids.header_id = header_cids.block_hash | ||
AND transaction_cids.block_number = header_cids.block_number | ||
) | ||
WHERE | ||
transaction_cids.block_number = $1 | ||
AND header_cids.block_hash IS NULL | ||
)` | ||
|
||
// ReceiptCIDsRefTransactionCIDs yields true when any eth.receipt_cids row at
// block number $1 has no eth.transaction_cids row matching on
// (tx_id = tx_hash, block_number).
ReceiptCIDsRefTransactionCIDs = `SELECT EXISTS ( | ||
SELECT * | ||
FROM eth.receipt_cids | ||
LEFT JOIN eth.transaction_cids ON ( | ||
receipt_cids.tx_id = transaction_cids.tx_hash | ||
AND receipt_cids.block_number = transaction_cids.block_number | ||
) | ||
WHERE | ||
receipt_cids.block_number = $1 | ||
AND transaction_cids.tx_hash IS NULL | ||
)` | ||
|
||
// StateCIDsRefHeaderCIDs yields true when any eth.state_cids row at block
// number $1 has no eth.header_cids row matching on
// (header_id = block_hash, block_number).
StateCIDsRefHeaderCIDs = `SELECT EXISTS ( | ||
SELECT * | ||
FROM eth.state_cids | ||
LEFT JOIN eth.header_cids ON ( | ||
state_cids.header_id = header_cids.block_hash | ||
AND state_cids.block_number = header_cids.block_number | ||
) | ||
WHERE | ||
state_cids.block_number = $1 | ||
AND header_cids.block_hash IS NULL | ||
)` | ||
|
||
// StorageCIDsRefStateCIDs yields true when any eth.storage_cids row at block
// number $1 has no eth.state_cids row matching on
// (state_path, header_id, block_number).
StorageCIDsRefStateCIDs = `SELECT EXISTS ( | ||
SELECT * | ||
FROM eth.storage_cids | ||
LEFT JOIN eth.state_cids ON ( | ||
storage_cids.state_path = state_cids.state_path | ||
AND storage_cids.header_id = state_cids.header_id | ||
AND storage_cids.block_number = state_cids.block_number | ||
) | ||
WHERE | ||
storage_cids.block_number = $1 | ||
AND state_cids.state_path IS NULL | ||
)` | ||
|
||
// StateAccountsRefStateCIDs yields true when any eth.state_accounts row at
// block number $1 has no eth.state_cids row matching on
// (state_path, header_id, block_number).
StateAccountsRefStateCIDs = `SELECT EXISTS ( | ||
SELECT * | ||
FROM eth.state_accounts | ||
LEFT JOIN eth.state_cids ON ( | ||
state_accounts.state_path = state_cids.state_path | ||
AND state_accounts.header_id = state_cids.header_id | ||
AND state_accounts.block_number = state_cids.block_number | ||
) | ||
WHERE | ||
state_accounts.block_number = $1 | ||
AND state_cids.state_path IS NULL | ||
)` | ||
|
||
// AccessListElementsRefTransactionCIDs yields true when any
// eth.access_list_elements row at block number $1 has no eth.transaction_cids
// row matching on (tx_id = tx_hash, block_number).
AccessListElementsRefTransactionCIDs = `SELECT EXISTS ( | ||
SELECT * | ||
FROM eth.access_list_elements | ||
LEFT JOIN eth.transaction_cids ON ( | ||
access_list_elements.tx_id = transaction_cids.tx_hash | ||
AND access_list_elements.block_number = transaction_cids.block_number | ||
) | ||
WHERE | ||
access_list_elements.block_number = $1 | ||
AND transaction_cids.tx_hash IS NULL | ||
)` | ||
|
||
// LogCIDsRefReceiptCIDs yields true when any eth.log_cids row at block number
// $1 has no eth.receipt_cids row matching on (rct_id = tx_id, block_number).
LogCIDsRefReceiptCIDs = `SELECT EXISTS ( | ||
SELECT * | ||
FROM eth.log_cids | ||
LEFT JOIN eth.receipt_cids ON ( | ||
log_cids.rct_id = receipt_cids.tx_id | ||
AND log_cids.block_number = receipt_cids.block_number | ||
) | ||
WHERE | ||
log_cids.block_number = $1 | ||
AND receipt_cids.tx_id IS NULL | ||
)` | ||
) |
Oops, something went wrong.