Skip to content

Commit

Permalink
feat: remote tracing context (#669)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mirko-von-Leipzig authored Feb 6, 2025
1 parent 87b4a70 commit 018dde3
Show file tree
Hide file tree
Showing 11 changed files with 158 additions and 14 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Enhancements

- Add an optional open-telemetry trace exporter (#659).
- Support tracing across gRPC boundaries using remote tracing context (#669).

### Changes

Expand Down
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 10 additions & 5 deletions crates/block-producer/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use miden_node_proto::generated::{
use miden_node_utils::{
errors::ApiError,
formatting::{format_input_notes, format_output_notes},
tracing::grpc::OtelInterceptor,
};
use miden_objects::{
block::BlockNumber, transaction::ProvenTransaction, utils::serde::Deserializable,
Expand Down Expand Up @@ -52,11 +53,14 @@ impl BlockProducer {
pub async fn init(config: BlockProducerConfig) -> Result<Self, ApiError> {
info!(target: COMPONENT, %config, "Initializing server");

let store = StoreClient::new(
store_client::ApiClient::connect(config.store_url.to_string())
.await
.map_err(|err| ApiError::DatabaseConnectionFailed(err.to_string()))?,
);
let channel = tonic::transport::Endpoint::try_from(config.store_url.to_string())
.map_err(|err| ApiError::InvalidStoreUrl(err.to_string()))?
.connect()
.await
.map_err(|err| ApiError::DatabaseConnectionFailed(err.to_string()))?;

let store = store_client::ApiClient::with_interceptor(channel, OtelInterceptor);
let store = StoreClient::new(store);

let latest_header = store
.latest_header()
Expand Down Expand Up @@ -208,6 +212,7 @@ impl BlockProducerRpcServer {

async fn serve(self, listener: TcpListener) -> Result<(), tonic::transport::Error> {
tonic::transport::Server::builder()
.trace_fn(miden_node_utils::tracing::grpc::block_producer_trace_fn)
.add_service(api_server::ApiServer::new(self))
.serve_with_incoming(TcpListenerStream::new(listener))
.await
Expand Down
10 changes: 6 additions & 4 deletions crates/block-producer/src/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use miden_node_proto::{
},
AccountState,
};
use miden_node_utils::formatting::format_opt;
use miden_node_utils::{formatting::format_opt, tracing::grpc::OtelInterceptor};
use miden_objects::{
account::AccountId,
block::{Block, BlockHeader, BlockNumber},
Expand All @@ -29,7 +29,7 @@ use miden_objects::{
Digest,
};
use miden_processor::crypto::RpoDigest;
use tonic::transport::Channel;
use tonic::{service::interceptor::InterceptedService, transport::Channel};
use tracing::{debug, info, instrument};

use crate::{block::BlockInputs, errors::StoreError, COMPONENT};
Expand Down Expand Up @@ -121,17 +121,19 @@ impl TryFrom<GetTransactionInputsResponse> for TransactionInputs {
// STORE CLIENT
// ================================================================================================

type InnerClient = store_client::ApiClient<InterceptedService<Channel, OtelInterceptor>>;

/// Interface to the store's gRPC API.
///
/// Essentially just a thin wrapper around the generated gRPC client which improves type safety.
#[derive(Clone)]
pub struct StoreClient {
inner: store_client::ApiClient<Channel>,
inner: InnerClient,
}

impl StoreClient {
/// TODO: this should probably take store connection string and create a connection internally
pub fn new(store: store_client::ApiClient<Channel>) -> Self {
pub fn new(store: InnerClient) -> Self {
Self { inner: store }
}

Expand Down
22 changes: 17 additions & 5 deletions crates/rpc/src/server/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@ use miden_node_proto::{
},
try_convert,
};
use miden_node_utils::tracing::grpc::OtelInterceptor;
use miden_objects::{
account::AccountId, crypto::hash::rpo::RpoDigest, transaction::ProvenTransaction,
utils::serde::Deserializable, Digest, MAX_NUM_FOREIGN_ACCOUNTS, MIN_PROOF_SECURITY_LEVEL,
};
use miden_tx::TransactionVerifier;
use tonic::{
service::interceptor::InterceptedService,
transport::{Channel, Error},
Request, Response, Status,
};
Expand All @@ -34,19 +36,29 @@ use crate::{config::RpcConfig, COMPONENT};
// RPC API
// ================================================================================================

type StoreClient = store_client::ApiClient<InterceptedService<Channel, OtelInterceptor>>;
type BlockProducerClient =
block_producer_client::ApiClient<InterceptedService<Channel, OtelInterceptor>>;

pub struct RpcApi {
store: store_client::ApiClient<Channel>,
block_producer: block_producer_client::ApiClient<Channel>,
store: StoreClient,
block_producer: BlockProducerClient,
}

impl RpcApi {
pub(super) async fn from_config(config: &RpcConfig) -> Result<Self, Error> {
let store = store_client::ApiClient::connect(config.store_url.to_string()).await?;
let channel = tonic::transport::Endpoint::try_from(config.store_url.to_string())?
.connect()
.await?;
let store = store_client::ApiClient::with_interceptor(channel, OtelInterceptor);
info!(target: COMPONENT, store_endpoint = config.store_url.as_str(), "Store client initialized");

let channel = tonic::transport::Endpoint::try_from(config.block_producer_url.to_string())?
.connect()
.await?;
let block_producer =
block_producer_client::ApiClient::connect(config.block_producer_url.to_string())
.await?;
block_producer_client::ApiClient::with_interceptor(channel, OtelInterceptor);

info!(
target: COMPONENT,
block_producer_endpoint = config.block_producer_url.as_str(),
Expand Down
1 change: 1 addition & 0 deletions crates/store/src/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ impl Store {
/// Note: this blocks until the server dies.
pub async fn serve(self) -> Result<(), ApiError> {
tonic::transport::Server::builder()
.trace_fn(miden_node_utils::tracing::grpc::store_trace_fn)
.add_service(self.api_service)
.serve_with_incoming(TcpListenerStream::new(self.listener))
.await
Expand Down
1 change: 1 addition & 0 deletions crates/utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ vergen = ["dep:vergen", "dep:vergen-gitcl"]
[dependencies]
anyhow = { version = "1.0" }
figment = { version = "0.10", features = ["env", "toml"] }
http = "1.2"
itertools = { workspace = true }
miden-objects = { workspace = true }
opentelemetry = "0.27"
Expand Down
3 changes: 3 additions & 0 deletions crates/utils/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,7 @@ pub enum ApiError {

#[error("connection to the database has failed: {0}")]
DatabaseConnectionFailed(String),

#[error("parsing store url failed: {0}")]
InvalidStoreUrl(String),
}
1 change: 1 addition & 0 deletions crates/utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ pub mod crypto;
pub mod errors;
pub mod formatting;
pub mod logging;
pub mod tracing;
pub mod version;
116 changes: 116 additions & 0 deletions crates/utils/src/tracing/grpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use tracing_opentelemetry::OpenTelemetrySpanExt;

/// A [`trace_fn`](tonic::transport::server::Server) implementation for the block producer which
/// adds open-telemetry information to the span.
///
/// Creates an `info` span following the open-telemetry standard: `block-producer.rpc/{method}`.
/// Additionally also pulls in remote tracing context which allows the server trace to be connected
/// to the client's origin trace.
pub fn block_producer_trace_fn(request: &http::Request<()>) -> tracing::Span {
let span = if let Some("SubmitProvenTransaction") = request.uri().path().rsplit('/').next() {
tracing::info_span!("block-producer.rpc/SubmitProvenTransaction")
} else {
tracing::info_span!("block-producer.rpc/Unknown")
};

add_otel_span_attributes(span, request)
}

/// A [`trace_fn`](tonic::transport::server::Server) implementation for the store which adds
/// open-telemetry information to the span.
///
/// Creates an `info` span following the open-telemetry standard: `store.rpc/{method}`. Additionally
/// also pulls in remote tracing context which allows the server trace to be connected to the
/// client's origin trace.
pub fn store_trace_fn(request: &http::Request<()>) -> tracing::Span {
let span = match request.uri().path().rsplit('/').next() {
Some("ApplyBlock") => tracing::info_span!("store.rpc/ApplyBlock"),
Some("CheckNullifiers") => tracing::info_span!("store.rpc/CheckNullifiers"),
Some("CheckNullifiersByPrefix") => tracing::info_span!("store.rpc/CheckNullifiersByPrefix"),
Some("GetAccountDetails") => tracing::info_span!("store.rpc/GetAccountDetails"),
Some("GetAccountProofs") => tracing::info_span!("store.rpc/GetAccountProofs"),
Some("GetAccountStateDelta") => tracing::info_span!("store.rpc/GetAccountStateDelta"),
Some("GetBlockByNumber") => tracing::info_span!("store.rpc/GetBlockByNumber"),
Some("GetBlockHeaderByNumber") => tracing::info_span!("store.rpc/GetBlockHeaderByNumber"),
Some("GetBlockInputs") => tracing::info_span!("store.rpc/GetBlockInputs"),
Some("GetBatchInputs") => tracing::info_span!("store.rpc/GetBatchInputs"),
Some("GetNotesById") => tracing::info_span!("store.rpc/GetNotesById"),
Some("GetTransactionInputs") => tracing::info_span!("store.rpc/GetTransactionInputs"),
Some("SyncNotes") => tracing::info_span!("store.rpc/SyncNotes"),
Some("SyncState") => tracing::info_span!("store.rpc/SyncState"),
_ => tracing::info_span!("store.rpc/Unknown"),
};

add_otel_span_attributes(span, request)
}

/// Adds remote tracing context to the span.
///
/// Could be expanded in the future by adding in more open-telemetry properties.
fn add_otel_span_attributes(span: tracing::Span, request: &http::Request<()>) -> tracing::Span {
// Pull the open-telemetry parent context using the HTTP extractor. We could make a more
// generic gRPC extractor by utilising the gRPC metadata. However that
// (a) requires cloning headers,
// (b) we would have to write this ourselves, and
// (c) gRPC metadata is transferred using HTTP headers in any case.
use tracing_opentelemetry::OpenTelemetrySpanExt;
let otel_ctx = opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.extract(&MetadataExtractor(&tonic::metadata::MetadataMap::from_headers(
request.headers().clone(),
)))
});
span.set_parent(otel_ctx);

span
}

/// Injects open-telemetry remote context into traces.
#[derive(Copy, Clone)]
pub struct OtelInterceptor;

impl tonic::service::Interceptor for OtelInterceptor {
fn call(
&mut self,
mut request: tonic::Request<()>,
) -> Result<tonic::Request<()>, tonic::Status> {
let ctx = tracing::Span::current().context();
opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.inject_context(&ctx, &mut MetadataInjector(request.metadata_mut()));
});

Ok(request)
}
}

struct MetadataExtractor<'a>(&'a tonic::metadata::MetadataMap);
impl opentelemetry::propagation::Extractor for MetadataExtractor<'_> {
/// Get a value for a key from the `MetadataMap`. If the value can't be converted to &str,
/// returns None
fn get(&self, key: &str) -> Option<&str> {
self.0.get(key).and_then(|metadata| metadata.to_str().ok())
}

/// Collect all the keys from the `MetadataMap`.
fn keys(&self) -> Vec<&str> {
self.0
.keys()
.map(|key| match key {
tonic::metadata::KeyRef::Ascii(v) => v.as_str(),
tonic::metadata::KeyRef::Binary(v) => v.as_str(),
})
.collect::<Vec<_>>()
}
}

struct MetadataInjector<'a>(&'a mut tonic::metadata::MetadataMap);
impl opentelemetry::propagation::Injector for MetadataInjector<'_> {
/// Set a key and value in the `MetadataMap`. Does nothing if the key or value are not valid
/// inputs
fn set(&mut self, key: &str, value: String) {
if let Ok(key) = tonic::metadata::MetadataKey::from_bytes(key.as_bytes()) {
if let Ok(val) = tonic::metadata::MetadataValue::try_from(&value) {
self.0.insert(key, val);
}
}
}
}
1 change: 1 addition & 0 deletions crates/utils/src/tracing/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod grpc;

0 comments on commit 018dde3

Please sign in to comment.