Block processing #402

Merged
3 commits merged on Feb 16, 2025
5 changes: 5 additions & 0 deletions docker-compose.yml
@@ -73,6 +73,11 @@ services:
ports: []
command: nodemon workers/lowPriority.js

process_historical_blocks:
<<: *backend
ports: []
command: nodemon workers/processHistoricalBlocks.js

front:
extra_hosts:
- "app.ethernal.local:0.0.0.0"
1 change: 1 addition & 0 deletions fly.toml
@@ -20,6 +20,7 @@ kill_timeout = "5s"
hpworker = "node workers/highPriority.js"
lpworker = "node workers/lowPriority.js"
mpworker = "node workers/mediumPriority.js"
phworker = "node workers/processHistoricalBlocks.js"

[[services]]
protocol = "tcp"
11 changes: 5 additions & 6 deletions run/api/demo.js
@@ -86,19 +86,18 @@ router.post('/migrateExplorer', authMiddleware, async (req, res, next) => {
}
});


/*
Creates a free demo explorer from a RPC server
@param {string} rpcServer - The RPC server to use for the explorer
@param {string} name (optional) - The name of the explorer
@param {string} token (optional) - The native token to use for the explorer
@returns {object} - The explorer object

Demo explorers are deleted after 24 hours. They have a banner on the top of the page with
a link to convert to a free trial.
Some chainIds are blocked from being used for demo explorers. Those are the ones that have a
high volume of transactions, already have an explorer, and are likely to be rate limited causing
the explorer to not function properly.

@param {string} rpcServer - The RPC server to use for the explorer
@param {string} name (optional) - The name of the explorer
@param {string} token (optional) - The native token to use for the explorer
@returns {object} - The explorer object
*/
router.post('/explorers', async (req, res, next) => {
const data = req.body;
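For reference, a minimal usage sketch of the endpoint documented above (not part of this diff). The /api/demo mount path and the app.ethernal.local host (taken from the docker-compose file above) are assumptions; only rpcServer is required:

    // Hedged sketch: create a demo explorer via the documented endpoint.
    // Host and mount path are assumptions, not confirmed by this PR.
    const res = await fetch('http://app.ethernal.local/api/demo/explorers', {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
            rpcServer: 'https://rpc.my-chain.example', // required
            name: 'My Demo Explorer',                  // optional
            token: 'ETH'                               // optional
        })
    });
    const explorer = await res.json();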
62 changes: 19 additions & 43 deletions run/jobs/processBlock.js
@@ -1,7 +1,4 @@
const { getNodeEnv } = require('../lib/env');
const { Block, Workspace, Explorer, Transaction } = require('../models');
const { bulkEnqueue, enqueue } = require('../lib/queue');
const STALLED_BLOCK_REMOVAL_DELAY = getNodeEnv() == 'production' ? 5 * 60 * 1000 : 15 * 60 * 1000;
const { Block, Workspace } = require('../models');

module.exports = async job => {
const data = job.data;
@@ -10,23 +7,11 @@ module.exports = async job => {
return 'Missing parameter';

const block = await Block.findByPk(data.blockId, {
include: [
{
model: Transaction,
as: 'transactions',
attributes: ['id', 'hash']
},
{
model: Workspace,
as: 'workspace',
attributes: ['id', 'public', 'tracing', 'integrityCheckStartBlockNumber'],
include: {
model: Explorer,
as: 'explorer',
attributes: ['id', 'shouldSync']
}
}
]
include: {
model: Workspace,
as: 'workspace',
include: 'explorer'
}
});

if (!block)
@@ -41,28 +26,19 @@ module.exports = async job => {
if (!block.workspace.explorer.shouldSync)
return 'Sync is disabled';

await enqueue('removeStalledBlock', `removeStalledBlock-${block.id}`, { blockId: block.id }, null, null, STALLED_BLOCK_REMOVAL_DELAY);

if (block.workspace.tracing && block.workspace.tracing != 'hardhat') {
const jobs = [];
for (let i = 0; i < block.transactions.length; i++) {
const transaction = block.transactions[i];
jobs.push({
name: `processTransactionTrace-${block.workspaceId}-${transaction.hash}`,
data: { transactionId: transaction.id }
});
}
await bulkEnqueue('processTransactionTrace', jobs);
}

if (block.workspace.integrityCheckStartBlockNumber === undefined || block.workspace.integrityCheckStartBlockNumber === null) {
const integrityCheckStartBlockNumber = block.number < 1000 ? 0 : block.number;
await block.workspace.update({ integrityCheckStartBlockNumber });
}
const client = block.workspace.getViemPublicClient();

if (block.number == block.workspace.integrityCheckStartBlockNumber) {
await enqueue('integrityCheck', `integrityCheck-${block.workspaceId}`, { workspaceId: block.workspaceId });
}
const feeHistory = await client.getFeeHistory({
blockCount: 1,
blockNumber: block.number,
rewardPercentiles: [20, 50, 75]
});

return true;
return block.safeCreateEvent({
baseFeePerGas: feeHistory.baseFeePerGas[0].toString(),
gasUsed: block.gasUsed,
gasLimit: block.gasLimit,
gasUsedRatio: feeHistory.gasUsedRatio[0].toString(),
priorityFeePerGas: feeHistory.reward[0].map(x => x.toString())
});
};
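The job now builds block event data from viem's eth_feeHistory response. A hedged sketch (not from this PR) of what a workspace-level getViemPublicClient helper typically looks like, and of the shape getFeeHistory resolves to; the rpcServer field name is an assumption:

    // Hedged sketch of a viem public client helper; not the actual workspace implementation.
    const { createPublicClient, http } = require('viem');

    function getViemPublicClient(workspace) {
        // workspace.rpcServer is an assumed field holding the chain's RPC URL.
        return createPublicClient({ transport: http(workspace.rpcServer) });
    }

    // getFeeHistory resolves to { oldestBlock, baseFeePerGas: bigint[], gasUsedRatio: number[], reward?: bigint[][] },
    // which is why the job above calls .toString() before persisting the values.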
90 changes: 90 additions & 0 deletions run/migrations/20250208102954-create-block-event.js
@@ -0,0 +1,90 @@
'use strict';

/** @type {import('sequelize-cli').Migration} */
module.exports = {
async up (queryInterface, Sequelize) {
const transaction = await queryInterface.sequelize.transaction();
try {
await queryInterface.createTable('block_events', {
workspaceId: {
type: Sequelize.INTEGER,
allowNull: false,
references: {
key: 'id',
model: {
tableName: 'workspaces'
}
}
},
blockId: {
primaryKey: true,
type: Sequelize.INTEGER,
allowNull: false,
references: {
key: 'id',
model: {
tableName: 'blocks'
}
}
},
number : {
type: Sequelize.INTEGER,
allowNull: false,
},
timestamp: {
primaryKey: true,
type: 'TIMESTAMPTZ',
allowNull: false,
},
transactionCount: {
type: Sequelize.INTEGER,
allowNull: false
},
baseFeePerGas: {
type: Sequelize.NUMERIC,
allowNull: false
},
gasLimit: {
type: Sequelize.NUMERIC,
allowNull: true
},
gasUsed: {
type: Sequelize.NUMERIC,
allowNull: false
},
gasUsedRatio: {
type: Sequelize.FLOAT,
allowNull: false
},
priorityFeePerGas: {
type: Sequelize.ARRAY(Sequelize.NUMERIC),
allowNull: false
}
}, { transaction });

await queryInterface.sequelize.query(`SELECT create_hypertable('block_events', 'timestamp');`, { transaction });
await queryInterface.sequelize.query(`
CREATE INDEX "block_events_workspaceId_timestamp" ON block_events("workspaceId", timestamp DESC);
`, { transaction });

await transaction.commit();
} catch(error) {
console.log(error);
await transaction.rollback();
throw error;
}
},

async down (queryInterface, Sequelize) {
const transaction = await queryInterface.sequelize.transaction();
try {
await queryInterface.dropTable('block_events', { transaction });

await transaction.commit();
} catch(error) {
console.log(error);
await transaction.rollback();
throw error;
}
}
};
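The migration turns block_events into a TimescaleDB hypertable partitioned on timestamp, with an index on ("workspaceId", timestamp DESC). A hedged sketch (not part of this PR) of the kind of query that layout serves, assuming a configured sequelize instance and a workspaceId in scope:

    // Hedged sketch: per-minute gas aggregation over block_events using TimescaleDB's time_bucket().
    const [rows] = await sequelize.query(`
        SELECT time_bucket('1 minute', timestamp) AS minute,
               avg("baseFeePerGas")               AS "avgBaseFeePerGas",
               sum("transactionCount")            AS "transactionCount"
        FROM block_events
        WHERE "workspaceId" = :workspaceId
        GROUP BY minute
        ORDER BY minute DESC
        LIMIT 60;
    `, { replacements: { workspaceId } });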
76 changes: 64 additions & 12 deletions run/models/block.js
@@ -3,10 +3,14 @@ const {
Model,
Sequelize
} = require('sequelize');
const { trigger } = require('../lib/pusher');
const { enqueue } = require('../lib/queue');
const moment = require('moment');

const { trigger } = require('../lib/pusher');
const { enqueue, bulkEnqueue } = require('../lib/queue');
const { getNodeEnv } = require('../lib/env');

const STALLED_BLOCK_REMOVAL_DELAY = getNodeEnv() == 'production' ? 5 * 60 * 1000 : 15 * 60 * 1000;

module.exports = (sequelize, DataTypes) => {
class Block extends Model {
/**
@@ -17,15 +21,35 @@ module.exports = (sequelize, DataTypes) => {
static associate(models) {
Block.belongsTo(models.Workspace, { foreignKey: 'workspaceId', as: 'workspace' });
Block.hasMany(models.Transaction, { foreignKey: 'blockId', as: 'transactions' });
Block.hasOne(models.BlockEvent, { foreignKey: 'blockId', as: 'event' });
}

async safeDestroy(transaction) {
const transactions = await this.getTransactions();
for (let i = 0; i < transactions.length; i++)
await transactions[i].safeDestroy(transaction);

const event = await this.getEvent();
if (event)
await event.destroy({ transaction });

return this.destroy({ transaction });
}

safeCreateEvent(event, transaction) {
return this.createEvent({
workspaceId: this.workspaceId,
number: this.number,
timestamp: this.timestamp,
transactionCount: this.transactionsCount,
baseFeePerGas: event.baseFeePerGas,
gasLimit: event.gasLimit,
gasUsed: event.gasUsed,
gasUsedRatio: event.gasUsedRatio,
priorityFeePerGas: event.priorityFeePerGas,
}, { transaction });
}

async revertIfPartial() {
const transactions = await this.getTransactions();
const isSyncing = transactions.map(t => t.isSyncing).length > 0 || transactions.length != this.transactionsCount;
@@ -40,16 +64,44 @@
}

async afterCreate(options) {
const afterCreateFn = () => {
if (Date.now() / 1000 - this.timestamp < 60 * 10)
trigger(`private-blocks;workspace=${this.workspaceId}`, 'new', { number: this.number, withTransactions: this.transactionsCount > 0 });
return enqueue('processBlock', `processBlock-${this.id}`, { blockId: this.id });
};

if (options.transaction)
return options.transaction.afterCommit(afterCreateFn);
else
return afterCreateFn();
const afterCreateFn = async () => {
if (Date.now() / 1000 - this.timestamp < 60 * 10)
trigger(`private-blocks;workspace=${this.workspaceId}`, 'new', { number: this.number, withTransactions: this.transactionsCount > 0 });

const workspace = await this.getWorkspace();
if (workspace.public) {
await enqueue('removeStalledBlock', `removeStalledBlock-${this.id}`, { blockId: this.id }, null, null, STALLED_BLOCK_REMOVAL_DELAY);

if (workspace.tracing && workspace.tracing != 'hardhat') {
const jobs = [];
const transactions = await this.getTransactions();
for (let i = 0; i < transactions.length; i++) {
const transaction = transactions[i];
jobs.push({
name: `processTransactionTrace-${this.workspaceId}-${transaction.hash}`,
data: { transactionId: transaction.id }
});
}
await bulkEnqueue('processTransactionTrace', jobs);
}

if (workspace.integrityCheckStartBlockNumber === undefined || workspace.integrityCheckStartBlockNumber === null) {
const integrityCheckStartBlockNumber = this.number < 1000 ? 0 : this.number;
await workspace.update({ integrityCheckStartBlockNumber });
}

if (this.number == workspace.integrityCheckStartBlockNumber) {
await enqueue('integrityCheck', `integrityCheck-${this.workspaceId}`, { workspaceId: this.workspaceId });
}
}

return enqueue('processBlock', `processBlock-${this.id}`, { blockId: this.id });
};

if (options.transaction)
return options.transaction.afterCommit(afterCreateFn);
else
return afterCreateFn();
}
}
Block.init({
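The reworked afterCreate hook defers its side effects with transaction.afterCommit, so queue jobs only fire once the enclosing database transaction has committed. A minimal illustration (not from this PR; the attribute values are placeholders):

    // Hedged sketch of the afterCommit deferral used by Block.afterCreate above.
    await sequelize.transaction(async dbTransaction => {
        await models.Block.create(
            { workspaceId: 1, number: 42, timestamp: Math.floor(Date.now() / 1000), transactionsCount: 0 },
            { transaction: dbTransaction }
        );
        // afterCreate registers its callback via dbTransaction.afterCommit(...), so the
        // processBlock job (and, for public workspaces, the stalled-block, tracing, and
        // integrity-check jobs) is only enqueued if this transaction actually commits.
    });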
45 changes: 45 additions & 0 deletions run/models/blockevent.js
@@ -0,0 +1,45 @@
'use strict';
const {
Model
} = require('sequelize');
const moment = require('moment');
module.exports = (sequelize, DataTypes) => {
class BlockEvent extends Model {
/**
* Helper method for defining associations.
* This method is not a part of Sequelize lifecycle.
* The `models/index` file will call this method automatically.
*/
static associate(models) {
BlockEvent.belongsTo(models.Workspace, { foreignKey: 'workspaceId', as: 'workspace' });
BlockEvent.belongsTo(models.Block, { foreignKey: 'blockId', as: 'block' });
}
}
BlockEvent.init({
workspaceId: DataTypes.INTEGER,
blockId: DataTypes.INTEGER,
number: DataTypes.INTEGER,
timestamp: {
type: DataTypes.DATE,
primaryKey: true,
set(value) {
if (String(value).length > 10)
this.setDataValue('timestamp', moment(value).format());
else
this.setDataValue('timestamp', moment.unix(value).format());
}
},
transactionCount: DataTypes.INTEGER,
baseFeePerGas: DataTypes.STRING,
gasLimit: DataTypes.STRING,
gasUsed: DataTypes.STRING,
gasUsedRatio: DataTypes.STRING,
priorityFeePerGas: DataTypes.ARRAY(DataTypes.STRING)
}, {
sequelize,
timestamps: false,
modelName: 'BlockEvent',
tableName: 'block_events'
});
return BlockEvent;
};
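The timestamp setter accepts either unix seconds or a full date value. A small hedged illustration (not part of this PR; models is assumed to be the loaded Sequelize models object):

    // Both forms are normalized to an ISO-8601 string; these two inputs are the same instant.
    const fromUnix = models.BlockEvent.build({ timestamp: 1739692800 });             // 10-digit unix seconds
    const fromDate = models.BlockEvent.build({ timestamp: '2025-02-16T08:00:00Z' }); // parsed by moment()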