Skip to content

Commit

Permalink
move status/flare component into non-component standalone task
Browse files Browse the repository at this point in the history
  • Loading branch information
tobz committed Feb 10, 2025
1 parent 5c8f7f7 commit fa9f680
Show file tree
Hide file tree
Showing 11 changed files with 264 additions and 252 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions bin/agent-data-plane/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ fips = ["saluki-app/tls-fips"]
[dependencies]
async-trait = { workspace = true }
bytesize = { workspace = true }
chrono = { workspace = true }
datadog-protos = { workspace = true }
memory-accounting = { workspace = true }
rand = { workspace = true }
saluki-app = { workspace = true, features = ["full"] }
saluki-components = { workspace = true }
saluki-config = { workspace = true }
Expand All @@ -34,7 +37,9 @@ tokio = { workspace = true, features = [
"signal",
] }
tokio-rustls = { workspace = true }
tonic = { workspace = true }
tracing = { workspace = true }
uuid = { workspace = true, features = ["std", "v7"] }

[target.'cfg(target_os = "linux")'.dependencies]
tikv-jemalloc-ctl = { workspace = true, features = ["use_std"] }
Expand Down
58 changes: 39 additions & 19 deletions bin/agent-data-plane/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,55 @@ use std::future::pending;

use saluki_app::api::APIBuilder;
use saluki_config::GenericConfiguration;
use saluki_error::{ErrorContext as _, GenericError};
use saluki_error::{generic_error, ErrorContext as _, GenericError};
use saluki_io::net::ListenAddress;
use tracing::{error, info};

mod remote_agent;
use self::remote_agent::RemoteAgentHelperConfiguration;

const PRIMARY_UNPRIVILEGED_API_PORT: u16 = 5100;
const PRIMARY_PRIVILEGED_API_PORT: u16 = 5101;

pub async fn configure_and_spawn_api_endpoints(config: &GenericConfiguration, unprivileged_api: APIBuilder, privileged_api: APIBuilder) -> Result<(), GenericError> {
spawn_unprivileged_api(config, unprivileged_api).await?;
spawn_privileged_api(config, privileged_api).await?;

Ok(())
}

async fn spawn_unprivileged_api(
configuration: &GenericConfiguration, api_builder: APIBuilder,
pub async fn configure_and_spawn_api_endpoints(
config: &GenericConfiguration, unprivileged_api: APIBuilder, mut privileged_api: APIBuilder,
) -> Result<(), GenericError> {
let api_listen_address = configuration
let api_listen_address = config
.try_get_typed("api_listen_address")
.error_context("Failed to get API listen address.")?
.unwrap_or_else(|| ListenAddress::any_tcp(PRIMARY_UNPRIVILEGED_API_PORT));

let secure_api_listen_address = config
.try_get_typed("secure_api_listen_address")
.error_context("Failed to get secure API listen address.")?
.unwrap_or_else(|| ListenAddress::any_tcp(PRIMARY_PRIVILEGED_API_PORT));

// When not in standalone mode, install the necessary components for registering ourselves with the Datadog Agent as
// a "remote agent", which wires up ADP to allow the Datadog Agent to query it for status and flare information.
let in_standalone_mode = config.get_typed_or_default::<bool>("adp.standalone_mode");
if !in_standalone_mode {
let local_secure_api_listen_addr = secure_api_listen_address
.as_local_connect_addr()
.ok_or_else(|| generic_error!("Failed to get local secure API listen address to advertise."))?;

// Build and spawn our helper task for registering ourselves with the Datadog Agent as a remote agent.
let remote_agent_config =
RemoteAgentHelperConfiguration::from_configuration(config, local_secure_api_listen_addr).await?;
let remote_agent_service = remote_agent_config.spawn().await;

// Register our Remote Agent gRPC service with the privileged API.
privileged_api = privileged_api.with_grpc_service(remote_agent_service);
}

spawn_unprivileged_api(unprivileged_api, api_listen_address).await?;
spawn_privileged_api(privileged_api, secure_api_listen_address).await?;

Ok(())
}

async fn spawn_unprivileged_api(
api_builder: APIBuilder, api_listen_address: ListenAddress,
) -> Result<(), GenericError> {
// TODO: Use something better than `pending()`... perhaps something like a more generalized
// `ComponentShutdownCoordinator` that allows for triggering and waiting for all attached tasks to signal that
// they've shutdown.
Expand All @@ -38,14 +65,7 @@ async fn spawn_unprivileged_api(
Ok(())
}

async fn spawn_privileged_api(
configuration: &GenericConfiguration, api_builder: APIBuilder,
) -> Result<(), GenericError> {
let api_listen_address = configuration
.try_get_typed("secure_api_listen_address")
.error_context("Failed to get secure API listen address.")?
.unwrap_or_else(|| ListenAddress::any_tcp(PRIMARY_PRIVILEGED_API_PORT));

async fn spawn_privileged_api(api_builder: APIBuilder, api_listen_address: ListenAddress) -> Result<(), GenericError> {
// TODO: Use something better than `pending()`... perhaps something like a more generalized
// `ComponentShutdownCoordinator` that allows for triggering and waiting for all attached tasks to signal that
// they've shutdown.
Expand Down
127 changes: 127 additions & 0 deletions bin/agent-data-plane/src/api/remote_agent/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
use std::time::Duration;
use std::{collections::HashMap, net::SocketAddr};

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datadog_protos::agent::{
GetFlareFilesRequest, GetFlareFilesResponse, GetStatusDetailsRequest, GetStatusDetailsResponse, RemoteAgent,
RemoteAgentServer, StatusSection,
};
use rand::{distributions::Alphanumeric, thread_rng, Rng};
use saluki_config::GenericConfiguration;
use saluki_env::helpers::remote_agent::RemoteAgentClient;
use saluki_error::GenericError;
use tokio::time::{interval, MissedTickBehavior};
use tracing::debug;
use uuid::Uuid;

/// Remote Agent helper configuration.
///
/// Holds everything the helper task needs in order to register this process with the Datadog Agent as a Remote
/// Agent. Build it with `from_configuration` and consume it with `spawn`.
pub struct RemoteAgentHelperConfiguration {
// Unique identifier sent with each registration request: the formatted application name followed by a UUIDv7.
id: String,
// Name sent alongside `id`: the application's full name, lowercased, with spaces and underscores replaced by
// hyphens.
display_name: String,
// Address of the local API server, sent (in string form) with each registration request.
local_api_listen_addr: SocketAddr,
// Client used to issue the registration requests to the Datadog Agent.
client: RemoteAgentClient,
}

impl RemoteAgentHelperConfiguration {
    /// Builds a `RemoteAgentHelperConfiguration` from the given configuration.
    ///
    /// The display name is derived from the application's full name by replacing spaces and underscores with
    /// hyphens and lowercasing the result. The identifier is that display name suffixed with a freshly generated
    /// UUIDv7.
    ///
    /// # Errors
    ///
    /// If the `RemoteAgentClient` cannot be created from `config`, an error is returned.
    pub async fn from_configuration(
        config: &GenericConfiguration, local_api_listen_addr: SocketAddr,
    ) -> Result<Self, GenericError> {
        // Normalize the application name in a single replacement pass before lowercasing it.
        let display_name = saluki_metadata::get_app_details()
            .full_name()
            .replace([' ', '_'], "-")
            .to_lowercase();

        let client = RemoteAgentClient::from_configuration(config).await?;

        let id = format!("{}-{}", display_name, Uuid::now_v7());

        Ok(Self {
            id,
            display_name,
            local_api_listen_addr,
            client,
        })
    }

    /// Spawns the remote agent helper task.
    ///
    /// A background task is started that registers this process as a Remote Agent with the configured Datadog
    /// Agent instance and keeps refreshing that registration. The returned `RemoteAgent` gRPC service must be
    /// installed on the API server that is listening at `local_api_listen_addr`.
    pub async fn spawn(self) -> RemoteAgentServer<RemoteAgentImpl> {
        let Self {
            id,
            display_name,
            local_api_listen_addr,
            client,
        } = self;

        // Record the start time before the registration task is spawned.
        let service = RemoteAgentServer::new(RemoteAgentImpl { started: Utc::now() });

        // The join handle is intentionally dropped: the registration loop runs detached.
        tokio::spawn(run_remote_agent_helper(
            id,
            display_name,
            local_api_listen_addr,
            client,
        ));

        service
    }
}

/// Runs the Remote Agent registration loop.
///
/// Repeatedly registers this process with the Datadog Agent through `client`, sending `id`, `display_name`, the
/// string form of `local_api_listen_addr`, and a random 64-character alphanumeric token generated once at startup.
/// After each successful registration, the next attempt is scheduled using the refresh interval recommended in the
/// response. After a failed registration, the next attempt happens on the interval's regular period.
///
/// This function loops forever and never returns.
async fn run_remote_agent_helper(
    id: String, display_name: String, local_api_listen_addr: SocketAddr, mut client: RemoteAgentClient,
) {
    // Cadence used until the Datadog Agent recommends one, and whenever its recommendation is unusable.
    const DEFAULT_REFRESH_INTERVAL: Duration = Duration::from_secs(10);

    let local_api_listen_addr = local_api_listen_addr.to_string();
    let auth_token: String = thread_rng()
        .sample_iter(&Alphanumeric)
        .take(64)
        .map(char::from)
        .collect();

    // The first tick of a Tokio interval completes immediately, so the initial registration is attempted right
    // away.
    let mut register_agent = interval(DEFAULT_REFRESH_INTERVAL);
    register_agent.set_missed_tick_behavior(MissedTickBehavior::Delay);

    debug!("Remote Agent helper started.");

    loop {
        register_agent.tick().await;
        match client
            .register_remote_agent_request(&id, &display_name, &local_api_listen_addr, &auth_token)
            .await
        {
            Ok(resp) => {
                let recommended_secs = resp.into_inner().recommended_refresh_interval_secs as u64;

                // A recommendation of zero (for example, an unset field in the response) would make the next tick
                // due immediately and turn this loop into back-to-back registration calls. Treat it as "no
                // recommendation" and keep the default cadence instead.
                let next_refresh = if recommended_secs == 0 {
                    DEFAULT_REFRESH_INTERVAL
                } else {
                    Duration::from_secs(recommended_secs)
                };
                register_agent.reset_after(next_refresh);
                debug!("Refreshed registration with Datadog Agent");
            }
            Err(e) => {
                debug!("Failed to refresh registration with Datadog Agent: {}", e);
            }
        }
    }
}

/// Implementation of the `RemoteAgent` gRPC service.
///
/// Serves the status and flare requests defined by the `RemoteAgent` trait.
// NOTE(review): the derived `Default` presumably sets `started` to the Unix epoch rather than the current time --
// confirm whether `Default` is actually needed, since `spawn` always sets `started` explicitly.
#[derive(Default)]
pub struct RemoteAgentImpl {
// Timestamp reported as the "Started" field in status responses.
started: DateTime<Utc>,
}

#[async_trait]
impl RemoteAgent for RemoteAgentImpl {
    /// Returns the status details for this process.
    ///
    /// The main section carries a single "Started" field holding the service start time formatted as RFC 3339.
    /// No named sections are reported.
    async fn get_status_details(
        &self, _request: tonic::Request<GetStatusDetailsRequest>,
    ) -> Result<tonic::Response<GetStatusDetailsResponse>, tonic::Status> {
        let fields = HashMap::from([("Started".to_string(), self.started.to_rfc3339())]);

        Ok(tonic::Response::new(GetStatusDetailsResponse {
            main_section: Some(StatusSection { fields }),
            named_sections: HashMap::new(),
        }))
    }

    /// Returns the flare files for this process.
    ///
    /// No files are currently provided, so the response always carries an empty file map.
    async fn get_flare_files(
        &self, _request: tonic::Request<GetFlareFilesRequest>,
    ) -> Result<tonic::Response<GetFlareFilesResponse>, tonic::Status> {
        Ok(tonic::Response::new(GetFlareFilesResponse { files: HashMap::new() }))
    }
}
37 changes: 8 additions & 29 deletions bin/agent-data-plane/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,12 @@
#![deny(warnings)]
#![deny(missing_docs)]
use std:: time::{Duration, Instant};
use std::time::{Duration, Instant};

use memory_accounting::{ComponentBounds, ComponentRegistry};
use saluki_app::{api::APIBuilder, logging::LoggingAPIHandler, prelude::*};
use saluki_components::{
destinations::{
new_remote_agent_service, DatadogEventsServiceChecksConfiguration, DatadogMetricsConfiguration,
DatadogStatusFlareConfiguration, PrometheusConfiguration,
},
destinations::{DatadogEventsServiceChecksConfiguration, DatadogMetricsConfiguration, PrometheusConfiguration},
sources::{DogStatsDConfiguration, InternalMetricsConfiguration},
transforms::{
AggregateConfiguration, ChainedConfiguration, DogstatsDPrefixFilterConfiguration, HostEnrichmentConfiguration,
Expand Down Expand Up @@ -120,7 +117,6 @@ async fn run(started: Instant, logging_api_handler: LoggingAPIHandler) -> Result

let privileged_api = APIBuilder::new()
.with_self_signed_tls()
.with_grpc_service(new_remote_agent_service())
.with_handler(logging_api_handler)
.with_optional_handler(env_provider.workload_api_handler());

Expand Down Expand Up @@ -226,40 +222,23 @@ async fn create_topology(
.connect_component("dd_metrics_out", ["enrich"])?
.connect_component("dd_events_sc_out", ["dsd_in.events", "dsd_in.service_checks"])?;

// When telemetry is enabled, we need to collect internal metrics, so add those components and route them here.
let telemetry_enabled = configuration.get_typed_or_default::<bool>("telemetry_enabled");
let in_standalone_mode = configuration.get_typed_or_default::<bool>("adp.standalone_mode");

// When telemetry is enabled, or we're not in standalone mode, we need to collect internal metrics, so add those
// components and route them here.
if telemetry_enabled || !in_standalone_mode {
if telemetry_enabled {
let int_metrics_config = InternalMetricsConfiguration;
let int_metrics_remap_config = AgentTelemetryRemapperConfiguration::new();

blueprint
.add_source("internal_metrics_in", int_metrics_config)?
.add_transform("internal_metrics_remap", int_metrics_remap_config)?
.connect_component("internal_metrics_remap", ["internal_metrics_in"])?;
}

// When not in standalone mode, install the necessary components for registering ourselves with the Datadog Agent as
// a "remote agent", which wires up ADP to allow the Datadog Agent to query it for status and flare information.
if !in_standalone_mode {
let status_configuration = DatadogStatusFlareConfiguration::from_configuration(configuration).await?;
blueprint
.add_destination("dd_status_flare_out", status_configuration)?
.connect_component("dd_status_flare_out", ["internal_metrics_remap"])?;
}

// When internal telemetry is enabled, expose a Prometheus scrape endpoint that the Datadog Agent will pull from.
if telemetry_enabled {
let prometheus_config = PrometheusConfiguration::from_configuration(configuration)?;

info!(
"Serving telemetry scrape endpoint on {}.",
prometheus_config.listen_address()
);

blueprint
.add_source("internal_metrics_in", int_metrics_config)?
.add_transform("internal_metrics_remap", int_metrics_remap_config)?
.add_destination("internal_metrics_out", prometheus_config)?
.connect_component("internal_metrics_remap", ["internal_metrics_in"])?
.connect_component("internal_metrics_out", ["internal_metrics_remap"])?;
}

Expand Down
4 changes: 0 additions & 4 deletions lib/saluki-components/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ async-trait = { workspace = true }
bitmask-enum = { workspace = true }
bytes = { workspace = true }
bytesize = { workspace = true }
chrono = { workspace = true }
datadog-protos = { workspace = true }
ddsketch-agent = { workspace = true }
float-cmp = { workspace = true, features = ["ratio"] }
Expand All @@ -35,7 +34,6 @@ paste = { workspace = true }
pin-project = { workspace = true }
protobuf = { workspace = true }
quanta = { workspace = true }
rand = { workspace = true }
regex = { workspace = true }
rustls = { workspace = true }
saluki-config = { workspace = true }
Expand All @@ -57,11 +55,9 @@ snafu = { workspace = true }
stringtheory = { workspace = true }
tokio = { workspace = true, features = ["io-util", "macros", "net", "rt", "rt-multi-thread", "signal", "sync"] }
tokio-util = { workspace = true }
tonic = { workspace = true }
tower = { workspace = true, features = ["limit", "retry", "timeout", "util"] }
tracing = { workspace = true }
url = { workspace = true }
uuid = { workspace = true, features = ["std", "v7"] }

[dev-dependencies]
saluki-metrics = { workspace = true, features = ["test"] }
3 changes: 0 additions & 3 deletions lib/saluki-components/src/destinations/datadog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,3 @@ pub use self::events_service_checks::DatadogEventsServiceChecksConfiguration;

mod metrics;
pub use self::metrics::DatadogMetricsConfiguration;

mod status_flare;
pub use self::status_flare::{new_remote_agent_service, DatadogStatusFlareConfiguration};
Loading

0 comments on commit fa9f680

Please sign in to comment.