From 52df2e73c9e7e705eb5b63d762bcf182d2a3c631 Mon Sep 17 00:00:00 2001 From: link2xt Date: Thu, 1 Aug 2024 22:54:57 +0000 Subject: [PATCH] keep rewriting --- src/metrics.rs | 27 ++++++++++++++++----------- src/notifier.rs | 6 +++--- src/server.rs | 37 ++++++++++++++++++++----------------- src/state.rs | 2 +- 4 files changed, 40 insertions(+), 32 deletions(-) diff --git a/src/metrics.rs b/src/metrics.rs index 80f35c3..03347dd 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -6,13 +6,15 @@ use std::sync::atomic::AtomicI64; +use anyhow::Result; +use axum::http::{header, HeaderMap}; +use axum::response::IntoResponse; +use axum::routing::get; use prometheus_client::encoding::text::encode; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::gauge::Gauge; use prometheus_client::registry::Registry; -use anyhow::Result; - use crate::state::State; #[derive(Debug, Default)] @@ -86,18 +88,21 @@ impl Metrics { } pub async fn start(state: State, server: String) -> Result<()> { - let mut app = tide::with_state(state); - app.at("/metrics").get(metrics); - app.listen(server).await?; + let app = axum::Router::new() + .route("/metrics", get(metrics)) + .with_state(state); + let listener = tokio::net::TcpListener::bind(server).await?; + axum::serve(listener, app).await?; Ok(()) } -async fn metrics(req: tide::Request) -> tide::Result { +async fn metrics(req: tide::Request) -> impl IntoResponse { let mut encoded = String::new(); encode(&mut encoded, &req.state().metrics().registry).unwrap(); - let response = tide::Response::builder(tide::StatusCode::Ok) - .body(encoded) - .content_type("application/openmetrics-text; version=1.0.0; charset=utf-8") - .build(); - Ok(response) + let mut headers = HeaderMap::new(); + headers.insert( + header::CONTENT_TYPE, + "application/openmetrics-text; version=1.0.0; charset=utf-8", + ); + (headers, encoded) } diff --git a/src/notifier.rs b/src/notifier.rs index 2d17f57..602b29b 100644 --- a/src/notifier.rs +++ b/src/notifier.rs @@ -28,7 +28,7 @@ pub async fn start(state: State, interval: std::time::Duration) -> Result<()> { let Some((timestamp, token)) = schedule.pop()? else { info!("No tokens to notify, sleeping for a minute."); - async_std::task::sleep(Duration::from_secs(60)).await; + tokio::time::sleep(Duration::from_secs(60)).await; continue; }; @@ -49,7 +49,7 @@ pub async fn start(state: State, interval: std::time::Duration) -> Result<()> { "Sleeping for {} before next notification.", humantime::format_duration(delay) ); - async_std::task::sleep(delay).await; + tokio::time::sleep(delay).await; } if let Err(err) = wakeup( @@ -66,7 +66,7 @@ pub async fn start(state: State, interval: std::time::Duration) -> Result<()> { // Sleep to avoid busy looping and flooding APNS // with requests in case of database errors. - async_std::task::sleep(Duration::from_secs(60)).await; + tokio::time::sleep(Duration::from_secs(60)).await; } } } diff --git a/src/server.rs b/src/server.rs index c126612..689b8d4 100644 --- a/src/server.rs +++ b/src/server.rs @@ -3,10 +3,11 @@ use a2::{ Priority, PushType, }; use anyhow::{bail, Error, Result}; +use axum::http::StatusCode; +use axum::routing::{get, post}; use log::*; use serde::Deserialize; use std::str::FromStr; -use axum::routing::{get, post}; use crate::metrics::Metrics; use crate::state::State; @@ -28,19 +29,21 @@ struct DeviceQuery { } /// Registers a device for heartbeat notifications. -async fn register_device(mut req: tide::Request) -> tide::Result { - let query: DeviceQuery = req.body_json().await?; +async fn register_device( + axum::extract::State(state): axum::extract::State(State), + axum::extract::Json(query): axum::extract::Json, +) -> Result { info!("register_device {}", query.token); - let schedule = req.state().schedule(); + let schedule = state.schedule(); schedule.insert_token_now(&query.token)?; // Flush database to ensure we don't lose this token in case of restart. schedule.flush().await?; - req.state().metrics().heartbeat_registrations_total.inc(); + state.metrics().heartbeat_registrations_total.inc(); - Ok(tide::Response::new(tide::StatusCode::Ok)) + Ok(StatusCode::OK) } enum NotificationToken { @@ -94,14 +97,14 @@ async fn notify_fcm( ) -> tide::Result { let Some(fcm_api_key) = fcm_api_key else { warn!("Cannot notify FCM because key is not set"); - return Ok(tide::Response::new(tide::StatusCode::InternalServerError)); + return Ok(StatusCode::INTERNAL_SERVER_ERROR); }; if !token .chars() .all(|c| c.is_ascii_alphanumeric() || c == '_' || c == ':' || c == '-') { - return Ok(tide::Response::new(tide::StatusCode::Gone)); + return Ok(StatusCode::GONE); } let url = "https://fcm.googleapis.com/v1/projects/delta-chat-fcm/messages:send"; @@ -119,15 +122,15 @@ async fn notify_fcm( warn!("Failed to deliver FCM notification to {token}"); warn!("BODY: {body:?}"); warn!("RES: {res:?}"); - return Ok(tide::Response::new(tide::StatusCode::Gone)); + return Ok(StatusCode::GONE); } if status.is_server_error() { warn!("Internal server error while attempting to deliver FCM notification to {token}"); - return Ok(tide::Response::new(tide::StatusCode::InternalServerError)); + return Ok(StatusCode::INTERNAL_SERVER_ERROR); } info!("Delivered notification to FCM token {token}"); metrics.fcm_notifications_total.inc(); - Ok(tide::Response::new(tide::StatusCode::Ok)) + Ok(StatusCode::OK) } async fn notify_apns( @@ -167,7 +170,7 @@ async fn notify_apns( } } - Ok(tide::Response::new(tide::StatusCode::Ok)) + Ok(tide::Response::new(StatusCode::OK)) } Err(ResponseError(res)) => { info!("Removing token {} due to error {:?}.", &device_token, res); @@ -180,20 +183,20 @@ async fn notify_apns( error!("failed to remove {}: {:?}", &device_token, err); } // Return 410 Gone response so email server can remove the token. - Ok(tide::Response::new(tide::StatusCode::Gone)) + Ok(tide::Response::new(StatusCode::GONE)) } else { - Ok(tide::Response::new(tide::StatusCode::InternalServerError)) + Ok(tide::Response::new(StatusCode::INTERNAL_SERVER_ERROR)) } } Err(err) => { error!("failed to send notification: {}, {:?}", device_token, err); - Ok(tide::Response::new(tide::StatusCode::InternalServerError)) + Ok(tide::Response::new(StatusCode::INTERNAL_SERVER_ERROR)) } } } /// Notifies a single device with a visible notification. -async fn notify_device(mut req: tide::Request) -> tide::Result { +async fn notify_device(mut req: tide::Request) -> Result { let device_token = req.body_string().await?; info!("Got direct notification for {device_token}."); @@ -206,7 +209,7 @@ async fn notify_device(mut req: tide::Request) -> tide::Result { let client = req.state().fcm_client().clone(); let Ok(fcm_token) = req.state().fcm_token().await else { - return Ok(tide::Response::new(tide::StatusCode::InternalServerError)); + return Ok(tide::Response::new(StatusCode::INTERNAL_SERVER_ERROR)); }; let metrics = req.state().metrics(); notify_fcm( diff --git a/src/state.rs b/src/state.rs index 5e26832..77f40e3 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,10 +1,10 @@ use std::io::Seek; use std::path::Path; +use std::sync::Arc; use std::time::Duration; use a2::{Client, Endpoint}; use anyhow::{Context as _, Result}; -use tokio::sync::Arc; use crate::metrics::Metrics; use crate::schedule::Schedule;