From 6ef06c962ea0b3b84d98be4043dd07e9cb72ba92 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 28 Apr 2025 14:26:32 -0700 Subject: [PATCH 01/14] [omdb] quick webhook receiver list command --- dev-tools/omdb/src/bin/omdb/db.rs | 131 ++++++++++++++++++++++++++++++ 1 file changed, 131 insertions(+) diff --git a/dev-tools/omdb/src/bin/omdb/db.rs b/dev-tools/omdb/src/bin/omdb/db.rs index 76cc39c860a..77dcd2b104e 100644 --- a/dev-tools/omdb/src/bin/omdb/db.rs +++ b/dev-tools/omdb/src/bin/omdb/db.rs @@ -142,6 +142,8 @@ use omicron_common::api::external::DataPageParams; use omicron_common::api::external::Generation; use omicron_common::api::external::InstanceState; use omicron_common::api::external::MacAddr; +use omicron_common::api::external::NameOrId; +use omicron_common::api::external::http_pagination::PaginatedBy; use omicron_uuid_kinds::CollectionUuid; use omicron_uuid_kinds::DatasetUuid; use omicron_uuid_kinds::DownstairsRegionUuid; @@ -383,6 +385,8 @@ enum DbCommands { Vmms(VmmListArgs), /// Print information about the oximeter collector. Oximeter(OximeterArgs), + /// Print information about webhooks + Webhook(WebhookArgs), /// Commands for querying and interacting with pools Zpool(ZpoolArgs), } @@ -1150,6 +1154,44 @@ struct SetStorageBufferArgs { storage_buffer: i64, } +struct WebhookArgs { + #[command(subcommand)] + command: WebhookCommands, +} + +#[derive(Debug, Subcommand, Clone)] +enum WebhookCommands { + /// Get information on webhook receivers. + #[clap(alias = "rx")] + Receiver { + #[command(subcommand)] + command: WebhookRxCommands, + }, + /// Get information on webhook events + Event, +} + +#[derive(Debug, Subcommand, Clone)] +enum WebhookRxCommands { + /// List webhook receivers + #[clap(alias = "ls")] + List(WebhookRxListArgs), + + #[clap(alias = "show")] + Info(WebhookRxInfoArgs), +} + +#[derive(Debug, Args, Clone)] +struct WebhookRxInfoArgs { + receiver: NameOrId, +} + +#[derive(Debug, Args, Clone)] +struct WebhookRxListArgs { + #[clap(long, short)] + start_at: Option, +} + impl DbArgs { /// Run a `omdb db` subcommand. /// @@ -1460,6 +1502,8 @@ impl DbArgs { DbCommands::Oximeter(OximeterArgs { command: OximeterCommands::ListProducers }) => cmd_db_oximeter_list_producers(&datastore, fetch_opts).await, + + DbCommands::Webhook(WebhookArgs { command }) => cmd_db_webhook(&opctx, &datastore, &fetch_opts, command).await, DbCommands::Zpool(ZpoolArgs { command: ZpoolCommands::List(args) }) => cmd_db_zpool_list(&opctx, &datastore, &args).await, @@ -8026,6 +8070,93 @@ async fn cmd_db_oximeter_list_producers( Ok(()) } +async fn cmd_db_webhook( + opctx: &OpContext, + datastore: &DataStore, + fetch_opts: &DbFetchOptions, + command: &WebhookCommands, +) -> anyhow::Result<()> { + match command { + WebhookCommands::Receiver { + command: WebhookRxCommands::List(args), + } => cmd_db_webhook_rx_list(opctx, datastore, fetch_opts, args).await, + WebhookCommands::Receiver { + command: WebhookRxCommands::Info(args), + } => cmd_db_webhook_rx_info(opctx, datastore, fetch_opts, args).await, + WebhookCommands::Event => { + Err(anyhow::anyhow!("not yet implemented, sorry!")) + } + } +} + +async fn cmd_db_webhook_rx_list( + opctx: &OpContext, + datastore: &DataStore, + fetch_opts: &DbFetchOptions, + args: &WebhookRxListArgs, +) -> anyhow::Result<()> { + let ctx = || { + if let Some(starting_at) = args.start_at { + format!("listing webhook receivers (starting at {starting_at}") + } else { + "listing webhook_receivers".to_string() + } + }; + let pagparams = DataPageParams { + marker: args.start_at.as_ref(), + ..first_page(fetch_opts.fetch_limit) + }; + let rxs = datastore + .webhook_rx_list(opctx, &PaginatedBy::Id(pagparams)) + .await + .with_context(ctx)?; + + check_limit(&rxs, fetch_opts.fetch_limit, ctx); + + #[derive(Tabled)] + #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] + struct RxRow { + id: Uuid, + #[tabled(display_with = "datetime_rfc3339_concise")] + created: chrono::DateTime, + #[tabled(display_with = "datetime_rfc3339_concise")] + modified: chrono::DateTime, + secrets: usize, + events: usize, + name: String, + endpoint: String, + } + + let rows = rxs.into_iter().map( + |db::model::WebhookReceiverConfig { rx, secrets, events }| RxRow { + id: rx.id().into_untyped_uuid(), + name: rx.identity.name.to_string(), + created: rx.time_created(), + modified: rx.time_modified(), + secrets: secrets.len(), + events: events.len(), + endpoint: rx.endpoint, + }, + ); + + let table = tabled::Table::new(rows) + .with(tabled::settings::Style::empty()) + .with(tabled::settings::Padding::new(0, 1, 0, 0)) + .to_string(); + println!("{table}"); + + Ok(()) +} + +async fn cmd_db_webhook_rx_info( + _opctx: &OpContext, + _datastore: &DataStore, + _fetch_opts: &DbFetchOptions, + _args: &WebhookRxInfoArgs, +) -> anyhow::Result<()> { + anyhow::bail!("TODO: eliza, implement this one!") +} + // Format a `chrono::DateTime` in RFC3339 with milliseconds precision and using // `Z` rather than the UTC offset for UTC timestamps, to save a few characters // of line width in tabular output. From ecd0b5e205acff7c5f58f3026b49a1f817ddaa35 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 28 Apr 2025 14:44:13 -0700 Subject: [PATCH 02/14] Update db.rs --- dev-tools/omdb/src/bin/omdb/db.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/dev-tools/omdb/src/bin/omdb/db.rs b/dev-tools/omdb/src/bin/omdb/db.rs index 77dcd2b104e..648adbeac4d 100644 --- a/dev-tools/omdb/src/bin/omdb/db.rs +++ b/dev-tools/omdb/src/bin/omdb/db.rs @@ -1154,6 +1154,7 @@ struct SetStorageBufferArgs { storage_buffer: i64, } +#[derive(Debug, Args, Clone)] struct WebhookArgs { #[command(subcommand)] command: WebhookCommands, From 137fcf9bbad3143101d4c88dc101b3a526656a38 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Fri, 14 Mar 2025 12:02:09 -0700 Subject: [PATCH 03/14] [omdb] quick webhook receiver list command --- dev-tools/omdb/src/bin/omdb/db.rs | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/dev-tools/omdb/src/bin/omdb/db.rs b/dev-tools/omdb/src/bin/omdb/db.rs index 648adbeac4d..fdf7ae83fef 100644 --- a/dev-tools/omdb/src/bin/omdb/db.rs +++ b/dev-tools/omdb/src/bin/omdb/db.rs @@ -8123,20 +8123,22 @@ async fn cmd_db_webhook_rx_list( #[tabled(display_with = "datetime_rfc3339_concise")] modified: chrono::DateTime, secrets: usize, - events: usize, + subscriptions: usize, name: String, endpoint: String, } let rows = rxs.into_iter().map( - |db::model::WebhookReceiverConfig { rx, secrets, events }| RxRow { - id: rx.id().into_untyped_uuid(), - name: rx.identity.name.to_string(), - created: rx.time_created(), - modified: rx.time_modified(), - secrets: secrets.len(), - events: events.len(), - endpoint: rx.endpoint, + |db::model::WebhookReceiverConfig { rx, secrets, subscriptions }| { + RxRow { + id: rx.id().into_untyped_uuid(), + name: rx.identity.name.to_string(), + created: rx.time_created(), + modified: rx.time_modified(), + secrets: secrets.len(), + subscriptions: subscriptions.len(), + endpoint: rx.endpoint, + } }, ); From 60c8565eb06b66b6ea4d1dbf9ab1cf8524ce6d84 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 17 Mar 2025 14:48:18 -0700 Subject: [PATCH 04/14] [omdb] add initial webhook receiver OMDB commands This commit adds some quick OMDB commands for listing and displaying webhook receivers. Subsequent commits will also add support for inspecting event deliveries to receivers, but I wanted to get the basic stuff in first. --- dev-tools/omdb/src/bin/omdb/db.rs | 224 +++++++++++++++++++++++++- dev-tools/omdb/tests/usage_errors.out | 2 + 2 files changed, 220 insertions(+), 6 deletions(-) diff --git a/dev-tools/omdb/src/bin/omdb/db.rs b/dev-tools/omdb/src/bin/omdb/db.rs index fdf7ae83fef..dd897bf4340 100644 --- a/dev-tools/omdb/src/bin/omdb/db.rs +++ b/dev-tools/omdb/src/bin/omdb/db.rs @@ -8083,7 +8083,7 @@ async fn cmd_db_webhook( } => cmd_db_webhook_rx_list(opctx, datastore, fetch_opts, args).await, WebhookCommands::Receiver { command: WebhookRxCommands::Info(args), - } => cmd_db_webhook_rx_info(opctx, datastore, fetch_opts, args).await, + } => cmd_db_webhook_rx_info(datastore, fetch_opts, args).await, WebhookCommands::Event => { Err(anyhow::anyhow!("not yet implemented, sorry!")) } @@ -8152,12 +8152,224 @@ async fn cmd_db_webhook_rx_list( } async fn cmd_db_webhook_rx_info( - _opctx: &OpContext, - _datastore: &DataStore, - _fetch_opts: &DbFetchOptions, - _args: &WebhookRxInfoArgs, + datastore: &DataStore, + fetch_opts: &DbFetchOptions, + args: &WebhookRxInfoArgs, ) -> anyhow::Result<()> { - anyhow::bail!("TODO: eliza, implement this one!") + use nexus_db_schema::schema::webhook_receiver::dsl as rx_dsl; + use nexus_db_schema::schema::webhook_rx_event_glob::dsl as glob_dsl; + use nexus_db_schema::schema::webhook_rx_subscription::dsl as subscription_dsl; + use nexus_db_schema::schema::webhook_secret::dsl as secret_dsl; + + let conn = datastore.pool_connection_for_tests().await?; + let mut query = match args.receiver { + NameOrId::Id(id) => { + rx_dsl::webhook_receiver.filter(rx_dsl::id.eq(id)).into_boxed() + } + NameOrId::Name(ref name) => rx_dsl::webhook_receiver + .filter(rx_dsl::name.eq(name.to_string())) + .into_boxed(), + }; + if !fetch_opts.include_deleted { + query = query.filter(rx_dsl::time_deleted.is_null()); + } + + let rx = query + .limit(1) + .select(db::model::WebhookReceiver::as_select()) + .get_result_async(&*conn) + .await + .optional() + .with_context(|| format!("loading webhook receiver {}", args.receiver))? + .ok_or_else(|| { + anyhow::anyhow!("no instance {} exists", args.receiver) + })?; + + const ID: &'static str = "ID"; + const NAME: &'static str = "name"; + + const DESCRIPTION: &'static str = "description"; + const CREATED: &'static str = "created at"; + const DELETED: &'static str = "deleted at"; + const MODIFIED: &'static str = "modified at"; + const ENDPOINT: &'static str = "endpoint"; + const GEN: &'static str = "generation"; + const EXACT: &'static str = "exact subscriptions"; + const GLOBS: &'static str = "glob subscriptions"; + const GLOB_REGEX: &'static str = " regex"; + const GLOB_SCHEMA_VERSION: &'static str = " schema version"; + const GLOB_CREATED: &'static str = " created at"; + const GLOB_EXACT: &'static str = " exact subscriptions"; + const WIDTH: usize = const_max_len(&[ + ID, + NAME, + DESCRIPTION, + CREATED, + DELETED, + MODIFIED, + ENDPOINT, + GEN, + EXACT, + GLOBS, + GLOB_REGEX, + GLOB_SCHEMA_VERSION, + GLOB_CREATED, + GLOB_EXACT, + ]); + + let db::model::WebhookReceiver { + identity: + db::model::WebhookReceiverIdentity { + id, + name, + description, + time_created, + time_modified, + time_deleted, + }, + endpoint, + secret_gen, + subscription_gen, + } = rx; + + println!("\n{:=<80}", "== RECEIVER "); + println!(" {NAME:>WIDTH$}: {name}"); + println!(" {ID:>WIDTH$}: {id}"); + println!(" {DESCRIPTION:>WIDTH$}: {description}"); + println!(" {ENDPOINT:>WIDTH$}: {endpoint}"); + println!(); + println!(" {CREATED:>WIDTH$}: {time_created}"); + println!(" {MODIFIED:>WIDTH$}: {time_modified}"); + if let Some(deleted) = time_deleted { + println!(" {DELETED:>WIDTH$}: {deleted}"); + } + + println!("\n{:=<80}", "== SECRETS "); + println!(" {GEN:>WIDTH$}: {}", secret_gen.0); + + let query = secret_dsl::webhook_secret + .filter(secret_dsl::rx_id.eq(id.into_untyped_uuid())) + .select(db::model::WebhookSecret::as_select()); + let secrets = if fetch_opts.include_deleted { + query.load_async(&*conn).await + } else { + query + .filter(secret_dsl::time_deleted.is_null()) + .load_async(&*conn) + .await + }; + + match secrets { + Ok(secrets) => { + #[derive(Tabled)] + struct SecretRow { + id: Uuid, + + #[tabled(display_with = "datetime_rfc3339_concise")] + created: chrono::DateTime, + + #[tabled(display_with = "datetime_opt_rfc3339_concise")] + deleted: Option>, + } + let rows = secrets.into_iter().map( + |db::model::WebhookSecret { + identity: + db::model::WebhookSecretIdentity { + id, + time_modified: _, + time_created, + }, + webhook_receiver_id: _, + secret: _, + time_deleted, + }| SecretRow { + id: id.into_untyped_uuid(), + created: time_created, + deleted: time_deleted, + }, + ); + + let table = tabled::Table::new(rows) + .with(tabled::settings::Style::empty()) + .with(tabled::settings::Padding::new(0, 1, 0, 0)) + .to_string(); + println!("{table}"); + } + Err(e) => eprintln!("failed to list secrets: {e}"), + } + + println!("\n{:=<80}", "== SUBSCRIPTIONS "); + println!(" {GEN:>WIDTH$}: {}", subscription_gen.0); + + let exact = subscription_dsl::webhook_rx_subscription + .filter(subscription_dsl::rx_id.eq(id.into_untyped_uuid())) + .filter(subscription_dsl::glob.is_null()) + .select(subscription_dsl::event_class) + .load_async::(&*conn) + .await; + match exact { + Ok(exact) => { + println!(" {EXACT:>WIDTH$}: {}", exact.len()); + for event_class in exact { + println!(" - {event_class}"); + } + } + Err(e) => { + eprintln!("failed to list exact subscriptions: {e}"); + } + } + + let globs = glob_dsl::webhook_rx_event_glob + .filter(glob_dsl::rx_id.eq(id.into_untyped_uuid())) + .select(db::model::WebhookRxEventGlob::as_select()) + .load_async::(&*conn) + .await; + match globs { + Ok(globs) => { + println!(" {GLOBS:>WIDTH$}: {}", globs.len()); + for glob in globs { + let db::model::WebhookRxEventGlob { + rx_id: _, + glob: db::model::WebhookGlob { glob, regex }, + time_created, + schema_version, + } = glob; + println!(" - {glob}"); + println!(" {GLOB_CREATED:>WIDTH$}: {time_created}"); + if let Some(v) = schema_version { + println!(" {GLOB_SCHEMA_VERSION:>WIDTH$}: {v}") + } else { + println!( + "(i) {GLOB_SCHEMA_VERSION:>WIDTH$}: ", + ) + } + + println!(" {GLOB_REGEX:>WIDTH$}: {regex}"); + let exact = subscription_dsl::webhook_rx_subscription + .filter(subscription_dsl::rx_id.eq(id.into_untyped_uuid())) + .filter(subscription_dsl::glob.eq(glob)) + .select(subscription_dsl::event_class) + .load_async::(&*conn) + .await; + match exact { + Ok(exact) => { + println!(" {GLOB_EXACT:>WIDTH$}: {}", exact.len()); + for event_class in exact { + println!(" - {event_class}") + } + } + Err(e) => eprintln!( + "failed to list exact subscriptions for glob: {e}" + ), + } + } + } + Err(e) => { + eprintln!("failed to list glob subscriptions: {e}"); + } + } + + Ok(()) } // Format a `chrono::DateTime` in RFC3339 with milliseconds precision and using diff --git a/dev-tools/omdb/tests/usage_errors.out b/dev-tools/omdb/tests/usage_errors.out index 05d68e96f04..4495b43e2e2 100644 --- a/dev-tools/omdb/tests/usage_errors.out +++ b/dev-tools/omdb/tests/usage_errors.out @@ -139,6 +139,7 @@ Commands: processes vmms Alias to `omdb db vmm list` oximeter Print information about the oximeter collector + webhook Print information about webhooks zpool Commands for querying and interacting with pools help Print this message or the help of the given subcommand(s) @@ -195,6 +196,7 @@ Commands: processes vmms Alias to `omdb db vmm list` oximeter Print information about the oximeter collector + webhook Print information about webhooks zpool Commands for querying and interacting with pools help Print this message or the help of the given subcommand(s) From 619cd1a9898d766b0b056a7d32ea3e91e6fbd5a1 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 18 Mar 2025 11:30:56 -0700 Subject: [PATCH 05/14] [omdb] add `webhook deliveries list` --- dev-tools/omdb/src/bin/omdb/db.rs | 263 ++++++++++++++++-- nexus/db-model/src/webhook_delivery_state.rs | 8 + .../db-model/src/webhook_delivery_trigger.rs | 8 + nexus/types/src/external_api/views.rs | 42 ++- 4 files changed, 295 insertions(+), 26 deletions(-) diff --git a/dev-tools/omdb/src/bin/omdb/db.rs b/dev-tools/omdb/src/bin/omdb/db.rs index dd897bf4340..d8d4e28c5a0 100644 --- a/dev-tools/omdb/src/bin/omdb/db.rs +++ b/dev-tools/omdb/src/bin/omdb/db.rs @@ -104,6 +104,9 @@ use nexus_db_model::Volume; use nexus_db_model::VolumeRepair; use nexus_db_model::VolumeResourceUsage; use nexus_db_model::VpcSubnet; +use nexus_db_model::WebhookDelivery; +use nexus_db_model::WebhookEventClass; +use nexus_db_model::WebhookReceiver; use nexus_db_model::Zpool; use nexus_db_model::to_db_typed_uuid; use nexus_db_queries::context::OpContext; @@ -154,6 +157,7 @@ use omicron_uuid_kinds::PhysicalDiskUuid; use omicron_uuid_kinds::PropolisUuid; use omicron_uuid_kinds::SledUuid; use omicron_uuid_kinds::VolumeUuid; +use omicron_uuid_kinds::WebhookEventUuid; use omicron_uuid_kinds::ZpoolUuid; use sled_agent_client::VolumeConstructionRequest; use std::borrow::Cow; @@ -1162,7 +1166,7 @@ struct WebhookArgs { #[derive(Debug, Subcommand, Clone)] enum WebhookCommands { - /// Get information on webhook receivers. + /// Get information on webhook receivers #[clap(alias = "rx")] Receiver { #[command(subcommand)] @@ -1170,6 +1174,11 @@ enum WebhookCommands { }, /// Get information on webhook events Event, + /// Get information on webhook delivieries + Delivery { + #[command(subcommand)] + command: WebhookDeliveryCommands, + }, } #[derive(Debug, Subcommand, Clone)] @@ -1189,10 +1198,44 @@ struct WebhookRxInfoArgs { #[derive(Debug, Args, Clone)] struct WebhookRxListArgs { - #[clap(long, short)] + #[clap(long, short = 'a')] start_at: Option, } +#[derive(Debug, Subcommand, Clone)] +enum WebhookDeliveryCommands { + /// List webhook deliveries + #[clap(alias = "ls")] + List(WebhookDeliveryListArgs), +} + +#[derive(Debug, Args, Clone)] +struct WebhookDeliveryListArgs { + /// If present, show only deliveries to this receiver. + #[clap(long, short, alias = "rx")] + receiver: Option, + + /// If present, select only deliveries for the given event. + #[clap(long, short)] + event: Option, + + /// If present, select only deliveries in the provided state(s) + #[clap(long = "state", short)] + states: Vec, + + /// If present, select only deliveries with the provided trigger(s) + #[clap(long = "trigger", short)] + triggers: Vec, + + /// Include only delivery entries created before this timestamp + #[clap(long, short)] + before: Option>, + + /// Include only delivery entries created after this timestamp + #[clap(long, short)] + after: Option>, +} + impl DbArgs { /// Run a `omdb db` subcommand. /// @@ -8084,6 +8127,9 @@ async fn cmd_db_webhook( WebhookCommands::Receiver { command: WebhookRxCommands::Info(args), } => cmd_db_webhook_rx_info(datastore, fetch_opts, args).await, + WebhookCommands::Delivery { + command: WebhookDeliveryCommands::List(args), + } => cmd_db_webhook_delivery_list(datastore, fetch_opts, args).await, WebhookCommands::Event => { Err(anyhow::anyhow!("not yet implemented, sorry!")) } @@ -8156,33 +8202,16 @@ async fn cmd_db_webhook_rx_info( fetch_opts: &DbFetchOptions, args: &WebhookRxInfoArgs, ) -> anyhow::Result<()> { - use nexus_db_schema::schema::webhook_receiver::dsl as rx_dsl; use nexus_db_schema::schema::webhook_rx_event_glob::dsl as glob_dsl; use nexus_db_schema::schema::webhook_rx_subscription::dsl as subscription_dsl; use nexus_db_schema::schema::webhook_secret::dsl as secret_dsl; let conn = datastore.pool_connection_for_tests().await?; - let mut query = match args.receiver { - NameOrId::Id(id) => { - rx_dsl::webhook_receiver.filter(rx_dsl::id.eq(id)).into_boxed() - } - NameOrId::Name(ref name) => rx_dsl::webhook_receiver - .filter(rx_dsl::name.eq(name.to_string())) - .into_boxed(), - }; - if !fetch_opts.include_deleted { - query = query.filter(rx_dsl::time_deleted.is_null()); - } - - let rx = query - .limit(1) - .select(db::model::WebhookReceiver::as_select()) - .get_result_async(&*conn) + let rx = lookup_webhook_rx(datastore, &args.receiver) .await - .optional() .with_context(|| format!("loading webhook receiver {}", args.receiver))? .ok_or_else(|| { - anyhow::anyhow!("no instance {} exists", args.receiver) + anyhow::anyhow!("no webhook receiver {} exists", args.receiver) })?; const ID: &'static str = "ID"; @@ -8217,9 +8246,9 @@ async fn cmd_db_webhook_rx_info( GLOB_EXACT, ]); - let db::model::WebhookReceiver { + let WebhookReceiver { identity: - db::model::WebhookReceiverIdentity { + nexus_db_model::WebhookReceiverIdentity { id, name, description, @@ -8305,7 +8334,7 @@ async fn cmd_db_webhook_rx_info( .filter(subscription_dsl::rx_id.eq(id.into_untyped_uuid())) .filter(subscription_dsl::glob.is_null()) .select(subscription_dsl::event_class) - .load_async::(&*conn) + .load_async::(&*conn) .await; match exact { Ok(exact) => { @@ -8349,7 +8378,7 @@ async fn cmd_db_webhook_rx_info( .filter(subscription_dsl::rx_id.eq(id.into_untyped_uuid())) .filter(subscription_dsl::glob.eq(glob)) .select(subscription_dsl::event_class) - .load_async::(&*conn) + .load_async::(&*conn) .await; match exact { Ok(exact) => { @@ -8372,6 +8401,190 @@ async fn cmd_db_webhook_rx_info( Ok(()) } +async fn cmd_db_webhook_delivery_list( + datastore: &DataStore, + fetch_opts: &DbFetchOptions, + args: &WebhookDeliveryListArgs, +) -> anyhow::Result<()> { + use nexus_db_schema::schema::webhook_delivery::dsl as delivery_dsl; + let conn = datastore.pool_connection_for_tests().await?; + let mut query = delivery_dsl::webhook_delivery + .limit(fetch_opts.fetch_limit.get().into()) + .order_by(delivery_dsl::time_created.desc()) + .into_boxed(); + + if let (Some(before), Some(after)) = (args.before, args.after) { + anyhow::ensure!( + after < before, + "if both after and before are included, after must be earlier than before" + ); + } + + if let Some(before) = args.before { + query = query.filter(delivery_dsl::time_created.lt(before)); + } + + if let Some(after) = args.before { + query = query.filter(delivery_dsl::time_created.gt(after)); + } + + if let Some(ref receiver) = args.receiver { + let rx = + lookup_webhook_rx(datastore, receiver).await?.ok_or_else(|| { + anyhow::anyhow!("no webhook receiver {receiver} found") + })?; + query = query.filter(delivery_dsl::rx_id.eq(rx.identity.id)); + } + + if !args.states.is_empty() { + query = query.filter(delivery_dsl::state.eq_any(args.states.clone())); + } + + if !args.triggers.is_empty() { + query = query + .filter(delivery_dsl::triggered_by.eq_any(args.triggers.clone())); + } + + let ctx = || "listing webhook receivers"; + + let deliveries = query + .select(WebhookDelivery::as_select()) + .load_async(&*conn) + .await + .with_context(ctx)?; + + check_limit(&deliveries, fetch_opts.fetch_limit, ctx); + + #[derive(Tabled)] + struct DeliveryRow { + id: Uuid, + trigger: nexus_db_model::WebhookDeliveryTrigger, + state: nexus_db_model::WebhookDeliveryState, + attempts: u8, + #[tabled(display_with = "datetime_rfc3339_concise")] + time_created: DateTime, + #[tabled(display_with = "datetime_opt_rfc3339_concise")] + time_completed: Option>, + } + + #[derive(Tabled)] + struct WithEventId { + #[tabled(inline)] + inner: T, + event_id: Uuid, + } + + #[derive(Tabled)] + struct WithRxId { + #[tabled(inline)] + inner: T, + receiver_id: Uuid, + } + + impl From<&'_ WebhookDelivery> for DeliveryRow { + fn from(d: &WebhookDelivery) -> Self { + let WebhookDelivery { + id, + // event and receiver UUIDs are toggled on and off based on + // whether or not we are filtering by receiver and event, so + // ignore them here. + event_id: _, + rx_id: _, + attempts, + state, + time_created, + time_completed, + // ignore these as they are used for runtime coordination and + // aren't very useful for showing delivery history + deliverator_id: _, + time_leased: _, + triggered_by, + } = d; + Self { + id: id.into_untyped_uuid(), + trigger: *triggered_by, + state: *state, + attempts: attempts.0, + time_created: *time_created, + time_completed: *time_completed, + } + } + } + + impl<'d, T> From<&'d WebhookDelivery> for WithEventId + where + T: From<&'d WebhookDelivery> + Tabled, + { + fn from(d: &'d WebhookDelivery) -> Self { + Self { event_id: d.event_id.into_untyped_uuid(), inner: T::from(d) } + } + } + + impl<'d, T> From<&'d WebhookDelivery> for WithRxId + where + T: From<&'d WebhookDelivery> + Tabled, + { + fn from(d: &'d WebhookDelivery) -> Self { + Self { receiver_id: d.rx_id.into_untyped_uuid(), inner: T::from(d) } + } + } + + let mut table = match (args.receiver.as_ref(), args.event) { + // Filtered by both receiver and event, so don't display either. + (Some(_), Some(_)) => { + tabled::Table::new(deliveries.iter().map(DeliveryRow::from)) + } + // Filtered by neither receiver nor event, so include both. + (None, None) => tabled::Table::new( + deliveries.iter().map(WithRxId::>::from), + ), + // Filtered by receiver ID only + (Some(_), None) => tabled::Table::new( + deliveries.iter().map(WithEventId::::from), + ), + // Filtered by event ID only + (None, Some(_)) => tabled::Table::new( + deliveries.iter().map(WithRxId::::from), + ), + }; + table + .with(tabled::settings::Style::empty()) + .with(tabled::settings::Padding::new(0, 1, 0, 0)); + + println!("{table}"); + Ok(()) +} + +/// Helper function to look up a webhook receiver with the given name or ID +async fn lookup_webhook_rx( + datastore: &DataStore, + name_or_id: &NameOrId, +) -> anyhow::Result> { + use nexus_db_schema::schema::webhook_receiver::dsl; + + let conn = datastore.pool_connection_for_tests().await?; + match name_or_id { + NameOrId::Id(id) => { + dsl::webhook_receiver + .filter(dsl::id.eq(*id)) + .limit(1) + .select(WebhookReceiver::as_select()) + .get_result_async(&*conn) + .await + } + NameOrId::Name(ref name) => { + dsl::webhook_receiver + .filter(dsl::name.eq(name.to_string())) + .limit(1) + .select(WebhookReceiver::as_select()) + .get_result_async(&*conn) + .await + } + } + .optional() + .with_context(|| format!("loading webhook_receiver {name_or_id}")) +} + // Format a `chrono::DateTime` in RFC3339 with milliseconds precision and using // `Z` rather than the UTC offset for UTC timestamps, to save a few characters // of line width in tabular output. diff --git a/nexus/db-model/src/webhook_delivery_state.rs b/nexus/db-model/src/webhook_delivery_state.rs index de37195d8cd..e8714ecfeae 100644 --- a/nexus/db-model/src/webhook_delivery_state.rs +++ b/nexus/db-model/src/webhook_delivery_state.rs @@ -7,6 +7,7 @@ use nexus_types::external_api::views; use serde::Deserialize; use serde::Serialize; use std::fmt; +use std::str::FromStr; impl_enum_type!( WebhookDeliveryStateEnum: @@ -57,3 +58,10 @@ impl From for WebhookDeliveryState { } } } + +impl FromStr for WebhookDeliveryState { + type Err = omicron_common::api::external::Error; + fn from_str(s: &str) -> Result { + views::WebhookDeliveryState::from_str(s).map(Into::into) + } +} diff --git a/nexus/db-model/src/webhook_delivery_trigger.rs b/nexus/db-model/src/webhook_delivery_trigger.rs index dc761a75379..a69a69f50e6 100644 --- a/nexus/db-model/src/webhook_delivery_trigger.rs +++ b/nexus/db-model/src/webhook_delivery_trigger.rs @@ -7,6 +7,7 @@ use nexus_types::external_api::views; use serde::Deserialize; use serde::Serialize; use std::fmt; +use std::str::FromStr; impl_enum_type!( WebhookDeliveryTriggerEnum: @@ -61,3 +62,10 @@ impl From for WebhookDeliveryTrigger { } } } + +impl FromStr for WebhookDeliveryTrigger { + type Err = omicron_common::api::external::Error; + fn from_str(s: &str) -> Result { + views::WebhookDeliveryTrigger::from_str(s).map(Into::into) + } +} diff --git a/nexus/types/src/external_api/views.rs b/nexus/types/src/external_api/views.rs index c06e239ce2e..fcfd69b06e7 100644 --- a/nexus/types/src/external_api/views.rs +++ b/nexus/types/src/external_api/views.rs @@ -1174,8 +1174,33 @@ impl fmt::Display for WebhookDeliveryState { } } +impl std::str::FromStr for WebhookDeliveryState { + type Err = Error; + fn from_str(s: &str) -> Result { + for &v in Self::ALL { + if s.trim().eq_ignore_ascii_case(v.as_str()) { + return Ok(v); + } + } + Err(Error::invalid_value( + "WebhookDeliveryState", + "expected one of 'pending', 'delivered', or 'failed'", + )) + } +} + /// The reason a webhook event was delivered -#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize, JsonSchema)] +#[derive( + Copy, + Clone, + Debug, + Eq, + PartialEq, + Deserialize, + Serialize, + JsonSchema, + strum::VariantArray, +)] #[serde(rename_all = "snake_case")] pub enum WebhookDeliveryTrigger { /// Delivery was triggered by the event occurring for the first time. @@ -1202,6 +1227,21 @@ impl fmt::Display for WebhookDeliveryTrigger { } } +impl std::str::FromStr for WebhookDeliveryTrigger { + type Err = Error; + fn from_str(s: &str) -> Result { + for &v in ::VARIANTS { + if s.trim().eq_ignore_ascii_case(v.as_str()) { + return Ok(v); + } + } + Err(Error::invalid_value( + "WebhookDeliveryTrigger", + "expected one of 'event', 'resend', or 'probe'", + )) + } +} + /// An individual delivery attempt for a webhook event. /// /// This represents a single HTTP request that was sent to the receiver, and its From 92232ec9d9ec2097974862c483fab294486565e5 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 18 Mar 2025 12:51:20 -0700 Subject: [PATCH 06/14] [omdb] add `webhook delivery info` --- dev-tools/omdb/src/bin/omdb/db.rs | 189 ++++++++++++++++++++++++++++++ 1 file changed, 189 insertions(+) diff --git a/dev-tools/omdb/src/bin/omdb/db.rs b/dev-tools/omdb/src/bin/omdb/db.rs index d8d4e28c5a0..7b620c9fbad 100644 --- a/dev-tools/omdb/src/bin/omdb/db.rs +++ b/dev-tools/omdb/src/bin/omdb/db.rs @@ -1207,6 +1207,10 @@ enum WebhookDeliveryCommands { /// List webhook deliveries #[clap(alias = "ls")] List(WebhookDeliveryListArgs), + + /// Show details on a webhook delivery, including its payload and attempt history. + #[clap(alias = "show")] + Info(WebhookDeliveryInfoArgs), } #[derive(Debug, Args, Clone)] @@ -1236,6 +1240,12 @@ struct WebhookDeliveryListArgs { after: Option>, } +#[derive(Debug, Args, Clone)] +struct WebhookDeliveryInfoArgs { + /// The ID of the delivery to show. + delivery_id: Uuid, +} + impl DbArgs { /// Run a `omdb db` subcommand. /// @@ -8130,6 +8140,9 @@ async fn cmd_db_webhook( WebhookCommands::Delivery { command: WebhookDeliveryCommands::List(args), } => cmd_db_webhook_delivery_list(datastore, fetch_opts, args).await, + WebhookCommands::Delivery { + command: WebhookDeliveryCommands::Info(args), + } => cmd_db_webhook_delivery_info(datastore, fetch_opts, args).await, WebhookCommands::Event => { Err(anyhow::anyhow!("not yet implemented, sorry!")) } @@ -8456,6 +8469,7 @@ async fn cmd_db_webhook_delivery_list( check_limit(&deliveries, fetch_opts.fetch_limit, ctx); #[derive(Tabled)] + #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] struct DeliveryRow { id: Uuid, trigger: nexus_db_model::WebhookDeliveryTrigger, @@ -8468,6 +8482,7 @@ async fn cmd_db_webhook_delivery_list( } #[derive(Tabled)] + #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] struct WithEventId { #[tabled(inline)] inner: T, @@ -8475,6 +8490,7 @@ async fn cmd_db_webhook_delivery_list( } #[derive(Tabled)] + #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] struct WithRxId { #[tabled(inline)] inner: T, @@ -8585,6 +8601,171 @@ async fn lookup_webhook_rx( .with_context(|| format!("loading webhook_receiver {name_or_id}")) } +async fn cmd_db_webhook_delivery_info( + datastore: &DataStore, + fetch_opts: &DbFetchOptions, + args: &WebhookDeliveryInfoArgs, +) -> anyhow::Result<()> { + use db::model::WebhookDeliveryAttempt; + use nexus_db_schema::schema::webhook_delivery::dsl; + use nexus_db_schema::schema::webhook_delivery_attempt::dsl as attempt_dsl; + + let WebhookDeliveryInfoArgs { delivery_id } = args; + let conn = datastore.pool_connection_for_tests().await?; + let delivery = dsl::webhook_delivery + .filter(dsl::id.eq(*delivery_id)) + .limit(1) + .select(WebhookDelivery::as_select()) + .get_result_async(&*conn) + .await + .optional() + .with_context(|| format!("loading webhook delivery {delivery_id}"))? + .ok_or_else(|| { + anyhow::anyhow!("no webhook delivery {delivery_id} exists") + })?; + + const ID: &'static str = "ID"; + const EVENT_ID: &'static str = "event ID"; + const RECEIVER_ID: &'static str = "receiver ID"; + const STATE: &'static str = "state"; + const TRIGGER: &'static str = "triggered by"; + const ATTEMPTS: &'static str = "attempts"; + const TIME_CREATED: &'static str = "created at"; + const TIME_COMPLETED: &'static str = "completed at"; + + const DELIVERATOR_ID: &'static str = "by Nexus"; + const TIME_LEASED: &'static str = "leased at"; + + const WIDTH: usize = const_max_len(&[ + ID, + EVENT_ID, + RECEIVER_ID, + TRIGGER, + STATE, + TIME_CREATED, + TIME_COMPLETED, + DELIVERATOR_ID, + TIME_LEASED, + ATTEMPTS, + ]); + + let WebhookDelivery { + id, + event_id, + rx_id, + triggered_by, + attempts, + time_created, + time_completed, + state, + deliverator_id, + time_leased, + } = delivery; + println!("\n{:=<80}", "== DELIVERY "); + println!(" {ID:>WIDTH$}: {id}"); + println!(" {EVENT_ID:>WIDTH$}: {event_id}"); + println!(" {RECEIVER_ID:>WIDTH$}: {rx_id}"); + println!(" {STATE:>WIDTH$}: {state}"); + println!(" {TRIGGER:>WIDTH$}: {triggered_by}"); + println!(" {TIME_CREATED:>WIDTH$}: {time_created}"); + println!(" {ATTEMPTS}: {}", attempts.0); + + if let Some(completed) = time_completed { + println!("\n{:=<80}", "== DELIVERY COMPLETED "); + println!(" {TIME_COMPLETED:>WIDTH$}: {completed}"); + if let Some(leased) = time_leased { + println!(" {TIME_LEASED:>WIDTH$}: {leased}"); + } else { + println!( + "/!\\ WEIRD: delivery is completed but has no start timestamp?" + ); + } + if let Some(nexus) = deliverator_id { + println!(" {DELIVERATOR_ID:>WIDTH$}: {nexus}"); + } else { + println!("/!\\ WEIRD: delivery is completed but has no Nexus ID?"); + } + } else if let Some(leased) = time_leased { + println!("\n{:=<80}", "== DELIVERY IN PROGRESS "); + println!(" {TIME_LEASED:>WIDTH$}: {leased}"); + + if let Some(nexus) = deliverator_id { + println!(" {DELIVERATOR_ID:>WIDTH$}: {nexus}"); + } else { + println!( + "/!\\ WEIRD: delivery is in progress but has no Nexus ID?" + ); + } + } else if let Some(deliverator) = deliverator_id { + println!( + "/!\\ WEIRD: delivery is not completed or in progress but has \ + Nexus ID {deliverator:?}" + ); + } + + // Okay, now go get attempts for this delivery. + let ctx = || format!("listing delivery attempts for {delivery_id}"); + let attempts = attempt_dsl::webhook_delivery_attempt + .filter(attempt_dsl::delivery_id.eq(*delivery_id)) + .order_by(attempt_dsl::attempt.desc()) + .limit(fetch_opts.fetch_limit.get().into()) + .select(WebhookDeliveryAttempt::as_select()) + .load_async(&*conn) + .await + .with_context(ctx)?; + + check_limit(&attempts, fetch_opts.fetch_limit, ctx); + + if !attempts.is_empty() { + println!("\n{:=<80}", "== DELIVERY ATTEMPT HISTORY "); + + #[derive(Tabled)] + #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] + struct DeliveryAttemptRow { + id: Uuid, + #[tabled(rename = "#")] + attempt: u8, + #[tabled(display_with = "datetime_rfc3339_concise")] + time_created: DateTime, + nexus_id: Uuid, + result: db::model::WebhookDeliveryAttemptResult, + #[tabled(display_with = "display_u16_opt")] + status: Option, + #[tabled(display_with = "display_time_delta_opt")] + duration: Option, + } + + let rows = attempts.into_iter().map( + |WebhookDeliveryAttempt { + id, + delivery_id: _, + rx_id: _, + attempt, + result, + response_status, + response_duration, + time_created, + deliverator_id, + }| DeliveryAttemptRow { + id: id.into_untyped_uuid(), + attempt: attempt.0, + time_created, + nexus_id: deliverator_id.into_untyped_uuid(), + result, + status: response_status.map(|u| u.into()), + duration: response_duration, + }, + ); + let mut table = tabled::Table::new(rows); + table + .with(tabled::settings::Style::empty()) + .with(tabled::settings::Padding::new(0, 1, 0, 0)); + println!("{table}"); + } + + Ok(()) +} + // Format a `chrono::DateTime` in RFC3339 with milliseconds precision and using // `Z` rather than the UTC offset for UTC timestamps, to save a few characters // of line width in tabular output. @@ -8698,3 +8879,11 @@ async fn cmd_db_zpool_set_storage_buffer( Ok(()) } + +fn display_time_delta_opt(t: &Option) -> String { + t.map(|t| t.to_string()).unwrap_or_else(|| "-".to_string()) +} + +fn display_u16_opt(u: &Option) -> String { + u.map(|u| u.to_string()).unwrap_or_else(|| "-".to_string()) +} From 5dc3f93032b295fdb3c95808144b58abc2e2004a Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Mon, 28 Apr 2025 21:20:01 -0700 Subject: [PATCH 07/14] pput all the webhook stuff in its own file this should make life easier in the future (e.g. no merge conflicts) --- dev-tools/omdb/src/bin/omdb/db.rs | 749 +-------------------- dev-tools/omdb/src/bin/omdb/db/webhook.rs | 766 ++++++++++++++++++++++ dev-tools/omdb/src/bin/omdb/helpers.rs | 19 + 3 files changed, 789 insertions(+), 745 deletions(-) create mode 100644 dev-tools/omdb/src/bin/omdb/db/webhook.rs diff --git a/dev-tools/omdb/src/bin/omdb/db.rs b/dev-tools/omdb/src/bin/omdb/db.rs index 7b620c9fbad..4b048084c92 100644 --- a/dev-tools/omdb/src/bin/omdb/db.rs +++ b/dev-tools/omdb/src/bin/omdb/db.rs @@ -104,9 +104,6 @@ use nexus_db_model::Volume; use nexus_db_model::VolumeRepair; use nexus_db_model::VolumeResourceUsage; use nexus_db_model::VpcSubnet; -use nexus_db_model::WebhookDelivery; -use nexus_db_model::WebhookEventClass; -use nexus_db_model::WebhookReceiver; use nexus_db_model::Zpool; use nexus_db_model::to_db_typed_uuid; use nexus_db_queries::context::OpContext; @@ -145,8 +142,6 @@ use omicron_common::api::external::DataPageParams; use omicron_common::api::external::Generation; use omicron_common::api::external::InstanceState; use omicron_common::api::external::MacAddr; -use omicron_common::api::external::NameOrId; -use omicron_common::api::external::http_pagination::PaginatedBy; use omicron_uuid_kinds::CollectionUuid; use omicron_uuid_kinds::DatasetUuid; use omicron_uuid_kinds::DownstairsRegionUuid; @@ -157,7 +152,6 @@ use omicron_uuid_kinds::PhysicalDiskUuid; use omicron_uuid_kinds::PropolisUuid; use omicron_uuid_kinds::SledUuid; use omicron_uuid_kinds::VolumeUuid; -use omicron_uuid_kinds::WebhookEventUuid; use omicron_uuid_kinds::ZpoolUuid; use sled_agent_client::VolumeConstructionRequest; use std::borrow::Cow; @@ -174,8 +168,11 @@ use std::sync::Arc; use strum::IntoEnumIterator; use tabled::Tabled; use uuid::Uuid; +use webhook::WebhookArgs; +use webhook::cmd_db_webhook; mod saga; +mod webhook; const NO_ACTIVE_PROPOLIS_MSG: &str = ""; const NOT_ON_SLED_MSG: &str = ""; @@ -1158,94 +1155,6 @@ struct SetStorageBufferArgs { storage_buffer: i64, } -#[derive(Debug, Args, Clone)] -struct WebhookArgs { - #[command(subcommand)] - command: WebhookCommands, -} - -#[derive(Debug, Subcommand, Clone)] -enum WebhookCommands { - /// Get information on webhook receivers - #[clap(alias = "rx")] - Receiver { - #[command(subcommand)] - command: WebhookRxCommands, - }, - /// Get information on webhook events - Event, - /// Get information on webhook delivieries - Delivery { - #[command(subcommand)] - command: WebhookDeliveryCommands, - }, -} - -#[derive(Debug, Subcommand, Clone)] -enum WebhookRxCommands { - /// List webhook receivers - #[clap(alias = "ls")] - List(WebhookRxListArgs), - - #[clap(alias = "show")] - Info(WebhookRxInfoArgs), -} - -#[derive(Debug, Args, Clone)] -struct WebhookRxInfoArgs { - receiver: NameOrId, -} - -#[derive(Debug, Args, Clone)] -struct WebhookRxListArgs { - #[clap(long, short = 'a')] - start_at: Option, -} - -#[derive(Debug, Subcommand, Clone)] -enum WebhookDeliveryCommands { - /// List webhook deliveries - #[clap(alias = "ls")] - List(WebhookDeliveryListArgs), - - /// Show details on a webhook delivery, including its payload and attempt history. - #[clap(alias = "show")] - Info(WebhookDeliveryInfoArgs), -} - -#[derive(Debug, Args, Clone)] -struct WebhookDeliveryListArgs { - /// If present, show only deliveries to this receiver. - #[clap(long, short, alias = "rx")] - receiver: Option, - - /// If present, select only deliveries for the given event. - #[clap(long, short)] - event: Option, - - /// If present, select only deliveries in the provided state(s) - #[clap(long = "state", short)] - states: Vec, - - /// If present, select only deliveries with the provided trigger(s) - #[clap(long = "trigger", short)] - triggers: Vec, - - /// Include only delivery entries created before this timestamp - #[clap(long, short)] - before: Option>, - - /// Include only delivery entries created after this timestamp - #[clap(long, short)] - after: Option>, -} - -#[derive(Debug, Args, Clone)] -struct WebhookDeliveryInfoArgs { - /// The ID of the delivery to show. - delivery_id: Uuid, -} - impl DbArgs { /// Run a `omdb db` subcommand. /// @@ -1557,7 +1466,7 @@ impl DbArgs { command: OximeterCommands::ListProducers }) => cmd_db_oximeter_list_producers(&datastore, fetch_opts).await, - DbCommands::Webhook(WebhookArgs { command }) => cmd_db_webhook(&opctx, &datastore, &fetch_opts, command).await, + DbCommands::Webhook(args) => cmd_db_webhook(&opctx, &datastore, &fetch_opts, &args).await, DbCommands::Zpool(ZpoolArgs { command: ZpoolCommands::List(args) }) => cmd_db_zpool_list(&opctx, &datastore, &args).await, @@ -8124,648 +8033,6 @@ async fn cmd_db_oximeter_list_producers( Ok(()) } -async fn cmd_db_webhook( - opctx: &OpContext, - datastore: &DataStore, - fetch_opts: &DbFetchOptions, - command: &WebhookCommands, -) -> anyhow::Result<()> { - match command { - WebhookCommands::Receiver { - command: WebhookRxCommands::List(args), - } => cmd_db_webhook_rx_list(opctx, datastore, fetch_opts, args).await, - WebhookCommands::Receiver { - command: WebhookRxCommands::Info(args), - } => cmd_db_webhook_rx_info(datastore, fetch_opts, args).await, - WebhookCommands::Delivery { - command: WebhookDeliveryCommands::List(args), - } => cmd_db_webhook_delivery_list(datastore, fetch_opts, args).await, - WebhookCommands::Delivery { - command: WebhookDeliveryCommands::Info(args), - } => cmd_db_webhook_delivery_info(datastore, fetch_opts, args).await, - WebhookCommands::Event => { - Err(anyhow::anyhow!("not yet implemented, sorry!")) - } - } -} - -async fn cmd_db_webhook_rx_list( - opctx: &OpContext, - datastore: &DataStore, - fetch_opts: &DbFetchOptions, - args: &WebhookRxListArgs, -) -> anyhow::Result<()> { - let ctx = || { - if let Some(starting_at) = args.start_at { - format!("listing webhook receivers (starting at {starting_at}") - } else { - "listing webhook_receivers".to_string() - } - }; - let pagparams = DataPageParams { - marker: args.start_at.as_ref(), - ..first_page(fetch_opts.fetch_limit) - }; - let rxs = datastore - .webhook_rx_list(opctx, &PaginatedBy::Id(pagparams)) - .await - .with_context(ctx)?; - - check_limit(&rxs, fetch_opts.fetch_limit, ctx); - - #[derive(Tabled)] - #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] - struct RxRow { - id: Uuid, - #[tabled(display_with = "datetime_rfc3339_concise")] - created: chrono::DateTime, - #[tabled(display_with = "datetime_rfc3339_concise")] - modified: chrono::DateTime, - secrets: usize, - subscriptions: usize, - name: String, - endpoint: String, - } - - let rows = rxs.into_iter().map( - |db::model::WebhookReceiverConfig { rx, secrets, subscriptions }| { - RxRow { - id: rx.id().into_untyped_uuid(), - name: rx.identity.name.to_string(), - created: rx.time_created(), - modified: rx.time_modified(), - secrets: secrets.len(), - subscriptions: subscriptions.len(), - endpoint: rx.endpoint, - } - }, - ); - - let table = tabled::Table::new(rows) - .with(tabled::settings::Style::empty()) - .with(tabled::settings::Padding::new(0, 1, 0, 0)) - .to_string(); - println!("{table}"); - - Ok(()) -} - -async fn cmd_db_webhook_rx_info( - datastore: &DataStore, - fetch_opts: &DbFetchOptions, - args: &WebhookRxInfoArgs, -) -> anyhow::Result<()> { - use nexus_db_schema::schema::webhook_rx_event_glob::dsl as glob_dsl; - use nexus_db_schema::schema::webhook_rx_subscription::dsl as subscription_dsl; - use nexus_db_schema::schema::webhook_secret::dsl as secret_dsl; - - let conn = datastore.pool_connection_for_tests().await?; - let rx = lookup_webhook_rx(datastore, &args.receiver) - .await - .with_context(|| format!("loading webhook receiver {}", args.receiver))? - .ok_or_else(|| { - anyhow::anyhow!("no webhook receiver {} exists", args.receiver) - })?; - - const ID: &'static str = "ID"; - const NAME: &'static str = "name"; - - const DESCRIPTION: &'static str = "description"; - const CREATED: &'static str = "created at"; - const DELETED: &'static str = "deleted at"; - const MODIFIED: &'static str = "modified at"; - const ENDPOINT: &'static str = "endpoint"; - const GEN: &'static str = "generation"; - const EXACT: &'static str = "exact subscriptions"; - const GLOBS: &'static str = "glob subscriptions"; - const GLOB_REGEX: &'static str = " regex"; - const GLOB_SCHEMA_VERSION: &'static str = " schema version"; - const GLOB_CREATED: &'static str = " created at"; - const GLOB_EXACT: &'static str = " exact subscriptions"; - const WIDTH: usize = const_max_len(&[ - ID, - NAME, - DESCRIPTION, - CREATED, - DELETED, - MODIFIED, - ENDPOINT, - GEN, - EXACT, - GLOBS, - GLOB_REGEX, - GLOB_SCHEMA_VERSION, - GLOB_CREATED, - GLOB_EXACT, - ]); - - let WebhookReceiver { - identity: - nexus_db_model::WebhookReceiverIdentity { - id, - name, - description, - time_created, - time_modified, - time_deleted, - }, - endpoint, - secret_gen, - subscription_gen, - } = rx; - - println!("\n{:=<80}", "== RECEIVER "); - println!(" {NAME:>WIDTH$}: {name}"); - println!(" {ID:>WIDTH$}: {id}"); - println!(" {DESCRIPTION:>WIDTH$}: {description}"); - println!(" {ENDPOINT:>WIDTH$}: {endpoint}"); - println!(); - println!(" {CREATED:>WIDTH$}: {time_created}"); - println!(" {MODIFIED:>WIDTH$}: {time_modified}"); - if let Some(deleted) = time_deleted { - println!(" {DELETED:>WIDTH$}: {deleted}"); - } - - println!("\n{:=<80}", "== SECRETS "); - println!(" {GEN:>WIDTH$}: {}", secret_gen.0); - - let query = secret_dsl::webhook_secret - .filter(secret_dsl::rx_id.eq(id.into_untyped_uuid())) - .select(db::model::WebhookSecret::as_select()); - let secrets = if fetch_opts.include_deleted { - query.load_async(&*conn).await - } else { - query - .filter(secret_dsl::time_deleted.is_null()) - .load_async(&*conn) - .await - }; - - match secrets { - Ok(secrets) => { - #[derive(Tabled)] - struct SecretRow { - id: Uuid, - - #[tabled(display_with = "datetime_rfc3339_concise")] - created: chrono::DateTime, - - #[tabled(display_with = "datetime_opt_rfc3339_concise")] - deleted: Option>, - } - let rows = secrets.into_iter().map( - |db::model::WebhookSecret { - identity: - db::model::WebhookSecretIdentity { - id, - time_modified: _, - time_created, - }, - webhook_receiver_id: _, - secret: _, - time_deleted, - }| SecretRow { - id: id.into_untyped_uuid(), - created: time_created, - deleted: time_deleted, - }, - ); - - let table = tabled::Table::new(rows) - .with(tabled::settings::Style::empty()) - .with(tabled::settings::Padding::new(0, 1, 0, 0)) - .to_string(); - println!("{table}"); - } - Err(e) => eprintln!("failed to list secrets: {e}"), - } - - println!("\n{:=<80}", "== SUBSCRIPTIONS "); - println!(" {GEN:>WIDTH$}: {}", subscription_gen.0); - - let exact = subscription_dsl::webhook_rx_subscription - .filter(subscription_dsl::rx_id.eq(id.into_untyped_uuid())) - .filter(subscription_dsl::glob.is_null()) - .select(subscription_dsl::event_class) - .load_async::(&*conn) - .await; - match exact { - Ok(exact) => { - println!(" {EXACT:>WIDTH$}: {}", exact.len()); - for event_class in exact { - println!(" - {event_class}"); - } - } - Err(e) => { - eprintln!("failed to list exact subscriptions: {e}"); - } - } - - let globs = glob_dsl::webhook_rx_event_glob - .filter(glob_dsl::rx_id.eq(id.into_untyped_uuid())) - .select(db::model::WebhookRxEventGlob::as_select()) - .load_async::(&*conn) - .await; - match globs { - Ok(globs) => { - println!(" {GLOBS:>WIDTH$}: {}", globs.len()); - for glob in globs { - let db::model::WebhookRxEventGlob { - rx_id: _, - glob: db::model::WebhookGlob { glob, regex }, - time_created, - schema_version, - } = glob; - println!(" - {glob}"); - println!(" {GLOB_CREATED:>WIDTH$}: {time_created}"); - if let Some(v) = schema_version { - println!(" {GLOB_SCHEMA_VERSION:>WIDTH$}: {v}") - } else { - println!( - "(i) {GLOB_SCHEMA_VERSION:>WIDTH$}: ", - ) - } - - println!(" {GLOB_REGEX:>WIDTH$}: {regex}"); - let exact = subscription_dsl::webhook_rx_subscription - .filter(subscription_dsl::rx_id.eq(id.into_untyped_uuid())) - .filter(subscription_dsl::glob.eq(glob)) - .select(subscription_dsl::event_class) - .load_async::(&*conn) - .await; - match exact { - Ok(exact) => { - println!(" {GLOB_EXACT:>WIDTH$}: {}", exact.len()); - for event_class in exact { - println!(" - {event_class}") - } - } - Err(e) => eprintln!( - "failed to list exact subscriptions for glob: {e}" - ), - } - } - } - Err(e) => { - eprintln!("failed to list glob subscriptions: {e}"); - } - } - - Ok(()) -} - -async fn cmd_db_webhook_delivery_list( - datastore: &DataStore, - fetch_opts: &DbFetchOptions, - args: &WebhookDeliveryListArgs, -) -> anyhow::Result<()> { - use nexus_db_schema::schema::webhook_delivery::dsl as delivery_dsl; - let conn = datastore.pool_connection_for_tests().await?; - let mut query = delivery_dsl::webhook_delivery - .limit(fetch_opts.fetch_limit.get().into()) - .order_by(delivery_dsl::time_created.desc()) - .into_boxed(); - - if let (Some(before), Some(after)) = (args.before, args.after) { - anyhow::ensure!( - after < before, - "if both after and before are included, after must be earlier than before" - ); - } - - if let Some(before) = args.before { - query = query.filter(delivery_dsl::time_created.lt(before)); - } - - if let Some(after) = args.before { - query = query.filter(delivery_dsl::time_created.gt(after)); - } - - if let Some(ref receiver) = args.receiver { - let rx = - lookup_webhook_rx(datastore, receiver).await?.ok_or_else(|| { - anyhow::anyhow!("no webhook receiver {receiver} found") - })?; - query = query.filter(delivery_dsl::rx_id.eq(rx.identity.id)); - } - - if !args.states.is_empty() { - query = query.filter(delivery_dsl::state.eq_any(args.states.clone())); - } - - if !args.triggers.is_empty() { - query = query - .filter(delivery_dsl::triggered_by.eq_any(args.triggers.clone())); - } - - let ctx = || "listing webhook receivers"; - - let deliveries = query - .select(WebhookDelivery::as_select()) - .load_async(&*conn) - .await - .with_context(ctx)?; - - check_limit(&deliveries, fetch_opts.fetch_limit, ctx); - - #[derive(Tabled)] - #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] - struct DeliveryRow { - id: Uuid, - trigger: nexus_db_model::WebhookDeliveryTrigger, - state: nexus_db_model::WebhookDeliveryState, - attempts: u8, - #[tabled(display_with = "datetime_rfc3339_concise")] - time_created: DateTime, - #[tabled(display_with = "datetime_opt_rfc3339_concise")] - time_completed: Option>, - } - - #[derive(Tabled)] - #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] - struct WithEventId { - #[tabled(inline)] - inner: T, - event_id: Uuid, - } - - #[derive(Tabled)] - #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] - struct WithRxId { - #[tabled(inline)] - inner: T, - receiver_id: Uuid, - } - - impl From<&'_ WebhookDelivery> for DeliveryRow { - fn from(d: &WebhookDelivery) -> Self { - let WebhookDelivery { - id, - // event and receiver UUIDs are toggled on and off based on - // whether or not we are filtering by receiver and event, so - // ignore them here. - event_id: _, - rx_id: _, - attempts, - state, - time_created, - time_completed, - // ignore these as they are used for runtime coordination and - // aren't very useful for showing delivery history - deliverator_id: _, - time_leased: _, - triggered_by, - } = d; - Self { - id: id.into_untyped_uuid(), - trigger: *triggered_by, - state: *state, - attempts: attempts.0, - time_created: *time_created, - time_completed: *time_completed, - } - } - } - - impl<'d, T> From<&'d WebhookDelivery> for WithEventId - where - T: From<&'d WebhookDelivery> + Tabled, - { - fn from(d: &'d WebhookDelivery) -> Self { - Self { event_id: d.event_id.into_untyped_uuid(), inner: T::from(d) } - } - } - - impl<'d, T> From<&'d WebhookDelivery> for WithRxId - where - T: From<&'d WebhookDelivery> + Tabled, - { - fn from(d: &'d WebhookDelivery) -> Self { - Self { receiver_id: d.rx_id.into_untyped_uuid(), inner: T::from(d) } - } - } - - let mut table = match (args.receiver.as_ref(), args.event) { - // Filtered by both receiver and event, so don't display either. - (Some(_), Some(_)) => { - tabled::Table::new(deliveries.iter().map(DeliveryRow::from)) - } - // Filtered by neither receiver nor event, so include both. - (None, None) => tabled::Table::new( - deliveries.iter().map(WithRxId::>::from), - ), - // Filtered by receiver ID only - (Some(_), None) => tabled::Table::new( - deliveries.iter().map(WithEventId::::from), - ), - // Filtered by event ID only - (None, Some(_)) => tabled::Table::new( - deliveries.iter().map(WithRxId::::from), - ), - }; - table - .with(tabled::settings::Style::empty()) - .with(tabled::settings::Padding::new(0, 1, 0, 0)); - - println!("{table}"); - Ok(()) -} - -/// Helper function to look up a webhook receiver with the given name or ID -async fn lookup_webhook_rx( - datastore: &DataStore, - name_or_id: &NameOrId, -) -> anyhow::Result> { - use nexus_db_schema::schema::webhook_receiver::dsl; - - let conn = datastore.pool_connection_for_tests().await?; - match name_or_id { - NameOrId::Id(id) => { - dsl::webhook_receiver - .filter(dsl::id.eq(*id)) - .limit(1) - .select(WebhookReceiver::as_select()) - .get_result_async(&*conn) - .await - } - NameOrId::Name(ref name) => { - dsl::webhook_receiver - .filter(dsl::name.eq(name.to_string())) - .limit(1) - .select(WebhookReceiver::as_select()) - .get_result_async(&*conn) - .await - } - } - .optional() - .with_context(|| format!("loading webhook_receiver {name_or_id}")) -} - -async fn cmd_db_webhook_delivery_info( - datastore: &DataStore, - fetch_opts: &DbFetchOptions, - args: &WebhookDeliveryInfoArgs, -) -> anyhow::Result<()> { - use db::model::WebhookDeliveryAttempt; - use nexus_db_schema::schema::webhook_delivery::dsl; - use nexus_db_schema::schema::webhook_delivery_attempt::dsl as attempt_dsl; - - let WebhookDeliveryInfoArgs { delivery_id } = args; - let conn = datastore.pool_connection_for_tests().await?; - let delivery = dsl::webhook_delivery - .filter(dsl::id.eq(*delivery_id)) - .limit(1) - .select(WebhookDelivery::as_select()) - .get_result_async(&*conn) - .await - .optional() - .with_context(|| format!("loading webhook delivery {delivery_id}"))? - .ok_or_else(|| { - anyhow::anyhow!("no webhook delivery {delivery_id} exists") - })?; - - const ID: &'static str = "ID"; - const EVENT_ID: &'static str = "event ID"; - const RECEIVER_ID: &'static str = "receiver ID"; - const STATE: &'static str = "state"; - const TRIGGER: &'static str = "triggered by"; - const ATTEMPTS: &'static str = "attempts"; - const TIME_CREATED: &'static str = "created at"; - const TIME_COMPLETED: &'static str = "completed at"; - - const DELIVERATOR_ID: &'static str = "by Nexus"; - const TIME_LEASED: &'static str = "leased at"; - - const WIDTH: usize = const_max_len(&[ - ID, - EVENT_ID, - RECEIVER_ID, - TRIGGER, - STATE, - TIME_CREATED, - TIME_COMPLETED, - DELIVERATOR_ID, - TIME_LEASED, - ATTEMPTS, - ]); - - let WebhookDelivery { - id, - event_id, - rx_id, - triggered_by, - attempts, - time_created, - time_completed, - state, - deliverator_id, - time_leased, - } = delivery; - println!("\n{:=<80}", "== DELIVERY "); - println!(" {ID:>WIDTH$}: {id}"); - println!(" {EVENT_ID:>WIDTH$}: {event_id}"); - println!(" {RECEIVER_ID:>WIDTH$}: {rx_id}"); - println!(" {STATE:>WIDTH$}: {state}"); - println!(" {TRIGGER:>WIDTH$}: {triggered_by}"); - println!(" {TIME_CREATED:>WIDTH$}: {time_created}"); - println!(" {ATTEMPTS}: {}", attempts.0); - - if let Some(completed) = time_completed { - println!("\n{:=<80}", "== DELIVERY COMPLETED "); - println!(" {TIME_COMPLETED:>WIDTH$}: {completed}"); - if let Some(leased) = time_leased { - println!(" {TIME_LEASED:>WIDTH$}: {leased}"); - } else { - println!( - "/!\\ WEIRD: delivery is completed but has no start timestamp?" - ); - } - if let Some(nexus) = deliverator_id { - println!(" {DELIVERATOR_ID:>WIDTH$}: {nexus}"); - } else { - println!("/!\\ WEIRD: delivery is completed but has no Nexus ID?"); - } - } else if let Some(leased) = time_leased { - println!("\n{:=<80}", "== DELIVERY IN PROGRESS "); - println!(" {TIME_LEASED:>WIDTH$}: {leased}"); - - if let Some(nexus) = deliverator_id { - println!(" {DELIVERATOR_ID:>WIDTH$}: {nexus}"); - } else { - println!( - "/!\\ WEIRD: delivery is in progress but has no Nexus ID?" - ); - } - } else if let Some(deliverator) = deliverator_id { - println!( - "/!\\ WEIRD: delivery is not completed or in progress but has \ - Nexus ID {deliverator:?}" - ); - } - - // Okay, now go get attempts for this delivery. - let ctx = || format!("listing delivery attempts for {delivery_id}"); - let attempts = attempt_dsl::webhook_delivery_attempt - .filter(attempt_dsl::delivery_id.eq(*delivery_id)) - .order_by(attempt_dsl::attempt.desc()) - .limit(fetch_opts.fetch_limit.get().into()) - .select(WebhookDeliveryAttempt::as_select()) - .load_async(&*conn) - .await - .with_context(ctx)?; - - check_limit(&attempts, fetch_opts.fetch_limit, ctx); - - if !attempts.is_empty() { - println!("\n{:=<80}", "== DELIVERY ATTEMPT HISTORY "); - - #[derive(Tabled)] - #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] - struct DeliveryAttemptRow { - id: Uuid, - #[tabled(rename = "#")] - attempt: u8, - #[tabled(display_with = "datetime_rfc3339_concise")] - time_created: DateTime, - nexus_id: Uuid, - result: db::model::WebhookDeliveryAttemptResult, - #[tabled(display_with = "display_u16_opt")] - status: Option, - #[tabled(display_with = "display_time_delta_opt")] - duration: Option, - } - - let rows = attempts.into_iter().map( - |WebhookDeliveryAttempt { - id, - delivery_id: _, - rx_id: _, - attempt, - result, - response_status, - response_duration, - time_created, - deliverator_id, - }| DeliveryAttemptRow { - id: id.into_untyped_uuid(), - attempt: attempt.0, - time_created, - nexus_id: deliverator_id.into_untyped_uuid(), - result, - status: response_status.map(|u| u.into()), - duration: response_duration, - }, - ); - let mut table = tabled::Table::new(rows); - table - .with(tabled::settings::Style::empty()) - .with(tabled::settings::Padding::new(0, 1, 0, 0)); - println!("{table}"); - } - - Ok(()) -} - // Format a `chrono::DateTime` in RFC3339 with milliseconds precision and using // `Z` rather than the UTC offset for UTC timestamps, to save a few characters // of line width in tabular output. @@ -8879,11 +8146,3 @@ async fn cmd_db_zpool_set_storage_buffer( Ok(()) } - -fn display_time_delta_opt(t: &Option) -> String { - t.map(|t| t.to_string()).unwrap_or_else(|| "-".to_string()) -} - -fn display_u16_opt(u: &Option) -> String { - u.map(|u| u.to_string()).unwrap_or_else(|| "-".to_string()) -} diff --git a/dev-tools/omdb/src/bin/omdb/db/webhook.rs b/dev-tools/omdb/src/bin/omdb/db/webhook.rs new file mode 100644 index 00000000000..eb680e414c0 --- /dev/null +++ b/dev-tools/omdb/src/bin/omdb/db/webhook.rs @@ -0,0 +1,766 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! `omdb db webhook` subcommands + +use super::DbFetchOptions; +use super::check_limit; +use super::first_page; +use crate::helpers::const_max_len; +use crate::helpers::datetime_opt_rfc3339_concise; +use crate::helpers::datetime_rfc3339_concise; +use crate::helpers::display_option_blank; + +use anyhow::Context; +use async_bb8_diesel::AsyncRunQueryDsl; +use chrono::DateTime; +use chrono::Utc; +use clap::Args; +use clap::Subcommand; +use diesel::ExpressionMethods; +use diesel::OptionalExtension; +use diesel::expression::SelectableHelper; +use diesel::query_dsl::QueryDsl; +use nexus_db_model::WebhookDelivery; +use nexus_db_model::WebhookEventClass; +use nexus_db_model::WebhookReceiver; +use nexus_db_queries::context::OpContext; +use nexus_db_queries::db; +use nexus_db_queries::db::DataStore; +use nexus_types::identity::Resource; +use omicron_common::api::external::DataPageParams; +use omicron_common::api::external::NameOrId; +use omicron_common::api::external::http_pagination::PaginatedBy; +use omicron_uuid_kinds::GenericUuid; +use omicron_uuid_kinds::WebhookEventUuid; +use tabled::Tabled; +use uuid::Uuid; + +#[derive(Debug, Args, Clone)] +pub(super) struct WebhookArgs { + #[command(subcommand)] + command: Commands, +} + +#[derive(Debug, Subcommand, Clone)] +enum Commands { + /// Get information on webhook receivers + #[clap(alias = "rx")] + Receiver { + #[command(subcommand)] + command: RxCommands, + }, + /// Get information on webhook events + Event, + /// Get information on webhook delivieries + Delivery { + #[command(subcommand)] + command: DeliveryCommands, + }, +} + +#[derive(Debug, Subcommand, Clone)] +enum RxCommands { + /// List webhook receivers + #[clap(alias = "ls")] + List(WebhookRxListArgs), + + #[clap(alias = "show")] + Info(WebhookRxInfoArgs), +} + +#[derive(Debug, Args, Clone)] +struct WebhookRxInfoArgs { + receiver: NameOrId, +} + +#[derive(Debug, Args, Clone)] +struct WebhookRxListArgs { + #[clap(long, short = 'a')] + start_at: Option, +} + +#[derive(Debug, Subcommand, Clone)] +enum DeliveryCommands { + /// List webhook deliveries + #[clap(alias = "ls")] + List(DeliveryListArgs), + + /// Show details on a webhook delivery, including its payload and attempt history. + #[clap(alias = "show")] + Info(DeliveryInfoArgs), +} + +#[derive(Debug, Args, Clone)] +struct DeliveryListArgs { + /// If present, show only deliveries to this receiver. + #[clap(long, short, alias = "rx")] + receiver: Option, + + /// If present, select only deliveries for the given event. + #[clap(long, short)] + event: Option, + + /// If present, select only deliveries in the provided state(s) + #[clap(long = "state", short)] + states: Vec, + + /// If present, select only deliveries with the provided trigger(s) + #[clap(long = "trigger", short)] + triggers: Vec, + + /// Include only delivery entries created before this timestamp + #[clap(long, short)] + before: Option>, + + /// Include only delivery entries created after this timestamp + #[clap(long, short)] + after: Option>, +} + +#[derive(Debug, Args, Clone)] +struct DeliveryInfoArgs { + /// The ID of the delivery to show. + delivery_id: Uuid, +} + +pub(super) async fn cmd_db_webhook( + opctx: &OpContext, + datastore: &DataStore, + fetch_opts: &DbFetchOptions, + args: &WebhookArgs, +) -> anyhow::Result<()> { + match &args.command { + Commands::Receiver { command: RxCommands::List(args) } => { + cmd_db_webhook_rx_list(opctx, datastore, fetch_opts, args).await + } + Commands::Receiver { command: RxCommands::Info(args) } => { + cmd_db_webhook_rx_info(datastore, fetch_opts, args).await + } + Commands::Delivery { command: DeliveryCommands::List(args) } => { + cmd_db_webhook_delivery_list(datastore, fetch_opts, args).await + } + Commands::Delivery { command: DeliveryCommands::Info(args) } => { + cmd_db_webhook_delivery_info(datastore, fetch_opts, args).await + } + Commands::Event => Err(anyhow::anyhow!("not yet implemented, sorry!")), + } +} + +async fn cmd_db_webhook_rx_list( + opctx: &OpContext, + datastore: &DataStore, + fetch_opts: &DbFetchOptions, + args: &WebhookRxListArgs, +) -> anyhow::Result<()> { + let ctx = || { + if let Some(starting_at) = args.start_at { + format!("listing webhook receivers (starting at {starting_at}") + } else { + "listing webhook_receivers".to_string() + } + }; + let pagparams = DataPageParams { + marker: args.start_at.as_ref(), + ..first_page(fetch_opts.fetch_limit) + }; + let rxs = datastore + .webhook_rx_list(opctx, &PaginatedBy::Id(pagparams)) + .await + .with_context(ctx)?; + + check_limit(&rxs, fetch_opts.fetch_limit, ctx); + + #[derive(Tabled)] + #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] + struct RxRow { + id: Uuid, + #[tabled(display_with = "datetime_rfc3339_concise")] + created: chrono::DateTime, + #[tabled(display_with = "datetime_rfc3339_concise")] + modified: chrono::DateTime, + secrets: usize, + subscriptions: usize, + name: String, + endpoint: String, + } + + let rows = rxs.into_iter().map( + |db::model::WebhookReceiverConfig { rx, secrets, subscriptions }| { + RxRow { + id: rx.id().into_untyped_uuid(), + name: rx.identity.name.to_string(), + created: rx.time_created(), + modified: rx.time_modified(), + secrets: secrets.len(), + subscriptions: subscriptions.len(), + endpoint: rx.endpoint, + } + }, + ); + + let table = tabled::Table::new(rows) + .with(tabled::settings::Style::empty()) + .with(tabled::settings::Padding::new(0, 1, 0, 0)) + .to_string(); + println!("{table}"); + + Ok(()) +} + +async fn cmd_db_webhook_rx_info( + datastore: &DataStore, + fetch_opts: &DbFetchOptions, + args: &WebhookRxInfoArgs, +) -> anyhow::Result<()> { + use nexus_db_schema::schema::webhook_rx_event_glob::dsl as glob_dsl; + use nexus_db_schema::schema::webhook_rx_subscription::dsl as subscription_dsl; + use nexus_db_schema::schema::webhook_secret::dsl as secret_dsl; + + let conn = datastore.pool_connection_for_tests().await?; + let rx = lookup_webhook_rx(datastore, &args.receiver) + .await + .with_context(|| format!("loading webhook receiver {}", args.receiver))? + .ok_or_else(|| { + anyhow::anyhow!("no webhook receiver {} exists", args.receiver) + })?; + + const ID: &'static str = "ID"; + const NAME: &'static str = "name"; + + const DESCRIPTION: &'static str = "description"; + const CREATED: &'static str = "created at"; + const DELETED: &'static str = "deleted at"; + const MODIFIED: &'static str = "modified at"; + const ENDPOINT: &'static str = "endpoint"; + const GEN: &'static str = "generation"; + const EXACT: &'static str = "exact subscriptions"; + const GLOBS: &'static str = "glob subscriptions"; + const GLOB_REGEX: &'static str = " regex"; + const GLOB_SCHEMA_VERSION: &'static str = " schema version"; + const GLOB_CREATED: &'static str = " created at"; + const GLOB_EXACT: &'static str = " exact subscriptions"; + const WIDTH: usize = const_max_len(&[ + ID, + NAME, + DESCRIPTION, + CREATED, + DELETED, + MODIFIED, + ENDPOINT, + GEN, + EXACT, + GLOBS, + GLOB_REGEX, + GLOB_SCHEMA_VERSION, + GLOB_CREATED, + GLOB_EXACT, + ]); + + let WebhookReceiver { + identity: + nexus_db_model::WebhookReceiverIdentity { + id, + name, + description, + time_created, + time_modified, + time_deleted, + }, + endpoint, + secret_gen, + subscription_gen, + } = rx; + + println!("\n{:=<80}", "== RECEIVER "); + println!(" {NAME:>WIDTH$}: {name}"); + println!(" {ID:>WIDTH$}: {id}"); + println!(" {DESCRIPTION:>WIDTH$}: {description}"); + println!(" {ENDPOINT:>WIDTH$}: {endpoint}"); + println!(); + println!(" {CREATED:>WIDTH$}: {time_created}"); + println!(" {MODIFIED:>WIDTH$}: {time_modified}"); + if let Some(deleted) = time_deleted { + println!(" {DELETED:>WIDTH$}: {deleted}"); + } + + println!("\n{:=<80}", "== SECRETS "); + println!(" {GEN:>WIDTH$}: {}", secret_gen.0); + + let query = secret_dsl::webhook_secret + .filter(secret_dsl::rx_id.eq(id.into_untyped_uuid())) + .select(db::model::WebhookSecret::as_select()); + let secrets = if fetch_opts.include_deleted { + query.load_async(&*conn).await + } else { + query + .filter(secret_dsl::time_deleted.is_null()) + .load_async(&*conn) + .await + }; + + match secrets { + Ok(secrets) => { + #[derive(Tabled)] + struct SecretRow { + id: Uuid, + + #[tabled(display_with = "datetime_rfc3339_concise")] + created: chrono::DateTime, + + #[tabled(display_with = "datetime_opt_rfc3339_concise")] + deleted: Option>, + } + let rows = secrets.into_iter().map( + |db::model::WebhookSecret { + identity: + db::model::WebhookSecretIdentity { + id, + time_modified: _, + time_created, + }, + webhook_receiver_id: _, + secret: _, + time_deleted, + }| SecretRow { + id: id.into_untyped_uuid(), + created: time_created, + deleted: time_deleted, + }, + ); + + let table = tabled::Table::new(rows) + .with(tabled::settings::Style::empty()) + .with(tabled::settings::Padding::new(0, 1, 0, 0)) + .to_string(); + println!("{table}"); + } + Err(e) => eprintln!("failed to list secrets: {e}"), + } + + println!("\n{:=<80}", "== SUBSCRIPTIONS "); + println!(" {GEN:>WIDTH$}: {}", subscription_gen.0); + + let exact = subscription_dsl::webhook_rx_subscription + .filter(subscription_dsl::rx_id.eq(id.into_untyped_uuid())) + .filter(subscription_dsl::glob.is_null()) + .select(subscription_dsl::event_class) + .load_async::(&*conn) + .await; + match exact { + Ok(exact) => { + println!(" {EXACT:>WIDTH$}: {}", exact.len()); + for event_class in exact { + println!(" - {event_class}"); + } + } + Err(e) => { + eprintln!("failed to list exact subscriptions: {e}"); + } + } + + let globs = glob_dsl::webhook_rx_event_glob + .filter(glob_dsl::rx_id.eq(id.into_untyped_uuid())) + .select(db::model::WebhookRxEventGlob::as_select()) + .load_async::(&*conn) + .await; + match globs { + Ok(globs) => { + println!(" {GLOBS:>WIDTH$}: {}", globs.len()); + for glob in globs { + let db::model::WebhookRxEventGlob { + rx_id: _, + glob: db::model::WebhookGlob { glob, regex }, + time_created, + schema_version, + } = glob; + println!(" - {glob}"); + println!(" {GLOB_CREATED:>WIDTH$}: {time_created}"); + if let Some(v) = schema_version { + println!(" {GLOB_SCHEMA_VERSION:>WIDTH$}: {v}") + } else { + println!( + "(i) {GLOB_SCHEMA_VERSION:>WIDTH$}: ", + ) + } + + println!(" {GLOB_REGEX:>WIDTH$}: {regex}"); + let exact = subscription_dsl::webhook_rx_subscription + .filter(subscription_dsl::rx_id.eq(id.into_untyped_uuid())) + .filter(subscription_dsl::glob.eq(glob)) + .select(subscription_dsl::event_class) + .load_async::(&*conn) + .await; + match exact { + Ok(exact) => { + println!(" {GLOB_EXACT:>WIDTH$}: {}", exact.len()); + for event_class in exact { + println!(" - {event_class}") + } + } + Err(e) => eprintln!( + "failed to list exact subscriptions for glob: {e}" + ), + } + } + } + Err(e) => { + eprintln!("failed to list glob subscriptions: {e}"); + } + } + + Ok(()) +} + +async fn cmd_db_webhook_delivery_list( + datastore: &DataStore, + fetch_opts: &DbFetchOptions, + args: &DeliveryListArgs, +) -> anyhow::Result<()> { + use nexus_db_schema::schema::webhook_delivery::dsl as delivery_dsl; + let conn = datastore.pool_connection_for_tests().await?; + let mut query = delivery_dsl::webhook_delivery + .limit(fetch_opts.fetch_limit.get().into()) + .order_by(delivery_dsl::time_created.desc()) + .into_boxed(); + + if let (Some(before), Some(after)) = (args.before, args.after) { + anyhow::ensure!( + after < before, + "if both after and before are included, after must be earlier than before" + ); + } + + if let Some(before) = args.before { + query = query.filter(delivery_dsl::time_created.lt(before)); + } + + if let Some(after) = args.before { + query = query.filter(delivery_dsl::time_created.gt(after)); + } + + if let Some(ref receiver) = args.receiver { + let rx = + lookup_webhook_rx(datastore, receiver).await?.ok_or_else(|| { + anyhow::anyhow!("no webhook receiver {receiver} found") + })?; + query = query.filter(delivery_dsl::rx_id.eq(rx.identity.id)); + } + + if !args.states.is_empty() { + query = query.filter(delivery_dsl::state.eq_any(args.states.clone())); + } + + if !args.triggers.is_empty() { + query = query + .filter(delivery_dsl::triggered_by.eq_any(args.triggers.clone())); + } + + let ctx = || "listing webhook receivers"; + + let deliveries = query + .select(WebhookDelivery::as_select()) + .load_async(&*conn) + .await + .with_context(ctx)?; + + check_limit(&deliveries, fetch_opts.fetch_limit, ctx); + + #[derive(Tabled)] + #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] + struct DeliveryRow { + id: Uuid, + trigger: nexus_db_model::WebhookDeliveryTrigger, + state: nexus_db_model::WebhookDeliveryState, + attempts: u8, + #[tabled(display_with = "datetime_rfc3339_concise")] + time_created: DateTime, + #[tabled(display_with = "datetime_opt_rfc3339_concise")] + time_completed: Option>, + } + + #[derive(Tabled)] + #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] + struct WithEventId { + #[tabled(inline)] + inner: T, + event_id: Uuid, + } + + #[derive(Tabled)] + #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] + struct WithRxId { + #[tabled(inline)] + inner: T, + receiver_id: Uuid, + } + + impl From<&'_ WebhookDelivery> for DeliveryRow { + fn from(d: &WebhookDelivery) -> Self { + let WebhookDelivery { + id, + // event and receiver UUIDs are toggled on and off based on + // whether or not we are filtering by receiver and event, so + // ignore them here. + event_id: _, + rx_id: _, + attempts, + state, + time_created, + time_completed, + // ignore these as they are used for runtime coordination and + // aren't very useful for showing delivery history + deliverator_id: _, + time_leased: _, + triggered_by, + } = d; + Self { + id: id.into_untyped_uuid(), + trigger: *triggered_by, + state: *state, + attempts: attempts.0, + time_created: *time_created, + time_completed: *time_completed, + } + } + } + + impl<'d, T> From<&'d WebhookDelivery> for WithEventId + where + T: From<&'d WebhookDelivery> + Tabled, + { + fn from(d: &'d WebhookDelivery) -> Self { + Self { event_id: d.event_id.into_untyped_uuid(), inner: T::from(d) } + } + } + + impl<'d, T> From<&'d WebhookDelivery> for WithRxId + where + T: From<&'d WebhookDelivery> + Tabled, + { + fn from(d: &'d WebhookDelivery) -> Self { + Self { receiver_id: d.rx_id.into_untyped_uuid(), inner: T::from(d) } + } + } + + let mut table = match (args.receiver.as_ref(), args.event) { + // Filtered by both receiver and event, so don't display either. + (Some(_), Some(_)) => { + tabled::Table::new(deliveries.iter().map(DeliveryRow::from)) + } + // Filtered by neither receiver nor event, so include both. + (None, None) => tabled::Table::new( + deliveries.iter().map(WithRxId::>::from), + ), + // Filtered by receiver ID only + (Some(_), None) => tabled::Table::new( + deliveries.iter().map(WithEventId::::from), + ), + // Filtered by event ID only + (None, Some(_)) => tabled::Table::new( + deliveries.iter().map(WithRxId::::from), + ), + }; + table + .with(tabled::settings::Style::empty()) + .with(tabled::settings::Padding::new(0, 1, 0, 0)); + + println!("{table}"); + Ok(()) +} + +/// Helper function to look up a webhook receiver with the given name or ID +async fn lookup_webhook_rx( + datastore: &DataStore, + name_or_id: &NameOrId, +) -> anyhow::Result> { + use nexus_db_schema::schema::webhook_receiver::dsl; + + let conn = datastore.pool_connection_for_tests().await?; + match name_or_id { + NameOrId::Id(id) => { + dsl::webhook_receiver + .filter(dsl::id.eq(*id)) + .limit(1) + .select(WebhookReceiver::as_select()) + .get_result_async(&*conn) + .await + } + NameOrId::Name(ref name) => { + dsl::webhook_receiver + .filter(dsl::name.eq(name.to_string())) + .limit(1) + .select(WebhookReceiver::as_select()) + .get_result_async(&*conn) + .await + } + } + .optional() + .with_context(|| format!("loading webhook_receiver {name_or_id}")) +} + +async fn cmd_db_webhook_delivery_info( + datastore: &DataStore, + fetch_opts: &DbFetchOptions, + args: &DeliveryInfoArgs, +) -> anyhow::Result<()> { + use db::model::WebhookDeliveryAttempt; + use nexus_db_schema::schema::webhook_delivery::dsl; + use nexus_db_schema::schema::webhook_delivery_attempt::dsl as attempt_dsl; + + let DeliveryInfoArgs { delivery_id } = args; + let conn = datastore.pool_connection_for_tests().await?; + let delivery = dsl::webhook_delivery + .filter(dsl::id.eq(*delivery_id)) + .limit(1) + .select(WebhookDelivery::as_select()) + .get_result_async(&*conn) + .await + .optional() + .with_context(|| format!("loading webhook delivery {delivery_id}"))? + .ok_or_else(|| { + anyhow::anyhow!("no webhook delivery {delivery_id} exists") + })?; + + const ID: &'static str = "ID"; + const EVENT_ID: &'static str = "event ID"; + const RECEIVER_ID: &'static str = "receiver ID"; + const STATE: &'static str = "state"; + const TRIGGER: &'static str = "triggered by"; + const ATTEMPTS: &'static str = "attempts"; + const TIME_CREATED: &'static str = "created at"; + const TIME_COMPLETED: &'static str = "completed at"; + + const DELIVERATOR_ID: &'static str = "by Nexus"; + const TIME_LEASED: &'static str = "leased at"; + + const WIDTH: usize = const_max_len(&[ + ID, + EVENT_ID, + RECEIVER_ID, + TRIGGER, + STATE, + TIME_CREATED, + TIME_COMPLETED, + DELIVERATOR_ID, + TIME_LEASED, + ATTEMPTS, + ]); + + let WebhookDelivery { + id, + event_id, + rx_id, + triggered_by, + attempts, + time_created, + time_completed, + state, + deliverator_id, + time_leased, + } = delivery; + println!("\n{:=<80}", "== DELIVERY "); + println!(" {ID:>WIDTH$}: {id}"); + println!(" {EVENT_ID:>WIDTH$}: {event_id}"); + println!(" {RECEIVER_ID:>WIDTH$}: {rx_id}"); + println!(" {STATE:>WIDTH$}: {state}"); + println!(" {TRIGGER:>WIDTH$}: {triggered_by}"); + println!(" {TIME_CREATED:>WIDTH$}: {time_created}"); + println!(" {ATTEMPTS}: {}", attempts.0); + + if let Some(completed) = time_completed { + println!("\n{:=<80}", "== DELIVERY COMPLETED "); + println!(" {TIME_COMPLETED:>WIDTH$}: {completed}"); + if let Some(leased) = time_leased { + println!(" {TIME_LEASED:>WIDTH$}: {leased}"); + } else { + println!( + "/!\\ WEIRD: delivery is completed but has no start timestamp?" + ); + } + if let Some(nexus) = deliverator_id { + println!(" {DELIVERATOR_ID:>WIDTH$}: {nexus}"); + } else { + println!("/!\\ WEIRD: delivery is completed but has no Nexus ID?"); + } + } else if let Some(leased) = time_leased { + println!("\n{:=<80}", "== DELIVERY IN PROGRESS "); + println!(" {TIME_LEASED:>WIDTH$}: {leased}"); + + if let Some(nexus) = deliverator_id { + println!(" {DELIVERATOR_ID:>WIDTH$}: {nexus}"); + } else { + println!( + "/!\\ WEIRD: delivery is in progress but has no Nexus ID?" + ); + } + } else if let Some(deliverator) = deliverator_id { + println!( + "/!\\ WEIRD: delivery is not completed or in progress but has \ + Nexus ID {deliverator:?}" + ); + } + + // Okay, now go get attempts for this delivery. + let ctx = || format!("listing delivery attempts for {delivery_id}"); + let attempts = attempt_dsl::webhook_delivery_attempt + .filter(attempt_dsl::delivery_id.eq(*delivery_id)) + .order_by(attempt_dsl::attempt.desc()) + .limit(fetch_opts.fetch_limit.get().into()) + .select(WebhookDeliveryAttempt::as_select()) + .load_async(&*conn) + .await + .with_context(ctx)?; + + check_limit(&attempts, fetch_opts.fetch_limit, ctx); + + if !attempts.is_empty() { + println!("\n{:=<80}", "== DELIVERY ATTEMPT HISTORY "); + + #[derive(Tabled)] + #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] + struct DeliveryAttemptRow { + id: Uuid, + #[tabled(rename = "#")] + attempt: u8, + #[tabled(display_with = "datetime_rfc3339_concise")] + time_created: DateTime, + nexus_id: Uuid, + result: db::model::WebhookDeliveryAttemptResult, + #[tabled(display_with = "display_option_blank")] + status: Option, + #[tabled(display_with = "display_option_blank")] + duration: Option, + } + + let rows = attempts.into_iter().map( + |WebhookDeliveryAttempt { + id, + delivery_id: _, + rx_id: _, + attempt, + result, + response_status, + response_duration, + time_created, + deliverator_id, + }| DeliveryAttemptRow { + id: id.into_untyped_uuid(), + attempt: attempt.0, + time_created, + nexus_id: deliverator_id.into_untyped_uuid(), + result, + status: response_status.map(|u| u.into()), + duration: response_duration, + }, + ); + let mut table = tabled::Table::new(rows); + table + .with(tabled::settings::Style::empty()) + .with(tabled::settings::Padding::new(0, 1, 0, 0)); + println!("{table}"); + } + + Ok(()) +} diff --git a/dev-tools/omdb/src/bin/omdb/helpers.rs b/dev-tools/omdb/src/bin/omdb/helpers.rs index fec9ab80578..efa6f97fde3 100644 --- a/dev-tools/omdb/src/bin/omdb/helpers.rs +++ b/dev-tools/omdb/src/bin/omdb/helpers.rs @@ -5,6 +5,8 @@ //! Utility helpers for the omdb CLI. use anyhow::bail; +use chrono::DateTime; +use chrono::Utc; use clap::ColorChoice; use reedline::DefaultPrompt; use reedline::DefaultPromptSegment; @@ -43,6 +45,23 @@ pub(crate) fn display_option_blank( opt.as_ref().map(|x| x.to_string()).unwrap_or_else(|| "".to_string()) } +// Format a `chrono::DateTime` in RFC3339 with milliseconds precision and using +// `Z` rather than the UTC offset for UTC timestamps, to save a few characters +// of line width in tabular output. +pub(crate) fn datetime_rfc3339_concise(t: &DateTime) -> String { + t.to_rfc3339_opts(chrono::format::SecondsFormat::Millis, true) +} + +// Format an optional `chrono::DateTime` in RFC3339 with milliseconds precision +// and using `Z` rather than the UTC offset for UTC timestamps, to save a few +// characters of line width in tabular output. +pub(crate) fn datetime_opt_rfc3339_concise( + t: &Option>, +) -> String { + t.map(|t| t.to_rfc3339_opts(chrono::format::SecondsFormat::Millis, true)) + .unwrap_or_else(|| "-".to_string()) +} + pub(crate) struct ConfirmationPrompt(Reedline); impl ConfirmationPrompt { From ca69aaf4438380e26c15cf734f05702ed4fc665b Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 29 Apr 2025 09:31:23 -0700 Subject: [PATCH 08/14] [omdb] add `omdb db webhook event info` --- dev-tools/omdb/src/bin/omdb/db/webhook.rs | 315 ++++++++++++++++------ 1 file changed, 226 insertions(+), 89 deletions(-) diff --git a/dev-tools/omdb/src/bin/omdb/db/webhook.rs b/dev-tools/omdb/src/bin/omdb/db/webhook.rs index eb680e414c0..f25ec70b888 100644 --- a/dev-tools/omdb/src/bin/omdb/db/webhook.rs +++ b/dev-tools/omdb/src/bin/omdb/db/webhook.rs @@ -23,11 +23,15 @@ use diesel::OptionalExtension; use diesel::expression::SelectableHelper; use diesel::query_dsl::QueryDsl; use nexus_db_model::WebhookDelivery; +use nexus_db_model::WebhookEvent; use nexus_db_model::WebhookEventClass; use nexus_db_model::WebhookReceiver; use nexus_db_queries::context::OpContext; use nexus_db_queries::db; use nexus_db_queries::db::DataStore; +use nexus_db_schema::schema::webhook_delivery::dsl as delivery_dsl; +use nexus_db_schema::schema::webhook_delivery_attempt::dsl as attempt_dsl; +use nexus_db_schema::schema::webhook_event::dsl as event_dsl; use nexus_types::identity::Resource; use omicron_common::api::external::DataPageParams; use omicron_common::api::external::NameOrId; @@ -52,7 +56,10 @@ enum Commands { command: RxCommands, }, /// Get information on webhook events - Event, + Event { + #[command(subcommand)] + command: EventCommands, + }, /// Get information on webhook delivieries Delivery { #[command(subcommand)] @@ -64,19 +71,19 @@ enum Commands { enum RxCommands { /// List webhook receivers #[clap(alias = "ls")] - List(WebhookRxListArgs), + List(RxListArgs), #[clap(alias = "show")] - Info(WebhookRxInfoArgs), + Info(RxInfoArgs), } #[derive(Debug, Args, Clone)] -struct WebhookRxInfoArgs { +struct RxInfoArgs { receiver: NameOrId, } #[derive(Debug, Args, Clone)] -struct WebhookRxListArgs { +struct RxListArgs { #[clap(long, short = 'a')] start_at: Option, } @@ -125,6 +132,40 @@ struct DeliveryInfoArgs { delivery_id: Uuid, } +#[derive(Debug, Subcommand, Clone)] +enum EventCommands { + /// List webhook events + #[clap(alias = "ls")] + List(EventListArgs), + + /// Show details on a webhook event + #[clap(alias = "show")] + Info(EventInfoArgs), +} + +#[derive(Debug, Args, Clone)] +struct EventListArgs { + /// If set, include event JSON payloads in the output. + /// + /// Note that this results in very wide output. + #[clap(long, short)] + payload: bool, + + /// Include only events created before this timestamp + #[clap(long, short)] + before: Option>, + + /// Include only events created after this timestamp + #[clap(long, short)] + after: Option>, +} + +#[derive(Debug, Args, Clone)] +struct EventInfoArgs { + /// The ID of the event to show + event_id: WebhookEventUuid, +} + pub(super) async fn cmd_db_webhook( opctx: &OpContext, datastore: &DataStore, @@ -144,15 +185,25 @@ pub(super) async fn cmd_db_webhook( Commands::Delivery { command: DeliveryCommands::Info(args) } => { cmd_db_webhook_delivery_info(datastore, fetch_opts, args).await } - Commands::Event => Err(anyhow::anyhow!("not yet implemented, sorry!")), + Commands::Event { command: EventCommands::Info(args) } => { + cmd_db_webhook_event_info(datastore, fetch_opts, args).await + } + Commands::Event { command: EventCommands::List(args) } => { + cmd_db_webhook_event_list(datastore, fetch_opts, args).await + } } } +const ID: &'static str = "ID"; +const TIME_CREATED: &'static str = "created at"; +const TIME_DELETED: &'static str = "deleted at"; +const TIME_MODIFIED: &'static str = "modified at"; + async fn cmd_db_webhook_rx_list( opctx: &OpContext, datastore: &DataStore, fetch_opts: &DbFetchOptions, - args: &WebhookRxListArgs, + args: &RxListArgs, ) -> anyhow::Result<()> { let ctx = || { if let Some(starting_at) = args.start_at { @@ -212,7 +263,7 @@ async fn cmd_db_webhook_rx_list( async fn cmd_db_webhook_rx_info( datastore: &DataStore, fetch_opts: &DbFetchOptions, - args: &WebhookRxInfoArgs, + args: &RxInfoArgs, ) -> anyhow::Result<()> { use nexus_db_schema::schema::webhook_rx_event_glob::dsl as glob_dsl; use nexus_db_schema::schema::webhook_rx_subscription::dsl as subscription_dsl; @@ -226,13 +277,8 @@ async fn cmd_db_webhook_rx_info( anyhow::anyhow!("no webhook receiver {} exists", args.receiver) })?; - const ID: &'static str = "ID"; const NAME: &'static str = "name"; - const DESCRIPTION: &'static str = "description"; - const CREATED: &'static str = "created at"; - const DELETED: &'static str = "deleted at"; - const MODIFIED: &'static str = "modified at"; const ENDPOINT: &'static str = "endpoint"; const GEN: &'static str = "generation"; const EXACT: &'static str = "exact subscriptions"; @@ -245,9 +291,9 @@ async fn cmd_db_webhook_rx_info( ID, NAME, DESCRIPTION, - CREATED, - DELETED, - MODIFIED, + TIME_CREATED, + TIME_DELETED, + TIME_MODIFIED, ENDPOINT, GEN, EXACT, @@ -279,10 +325,10 @@ async fn cmd_db_webhook_rx_info( println!(" {DESCRIPTION:>WIDTH$}: {description}"); println!(" {ENDPOINT:>WIDTH$}: {endpoint}"); println!(); - println!(" {CREATED:>WIDTH$}: {time_created}"); - println!(" {MODIFIED:>WIDTH$}: {time_modified}"); + println!(" {TIME_CREATED:>WIDTH$}: {time_created}"); + println!(" {TIME_MODIFIED:>WIDTH$}: {time_modified}"); if let Some(deleted) = time_deleted { - println!(" {DELETED:>WIDTH$}: {deleted}"); + println!(" {TIME_DELETED:>WIDTH$}: {deleted}"); } println!("\n{:=<80}", "== SECRETS "); @@ -418,7 +464,6 @@ async fn cmd_db_webhook_delivery_list( fetch_opts: &DbFetchOptions, args: &DeliveryListArgs, ) -> anyhow::Result<()> { - use nexus_db_schema::schema::webhook_delivery::dsl as delivery_dsl; let conn = datastore.pool_connection_for_tests().await?; let mut query = delivery_dsl::webhook_delivery .limit(fetch_opts.fetch_limit.get().into()) @@ -457,7 +502,7 @@ async fn cmd_db_webhook_delivery_list( .filter(delivery_dsl::triggered_by.eq_any(args.triggers.clone())); } - let ctx = || "listing webhook receivers"; + let ctx = || "listing webhook deliveries"; let deliveries = query .select(WebhookDelivery::as_select()) @@ -467,19 +512,6 @@ async fn cmd_db_webhook_delivery_list( check_limit(&deliveries, fetch_opts.fetch_limit, ctx); - #[derive(Tabled)] - #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] - struct DeliveryRow { - id: Uuid, - trigger: nexus_db_model::WebhookDeliveryTrigger, - state: nexus_db_model::WebhookDeliveryState, - attempts: u8, - #[tabled(display_with = "datetime_rfc3339_concise")] - time_created: DateTime, - #[tabled(display_with = "datetime_opt_rfc3339_concise")] - time_completed: Option>, - } - #[derive(Tabled)] #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] struct WithEventId { @@ -488,44 +520,6 @@ async fn cmd_db_webhook_delivery_list( event_id: Uuid, } - #[derive(Tabled)] - #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] - struct WithRxId { - #[tabled(inline)] - inner: T, - receiver_id: Uuid, - } - - impl From<&'_ WebhookDelivery> for DeliveryRow { - fn from(d: &WebhookDelivery) -> Self { - let WebhookDelivery { - id, - // event and receiver UUIDs are toggled on and off based on - // whether or not we are filtering by receiver and event, so - // ignore them here. - event_id: _, - rx_id: _, - attempts, - state, - time_created, - time_completed, - // ignore these as they are used for runtime coordination and - // aren't very useful for showing delivery history - deliverator_id: _, - time_leased: _, - triggered_by, - } = d; - Self { - id: id.into_untyped_uuid(), - trigger: *triggered_by, - state: *state, - attempts: attempts.0, - time_created: *time_created, - time_completed: *time_completed, - } - } - } - impl<'d, T> From<&'d WebhookDelivery> for WithEventId where T: From<&'d WebhookDelivery> + Tabled, @@ -535,15 +529,6 @@ async fn cmd_db_webhook_delivery_list( } } - impl<'d, T> From<&'d WebhookDelivery> for WithRxId - where - T: From<&'d WebhookDelivery> + Tabled, - { - fn from(d: &'d WebhookDelivery) -> Self { - Self { receiver_id: d.rx_id.into_untyped_uuid(), inner: T::from(d) } - } - } - let mut table = match (args.receiver.as_ref(), args.event) { // Filtered by both receiver and event, so don't display either. (Some(_), Some(_)) => { @@ -551,7 +536,9 @@ async fn cmd_db_webhook_delivery_list( } // Filtered by neither receiver nor event, so include both. (None, None) => tabled::Table::new( - deliveries.iter().map(WithRxId::>::from), + deliveries + .iter() + .map(DeliveryRowWithRxId::>::from), ), // Filtered by receiver ID only (Some(_), None) => tabled::Table::new( @@ -559,7 +546,7 @@ async fn cmd_db_webhook_delivery_list( ), // Filtered by event ID only (None, Some(_)) => tabled::Table::new( - deliveries.iter().map(WithRxId::::from), + deliveries.iter().map(DeliveryRowWithRxId::::from), ), }; table @@ -570,6 +557,66 @@ async fn cmd_db_webhook_delivery_list( Ok(()) } +#[derive(Tabled)] +#[tabled(rename_all = "SCREAMING_SNAKE_CASE")] +struct DeliveryRow { + id: Uuid, + trigger: nexus_db_model::WebhookDeliveryTrigger, + state: nexus_db_model::WebhookDeliveryState, + attempts: u8, + #[tabled(display_with = "datetime_rfc3339_concise")] + time_created: DateTime, + #[tabled(display_with = "datetime_opt_rfc3339_concise")] + time_completed: Option>, +} + +#[derive(Tabled)] +#[tabled(rename_all = "SCREAMING_SNAKE_CASE")] +struct DeliveryRowWithRxId { + #[tabled(inline)] + inner: T, + receiver_id: Uuid, +} + +impl From<&'_ WebhookDelivery> for DeliveryRow { + fn from(d: &WebhookDelivery) -> Self { + let WebhookDelivery { + id, + // event and receiver UUIDs are toggled on and off based on + // whether or not we are filtering by receiver and event, so + // ignore them here. + event_id: _, + rx_id: _, + attempts, + state, + time_created, + time_completed, + // ignore these as they are used for runtime coordination and + // aren't very useful for showing delivery history + deliverator_id: _, + time_leased: _, + triggered_by, + } = d; + Self { + id: id.into_untyped_uuid(), + trigger: *triggered_by, + state: *state, + attempts: attempts.0, + time_created: *time_created, + time_completed: *time_completed, + } + } +} + +impl<'d, T> From<&'d WebhookDelivery> for DeliveryRowWithRxId +where + T: From<&'d WebhookDelivery> + Tabled, +{ + fn from(d: &'d WebhookDelivery) -> Self { + Self { receiver_id: d.rx_id.into_untyped_uuid(), inner: T::from(d) } + } +} + /// Helper function to look up a webhook receiver with the given name or ID async fn lookup_webhook_rx( datastore: &DataStore, @@ -606,13 +653,11 @@ async fn cmd_db_webhook_delivery_info( args: &DeliveryInfoArgs, ) -> anyhow::Result<()> { use db::model::WebhookDeliveryAttempt; - use nexus_db_schema::schema::webhook_delivery::dsl; - use nexus_db_schema::schema::webhook_delivery_attempt::dsl as attempt_dsl; let DeliveryInfoArgs { delivery_id } = args; let conn = datastore.pool_connection_for_tests().await?; - let delivery = dsl::webhook_delivery - .filter(dsl::id.eq(*delivery_id)) + let delivery = delivery_dsl::webhook_delivery + .filter(delivery_dsl::id.eq(*delivery_id)) .limit(1) .select(WebhookDelivery::as_select()) .get_result_async(&*conn) @@ -623,13 +668,11 @@ async fn cmd_db_webhook_delivery_info( anyhow::anyhow!("no webhook delivery {delivery_id} exists") })?; - const ID: &'static str = "ID"; const EVENT_ID: &'static str = "event ID"; const RECEIVER_ID: &'static str = "receiver ID"; const STATE: &'static str = "state"; const TRIGGER: &'static str = "triggered by"; const ATTEMPTS: &'static str = "attempts"; - const TIME_CREATED: &'static str = "created at"; const TIME_COMPLETED: &'static str = "completed at"; const DELIVERATOR_ID: &'static str = "by Nexus"; @@ -764,3 +807,97 @@ async fn cmd_db_webhook_delivery_info( Ok(()) } + +async fn cmd_db_webhook_event_list( + _datastore: &DataStore, + _fetch_opts: &DbFetchOptions, + _args: &EventListArgs, +) -> anyhow::Result<()> { + anyhow::bail!("not yet implemented"); +} + +async fn cmd_db_webhook_event_info( + datastore: &DataStore, + fetch_opts: &DbFetchOptions, + args: &EventInfoArgs, +) -> anyhow::Result<()> { + let EventInfoArgs { event_id } = args; + let conn = datastore.pool_connection_for_tests().await?; + + let event = event_dsl::webhook_event + .filter(event_dsl::id.eq(event_id.into_untyped_uuid())) + .select(WebhookEvent::as_select()) + .limit(1) + .get_result_async(&*conn) + .await + .optional() + .with_context(|| format!("loading webhook event {event_id}"))? + .ok_or_else(|| anyhow::anyhow!("no webhook event {event_id} exists"))?; + + let WebhookEvent { + identity: + db::model::WebhookEventIdentity { id, time_created, time_modified }, + time_dispatched, + event_class, + event, + num_dispatched, + } = event; + + const CLASS: &str = "class"; + const TIME_DISPATCHED: &str = "fully dispatched at"; + const NUM_DISPATCHED: &str = "deliveries dispatched"; + + const WIDTH: usize = const_max_len(&[ + ID, + TIME_CREATED, + TIME_MODIFIED, + TIME_DISPATCHED, + NUM_DISPATCHED, + CLASS, + ]); + + println!("\n{:=<80}", "== EVENT "); + println!(" {ID:>WIDTH$}: {id:?}"); + println!(" {CLASS:>WIDTH$}: {event_class}"); + println!(" {TIME_CREATED:>WIDTH$}: {time_created}"); + println!(" {TIME_MODIFIED:>WIDTH$}: {time_modified}"); + println!(); + println!(" {NUM_DISPATCHED:>WIDTH$}: {num_dispatched}"); + if let Some(t) = time_dispatched { + println!(" {TIME_DISPATCHED:>WIDTH$}: {t}") + } + + println!("\n{:=<80}", "== EVENT PAYLOAD "); + serde_json::to_writer_pretty(std::io::stdout(), &event).with_context( + || format!("failed to serialize event payload: {event:?}"), + )?; + + let ctx = || format!("listing deliveries for event {event_id:?}"); + let deliveries = delivery_dsl::webhook_delivery + .limit(fetch_opts.fetch_limit.get().into()) + .order_by(delivery_dsl::time_created.desc()) + .select(WebhookDelivery::as_select()) + .load_async(&*conn) + .await + .with_context(ctx)?; + + check_limit(&deliveries, fetch_opts.fetch_limit, ctx); + + if !deliveries.is_empty() { + println!("\n{:=<80}", "== DELIVERIES "); + let mut table = tabled::Table::new( + deliveries.iter().map(DeliveryRowWithRxId::::from), + ); + table + .with(tabled::settings::Style::empty()) + .with(tabled::settings::Padding::new(0, 1, 0, 0)); + println!("{table}") + } else if num_dispatched > 0 { + println!( + "/!\\ WEIRD: event claims to have {num_dispatched} deliveries \ + dispatched, but no delivery records were found" + ) + } + + Ok(()) +} From a30c266e7deff25db8ceb6b61208e41e297ae097 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 29 Apr 2025 10:00:55 -0700 Subject: [PATCH 09/14] [omdb] add `omdb db webhook event list` --- dev-tools/omdb/src/bin/omdb/db/webhook.rs | 142 +++++++++++++++++++++- 1 file changed, 138 insertions(+), 4 deletions(-) diff --git a/dev-tools/omdb/src/bin/omdb/db/webhook.rs b/dev-tools/omdb/src/bin/omdb/db/webhook.rs index f25ec70b888..0354ef6a807 100644 --- a/dev-tools/omdb/src/bin/omdb/db/webhook.rs +++ b/dev-tools/omdb/src/bin/omdb/db/webhook.rs @@ -158,6 +158,22 @@ struct EventListArgs { /// Include only events created after this timestamp #[clap(long, short)] after: Option>, + + /// Include only events fully dispatched before this timestamp + #[clap(long)] + dispatched_before: Option>, + + /// Include only events fully dispatched after this timestamp + #[clap(long)] + dispatched_after: Option>, + + /// If `true`, include only events that have been fully dispatched. + /// If `false`, include only events that have not been fully dispatched. + /// + /// If this argument is not provided, both dispatched and un-dispatched + /// events are included. + #[clap(long, short)] + dispatched: Option, } #[derive(Debug, Args, Clone)] @@ -809,11 +825,129 @@ async fn cmd_db_webhook_delivery_info( } async fn cmd_db_webhook_event_list( - _datastore: &DataStore, - _fetch_opts: &DbFetchOptions, - _args: &EventListArgs, + datastore: &DataStore, + fetch_opts: &DbFetchOptions, + args: &EventListArgs, ) -> anyhow::Result<()> { - anyhow::bail!("not yet implemented"); + let EventListArgs { + payload, + before, + after, + dispatched_before, + dispatched_after, + dispatched, + } = args; + + if let (Some(before), Some(after)) = (before, after) { + anyhow::ensure!( + after < before, + "if both `--after` and `--before` are included, after must be + earlier than before" + ); + } + + if let (Some(before), Some(after)) = (dispatched_before, dispatched_after) { + anyhow::ensure!( + after < before, + "if both `--dispatched-after` and `--dispatched-before` are + included, after must be earlier than before" + ); + } + + let conn = datastore.pool_connection_for_tests().await?; + + let mut query = event_dsl::webhook_event + .limit(fetch_opts.fetch_limit.get().into()) + .order_by(event_dsl::time_created.asc()) + .select(WebhookEvent::as_select()) + .into_boxed(); + + if let Some(before) = before { + query = query.filter(event_dsl::time_created.lt(*before)); + } + + if let Some(after) = after { + query = query.filter(event_dsl::time_created.gt(*after)); + } + + if let Some(before) = dispatched_before { + query = query.filter(event_dsl::time_dispatched.lt(*before)); + } + + if let Some(after) = dispatched_after { + query = query.filter(event_dsl::time_dispatched.gt(*after)); + } + + if let Some(dispatched) = dispatched { + if *dispatched { + query = query.filter(event_dsl::time_dispatched.is_not_null()); + } else { + query = query.filter(event_dsl::time_dispatched.is_null()); + } + } + + let ctx = || "loading webhook events"; + let events = query.load_async(&*conn).await.with_context(ctx)?; + + check_limit(&events, fetch_opts.fetch_limit, ctx); + + #[derive(Tabled)] + #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] + struct EventRow { + id: Uuid, + class: WebhookEventClass, + #[tabled(display_with = "datetime_rfc3339_concise")] + time_created: DateTime, + #[tabled(display_with = "datetime_opt_rfc3339_concise")] + time_dispatched: Option>, + dispatched: i64, + } + + impl From<&'_ WebhookEvent> for EventRow { + fn from(event: &'_ WebhookEvent) -> Self { + Self { + id: event.identity.id.into_untyped_uuid(), + class: event.event_class, + time_created: event.identity.time_created, + time_dispatched: event.time_dispatched, + dispatched: event.num_dispatched, + } + } + } + + #[derive(Tabled)] + #[tabled(rename_all = "SCREAMING_SNAKE_CASE")] + struct EventRowWithPayload { + #[tabled(inline)] + row: EventRow, + payload: String, + } + + let mut table = if *payload { + let rows = events.iter().map(|event| { + let payload = match serde_json::to_string(&event.event) { + Ok(payload) => payload, + Err(e) => { + eprintln!( + "/!\\ failed to serialize payload for {:?}: {e}", + event.identity.id + ); + "".to_string() + } + }; + EventRowWithPayload { row: event.into(), payload } + }); + tabled::Table::new(rows) + } else { + let rows = events.iter().map(EventRow::from); + tabled::Table::new(rows) + }; + table + .with(tabled::settings::Style::empty()) + .with(tabled::settings::Padding::new(0, 1, 0, 0)); + println!("{table}"); + + Ok(()) } async fn cmd_db_webhook_event_info( From d16ad9ad136b61707407cde724fafccdf962fbb0 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 29 Apr 2025 10:02:30 -0700 Subject: [PATCH 10/14] [omdb] fix delivery list after being before --- dev-tools/omdb/src/bin/omdb/db/webhook.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev-tools/omdb/src/bin/omdb/db/webhook.rs b/dev-tools/omdb/src/bin/omdb/db/webhook.rs index 0354ef6a807..1ae162491a6 100644 --- a/dev-tools/omdb/src/bin/omdb/db/webhook.rs +++ b/dev-tools/omdb/src/bin/omdb/db/webhook.rs @@ -497,7 +497,7 @@ async fn cmd_db_webhook_delivery_list( query = query.filter(delivery_dsl::time_created.lt(before)); } - if let Some(after) = args.before { + if let Some(after) = args.after { query = query.filter(delivery_dsl::time_created.gt(after)); } From 17e9d9288826eb3a7375f1e2295195b11d2ba40b Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 29 Apr 2025 10:07:54 -0700 Subject: [PATCH 11/14] [omdb] fix delivery list event ID arg not being used --- dev-tools/omdb/src/bin/omdb/db/webhook.rs | 27 ++++++++++++++--------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/dev-tools/omdb/src/bin/omdb/db/webhook.rs b/dev-tools/omdb/src/bin/omdb/db/webhook.rs index 1ae162491a6..ff253384193 100644 --- a/dev-tools/omdb/src/bin/omdb/db/webhook.rs +++ b/dev-tools/omdb/src/bin/omdb/db/webhook.rs @@ -480,28 +480,31 @@ async fn cmd_db_webhook_delivery_list( fetch_opts: &DbFetchOptions, args: &DeliveryListArgs, ) -> anyhow::Result<()> { + let DeliveryListArgs { before, after, receiver, states, triggers, event } = + args; let conn = datastore.pool_connection_for_tests().await?; let mut query = delivery_dsl::webhook_delivery .limit(fetch_opts.fetch_limit.get().into()) .order_by(delivery_dsl::time_created.desc()) .into_boxed(); - if let (Some(before), Some(after)) = (args.before, args.after) { + if let (Some(before), Some(after)) = (before, after) { anyhow::ensure!( after < before, - "if both after and before are included, after must be earlier than before" + "if both `--after` and `--before` are included, after must be + earlier than before" ); } - if let Some(before) = args.before { + if let Some(before) = *before { query = query.filter(delivery_dsl::time_created.lt(before)); } - if let Some(after) = args.after { + if let Some(after) = *after { query = query.filter(delivery_dsl::time_created.gt(after)); } - if let Some(ref receiver) = args.receiver { + if let Some(ref receiver) = receiver { let rx = lookup_webhook_rx(datastore, receiver).await?.ok_or_else(|| { anyhow::anyhow!("no webhook receiver {receiver} found") @@ -509,13 +512,17 @@ async fn cmd_db_webhook_delivery_list( query = query.filter(delivery_dsl::rx_id.eq(rx.identity.id)); } - if !args.states.is_empty() { - query = query.filter(delivery_dsl::state.eq_any(args.states.clone())); + if !states.is_empty() { + query = query.filter(delivery_dsl::state.eq_any(states.clone())); + } + + if !triggers.is_empty() { + query = + query.filter(delivery_dsl::triggered_by.eq_any(triggers.clone())); } - if !args.triggers.is_empty() { - query = query - .filter(delivery_dsl::triggered_by.eq_any(args.triggers.clone())); + if let Some(id) = event { + query = query.filter(delivery_dsl::event_id.eq(id.into_untyped_uuid())); } let ctx = || "listing webhook deliveries"; From 2f4f98b30ffc0e9634d1667491756808bef2ad05 Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 29 Apr 2025 11:25:57 -0700 Subject: [PATCH 12/14] Update webhook.rs Co-authored-by: Sean Klein --- dev-tools/omdb/src/bin/omdb/db/webhook.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dev-tools/omdb/src/bin/omdb/db/webhook.rs b/dev-tools/omdb/src/bin/omdb/db/webhook.rs index ff253384193..fe45c08b9a4 100644 --- a/dev-tools/omdb/src/bin/omdb/db/webhook.rs +++ b/dev-tools/omdb/src/bin/omdb/db/webhook.rs @@ -223,7 +223,7 @@ async fn cmd_db_webhook_rx_list( ) -> anyhow::Result<()> { let ctx = || { if let Some(starting_at) = args.start_at { - format!("listing webhook receivers (starting at {starting_at}") + format!("listing webhook receivers (starting at {starting_at})") } else { "listing webhook_receivers".to_string() } From 0596f7d8153aeecfbe83b1a135afc6417f0ca38d Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Tue, 29 Apr 2025 11:49:58 -0700 Subject: [PATCH 13/14] generate parse errors programmatically --- nexus/types/src/external_api/views.rs | 62 +++++++++++++++++++++++---- 1 file changed, 53 insertions(+), 9 deletions(-) diff --git a/nexus/types/src/external_api/views.rs b/nexus/types/src/external_api/views.rs index fcfd69b06e7..1011fa68a21 100644 --- a/nexus/types/src/external_api/views.rs +++ b/nexus/types/src/external_api/views.rs @@ -26,6 +26,7 @@ use std::collections::BTreeMap; use std::collections::BTreeSet; use std::fmt; use std::net::IpAddr; +use std::sync::LazyLock; use strum::{EnumIter, IntoEnumIterator}; use url::Url; use uuid::Uuid; @@ -575,7 +576,7 @@ pub struct Sled { #[serde(flatten)] pub identity: AssetIdentityMetadata, pub baseboard: Baseboard, - /// The rack to which this Sled is currently attached + /// The rack to which this Sled is currently attached* pub rack_id: Uuid, /// The operator-defined policy of a sled. pub policy: SledPolicy, @@ -1177,15 +1178,15 @@ impl fmt::Display for WebhookDeliveryState { impl std::str::FromStr for WebhookDeliveryState { type Err = Error; fn from_str(s: &str) -> Result { + static EXPECTED_ONE_OF: LazyLock = + LazyLock::new(expected_one_of::); + for &v in Self::ALL { if s.trim().eq_ignore_ascii_case(v.as_str()) { return Ok(v); } } - Err(Error::invalid_value( - "WebhookDeliveryState", - "expected one of 'pending', 'delivered', or 'failed'", - )) + Err(Error::invalid_value("WebhookDeliveryState", &*EXPECTED_ONE_OF)) } } @@ -1230,15 +1231,15 @@ impl fmt::Display for WebhookDeliveryTrigger { impl std::str::FromStr for WebhookDeliveryTrigger { type Err = Error; fn from_str(s: &str) -> Result { + static EXPECTED_ONE_OF: LazyLock = + LazyLock::new(expected_one_of::); + for &v in ::VARIANTS { if s.trim().eq_ignore_ascii_case(v.as_str()) { return Ok(v); } } - Err(Error::invalid_value( - "WebhookDeliveryTrigger", - "expected one of 'event', 'resend', or 'probe'", - )) + Err(Error::invalid_value("WebhookDeliveryTrigger", &*EXPECTED_ONE_OF)) } } @@ -1365,3 +1366,46 @@ pub struct TargetRelease { /// The source of the target release. pub release_source: TargetReleaseSource, } + +fn expected_one_of() -> String { + use std::fmt::Write; + let mut msg = "expected one of:".to_string(); + let mut variants = T::VARIANTS.iter().peekable(); + while let Some(variant) = variants.next() { + if variants.peek().is_some() { + write!(&mut msg, " '{variant}',").unwrap(); + } else { + write!(&mut msg, " or '{variant}'").unwrap(); + } + } + msg +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_expected_one_of() { + // Test this using an enum that we declare here, so that the test + // needn't be updated if the types which actually use this helper + // change. + #[derive(Debug, strum::VariantArray)] + enum Test { + Foo, + Bar, + Baz, + } + + impl fmt::Display for Test { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Debug::fmt(self, f) + } + } + + assert_eq!( + expected_one_of::(), + "expected one of: 'Foo', 'Bar', or 'Baz'" + ); + } +} From 9020a44bdd6227d310f2934b795508fba84e4acf Mon Sep 17 00:00:00 2001 From: Eliza Weisman Date: Wed, 30 Apr 2025 11:00:41 -0700 Subject: [PATCH 14/14] oops... --- nexus/types/src/external_api/views.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nexus/types/src/external_api/views.rs b/nexus/types/src/external_api/views.rs index 1011fa68a21..b9cb1c52790 100644 --- a/nexus/types/src/external_api/views.rs +++ b/nexus/types/src/external_api/views.rs @@ -576,7 +576,7 @@ pub struct Sled { #[serde(flatten)] pub identity: AssetIdentityMetadata, pub baseboard: Baseboard, - /// The rack to which this Sled is currently attached* + /// The rack to which this Sled is currently attached pub rack_id: Uuid, /// The operator-defined policy of a sled. pub policy: SledPolicy,