From 2531d93de9283b3182fcefd16d20ff3c6a1c7ef9 Mon Sep 17 00:00:00 2001 From: Michael Jeffrey Date: Mon, 6 May 2024 13:14:49 -0700 Subject: [PATCH] Add client_requests timing module to metrics (#802) * Add client_requests timing module to metrics - Add a timing span to anything that can be instrumented and returns a Result. Example: ```ignore let client = GatewayClient::new(channel); client.info(req) .with_timing("iot_fetch_info") .await?; ``` This will result in a prometheus metric >> client_request_duration_seconds{name = "iot_fetch_info", quantile="xxx"} - Install the `ApiTimingLayer`. Adding `.with_span_events(FmtSpan::CLOSE)` to a regular format layer will print the timing spans to stdout as well. Example: ```ignore tracing_subscriber::registry() .with(tracing_subscriber::fmt::layer().with_span_events(FmtSpan::CLOSE)) .with(metrics::client_requests::client_request_timing_layer("histogram_name")) .init(); ``` - Remove unused `install_metrics` function, replace with nested `install` function that `start_metrics` delegates to. This allows us to start metrics in tests without needing to make a `Settings` struct. * Update metrics (#805) * Update metrics crate Also bumps metrics-exporter-prometheus. The biggest change from 0.21 -> 0.22 is in this PR https://github.com/metrics-rs/metrics/pull/394 * allow for versions greater than 0.22 this may change if the api to metrics changes _again_ before a major version. * make string values consts Makes the values hide a little bit less in the code --- Cargo.lock | 170 ++++++++++++----- Cargo.toml | 2 +- boost_manager/src/telemetry.rs | 2 +- boost_manager/src/updater.rs | 6 +- db_store/src/meta.rs | 4 +- db_store/src/metric_tracker.rs | 4 +- file_store/src/file_info_poller.rs | 3 +- file_store/src/file_sink.rs | 10 +- iot_config/src/telemetry.rs | 31 ++-- iot_packet_verifier/src/burner.rs | 2 +- iot_verifier/src/entropy_loader.rs | 2 +- iot_verifier/src/gateway_cache.rs | 4 +- iot_verifier/src/region_cache.rs | 4 +- iot_verifier/src/telemetry.rs | 22 +-- iot_verifier/src/witness_updater.rs | 2 +- metrics/Cargo.toml | 6 + metrics/src/client_requests.rs | 205 +++++++++++++++++++++ metrics/src/lib.rs | 28 ++- mobile_config/src/telemetry.rs | 4 +- mobile_packet_verifier/src/burner.rs | 6 +- mobile_verifier/src/coverage.rs | 8 +- mobile_verifier/src/data_session.rs | 14 +- mobile_verifier/src/heartbeats/cbrs.rs | 10 +- mobile_verifier/src/heartbeats/wifi.rs | 10 +- mobile_verifier/src/speedtests.rs | 8 +- mobile_verifier/src/subscriber_location.rs | 9 +- mobile_verifier/src/telemetry.rs | 4 +- poc_entropy/src/server.rs | 2 +- price/src/metrics.rs | 4 +- reward_index/src/telemetry.rs | 2 +- solana/src/burn.rs | 4 +- 31 files changed, 440 insertions(+), 152 deletions(-) create mode 100644 metrics/src/client_requests.rs diff --git a/Cargo.lock b/Cargo.lock index b69555b8f..885237365 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -82,9 +82,9 @@ dependencies = [ [[package]] name = "ahash" -version = "0.8.7" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77c3a9648d43b9cd48db467b3f87fdd6e146bcc88ab0180006cef2179fe11d01" +checksum = "e89da841a80418a9b391ebaea17f5c112ffaaa96f621d2c285b5174da76b9011" dependencies = [ "cfg-if", "const-random", @@ -2969,6 +2969,21 @@ version = "1.0.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.1.0" @@ -3276,7 +3291,7 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c6fb938100651db317719f46877a3cd82105920be4ea2ff49d55d1d65fa7bec1" dependencies = [ - "ahash 0.8.7", + "ahash 0.8.11", "auto_ops", "either", "float_eq", @@ -3320,7 +3335,7 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "33ff8ae62cd3a9102e5637afc8452c55acf3844001bd5374e0b0bd7b6616c038" dependencies = [ - "ahash 0.8.7", + "ahash 0.8.11", ] [[package]] @@ -3328,6 +3343,9 @@ name = "hashbrown" version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7dfda62a12f55daeae5015f81b0baea145391cb4520f86c248fc615d72640d12" +dependencies = [ + "ahash 0.8.11", +] [[package]] name = "hashlink" @@ -3401,7 +3419,7 @@ dependencies = [ "bs58 0.5.0", "byteorder", "ed25519-compact", - "getrandom 0.2.10", + "getrandom 0.1.16", "k256", "lazy_static", "multihash", @@ -3728,6 +3746,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper 0.14.28", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "hyper-util" version = "0.1.2" @@ -4352,15 +4383,6 @@ version = "0.4.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" -[[package]] -name = "mach2" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d0d1830bcd151a6fc4aea1369af235b36c1528fe976b8ff678683c9995eade8" -dependencies = [ - "libc", -] - [[package]] name = "matchers" version = "0.1.0" @@ -4448,23 +4470,23 @@ dependencies = [ [[package]] name = "metrics" -version = "0.21.1" +version = "0.22.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fde3af1a009ed76a778cb84fdef9e7dbbdf5775ae3e4cc1f434a6a307f6f76c5" +checksum = "2be3cbd384d4e955b231c895ce10685e3d8260c5ccffae898c96c723b0772835" dependencies = [ - "ahash 0.8.7", - "metrics-macros", + "ahash 0.8.11", "portable-atomic", ] [[package]] name = "metrics-exporter-prometheus" -version = "0.12.1" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a4964177ddfdab1e3a2b37aec7cf320e14169abb0ed73999f558136409178d5" +checksum = "83a4c4718a371ddfb7806378f23617876eea8b82e5ff1324516bcd283249d9ea" dependencies = [ "base64 0.21.7", "hyper 0.14.28", + "hyper-tls", "indexmap 1.9.3", "ipnet", "metrics", @@ -4475,26 +4497,15 @@ dependencies = [ "tracing", ] -[[package]] -name = "metrics-macros" -version = "0.7.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddece26afd34c31585c74a4db0630c376df271c285d682d1e55012197830b6df" -dependencies = [ - "proc-macro2 1.0.69", - "quote 1.0.33", - "syn 2.0.38", -] - [[package]] name = "metrics-util" -version = "0.15.1" +version = "0.16.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4de2ed6e491ed114b40b732e4d1659a9d53992ebd87490c44a6ffe23739d973e" +checksum = "8b07a5eb561b8cbc16be2d216faf7757f9baf3bfb94dbb0fae3df8387a5bb47f" dependencies = [ "crossbeam-epoch", "crossbeam-utils", - "hashbrown 0.13.1", + "hashbrown 0.14.1", "metrics", "num_cpus", "quanta", @@ -4741,6 +4752,24 @@ version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "native-tls" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07226173c32f2926027b63cce4bcd8076c3552846cbe7925f3aaffeac0a3b92e" +dependencies = [ + "lazy_static", + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "nb" version = "0.1.3" @@ -5012,12 +5041,50 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" +[[package]] +name = "openssl" +version = "0.10.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95a0481286a310808298130d22dd1fef0fa571e05a8f44ec801801e84b216b1f" +dependencies = [ + "bitflags 2.5.0", + "cfg-if", + "foreign-types", + "libc", + "once_cell", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2 1.0.69", + "quote 1.0.33", + "syn 2.0.38", +] + [[package]] name = "openssl-probe" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf" +[[package]] +name = "openssl-sys" +version = "0.9.102" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c597637d56fbc83893a35eb0dd04b2b8e7a50c91e64e9493e398b5df4fb45fa2" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "os_str_bytes" version = "6.4.1" @@ -5270,12 +5337,16 @@ dependencies = [ name = "poc-metrics" version = "0.1.0" dependencies = [ + "futures", "metrics", "metrics-exporter-prometheus", + "reqwest", "serde", "thiserror", + "tokio", "tower", "tracing", + "tracing-subscriber", ] [[package]] @@ -5521,13 +5592,12 @@ dependencies = [ [[package]] name = "quanta" -version = "0.11.1" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a17e662a7a8291a865152364c20c7abc5e60486ab2001e8ec10b24862de0b9ab" +checksum = "8e5167a477619228a0b284fac2674e3c388cba90631d7b7de620e6f1fcd08da5" dependencies = [ "crossbeam-utils", "libc", - "mach2", "once_cell", "raw-cpuid", "wasi 0.11.0+wasi-snapshot-preview1", @@ -5685,11 +5755,11 @@ dependencies = [ [[package]] name = "raw-cpuid" -version = "10.7.0" +version = "11.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c297679cb867470fa8c9f67dbba74a78d78e3e98d7cf2b08d6d71540f797332" +checksum = "e29830cbb1290e404f24c73af91c5d8d631ce7e128691e9477556b540cd01ecd" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.5.0", ] [[package]] @@ -6754,7 +6824,7 @@ version = "1.16.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "361cc834e5fbbe1a73f1d904fcb8ab052a665e5be6061bd1ba7ab478d7d17c9c" dependencies = [ - "ahash 0.8.7", + "ahash 0.8.11", "blake3", "block-buffer 0.10.4", "bs58 0.4.0", @@ -6856,7 +6926,7 @@ version = "1.16.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4d44a4998ba6d9b37e89399d9ce2812e84489dd4665df619fb23366e1c2ec1b" dependencies = [ - "ahash 0.8.7", + "ahash 0.8.11", "bincode", "bv", "caps", @@ -7878,6 +7948,16 @@ dependencies = [ "syn 2.0.38", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.23.4" @@ -8322,6 +8402,12 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77439c1b53d2303b20d9459b1ade71a83c716e3f9c34f3228c00e6f185d6c002" +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "vec_map" version = "0.8.2" diff --git a/Cargo.toml b/Cargo.toml index 64d33e0ad..48b60ed99 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -80,7 +80,7 @@ reqwest = { version = "0", default-features = false, features = [ ] } beacon = { git = "https://github.com/helium/proto", branch = "master" } humantime = "2" -metrics = "0.21" +metrics = ">=0.22" metrics-exporter-prometheus = "0" tracing = "0" tracing-subscriber = { version = "0", default-features = false, features = [ diff --git a/boost_manager/src/telemetry.rs b/boost_manager/src/telemetry.rs index 7589b68df..4437b9ed1 100644 --- a/boost_manager/src/telemetry.rs +++ b/boost_manager/src/telemetry.rs @@ -16,7 +16,7 @@ pub async fn last_reward_processed_time( db: &Pool, datetime: DateTime, ) -> anyhow::Result<()> { - metrics::gauge!(LAST_REWARD_PROCESSED_TIME, datetime.timestamp() as f64); + metrics::gauge!(LAST_REWARD_PROCESSED_TIME).set(datetime.timestamp() as f64); meta::store(db, LAST_REWARD_PROCESSED_TIME, datetime.timestamp()).await?; Ok(()) diff --git a/boost_manager/src/updater.rs b/boost_manager/src/updater.rs index 02b0350a1..22eafda50 100644 --- a/boost_manager/src/updater.rs +++ b/boost_manager/src/updater.rs @@ -143,9 +143,9 @@ where async fn check_failed_activations(&self) -> Result<()> { let num_marked_failed = db::update_failed_activations(&self.pool).await?; - metrics::counter!("failed_activations", num_marked_failed); + metrics::counter!("failed_activations").increment(num_marked_failed); let total_failed_count = db::get_failed_activations_count(&self.pool).await?; - metrics::gauge!("db_failed_row_count", total_failed_count as f64); + metrics::gauge!("db_failed_row_count").set(total_failed_count as f64); if total_failed_count > 0 { tracing::warn!("{} failed status activations ", total_failed_count); }; @@ -159,7 +159,7 @@ where summed_activations_count: u64, ) -> Result<()> { tracing::info!("processed batch of {} activations successfully", batch_size); - metrics::counter!("success_activations", summed_activations_count); + metrics::counter!("success_activations").increment(summed_activations_count); db::update_success_batch(&self.pool, ids).await?; Ok(()) } diff --git a/db_store/src/meta.rs b/db_store/src/meta.rs index d4dfc73b0..ffaa99aa6 100644 --- a/db_store/src/meta.rs +++ b/db_store/src/meta.rs @@ -6,11 +6,11 @@ macro_rules! query_exec_timed { ( $name:literal, $query:expr, $meth:ident, $exec:expr ) => {{ match poc_metrics::record_duration!(concat!($name, "_duration"), $query.$meth($exec).await) { Ok(x) => { - metrics::increment_counter!(concat!($name, "_count"), "status" => "ok"); + metrics::counter!(concat!($name, "_count"), "status" => "ok").increment(1); Ok(x) } Err(e) => { - metrics::increment_counter!(concat!($name, "_count"), "status" => "error"); + metrics::counter!(concat!($name, "_count"), "status" => "error").increment(1); Err(Error::SqlError(e)) } } diff --git a/db_store/src/metric_tracker.rs b/db_store/src/metric_tracker.rs index 19357c237..b0be3db2c 100644 --- a/db_store/src/metric_tracker.rs +++ b/db_store/src/metric_tracker.rs @@ -14,7 +14,7 @@ async fn run(size_name: String, idle_name: String, pool: sqlx::Pool self.file_info.prefix.clone(), "process-name" => self.process_name.clone(), - ); + ).set(latency.num_seconds() as f64); recorder.record(&self.process_name, &self.file_info).await?; Ok(futures::stream::iter(self.data.into_iter()).boxed()) diff --git a/file_store/src/file_sink.rs b/file_store/src/file_sink.rs index 13d512608..0081e53ca 100644 --- a/file_store/src/file_sink.rs +++ b/file_store/src/file_sink.rs @@ -127,7 +127,7 @@ impl FileSinkBuilder { metric: self.metric, }; - metrics::register_counter!(client.metric, vec![OK_LABEL]); + metrics::counter!(client.metric, vec![OK_LABEL]); let mut sink = FileSink { target_path: self.target_path, @@ -172,22 +172,22 @@ impl FileSinkClient { tokio::select! { result = self.sender.send_timeout(Message::Data(on_write_tx, bytes), SEND_TIMEOUT) => match result { Ok(_) => { - metrics::increment_counter!( + metrics::counter!( self.metric, labels .chain(std::iter::once(OK_LABEL)) .collect::>() - ); + ).increment(1); tracing::debug!("file_sink write succeeded for {:?}", self.metric); Ok(on_write_rx) } Err(SendTimeoutError::Closed(_)) => { - metrics::increment_counter!( + metrics::counter!( self.metric, labels .chain(std::iter::once(ERROR_LABEL)) .collect::>() - ); + ).increment(1); tracing::error!("file_sink write failed for {:?} channel closed", self.metric); Err(Error::channel()) } diff --git a/iot_config/src/telemetry.rs b/iot_config/src/telemetry.rs index 883c91f73..d55a8adc5 100644 --- a/iot_config/src/telemetry.rs +++ b/iot_config/src/telemetry.rs @@ -14,19 +14,19 @@ const GATEWAY_CHAIN_LOOKUP_DURATION_METRIC: &str = concat!(env!("CARGO_PKG_NAME"), "-", "gateway-info-lookup-duration"); pub fn initialize() { - metrics::gauge!(STREAM_METRIC, 0.0); + metrics::gauge!(STREAM_METRIC).set(0.0); } pub fn count_request(service: &'static str, rpc: &'static str) { - metrics::increment_counter!(RPC_METRIC, "service" => service, "rpc" => rpc); + metrics::counter!(RPC_METRIC, "service" => service, "rpc" => rpc).increment(1); } pub fn count_gateway_info_lookup(result: &'static str) { - metrics::increment_counter!(GATEWAY_CHAIN_LOOKUP_METRIC, "result" => result); + metrics::counter!(GATEWAY_CHAIN_LOOKUP_METRIC, "result" => result).increment(1); } pub fn gauge_hexes(cells: usize) { - metrics::gauge!(REGION_HEX_METRIC, cells as f64); + metrics::gauge!(REGION_HEX_METRIC).set(cells as f64); } pub fn count_region_lookup( @@ -35,37 +35,38 @@ pub fn count_region_lookup( ) { let reported_region = reported_region.map_or_else(|| "LOOKUP_FAILED".to_string(), |region| region.to_string()); - metrics::increment_counter!( + metrics::counter!( REGION_LOOKUP_METRIC, // per metrics docs, &str should be preferred for performance; should the regions be // mapped through a match of region => &'static str of the name? "default_region" => default_region.to_string(), "reported_region" => reported_region - ); + ) + .increment(1); } pub fn duration_gateway_info_lookup(start: std::time::Instant) { - metrics::histogram!(GATEWAY_CHAIN_LOOKUP_DURATION_METRIC, start.elapsed()); + metrics::histogram!(GATEWAY_CHAIN_LOOKUP_DURATION_METRIC).record(start.elapsed()); } pub fn count_skf_updates(adds: usize, removes: usize) { - metrics::counter!(SKF_ADD_COUNT_METRIC, adds as u64); - metrics::counter!(SKF_REMOVE_COUNT_METRIC, removes as u64); + metrics::counter!(SKF_ADD_COUNT_METRIC).increment(adds as u64); + metrics::counter!(SKF_REMOVE_COUNT_METRIC).increment(removes as u64); } pub fn count_eui_updates(adds: usize, removes: usize) { - metrics::counter!(EUI_ADD_COUNT_METRIC, adds as u64); - metrics::counter!(EUI_REMOVE_COUNT_METRIC, removes as u64); + metrics::counter!(EUI_ADD_COUNT_METRIC).increment(adds as u64); + metrics::counter!(EUI_REMOVE_COUNT_METRIC).increment(removes as u64); } pub fn count_devaddr_updates(adds: usize, removes: usize) { - metrics::counter!(DEVADDR_ADD_COUNT_METRIC, adds as u64); - metrics::counter!(DEVADDR_REMOVE_COUNT_METRIC, removes as u64); + metrics::counter!(DEVADDR_ADD_COUNT_METRIC).increment(adds as u64); + metrics::counter!(DEVADDR_REMOVE_COUNT_METRIC).increment(removes as u64); } pub fn route_stream_subscribe() { - metrics::increment_gauge!(STREAM_METRIC, 1.0); + metrics::gauge!(STREAM_METRIC).increment(1.0); } pub fn route_stream_unsubscribe() { - metrics::decrement_gauge!(STREAM_METRIC, 1.0); + metrics::gauge!(STREAM_METRIC).decrement(1.0); } diff --git a/iot_packet_verifier/src/burner.rs b/iot_packet_verifier/src/burner.rs index 9c8950cdf..4f0dda388 100644 --- a/iot_packet_verifier/src/burner.rs +++ b/iot_packet_verifier/src/burner.rs @@ -131,7 +131,7 @@ where payer_account.burned = payer_account.burned.saturating_sub(amount); payer_account.balance = payer_account.balance.saturating_sub(amount); - metrics::counter!("burned", amount, "payer" => payer.to_string()); + metrics::counter!("burned", "payer" => payer.to_string()).increment(amount); Ok(()) } diff --git a/iot_verifier/src/entropy_loader.rs b/iot_verifier/src/entropy_loader.rs index 5f858cf22..3fddb31e2 100644 --- a/iot_verifier/src/entropy_loader.rs +++ b/iot_verifier/src/entropy_loader.rs @@ -63,7 +63,7 @@ impl EntropyLoader { report.version as i32, ) .await?; - metrics::increment_counter!("oracles_iot_verifier_loader_entropy"); + metrics::counter!("oracles_iot_verifier_loader_entropy").increment(1); Ok(transaction) }) .await? diff --git a/iot_verifier/src/gateway_cache.rs b/iot_verifier/src/gateway_cache.rs index 4b76018e8..fe2ef2afe 100644 --- a/iot_verifier/src/gateway_cache.rs +++ b/iot_verifier/src/gateway_cache.rs @@ -26,11 +26,11 @@ impl GatewayCache { ) -> Result { match self.gateway_cache_receiver.borrow().get(address) { Some(hit) => { - metrics::increment_counter!("oracles_iot_verifier_gateway_cache_hit"); + metrics::counter!("oracles_iot_verifier_gateway_cache_hit").increment(1); Ok(hit.clone()) } None => { - metrics::increment_counter!("oracles_iot_verifier_gateway_cache_miss"); + metrics::counter!("oracles_iot_verifier_gateway_cache_miss").increment(1); Err(GatewayCacheError::GatewayNotFound(address.clone())) } } diff --git a/iot_verifier/src/region_cache.rs b/iot_verifier/src/region_cache.rs index 127e2be0e..ceab85a2c 100644 --- a/iot_verifier/src/region_cache.rs +++ b/iot_verifier/src/region_cache.rs @@ -49,12 +49,12 @@ where ) -> Result> { match self.cache.get(®ion).await { Some(hit) => { - metrics::increment_counter!("oracles_iot_verifier_region_params_cache_hit"); + metrics::counter!("oracles_iot_verifier_region_params_cache_hit").increment(1); Ok(hit.value().clone()) } _ => match self.gateways.clone().resolve_region_params(region).await { Ok(res) => { - metrics::increment_counter!("oracles_iot_verifier_region_params_cache_miss"); + metrics::counter!("oracles_iot_verifier_region_params_cache_miss").increment(1); self.cache .insert(region, res.clone(), self.refresh_interval) .await; diff --git a/iot_verifier/src/telemetry.rs b/iot_verifier/src/telemetry.rs index 2629a6a7e..8f0a110c7 100644 --- a/iot_verifier/src/telemetry.rs +++ b/iot_verifier/src/telemetry.rs @@ -26,47 +26,47 @@ pub async fn initialize(db: &Pool) -> anyhow::Result<()> { } pub fn count_packets(count: u64) { - metrics::counter!(PACKET_COUNTER, count); + metrics::counter!(PACKET_COUNTER).increment(count); } pub fn count_non_rewardable_packets(count: u64) { - metrics::counter!(NON_REWARDABLE_PACKET_COUNTER, count); + metrics::counter!(NON_REWARDABLE_PACKET_COUNTER).increment(count); } pub fn count_loader_beacons(count: u64) { - metrics::counter!(LOADER_BEACON_COUNTER, count); + metrics::counter!(LOADER_BEACON_COUNTER).increment(count); } pub fn count_loader_witnesses(count: u64) { - metrics::counter!(LOADER_WITNESS_COUNTER, count); + metrics::counter!(LOADER_WITNESS_COUNTER).increment(count); } pub fn count_loader_dropped_beacons(count: u64, labels: &[(&'static str, &'static str)]) { - metrics::counter!(LOADER_DROPPED_BEACON_COUNTER, count, labels); + metrics::counter!(LOADER_DROPPED_BEACON_COUNTER, labels).increment(count); } pub fn count_loader_dropped_witnesses(count: u64, labels: &[(&'static str, &'static str)]) { - metrics::counter!(LOADER_DROPPED_WITNESS_COUNTER, count, labels); + metrics::counter!(LOADER_DROPPED_WITNESS_COUNTER, labels).increment(count); } pub fn num_beacons(count: u64) { - metrics::gauge!(BEACON_GUAGE, count as f64); + metrics::gauge!(BEACON_GUAGE).set(count as f64); } pub fn increment_num_beacons_by(count: u64) { - metrics::increment_gauge!(BEACON_GUAGE, count as f64); + metrics::gauge!(BEACON_GUAGE).increment(count as f64); } pub fn decrement_num_beacons() { - metrics::decrement_gauge!(BEACON_GUAGE, 1.0) + metrics::gauge!(BEACON_GUAGE).decrement(1.0) } pub fn increment_invalid_witnesses(labels: &[(&'static str, &'static str)]) { - metrics::increment_counter!(INVALID_WITNESS_COUNTER, labels); + metrics::counter!(INVALID_WITNESS_COUNTER, labels).increment(1); } pub fn last_rewarded_end_time(datetime: DateTime) { - metrics::gauge!(LAST_REWARDED_END_TIME, datetime.timestamp() as f64); + metrics::gauge!(LAST_REWARDED_END_TIME).set(datetime.timestamp() as f64); } #[derive(Default)] diff --git a/iot_verifier/src/witness_updater.rs b/iot_verifier/src/witness_updater.rs index d1fa912c0..661653d06 100644 --- a/iot_verifier/src/witness_updater.rs +++ b/iot_verifier/src/witness_updater.rs @@ -24,7 +24,7 @@ struct Telemetry { impl Telemetry { fn new() -> Self { - let gauge = metrics::register_gauge!("iot_verifier_witness_updater_queue"); + let gauge = metrics::gauge!("iot_verifier_witness_updater_queue"); gauge.set(0.0); Self { queue_gauge: gauge } } diff --git a/metrics/Cargo.toml b/metrics/Cargo.toml index 5b16e567d..8eb9a10c2 100644 --- a/metrics/Cargo.toml +++ b/metrics/Cargo.toml @@ -11,5 +11,11 @@ tower = "0.4" thiserror = { workspace = true } serde = { workspace = true } tracing = { workspace = true } +tracing-subscriber = { workspace = true } metrics = { workspace = true } metrics-exporter-prometheus = { workspace = true } +futures = { workspace = true } + +[dev-dependencies] +tokio = { workspace = true } +reqwest = { workspace = true } diff --git a/metrics/src/client_requests.rs b/metrics/src/client_requests.rs new file mode 100644 index 000000000..afc802a7d --- /dev/null +++ b/metrics/src/client_requests.rs @@ -0,0 +1,205 @@ +//! Add a timing span to anything that can be instrumented and returns a Result. +//! +//! Example: +//! ```ignore +//! use poc_metrics::client_requests::ClientMetricTiming; +//! +//! async fn time_function() { +//! let x: Result = async { Ok(42) } +//! .with_timing("iot_fetch_info") +//! .await; +//! assert_eq!(42, x.unwrap()); +//! } +//! ``` +//! +//! This will result in a prometheus metric +//! >> client_request_duration_seconds{name = "iot_fetch_info", quantile="xxx"} +//! +//! Install the `ApiTimingLayer`. +//! +//! Adding `.with_span_events(FmtSpan::CLOSE)` to a regular format layer will +//! print the timing spans to stdout as well. +//! +//! Example: +//! ```ignore +//! use poc_metrics::client_requests; +//! use tracing_subscriber::fmt::format::FmtSpan; +//! use tracing_subscriber::layer::SubscriberExt; +//! use tracing_subscriber::util::SubscriberInitExt; +//! +//! tracing_subscriber::registry() +//! .with(tracing_subscriber::fmt::layer().with_span_events(FmtSpan::CLOSE)) +//! .with(client_requests::client_request_timing_layer("histogram_name")) +//! .init(); +//! ``` +use futures::{future::Inspect, Future, FutureExt}; +use std::time::Instant; +use tracing::{field::Visit, instrument::Instrumented, span, Instrument, Subscriber}; +use tracing_subscriber::{filter, layer, registry::LookupSpan, Layer}; + +const SPAN_NAME: &str = "metrics::timing"; +const RESULT_FIELD: &str = "result"; +const NAME_FIELD: &str = "name"; +const SUCCESS: &str = "ok"; +const ERROR: &str = "error"; +const UNKNOWN: &str = "unknown"; + +pub fn client_request_timing_layer(histogram_name: &'static str) -> impl layer::Layer +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + ApiTimingLayer::new(histogram_name).with_filter(filter::filter_fn(|m| m.name() == SPAN_NAME)) +} + +pub trait ClientMetricTiming: Sized + Instrument + FutureExt { + fn with_timing( + self, + name: &'static str, + ) -> Instrumented)>> + where + Self: Future> + Sized; +} + +// Impl ClientMetricTiming for all futures that return a Result +impl ClientMetricTiming for F +where + F: Future> + Sized, +{ + fn with_timing( + self, + name: &'static str, + ) -> Instrumented)>> { + // NOTE(mj): `tracing::info_span!(SPAN_NAME, {NAME_FIELD} = name, {RESULT_FIELD} = tracing::field::Empty);` + // + // Results in the error "format must be a string literal". Maybe one day + // this will be fixed in the tracing macro so we can use it likes the + // docs say. + let span = tracing::info_span!(SPAN_NAME, name, result = tracing::field::Empty); + let inner_span = span.clone(); + self.inspect(move |res| { + inner_span.record(RESULT_FIELD, res.as_ref().ok().map_or(ERROR, |_| SUCCESS)); + }) + .instrument(span) + } +} + +struct Timing { + name: Option, + start: Instant, + // ok | error | unknown + result: String, +} + +impl Timing { + fn new() -> Self { + Self { + name: None, + start: Instant::now(), + result: UNKNOWN.to_string(), + } + } + + fn record(self, histogram_name: &'static str) { + if let Some(name) = self.name { + metrics::histogram!( + histogram_name, + NAME_FIELD => name, + RESULT_FIELD => self.result + ) + .record(self.start.elapsed().as_secs_f64()) + } + } +} + +impl Visit for Timing { + fn record_debug(&mut self, _field: &tracing::field::Field, _value: &dyn std::fmt::Debug) {} + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + match field.name() { + NAME_FIELD => self.name = Some(value.to_string()), + RESULT_FIELD => self.result = value.to_string(), + _ => (), + } + } +} + +struct ApiTimingLayer { + histogram_name: &'static str, +} + +impl ApiTimingLayer { + fn new(histogram_name: &'static str) -> Self { + Self { histogram_name } + } +} + +impl tracing_subscriber::Layer for ApiTimingLayer +where + S: Subscriber + for<'a> LookupSpan<'a>, +{ + fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: layer::Context<'_, S>) { + let span = ctx.span(id).expect("Span not found, this is a bug"); + + let mut timing = Timing::new(); + attrs.values().record(&mut timing); + span.extensions_mut().insert(timing); + } + + fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: layer::Context<'_, S>) { + let span = ctx.span(id).unwrap(); + + if let Some(timing) = span.extensions_mut().get_mut::() { + values.record(timing); + }; + } + + fn on_close(&self, id: tracing::Id, ctx: layer::Context) { + let span = ctx.span(&id).unwrap(); + + if let Some(timing) = span.extensions_mut().remove::() { + timing.record(self.histogram_name); + }; + } +} + +#[cfg(test)] +mod tests { + use super::ClientMetricTiming; + use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + + #[tokio::test] + async fn test_telemetry() -> Result<(), Box> { + tracing_subscriber::registry() + // Uncomment to view traces and Spans closing + // .with( + // tracing_subscriber::fmt::layer() + // .with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE), + // ) + .with(super::client_request_timing_layer("histogram_name")) + .init(); + + // Let the OS assign a port + let addr = { + let listener = std::net::TcpListener::bind("127.0.0.1:0").unwrap(); + listener.local_addr()? + }; + tracing::info!("listening on {addr}"); + super::super::install(addr)?; + + let success = async { Ok("nothing went wrong") }; + let failure = async { Err("something went wrong") }; + let _: Result<&str, &str> = success.with_timing("success").await; + let _: Result<&str, &str> = failure.with_timing("failing").await; + + // .with_timing() can only be added to futures that return Results. + // let will_not_compile = async { 1 + 2 }.with_timing("not a result"); + + let res = reqwest::get(format!("http://{addr}")).await?; + let body = res.text().await?; + + tracing::info!("response: \n{body}"); + assert!(body.contains(r#"histogram_name_count{name="success",result="ok"} 1"#)); + assert!(body.contains(r#"histogram_name_count{name="failing",result="error"} 1"#)); + + Ok(()) + } +} diff --git a/metrics/src/lib.rs b/metrics/src/lib.rs index 5e4b6b60b..b427a4817 100644 --- a/metrics/src/lib.rs +++ b/metrics/src/lib.rs @@ -12,32 +12,26 @@ use std::{ }; use tower::{Layer, Service}; +pub mod client_requests; mod error; pub mod settings; pub fn start_metrics(settings: &Settings) -> Result { let socket: SocketAddr = settings.endpoint.parse()?; - PrometheusBuilder::new() - .with_http_listener(socket) - .install()?; - Ok(()) + install(socket) } -/// Install the Prometheus export gateway -pub fn install_metrics() { - let endpoint = - std::env::var("METRICS_SCRAPE_ENDPOINT").unwrap_or_else(|_| String::from("0.0.0.0:9000")); - let socket: SocketAddr = endpoint - .parse() - .expect("Invalid METRICS_SCRAPE_ENDPOINT value"); +fn install(socket_addr: SocketAddr) -> Result { if let Err(e) = PrometheusBuilder::new() - .with_http_listener(socket) + .with_http_listener(socket_addr) .install() { tracing::error!(target: "poc", "Failed to install Prometheus scrape endpoint: {e}"); } else { - tracing::info!(target: "poc", "Metrics scrape endpoint listening on {endpoint}"); + tracing::info!(target: "poc", "Metrics scrape endpoint listening on {socket_addr}"); } + + Ok(()) } /// Measure the duration of a block and record it @@ -49,7 +43,7 @@ macro_rules! record_duration { ( $metric_name:expr, $e:expr ) => {{ let timer = std::time::Instant::now(); let res = $e; - ::metrics::histogram!($metric_name, timer.elapsed()); + ::metrics::histogram!($metric_name).record(timer.elapsed()); res }}; } @@ -126,7 +120,7 @@ where let metric_name_time = self.metric_name_time; let timer = std::time::Instant::now(); - metrics::increment_gauge!(metric_name_count, 1.0); + metrics::gauge!(metric_name_count).increment(1.0); let clone = self.inner.clone(); // take the service that was ready @@ -134,11 +128,11 @@ where Box::pin(async move { let res = inner.call(req).await; - metrics::decrement_gauge!(metric_name_count, 1.0); + metrics::gauge!(metric_name_count).decrement(1.0); let elapsed_time = timer.elapsed(); tracing::debug!("request processed in {elapsed_time:?}"); // TODO What units to use? Is f64 seconds appropriate? - ::metrics::histogram!(metric_name_time, elapsed_time.as_secs_f64()); + metrics::histogram!(metric_name_time).record(elapsed_time.as_secs_f64()); res }) } diff --git a/mobile_config/src/telemetry.rs b/mobile_config/src/telemetry.rs index 3238d8748..550b2df21 100644 --- a/mobile_config/src/telemetry.rs +++ b/mobile_config/src/telemetry.rs @@ -3,9 +3,9 @@ const GATEWAY_CHAIN_LOOKUP_METRIC: &str = concat!(env!("CARGO_PKG_NAME"), "-", "gateway-chain-lookup"); pub fn count_request(service: &'static str, rpc: &'static str) { - metrics::increment_counter!(RPC_METRIC, "service" => service, "rpc" => rpc); + metrics::counter!(RPC_METRIC, "service" => service, "rpc" => rpc).increment(1); } pub fn count_gateway_chain_lookup(result: &'static str) { - metrics::increment_counter!(GATEWAY_CHAIN_LOOKUP_METRIC, "result" => result); + metrics::counter!(GATEWAY_CHAIN_LOOKUP_METRIC, "result" => result).increment(1); } diff --git a/mobile_packet_verifier/src/burner.rs b/mobile_packet_verifier/src/burner.rs index ffb6a0c0f..6cf3c7f41 100644 --- a/mobile_packet_verifier/src/burner.rs +++ b/mobile_packet_verifier/src/burner.rs @@ -96,13 +96,15 @@ where tracing::info!(%total_dcs, %payer, "Burning DC"); if self.burn_data_credits(&payer, total_dcs).await.is_err() { // We have failed to burn data credits: - metrics::counter!("burned", total_dcs, "payer" => payer.to_string(), "success" => "false"); + metrics::counter!("burned", "payer" => payer.to_string(), "success" => "false") + .increment(total_dcs); continue; } // We succesfully managed to burn data credits: - metrics::counter!("burned", total_dcs, "payer" => payer.to_string(), "success" => "true"); + metrics::counter!("burned", "payer" => payer.to_string(), "success" => "true") + .increment(total_dcs); // Delete from the data transfer session and write out to S3 diff --git a/mobile_verifier/src/coverage.rs b/mobile_verifier/src/coverage.rs index 3290abe94..860005154 100644 --- a/mobile_verifier/src/coverage.rs +++ b/mobile_verifier/src/coverage.rs @@ -178,16 +178,16 @@ impl CoverageDaemon { self.oracle_boosting_sink.commit().await?; loop { - #[rustfmt::skip] tokio::select! { _ = shutdown.clone() => { tracing::info!("CoverageDaemon shutting down"); break; } Some(file) = self.coverage_objs.recv() => { - let start = Instant::now(); - self.process_file(file).await?; - metrics::histogram!("coverage_object_processing_time", start.elapsed()); + let start = Instant::now(); + self.process_file(file).await?; + metrics::histogram!("coverage_object_processing_time") + .record(start.elapsed()); } } } diff --git a/mobile_verifier/src/data_session.rs b/mobile_verifier/src/data_session.rs index 9873febc3..2aa744177 100644 --- a/mobile_verifier/src/data_session.rs +++ b/mobile_verifier/src/data_session.rs @@ -73,7 +73,6 @@ impl DataSessionIngestor { tracing::info!("starting DataSessionIngestor"); tokio::spawn(async move { loop { - #[rustfmt::skip] tokio::select! { biased; _ = shutdown.clone() => { @@ -81,12 +80,10 @@ impl DataSessionIngestor { break; } Some(file) = self.receiver.recv() => { - let start = Instant::now(); - self.process_file(file).await?; - metrics::histogram!( - "valid_data_transfer_session_processing_time", - start.elapsed() - ); + let start = Instant::now(); + self.process_file(file).await?; + metrics::histogram!("valid_data_transfer_session_processing_time") + .record(start.elapsed()); } } } @@ -115,7 +112,8 @@ impl DataSessionIngestor { .try_fold(transaction, |mut transaction, report| async move { let data_session = HotspotDataSession::from_valid_data_session(report, file_ts); data_session.save(&mut transaction).await?; - metrics::increment_counter!("oracles_mobile_verifier_ingest_hotspot_data_session"); + metrics::counter!("oracles_mobile_verifier_ingest_hotspot_data_session") + .increment(1); Ok(transaction) }) .await? diff --git a/mobile_verifier/src/heartbeats/cbrs.rs b/mobile_verifier/src/heartbeats/cbrs.rs index e8b0a097e..3560c1e60 100644 --- a/mobile_verifier/src/heartbeats/cbrs.rs +++ b/mobile_verifier/src/heartbeats/cbrs.rs @@ -122,7 +122,6 @@ where let location_cache = LocationCache::new(&self.pool); loop { - #[rustfmt::skip] tokio::select! { biased; _ = shutdown.clone() => { @@ -130,15 +129,16 @@ where break; } Some(file) = self.heartbeats.recv() => { - let start = Instant::now(); - self.process_file( + let start = Instant::now(); + self.process_file( file, &heartbeat_cache, &coverage_claim_time_cache, &coverage_object_cache, &location_cache, - ).await?; - metrics::histogram!("cbrs_heartbeat_processing_time", start.elapsed()); + ).await?; + metrics::histogram!("cbrs_heartbeat_processing_time") + .record(start.elapsed()); } } } diff --git a/mobile_verifier/src/heartbeats/wifi.rs b/mobile_verifier/src/heartbeats/wifi.rs index bd5337a6d..03a3701a2 100644 --- a/mobile_verifier/src/heartbeats/wifi.rs +++ b/mobile_verifier/src/heartbeats/wifi.rs @@ -119,7 +119,6 @@ where let location_cache = LocationCache::new(&self.pool); loop { - #[rustfmt::skip] tokio::select! { biased; _ = shutdown.clone() => { @@ -127,15 +126,16 @@ where break; } Some(file) = self.heartbeats.recv() => { - let start = Instant::now(); - self.process_file( + let start = Instant::now(); + self.process_file( file, &heartbeat_cache, &coverage_claim_time_cache, &coverage_object_cache, &location_cache - ).await?; - metrics::histogram!("wifi_heartbeat_processing_time", start.elapsed()); + ).await?; + metrics::histogram!("wifi_heartbeat_processing_time") + .record(start.elapsed()); } } } diff --git a/mobile_verifier/src/speedtests.rs b/mobile_verifier/src/speedtests.rs index 9f6bc3d1d..2704e2da4 100644 --- a/mobile_verifier/src/speedtests.rs +++ b/mobile_verifier/src/speedtests.rs @@ -123,7 +123,6 @@ where pub async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { loop { - #[rustfmt::skip] tokio::select! { biased; _ = shutdown.clone() => { @@ -131,9 +130,10 @@ where break; } Some(file) = self.speedtests.recv() => { - let start = Instant::now(); - self.process_file(file).await?; - metrics::histogram!("speedtest_processing_time", start.elapsed()); + let start = Instant::now(); + self.process_file(file).await?; + metrics::histogram!("speedtest_processing_time") + .record(start.elapsed()); } } } diff --git a/mobile_verifier/src/subscriber_location.rs b/mobile_verifier/src/subscriber_location.rs index e17712a58..75c32c923 100644 --- a/mobile_verifier/src/subscriber_location.rs +++ b/mobile_verifier/src/subscriber_location.rs @@ -105,17 +105,14 @@ where async fn run(mut self, shutdown: triggered::Listener) -> anyhow::Result<()> { loop { - #[rustfmt::skip] tokio::select! { biased; _ = shutdown.clone() => break, Some(file) = self.reports_receiver.recv() => { - let start = Instant::now(); + let start = Instant::now(); self.process_file(file).await?; - metrics::histogram!( - "subscriber_location_processing_time", - start.elapsed() - ); + metrics::histogram!("subscriber_location_processing_time") + .record(start.elapsed()); } } } diff --git a/mobile_verifier/src/telemetry.rs b/mobile_verifier/src/telemetry.rs index 4bfe3be2a..224fc3ab3 100644 --- a/mobile_verifier/src/telemetry.rs +++ b/mobile_verifier/src/telemetry.rs @@ -13,9 +13,9 @@ pub async fn initialize(db: &Pool) -> anyhow::Result<()> { } pub fn last_rewarded_end_time(timestamp: DateTime) { - metrics::gauge!(LAST_REWARDED_END_TIME, timestamp.timestamp() as f64); + metrics::gauge!(LAST_REWARDED_END_TIME).set(timestamp.timestamp() as f64); } pub fn data_transfer_rewards_scale(scale: f64) { - metrics::gauge!(DATA_TRANSFER_REWARDS_SCALE, scale); + metrics::gauge!(DATA_TRANSFER_REWARDS_SCALE).set(scale); } diff --git a/poc_entropy/src/server.rs b/poc_entropy/src/server.rs index be1843a26..e771afffb 100644 --- a/poc_entropy/src/server.rs +++ b/poc_entropy/src/server.rs @@ -18,7 +18,7 @@ impl PocEntropy for EntropyServer { _request: tonic::Request, ) -> Result, tonic::Status> { let entropy = &*self.entropy_watch.borrow(); - metrics::increment_counter!("entropy_server_get_count"); + metrics::counter!("entropy_server_get_count").increment(1); Ok(tonic::Response::new(entropy.into())) } } diff --git a/price/src/metrics.rs b/price/src/metrics.rs index f13813e8a..c39b53996 100644 --- a/price/src/metrics.rs +++ b/price/src/metrics.rs @@ -12,9 +12,9 @@ impl Metrics { } fn increment_counter(counter: String, token_type: BlockchainTokenTypeV1) { - metrics::increment_counter!(counter, "token_type" => token_type.as_str_name()); + metrics::counter!(counter, "token_type" => token_type.as_str_name()).increment(1); } fn set_gauge(token_type: BlockchainTokenTypeV1, value: f64) { - metrics::gauge!(PRICE_GAUGE, value, "token_type" => token_type.as_str_name()); + metrics::gauge!(PRICE_GAUGE, "token_type" => token_type.as_str_name()).set(value); } diff --git a/reward_index/src/telemetry.rs b/reward_index/src/telemetry.rs index 7589b68df..4437b9ed1 100644 --- a/reward_index/src/telemetry.rs +++ b/reward_index/src/telemetry.rs @@ -16,7 +16,7 @@ pub async fn last_reward_processed_time( db: &Pool, datetime: DateTime, ) -> anyhow::Result<()> { - metrics::gauge!(LAST_REWARD_PROCESSED_TIME, datetime.timestamp() as f64); + metrics::gauge!(LAST_REWARD_PROCESSED_TIME).set(datetime.timestamp() as f64); meta::store(db, LAST_REWARD_PROCESSED_TIME, datetime.timestamp()).await?; Ok(()) diff --git a/solana/src/burn.rs b/solana/src/burn.rs index d31db7233..3103a4cc2 100644 --- a/solana/src/burn.rs +++ b/solana/src/burn.rs @@ -141,9 +141,9 @@ impl SolanaNetwork for SolanaRpc { if self.payers_to_monitor.contains(payer) { metrics::gauge!( "balance", - account_layout.amount as f64, "payer" => payer.to_string() - ); + ) + .set(account_layout.amount as f64); } Ok(account_layout.amount)