diff --git a/rug_check_res_sample.json b/rug_check_res_sample.json new file mode 100644 index 0000000..393590c --- /dev/null +++ b/rug_check_res_sample.json @@ -0,0 +1,35 @@ +{ + "tokenProgram": "TokenkegQfeZyiNwAJbNbGKPFXCWuBvf9Ss623VQ5DA", + "tokenType": "", + "risks": [ + { + "name": "Large Amount of LP Unlocked", + "value": "100.00%", + "description": "A large amount of LP tokens are unlocked, allowing the owner to remove liquidity at any point.", + "score": 10999, + "level": "danger" + }, + { + "name": "High holder correlation", + "value": "(19)", + "description": "The top users hold similar supply amounts", + "score": 2900, + "level": "warn" + }, + { + "name": "Low amount of LP Providers", + "value": "", + "description": "Only a few users are providing liquidity", + "score": 400, + "level": "warn" + }, + { + "name": "Mutable metadata", + "value": "", + "description": "Token metadata can be changed by the owner", + "score": 100, + "level": "warn" + } + ], + "score": 14399 +} diff --git a/src/config.rs b/src/config.rs index 248081f..a7f4526 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,4 +1,4 @@ -#[derive(Debug, Clone)] +#[derive(Debug, Clone, PartialEq)] pub struct Config { pub database_url: String, pub telegram_token: String, @@ -10,7 +10,17 @@ pub struct Config { pub program_id: String, pub private_key: String, + pub liquidility_pool_wsol_pc_mint: String, + pub rug_checker_url: String, + pub rug_check_config: RugCheckConfig, } + +#[derive(Debug, Clone, PartialEq)] +pub struct RugCheckConfig { + pub signal_holder_ownership: f64, + pub not_allowed_risk: Vec, +} + impl Config { pub fn init() -> Config { let database_url = std::env::var("DATABASE_URL").expect("DATABASE_URL must be set"); @@ -21,6 +31,9 @@ impl Config { let helius_api_key = std::env::var("HELIUS_API_KEY").expect("HELIUS_API_KEY must be set"); let program_id = std::env::var("PROGRAM_ID").expect("PROGRAM_ID must be set"); let private_key = std::env::var("PRIVATE_KEY").expect("PRIVATE_KEY must be set"); + let liquidility_pool_wsol_pc_mint = std::env::var("LIQUIDILITY_POOL_WSOL_PC_MINT") + .expect("LIQUIDILITY_POOL_WSOL_PC_MINT must be set"); + let rug_checker_url = std::env::var("RUG_CHECKER_URL").expect("RUG_CHECKER_URL must be set"); Config { database_url, @@ -31,6 +44,16 @@ impl Config { private_key, program_id, telegram_chat_id: telegram_chat_id.parse::().unwrap(), + liquidility_pool_wsol_pc_mint, + rug_checker_url, + rug_check_config: RugCheckConfig { + signal_holder_ownership: 0 as f64, + not_allowed_risk: vec![ + "Freeze Authority still enabled".to_string(), + "Large Amount of LP Unlocked".to_string(), + "Copycat token".to_string(), + ], + }, } } } diff --git a/src/main.rs b/src/main.rs index 9fe1ff6..2504da7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ mod config; mod database; mod model; mod price_monitor; +mod rug_checker; mod telegram; mod transaction_processor; mod websocket; @@ -41,14 +42,12 @@ async fn main() -> Result<(), Box> { while let Some(message) = stream.next().await { // Process transaction let signer = Keypair::from_base58_string(&config.private_key); - let processor = transaction_processor::TransactionProcessor::new(&config.helius_rpc_url); + let processor = + transaction_processor::TransactionProcessor::new(config.clone(), &websocket, ¬ifier); // let notifier = notifier.clone(); if let Err(e) = processor.process_transaction(&message, &signer).await { eprintln!("Error processing transaction: {}", e); - notifier - .send_message(&format!("Error processing transaction: {}", e)) - .await?; } } diff --git a/src/model.rs b/src/model.rs index cb75bf0..8d9ecdd 100644 --- a/src/model.rs +++ b/src/model.rs @@ -54,3 +54,141 @@ pub struct Value { pub signature: Option, pub logs: Option>, } + +pub type TrxDetailRes = Vec; + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct TrxDetailRe { + pub description: String, + + #[serde(rename = "type")] + pub trx_detail_re_type: String, + + pub source: String, + + pub fee: i64, + + pub fee_payer: String, + + pub signature: String, + + pub slot: i64, + + pub timestamp: i64, + + pub token_transfers: Vec, + + pub native_transfers: Vec, + + pub account_data: Vec, + + pub transaction_error: Option, + pub instructions: Vec, + + pub events: Events, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct AccountDatum { + pub account: String, + + pub native_balance_change: i64, + + pub token_balance_changes: Vec>, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Events {} + +#[derive(Serialize, Deserialize, Debug, Clone)] +#[serde(rename_all = "camelCase")] +pub struct Instruction { + pub accounts: Option>, + + pub data: String, + + pub program_id: String, + + pub inner_instructions: Option>, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct NativeTransfer { + pub from_user_account: String, + + pub to_user_account: String, + + pub amount: i64, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "camelCase")] +pub struct TokenTransfer { + pub from_token_account: String, + + pub to_token_account: String, + + pub from_user_account: String, + + pub to_user_account: String, + + pub token_amount: f64, + + pub mint: String, + + pub token_standard: String, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "PascalCase")] +pub struct TransactionError { + pub instruction_error: Vec, +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(untagged)] +pub enum InstructionErrorElement { + InstructionErrorClass(InstructionErrorClass), + + Integer(i64), +} + +#[derive(Serialize, Deserialize, Debug)] +#[serde(rename_all = "PascalCase")] +pub struct InstructionErrorClass { + pub custom: i64, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct DisplayDataItem { + pub token_mint: String, + pub sol_mint: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RugCheckRes { + pub token_program: String, + + pub token_type: String, + + pub risks: Vec, + + pub score: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RugCheckRisk { + pub name: String, + + pub value: String, + + pub description: String, + + pub score: i64, + + pub level: String, +} diff --git a/src/rug_checker.rs b/src/rug_checker.rs new file mode 100644 index 0000000..87480b6 --- /dev/null +++ b/src/rug_checker.rs @@ -0,0 +1,76 @@ +use crate::{config::Config, model::RugCheckRes}; +use thiserror::Error; + +#[derive(Error, Debug)] +pub enum RugCheckerError { + #[error("Connection error: {0}")] + RugCheckError(String), +} + +pub struct RugChecker<'a> { + pub config: &'a Config, +} + +impl<'a> RugChecker<'a> { + pub fn new(config: &'a Config) -> Self { + Self { config } + } + + pub async fn isvalid_rug_check(&self, token_mint: &str) -> Result { + let client = reqwest::Client::new(); + let url = format!( + "{}/tokens/{}/report/summary", + self.config.rug_checker_url, token_mint + ); + let res = client + .get(url) + .header("Content-Type", "application/json") + .send() + .await; + + match res { + Ok(data) => { + let status = data.status(); + let response_txt = data.text().await; + println!("Status: {}", status); + + match response_txt { + Ok(txt) => { + println!("Response: {}", txt); + let res_data = serde_json::from_str::(&txt).unwrap(); + let mut is_valid = true; + res_data.risks.iter().for_each(|risk| { + println!("Rug: {:?}", risk); + if risk.name.to_lowercase() == "single holder ownership" { + let value = risk.value.replace("%", ""); + let numberic_value = value.parse::().unwrap(); + if numberic_value > self.config.rug_check_config.signal_holder_ownership { + is_valid = false; + } + } + }); + if !is_valid { + return Ok(false); + } + + let res_data = res_data.clone(); + let valid = !res_data.risks.iter().any(|r| { + self + .config + .rug_check_config + .not_allowed_risk + .contains(&r.name) + }); + return Ok(valid); + } + Err(e) => { + return Err(RugCheckerError::RugCheckError(e.to_string())); + } + } + } + Err(e) => { + return Err(RugCheckerError::RugCheckError(e.to_string())); + } + }; + } +} diff --git a/src/transaction_processor.rs b/src/transaction_processor.rs index 16e5e64..045d077 100644 --- a/src/transaction_processor.rs +++ b/src/transaction_processor.rs @@ -1,8 +1,15 @@ -use serde_json::Value; -use solana_sdk::signature::{Signature, Signer}; +use std::time::Duration; + +use solana_sdk::signature::Signer; use thiserror::Error; -use crate::model::StreamMessage; +use crate::{ + config::Config, + model::{DisplayDataItem, StreamMessage, TrxDetailRes}, + rug_checker::RugChecker, + telegram::TelegramNotifier, + websocket::SolanaWebsocket, +}; #[derive(Error, Debug)] pub enum TransactionError { @@ -14,14 +21,18 @@ pub enum TransactionError { RpcError(String), } -pub struct TransactionProcessor { - rpc_url: String, +pub struct TransactionProcessor<'a> { + config: Config, + ws: &'a SolanaWebsocket, + notifier: &'a TelegramNotifier, } -impl TransactionProcessor { - pub fn new(rpc_url: &str) -> Self { +impl<'a> TransactionProcessor<'a> { + pub fn new(config: Config, ws: &'a SolanaWebsocket, notifier: &'a TelegramNotifier) -> Self { Self { - rpc_url: rpc_url.to_string(), + config, + ws, + notifier, } } @@ -70,10 +81,55 @@ impl TransactionProcessor { .iter() .find(|x| x.starts_with("Program log: initialize2: InitializeInstruction2")); if contains_create.is_some() { - println!( - "Create pool transaction detected: {}", - serde_json::to_string(&data).unwrap() - ); + self.ws.close(); + + println!("======================"); + println!("🔎 New Liquidity Pool found."); + println!("Puase WS for handle transaction"); + + println!("🔃 Fetching transaction details..."); + let signature = signature.as_ref().map_or("", String::as_str); + let trx_details = self.fetch_trx_details(signature).await; + match trx_details { + Ok(trx_details) => { + print!("🔎 Transaction details: {:?}", trx_details); + let rug_checker = RugChecker::new(&self.config); + let is_valid_rug_check = + rug_checker.isvalid_rug_check(&trx_details.token_mint).await; + + match is_valid_rug_check { + Ok(is_valid) => { + // let trx_details = trx_details.clone(); + if is_valid { + println!("🚀 Liquidity Pool is valid."); + _ = self + .notifier + .send_message(&format!( + "🚀 Liquidity Pool is valid. \nTokenMint: {}\n ViewToken: https://gmgn.ai/sol/token/{}", + &trx_details.token_mint, &trx_details.token_mint, + )) + .await; + } else { + println!("🚨 Liquidity Pool is not valid rug check."); + _ = self + .notifier + .send_message(&format!( + "🚨 Liquidity Pool is not valid rug check. \nTokenMint: {}\n ViewToken: https://gmgn.ai/sol/token/{}", + &trx_details.token_mint, + &trx_details.token_mint, + )) + .await; + } + } + Err(e) => { + println!("🔎 Error checking rug pull: {}", e); + } + } + } + Err(e) => { + println!("🔎 Error fetching transaction details: {}", e); + } + } } } } @@ -82,25 +138,118 @@ impl TransactionProcessor { Ok(()) } - async fn get_token_metadata(&self, mint_address: &str) -> Result { + async fn fetch_trx_details(&self, signature: &str) -> Result { + let mut count_retry = 0; let client = reqwest::Client::new(); - let response = client - .post(&self.rpc_url) - .json(&serde_json::json!({ - "jsonrpc": "2.0", - "id": 1, - "method": "getAccountInfo", - "params": [mint_address, {"encoding": "jsonParsed"}] - })) - .send() - .await - .map_err(|e| TransactionError::RpcError(e.to_string()))?; - - let result: Value = response - .json() - .await - .map_err(|e| TransactionError::RpcError(e.to_string()))?; - - Ok(result) + while count_retry < 3 { + if count_retry > 0 { + tokio::time::sleep(Duration::from_secs(10)).await; + println!("Delay fetching transaction details for 10 secs"); + } + println!( + "Retry fetching transaction details... Attempt {}", + count_retry + ); + count_retry += 1; + let json = &serde_json::json!({"transactions": [signature]}); + println!("JSON: {}", json); + + let url = format!( + "{}/transactions/?api-key={}", + &self.config.helius_rpc_url, &self.config.helius_api_key + ); + + let res = client + .post(url) + .header("Content-Type", "application/json") + .json(json) + .send() + .await; + + let response = match res { + Ok(res) => { + let status = res.status(); + let response_text = res.text().await; + println!("Status: {}", status); + + match response_text { + Ok(text) => { + let trx_details = serde_json::from_str::(&text).unwrap(); + if trx_details.len() == 0 { + continue; + } + let mut instructions = trx_details[0].instructions.clone().into_iter(); + if instructions.len() == 0 {} + + let instruction = instructions.find(|i| i.program_id == self.config.program_id); + + if let Some(instr) = instruction { + if let Some(accs) = instr.accounts { + let acc_one = accs[8].to_string(); + let acc_two = accs[9].to_string(); + let sol_token_acc: String; + let new_token_acc: String; + if acc_one == self.config.liquidility_pool_wsol_pc_mint { + sol_token_acc = acc_one; + new_token_acc = acc_two; + } else { + sol_token_acc = acc_two; + new_token_acc = acc_one; + } + + let display_data: DisplayDataItem = DisplayDataItem { + sol_mint: sol_token_acc, + token_mint: new_token_acc, + }; + display_data + } else { + return Err(TransactionError::RpcError( + "Failed to fetch transaction details".to_string(), + )); + } + } else { + return Err(TransactionError::RpcError( + "Failed to fetch transaction details".to_string(), + )); + } + } + Err(e) => { + println!("Error fetching transaction details: {}", e); + continue; + } + } + } + Err(e) => { + println!("Error fetching transaction details: {}", e); + continue; + } + }; + return Ok(response); + } + Err(TransactionError::RpcError( + "Failed to fetch transaction details".to_string(), + )) } + + // async fn get_token_metadata(&self, mint_address: &str) -> Result { + // let client = reqwest::Client::new(); + // let response = client + // .post(&self.rpc_url) + // .json(&serde_json::json!({ + // "jsonrpc": "2.0", + // "id": 1, + // "method": "getAccountInfo", + // "params": [mint_address, {"encoding": "jsonParsed"}] + // })) + // .send() + // .await + // .map_err(|e| TransactionError::RpcError(e.to_string()))?; + + // let result: Value = response + // .json() + // .await + // .map_err(|e| TransactionError::RpcError(e.to_string()))?; + + // Ok(result) + // } } diff --git a/src/websocket.rs b/src/websocket.rs index 4a770a1..0859853 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -1,6 +1,12 @@ +use std::sync::Arc; + use futures::{SinkExt, Stream, StreamExt}; use serde_json; use thiserror::Error; +use tokio::{ + sync::{watch, Mutex}, + time::{sleep, Duration}, +}; use tokio_tungstenite::tungstenite::{protocol::Message, Bytes}; #[derive(Error, Debug)] @@ -14,57 +20,141 @@ pub enum WebsocketError { pub struct SolanaWebsocket { ws_url: String, api_key: String, + stop_signal: Arc>, } impl SolanaWebsocket { pub fn new(ws_url: &str, api_key: &str) -> Self { + let (stop_signal, _) = watch::channel(false); Self { ws_url: ws_url.to_string(), api_key: api_key.to_string(), + stop_signal: Arc::new(stop_signal), } } + /// **Closes the WebSocket manually** + pub fn close(&self) { + let _ = self.stop_signal.send(true); + println!("WebSocket manually closed."); + } + + /// **Restarts the WebSocket connection** + pub fn restart(&self) { + let _ = self.stop_signal.send(false); // Reset stop signal + println!("🔄 Restarting WebSocket..."); + } + pub async fn listen_for_pool_creation( &self, wallet_address: &str, ) -> Result, WebsocketError> { + // let url_string = format!("{}/?api-key={}", self.ws_url, &self.api_key); + + // let (ws_stream, _) = tokio_tungstenite::connect_async(url_string) // Pass the String instead of Url + // .await + // .map_err(|e| WebsocketError::ConnectionError(e.to_string()))?; + // let (mut write, read) = ws_stream.split(); + + // let subscribe_message = serde_json::json!({ + // "jsonrpc": "2.0", + // "id": 1, + // "method": "logsSubscribe", + // "params": [ + // {"mentions": [wallet_address]}, + // {"commitment": "processed"} + // ] + // }); + + // // Convert `serde_json::Value` to Utf8Bytes + // let subscribe_message_str = serde_json::to_string(&subscribe_message).map_err(|e| { + // WebsocketError::SubscriptionError(format!("Error serializing message: {}", e)) + // })?; + + // let subscribe_message_bytes = subscribe_message_str.into_bytes(); // Convert to Utf8Bytes + + // write + // .send(Message::Binary(Bytes::from(subscribe_message_bytes))) + // .await + // .map_err(|e| WebsocketError::SubscriptionError(format!("Error listen for socket: {}", e)))?; + + // Ok(read.filter_map(|msg| async { + // match msg { + // Ok(Message::Text(text)) => { + // let str = text.to_string(); + // Some(str) + // } // `Message::Text` is already a `String` + // _ => None, + // } + // })) let url_string = format!("{}/?api-key={}", self.ws_url, &self.api_key); + let mut retry_attempts = 0; + let stop_receiver = self.stop_signal.subscribe(); - let (ws_stream, _) = tokio_tungstenite::connect_async(url_string) // Pass the String instead of Url - .await - .map_err(|e| WebsocketError::ConnectionError(e.to_string()))?; - let (mut write, read) = ws_stream.split(); - - let subscribe_message = serde_json::json!({ - "jsonrpc": "2.0", - "id": 1, - "method": "logsSubscribe", - "params": [ - {"mentions": [wallet_address]}, - {"commitment": "processed"} - ] - }); - - // Convert `serde_json::Value` to Utf8Bytes - let subscribe_message_str = serde_json::to_string(&subscribe_message).map_err(|e| { - WebsocketError::SubscriptionError(format!("Error serializing message: {}", e)) - })?; - - let subscribe_message_bytes = subscribe_message_str.into_bytes(); // Convert to Utf8Bytes - - write - .send(Message::Binary(Bytes::from(subscribe_message_bytes))) - .await - .map_err(|e| WebsocketError::SubscriptionError(format!("Error listen for socket: {}", e)))?; - - Ok(read.filter_map(|msg| async { - match msg { - Ok(Message::Text(text)) => { - let str = text.to_string(); - Some(str) - } // `Message::Text` is already a `String` - _ => None, + loop { + if *stop_receiver.borrow() { + println!("🛑 Stopping WebSocket connection..."); + return Err(WebsocketError::ConnectionError( + "WebSocket closed manually.".into(), + )); } - })) + + match tokio_tungstenite::connect_async(&url_string).await { + Ok((ws_stream, _)) => { + let (mut write, read) = ws_stream.split(); + + let subscribe_message = serde_json::json!({ + "jsonrpc": "2.0", + "id": 1, + "method": "logsSubscribe", + "params": [ + {"mentions": [wallet_address]}, + {"commitment": "processed"} + ] + }); + + let subscribe_message_str = serde_json::to_string(&subscribe_message).map_err(|e| { + WebsocketError::SubscriptionError(format!("Error serializing message: {}", e)) + })?; + + let subscribe_message_bytes = subscribe_message_str.into_bytes(); + + if let Err(e) = write + .send(Message::Binary(Bytes::from(subscribe_message_bytes))) + .await + { + return Err(WebsocketError::SubscriptionError(format!( + "Error sending subscription: {}", + e + ))); + } + println!("✅ WebSocket connected successfully!"); + + return Ok(read.filter_map(|msg| async { + match msg { + Ok(Message::Text(text)) => { + let str = text.to_string(); + Some(str) + } // `Message::Text` is already a `String` + Ok(Message::Close(_)) => { + println!("⚠️ WebSocket closed. Reconnecting..."); + None + } + Err(e) => { + println!("❌ WebSocket error: {}. Reconnecting...", e); + None + } + _ => None, + } + })); + } + Err(e) => { + println!("🔄 Failed to connect: {}. Retrying...", e); + retry_attempts += 1; + let wait_time = Duration::from_secs(2u64.pow(retry_attempts.min(5))); // Max 32s delay + sleep(wait_time).await; + } + } + } } }