Skip to content

Commit

Permalink
Merge pull request #36 from Overmuse/SR/metrics
Browse files Browse the repository at this point in the history
add one metric
  • Loading branch information
SebRollen authored Jul 8, 2021
2 parents a724b0f + 8265389 commit 8abe259
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 27 deletions.
73 changes: 48 additions & 25 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "order-manager"
version = "6.1.1"
version = "6.1.2"
authors = ["Sebastian Rollen <seb@overmu.se>"]
edition = "2018"

Expand All @@ -14,8 +14,10 @@ config = "0.10.1"
dotenv = "0.15.0"
futures = "0.3.15"
kafka-settings = {git = "ssh://git@github.com/Overmuse/kafka-settings.git", tag = "v0.3.1" }
lazy_static = "1.4.0"
num-traits = "0.2.14"
position-intents = {git = "ssh://git@github.com/Overmuse/position-intents.git", tag = "v0.6.0" }
prometheus = "0.12.0"
rdkafka = { version = "0.26.0", features = ["ssl-vendored"] }
refinery = { version = "0.5.0", features = ["tokio-postgres"] }
rust_decimal = { version = "1.14.1", features = ["tokio-pg"] }
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use tracing::error;
mod db;
mod intent_scheduler;
pub mod manager;
mod metrics;
mod order_sender;
mod settings;
pub mod types;
Expand Down
3 changes: 3 additions & 0 deletions src/manager/order_updates.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::OrderManager;
use crate::db;
use crate::metrics::NUM_TRADES;
use crate::types::{split_lot, Allocation, Lot};
use alpaca::{Event, OrderEvent, Side};
use anyhow::{anyhow, Context, Result};
Expand Down Expand Up @@ -31,6 +32,8 @@ impl OrderManager {
Event::Fill {
price, timestamp, ..
} => {
debug!("Order filled");
NUM_TRADES.inc();
let new_lot = self
.make_lot(&id, ticker, timestamp, price, qty)
.await
Expand Down
14 changes: 14 additions & 0 deletions src/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use lazy_static::lazy_static;
use prometheus::{IntCounter, Registry};

lazy_static! {
pub static ref REGISTRY: Registry = Registry::new();
pub static ref NUM_TRADES: IntCounter =
IntCounter::new("num_trades", "Number of trades").expect("Metric can be created");
}

pub fn register_custom_metrics() {
REGISTRY
.register(Box::new(NUM_TRADES.clone()))
.expect("collector can be registered");
}
40 changes: 39 additions & 1 deletion src/webserver.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::db;
use crate::metrics::{register_custom_metrics, REGISTRY};
use crate::types::Owner;
use std::convert::Infallible;
use std::net::{Ipv4Addr, SocketAddrV4};
Expand Down Expand Up @@ -51,8 +52,43 @@ async fn get_pending_orders(db: Db) -> Result<impl warp::Reply, warp::Rejection>
Ok(warp::reply::json(&pending_orders))
}

async fn metrics_handler() -> Result<impl warp::Reply, warp::Rejection> {
use prometheus::Encoder;
let encoder = prometheus::TextEncoder::new();

let mut buffer = Vec::new();
if let Err(e) = encoder.encode(&REGISTRY.gather(), &mut buffer) {
eprintln!("could not encode custom metrics: {}", e);
};
let mut res = match String::from_utf8(buffer.clone()) {
Ok(v) => v,
Err(e) => {
eprintln!("custom metrics could not be from_utf8'd: {}", e);
String::default()
}
};
buffer.clear();

let mut buffer = Vec::new();
if let Err(e) = encoder.encode(&prometheus::gather(), &mut buffer) {
eprintln!("could not encode prometheus metrics: {}", e);
};
let res_custom = match String::from_utf8(buffer.clone()) {
Ok(v) => v,
Err(e) => {
eprintln!("prometheus metrics could not be from_utf8'd: {}", e);
String::default()
}
};
buffer.clear();

res.push_str(&res_custom);
Ok(res)
}

#[tracing::instrument(skip(db))]
pub async fn run(port: u16, db: Db) {
register_custom_metrics();
let health = warp::path!("health").map(|| "");
let get_allocations = warp::path("allocations")
.and(warp::get())
Expand All @@ -75,13 +111,15 @@ pub async fn run(port: u16, db: Db) {
.and(warp::get())
.and(with_db(db.clone()))
.and_then(get_pending_orders);
let metrics = warp::path("metrics").and_then(metrics_handler);
let routes = warp::get()
.and(health)
.or(get_allocations)
.or(set_allocation_owner)
.or(lots)
.or(claims)
.or(pending_orders);
.or(pending_orders)
.or(metrics);
let address = SocketAddrV4::new(Ipv4Addr::new(0, 0, 0, 0), port);
warp::serve(routes).run(address).await
}

0 comments on commit 8abe259

Please sign in to comment.