Skip to content

Commit

Permalink
🎨 Improve reconnect & code structure
Browse files Browse the repository at this point in the history
  • Loading branch information
iota9star committed Jan 5, 2024
1 parent eece3a5 commit c440045
Show file tree
Hide file tree
Showing 2 changed files with 168 additions and 67 deletions.
143 changes: 76 additions & 67 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#![feature(lazy_cell)]

mod urn;

use std::any::Any;
use std::collections::HashMap;
use std::env;
Expand Down Expand Up @@ -99,32 +101,41 @@ impl R {
}
}


static IP_LIMIT_PER_SECOND: LazyLock<u64> = LazyLock::new(|| {
env::var("IP_LIMIT_PER_SECOND").unwrap_or("1".to_string()).parse().unwrap()
env::var("IP_LIMIT_PER_SECOND")
.unwrap_or("1".to_string())
.parse()
.unwrap()
});

static IP_LIMIT_BURST_SIZE: LazyLock<u32> = LazyLock::new(|| {
env::var("IP_LIMIT_BURST_SIZE").unwrap_or("10".to_string()).parse().unwrap()
env::var("IP_LIMIT_BURST_SIZE")
.unwrap_or("10".to_string())
.parse()
.unwrap()
});

static CONCURRENCY_LIMIT: LazyLock<usize> = LazyLock::new(|| {
env::var("CONCURRENCY_LIMIT").unwrap_or("500".to_string()).parse().unwrap()
env::var("CONCURRENCY_LIMIT")
.unwrap_or("500".to_string())
.parse()
.unwrap()
});

static ELECTRUMX_WSS: LazyLock<String> = LazyLock::new(|| {
env::var("ELECTRUMX_WSS").unwrap_or("wss://electrumx.atomicals.xyz:50012".to_string())
});

static PROXY_HOST: LazyLock<String> = LazyLock::new(|| {
env::var("PROXY_HOST").unwrap_or("0.0.0.0:12321".into())
});
static PROXY_HOST: LazyLock<String> =
LazyLock::new(|| env::var("PROXY_HOST").unwrap_or("0.0.0.0:12321".into()));

static RESPONSE_TIMEOUT: LazyLock<u64> = LazyLock::new(|| {
env::var("RESPONSE_TIMEOUT").unwrap_or("10".to_string()).parse().unwrap()
env::var("RESPONSE_TIMEOUT")
.unwrap_or("10".to_string())
.parse()
.unwrap()
});


// The use of `AtomicU32` is to ensure not exceeding the integer range of other systems.
static ID_COUNTER: Lazy<AtomicU32> = Lazy::new(|| AtomicU32::new(0));

Expand Down Expand Up @@ -158,66 +169,57 @@ impl IntoResponse for R {

async fn handle_get(
Extension(callbacks): Extension<Callbacks>,
Extension(ws_tx): Extension<mpsc::UnboundedSender<Message>>,
Extension(ws_tx): Extension<mpsc::UnboundedSender<JsonRpcRequest>>,
Path(method): Path<String>,
Query(query): Query<Value>,
) -> Result<R, AppError> {
let r = match query.get("params") {
None => handle_request(method, vec![], callbacks, ws_tx).await,
None => handle_request(callbacks, ws_tx, method, vec![]).await,
Some(v) => {
let x = v.as_str().map(|s| if s.is_empty() {
"[]"
} else {
s
}).unwrap();
let x = v
.as_str()
.map(|s| if s.is_empty() { "[]" } else { s })
.unwrap();
let params = serde_json::from_str(x).unwrap();
handle_request(method, params, callbacks, ws_tx).await
handle_request(callbacks, ws_tx, method, params).await
}
};
Ok(r)
}

async fn handle_post(
Extension(callbacks): Extension<Callbacks>,
Extension(ws_tx): Extension<mpsc::UnboundedSender<Message>>,
Extension(ws_tx): Extension<mpsc::UnboundedSender<JsonRpcRequest>>,
Path(method): Path<String>,
body: Option<Json<Value>>,
) -> Result<R, AppError> {
let r = match body {
None => handle_request(method, vec![], callbacks, ws_tx).await,
Some(v) => {
match v.0.get("params") {
None => handle_request(method, vec![], callbacks, ws_tx).await,
Some(v) => {
let x = v.as_array().unwrap();
handle_request(method, x.clone(), callbacks, ws_tx).await
}
None => handle_request(callbacks, ws_tx, method, vec![]).await,
Some(v) => match v.0.get("params") {
None => handle_request(callbacks, ws_tx, method, vec![]).await,
Some(v) => {
let x = v.as_array().unwrap();
handle_request(callbacks, ws_tx, method, x.clone()).await
}
}
},
};
Ok(r)
}

async fn handle_request(
callbacks: Callbacks,
ws_tx: mpsc::UnboundedSender<JsonRpcRequest>,
method: String,
params: Vec<Value>,
callbacks: Callbacks,
ws_tx: mpsc::UnboundedSender<Message>,
) -> R {
let id = get_next_id();
info!("=> id: {}, method: {}, params: {:?}", &id, &method, &params);

let (response_tx, response_rx) = oneshot::channel();
{
callbacks.write().await.insert(id, response_tx);
}
let request = JsonRpcRequest {
id,
method,
params,
};
let request_text = serde_json::to_string(&request).unwrap();
ws_tx.send(Message::Text(request_text)).unwrap();
let request = JsonRpcRequest { id, method, params };
ws_tx.send(request).unwrap();
match tokio::time::timeout(Duration::from_secs(*RESPONSE_TIMEOUT), response_rx).await {
Ok(Ok(rep)) => {
if let Some(result) = rep.result {
Expand All @@ -236,7 +238,10 @@ async fn handle_request(
}
}
Ok(Err(_)) | Err(_) => {
warn!("<= id: {}, no response received after {} seconds", &id, *RESPONSE_TIMEOUT);
warn!(
"<= id: {}, no response received after {} seconds",
&id, *RESPONSE_TIMEOUT
);
{
callbacks.write().await.remove(&id);
}
Expand All @@ -247,7 +252,7 @@ async fn handle_request(

async fn handle_health(
Extension(callbacks): Extension<Callbacks>,
Extension(ws_tx): Extension<mpsc::UnboundedSender<Message>>,
Extension(ws_tx): Extension<mpsc::UnboundedSender<JsonRpcRequest>>,
) -> impl IntoResponse {
let id = get_next_id();
info!("=> id: {}, check health", &id);
Expand All @@ -261,12 +266,14 @@ async fn handle_health(
method: "blockchain.atomicals.get_global".into(),
params: vec![],
};
let request_text = serde_json::to_string(&request).unwrap();
ws_tx.send(Message::Text(request_text)).unwrap();
ws_tx.send(request).unwrap();
match tokio::time::timeout(Duration::from_secs(5), response_rx).await {
Ok(Ok(rep)) => R::health(rep.result.is_some()),
Ok(Err(_)) | Err(_) => {
warn!("<= id: {}, check health timeout, no response received after 5 seconds", &id);
warn!(
"<= id: {}, check health timeout, no response received after 5 seconds",
&id
);
{
callbacks.write().await.remove(&id);
}
Expand All @@ -275,7 +282,6 @@ async fn handle_health(
}
}


async fn handle_proxy() -> impl IntoResponse {
Json(json!({
"success": true,
Expand All @@ -293,7 +299,6 @@ async fn handle_proxy() -> impl IntoResponse {
}))
}


fn handle_panic(err: Box<dyn Any + Send + 'static>) -> http::Response<Full<Bytes>> {
let details = if let Some(s) = err.downcast_ref::<String>() {
s.clone()
Expand All @@ -313,12 +318,11 @@ fn handle_panic(err: Box<dyn Any + Send + 'static>) -> http::Response<Full<Bytes
.unwrap()
}


#[tokio::main]
async fn main() {
dotenv().ok();
tracing_subscriber::fmt::init();
let (ws_tx, ws_rx) = mpsc::unbounded_channel::<Message>();
let (ws_tx, ws_rx) = mpsc::unbounded_channel::<JsonRpcRequest>();
let callbacks: Callbacks = Arc::new(RwLock::new(HashMap::new()));
let ws_rx_stream = Arc::new(Mutex::new(UnboundedReceiverStream::new(ws_rx)));
let governor_conf = Box::new(
Expand All @@ -329,17 +333,15 @@ async fn main() {
.unwrap(),
);
let app = Router::new()
.fallback(
|uri: http::Uri| async move {
let body = R::error(-1, format!("No route {}", &uri));
let body = serde_json::to_string(&body).unwrap();
Response::builder()
.status(StatusCode::NOT_FOUND)
.header(header::CONTENT_TYPE, "application/json")
.body(Full::from(body))
.unwrap()
},
)
.fallback(|uri: http::Uri| async move {
let body = R::error(-1, format!("No route: {}", &uri));
let body = serde_json::to_string(&body).unwrap();
Response::builder()
.status(StatusCode::NOT_FOUND)
.header(header::CONTENT_TYPE, "application/json")
.body(Full::from(body))
.unwrap()
})
.route("/", get(|| async { "Hello, Atomicals!" }))
.route("/proxy", get(handle_proxy).post(handle_proxy))
.route("/proxy/health", get(handle_health).post(handle_health))
Expand All @@ -353,7 +355,7 @@ async fn main() {
.layer(TraceLayer::new_for_http())
.layer(GovernorLayer {
config: Box::leak(governor_conf),
})
}),
)
.layer(ConcurrencyLimitLayer::new(*CONCURRENCY_LIMIT))
.layer(CorsLayer::permissive())
Expand All @@ -368,35 +370,37 @@ async fn main() {
let wss = list.get(index).unwrap();
info!("Try to connect to ElectrumX: {}", &wss);
match connect_async(*wss).await {
Ok((ws, _)) => {
Ok((ws,_)) => {
info!("Connected to ElectrumX: {}", &wss);
let (mut write, mut read) = ws.split();
let ws_rx_stream = Arc::clone(&ws_rx_stream);
tokio::spawn(async move {
let mut guard = ws_rx_stream.lock().await;
while let Some(message) = guard.next().await {
if let Err(e) = write.send(message).await {
let request_text = serde_json::to_string(&message).unwrap();
if let Err(e) = write.send(Message::Text(request_text)).await {
error!("Failed to send message to ElectrumX: {:?}", e);
break;
}
}
});
while let Some(Ok(msg)) = read.next().await {
if msg.is_text() {
if let Ok(text) = msg.to_text() {
if let Ok(response) = serde_json::from_str::<JsonRpcResponse>(text) {
if let Some(callback) = callbacks.write().await.remove(&response.id)
if let Ok(resp) = serde_json::from_str::<JsonRpcResponse>(text) {
if let Some(callback) = callbacks.write().await.remove(&resp.id)
{
info!("<= id: {}, processed", &response.id);
let _ = callback.send(response);
info!("<= id: {}, processed", &resp.id);
let _ = callback.send(resp);
} else {
warn!("<= id: {}, not processed", &response.id);
warn!("<= id: {}, not processed", &resp.id);
}
} else {
error!("Failed to parse ws response: {}", text);
}
}
} else if msg.is_close() {
info!("Connection closed: {}", &wss);
warn!("Connection closed: {}", &wss);
break;
}
}
Expand All @@ -417,5 +421,10 @@ async fn main() {
.await
.unwrap();
info!("Listening on {}", *PROXY_HOST);
axum::serve(listener, app.into_make_service_with_connect_info::<SocketAddr>()).await.unwrap();
axum::serve(
listener,
app.into_make_service_with_connect_info::<SocketAddr>(),
)
.await
.unwrap();
}
Loading

0 comments on commit c440045

Please sign in to comment.