From 0adb1b7d5c31b620ae3c8cd352200df16e031ede Mon Sep 17 00:00:00 2001 From: zeroxbt Date: Fri, 15 Mar 2024 15:11:45 +0900 Subject: [PATCH 1/3] fix send message timeout --- package-lock.json | 14 -- package.json | 1 - .../network/implementation/libp2p-service.js | 153 ++++++++---------- 3 files changed, 66 insertions(+), 102 deletions(-) diff --git a/package-lock.json b/package-lock.json index 9af8e44640..df817722a9 100644 --- a/package-lock.json +++ b/package-lock.json @@ -56,7 +56,6 @@ "rolling-rate-limiter": "^0.2.13", "semver": "^7.5.2", "sequelize": "^6.29.0", - "timeout-abort-controller": "^3.0.0", "toobusy-js": "^0.5.1", "uint8arrays": "^3.1.0", "umzug": "^3.2.1", @@ -19647,13 +19646,6 @@ "node": ">=0.10.0" } }, - "node_modules/timeout-abort-controller": { - "version": "3.0.0", - "license": "MIT", - "dependencies": { - "retimer": "^3.0.0" - } - }, "node_modules/tiny-emitter": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/tiny-emitter/-/tiny-emitter-2.1.0.tgz", @@ -35143,12 +35135,6 @@ "version": "4.0.1", "dev": true }, - "timeout-abort-controller": { - "version": "3.0.0", - "requires": { - "retimer": "^3.0.0" - } - }, "tiny-emitter": { "version": "2.1.0", "resolved": "https://registry.npmjs.org/tiny-emitter/-/tiny-emitter-2.1.0.tgz", diff --git a/package.json b/package.json index 73b34eff3b..1e9ea5b7d6 100644 --- a/package.json +++ b/package.json @@ -111,7 +111,6 @@ "rolling-rate-limiter": "^0.2.13", "semver": "^7.5.2", "sequelize": "^6.29.0", - "timeout-abort-controller": "^3.0.0", "toobusy-js": "^0.5.1", "uint8arrays": "^3.1.0", "umzug": "^3.2.1", diff --git a/src/modules/network/implementation/libp2p-service.js b/src/modules/network/implementation/libp2p-service.js index 385d9be40e..abfd28a5bf 100644 --- a/src/modules/network/implementation/libp2p-service.js +++ b/src/modules/network/implementation/libp2p-service.js @@ -14,7 +14,6 @@ import { InMemoryRateLimiter } from 'rolling-rate-limiter'; import toobusy from 'toobusy-js'; import { mkdir, writeFile, readFile, stat } from 'fs/promises'; import ip from 'ip'; -import { TimeoutController } from 'timeout-abort-controller'; import { NETWORK_API_RATE_LIMIT, NETWORK_API_SPAM_DETECTION, @@ -327,113 +326,93 @@ class Libp2pService { this.logger.trace( `Dialing remotePeerId: ${peerIdString} with public ip: ${publicIp}: protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}`, ); - let dialResult; - let dialStart; - let dialEnd; - try { - dialStart = Date.now(); - dialResult = await this.node.dialProtocol(peerIdObject, protocol); - dialEnd = Date.now(); - } catch (error) { - dialEnd = Date.now(); - nackMessage.data.errorMessage = `Unable to dial peer: ${peerIdString}. protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}, dial execution time: ${ - dialEnd - dialStart - } ms. Error: ${error.message}`; - - return nackMessage; - } - this.logger.trace( - `Created stream for peer: ${peerIdString}. protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}, dial execution time: ${ - dialEnd - dialStart - } ms.`, - ); - const { stream } = dialResult; + const timeoutSignal = AbortSignal.timeout(timeout); + const abortError = new Error('Message timed out'); + let stream; + let response; + let errorMessage; + let operationStart; + let operationEnd; + const onAbort = () => { + if (stream) stream.abort(abortError); + response = null; + }; - this.updateSessionStream(operationId, keywordUuid, peerIdString, stream); + timeoutSignal.addEventListener('abort', onAbort, { once: true }); - const streamMessage = this.createStreamMessage( - message, - operationId, - keywordUuid, - messageType, - ); + try { + errorMessage = `Unable to dial peer: ${peerIdString}. protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}.`; + operationStart = Date.now(); + const dialResult = await this.node.dialProtocol(peerIdObject, protocol, { + signal: timeoutSignal, + }); + operationEnd = Date.now(); + if (timeoutSignal.aborted) { + throw abortError; + } - this.logger.trace( - `Sending message to ${peerIdString}. protocol: ${protocol}, messageType: ${messageType}, operationId: ${operationId}`, - ); + this.logger.trace( + `Created stream for peer: ${peerIdString}. protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}, dial execution time: ${ + operationEnd - operationStart + } ms.`, + ); - let sendMessageStart; - let sendMessageEnd; - try { - sendMessageStart = Date.now(); - await this._sendMessageToStream(stream, streamMessage); - sendMessageEnd = Date.now(); - } catch (error) { - sendMessageEnd = Date.now(); - nackMessage.data.errorMessage = `Unable to send message to peer: ${peerIdString}. protocol: ${protocol}, messageType: ${messageType}, operationId: ${operationId}, execution time: ${ - sendMessageEnd - sendMessageStart - } ms. Error: ${error.message}`; + stream = dialResult.stream; - return nackMessage; - } + this.updateSessionStream(operationId, keywordUuid, peerIdString, stream); - let readResponseStart; - let readResponseEnd; - let response; - const timeoutController = new TimeoutController(timeout); - try { - readResponseStart = Date.now(); + const streamMessage = this.createStreamMessage( + message, + operationId, + keywordUuid, + messageType, + ); - timeoutController.signal.addEventListener( - 'abort', - async () => { - stream.abort(); - response = null; - }, - { once: true }, + this.logger.trace( + `Sending message to ${peerIdString}. protocol: ${protocol}, messageType: ${messageType}, operationId: ${operationId}`, ); + errorMessage = `Unable to send message to peer: ${peerIdString}. protocol: ${protocol}, messageType: ${messageType}, operationId: ${operationId}.`; + operationStart = Date.now(); + await this._sendMessageToStream(stream, streamMessage); + operationEnd = Date.now(); + if (timeoutSignal.aborted) { + throw abortError; + } + + errorMessage = `Unable to read response from peer ${peerIdString}. protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}`; + operationStart = Date.now(); response = await this._readMessageFromStream( stream, this.isResponseValid.bind(this), peerIdString, ); - - if (timeoutController.signal.aborted) { - throw Error('Message timed out!'); + operationEnd = Date.now(); + if (timeoutSignal.aborted) { + throw abortError; } - timeoutController.signal.removeEventListener('abort'); - timeoutController.clear(); + this.logger.trace( + `Receiving response from ${peerIdString}. protocol: ${protocol}, messageType: ${ + response.message?.header?.messageType + }, operationId: ${operationId}, execution time: ${ + operationEnd - operationStart + } ms.`, + ); - readResponseEnd = Date.now(); + if (!response.valid) { + nackMessage.data.errorMessage = 'Invalid response'; + return nackMessage; + } } catch (error) { - timeoutController.signal.removeEventListener('abort'); - timeoutController.clear(); - - readResponseEnd = Date.now(); - nackMessage.data.errorMessage = `Unable to read response from peer ${peerIdString}. protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}, execution time: ${ - readResponseEnd - readResponseStart - } ms. Error: ${error.message}`; - + nackMessage.data.errorMessage = `${errorMessage} Execution time: ${ + (operationEnd ?? Date.now()) - operationStart + } ms. Error: ${error.message.slice(0, 145)}`; return nackMessage; + } finally { + timeoutSignal.removeEventListener('abort', onAbort); } - - this.logger.trace( - `Receiving response from ${peerIdString}. protocol: ${protocol}, messageType: ${ - response.message?.header?.messageType - }, operationId: ${operationId}, execution time: ${ - readResponseEnd - readResponseStart - } ms.`, - ); - - if (!response.valid) { - nackMessage.data.errorMessage = 'Invalid response'; - - return nackMessage; - } - return response.message; } From 758e43def34770e45077362ca0e2d6bea4bac4bb Mon Sep 17 00:00:00 2001 From: zeroxbt Date: Fri, 15 Mar 2024 15:24:30 +0900 Subject: [PATCH 2/3] refactor error message creation --- .../network/implementation/libp2p-service.js | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/modules/network/implementation/libp2p-service.js b/src/modules/network/implementation/libp2p-service.js index abfd28a5bf..ae93bcbc49 100644 --- a/src/modules/network/implementation/libp2p-service.js +++ b/src/modules/network/implementation/libp2p-service.js @@ -323,10 +323,6 @@ class Libp2pService { .map((addr) => addr.toString().split('/')) .filter((splittedAddr) => !ip.isPrivate(splittedAddr[2]))[0]?.[2]; - this.logger.trace( - `Dialing remotePeerId: ${peerIdString} with public ip: ${publicIp}: protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}`, - ); - const timeoutSignal = AbortSignal.timeout(timeout); const abortError = new Error('Message timed out'); let stream; @@ -342,7 +338,11 @@ class Libp2pService { timeoutSignal.addEventListener('abort', onAbort, { once: true }); try { - errorMessage = `Unable to dial peer: ${peerIdString}. protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}.`; + this.logger.trace( + `Dialing remotePeerId: ${peerIdString} with public ip: ${publicIp}: protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}`, + ); + + errorMessage = `dial`; operationStart = Date.now(); const dialResult = await this.node.dialProtocol(peerIdObject, protocol, { signal: timeoutSignal, @@ -373,7 +373,7 @@ class Libp2pService { `Sending message to ${peerIdString}. protocol: ${protocol}, messageType: ${messageType}, operationId: ${operationId}`, ); - errorMessage = `Unable to send message to peer: ${peerIdString}. protocol: ${protocol}, messageType: ${messageType}, operationId: ${operationId}.`; + errorMessage = `send message`; operationStart = Date.now(); await this._sendMessageToStream(stream, streamMessage); operationEnd = Date.now(); @@ -381,7 +381,7 @@ class Libp2pService { throw abortError; } - errorMessage = `Unable to read response from peer ${peerIdString}. protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}`; + errorMessage = `read response`; operationStart = Date.now(); response = await this._readMessageFromStream( stream, @@ -406,7 +406,7 @@ class Libp2pService { return nackMessage; } } catch (error) { - nackMessage.data.errorMessage = `${errorMessage} Execution time: ${ + nackMessage.data.errorMessage = `Unable to ${errorMessage} from peer ${peerIdString}. protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}. Execution time: ${ (operationEnd ?? Date.now()) - operationStart } ms. Error: ${error.message.slice(0, 145)}`; return nackMessage; From b89cec0536f53251614d4a47ef7c41c3f0d55e8f Mon Sep 17 00:00:00 2001 From: zeroxbt Date: Sat, 16 Mar 2024 04:52:36 +0900 Subject: [PATCH 3/3] fix dial timeout --- src/modules/network/implementation/libp2p-service.js | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/src/modules/network/implementation/libp2p-service.js b/src/modules/network/implementation/libp2p-service.js index ae93bcbc49..b67fd15588 100644 --- a/src/modules/network/implementation/libp2p-service.js +++ b/src/modules/network/implementation/libp2p-service.js @@ -325,6 +325,10 @@ class Libp2pService { const timeoutSignal = AbortSignal.timeout(timeout); const abortError = new Error('Message timed out'); + + const abortPromise = new Promise((_, reject) => { + timeoutSignal.onabort = () => reject(abortError); + }); let stream; let response; let errorMessage; @@ -335,18 +339,18 @@ class Libp2pService { response = null; }; - timeoutSignal.addEventListener('abort', onAbort, { once: true }); - try { + timeoutSignal.addEventListener('abort', onAbort, { once: true }); this.logger.trace( `Dialing remotePeerId: ${peerIdString} with public ip: ${publicIp}: protocol: ${protocol}, messageType: ${messageType} , operationId: ${operationId}`, ); errorMessage = `dial`; operationStart = Date.now(); - const dialResult = await this.node.dialProtocol(peerIdObject, protocol, { + const dialPromise = this.node.dialProtocol(peerIdObject, protocol, { signal: timeoutSignal, }); + const dialResult = await Promise.race([dialPromise, abortPromise]); operationEnd = Date.now(); if (timeoutSignal.aborted) { throw abortError;