diff --git a/postgres.sql b/postgres.sql index 0418d58..45b7e67 100644 --- a/postgres.sql +++ b/postgres.sql @@ -1,6 +1,7 @@ CREATE TABLE faucet_http_events ( request_uuid UUID, namespace TEXT, + version TEXT, target TEXT, worker_route TEXT, worker_id INT, @@ -8,13 +9,26 @@ CREATE TABLE faucet_http_events ( method TEXT, path TEXT, query_params TEXT, - version TEXT, + http_version TEXT, status SMALLINT, user_agent TEXT, elapsed BIGINT, time TIMESTAMPTZ ); -CREATE INDEX faucet_http_events_request_uuid_idx -ON faucet_http_events USING BTREE (request_uuid); +CREATE TABLE faucet_error_reporting ( + request_uuid UUID, + session_uuid UUID, -- Every Shiny Session has their own request uuid. + event_time TIMESTAMPTZ, + error_message TEXT, + context JSONB +); + +CREATE INDEX idx_faucet_http_events_request_uuid +ON faucet_http_events (request_uuid); + +CREATE INDEX idx_faucet_error_reporting_request_uuid +ON faucet_error_reporting (request_uuid); +CREATE INDEX idx_faucet_error_reporting_session_uuid +ON faucet_error_reporting (session_uuid); diff --git a/src/cli.rs b/src/cli.rs index 9e36fcc..837e181 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -187,6 +187,10 @@ pub struct Args { /// Save HTTP events on PostgreSQL under a specific namespace. #[arg(long, env = "FAUCET_TELEMETRY_NAMESPACE", default_value = "faucet")] pub telemetry_namespace: String, + + /// Represents the source code version of the service to run. This is useful for telemetry. + #[arg(long, env = "FAUCET_TELEMETRY_VERSION", default_value = None)] + pub telemetry_version: Option, } impl StartArgs { diff --git a/src/main.rs b/src/main.rs index 5ef7ce8..4b3056e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -16,7 +16,11 @@ pub async fn main() -> FaucetResult<()> { }; let telemetry = cli_args.pg_con_string.map(|pg_con| { - match TelemetryManager::start(cli_args.telemetry_namespace, pg_con) { + match TelemetryManager::start( + &cli_args.telemetry_namespace, + cli_args.telemetry_version.as_deref(), + &pg_con, + ) { Ok(telemetry) => telemetry, Err(e) => { eprintln!("Unable to start telemetry manager: {e}"); @@ -83,7 +87,7 @@ pub async fn main() -> FaucetResult<()> { if let Some(telemetry) = telemetry { log::debug!("Waiting to stop DB writes"); drop(telemetry.sender); - let _ = telemetry.join_handle.await; + let _ = telemetry.http_events_join_handle.await; } Ok(()) diff --git a/src/server/logging.rs b/src/server/logging.rs index 8ea713d..6c312ee 100644 --- a/src/server/logging.rs +++ b/src/server/logging.rs @@ -223,7 +223,7 @@ where log_data.log(); if let Some(telemetry) = &self.telemetry { - telemetry.send(log_data); + telemetry.send_http_event(log_data); } Ok(res) diff --git a/src/server/mod.rs b/src/server/mod.rs index 4f0974f..d4811cc 100644 --- a/src/server/mod.rs +++ b/src/server/mod.rs @@ -1,6 +1,6 @@ mod logging; pub use logging::{logger, LogData, LogOption}; -mod onion; +pub mod onion; mod router; mod service; use crate::{ diff --git a/src/server/onion.rs b/src/server/onion.rs index 6891479..c1d06b0 100644 --- a/src/server/onion.rs +++ b/src/server/onion.rs @@ -1,6 +1,6 @@ use std::net::IpAddr; -pub trait Service { +pub trait Service: Send + Sync { type Response; type Error; async fn call( diff --git a/src/telemetry/mod.rs b/src/telemetry/mod.rs index 19c5d04..e791327 100644 --- a/src/telemetry/mod.rs +++ b/src/telemetry/mod.rs @@ -12,20 +12,20 @@ use crate::{ #[derive(Clone)] pub struct TelemetrySender { - pub sender: UnboundedSender<(chrono::DateTime, LogData)>, + pub sender_http_events: UnboundedSender<(chrono::DateTime, LogData)>, } impl TelemetrySender { - pub fn send(&self, data: LogData) { + pub fn send_http_event(&self, data: LogData) { let timestamp = chrono::Local::now(); - let _ = self.sender.send((timestamp, data)); + let _ = self.sender_http_events.send((timestamp, data)); } } pub struct TelemetryManager { _pool: deadpool_postgres::Pool, pub sender: TelemetrySender, - pub join_handle: JoinHandle<()>, + pub http_events_join_handle: JoinHandle<()>, } fn make_tls() -> tokio_postgres_rustls::MakeRustlsConnect { @@ -40,147 +40,155 @@ type PgType = tokio_postgres::types::Type; impl TelemetryManager { pub fn start( - namespace: impl AsRef, - database_url: impl AsRef, + namespace: &str, + version: Option<&str>, + database_url: &str, ) -> FaucetResult { - let namespace = Box::::from(namespace.as_ref()); - let config = tokio_postgres::Config::from_str(database_url.as_ref())?; + let config = tokio_postgres::Config::from_str(database_url)?; let mgr_config = ManagerConfig { recycling_method: RecyclingMethod::Fast, }; let mgr = Manager::from_config(config, make_tls(), mgr_config); let pool = Pool::builder(mgr).max_size(10).build()?; - let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(_, LogData)>(); - - let join_handle = { - let pool = pool.clone(); - tokio::task::spawn(async move { - let types = &[ - PgType::UUID, // UUID - PgType::TEXT, // Namespace - PgType::TEXT, // Target - PgType::TEXT, // Worker Route - PgType::INT4, // Worker ID - PgType::INET, // IpAddr - PgType::TEXT, // Method - PgType::TEXT, // Path - PgType::TEXT, // Query Params - PgType::TEXT, // Version - PgType::INT2, // Status - PgType::TEXT, // User Agent - PgType::INT8, // Elapsed - PgType::TIMESTAMPTZ, // TIMESTAMP - ]; - let mut logs_buffer = Vec::with_capacity(100); - let mut path_buffer = Vec::::new(); - let mut query_buffer = Vec::::new(); - let mut version_buffer = Vec::::new(); - let mut user_agent_buffer = Vec::::new(); - - 'recv: while rx.recv_many(&mut logs_buffer, 100).await > 0 { - let connection = match pool.get().await { - Ok(conn) => conn, - Err(e) => { - log::error!("Unable to acquire postgresql connection: {e}"); - continue 'recv; - } - }; - let copy_sink_res = connection - .copy_in::<_, bytes::Bytes>( - "COPY faucet_http_events FROM STDIN WITH (FORMAT binary)", - ) - .await; - - match copy_sink_res { - Ok(copy_sink) => { - let mut copy_in_writer = - tokio_postgres::binary_copy::BinaryCopyInWriter::new( - copy_sink, types, - ); - - let mut copy_in_writer = pin!(copy_in_writer); - - log::debug!( - "Writing {} http events to the database", - logs_buffer.len() - ); - - 'write: for (timespamp, log_data) in logs_buffer.drain(..) { - let uuid = &log_data.state_data.uuid; - let target = &log_data.state_data.target; - let worker_id = log_data.state_data.worker_id as i32; - let worker_route = log_data.state_data.worker_route; - let ip = &log_data.state_data.ip; - let method = &log_data.method.as_str(); - let _ = write!(path_buffer, "{}", log_data.path.path()); - let path = &std::str::from_utf8(&path_buffer).unwrap_or_default(); - let _ = write!( - query_buffer, - "{}", - log_data.path.query().unwrap_or_default() - ); - let query = &std::str::from_utf8(&query_buffer).unwrap_or_default(); - let query = if query.is_empty() { None } else { Some(query) }; - let _ = write!(version_buffer, "{:?}", log_data.version); - let version = - &std::str::from_utf8(&version_buffer).unwrap_or_default(); - let status = &log_data.status; - let user_agent = match &log_data.user_agent { - LogOption::Some(v) => v.to_str().ok(), - LogOption::None => None, - }; - - let elapsed = &log_data.elapsed; - let copy_result = copy_in_writer - .as_mut() - .write(&[ - uuid, - &namespace, - target, - &worker_route, - &worker_id, - ip, - method, - path, - &query, - version, - status, - &user_agent, - elapsed, - ×pamp, - ]) - .await; - - path_buffer.clear(); - version_buffer.clear(); - user_agent_buffer.clear(); - query_buffer.clear(); - - if let Err(e) = copy_result { - log::error!("Error writing to PostgreSQL: {e}"); - break 'write; - } - } - - let copy_in_finish_res = copy_in_writer.finish().await; - if let Err(e) = copy_in_finish_res { - log::error!("Error writing to PostgreSQL: {e}"); - continue 'recv; - } - } - Err(e) => { - log::error!(target: "telemetry", "Error writing to the database: {e}") - } - } - } - }) - }; + let (sender_http_events, http_events_join_handle) = + handle_http_events(pool.clone(), namespace, version); Ok(TelemetryManager { _pool: pool, - join_handle, - sender: TelemetrySender { sender: tx }, + http_events_join_handle, + sender: TelemetrySender { sender_http_events }, }) } } + +fn handle_http_events( + pool: Pool, + namespace: &str, + version: Option<&str>, +) -> ( + UnboundedSender<(chrono::DateTime, LogData)>, + JoinHandle<()>, +) { + let namespace = Box::::from(namespace); + let version = version.map(Box::::from); + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel::<(_, LogData)>(); + let handle = tokio::task::spawn(async move { + let types = &[ + PgType::UUID, // UUID + PgType::TEXT, // Namespace + PgType::TEXT, // Version + PgType::TEXT, // Target + PgType::TEXT, // Worker Route + PgType::INT4, // Worker ID + PgType::INET, // IpAddr + PgType::TEXT, // Method + PgType::TEXT, // Path + PgType::TEXT, // Query Params + PgType::TEXT, // HTTP Version + PgType::INT2, // Status + PgType::TEXT, // User Agent + PgType::INT8, // Elapsed + PgType::TIMESTAMPTZ, // TIMESTAMP + ]; + let mut logs_buffer = Vec::with_capacity(100); + let mut path_buffer = Vec::::new(); + let mut query_buffer = Vec::::new(); + let mut version_buffer = Vec::::new(); + let mut user_agent_buffer = Vec::::new(); + + 'recv: while rx.recv_many(&mut logs_buffer, 100).await > 0 { + let connection = match pool.get().await { + Ok(conn) => conn, + Err(e) => { + log::error!("Unable to acquire postgresql connection: {e}"); + continue 'recv; + } + }; + let copy_sink_res = connection + .copy_in::<_, bytes::Bytes>( + "COPY faucet_http_events FROM STDIN WITH (FORMAT binary)", + ) + .await; + + match copy_sink_res { + Ok(copy_sink) => { + let mut copy_in_writer = + tokio_postgres::binary_copy::BinaryCopyInWriter::new(copy_sink, types); + + let mut copy_in_writer = pin!(copy_in_writer); + + log::debug!("Writing {} http events to the database", logs_buffer.len()); + + 'write: for (timestamp, log_data) in logs_buffer.drain(..) { + let uuid = &log_data.state_data.uuid; + let target = &log_data.state_data.target; + let worker_id = log_data.state_data.worker_id as i32; + let worker_route = log_data.state_data.worker_route; + let ip = &log_data.state_data.ip; + let method = &log_data.method.as_str(); + let _ = write!(path_buffer, "{}", log_data.path.path()); + let path = &std::str::from_utf8(&path_buffer).unwrap_or_default(); + let _ = write!( + query_buffer, + "{}", + log_data.path.query().unwrap_or_default() + ); + let query = &std::str::from_utf8(&query_buffer).unwrap_or_default(); + let query = if query.is_empty() { None } else { Some(query) }; + let _ = write!(version_buffer, "{:?}", log_data.version); + let http_version = + &std::str::from_utf8(&version_buffer).unwrap_or_default(); + let status = &log_data.status; + let user_agent = match &log_data.user_agent { + LogOption::Some(v) => v.to_str().ok(), + LogOption::None => None, + }; + + let elapsed = &log_data.elapsed; + let copy_result = copy_in_writer + .as_mut() + .write(&[ + uuid, + &namespace, + &version, + target, + &worker_route, + &worker_id, + ip, + method, + path, + &query, + http_version, + status, + &user_agent, + elapsed, + ×tamp, + ]) + .await; + + path_buffer.clear(); + version_buffer.clear(); + user_agent_buffer.clear(); + query_buffer.clear(); + + if let Err(e) = copy_result { + log::error!("Error writing to PostgreSQL: {e}"); + break 'write; + } + } + + let copy_in_finish_res = copy_in_writer.finish().await; + if let Err(e) = copy_in_finish_res { + log::error!("Error writing to PostgreSQL: {e}"); + continue 'recv; + } + } + Err(e) => { + log::error!(target: "telemetry", "Error writing to the database: {e}") + } + } + } + }); + (tx, handle) +}