enhancement: expose debug endpoint for dumping tag store for Remote Agent workload provider (#460)
tobz authored Jan 30, 2025
1 parent 3293da9 commit f5ce719
Showing 12 changed files with 369 additions and 46 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

10 changes: 9 additions & 1 deletion bin/agent-data-plane/src/env_provider.rs
@@ -2,7 +2,7 @@ use memory_accounting::ComponentRegistry;
use saluki_config::GenericConfiguration;
use saluki_env::{
host::providers::{BoxedHostProvider, FixedHostProvider, RemoteAgentHostProvider},
workload::providers::RemoteAgentWorkloadProvider,
workload::providers::{RemoteAgentWorkloadAPIHandler, RemoteAgentWorkloadProvider},
EnvironmentProvider,
};
use saluki_error::GenericError;
@@ -66,6 +66,14 @@ impl ADPEnvironmentProvider {
workload_provider,
})
}

/// Returns an API handler for interacting with the underlying data stores powering the workload provider, if one
/// has been configured.
///
/// See [`RemoteAgentWorkloadAPIHandler`] for more information about routes and responses.
pub fn workload_api_handler(&self) -> Option<RemoteAgentWorkloadAPIHandler> {
self.workload_provider.as_ref().map(|provider| provider.api_handler())
}
}

impl EnvironmentProvider for ADPEnvironmentProvider {
9 changes: 5 additions & 4 deletions bin/agent-data-plane/src/main.rs
@@ -109,7 +109,7 @@ async fn run(started: Instant, logging_api_handler: LoggingAPIHandler) -> Result

// Create a simple pipeline that runs a DogStatsD source, an aggregation transform to bucket into 10 second windows,
// and a Datadog Metrics destination that forwards aggregated buckets to the Datadog Platform.
let blueprint = create_topology(&configuration, env_provider, &component_registry).await?;
let blueprint = create_topology(&configuration, &env_provider, &component_registry).await?;

// Build our unprivileged and privileged API server.
//
@@ -122,7 +122,8 @@ async fn run(started: Instant, logging_api_handler: LoggingAPIHandler) -> Result
let privileged_api = APIBuilder::new()
.with_self_signed_tls()
.with_grpc_service(new_remote_agent_service())
.with_handler(logging_api_handler);
.with_handler(logging_api_handler)
.with_optional_handler(env_provider.workload_api_handler());

// Run memory bounds validation to ensure that we can launch the topology with our configured memory limit, if any.
let bounds_configuration = MemoryBoundsConfiguration::try_from_config(&configuration)?;
@@ -171,7 +172,7 @@ async fn run(started: Instant, logging_api_handler: LoggingAPIHandler) -> Result
}

async fn create_topology(
configuration: &GenericConfiguration, env_provider: ADPEnvironmentProvider, component_registry: &ComponentRegistry,
configuration: &GenericConfiguration, env_provider: &ADPEnvironmentProvider, component_registry: &ComponentRegistry,
) -> Result<TopologyBlueprint, GenericError> {
// Create a simple pipeline that runs a DogStatsD source, an aggregation transform to bucket into 10 second windows,
// and a Datadog Metrics destination that forwards aggregated buckets to the Datadog Platform.
@@ -181,7 +182,7 @@ async fn run(started: Instant, logging_api_handler: LoggingAPIHandler) -> Result
let dsd_agg_config = AggregateConfiguration::from_configuration(configuration)
.error_context("Failed to configure aggregate transform.")?;
let dsd_prefix_filter_configuration = DogstatsDPrefixFilterConfiguration::from_configuration(configuration)?;
let host_enrichment_config = HostEnrichmentConfiguration::from_environment_provider(env_provider);
let host_enrichment_config = HostEnrichmentConfiguration::from_environment_provider(env_provider.clone());
let enrich_config = ChainedConfiguration::default().with_transform_builder(host_enrichment_config);
let mut dd_metrics_config = DatadogMetricsConfiguration::from_configuration(configuration)
.error_context("Failed to configure Datadog Metrics destination.")?;
45 changes: 30 additions & 15 deletions lib/saluki-app/src/api.rs
@@ -62,6 +62,36 @@ impl APIBuilder {
self
}

/// Adds the given optional handler to this builder.
///
/// If the handler is `Some`, the initial state and routes provided by the handler will be merged into this builder.
/// Otherwise, this builder will be returned unchanged.
pub fn with_optional_handler<H>(self, handler: Option<H>) -> Self
where
H: APIHandler,
{
if let Some(handler) = handler {
self.with_handler(handler)
} else {
self
}
}
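
For reference, a minimal sketch of how the new method composes at a call site, mirroring the privileged API construction in `bin/agent-data-plane/src/main.rs` above (all names come from this commit; surrounding setup is elided):

```rust
// An `Option<impl APIHandler>` can be chained directly: `None` leaves the
// builder untouched, so no explicit `match` is needed at the call site.
let privileged_api = APIBuilder::new()
    .with_self_signed_tls()
    .with_grpc_service(new_remote_agent_service())
    .with_handler(logging_api_handler)
    .with_optional_handler(env_provider.workload_api_handler());
```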

/// Add the given gRPC service to this builder.
pub fn with_grpc_service<S>(mut self, svc: S) -> Self
where
S: Service<Request<BoxBody>, Response = Response<BoxBody>, Error = Infallible>
+ NamedService
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
S::Error: Into<Box<dyn Error + Send + Sync>> + Send,
{
self.grpc_router.add_service(svc);
self
}

/// Sets the TLS configuration for the server.
///
/// This will enable TLS for the server, and the server will only accept connections that are encrypted with TLS.
@@ -94,21 +124,6 @@ impl APIBuilder {
self.with_tls_config(config)
}

/// Add the given gRPC service to this builder.
pub fn with_grpc_service<S>(mut self, svc: S) -> Self
where
S: Service<Request<BoxBody>, Response = Response<BoxBody>, Error = Infallible>
+ NamedService
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
S::Error: Into<Box<dyn Error + Send + Sync>> + Send,
{
self.grpc_router.add_service(svc);
self
}

/// Serves the API on the given listen address until `shutdown` resolves.
///
/// The listen address must be a connection-oriented address (TCP or Unix domain socket in SOCK_STREAM mode).
37 changes: 36 additions & 1 deletion lib/saluki-context/src/tags/mod.rs
@@ -2,6 +2,7 @@
use std::{fmt, hash, ops::Deref as _, sync::Arc};

use serde::Serialize;
use stringtheory::MetaString;

mod raw;
@@ -89,6 +90,15 @@ impl fmt::Display for Tag {
}
}

impl Serialize for Tag {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&self.0)
}
}

impl<T> From<T> for Tag
where
T: Into<MetaString>,
@@ -180,7 +190,7 @@ impl<'a> hash::Hash for BorrowedTag<'a> {
}

/// A set of tags.
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, Serialize)]
pub struct TagSet(Vec<Tag>);

impl TagSet {
@@ -466,6 +476,31 @@ impl<'a> IntoIterator for &'a SharedTagSet {
}
}

impl fmt::Display for SharedTagSet {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "[")?;

for (i, tag) in self.0.deref().into_iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}

write!(f, "{}", tag.as_str())?;
}

write!(f, "]")
}
}

impl Serialize for SharedTagSet {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.0.deref().serialize(serializer)
}
}

impl Tagged for SharedTagSet {
fn visit_tags<F>(&self, visitor: F)
where
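With these impls in place, a `Tag` serializes as a bare string and a `TagSet` or `SharedTagSet` as an array of strings. A hedged sketch of the expected output, assuming `serde_json` as the serializer and that `&str` converts into `MetaString` via the `From` impl shown above (the module path is inferred from the file layout, not shown in this diff):

```rust
use saluki_context::tags::Tag;

fn main() -> Result<(), serde_json::Error> {
    // `Serialize for Tag` calls `serialize_str` on the inner string, so a
    // tag comes out as a plain JSON string, not a struct.
    let tag = Tag::from("service:web");
    assert_eq!(serde_json::to_string(&tag)?, r#""service:web""#);

    // `TagSet` derives `Serialize` over its inner `Vec<Tag>`, so a set of
    // tags renders as a JSON array: ["service:web", "env:prod"]. The
    // `SharedTagSet` impl defers to the underlying set via `Deref`.
    Ok(())
}
```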
1 change: 1 addition & 0 deletions lib/saluki-env/Cargo.toml
@@ -30,6 +30,7 @@ prost = { workspace = true }
prost-types = { workspace = true }
regex = { workspace = true, features = ["std", "perf"] }
rustls-pemfile = { workspace = true }
saluki-api = { workspace = true }
saluki-config = { workspace = true }
saluki-context = { workspace = true }
saluki-error = { workspace = true }
22 changes: 12 additions & 10 deletions lib/saluki-env/src/prelude.rs
@@ -1,7 +1,3 @@
use std::collections::{HashMap, HashSet};

use indexmap::IndexMap;

// We specifically alias a number of common hash-based containers here to provide versions that use [`ahash`][ahash] as
// the hasher of choice. It's a high-performance hash implementation that is far faster than the standard library's
// `SipHash` implementation. We don't have a need for DoS resistance in our use case, because we're not dealing with
@@ -10,11 +6,17 @@ use indexmap::IndexMap;
// While there are other, faster hash implementations -- rapidhash, gxhash, etc -- we're using `ahash` as it is
// well-trodden and provides more than enough performance for our current needs.

/// A [`HashSet`][std::collections::HashSet] using [`ahash`][ahash] as the configured hasher.
pub type FastHashSet<T> = HashSet<T, ahash::RandomState>;
/// A hash set based on the standard library's implementation ([`HashSet`][std::collections::HashSet]) using [`ahash`][ahash] as the configured hasher.
pub type FastHashSet<T> = std::collections::HashSet<T, ahash::RandomState>;

/// A hash map based on the standard library's implementation ([`HashMap`][std::collections::HashMap]) using [`ahash`][ahash] as the configured hasher.
pub type FastHashMap<K, V> = std::collections::HashMap<K, V, ahash::RandomState>;

/// A concurrent hash set based on `papaya` ([`HashSet`][papaya::HashSet]) using [`ahash`][ahash] as the configured hasher.
pub type FastConcurrentHashSet<T> = papaya::HashSet<T, ahash::RandomState>;

/// A [`HashMap`][std::collections::HashMap] using [`ahash`][ahash] as the configured hasher.
pub type FastHashMap<K, V> = HashMap<K, V, ahash::RandomState>;
/// A concurrent hash map based on `papaya` ([`HashMap`][papaya::HashMap]) using [`ahash`][ahash] as the configured hasher.
pub type FastConcurrentHashMap<K, V> = papaya::HashMap<K, V, ahash::RandomState>;

/// An [`IndexMap`][indexmap::IndexMap] using [`ahash`][ahash] as the configured hasher.
pub type FastIndexMap<K, V> = IndexMap<K, V, ahash::RandomState>;
/// A hash map with stable insertion order based on `indexmap` ([`IndexMap`][indexmap::IndexMap]) using [`ahash`][ahash] as the configured hasher.
pub type FastIndexMap<K, V> = indexmap::IndexMap<K, V, ahash::RandomState>;
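
The aliases are drop-in replacements for their `std`/`papaya`/`indexmap` counterparts; only the hasher changes. A short illustrative sketch (assuming the aliases are re-exported publicly from `saluki_env::prelude`, which this diff doesn't show):

```rust
use saluki_env::prelude::{FastHashMap, FastIndexMap};

fn main() {
    // Identical API to `std::collections::HashMap`; only the hasher differs.
    let mut counts: FastHashMap<String, u64> = FastHashMap::default();
    *counts.entry("containerd".to_string()).or_insert(0) += 1;

    // `IndexMap` additionally preserves insertion order.
    let mut ordered: FastIndexMap<&str, u32> = FastIndexMap::default();
    ordered.insert("first", 1);
    ordered.insert("second", 2);
    assert_eq!(ordered.get_index(0), Some((&"first", &1)));
}
```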
77 changes: 76 additions & 1 deletion lib/saluki-env/src/workload/entity.rs
@@ -1,4 +1,4 @@
use std::fmt;
use std::{cmp::Ordering, fmt};

use stringtheory::MetaString;

@@ -74,6 +74,16 @@ impl EntityId {
}
Some(Self::PodUid(pod_uid.into()))
}

fn precedence_value(&self) -> usize {
match self {
Self::Global => 0,
Self::PodUid(_) => 1,
Self::Container(_) => 2,
Self::ContainerInode(_) => 3,
Self::ContainerPid(_) => 4,
}
}
}

impl fmt::Display for EntityId {
@@ -87,3 +97,68 @@ impl fmt::Display for EntityId {
}
}
}

impl serde::Serialize for EntityId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
// We have this manual implementation of `Serialize` just to avoid needing to bring in `serde_with` to get the
// helper that utilizes the `Display` implementation.
serializer.collect_str(self)
}
}
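
The same trick generalizes: `collect_str` serializes anything with a `Display` impl as a string, which is what `serde_with`'s display-based helper would otherwise provide. A self-contained sketch of the pattern (not part of this commit):

```rust
use std::fmt;

use serde::Serializer;

// Serialize any `Display` type as a string, without an explicit
// intermediate `String` or a dependency on `serde_with`.
fn serialize_display<T, S>(value: &T, serializer: S) -> Result<S::Ok, S::Error>
where
    T: fmt::Display,
    S: Serializer,
{
    serializer.collect_str(value)
}
```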

/// A wrapper for entity IDs that sorts them in a manner consistent with the expected precedence of entity IDs.
///
/// This type establishes a total ordering over entity IDs based on their logical precedence, which is as follows:
///
/// - global (highest precedence)
/// - pod
/// - container
/// - container inode
/// - container PID (lowest precedence)
///
/// Wrapped entity IDs are sorted highest to lowest precedence. Entity IDs with the same precedence are
/// further ordered by their internal value. For entity IDs with a string identifier, lexicographical ordering is used.
/// For entity IDs with a numeric identifier, numerical ordering is used.
#[derive(Eq, PartialEq)]
pub struct HighestPrecedenceEntityIdRef<'a>(&'a EntityId);

impl<'a> From<&'a EntityId> for HighestPrecedenceEntityIdRef<'a> {
fn from(entity_id: &'a EntityId) -> Self {
Self(entity_id)
}
}

impl<'a> PartialOrd for HighestPrecedenceEntityIdRef<'a> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl<'a> Ord for HighestPrecedenceEntityIdRef<'a> {
fn cmp(&self, other: &Self) -> Ordering {
// Do the initial comparison based on the implicit precedence of each entity ID.
let self_precedence = self.0.precedence_value();
let other_precedence = other.0.precedence_value();
if self_precedence != other_precedence {
return self_precedence.cmp(&other_precedence);
}

// We have two entities at the same level of precedence, so we need to compare their actual values.
match (self.0, other.0) {
// Global entities are always equal.
(EntityId::Global, EntityId::Global) => Ordering::Equal,
(EntityId::PodUid(self_pod_uid), EntityId::PodUid(other_pod_uid)) => self_pod_uid.cmp(other_pod_uid),
(EntityId::Container(self_container_id), EntityId::Container(other_container_id)) => {
self_container_id.cmp(other_container_id)
}
(EntityId::ContainerInode(self_inode), EntityId::ContainerInode(other_inode)) => {
self_inode.cmp(other_inode)
}
(EntityId::ContainerPid(self_pid), EntityId::ContainerPid(other_pid)) => self_pid.cmp(other_pid),
_ => unreachable!("entities with different precedence should not be compared"),
}
}
}
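
A hedged sketch of what the wrapper enables: sorting a mixed collection of entity IDs by precedence with a plain `sort`, without putting an `Ord` impl on `EntityId` itself (module path inferred from the file layout, not shown in this diff):

```rust
use saluki_env::workload::entity::{EntityId, HighestPrecedenceEntityIdRef};

// Borrow each entity ID, wrap it, and sort: `Global` first, then pod UIDs,
// containers, container inodes, and container PIDs last. Ties within a
// variant compare the inner values (lexicographic or numeric).
fn sorted_by_precedence(ids: &[EntityId]) -> Vec<HighestPrecedenceEntityIdRef<'_>> {
    let mut refs: Vec<_> = ids.iter().map(HighestPrecedenceEntityIdRef::from).collect();
    refs.sort();
    refs
}
```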
2 changes: 1 addition & 1 deletion lib/saluki-env/src/workload/providers/mod.rs
@@ -4,4 +4,4 @@ mod noop;
pub use self::noop::NoopWorkloadProvider;

mod remote_agent;
pub use self::remote_agent::RemoteAgentWorkloadProvider;
pub use self::remote_agent::{RemoteAgentWorkloadAPIHandler, RemoteAgentWorkloadProvider};