From af21263421f14cdb2d233ccbbcb67b2574c80f20 Mon Sep 17 00:00:00 2001 From: iota9star Date: Thu, 7 Dec 2023 19:09:18 +0800 Subject: [PATCH] Initial commit --- .env | 5 + .gitignore | 1 + Cargo.toml | 36 ++++++ README.md | 14 +++ build.sh | 1 + build_linux.sh | 2 + src/main.rs | 305 +++++++++++++++++++++++++++++++++++++++++++++++++ test.http | 21 ++++ 8 files changed, 385 insertions(+) create mode 100644 .env create mode 100644 .gitignore create mode 100644 Cargo.toml create mode 100644 README.md create mode 100644 build.sh create mode 100644 build_linux.sh create mode 100644 src/main.rs create mode 100644 test.http diff --git a/.env b/.env new file mode 100644 index 0000000..b404b51 --- /dev/null +++ b/.env @@ -0,0 +1,5 @@ +APP_API=0.0.0.0:12321 +ELECTRUMX_WSS=wss://electrumx.atomicals.xyz:50012 +IP_LIMIT_PER_SECOND=1 +IP_LIMIT_BURST_SIZE=10 +RUST_LOG=info \ No newline at end of file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..6aaf56e --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "fastelex-proxy" +version = "0.1.0" +edition = "2021" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +axum = { version = "^0", features = ["http2"] } +futures = "^0" +serde = { version = "^1", features = ["derive"] } +serde_json = "^1" +tokio = { version = "^1", features = ["full"] } +tokio-stream = "^0" +tokio-tungstenite = { version = "^0", features = ["native-tls"] } +tungstenite = "^0" +url = "^2" +time = { version = "^0", features = [] } +tower = { version = "^0", features = ["full"] } +tower-http = { version = "^0", features = ["cors", "trace", "catch-panic"] } +once_cell = "^1" +tracing = "^0" +tracing-subscriber = "^0" +openssl = { version = "^0", features = ["vendored"] } +anyhow = "^1" +tower_governor = "^0.2" +bytes = "^1" +http-body-util = "^0" +dotenv = "0" + +[profile.release] +strip = true +opt-level = "z" # Optimize for size. +lto = true +codegen-units = 1 +panic = "abort" diff --git a/README.md b/README.md new file mode 100644 index 0000000..ae2c793 --- /dev/null +++ b/README.md @@ -0,0 +1,14 @@ +## Build + +1. Create .cargo/config and add the instructions below: + +```toml +[target.x86_64-unknown-linux-gnu] +linker = "x86_64-unknown-linux-gnu-gcc" +``` + +2. Build the project: + +```shell +sh build_linux.sh +``` \ No newline at end of file diff --git a/build.sh b/build.sh new file mode 100644 index 0000000..7f1b114 --- /dev/null +++ b/build.sh @@ -0,0 +1 @@ +cargo build --release \ No newline at end of file diff --git a/build_linux.sh b/build_linux.sh new file mode 100644 index 0000000..516815c --- /dev/null +++ b/build_linux.sh @@ -0,0 +1,2 @@ +rustup target add x86_64-unknown-linux-gnu +cargo build --release --target x86_64-unknown-linux-gnu \ No newline at end of file diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..a477d36 --- /dev/null +++ b/src/main.rs @@ -0,0 +1,305 @@ +use std::any::Any; +use std::collections::HashMap; +use std::env; +use std::net::SocketAddr; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::Duration; + +use axum::body::Body; +use axum::error_handling::HandleErrorLayer; +use axum::extract::{Path, Query}; +use axum::extract::Extension; +use axum::extract::Json; +use axum::http; +use axum::http::header; +use axum::http::StatusCode; +use axum::response::IntoResponse; +use axum::response::Response; +use axum::Router; +use axum::routing::get; +use axum::ServiceExt; +use bytes::Bytes; +use dotenv::dotenv; +use futures::{SinkExt, StreamExt}; +use http_body_util::Full; +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; +use serde_json::{json, Number, Value}; +use tokio::sync::{mpsc, Mutex, RwLock}; +use tokio::sync::oneshot; +use tokio_stream::wrappers::UnboundedReceiverStream; +use tokio_tungstenite::connect_async; +use tokio_tungstenite::tungstenite::Message; +use tower::{BoxError, ServiceBuilder}; +use tower_governor::errors::display_error; +use tower_governor::governor::GovernorConfigBuilder; +use tower_governor::GovernorLayer; +use tower_http::catch_panic::CatchPanicLayer; +use tower_http::cors::CorsLayer; +use tower_http::trace::TraceLayer; +use tracing::{error, info}; + +#[derive(Serialize, Deserialize)] +struct JsonRpcRequest { + method: String, + params: Vec, + id: u64, +} + +#[derive(Serialize, Deserialize, Debug)] +struct JsonRpcResponse { + result: Option, + error: Option, + id: u64, +} + +#[derive(Serialize, Deserialize, Debug)] +struct R { + success: bool, + #[serde(skip_serializing_if = "Option::is_none")] + response: Option, + #[serde(skip_serializing_if = "Option::is_none")] + code: Option, + #[serde(skip_serializing_if = "Option::is_none")] + message: Option, +} + +impl R { + fn ok(payload: Value) -> Self { + Self { + success: true, + response: Some(payload), + code: None, + message: None, + } + } + fn error(code: i32, message: String) -> Self { + Self { + success: false, + response: None, + code: Some(Value::Number(Number::from(code))), + message: Some(Value::String(message)), + } + } +} + +static ID_COUNTER: Lazy = Lazy::new(|| AtomicU64::new(0)); + +fn get_next_id() -> u64 { + ID_COUNTER.fetch_add(1, Ordering::SeqCst) +} + +type Callbacks = Arc>>>; + +struct AppError(anyhow::Error); + +impl IntoResponse for AppError { + fn into_response(self) -> Response { + let value = R { + success: false, + code: None, + message: Some(Value::String(self.0.to_string())), + response: None, + }; + Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::from(serde_json::to_string(&value).unwrap())) + .unwrap() + } +} + +impl IntoResponse for R { + fn into_response(self) -> Response { + Json(self).into_response() + } +} + +async fn handle_get( + Extension(callbacks): Extension, + Extension(ws_tx): Extension>, + Path(method): Path, + Query(query): Query, +) -> Result { + let x = query.get("params").unwrap().as_str().unwrap(); + let params = serde_json::from_str(x).unwrap(); + let r = handle_request(method, params, callbacks, ws_tx).await; + Ok(r) +} + +async fn handle_post( + Extension(callbacks): Extension, + Extension(ws_tx): Extension>, + Path(method): Path, + Json(body): Json, +) -> Result { + let x = body.get("params").unwrap().as_array().unwrap(); + let r = handle_request(method, x.clone(), callbacks, ws_tx).await; + Ok(r) +} + +async fn handle_request( + method: String, + params: Vec, + callbacks: Callbacks, + ws_tx: mpsc::UnboundedSender, +) -> R { + let id = get_next_id(); + info!("=> id: {}, method: {}, params: {:?}", &id, &method, ¶ms); + + let (response_tx, response_rx) = oneshot::channel(); + { + callbacks.write().await.insert(id, response_tx); + } + let request = JsonRpcRequest { + id, + method, + params, + }; + let request_text = serde_json::to_string(&request).unwrap(); + ws_tx.send(Message::Text(request_text)).unwrap(); + match tokio::time::timeout(Duration::from_secs(10), response_rx).await { + Ok(Ok(rep)) => { + if let Some(result) = rep.result { + R::ok(result) + } else if let Some(err) = rep.error { + let err = err.as_object().unwrap(); + R { + success: false, + code: err.get("code").cloned(), + message: err.get("message").cloned(), + response: None, + } + } else { + R::error(-1, "No response".into()) + } + } + Ok(Err(_)) | Err(_) => { + { + callbacks.write().await.remove(&id); + } + R::error(-1, "Timeout".into()) + } + } +} + +async fn handle_proxy() -> impl IntoResponse { + Json(json!( + { + "success": true, + "info": { + "note": "Atomicals ElectrumX Digital Object Proxy Online", + "usageInfo": { + "note": "The service offers both POST and GET requests for proxying requests to ElectrumX. To handle larger broadcast transaction payloads use the POST method instead of GET.", + "POST": "POST /proxy/:method with string encoded array in the field \"params\" in the request body. ", + "GET": "GET /proxy/:method?params=[\"value1\"] with string encoded array in the query argument \"params\" in the URL." + }, + "healthCheck": "GET /proxy/health", + "github": "https://github.com/atomicals/electrumx-proxy", + "license": "MIT" + } + } + )) +} + + +fn handle_panic(err: Box) -> http::Response> { + let details = if let Some(s) = err.downcast_ref::() { + s.clone() + } else if let Some(s) = err.downcast_ref::<&str>() { + s.to_string() + } else { + "Unknown panic message".to_string() + }; + + let body = R::error(-1, details); + let body = serde_json::to_string(&body).unwrap(); + + Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .header(header::CONTENT_TYPE, "application/json") + .body(Full::from(body)) + .unwrap() +} + + +#[tokio::main] +async fn main() { + dotenv().ok(); + tracing_subscriber::fmt::init(); + let (ws_tx, ws_rx) = mpsc::unbounded_channel::(); + let callbacks: Arc>>> = + Arc::new(RwLock::new(HashMap::new())); + let ws_rx_stream = Arc::new(Mutex::new(UnboundedReceiverStream::new(ws_rx))); + let governor_conf = Box::new( + GovernorConfigBuilder::default() + .per_second(env::var("IP_LIMIT_PER_SECOND").unwrap_or("1".to_string()).parse().unwrap()) + .burst_size(env::var("IP_LIMIT_BURST_SIZE").unwrap_or("10".to_string()).parse().unwrap()) + .finish() + .unwrap(), + ); + let app = Router::new() + .route("/", get(|| async { "Hello, Atomicals!" })) + .route("/proxy", get(handle_proxy).post(handle_proxy)) + .route("/proxy/:method", get(handle_get).post(handle_post)) + .route_layer( + ServiceBuilder::new() + .layer(HandleErrorLayer::new(|e: BoxError| async move { + display_error(e) + })) + .layer(CatchPanicLayer::custom(handle_panic)) + .layer(TraceLayer::new_for_http()) + .layer(GovernorLayer { + config: Box::leak(governor_conf), + }), + ) + .layer(CorsLayer::permissive()) + .layer(Extension(callbacks.clone())) + .layer(Extension(ws_tx.clone())) + ; + tokio::spawn(async move { + loop { + let wss = env::var("ELECTRUMX_WSS").unwrap_or("wss://electrumx.atomicals.xyz:50012".to_string()); + info!("Try to connect to electrumx: {}", &wss); + match connect_async(wss).await { + Ok((ws, _)) => { + info!("Connected to electrumx"); + let (mut write, mut read) = ws.split(); + let ws_rx_stream = Arc::clone(&ws_rx_stream); + tokio::spawn(async move { + let mut guard = ws_rx_stream.lock().await; + while let Some(message) = guard.next().await { + let _ = write.send(message).await; + } + }); + while let Some(Ok(msg)) = read.next().await { + if let Ok(text) = msg.to_text() { + if let Ok(response) = serde_json::from_str::(text) { + info!( + "<= id: {}, success: {}", + &response.id, + response.result.is_some() + ); + if let Some(callback) = callbacks.write().await.remove(&response.id) + { + let _ = callback.send(response); + } + } + } + } + } + Err(e) => { + error!("Failed to connect to electrumx: {:?}", e); + tokio::time::sleep(Duration::from_secs(5)).await; + } + } + } + }); + let app_api = env::var("APP_API").unwrap_or("0.0.0.0:12321".to_string()); + let listener = tokio::net::TcpListener::bind(&app_api) + .await + .unwrap(); + info!("listening on {}", &app_api); + axum::serve(listener, app.into_make_service_with_connect_info::()) + .await.unwrap(); +} diff --git a/test.http b/test.http new file mode 100644 index 0000000..9201794 --- /dev/null +++ b/test.http @@ -0,0 +1,21 @@ + + +### +GET http://localhost:12321/proxy/blockchain.atomicals.listscripthash?params=[%22a98d3e974bdf9488520ce83ea14fbdb55878e73c8be79ddd38749cc742b3ea40%22,true] + +### +POST http://localhost:12321/proxy/blockchain.atomicals.listscripthash +Content-Type: application/json + +{ + "params": [ + "a98d3e974bdf9488520ce83ea14fbdb55878e73c8be79ddd38749cc742b3ea40", + true + ] +} + +### +GET http://localhost:12321/proxy + +### +GET http://localhost:12321 \ No newline at end of file