Skip to content

Commit

Permalink
feat: add Prometheus metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
link2xt committed Apr 16, 2024
1 parent 3d5cb20 commit ccb809f
Show file tree
Hide file tree
Showing 9 changed files with 192 additions and 7 deletions.
68 changes: 65 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ async-std = { version = "1.9", features = ["tokio1", "attributes", "unstable"] }
femme = "2.1.0"
humantime = "2.0.1"
log = "0.4.11"
prometheus-client = "0.22.2"
serde = { version = "1.0.114", features = ["derive"] }
sled = "0.34.2"
structopt = "0.3.15"
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,11 @@ $ ./target/release/notifiers --certificate-file <file.p12> --password <password>
```sh
$ curl -X POST -d '{ "token": "<device token>" }' http://localhost:9000/register
```

### Enabling metrics

To enable OpenMetrics (Prometheus) metrics endpoint,
run with `--metrics` argument,
e.g. `--metrics 127.0.0.1:9001`.
Metrics can then be retrieved with
`curl http://127.0.0.1:9001/metrics`.
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pub mod metrics;
pub mod notifier;
pub mod server;
pub mod state;
21 changes: 19 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use std::path::PathBuf;
use std::sync::Arc;

use anyhow::{Context, Result};
use async_std::prelude::*;
use structopt::StructOpt;

use notifiers::{notifier, server, state};
use notifiers::{metrics, notifier, server, state};

#[derive(Debug, StructOpt)]
struct Opt {
Expand All @@ -23,6 +24,10 @@ struct Opt {
/// The port on which to start the server.
#[structopt(long, default_value = "9000")]
port: u16,
/// The host and port on which to start the metrics server.
/// For example, `127.0.0.1:9001`.
#[structopt(long)]
metrics: Option<String>,
/// The path to the database file.
#[structopt(long, default_value = "notifiers.db", parse(from_os_str))]
db: PathBuf,
Expand All @@ -37,11 +42,23 @@ async fn main() -> Result<()> {
let opt = Opt::from_args();
let certificate = std::fs::File::open(&opt.certificate_file).context("invalid certificate")?;

let state = state::State::new(&opt.db, certificate, &opt.password, opt.topic.clone())?;
let metrics_state = Arc::new(metrics::Metrics::new());

let state = state::State::new(
&opt.db,
certificate,
&opt.password,
opt.topic.clone(),
metrics_state.clone(),
)?;

let state2 = state.clone();
let host = opt.host.clone();
let port = opt.port;

if let Some(metrics_address) = opt.metrics.clone() {
async_std::task::spawn(async move { metrics::start(metrics_state, metrics_address).await });
}
let server = async_std::task::spawn(async move { server::start(state2, host, port).await });

let notif = async_std::task::spawn(async move { notifier::start(state, opt.interval).await });
Expand Down
81 changes: 81 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
//! Prometheus (OpenMetrics) metrics server.
//!
//! It is listening on its own address
//! to allow exposting it on a private network only
//! independently of the main service.
use std::sync::atomic::AtomicI64;
use std::sync::Arc;

use prometheus_client::encoding::text::encode;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::registry::Registry;

use anyhow::Result;

#[derive(Debug, Default)]
pub struct Metrics {
pub registry: Registry,

/// Number of successfully sent visible notifications.
pub direct_notifications_total: Counter,

/// Number of tokens registered for heartbeat notifications.
pub heartbeat_token_count: Gauge<i64, AtomicI64>,
}

impl Metrics {
pub fn new() -> Self {
let mut registry = Registry::default();

let direct_notifications_total = Counter::default();
registry.register(
"direct_notifications",
"Number of direct notifications",
direct_notifications_total.clone(),
);

let heartbeat_token_count = Gauge::<i64, AtomicI64>::default();
registry.register(
"heartbeat_token_count",
"Number of tokens registered for heartbeat notifications",
heartbeat_token_count.clone(),
);

Self {
registry,
direct_notifications_total,
heartbeat_token_count,
}
}

/// Counts direct notification.
pub fn inc_direct_notification(&self) {
self.direct_notifications_total.inc();
}

/// Sets number of tokens registered for heartbeat notifications.
pub fn set_heartbeat_token_count(&self, value: usize) {
self.heartbeat_token_count.set(value as i64);
}
}

type State = Arc<Metrics>;

pub async fn start(state: State, server: String) -> Result<()> {
let mut app = tide::with_state(state);
app.at("/metrics").get(metrics);
app.listen(server).await?;
Ok(())
}

async fn metrics(req: tide::Request<State>) -> tide::Result<tide::Response> {
let mut encoded = String::new();
encode(&mut encoded, &req.state().registry).unwrap();
let response = tide::Response::builder(tide::StatusCode::Ok)
.body(encoded)
.content_type("application/openmetrics-text; version=1.0.0; charset=utf-8")
.build();
Ok(response)
}
8 changes: 6 additions & 2 deletions src/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ use anyhow::Result;
use async_std::prelude::*;
use log::*;

use crate::metrics::Metrics;
use crate::state::State;

pub async fn start(state: State, interval: std::time::Duration) -> Result<()> {
let db = state.db();
let metrics = state.metrics();
let production_client = state.production_client();
let sandbox_client = state.sandbox_client();
let topic = state.topic();
Expand All @@ -20,19 +22,20 @@ pub async fn start(state: State, interval: std::time::Duration) -> Result<()> {
);

// first wakeup on startup
wakeup(db, production_client, sandbox_client, topic).await;
wakeup(db, metrics, production_client, sandbox_client, topic).await;

// create interval
let mut interval = async_std::stream::interval(interval);
while interval.next().await.is_some() {
wakeup(db, production_client, sandbox_client, topic).await;
wakeup(db, metrics, production_client, sandbox_client, topic).await;
}

Ok(())
}

async fn wakeup(
db: &sled::Db,
metrics: &Metrics,
production_client: &Client,
sandbox_client: &Client,
topic: Option<&str>,
Expand All @@ -46,6 +49,7 @@ async fn wakeup(
.collect::<Vec<_>>();

info!("sending notifications to {} devices", tokens.len());
metrics.set_heartbeat_token_count(tokens.len());

for device_token in tokens {
info!("notify: {}", device_token);
Expand Down
1 change: 1 addition & 0 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ async fn notify_device(mut req: tide::Request<State>) -> tide::Result<tide::Resp
match res.code {
200 => {
info!("delivered notification for {}", device_token);
req.state().metrics().inc_direct_notification();
}
_ => {
warn!("unexpected status: {:?}", res);
Expand Down
10 changes: 10 additions & 0 deletions src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use async_std::sync::Arc;
use log::*;
use std::io::Seek;

use crate::metrics::Metrics;

#[derive(Debug, Clone)]
pub struct State {
inner: Arc<InnerState>,
Expand All @@ -20,6 +22,8 @@ pub struct InnerState {
sandbox_client: Client,

topic: Option<String>,

metrics: Arc<Metrics>,
}

impl State {
Expand All @@ -28,6 +32,7 @@ impl State {
mut certificate: std::fs::File,
password: &str,
topic: Option<String>,
metrics: Arc<Metrics>,
) -> Result<Self> {
let db = sled::open(db)?;
let production_client =
Expand All @@ -45,6 +50,7 @@ impl State {
production_client,
sandbox_client,
topic,
metrics,
}),
})
}
Expand All @@ -64,4 +70,8 @@ impl State {
pub fn topic(&self) -> Option<&str> {
self.inner.topic.as_deref()
}

pub fn metrics(&self) -> &Metrics {
self.inner.metrics.as_ref()
}
}

0 comments on commit ccb809f

Please sign in to comment.