enhancement: expose debug endpoint for dumping tag store for Remote Agent workload provider #460

Merged · 4 commits · Jan 30, 2025
1 change: 1 addition & 0 deletions Cargo.lock


10 changes: 9 additions & 1 deletion bin/agent-data-plane/src/env_provider.rs
@@ -2,7 +2,7 @@ use memory_accounting::ComponentRegistry;
use saluki_config::GenericConfiguration;
use saluki_env::{
host::providers::{BoxedHostProvider, FixedHostProvider, RemoteAgentHostProvider},
workload::providers::RemoteAgentWorkloadProvider,
workload::providers::{RemoteAgentWorkloadAPIHandler, RemoteAgentWorkloadProvider},
EnvironmentProvider,
};
use saluki_error::GenericError;
@@ -66,6 +66,14 @@ impl ADPEnvironmentProvider {
workload_provider,
})
}

/// Returns an API handler for interacting with the underlying data stores powering the workload provider, if one
/// has been configured.
///
/// See [`RemoteAgentWorkloadAPIHandler`] for more information about routes and responses.
pub fn workload_api_handler(&self) -> Option<RemoteAgentWorkloadAPIHandler> {
self.workload_provider.as_ref().map(|provider| provider.api_handler())
}
}

impl EnvironmentProvider for ADPEnvironmentProvider {
9 changes: 5 additions & 4 deletions bin/agent-data-plane/src/main.rs
@@ -109,7 +109,7 @@ async fn run(started: Instant, logging_api_handler: LoggingAPIHandler) -> Result

// Create a simple pipeline that runs a DogStatsD source, an aggregation transform to bucket into 10 second windows,
// and a Datadog Metrics destination that forwards aggregated buckets to the Datadog Platform.
let blueprint = create_topology(&configuration, env_provider, &component_registry).await?;
let blueprint = create_topology(&configuration, &env_provider, &component_registry).await?;

// Build our unprivileged and privileged API server.
//
@@ -122,7 +122,8 @@
let privileged_api = APIBuilder::new()
.with_self_signed_tls()
.with_grpc_service(new_remote_agent_service())
.with_handler(logging_api_handler);
.with_handler(logging_api_handler)
.with_optional_handler(env_provider.workload_api_handler());

// Run memory bounds validation to ensure that we can launch the topology with our configured memory limit, if any.
let bounds_configuration = MemoryBoundsConfiguration::try_from_config(&configuration)?;
@@ -171,7 +172,7 @@ async fn run(started: Instant, logging_api_handler: LoggingAPIHandler) -> Result
}

async fn create_topology(
configuration: &GenericConfiguration, env_provider: ADPEnvironmentProvider, component_registry: &ComponentRegistry,
configuration: &GenericConfiguration, env_provider: &ADPEnvironmentProvider, component_registry: &ComponentRegistry,
) -> Result<TopologyBlueprint, GenericError> {
// Create a simple pipeline that runs a DogStatsD source, an aggregation transform to bucket into 10 second windows,
// and a Datadog Metrics destination that forwards aggregated buckets to the Datadog Platform.
@@ -181,7 +182,7 @@
let dsd_agg_config = AggregateConfiguration::from_configuration(configuration)
.error_context("Failed to configure aggregate transform.")?;
let dsd_prefix_filter_configuration = DogstatsDPrefixFilterConfiguration::from_configuration(configuration)?;
let host_enrichment_config = HostEnrichmentConfiguration::from_environment_provider(env_provider);
let host_enrichment_config = HostEnrichmentConfiguration::from_environment_provider(env_provider.clone());
let enrich_config = ChainedConfiguration::default().with_transform_builder(host_enrichment_config);
let mut dd_metrics_config = DatadogMetricsConfiguration::from_configuration(configuration)
.error_context("Failed to configure Datadog Metrics destination.")?;
45 changes: 30 additions & 15 deletions lib/saluki-app/src/api.rs
@@ -62,6 +62,36 @@ impl APIBuilder {
self
}

/// Adds the given optional handler to this builder.
///
/// If the handler is `Some`, the initial state and routes provided by the handler will be merged into this builder.
/// Otherwise, this builder will be returned unchanged.
pub fn with_optional_handler<H>(self, handler: Option<H>) -> Self
where
H: APIHandler,
{
if let Some(handler) = handler {
self.with_handler(handler)
} else {
self
}
}

/// Add the given gRPC service to this builder.
pub fn with_grpc_service<S>(mut self, svc: S) -> Self
where
S: Service<Request<BoxBody>, Response = Response<BoxBody>, Error = Infallible>
+ NamedService
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
S::Error: Into<Box<dyn Error + Send + Sync>> + Send,
{
self.grpc_router.add_service(svc);
self
}

/// Sets the TLS configuration for the server.
///
/// This will enable TLS for the server, and the server will only accept connections that are encrypted with TLS.
@@ -94,21 +124,6 @@ impl APIBuilder {
self.with_tls_config(config)
}

/// Add the given gRPC service to this builder.
pub fn with_grpc_service<S>(mut self, svc: S) -> Self
where
S: Service<Request<BoxBody>, Response = Response<BoxBody>, Error = Infallible>
+ NamedService
+ Clone
+ Send
+ 'static,
S::Future: Send + 'static,
S::Error: Into<Box<dyn Error + Send + Sync>> + Send,
{
self.grpc_router.add_service(svc);
self
}

/// Serves the API on the given listen address until `shutdown` resolves.
///
/// The listen address must be a connection-oriented address (TCP or Unix domain socket in SOCK_STREAM mode).
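To make the new builder method concrete, here is a minimal usage sketch mirroring the call site in `bin/agent-data-plane/src/main.rs` above; the variable names are taken from that diff.

```rust
// Sketch of `with_optional_handler`: when the Option is `None`, the builder
// is returned unchanged, so call sites don't need their own `if let`.
let privileged_api = APIBuilder::new()
    .with_self_signed_tls()
    .with_grpc_service(new_remote_agent_service())
    .with_handler(logging_api_handler)
    // Returns Option<RemoteAgentWorkloadAPIHandler>; merged only if `Some`.
    .with_optional_handler(env_provider.workload_api_handler());
```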
37 changes: 36 additions & 1 deletion lib/saluki-context/src/tags/mod.rs
@@ -2,6 +2,7 @@

use std::{fmt, hash, ops::Deref as _, sync::Arc};

use serde::Serialize;
use stringtheory::MetaString;

mod raw;
@@ -89,6 +90,15 @@ impl fmt::Display for Tag {
}
}

impl Serialize for Tag {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&self.0)
}
}

impl<T> From<T> for Tag
where
T: Into<MetaString>,
@@ -180,7 +190,7 @@ impl<'a> hash::Hash for BorrowedTag<'a> {
}

/// A set of tags.
#[derive(Clone, Debug, Default)]
#[derive(Clone, Debug, Default, Serialize)]
pub struct TagSet(Vec<Tag>);

impl TagSet {
@@ -466,6 +476,31 @@ impl<'a> IntoIterator for &'a SharedTagSet {
}
}

impl fmt::Display for SharedTagSet {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "[")?;

for (i, tag) in self.0.deref().into_iter().enumerate() {
if i > 0 {
write!(f, ", ")?;
}

write!(f, "{}", tag.as_str())?;
}

write!(f, "]")
}
}

impl Serialize for SharedTagSet {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
self.0.deref().serialize(serializer)
}
}

impl Tagged for SharedTagSet {
fn visit_tags<F>(&self, visitor: F)
where
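A short sketch of what the new impls produce. It assumes `serde_json` is available and that `&str: Into<MetaString>` holds (so the `From` impl shown above applies); neither is spelled out in this diff.

```rust
// Hypothetical round-trip showing the new Serialize behavior.
let tag = Tag::from("service:web"); // via impl<T: Into<MetaString>> From<T> for Tag

// `Tag` serializes as a bare string...
assert_eq!(serde_json::to_string(&tag).unwrap(), r#""service:web""#);

// ...so a `TagSet` (a Vec<Tag> with derived Serialize) becomes a JSON array
// of strings, and `SharedTagSet` delegates to the same inner Vec. The new
// Display impl on SharedTagSet renders as `[service:web, env:prod]`.
```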
1 change: 1 addition & 0 deletions lib/saluki-env/Cargo.toml
@@ -30,6 +30,7 @@ prost = { workspace = true }
prost-types = { workspace = true }
regex = { workspace = true, features = ["std", "perf"] }
rustls-pemfile = { workspace = true }
saluki-api = { workspace = true }
saluki-config = { workspace = true }
saluki-context = { workspace = true }
saluki-error = { workspace = true }
22 changes: 12 additions & 10 deletions lib/saluki-env/src/prelude.rs
@@ -1,7 +1,3 @@
use std::collections::{HashMap, HashSet};

use indexmap::IndexMap;

// We specifically alias a number of common hash-based containers here to provide versions that use [`ahash`][ahash] as
// the hasher of choice. It's a high-performance hash implementation that is far faster than the standard library's
// `SipHash` implementation. We don't have a need for DoS resistance in our use case, because we're not dealing with
@@ -10,11 +6,17 @@ use indexmap::IndexMap;
// While there are other, faster hash implementations -- rapidhash, gxhash, etc -- we're using `ahash` as it is
// well-trodden and provides more than enough performance for our current needs.

/// A [`HashSet`][std::collections::HashSet] using [`ahash`][ahash] as the configured hasher.
pub type FastHashSet<T> = HashSet<T, ahash::RandomState>;
/// A hash set based on the standard library's implementation ([`HashSet`][std::collections::HashSet]) using [`ahash`][ahash] as the configured hasher.
pub type FastHashSet<T> = std::collections::HashSet<T, ahash::RandomState>;

/// A hash map based on the standard library's implementation ([`HashMap`][std::collections::HashMap]) using [`ahash`][ahash] as the configured hasher.
pub type FastHashMap<K, V> = std::collections::HashMap<K, V, ahash::RandomState>;

/// A concurrent hash set based on `papaya` ([`HashSet`][papaya::HashSet]) using [`ahash`][ahash] as the configured hasher.
pub type FastConcurrentHashSet<T> = papaya::HashSet<T, ahash::RandomState>;

/// A [`HashMap`][std::collections::HashMap] using [`ahash`][ahash] as the configured hasher.
pub type FastHashMap<K, V> = HashMap<K, V, ahash::RandomState>;
/// A concurrent hash map based on `papaya` ([`HashMap`][papaya::HashMap]) using [`ahash`][ahash] as the configured hasher.
pub type FastConcurrentHashMap<K, V> = papaya::HashMap<K, V, ahash::RandomState>;

/// An [`IndexMap`][indexmap::IndexMap] using [`ahash`][ahash] as the configured hasher.
pub type FastIndexMap<K, V> = IndexMap<K, V, ahash::RandomState>;
/// A hash map with stable insertion order based on `indexmap` ([`IndexMap`][indexmap::IndexMap]) using [`ahash`][ahash] as the configured hasher.
pub type FastIndexMap<K, V> = indexmap::IndexMap<K, V, ahash::RandomState>;
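A brief, illustrative sketch of the aliases in use. The import path is an assumption (this diff does not show whether the prelude module is public); default construction relies on `ahash::RandomState: Default`, which the crate provides.

```rust
use saluki_env::prelude::{FastHashMap, FastIndexMap}; // assumed import path

// Default construction works because ahash::RandomState implements Default.
let mut counts: FastHashMap<String, u64> = FastHashMap::default();
*counts.entry("dogstatsd".to_string()).or_insert(0) += 1;

// FastIndexMap preserves insertion order on iteration, unlike the hash maps.
let mut ordered: FastIndexMap<&str, u64> = FastIndexMap::default();
ordered.insert("first", 1);
```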
77 changes: 76 additions & 1 deletion lib/saluki-env/src/workload/entity.rs
@@ -1,4 +1,4 @@
use std::fmt;
use std::{cmp::Ordering, fmt};

use stringtheory::MetaString;

@@ -74,6 +74,16 @@ impl EntityId {
}
Some(Self::PodUid(pod_uid.into()))
}

fn precedence_value(&self) -> usize {
match self {
Self::Global => 0,
Self::PodUid(_) => 1,
Self::Container(_) => 2,
Self::ContainerInode(_) => 3,
Self::ContainerPid(_) => 4,
}
}
}

impl fmt::Display for EntityId {
@@ -87,3 +97,68 @@ impl fmt::Display for EntityId {
}
}
}

impl serde::Serialize for EntityId {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
// We have this manual implementation of `Serialize` just to avoid needing to bring in `serde_with` to get the
// helper that utilizes the `Display` implementation.
serializer.collect_str(self)
}
}

/// A wrapper for entity IDs that sorts them in a manner consistent with the expected precedence of entity IDs.
///
/// This type establishes a total ordering over entity IDs based on their logical precedence, which is as follows:
///
/// - global (highest precedence)
/// - pod
/// - container
/// - container inode
/// - container PID (lowest precedence)
///
/// Wrapped entity IDs are sorted from highest to lowest precedence. For entity IDs with the same precedence, they are
/// further ordered by their internal value. For entity IDs with a string identifier, lexicographical ordering is used.
/// For entity IDs with a numeric identifier, numerical ordering is used.
#[derive(Eq, PartialEq)]
pub struct HighestPrecedenceEntityIdRef<'a>(&'a EntityId);

impl<'a> From<&'a EntityId> for HighestPrecedenceEntityIdRef<'a> {
fn from(entity_id: &'a EntityId) -> Self {
Self(entity_id)
}
}

impl<'a> PartialOrd for HighestPrecedenceEntityIdRef<'a> {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
Some(self.cmp(other))
}
}

impl<'a> Ord for HighestPrecedenceEntityIdRef<'a> {
fn cmp(&self, other: &Self) -> Ordering {
// Do the initial comparison based on the implicit precedence of each entity ID.
let self_precedence = self.0.precedence_value();
let other_precedence = other.0.precedence_value();
if self_precedence != other_precedence {
return self_precedence.cmp(&other_precedence);
}

// We have two entities at the same level of precedence, so we need to compare their actual values.
match (self.0, other.0) {
// Global entities are always equal.
(EntityId::Global, EntityId::Global) => Ordering::Equal,
(EntityId::PodUid(self_pod_uid), EntityId::PodUid(other_pod_uid)) => self_pod_uid.cmp(other_pod_uid),
(EntityId::Container(self_container_id), EntityId::Container(other_container_id)) => {
self_container_id.cmp(other_container_id)
}
(EntityId::ContainerInode(self_inode), EntityId::ContainerInode(other_inode)) => {
self_inode.cmp(other_inode)
}
(EntityId::ContainerPid(self_pid), EntityId::ContainerPid(other_pid)) => self_pid.cmp(other_pid),
_ => unreachable!("entities with different precedence should not be compared"),
}
}
}
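To illustrate the ordering, here is a sketch of sorting a mixed list via the wrapper; the variant payloads (`42`, `"abc123"`) and their inner types are made up for illustration.

```rust
// Sort entity IDs highest-precedence first using the wrapper's Ord impl.
let mut ids = vec![
    EntityId::ContainerPid(42),           // lowest precedence
    EntityId::Container("abc123".into()),
    EntityId::Global,                     // highest precedence
];
ids.sort_by(|a, b| HighestPrecedenceEntityIdRef::from(a).cmp(&HighestPrecedenceEntityIdRef::from(b)));
// After sorting: [Global, Container("abc123"), ContainerPid(42)].
```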
2 changes: 1 addition & 1 deletion lib/saluki-env/src/workload/providers/mod.rs
@@ -4,4 +4,4 @@ mod noop;
pub use self::noop::NoopWorkloadProvider;

mod remote_agent;
pub use self::remote_agent::RemoteAgentWorkloadProvider;
pub use self::remote_agent::{RemoteAgentWorkloadAPIHandler, RemoteAgentWorkloadProvider};