Skip to content

Commit

Permalink
Merge pull request #791 from helium/macpie/custom-tracing
Browse files Browse the repository at this point in the history
Macpie/custom tracing
  • Loading branch information
macpie authored May 6, 2024
2 parents 43d5a56 + 203bee0 commit d29595c
Show file tree
Hide file tree
Showing 10 changed files with 533 additions and 82 deletions.
202 changes: 185 additions & 17 deletions Cargo.lock

Large diffs are not rendered by default.

93 changes: 53 additions & 40 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,25 +3,26 @@ debug = true

[workspace]
members = [
"boost_manager",
"db_store",
"denylist",
"file_store",
"ingest",
"iot_config",
"iot_packet_verifier",
"iot_verifier",
"metrics",
"mobile_config",
"mobile_config_cli",
"mobile_packet_verifier",
"mobile_verifier",
"poc_entropy",
"price",
"reward_index",
"reward_scheduler",
"solana",
"task_manager",
"boost_manager",
"custom_tracing",
"db_store",
"denylist",
"file_store",
"ingest",
"iot_config",
"iot_packet_verifier",
"iot_verifier",
"metrics",
"mobile_config",
"mobile_config_cli",
"mobile_packet_verifier",
"mobile_verifier",
"poc_entropy",
"price",
"reward_index",
"reward_scheduler",
"solana",
"task_manager",
]
resolver = "2"

Expand All @@ -32,14 +33,14 @@ edition = "2021"

[workspace.dependencies]
anchor-client = "0.29.0"
anyhow = {version = "1", features = ["backtrace"]}
bs58 = {version = "0.4", features=["check"]}
anyhow = { version = "1", features = ["backtrace"] }
bs58 = { version = "0.4", features = ["check"] }
thiserror = "1"
clap = {version = "4", features = ["derive"]}
serde = {version = "1", features=["derive"]}
clap = { version = "4", features = ["derive"] }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
http-serde = "1"
chrono = {version = "0", features = ["serde"]}
chrono = { version = "0", features = ["serde"] }
tokio = { version = "1", default-features = false, features = [
"fs",
"macros",
Expand All @@ -48,38 +49,50 @@ tokio = { version = "1", default-features = false, features = [
"rt-multi-thread",
"rt",
"process",
"time"
"time",
] }
tokio-stream = "0"
sqlx = {version = "0", features = [
sqlx = { version = "0", features = [
"postgres",
"uuid",
"decimal",
"chrono",
"migrate",
"macros",
"runtime-tokio-rustls"
]}
helium-anchor-gen = {git = "https://github.com/helium/helium-anchor-gen.git"}
helium-crypto = {version = "0.8.4", features=["sqlx-postgres", "multisig"]}
hextree = {git = "https://github.com/jaykickliter/HexTree", branch = "main", features = ["disktree"]}
helium-proto = {git = "https://github.com/helium/proto", branch = "master", features = ["services"]}
"runtime-tokio-rustls",
] }
helium-anchor-gen = { git = "https://github.com/helium/helium-anchor-gen.git" }
helium-crypto = { version = "0.8.4", features = ["sqlx-postgres", "multisig"] }
hextree = { git = "https://github.com/jaykickliter/HexTree", branch = "main", features = [
"disktree",
] }
helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [
"services",
] }
solana-client = "1.16"
solana-sdk = "1.16"
solana-program = "1.16"
spl-token = "3.5.0"
reqwest = {version = "0", default-features=false, features = ["gzip", "json", "rustls-tls"]}
reqwest = { version = "0", default-features = false, features = [
"gzip",
"json",
"rustls-tls",
] }
beacon = { git = "https://github.com/helium/proto", branch = "master" }
humantime = "2"
metrics = "0.21"
metrics-exporter-prometheus = "0"
tracing = "0"
tracing-subscriber = { version = "0", default-features=false, features = ["env-filter", "registry", "fmt"] }
tracing-subscriber = { version = "0", default-features = false, features = [
"env-filter",
"registry",
"fmt",
] }
rust_decimal = "1"
rust_decimal_macros = "1"
base64 = ">=0.21"
sha2 = "0.10"
tonic = {version = "0.10", features = ["tls", "tls-roots"]}
tonic = { version = "0.10", features = ["tls", "tls-roots"] }
http = "<=0.2"
triggered = "0"
futures = "*"
Expand All @@ -88,9 +101,9 @@ prost = "*"
pyth-sdk-solana = "=0.8"
once_cell = "1"
lazy_static = "1"
config = {version="0", default-features=false, features=["toml"]}
h3o = {version = "0", features = ["serde"]}
xorf = {version = "0", features = ["serde"] }
config = { version = "0", default-features = false, features = ["toml"] }
h3o = { version = "0", features = ["serde"] }
xorf = { version = "0", features = ["serde"] }
bytes = "*"
bincode = "1"
twox-hash = "1"
Expand All @@ -100,8 +113,8 @@ retainer = "*"
rand = "0.8"
itertools = "*"
tokio-util = "0"
uuid = {version = "1", features = ["v4", "serde"]}
tower-http = {version = "0", features = ["trace"]}
uuid = { version = "1", features = ["v4", "serde"] }
tower-http = { version = "0", features = ["trace"] }

[patch.crates-io]
sqlx = { git = "https://github.com/helium/sqlx.git", rev = "92a2268f02e0cac6fccb34d3e926347071dbb88d" }
34 changes: 34 additions & 0 deletions custom_tracing/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
[package]
name = "custom-tracing"
version = "0.1.0"
authors.workspace = true
license.workspace = true
edition.workspace = true

[dependencies]
notify = { version = "6", default-features = false }
anyhow = "1"
tokio = { version = "1", features = ["rt-multi-thread", "sync", "signal"] }
tracing = "0"
tracing-subscriber = { version = "0", default-features = true, features = [
"env-filter",
"registry",
"fmt",
] }
tower-http = { version = "0", features = ["trace"] }
tower-layer = { version = "0" }
axum = { version = "0.7", features = ["tracing"], optional = true }
helium-proto = { workspace = true, optional = true }
http = { workspace = true, optional = true }


[target.'cfg(target_os = "macos")'.dependencies]
notify = { version = "6", default-features = false, features = [
"macos_fsevent",
] }


[features]
default = []
http-1 = ["axum"]
grpc = ["helium-proto", "http"]
26 changes: 26 additions & 0 deletions custom_tracing/src/grpc_layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use helium_proto::services::Body;
use http::request::Request;
use tower_http::{
classify::{GrpcErrorsAsFailures, SharedClassifier},
trace::{DefaultOnFailure, DefaultOnResponse, TraceLayer},
LatencyUnit,
};
use tracing::{Level, Span};

type GrpcLayer =
TraceLayer<SharedClassifier<GrpcErrorsAsFailures>, for<'a> fn(&'a http::Request<Body>) -> Span>;

pub fn new_with_span(make_span: fn(&Request<Body>) -> Span) -> GrpcLayer {
TraceLayer::new_for_grpc()
.make_span_with(make_span)
.on_response(
DefaultOnResponse::new()
.level(Level::DEBUG)
.latency_unit(LatencyUnit::Millis),
)
.on_failure(
DefaultOnFailure::new()
.level(Level::WARN)
.latency_unit(LatencyUnit::Millis),
)
}
27 changes: 27 additions & 0 deletions custom_tracing/src/http_layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
use axum::{body::Body, http::Request};
use tower_http::{
classify::{ServerErrorsAsFailures, SharedClassifier},
trace::{DefaultOnFailure, DefaultOnResponse, TraceLayer},
LatencyUnit,
};
use tracing::{Level, Span};

pub fn new_with_span(
make_span: fn(&Request<Body>) -> Span,
) -> TraceLayer<
SharedClassifier<ServerErrorsAsFailures>,
for<'a> fn(&'a axum::http::Request<axum::body::Body>) -> Span,
> {
TraceLayer::new_for_http()
.make_span_with(make_span)
.on_response(
DefaultOnResponse::new()
.level(Level::DEBUG)
.latency_unit(LatencyUnit::Millis),
)
.on_failure(
DefaultOnFailure::new()
.level(Level::WARN)
.latency_unit(LatencyUnit::Millis),
)
}
148 changes: 148 additions & 0 deletions custom_tracing/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use anyhow::Result;
use notify::{event::DataChange, Config, RecommendedWatcher, RecursiveMode, Watcher};
use std::{fs, path::Path};
use tracing::Span;
use tracing_subscriber::{
layer::SubscriberExt,
reload::{self, Handle},
util::SubscriberInitExt,
EnvFilter, Registry,
};

#[cfg(feature = "grpc")]
pub mod grpc_layer;
#[cfg(feature = "http-1")]
pub mod http_layer;

pub async fn init(og_filter: String, tracing_cfg_file: String) -> Result<()> {
let (filtered_layer, reload_handle) =
reload::Layer::new(tracing_subscriber::EnvFilter::new(og_filter.clone()));

tracing_subscriber::registry()
.with(filtered_layer)
.with(tracing_subscriber::fmt::layer())
.init();

tokio::spawn(async move {
let state = State {
og_filter: og_filter.clone(),
tracing_cfg_file,
reload_handle,
};
if let Err(err) = state.watch().await {
tracing::warn!(?err, "tracing error watching configuration for update")
}
});

tracing::info!("custom tracing installed");

Ok(())
}

pub fn record<T>(field: &str, value: T)
where
T: std::fmt::Display,
{
Span::current().record(field, &tracing::field::display(value));
}

#[derive(Clone)]
pub struct State {
pub og_filter: String,
pub tracing_cfg_file: String,
pub reload_handle: Handle<EnvFilter, Registry>,
}

impl State {
async fn watch(&self) -> Result<()> {
let (tx, mut rx) = tokio::sync::mpsc::channel(10);

let mut watcher = RecommendedWatcher::new(
move |res| {
tx.blocking_send(res).expect("Failed to send event");
},
Config::default(),
)?;

watcher.watch(".".as_ref(), RecursiveMode::NonRecursive)?;

while let Some(res) = rx.recv().await {
match res {
Err(err) => tracing::warn!(?err, "tracing config watcher configuration file error"),
Ok(event) => match event.kind {
notify::EventKind::Modify(notify::event::ModifyKind::Data(
DataChange::Content,
)) => {
if let Some(event_path) = event.paths.first() {
if Path::new(event_path).exists() {
match fs::read_to_string(event_path) {
Err(_err) => tracing::warn!(
?_err,
"tracing config watcher failed to read file"
),
Ok(content) => {
if file_match(event_path, self.tracing_cfg_file.clone()) {
self.handle_change(content)?;
}
}
}
}
}
}
notify::EventKind::Remove(notify::event::RemoveKind::File) => {
if let Some(event_path) = event.paths.first() {
if file_match(event_path, self.tracing_cfg_file.clone()) {
self.handle_delete()?;
}
}
}
_event => {
tracing::debug!(?_event, "tracing config watcher ignored unhandled message")
}
},
}
}

Ok(())
}

fn handle_change(&self, content: String) -> Result<()> {
if content.is_empty() {
self.handle_delete()
} else {
match tracing_subscriber::EnvFilter::try_new(content.clone()) {
Err(_err) => tracing::warn!(
filter = content,
?_err,
"tracing config watcher failed to parse filter"
),
Ok(new_filter) => {
self.reload_handle.modify(|filter| *filter = new_filter)?;
tracing::info!(filter = content, "custom tracing config updated");
}
}
Ok(())
}
}

fn handle_delete(&self) -> Result<()> {
let new_filter = self.og_filter.clone();

self.reload_handle
.modify(|filter| *filter = tracing_subscriber::EnvFilter::new(new_filter.clone()))?;

tracing::info!(
filter = new_filter,
"tracing config watcher file deleted, reverting to rustlog filter"
);
Ok(())
}
}

fn file_match(event_path: &Path, file: String) -> bool {
event_path
.to_str()
.map(|path| path.split('/'))
.map(|path| path.last().is_some_and(|file_name| file_name == file))
.unwrap_or_default()
}
Loading

0 comments on commit d29595c

Please sign in to comment.