From 679e3db69c411133708bb0271fb7692be3a62934 Mon Sep 17 00:00:00 2001 From: Konstantin Fastov Date: Tue, 1 Oct 2024 21:06:42 +0300 Subject: [PATCH] switch to polling, claim some messages now --- src/services/rootPropagator/config.js | 3 + src/services/rootPropagator/database.js | 14 +-- .../modules/L1/L1MessageListener.js | 62 ++++++---- .../modules/L2/L2MessageHandler.js | 106 ++++++++++++++---- src/services/rootPropagator/scheduler.js | 22 ++-- 5 files changed, 146 insertions(+), 61 deletions(-) diff --git a/src/services/rootPropagator/config.js b/src/services/rootPropagator/config.js index 3bdedb6..44b754c 100644 --- a/src/services/rootPropagator/config.js +++ b/src/services/rootPropagator/config.js @@ -6,12 +6,15 @@ dotenv.config(); const config = { propagationPeriod: parseInt(process.env.PROPAGATION_PERIOD) || 3600000, // Default to 1 hour listenerInitDelay: parseInt(process.env.LISTENER_INIT_DELAY) || 5000, // Default to 5 seconds + l1BlocksToQuery: parseInt(process.env.L1_BLOCKS_TO_QUERY) || 100000, // Default to 100,000 blocks + l2BlocksToQuery: parseInt(process.env.L2_BLOCKS_TO_QUERY) || 100000, // Default to 100,000 blocks privateKey: process.env.PRIVATE_KEY, l1RpcUrl: process.env.L1_RPC_URL, l2RpcUrl: process.env.L2_RPC_URL, lineaStateBridgeAddress: process.env.LINEA_STATE_BRIDGE_ADDRESS, l1MessageServiceAddress: process.env.L1_MESSAGE_SERVICE_ADDRESS, l2MessageServiceAddress: process.env.L2_MESSAGE_SERVICE_ADDRESS, + l2PollingInterval: parseInt(process.env.L2_POLLING_INTERVAL) || 30000, // Default to 30 seconds }; export function validateConfig() { diff --git a/src/services/rootPropagator/database.js b/src/services/rootPropagator/database.js index 841ac01..0606722 100644 --- a/src/services/rootPropagator/database.js +++ b/src/services/rootPropagator/database.js @@ -28,11 +28,11 @@ export async function saveMessage(messageData) { }); } -export async function getUnconfirmedMessages() { +export async function getUnclaimedMessages() { return new Promise((resolve, reject) => { - db.find({ status: 'pending' }, (err, docs) => { + db.find({ status: 'confirmed' }, (err, docs) => { if (err) { - console.error('Failed to get unconfirmed messages:', err); + console.error('Failed to get unclaimed messages:', err); reject(err); } else { resolve(docs); @@ -55,14 +55,14 @@ export async function updateMessageStatus(messageHash, status) { }); } -export async function deleteConfirmedMessages() { +export async function deleteClaimedMessages() { return new Promise((resolve, reject) => { - db.remove({ status: 'confirmed' }, { multi: true }, (err, numRemoved) => { + db.remove({ status: 'claimed' }, { multi: true }, (err, numRemoved) => { if (err) { - console.error('Failed to delete confirmed messages:', err); + console.error('Failed to delete claimed messages:', err); reject(err); } else { - console.log(`Deleted ${numRemoved} confirmed messages`); + console.log(`Deleted ${numRemoved} claimed messages`); resolve(numRemoved); } }); diff --git a/src/services/rootPropagator/modules/L1/L1MessageListener.js b/src/services/rootPropagator/modules/L1/L1MessageListener.js index f086b21..5ac6abb 100644 --- a/src/services/rootPropagator/modules/L1/L1MessageListener.js +++ b/src/services/rootPropagator/modules/L1/L1MessageListener.js @@ -14,28 +14,48 @@ export async function listenForL1Messages() { const l1MessageServiceContract = new ethers.Contract(config.l1MessageServiceAddress, abi, provider); const filter = l1MessageServiceContract.filters.MessageSent(config.lineaStateBridgeAddress); - - l1MessageServiceContract.on(filter, async (payload) => { - logger.info(`MessageSent Event Detected:`); - const [_from, _to, _fee, _value, nonce, calldata, messageHash] = payload.args; - logger.info(`Message details`, { - nonce, - messageHash, - calldata, - blockNumber: payload.log.blockNumber, - transactionHash: payload.log.transactionHash - }); - try { - await saveMessage({ - messageHash: messageHash, - nonce: nonce, - calldata: calldata, - }); - } catch (error) { - logger.error('Error saving message to database:', { error: error.message }); - } + // Process past events + const latestBlock = await provider.getBlockNumber(); + const blocksToQuery = config.l1BlocksToQuery || 100000; // Default to 100,000 blocks if not specified + const fromBlock = Math.max(0, latestBlock - blocksToQuery); + logger.info(`Querying past events from block ${fromBlock} to ${latestBlock}`); + + const pastEvents = await l1MessageServiceContract.queryFilter(filter, fromBlock, latestBlock); + logger.info(`Found ${pastEvents.length} past events`); + + for (const event of pastEvents) { + await processEvent(event); + } + + // Set up listener for new events + l1MessageServiceContract.on(filter, async (event) => { + await processEvent(event); }); - logger.info('L1 Message Listener started'); + logger.info('L1 Message Listener started for new events'); +} + +async function processEvent(event) { + const [_from, to, fee, value, nonce, calldata, messageHash] = event.args; + logger.info('Processing MessageSent Event', { + nonce, + messageHash, + }); + + try { + await saveMessage({ + destination: to, + messageHash: messageHash, + nonce: nonce.toString(), + fee: fee.toString(), + value: value.toString(), + calldata: calldata, + status: 'pending', + blockNumber: event.blockNumber, + transactionHash: event.transactionHash + }); + } catch (error) { + logger.error('Error saving message to database:', { error: error.message, messageHash }); + } } \ No newline at end of file diff --git a/src/services/rootPropagator/modules/L2/L2MessageHandler.js b/src/services/rootPropagator/modules/L2/L2MessageHandler.js index 7e3fa46..7802d03 100644 --- a/src/services/rootPropagator/modules/L2/L2MessageHandler.js +++ b/src/services/rootPropagator/modules/L2/L2MessageHandler.js @@ -1,7 +1,7 @@ import { ethers } from 'ethers'; import { LineaSDK } from '@consensys/linea-sdk'; import config from '../../config.js'; -import { getUnconfirmedMessages, updateMessageStatus, deleteConfirmedMessages } from '../../database.js'; +import { getUnclaimedMessages, updateMessageStatus, deleteClaimedMessages } from '../../database.js'; import { createModuleLogger } from '../../utils/logger.js'; const logger = createModuleLogger('L2MessageHandler'); @@ -10,21 +10,72 @@ const abi = [ "event L1L2MessageHashesAddedToInbox(bytes32[] messageHashes)" ]; -export function listenForL2Messages() { +let latestProcessedBlock = 0; + +export async function setupL2EventPolling() { const provider = new ethers.JsonRpcProvider(config.l2RpcUrl); const l2MessageServiceContract = new ethers.Contract(config.l2MessageServiceAddress, abi, provider); - l2MessageServiceContract.on("L1L2MessageHashesAddedToInbox", async (messageHashes) => { - logger.info('New message hashes received on L2', { messageHashes }); - try { - // Update all message statuses in a batch - await updateMessageStatuses(messageHashes); - } catch (error) { - logger.error('Error updating message statuses', { error: error.message }); + // Process past events + const latestBlock = await provider.getBlockNumber(); + const blocksToQuery = config.l2BlocksToQuery || 100000; // Default to 100,000 blocks if not specified + const fromBlock = Math.max(0, latestBlock - blocksToQuery); + logger.info(`Querying past L2 events from block ${fromBlock} to ${latestBlock}`); + + const filter = l2MessageServiceContract.filters.L1L2MessageHashesAddedToInbox(); + const pastEvents = await l2MessageServiceContract.queryFilter(filter, fromBlock, latestBlock); + logger.info(`Found ${pastEvents.length} past L2 events`); + + for (const event of pastEvents) { + await processEvent(event); + } + + // Set the latest processed block after handling past events + latestProcessedBlock = latestBlock; + logger.info(`Processed past events up to block ${latestProcessedBlock}`); + + // Set up interval for polling new events + setInterval(() => pollL2Events(provider, l2MessageServiceContract), config.l2PollingInterval); +} + +async function pollL2Events(provider, l2MessageServiceContract) { + try { + const latestBlock = await provider.getBlockNumber(); + + if (latestBlock <= latestProcessedBlock) { + logger.debug('No new blocks to process'); + return; + } + + logger.info(`Polling L2 events from block ${latestProcessedBlock + 1} to ${latestBlock}`); + + const filter = l2MessageServiceContract.filters.L1L2MessageHashesAddedToInbox(); + const events = await l2MessageServiceContract.queryFilter(filter, latestProcessedBlock + 1, latestBlock); + + for (const event of events) { + await processEvent(event); } + + latestProcessedBlock = latestBlock; + logger.info(`Processed events up to block ${latestProcessedBlock}`); + } catch (error) { + logger.error('Error polling L2 events', { error: error.message }); + } +} + +async function processEvent(event) { + const messageHashes = event.args.messageHashes; + logger.info('Processing L1L2MessageHashesAddedToInbox Event:', { + messageHashes, + blockNumber: event.blockNumber, + transactionHash: event.transactionHash }); - logger.info('L2 Message Listener started'); + try { + await updateMessageStatuses(messageHashes); + } catch (error) { + logger.error('Error updating message statuses', { error: error.message }); + } } async function updateMessageStatuses(messageHashes) { @@ -37,38 +88,49 @@ async function updateMessageStatuses(messageHashes) { await Promise.all(updatePromises); } -export async function confirmL2Messages() { +export async function claimL2Messages() { const lineaSDK = new LineaSDK({ l2RpcUrl: config.l2RpcUrl, l2SignerPrivateKey: config.privateKey, - network: "linea-mainnet", + network: "linea-sepolia", // TODO: make this dynamic based on environment mode: "read-write", }); const l2Contract = lineaSDK.getL2Contract(config.l2MessageServiceAddress); - const unconfirmedMessages = await getUnconfirmedMessages(); + const unclaimedMessages = await getUnclaimedMessages(); - for (const message of unconfirmedMessages) { + for (const message of unclaimedMessages) { try { + console.log('Claiming message', { messageHash: message.messageHash }); + console.log('message contents', { + messageSender: config.lineaStateBridgeAddress, + destination: message.destination, + fee: message.fee, + value: message.value, + messageNonce: message.nonce, + calldata: message.calldata, + messageHash: message.messageHash + }); const tx = await l2Contract.claim({ messageSender: config.lineaStateBridgeAddress, - destination: config.l2MessageServiceAddress, - fee: "0", // Assuming no fee for now - value: "0", // Assuming no value transfer + destination: message.destination, + fee: message.fee, + value: message.value, messageNonce: message.nonce, calldata: message.calldata, messageHash: message.messageHash }); await tx.wait(); - logger.info('Message confirmed successfully', { messageHash: message.messageHash }); - await updateMessageStatus(message.messageHash, 'confirmed'); + logger.info('Message claimed successfully', { messageHash: message.messageHash }); + await updateMessageStatus(message.messageHash, 'claimed'); } catch (error) { - logger.error('Error confirming message', { messageHash: message.messageHash, error: error.message }); + logger.error('Error claimed message', { messageHash: message.messageHash, error: error.message }); } + // break // process only one message for now } - // Clean up confirmed messages - await deleteConfirmedMessages(); + // Clean up claimed messages + await deleteClaimedMessages(); } \ No newline at end of file diff --git a/src/services/rootPropagator/scheduler.js b/src/services/rootPropagator/scheduler.js index b0ebd49..1ec2d0c 100644 --- a/src/services/rootPropagator/scheduler.js +++ b/src/services/rootPropagator/scheduler.js @@ -2,7 +2,7 @@ import cron from 'node-cron'; import config from './config.js'; import { propagateRoot } from './modules/L1/LineaRootPropagator.js'; import { listenForL1Messages } from './modules/L1/L1MessageListener.js'; -import { listenForL2Messages, confirmL2Messages } from './modules/L2/L2MessageHandler.js'; +import { setupL2EventPolling, claimL2Messages } from './modules/L2/L2MessageHandler.js'; import { createModuleLogger } from './utils/logger.js'; const logger = createModuleLogger('Scheduler'); @@ -12,29 +12,29 @@ export async function startScheduler() { logger.info('Starting L1 Message Listener...'); await listenForL1Messages(); - // Start L2 message listener (runs continuously) - logger.info('Starting L2 Message Listener...'); - listenForL2Messages(); + // Start L2 event polling + logger.info('Setting up L2 Event Polling...'); + await setupL2EventPolling(); // Delay the first root propagation to ensure listeners are ready logger.info(`Waiting for listeners to initialize (${config.listenerInitDelay}ms)...`); await new Promise(resolve => setTimeout(resolve, config.listenerInitDelay)); // Execute initial root propagation - logger.info('Executing initial root propagation...'); - await propagateRoot(); +// logger.info('Executing initial root propagation...'); +// await propagateRoot(); // Schedule subsequent root propagations const propagationPeriodMinutes = Math.floor(config.propagationPeriod / 60000); cron.schedule(`*/${propagationPeriodMinutes} * * * *`, async () => { logger.info('Executing scheduled root propagation...'); - await propagateRoot(); + // await propagateRoot(); }); - // Schedule L2 message confirmation (every 5 minutes) - cron.schedule('*/5 * * * *', async () => { - logger.info('Confirming L2 messages...'); - await confirmL2Messages(); + // Schedule L2 message confirmation (every 1 minutes) + cron.schedule('*/1 * * * *', async () => { + logger.info('Claiming L2 messages...'); + await claimL2Messages(); }); logger.info('Scheduler started successfully');