From cada5d1d2e456423fd58db6992d9b92d9ca92cf4 Mon Sep 17 00:00:00 2001 From: codewithmecoder Date: Fri, 7 Feb 2025 22:40:06 +0700 Subject: [PATCH 1/4] check rog --- src/main.rs | 4 +- src/model.rs | 108 +++++++++++++++++++++++++++++++++ src/transaction_processor.rs | 113 +++++++++++++++++++++++++---------- 3 files changed, 194 insertions(+), 31 deletions(-) diff --git a/src/main.rs b/src/main.rs index 9fe1ff6..fd2aba0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,8 @@ mod telegram; mod transaction_processor; mod websocket; +use std::clone; + use crate::{ database::Database, price_monitor::PriceMonitor, telegram::TelegramNotifier, websocket::SolanaWebsocket, @@ -41,7 +43,7 @@ async fn main() -> Result<(), Box> { while let Some(message) = stream.next().await { // Process transaction let signer = Keypair::from_base58_string(&config.private_key); - let processor = transaction_processor::TransactionProcessor::new(&config.helius_rpc_url); + let processor = transaction_processor::TransactionProcessor::new(config.clone()); // let notifier = notifier.clone(); if let Err(e) = processor.process_transaction(&message, &signer).await { diff --git a/src/model.rs b/src/model.rs index cb75bf0..64af180 100644 --- a/src/model.rs +++ b/src/model.rs @@ -54,3 +54,111 @@ pub struct Value { pub signature: Option, pub logs: Option>, } + +pub type TrxDetailRes = Vec; + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct TrxDetailRe { + pub description: String, + + #[serde(rename = "type")] + pub trx_detail_re_type: String, + + pub source: String, + + pub fee: i64, + + pub fee_payer: String, + + pub signature: String, + + pub slot: i64, + + pub timestamp: i64, + + pub token_transfers: Vec, + + pub native_transfers: Vec, + + pub account_data: Vec, + + pub transaction_error: TransactionError, + + pub instructions: Vec, + + pub events: Events, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct AccountDatum { + pub account: String, + + pub native_balance_change: i64, + + pub token_balance_changes: Vec>, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Events {} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct Instruction { + pub accounts: Vec, + + pub data: String, + + pub program_id: String, + + pub inner_instructions: Option>, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct NativeTransfer { + pub from_user_account: String, + + pub to_user_account: String, + + pub amount: i64, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct TokenTransfer { + pub from_token_account: String, + + pub to_token_account: String, + + pub from_user_account: String, + + pub to_user_account: String, + + pub token_amount: i64, + + pub mint: String, + + pub token_standard: String, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "PascalCase")] +pub struct TransactionError { + pub instruction_error: Vec, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(untagged)] +pub enum InstructionErrorElement { + InstructionErrorClass(InstructionErrorClass), + + Integer(i64), +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "PascalCase")] +pub struct InstructionErrorClass { + pub custom: i64, +} diff --git a/src/transaction_processor.rs b/src/transaction_processor.rs index 16e5e64..88f6b27 100644 --- a/src/transaction_processor.rs +++ b/src/transaction_processor.rs @@ -1,8 +1,11 @@ use serde_json::Value; -use solana_sdk::signature::{Signature, Signer}; +use solana_sdk::signature::Signer; use thiserror::Error; -use crate::model::StreamMessage; +use crate::{ + config::Config, + model::{StreamMessage, TrxDetailRes}, +}; #[derive(Error, Debug)] pub enum TransactionError { @@ -15,14 +18,12 @@ pub enum TransactionError { } pub struct TransactionProcessor { - rpc_url: String, + config: Config, } impl TransactionProcessor { - pub fn new(rpc_url: &str) -> Self { - Self { - rpc_url: rpc_url.to_string(), - } + pub fn new(config: Config) -> Self { + Self { config } } pub async fn process_transaction( @@ -70,10 +71,20 @@ impl TransactionProcessor { .iter() .find(|x| x.starts_with("Program log: initialize2: InitializeInstruction2")); if contains_create.is_some() { - println!( - "Create pool transaction detected: {}", - serde_json::to_string(&data).unwrap() - ); + println!("======================"); + println!("🔎 New Liquidity Pool found."); + println!("🔃 Fetching transaction details..."); + let signature = signature.as_ref().map_or("", String::as_str); + let trx_details = self.fetch_trx_details(signature).await; + match trx_details { + Ok(trx_details) => { + println!("🔎 Transaction details fetched successfully."); + println!("🔎 Transaction details: {:?}", trx_details); + } + Err(e) => { + println!("🔎 Error fetching transaction details: {}", e); + } + } } } } @@ -82,25 +93,67 @@ impl TransactionProcessor { Ok(()) } - async fn get_token_metadata(&self, mint_address: &str) -> Result { + async fn fetch_trx_details(&self, signature: &str) -> Result { + let mut count_retry = 0; let client = reqwest::Client::new(); - let response = client - .post(&self.rpc_url) - .json(&serde_json::json!({ - "jsonrpc": "2.0", - "id": 1, - "method": "getAccountInfo", - "params": [mint_address, {"encoding": "jsonParsed"}] - })) - .send() - .await - .map_err(|e| TransactionError::RpcError(e.to_string()))?; - - let result: Value = response - .json() - .await - .map_err(|e| TransactionError::RpcError(e.to_string()))?; - - Ok(result) + let mut response: TrxDetailRes = vec![]; + while count_retry < 1 { + count_retry += 1; + let json = &serde_json::json!({"transactions": [signature]}); + println!("JSON: {}", json); + + let url = format!( + "{}/transactions/?api-key={}", + &self.config.helius_rpc_url, &self.config.helius_api_key + ); + + println!("URL: {}", url); + let res = client.post(url).json(json).send().await; + + response = match res { + Ok(res) => { + let status = res.status(); + let response_text = res.text().await; + println!("Status: {}", status); + match response_text { + Ok(text) => serde_json::from_str::(&text).unwrap(), + Err(e) => { + println!("Error fetching transaction details: {}", e); + continue; + } + } + } + Err(e) => { + println!("Error fetching transaction details: {}", e); + continue; + } + }; + } + + println!("Response: {:?}", response); + + Ok(response) } + + // async fn get_token_metadata(&self, mint_address: &str) -> Result { + // let client = reqwest::Client::new(); + // let response = client + // .post(&self.rpc_url) + // .json(&serde_json::json!({ + // "jsonrpc": "2.0", + // "id": 1, + // "method": "getAccountInfo", + // "params": [mint_address, {"encoding": "jsonParsed"}] + // })) + // .send() + // .await + // .map_err(|e| TransactionError::RpcError(e.to_string()))?; + + // let result: Value = response + // .json() + // .await + // .map_err(|e| TransactionError::RpcError(e.to_string()))?; + + // Ok(result) + // } } From 13cb00628c491740fe073ba8cf26381047732175 Mon Sep 17 00:00:00 2001 From: codewithmecoder Date: Sat, 8 Feb 2025 08:40:54 +0700 Subject: [PATCH 2/4] fetch trx details --- src/main.rs | 2 +- src/model.rs | 5 +- src/transaction_processor.rs | 47 +++++++--- src/websocket.rs | 160 +++++++++++++++++++++++++++-------- 4 files changed, 164 insertions(+), 50 deletions(-) diff --git a/src/main.rs b/src/main.rs index fd2aba0..f32af00 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,7 +43,7 @@ async fn main() -> Result<(), Box> { while let Some(message) = stream.next().await { // Process transaction let signer = Keypair::from_base58_string(&config.private_key); - let processor = transaction_processor::TransactionProcessor::new(config.clone()); + let processor = transaction_processor::TransactionProcessor::new(config.clone(), &websocket); // let notifier = notifier.clone(); if let Err(e) = processor.process_transaction(&message, &signer).await { diff --git a/src/model.rs b/src/model.rs index 64af180..25b7f49 100644 --- a/src/model.rs +++ b/src/model.rs @@ -83,8 +83,7 @@ pub struct TrxDetailRe { pub account_data: Vec, - pub transaction_error: TransactionError, - + pub transaction_error: Option, pub instructions: Vec, pub events: Events, @@ -136,7 +135,7 @@ pub struct TokenTransfer { pub to_user_account: String, - pub token_amount: i64, + pub token_amount: f64, pub mint: String, diff --git a/src/transaction_processor.rs b/src/transaction_processor.rs index 88f6b27..a9e76b9 100644 --- a/src/transaction_processor.rs +++ b/src/transaction_processor.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use serde_json::Value; use solana_sdk::signature::Signer; use thiserror::Error; @@ -5,6 +7,7 @@ use thiserror::Error; use crate::{ config::Config, model::{StreamMessage, TrxDetailRes}, + websocket::SolanaWebsocket, }; #[derive(Error, Debug)] @@ -17,13 +20,14 @@ pub enum TransactionError { RpcError(String), } -pub struct TransactionProcessor { +pub struct TransactionProcessor<'a> { config: Config, + ws: &'a SolanaWebsocket, } -impl TransactionProcessor { - pub fn new(config: Config) -> Self { - Self { config } +impl<'a> TransactionProcessor<'a> { + pub fn new(config: Config, ws: &'a SolanaWebsocket) -> Self { + Self { config, ws } } pub async fn process_transaction( @@ -71,15 +75,19 @@ impl TransactionProcessor { .iter() .find(|x| x.starts_with("Program log: initialize2: InitializeInstruction2")); if contains_create.is_some() { + self.ws.close(); + println!("======================"); println!("🔎 New Liquidity Pool found."); + println!("Puase WS for handle transaction"); + println!("🔃 Fetching transaction details..."); let signature = signature.as_ref().map_or("", String::as_str); let trx_details = self.fetch_trx_details(signature).await; match trx_details { Ok(trx_details) => { println!("🔎 Transaction details fetched successfully."); - println!("🔎 Transaction details: {:?}", trx_details); + println!("🔎 Transaction details: {}", trx_details[0].signature); } Err(e) => { println!("🔎 Error fetching transaction details: {}", e); @@ -97,7 +105,15 @@ impl TransactionProcessor { let mut count_retry = 0; let client = reqwest::Client::new(); let mut response: TrxDetailRes = vec![]; - while count_retry < 1 { + while count_retry < 3 { + if count_retry > 0 { + tokio::time::sleep(Duration::from_secs(10)).await; + println!("Delay fetching transaction details for 10 secs"); + } + println!( + "Retry fetching transaction details... Attempt {}", + count_retry + ); count_retry += 1; let json = &serde_json::json!({"transactions": [signature]}); println!("JSON: {}", json); @@ -108,15 +124,27 @@ impl TransactionProcessor { ); println!("URL: {}", url); - let res = client.post(url).json(json).send().await; + let res = client + .post(url) + .header("Content-Type", "application/json") + .json(json) + .send() + .await; response = match res { Ok(res) => { let status = res.status(); let response_text = res.text().await; println!("Status: {}", status); + match response_text { - Ok(text) => serde_json::from_str::(&text).unwrap(), + Ok(text) => { + let data_trx_details = serde_json::from_str::(&text).unwrap(); + if data_trx_details.len() == 0 { + continue; + } + data_trx_details + } Err(e) => { println!("Error fetching transaction details: {}", e); continue; @@ -129,9 +157,6 @@ impl TransactionProcessor { } }; } - - println!("Response: {:?}", response); - Ok(response) } diff --git a/src/websocket.rs b/src/websocket.rs index 4a770a1..0859853 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -1,6 +1,12 @@ +use std::sync::Arc; + use futures::{SinkExt, Stream, StreamExt}; use serde_json; use thiserror::Error; +use tokio::{ + sync::{watch, Mutex}, + time::{sleep, Duration}, +}; use tokio_tungstenite::tungstenite::{protocol::Message, Bytes}; #[derive(Error, Debug)] @@ -14,57 +20,141 @@ pub enum WebsocketError { pub struct SolanaWebsocket { ws_url: String, api_key: String, + stop_signal: Arc>, } impl SolanaWebsocket { pub fn new(ws_url: &str, api_key: &str) -> Self { + let (stop_signal, _) = watch::channel(false); Self { ws_url: ws_url.to_string(), api_key: api_key.to_string(), + stop_signal: Arc::new(stop_signal), } } + /// **Closes the WebSocket manually** + pub fn close(&self) { + let _ = self.stop_signal.send(true); + println!("WebSocket manually closed."); + } + + /// **Restarts the WebSocket connection** + pub fn restart(&self) { + let _ = self.stop_signal.send(false); // Reset stop signal + println!("🔄 Restarting WebSocket..."); + } + pub async fn listen_for_pool_creation( &self, wallet_address: &str, ) -> Result, WebsocketError> { + // let url_string = format!("{}/?api-key={}", self.ws_url, &self.api_key); + + // let (ws_stream, _) = tokio_tungstenite::connect_async(url_string) // Pass the String instead of Url + // .await + // .map_err(|e| WebsocketError::ConnectionError(e.to_string()))?; + // let (mut write, read) = ws_stream.split(); + + // let subscribe_message = serde_json::json!({ + // "jsonrpc": "2.0", + // "id": 1, + // "method": "logsSubscribe", + // "params": [ + // {"mentions": [wallet_address]}, + // {"commitment": "processed"} + // ] + // }); + + // // Convert `serde_json::Value` to Utf8Bytes + // let subscribe_message_str = serde_json::to_string(&subscribe_message).map_err(|e| { + // WebsocketError::SubscriptionError(format!("Error serializing message: {}", e)) + // })?; + + // let subscribe_message_bytes = subscribe_message_str.into_bytes(); // Convert to Utf8Bytes + + // write + // .send(Message::Binary(Bytes::from(subscribe_message_bytes))) + // .await + // .map_err(|e| WebsocketError::SubscriptionError(format!("Error listen for socket: {}", e)))?; + + // Ok(read.filter_map(|msg| async { + // match msg { + // Ok(Message::Text(text)) => { + // let str = text.to_string(); + // Some(str) + // } // `Message::Text` is already a `String` + // _ => None, + // } + // })) let url_string = format!("{}/?api-key={}", self.ws_url, &self.api_key); + let mut retry_attempts = 0; + let stop_receiver = self.stop_signal.subscribe(); - let (ws_stream, _) = tokio_tungstenite::connect_async(url_string) // Pass the String instead of Url - .await - .map_err(|e| WebsocketError::ConnectionError(e.to_string()))?; - let (mut write, read) = ws_stream.split(); - - let subscribe_message = serde_json::json!({ - "jsonrpc": "2.0", - "id": 1, - "method": "logsSubscribe", - "params": [ - {"mentions": [wallet_address]}, - {"commitment": "processed"} - ] - }); - - // Convert `serde_json::Value` to Utf8Bytes - let subscribe_message_str = serde_json::to_string(&subscribe_message).map_err(|e| { - WebsocketError::SubscriptionError(format!("Error serializing message: {}", e)) - })?; - - let subscribe_message_bytes = subscribe_message_str.into_bytes(); // Convert to Utf8Bytes - - write - .send(Message::Binary(Bytes::from(subscribe_message_bytes))) - .await - .map_err(|e| WebsocketError::SubscriptionError(format!("Error listen for socket: {}", e)))?; - - Ok(read.filter_map(|msg| async { - match msg { - Ok(Message::Text(text)) => { - let str = text.to_string(); - Some(str) - } // `Message::Text` is already a `String` - _ => None, + loop { + if *stop_receiver.borrow() { + println!("🛑 Stopping WebSocket connection..."); + return Err(WebsocketError::ConnectionError( + "WebSocket closed manually.".into(), + )); } - })) + + match tokio_tungstenite::connect_async(&url_string).await { + Ok((ws_stream, _)) => { + let (mut write, read) = ws_stream.split(); + + let subscribe_message = serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "logsSubscribe", + "params": [ + {"mentions": [wallet_address]}, + {"commitment": "processed"} + ] + }); + + let subscribe_message_str = serde_json::to_string(&subscribe_message).map_err(|e| { + WebsocketError::SubscriptionError(format!("Error serializing message: {}", e)) + })?; + + let subscribe_message_bytes = subscribe_message_str.into_bytes(); + + if let Err(e) = write + .send(Message::Binary(Bytes::from(subscribe_message_bytes))) + .await + { + return Err(WebsocketError::SubscriptionError(format!( + "Error sending subscription: {}", + e + ))); + } + println!("✅ WebSocket connected successfully!"); + + return Ok(read.filter_map(|msg| async { + match msg { + Ok(Message::Text(text)) => { + let str = text.to_string(); + Some(str) + } // `Message::Text` is already a `String` + Ok(Message::Close(_)) => { + println!("⚠️ WebSocket closed. Reconnecting..."); + None + } + Err(e) => { + println!("❌ WebSocket error: {}. Reconnecting...", e); + None + } + _ => None, + } + })); + } + Err(e) => { + println!("🔄 Failed to connect: {}. Retrying...", e); + retry_attempts += 1; + let wait_time = Duration::from_secs(2u64.pow(retry_attempts.min(5))); // Max 32s delay + sleep(wait_time).await; + } + } + } } } From 3e31d1bd9ffb6de6ff38e736f2451f389594417d Mon Sep 17 00:00:00 2001 From: codewithmecoder Date: Sat, 8 Feb 2025 12:05:35 +0700 Subject: [PATCH 3/4] fix fetch trx details --- src/config.rs | 6 +++- src/model.rs | 10 +++++-- src/transaction_processor.rs | 56 ++++++++++++++++++++++++++++-------- 3 files changed, 57 insertions(+), 15 deletions(-) diff --git a/src/config.rs b/src/config.rs index 248081f..71f1517 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,4 @@ -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct Config { pub database_url: String, pub telegram_token: String, @@ -10,6 +10,7 @@ pub struct Config { pub program_id: String, pub private_key: String, + pub liquidility_pool_wsol_pc_mint: String, } impl Config { pub fn init() -> Config { @@ -21,6 +22,8 @@ impl Config { let helius_api_key = std::env::var("HELIUS_API_KEY").expect("HELIUS_API_KEY must be set"); let program_id = std::env::var("PROGRAM_ID").expect("PROGRAM_ID must be set"); let private_key = std::env::var("PRIVATE_KEY").expect("PRIVATE_KEY must be set"); + let liquidility_pool_wsol_pc_mint = std::env::var("LIQUIDILITY_POOL_WSOL_PC_MINT") + .expect("LIQUIDILITY_POOL_WSOL_PC_MINT must be set"); Config { database_url, @@ -31,6 +34,7 @@ impl Config { private_key, program_id, telegram_chat_id: telegram_chat_id.parse::().unwrap(), + liquidility_pool_wsol_pc_mint, } } } diff --git a/src/model.rs b/src/model.rs index 25b7f49..18e7656 100644 --- a/src/model.rs +++ b/src/model.rs @@ -102,10 +102,10 @@ pub struct AccountDatum { #[derive(Serialize, Deserialize, Debug)] pub struct Events {} -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "camelCase")] pub struct Instruction { - pub accounts: Vec, + pub accounts: Option>, pub data: String, @@ -161,3 +161,9 @@ pub enum InstructionErrorElement { pub struct InstructionErrorClass { pub custom: i64, } + +#[derive(Serialize, Deserialize, Debug)] +pub struct DisplayDataItem { + pub token_mint: String, + pub sol_mint: String, +} diff --git a/src/transaction_processor.rs b/src/transaction_processor.rs index a9e76b9..ae4e6af 100644 --- a/src/transaction_processor.rs +++ b/src/transaction_processor.rs @@ -1,12 +1,11 @@ use std::time::Duration; -use serde_json::Value; use solana_sdk::signature::Signer; use thiserror::Error; use crate::{ config::Config, - model::{StreamMessage, TrxDetailRes}, + model::{DisplayDataItem, StreamMessage, TrxDetailRes}, websocket::SolanaWebsocket, }; @@ -86,8 +85,7 @@ impl<'a> TransactionProcessor<'a> { let trx_details = self.fetch_trx_details(signature).await; match trx_details { Ok(trx_details) => { - println!("🔎 Transaction details fetched successfully."); - println!("🔎 Transaction details: {}", trx_details[0].signature); + print!("🔎 Transaction details: {:?}", trx_details); } Err(e) => { println!("🔎 Error fetching transaction details: {}", e); @@ -101,10 +99,9 @@ impl<'a> TransactionProcessor<'a> { Ok(()) } - async fn fetch_trx_details(&self, signature: &str) -> Result { + async fn fetch_trx_details(&self, signature: &str) -> Result { let mut count_retry = 0; let client = reqwest::Client::new(); - let mut response: TrxDetailRes = vec![]; while count_retry < 3 { if count_retry > 0 { tokio::time::sleep(Duration::from_secs(10)).await; @@ -123,7 +120,6 @@ impl<'a> TransactionProcessor<'a> { &self.config.helius_rpc_url, &self.config.helius_api_key ); - println!("URL: {}", url); let res = client .post(url) .header("Content-Type", "application/json") @@ -131,7 +127,7 @@ impl<'a> TransactionProcessor<'a> { .send() .await; - response = match res { + let response = match res { Ok(res) => { let status = res.status(); let response_text = res.text().await; @@ -139,11 +135,44 @@ impl<'a> TransactionProcessor<'a> { match response_text { Ok(text) => { - let data_trx_details = serde_json::from_str::(&text).unwrap(); - if data_trx_details.len() == 0 { + let trx_details = serde_json::from_str::(&text).unwrap(); + if trx_details.len() == 0 { continue; } - data_trx_details + let mut instructions = trx_details[0].instructions.clone().into_iter(); + if instructions.len() == 0 {} + + let instruction = instructions.find(|i| i.program_id == self.config.program_id); + + if let Some(instr) = instruction { + if let Some(accs) = instr.accounts { + let acc_one = accs[8].to_string(); + let acc_two = accs[9].to_string(); + let sol_token_acc: String; + let new_token_acc: String; + if acc_one == self.config.liquidility_pool_wsol_pc_mint { + sol_token_acc = acc_one; + new_token_acc = acc_two; + } else { + sol_token_acc = acc_two; + new_token_acc = acc_one; + } + + let display_data: DisplayDataItem = DisplayDataItem { + sol_mint: sol_token_acc, + token_mint: new_token_acc, + }; + display_data + } else { + return Err(TransactionError::RpcError( + "Failed to fetch transaction details".to_string(), + )); + } + } else { + return Err(TransactionError::RpcError( + "Failed to fetch transaction details".to_string(), + )); + } } Err(e) => { println!("Error fetching transaction details: {}", e); @@ -156,8 +185,11 @@ impl<'a> TransactionProcessor<'a> { continue; } }; + return Ok(response); } - Ok(response) + Err(TransactionError::RpcError( + "Failed to fetch transaction details".to_string(), + )) } // async fn get_token_metadata(&self, mint_address: &str) -> Result { From 8a04d83fddf14ffb2be19177236afdbd11ee7b16 Mon Sep 17 00:00:00 2001 From: codewithmecoder Date: Sat, 8 Feb 2025 13:54:16 +0700 Subject: [PATCH 4/4] check rug token --- rug_check_res_sample.json | 35 +++++++++++++++++ src/config.rs | 19 +++++++++ src/main.rs | 9 ++--- src/model.rs | 27 ++++++++++++- src/rug_checker.rs | 76 ++++++++++++++++++++++++++++++++++++ src/transaction_processor.rs | 43 +++++++++++++++++++- 6 files changed, 200 insertions(+), 9 deletions(-) create mode 100644 rug_check_res_sample.json create mode 100644 src/rug_checker.rs diff --git a/rug_check_res_sample.json b/rug_check_res_sample.json new file mode 100644 index 0000000..393590c --- /dev/null +++ b/rug_check_res_sample.json @@ -0,0 +1,35 @@ +{ + "tokenProgram": "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA", + "tokenType": "", + "risks": [ + { + "name": "Large Amount of LP Unlocked", + "value": "100.00%", + "description": "A large amount of LP tokens are unlocked, allowing the owner to remove liquidity at any point.", + "score": 10999, + "level": "danger" + }, + { + "name": "High holder correlation", + "value": "(19)", + "description": "The top users hold similar supply amounts", + "score": 2900, + "level": "warn" + }, + { + "name": "Low amount of LP Providers", + "value": "", + "description": "Only a few users are providing liquidity", + "score": 400, + "level": "warn" + }, + { + "name": "Mutable metadata", + "value": "", + "description": "Token metadata can be changed by the owner", + "score": 100, + "level": "warn" + } + ], + "score": 14399 +} diff --git a/src/config.rs b/src/config.rs index 71f1517..a7f4526 100644 --- a/src/config.rs +++ b/src/config.rs @@ -11,7 +11,16 @@ pub struct Config { pub program_id: String, pub private_key: String, pub liquidility_pool_wsol_pc_mint: String, + pub rug_checker_url: String, + pub rug_check_config: RugCheckConfig, } + +#[derive(Debug, Clone, PartialEq)] +pub struct RugCheckConfig { + pub signal_holder_ownership: f64, + pub not_allowed_risk: Vec, +} + impl Config { pub fn init() -> Config { let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"); @@ -24,6 +33,7 @@ impl Config { let private_key = std::env::var("PRIVATE_KEY").expect("PRIVATE_KEY must be set"); let liquidility_pool_wsol_pc_mint = std::env::var("LIQUIDILITY_POOL_WSOL_PC_MINT") .expect("LIQUIDILITY_POOL_WSOL_PC_MINT must be set"); + let rug_checker_url = std::env::var("RUG_CHECKER_URL").expect("RUG_CHECKER_URL must be set"); Config { database_url, @@ -35,6 +45,15 @@ impl Config { program_id, telegram_chat_id: telegram_chat_id.parse::().unwrap(), liquidility_pool_wsol_pc_mint, + rug_checker_url, + rug_check_config: RugCheckConfig { + signal_holder_ownership: 0 as f64, + not_allowed_risk: vec![ + "Freeze Authority still enabled".to_string(), + "Large Amount of LP Unlocked".to_string(), + "Copycat token".to_string(), + ], + }, } } } diff --git a/src/main.rs b/src/main.rs index f32af00..2504da7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,12 +2,11 @@ mod config; mod database; mod model; mod price_monitor; +mod rug_checker; mod telegram; mod transaction_processor; mod websocket; -use std::clone; - use crate::{ database::Database, price_monitor::PriceMonitor, telegram::TelegramNotifier, websocket::SolanaWebsocket, @@ -43,14 +42,12 @@ async fn main() -> Result<(), Box> { while let Some(message) = stream.next().await { // Process transaction let signer = Keypair::from_base58_string(&config.private_key); - let processor = transaction_processor::TransactionProcessor::new(config.clone(), &websocket); + let processor = + transaction_processor::TransactionProcessor::new(config.clone(), &websocket, ¬ifier); // let notifier = notifier.clone(); if let Err(e) = processor.process_transaction(&message, &signer).await { eprintln!("Error processing transaction: {}", e); - notifier - .send_message(&format!("Error processing transaction: {}", e)) - .await?; } } diff --git a/src/model.rs b/src/model.rs index 18e7656..8d9ecdd 100644 --- a/src/model.rs +++ b/src/model.rs @@ -162,8 +162,33 @@ pub struct InstructionErrorClass { pub custom: i64, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone)] pub struct DisplayDataItem { pub token_mint: String, pub sol_mint: String, } + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RugCheckRes { + pub token_program: String, + + pub token_type: String, + + pub risks: Vec, + + pub score: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RugCheckRisk { + pub name: String, + + pub value: String, + + pub description: String, + + pub score: i64, + + pub level: String, +} diff --git a/src/rug_checker.rs b/src/rug_checker.rs new file mode 100644 index 0000000..87480b6 --- /dev/null +++ b/src/rug_checker.rs @@ -0,0 +1,76 @@ +use crate::{config::Config, model::RugCheckRes}; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum RugCheckerError { + #[error("Connection error: {0}")] + RugCheckError(String), +} + +pub struct RugChecker<'a> { + pub config: &'a Config, +} + +impl<'a> RugChecker<'a> { + pub fn new(config: &'a Config) -> Self { + Self { config } + } + + pub async fn isvalid_rug_check(&self, token_mint: &str) -> Result { + let client = reqwest::Client::new(); + let url = format!( + "{}/tokens/{}/report/summary", + self.config.rug_checker_url, token_mint + ); + let res = client + .get(url) + .header("Content-Type", "application/json") + .send() + .await; + + match res { + Ok(data) => { + let status = data.status(); + let response_txt = data.text().await; + println!("Status: {}", status); + + match response_txt { + Ok(txt) => { + println!("Response: {}", txt); + let res_data = serde_json::from_str::(&txt).unwrap(); + let mut is_valid = true; + res_data.risks.iter().for_each(|risk| { + println!("Rug: {:?}", risk); + if risk.name.to_lowercase() == "single holder ownership" { + let value = risk.value.replace("%", ""); + let numberic_value = value.parse::().unwrap(); + if numberic_value > self.config.rug_check_config.signal_holder_ownership { + is_valid = false; + } + } + }); + if !is_valid { + return Ok(false); + } + + let res_data = res_data.clone(); + let valid = !res_data.risks.iter().any(|r| { + self + .config + .rug_check_config + .not_allowed_risk + .contains(&r.name) + }); + return Ok(valid); + } + Err(e) => { + return Err(RugCheckerError::RugCheckError(e.to_string())); + } + } + } + Err(e) => { + return Err(RugCheckerError::RugCheckError(e.to_string())); + } + }; + } +} diff --git a/src/transaction_processor.rs b/src/transaction_processor.rs index ae4e6af..045d077 100644 --- a/src/transaction_processor.rs +++ b/src/transaction_processor.rs @@ -6,6 +6,8 @@ use thiserror::Error; use crate::{ config::Config, model::{DisplayDataItem, StreamMessage, TrxDetailRes}, + rug_checker::RugChecker, + telegram::TelegramNotifier, websocket::SolanaWebsocket, }; @@ -22,11 +24,16 @@ pub enum TransactionError { pub struct TransactionProcessor<'a> { config: Config, ws: &'a SolanaWebsocket, + notifier: &'a TelegramNotifier, } impl<'a> TransactionProcessor<'a> { - pub fn new(config: Config, ws: &'a SolanaWebsocket) -> Self { - Self { config, ws } + pub fn new(config: Config, ws: &'a SolanaWebsocket, notifier: &'a TelegramNotifier) -> Self { + Self { + config, + ws, + notifier, + } } pub async fn process_transaction( @@ -86,6 +93,38 @@ impl<'a> TransactionProcessor<'a> { match trx_details { Ok(trx_details) => { print!("🔎 Transaction details: {:?}", trx_details); + let rug_checker = RugChecker::new(&self.config); + let is_valid_rug_check = + rug_checker.isvalid_rug_check(&trx_details.token_mint).await; + + match is_valid_rug_check { + Ok(is_valid) => { + // let trx_details = trx_details.clone(); + if is_valid { + println!("🚀 Liquidity Pool is valid."); + _ = self + .notifier + .send_message(&format!( + "🚀 Liquidity Pool is valid. \nTokenMint: {}\n ViewToken: https://gmgn.ai/sol/token/{}", + &trx_details.token_mint, &trx_details.token_mint, + )) + .await; + } else { + println!("🚨 Liquidity Pool is not valid rug check."); + _ = self + .notifier + .send_message(&format!( + "🚨 Liquidity Pool is not valid rug check. \nTokenMint: {}\n ViewToken: https://gmgn.ai/sol/token/{}", + &trx_details.token_mint, + &trx_details.token_mint, + )) + .await; + } + } + Err(e) => { + println!("🔎 Error checking rug pull: {}", e); + } + } } Err(e) => { println!("🔎 Error fetching transaction details: {}", e);