diff --git a/fhevm-engine/Cargo.lock b/fhevm-engine/Cargo.lock index e2931421..056ba87d 100644 --- a/fhevm-engine/Cargo.lock +++ b/fhevm-engine/Cargo.lock @@ -1789,7 +1789,6 @@ dependencies = [ "tonic", "tonic-build", "tonic-health", - "tonic-tracing-opentelemetry", "tonic-types", "tonic-web", ] @@ -4659,15 +4658,6 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "sharded-slab" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" -dependencies = [ - "lazy_static", -] - [[package]] name = "shlex" version = "1.3.0" @@ -5374,16 +5364,6 @@ dependencies = [ "syn 2.0.75", ] -[[package]] -name = "thread_local" -version = "1.1.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c" -dependencies = [ - "cfg-if", - "once_cell", -] - [[package]] name = "threadpool" version = "1.8.1" @@ -5600,26 +5580,6 @@ dependencies = [ "tonic", ] -[[package]] -name = "tonic-tracing-opentelemetry" -version = "0.21.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6189e979896bf663c2478b4b4d04278c356a02dc8d80228d002e5535fe976e68" -dependencies = [ - "futures-core", - "futures-util", - "http 1.1.0", - "http-body 1.0.1", - "hyper", - "opentelemetry", - "pin-project-lite", - "tonic", - "tower 0.5.1", - "tracing", - "tracing-opentelemetry", - "tracing-opentelemetry-instrumentation-sdk", -] - [[package]] name = "tonic-types" version = "0.12.1" @@ -5755,59 +5715,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" dependencies = [ "once_cell", - "valuable", -] - -[[package]] -name = "tracing-log" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" -dependencies = [ - "log", - "once_cell", - "tracing-core", -] - -[[package]] -name = "tracing-opentelemetry" -version = "0.26.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5eabc56d23707ad55ba2a0750fc24767125d5a0f51993ba41ad2c441cc7b8dea" -dependencies = [ - "js-sys", - "once_cell", - "opentelemetry", - "opentelemetry_sdk", - "smallvec", - "tracing", - "tracing-core", - "tracing-log", - "tracing-subscriber", - "web-time", -] - -[[package]] -name = "tracing-opentelemetry-instrumentation-sdk" -version = "0.21.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "612243becba145498d15c8ff9d41f333ca27b343ee874909d964cc2aed3a013f" -dependencies = [ - "http 1.1.0", - "opentelemetry", - "tracing", - "tracing-opentelemetry", -] - -[[package]] -name = "tracing-subscriber" -version = "0.3.18" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b" -dependencies = [ - "sharded-slab", - "thread_local", - "tracing-core", ] [[package]] @@ -6122,16 +6029,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "web-time" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" -dependencies = [ - "js-sys", - "wasm-bindgen", -] - [[package]] name = "webpki-roots" version = "0.25.4" diff --git a/fhevm-engine/coprocessor/Cargo.toml b/fhevm-engine/coprocessor/Cargo.toml index d8e30fd3..8fb9d1ab 100644 --- a/fhevm-engine/coprocessor/Cargo.toml +++ b/fhevm-engine/coprocessor/Cargo.toml @@ -38,7 +38,6 @@ actix-web = "4.9.0" opentelemetry = "0.25" opentelemetry-otlp = "0.25" opentelemetry_sdk = { version = "0.25", features = ["rt-tokio"] } -tonic-tracing-opentelemetry = "0.21" [dev-dependencies] testcontainers = "0.21" diff --git a/fhevm-engine/coprocessor/src/db_queries.rs b/fhevm-engine/coprocessor/src/db_queries.rs index 041afb5c..a5f994cc 100644 --- a/fhevm-engine/coprocessor/src/db_queries.rs +++ b/fhevm-engine/coprocessor/src/db_queries.rs @@ -1,15 +1,20 @@ use std::collections::{BTreeSet, HashMap}; use std::str::FromStr; +use crate::server::GrpcTracer; use crate::types::{CoprocessorError, TfheTenantKeys}; use fhevm_engine_common::utils::{safe_deserialize_versioned, safe_deserialize_versioned_sks}; +use opentelemetry::trace::Span; +use opentelemetry::KeyValue; use sqlx::{query, Postgres}; /// Returns tenant id upon valid authorization request pub async fn check_if_api_key_is_valid( req: &tonic::Request, pool: &sqlx::Pool, + ctx: &GrpcTracer, ) -> Result { + let mut outer_span = ctx.child_span("check_api_key_validity"); match req.metadata().get("authorization") { Some(auth) => { let auth_header = String::from_utf8(auth.as_bytes().to_owned()) @@ -28,6 +33,7 @@ pub async fn check_if_api_key_is_valid( Err(_) => return Err(CoprocessorError::Unauthorized), }; + let span = ctx.child_span("db_query_api_key"); let tenant = query!( "SELECT tenant_id FROM tenants WHERE tenant_api_key = $1", api_key @@ -35,12 +41,15 @@ pub async fn check_if_api_key_is_valid( .fetch_all(pool) .await .map_err(Into::::into)?; + drop(span); if tenant.is_empty() { return Err(CoprocessorError::Unauthorized); } - return Ok(tenant[0].tenant_id); + let tenant_id = tenant[0].tenant_id; + outer_span.set_attribute(KeyValue::new("tenant_id", tenant_id as i64)); + return Ok(tenant_id); } None => { return Err(CoprocessorError::Unauthorized); diff --git a/fhevm-engine/coprocessor/src/server.rs b/fhevm-engine/coprocessor/src/server.rs index abebdb41..bff92d03 100644 --- a/fhevm-engine/coprocessor/src/server.rs +++ b/fhevm-engine/coprocessor/src/server.rs @@ -1,4 +1,5 @@ use std::collections::{BTreeMap, BTreeSet}; +use std::error::Error; use std::num::NonZeroUsize; use std::str::FromStr; @@ -23,6 +24,9 @@ use fhevm_engine_common::tfhe_ops::{ use fhevm_engine_common::types::{FhevmError, SupportedFheCiphertexts, SupportedFheOperations}; use fhevm_engine_common::utils::safe_deserialize_versioned_sks; use lazy_static::lazy_static; +use opentelemetry::global::{BoxedSpan, BoxedTracer}; +use opentelemetry::trace::{Span, TraceContextExt, Tracer}; +use opentelemetry::KeyValue; use prometheus::{register_int_counter, IntCounter}; use sha3::{Digest, Keccak256}; use sqlx::{query, Acquire}; @@ -132,7 +136,6 @@ pub async fn run_server_iteration( }; Server::builder() - .layer(tonic_tracing_opentelemetry::middleware::server::OtelGrpcLayer::default()) .add_service( crate::server::coprocessor::fhevm_coprocessor_server::FhevmCoprocessorServer::new( service, @@ -181,6 +184,36 @@ alloy::sol! { // ], // }; +pub struct GrpcTracer { + ctx: opentelemetry::Context, + name: &'static str, + tracer: BoxedTracer, +} + +impl GrpcTracer { + pub fn child_span(&self, name: &'static str) -> BoxedSpan { + self.tracer.start_with_context(name, &self.ctx) + } + + pub fn set_error(&mut self, e: impl Error) { + self.ctx.span().set_status(opentelemetry::trace::Status::Error { description: e.to_string().into() }); + } +} + +impl Clone for GrpcTracer { + fn clone(&self) -> Self { + GrpcTracer { ctx: self.ctx.clone(), name: self.name, tracer: opentelemetry::global::tracer(self.name) } + } +} + +fn grpc_tracer(function_name: &'static str) -> GrpcTracer { + let name = "grpc_service"; + let tracer = opentelemetry::global::tracer(name); + let span = tracer.start(function_name); + let ctx = opentelemetry::Context::current_with_span(span); + GrpcTracer { ctx, tracer, name } +} + #[tonic::async_trait] impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorService { async fn upload_inputs( @@ -188,7 +221,9 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ request: tonic::Request, ) -> std::result::Result, tonic::Status> { UPLOAD_INPUTS_COUNTER.inc(); - self.upload_inputs_impl(request).await.inspect_err(|_| { + let mut tracer = grpc_tracer("upload_inputs"); + self.upload_inputs_impl(request, &tracer).await.inspect_err(|e| { + tracer.set_error(e); UPLOAD_INPUTS_ERRORS.inc(); }) } @@ -198,26 +233,25 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ request: tonic::Request, ) -> std::result::Result, tonic::Status> { ASYNC_COMPUTE_COUNTER.inc(); - self.async_compute_impl(request).await.inspect_err(|_| { + let mut tracer = grpc_tracer("async_compute"); + self.async_compute_impl(request, &tracer).await.inspect_err(|e| { + tracer.set_error(e); ASYNC_COMPUTE_ERRORS.inc(); }) } - async fn wait_computations( - &self, - _request: tonic::Request, - ) -> std::result::Result, tonic::Status> { - return Err(tonic::Status::unimplemented("not implemented")); - } - async fn trivial_encrypt_ciphertexts( &self, request: tonic::Request, ) -> std::result::Result, tonic::Status> { TRIVIAL_ENCRYPT_COUNTER.inc(); - self.trivial_encrypt_ciphertexts_impl(request) + let mut tracer = grpc_tracer("trivial_encrypt_ciphertexts"); + self.trivial_encrypt_ciphertexts_impl(request, &tracer) .await - .inspect_err(|_| TRIVIAL_ENCRYPT_ERRORS.inc()) + .inspect_err(|e| { + tracer.set_error(e); + TRIVIAL_ENCRYPT_ERRORS.inc(); + }) } async fn get_ciphertexts( @@ -226,7 +260,9 @@ impl coprocessor::fhevm_coprocessor_server::FhevmCoprocessor for CoprocessorServ ) -> std::result::Result, tonic::Status> { GET_CIPHERTEXTS_COUNTER.inc(); - self.get_ciphertexts_impl(request).await.inspect_err(|_| { + let mut tracer = grpc_tracer("get_ciphertexts"); + self.get_ciphertexts_impl(request, &tracer).await.inspect_err(|e| { + tracer.set_error(e); GET_CIPHERTEXTS_ERRORS.inc(); }) } @@ -236,10 +272,11 @@ impl CoprocessorService { async fn upload_inputs_impl( &self, request: tonic::Request, + tracer: &GrpcTracer, ) -> std::result::Result, tonic::Status> { UPLOAD_INPUTS_COUNTER.inc(); - let tenant_id = check_if_api_key_is_valid(&request, &self.pool).await?; + let tenant_id = check_if_api_key_is_valid(&request, &self.pool, &tracer).await?; let req = request.get_ref(); if req.input_ciphertexts.len() > self.args.maximimum_compact_inputs_upload { @@ -321,23 +358,51 @@ impl CoprocessorService { for (idx, ci) in req.input_ciphertexts.iter().enumerate() { let cloned_input = ci.clone(); let server_key = server_key.clone(); + let tracer = tracer.clone(); + + let mut blocking_span = tracer.child_span("blocking_ciphertext_list_expand"); + blocking_span.set_attributes(vec![ + KeyValue::new("idx", idx as i64), + ]); tfhe_work_set.spawn_blocking( move || -> Result<_, (Box<(dyn std::error::Error + Send + Sync)>, usize)> { + + let mut span = tracer.child_span("set_server_key"); tfhe::set_server_key(server_key.clone()); + span.end(); + + let mut span = tracer.child_span("keccak_256_hash"); + let mut state = Keccak256::new(); + state.update(&cloned_input.input_payload); + let blob_hash = state.finalize().to_vec(); + assert_eq!(blob_hash.len(), 32, "should be 32 bytes"); + span.end(); + + let mut span = tracer.child_span("expand_ciphertext_list"); let expanded = try_expand_ciphertext_list(&cloned_input.input_payload) .map_err(|e| { let err: Box<(dyn std::error::Error + Send + Sync)> = Box::new(e); (err, idx) })?; - Ok((expanded, idx)) + span.set_attributes(vec![ + KeyValue::new("idx", idx as i64), + KeyValue::new("count", expanded.len() as i64), + KeyValue::new("input_hash", format!("0x{}", hex::encode(&blob_hash))), + ]); + span.end(); + + blocking_span.end(); + + Ok((expanded, idx, blob_hash)) }, ); } - let mut results: BTreeMap> = BTreeMap::new(); + let mut span = tracer.child_span("ciphertext_list_expand_wait"); + let mut results: BTreeMap, Vec)> = BTreeMap::new(); while let Some(output) = tfhe_work_set.join_next().await { - let (cts, idx) = output + let (cts, idx, hash) = output .map_err(|e| { let err: Box<(dyn std::error::Error + Sync + Send)> = Box::new(e); tonic::Status::from_error(err) @@ -356,10 +421,11 @@ impl CoprocessorService { } assert!( - results.insert(idx, cts).is_none(), + results.insert(idx, (cts, hash)).is_none(), "fresh map, we passed vector ordered by indexes before" ); } + span.end(); assert_eq!( results.len(), @@ -367,21 +433,21 @@ impl CoprocessorService { "We should have all the ciphertexts now" ); + let mut span = tracer.child_span("db_input_ciphertexts_insert"); let mut trx = self .pool .begin() .await .map_err(Into::::into)?; for (idx, input_blob) in req.input_ciphertexts.iter().enumerate() { - let mut state = Keccak256::new(); - state.update(&input_blob.input_payload); - let blob_hash = state.finalize().to_vec(); - assert_eq!(blob_hash.len(), 32, "should be 32 bytes"); - - let corresponding_unpacked = results + let (corresponding_unpacked, blob_hash) = results .remove(&idx) .expect("we should have all results computed now"); + let mut span = tracer.child_span("db_insert_input_blob"); + span.set_attributes(vec![ + KeyValue::new("idx", idx as i64), + ]); // save blob for audits and historical reference let _ = sqlx::query!( " @@ -397,6 +463,7 @@ impl CoprocessorService { .execute(trx.as_mut()) .await .map_err(Into::::into)?; + span.end(); let mut hash_of_ciphertext: [u8; 32] = [0; 32]; hash_of_ciphertext.copy_from_slice(&blob_hash); @@ -445,6 +512,12 @@ impl CoprocessorService { .await .map_err(|e| tonic::Status::from_error(Box::new(e)))?; + let mut span = tracer.child_span("db_insert_ciphertext"); + span.set_attributes(vec![ + KeyValue::new("blob_idx", idx as i64), + KeyValue::new("ct_idx", ct_idx as i64), + KeyValue::new("handle", format!("0x{}", hex::encode(&handle))), + ]); let _ = sqlx::query!( " INSERT INTO ciphertexts( @@ -480,12 +553,17 @@ impl CoprocessorService { }); } + let mut span = tracer.child_span("eip_712_signature"); + span.set_attributes(vec![ + KeyValue::new("blob_idx", idx as i64), + ]); let signing_hash = ct_verification.eip712_signing_hash(&eip_712_domain); let eip_712_signature = self.signer.sign_hash_sync(&signing_hash).map_err(|e| { CoprocessorError::Eip712SigningFailure { error: e.to_string(), } })?; + span.end(); ct_resp.eip712_signature = eip_712_signature.as_bytes().to_vec(); @@ -493,6 +571,7 @@ impl CoprocessorService { } trx.commit().await.map_err(Into::::into)?; + span.end(); Ok(tonic::Response::new(response)) } @@ -500,6 +579,7 @@ impl CoprocessorService { async fn async_compute_impl( &self, request: tonic::Request, + tracer: &GrpcTracer, ) -> std::result::Result, tonic::Status> { let req = request.get_ref(); if req.computations.len() > self.args.server_maximum_ciphertexts_to_schedule { @@ -511,20 +591,25 @@ impl CoprocessorService { ))); } - let tenant_id = check_if_api_key_is_valid(&request, &self.pool).await?; + let tenant_id = check_if_api_key_is_valid(&request, &self.pool, &tracer).await?; if req.computations.is_empty() { return Ok(tonic::Response::new(GenericResponse { response_code: 0 })); } + + let mut span = tracer.child_span("sort_computations_by_dependencies"); // computations are now sorted based on dependencies or error should have // been returned if there's circular dependency let (sorted_computations, handles_to_check_in_db) = sort_computations_by_dependencies(&req.computations)?; + span.end(); // to insert to db + let mut span = tracer.child_span("check_if_ciphertexts_exist_in_db"); let mut ct_types = check_if_ciphertexts_exist_in_db(handles_to_check_in_db, tenant_id, &self.pool).await?; + span.end(); let mut computations_inputs: Vec>> = Vec::with_capacity(sorted_computations.len()); let mut computations_outputs: Vec> = Vec::with_capacity(sorted_computations.len()); @@ -579,6 +664,7 @@ impl CoprocessorService { .is_none()); } + let mut tx_span = tracer.child_span("db_transaction"); let mut trx = self .pool .begin() @@ -593,6 +679,10 @@ impl CoprocessorService { let fhe_operation: i16 = comp.operation.try_into().map_err(|_| { CoprocessorError::FhevmError(FhevmError::UnknownFheOperation(comp.operation)) })?; + let mut span = tracer.child_span("insert_computation"); + span.set_attributes(vec![ + KeyValue::new("handle", format!("0x{}", hex::encode(&comp.output_handle))) + ]); let res = query!( " INSERT INTO computations( @@ -617,25 +707,30 @@ impl CoprocessorService { .execute(trx.as_mut()) .await .map_err(Into::::into)?; + span.end(); if res.rows_affected() > 0 { new_work_available = true; } } if new_work_available { + let mut span = tracer.child_span("db_new_work_notification"); query!("NOTIFY work_available") .execute(trx.as_mut()) .await .map_err(Into::::into)?; + span.end(); } trx.commit().await.map_err(Into::::into)?; + tx_span.end(); return Ok(tonic::Response::new(GenericResponse { response_code: 0 })); } async fn trivial_encrypt_ciphertexts_impl( &self, request: tonic::Request, + tracer: &GrpcTracer, ) -> std::result::Result, tonic::Status> { - let tenant_id = check_if_api_key_is_valid(&request, &self.pool).await?; + let tenant_id = check_if_api_key_is_valid(&request, &self.pool, &tracer).await?; let req = request.get_ref(); let mut unique_handles: BTreeSet<&[u8]> = BTreeSet::new(); @@ -650,6 +745,7 @@ impl CoprocessorService { } } + let mut span = tracer.child_span("db_query_server_key"); let mut sks = sqlx::query!( " SELECT sks_key @@ -661,20 +757,29 @@ impl CoprocessorService { .fetch_all(&self.pool) .await .map_err(Into::::into)?; + span.end(); assert_eq!(sks.len(), 1); let sks = sks.pop().unwrap(); let cloned = req.values.clone(); + let inner_tracer = tracer.clone(); + let mut outer_span = tracer.child_span("blocking_trivial_encrypt"); let out_cts = tokio::task::spawn_blocking(move || { + let mut span = inner_tracer.child_span("deserialize_and_set_sks"); let server_key: tfhe::ServerKey = safe_deserialize_versioned_sks(&sks.sks_key).unwrap(); tfhe::set_server_key(server_key); + span.end(); // single threaded implementation, we can optimize later let mut res: Vec<(Vec, i16, Vec)> = Vec::with_capacity(cloned.len()); for v in cloned { + let mut span = inner_tracer.child_span("trivial_encrypt"); let ct = trivial_encrypt_be_bytes(v.output_type as i16, &v.be_value); + span.end(); + let mut span = inner_tracer.child_span("compress_ciphertext"); let (ct_type, ct_bytes) = ct.compress(); + span.end(); res.push((v.handle, ct_type, ct_bytes)); } @@ -682,7 +787,9 @@ impl CoprocessorService { }) .await .unwrap(); + outer_span.end(); + let mut tx_span = tracer.child_span("db_transaction_insert_ciphertexts"); let mut conn = self .pool .acquire() @@ -691,6 +798,11 @@ impl CoprocessorService { let mut trx = conn.begin().await.map_err(Into::::into)?; for (handle, db_type, db_bytes) in out_cts { + let mut span = tracer.child_span("db_insert_ciphertext"); + span.set_attributes(vec![ + KeyValue::new("handle", format!("0x{}", hex::encode(&handle))), + KeyValue::new("ciphertext_type", db_type as i64), + ]); sqlx::query!(" INSERT INTO ciphertexts(tenant_id, handle, ciphertext, ciphertext_version, ciphertext_type) VALUES ($1, $2, $3, $4, $5) @@ -699,9 +811,11 @@ impl CoprocessorService { tenant_id, handle, db_bytes, current_ciphertext_version(), db_type as i16 ) .execute(trx.as_mut()).await.map_err(Into::::into)?; + span.end(); } trx.commit().await.map_err(Into::::into)?; + tx_span.end(); return Ok(tonic::Response::new(GenericResponse { response_code: 0 })); } @@ -709,9 +823,10 @@ impl CoprocessorService { async fn get_ciphertexts_impl( &self, request: tonic::Request, + tracer: &GrpcTracer, ) -> std::result::Result, tonic::Status> { - let tenant_id = check_if_api_key_is_valid(&request, &self.pool).await?; + let tenant_id = check_if_api_key_is_valid(&request, &self.pool, &tracer).await?; let req = request.get_ref(); if req.handles.len() > self.args.server_maximum_ciphertexts_to_get { @@ -734,6 +849,8 @@ impl CoprocessorService { let cts: Vec> = set.into_iter().collect(); + let mut span = tracer.child_span("query_ciphertexts"); + span.set_attribute(KeyValue::new("count", cts.len() as i64)); let db_cts = query!( " SELECT handle, ciphertext_type, ciphertext_version, ciphertext @@ -747,6 +864,7 @@ impl CoprocessorService { .fetch_all(&self.pool) .await .map_err(Into::::into)?; + span.end(); let mut the_map: BTreeMap, _> = BTreeMap::new(); for ct in db_cts { diff --git a/proto/coprocessor.proto b/proto/coprocessor.proto index 30e6de88..7502a2e6 100644 --- a/proto/coprocessor.proto +++ b/proto/coprocessor.proto @@ -11,7 +11,6 @@ import "common.proto"; service FhevmCoprocessor { rpc AsyncCompute (AsyncComputeRequest) returns (GenericResponse) {} - rpc WaitComputations (AsyncComputeRequest) returns (FhevmResponses) {} rpc UploadInputs (InputUploadBatch) returns (InputUploadResponse) {} rpc GetCiphertexts (GetCiphertextBatch) returns (GetCiphertextResponse) {} rpc TrivialEncryptCiphertexts (TrivialEncryptBatch) returns (GenericResponse) {}