Skip to content

Commit

Permalink
keep rewriting
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed Aug 1, 2024
1 parent 3968f88 commit 52df2e7
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 32 deletions.
27 changes: 16 additions & 11 deletions src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@
use std::sync::atomic::AtomicI64;

use anyhow::Result;
use axum::http::{header, HeaderMap};
use axum::response::IntoResponse;
use axum::routing::get;
use prometheus_client::encoding::text::encode;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;

use anyhow::Result;

use crate::state::State;

#[derive(Debug, Default)]
Expand Down Expand Up @@ -86,18 +88,21 @@ impl Metrics {
}

pub async fn start(state: State, server: String) -> Result<()> {
let mut app = tide::with_state(state);
app.at("/metrics").get(metrics);
app.listen(server).await?;
let app = axum::Router::new()
.route("/metrics", get(metrics))
.with_state(state);
let listener = tokio::net::TcpListener::bind(server).await?;
axum::serve(listener, app).await?;
Ok(())
}

async fn metrics(req: tide::Request<State>) -> tide::Result<tide::Response> {
async fn metrics(req: tide::Request<State>) -> impl IntoResponse {
let mut encoded = String::new();
encode(&mut encoded, &req.state().metrics().registry).unwrap();
let response = tide::Response::builder(tide::StatusCode::Ok)
.body(encoded)
.content_type("application/openmetrics-text; version=1.0.0; charset=utf-8")
.build();
Ok(response)
let mut headers = HeaderMap::new();
headers.insert(
header::CONTENT_TYPE,
"application/openmetrics-text; version=1.0.0; charset=utf-8",
);
(headers, encoded)
}
6 changes: 3 additions & 3 deletions src/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub async fn start(state: State, interval: std::time::Duration) -> Result<()> {

let Some((timestamp, token)) = schedule.pop()? else {
info!("No tokens to notify, sleeping for a minute.");
async_std::task::sleep(Duration::from_secs(60)).await;
tokio::time::sleep(Duration::from_secs(60)).await;
continue;
};

Expand All @@ -49,7 +49,7 @@ pub async fn start(state: State, interval: std::time::Duration) -> Result<()> {
"Sleeping for {} before next notification.",
humantime::format_duration(delay)
);
async_std::task::sleep(delay).await;
tokio::time::sleep(delay).await;
}

if let Err(err) = wakeup(
Expand All @@ -66,7 +66,7 @@ pub async fn start(state: State, interval: std::time::Duration) -> Result<()> {

// Sleep to avoid busy looping and flooding APNS
// with requests in case of database errors.
async_std::task::sleep(Duration::from_secs(60)).await;
tokio::time::sleep(Duration::from_secs(60)).await;
}
}
}
Expand Down
37 changes: 20 additions & 17 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@ use a2::{
Priority, PushType,
};
use anyhow::{bail, Error, Result};
use axum::http::StatusCode;
use axum::routing::{get, post};
use log::*;
use serde::Deserialize;
use std::str::FromStr;
use axum::routing::{get, post};

use crate::metrics::Metrics;
use crate::state::State;
Expand All @@ -28,19 +29,21 @@ struct DeviceQuery {
}

/// Registers a device for heartbeat notifications.
async fn register_device(mut req: tide::Request<State>) -> tide::Result<tide::Response> {
let query: DeviceQuery = req.body_json().await?;
async fn register_device(
axum::extract::State(state): axum::extract::State(State),
axum::extract::Json(query): axum::extract::Json<DeviceQuery>,
) -> Result<StatusCode> {
info!("register_device {}", query.token);

let schedule = req.state().schedule();
let schedule = state.schedule();
schedule.insert_token_now(&query.token)?;

// Flush database to ensure we don't lose this token in case of restart.
schedule.flush().await?;

req.state().metrics().heartbeat_registrations_total.inc();
state.metrics().heartbeat_registrations_total.inc();

Ok(tide::Response::new(tide::StatusCode::Ok))
Ok(StatusCode::OK)
}

enum NotificationToken {
Expand Down Expand Up @@ -94,14 +97,14 @@ async fn notify_fcm(
) -> tide::Result<tide::Response> {
let Some(fcm_api_key) = fcm_api_key else {
warn!("Cannot notify FCM because key is not set");
return Ok(tide::Response::new(tide::StatusCode::InternalServerError));
return Ok(StatusCode::INTERNAL_SERVER_ERROR);
};

if !token
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '_' || c == ':' || c == '-')
{
return Ok(tide::Response::new(tide::StatusCode::Gone));
return Ok(StatusCode::GONE);
}

let url = "https://fcm.googleapis.com/v1/projects/delta-chat-fcm/messages:send";
Expand All @@ -119,15 +122,15 @@ async fn notify_fcm(
warn!("Failed to deliver FCM notification to {token}");
warn!("BODY: {body:?}");
warn!("RES: {res:?}");
return Ok(tide::Response::new(tide::StatusCode::Gone));
return Ok(StatusCode::GONE);
}
if status.is_server_error() {
warn!("Internal server error while attempting to deliver FCM notification to {token}");
return Ok(tide::Response::new(tide::StatusCode::InternalServerError));
return Ok(StatusCode::INTERNAL_SERVER_ERROR);
}
info!("Delivered notification to FCM token {token}");
metrics.fcm_notifications_total.inc();
Ok(tide::Response::new(tide::StatusCode::Ok))
Ok(StatusCode::OK)
}

async fn notify_apns(
Expand Down Expand Up @@ -167,7 +170,7 @@ async fn notify_apns(
}
}

Ok(tide::Response::new(tide::StatusCode::Ok))
Ok(tide::Response::new(StatusCode::OK))
}
Err(ResponseError(res)) => {
info!("Removing token {} due to error {:?}.", &device_token, res);
Expand All @@ -180,20 +183,20 @@ async fn notify_apns(
error!("failed to remove {}: {:?}", &device_token, err);
}
// Return 410 Gone response so email server can remove the token.
Ok(tide::Response::new(tide::StatusCode::Gone))
Ok(tide::Response::new(StatusCode::GONE))
} else {
Ok(tide::Response::new(tide::StatusCode::InternalServerError))
Ok(tide::Response::new(StatusCode::INTERNAL_SERVER_ERROR))
}
}
Err(err) => {
error!("failed to send notification: {}, {:?}", device_token, err);
Ok(tide::Response::new(tide::StatusCode::InternalServerError))
Ok(tide::Response::new(StatusCode::INTERNAL_SERVER_ERROR))
}
}
}

/// Notifies a single device with a visible notification.
async fn notify_device(mut req: tide::Request<State>) -> tide::Result<tide::Response> {
async fn notify_device(mut req: tide::Request<State>) -> Result<tide::Response> {
let device_token = req.body_string().await?;
info!("Got direct notification for {device_token}.");

Expand All @@ -206,7 +209,7 @@ async fn notify_device(mut req: tide::Request<State>) -> tide::Result<tide::Resp
} => {
let client = req.state().fcm_client().clone();
let Ok(fcm_token) = req.state().fcm_token().await else {
return Ok(tide::Response::new(tide::StatusCode::InternalServerError));
return Ok(tide::Response::new(StatusCode::INTERNAL_SERVER_ERROR));
};
let metrics = req.state().metrics();
notify_fcm(
Expand Down
2 changes: 1 addition & 1 deletion src/state.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::io::Seek;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use a2::{Client, Endpoint};
use anyhow::{Context as _, Result};
use tokio::sync::Arc;

use crate::metrics::Metrics;
use crate::schedule::Schedule;
Expand Down

0 comments on commit 52df2e7

Please sign in to comment.