missed a spot
tobz committed Jan 29, 2025
1 parent 5a51f11 commit 4ce4d0e
Showing 2 changed files with 353 additions and 0 deletions.
134 changes: 134 additions & 0 deletions lib/saluki-env/src/workload/providers/remote_agent/api.rs
@@ -0,0 +1,134 @@
use saluki_api::{
extract::State,
response::IntoResponse,
routing::{get, Router},
APIHandler,
};
use saluki_context::{
origin::OriginTagCardinality,
tags::{SharedTagSet, TagSet},
};
use serde::Serialize;

use crate::{
prelude::*,
workload::{entity::HighestPrecedenceEntityIdRef, stores::TagStoreQuerier, EntityId},
};

#[derive(Serialize)]
struct EntityInformation<'a> {
entity_id: &'a EntityId,
ancestors: Vec<&'a EntityId>,
low_cardinality_tags: SharedTagSet,
orchestrator_cardinality_tags: SharedTagSet,
high_cardinality_tags: SharedTagSet,
}

/// State used for the Remote Agent workload API handler.
#[derive(Clone)]
pub struct RemoteAgentWorkloadState {
tag_querier: TagStoreQuerier,
}

impl RemoteAgentWorkloadState {
fn get_tags_dump_response(&self) -> String {
let mut active_entities = FastHashSet::default();
let mut entity_mappings = FastHashMap::default();
let mut entity_info_map = FastHashMap::default();
let empty_tagset = TagSet::default().into_shared();

// First, collect a list of all entities presently in the tag store, and then collect the entity mappings for
// each entity.
self.tag_querier.visit_active_entities(|entity_id| {
active_entities.insert(entity_id.clone());
});

self.tag_querier.visit_entity_mappings(|entity_id, parent_id| {
active_entities.insert(entity_id.clone());
entity_mappings.insert(entity_id.clone(), parent_id.clone());
});

// For each entity, build its ancestry chain and collect all of the relevant tags.
for entity_id in active_entities.iter() {
let mut ancestors = Vec::new();

let mut current_entity = entity_id;
while let Some(parent) = entity_mappings.get(current_entity) {
ancestors.insert(0, parent);
current_entity = parent;
}

let low_cardinality_tags = self
.tag_querier
.get_entity_tags(entity_id, OriginTagCardinality::Low)
.unwrap_or_else(|| empty_tagset.clone());
let orchestrator_cardinality_tags = self
.tag_querier
.get_entity_tags(entity_id, OriginTagCardinality::Orchestrator)
.unwrap_or_else(|| empty_tagset.clone());
let high_cardinality_tags = self
.tag_querier
.get_entity_tags(entity_id, OriginTagCardinality::High)
.unwrap_or_else(|| empty_tagset.clone());

entity_info_map.insert(
entity_id,
EntityInformation {
entity_id,
ancestors,
low_cardinality_tags,
orchestrator_cardinality_tags,
high_cardinality_tags,
},
);
}

// Collapse the entity information map into a vector of entity information, sorted in precedence order of the
// entity ID.
let mut entity_info = entity_info_map.into_values().collect::<Vec<_>>();
entity_info.sort_by_cached_key(|entity_info| HighestPrecedenceEntityIdRef::from(entity_info.entity_id));

serde_json::to_string(&entity_info).unwrap()
}
}

/// An API handler for interacting with the underlying data stores that comprise the Remote Agent workload provider.
///
/// This handler registers a number of routes that allow for introspecting the state of the underlying data stores to
/// understand exactly what entities are being tracked and what tags are associated with them.
///
/// # Routes
///
/// ## GET `/workload/remote_agent/tags/dump`
///
/// This route will dump the contents of the associated tag store in a human-readable form.
///
/// All entities present in the tag store will be listed, along with their ancestry chain and the tags associated with
/// the entity at each tag cardinality level. Entities are sorted in the output from highest to lowest precedence.
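///
/// The response body is a JSON array with one entry per entity, each carrying the fields of `EntityInformation`:
/// the entity ID, its ancestry chain (most distant ancestor first), and one tag set per cardinality level (low,
/// orchestrator, and high).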
pub struct RemoteAgentWorkloadAPIHandler {
state: RemoteAgentWorkloadState,
}

impl RemoteAgentWorkloadAPIHandler {
pub(crate) fn from_state(tag_querier: TagStoreQuerier) -> Self {
Self {
state: RemoteAgentWorkloadState { tag_querier },
}
}

async fn tags_dump_handler(State(state): State<RemoteAgentWorkloadState>) -> impl IntoResponse {
state.get_tags_dump_response()
}
}

impl APIHandler for RemoteAgentWorkloadAPIHandler {
type State = RemoteAgentWorkloadState;

fn generate_initial_state(&self) -> Self::State {
self.state.clone()
}

fn generate_routes(&self) -> Router<Self::State> {
Router::new().route("/workload/remote_agent/tags/dump", get(Self::tags_dump_handler))
}
}
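
As a rough wiring sketch, assuming `saluki_api::routing::Router` follows axum's `Router` API (and therefore exposes `with_state`); in practice the `APIBuilder` referenced in the provider documentation below is the intended entry point:

// `provider` is a `RemoteAgentWorkloadProvider`; `api_handler()` returns the handler defined above.
let handler = provider.api_handler();

// `APIHandler` separates the shared state from the routes so the server can combine them.
let router = handler
    .generate_routes()
    .with_state(handler.generate_initial_state());

// GET /workload/remote_agent/tags/dump on this router returns the JSON produced by `get_tags_dump_response`.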
219 changes: 219 additions & 0 deletions lib/saluki-env/src/workload/providers/remote_agent/mod.rs
@@ -0,0 +1,219 @@
//! A workload provider based on the Datadog Agent's remote tagger and workloadmeta APIs.
use std::{future::Future, num::NonZeroUsize};

use async_trait::async_trait;
use memory_accounting::{ComponentRegistry, MemoryBounds, MemoryBoundsBuilder};
use saluki_config::GenericConfiguration;
use saluki_context::{
origin::{OriginInfo, OriginKey, OriginTagCardinality, OriginTagVisitor, OriginTagsResolver},
tags::SharedTagSet,
};
use saluki_error::{generic_error, ErrorContext as _, GenericError};
use saluki_health::{Health, HealthRegistry};
use stringtheory::interning::GenericMapInterner;

#[cfg(target_os = "linux")]
use crate::workload::collectors::CgroupsMetadataCollector;
use crate::{
features::{Feature, FeatureDetector},
workload::{
aggregator::MetadataAggregator,
collectors::{
ContainerdMetadataCollector, RemoteAgentTaggerMetadataCollector, RemoteAgentWorkloadMetadataCollector,
},
entity::EntityId,
origin::{OriginEnrichmentConfiguration, OriginTagsQuerier},
stores::{ExternalDataStore, TagStore, TagStoreQuerier},
},
WorkloadProvider,
};

mod api;
pub use self::api::RemoteAgentWorkloadAPIHandler;

// TODO: Make these configurable.

// SAFETY: The value is demonstrably not zero.
const DEFAULT_TAG_STORE_ENTITY_LIMIT: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(2000) };

// SAFETY: The value is demonstrably not zero.
const DEFAULT_EXTERNAL_DATA_STORE_ENTITY_LIMIT: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(2000) };

// SAFETY: The value is demonstrably not zero.
const DEFAULT_STRING_INTERNER_SIZE_BYTES: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(512 * 1024) }; // 512KB.

/// Datadog Agent-based workload provider.
///
/// This provider is based primarily on the remote tagger API exposed by the Datadog Agent, which handles the bulk of
/// the work by collecting and aggregating tags for container entities. This remote tagger API operates in a streaming
/// fashion, which the provider uses to stream update operations to the tag store.
///
/// Additionally, two collectors are optionally used: a `containerd` collector and a `cgroups-v2` collector. The
/// `containerd` collector will, if containerd is running, be used to collect metadata that allows mapping container
/// PIDs (UDS-based Origin Detection) to container IDs. The `cgroups-v2` collector will collect metadata about the
/// current set of cgroups v2 controllers, tracking any controllers which appear related to containers and storing a
/// mapping of controller inodes to container IDs.
///
/// These additional collectors are necessary to bridge the gap from container PID or cgroup controller inode to
/// container ID, as the remote tagger API does not stream these mappings itself and only deals with resolved
/// container IDs.
#[derive(Clone)]
pub struct RemoteAgentWorkloadProvider {
tags_querier: TagStoreQuerier,
origin_tags_querier: OriginTagsQuerier,
}

impl RemoteAgentWorkloadProvider {
/// Create a new `RemoteAgentWorkloadProvider` based on the given configuration.
pub async fn from_configuration(
config: &GenericConfiguration, component_registry: ComponentRegistry, health_registry: &HealthRegistry,
) -> Result<Self, GenericError> {
let mut component_registry = component_registry.get_or_create("remote-agent");
let mut provider_bounds = component_registry.bounds_builder();

// Create our string interner, which is used primarily for tags but also for any other longer-lived strings.
let string_interner_size_bytes = config
.try_get_typed::<NonZeroUsize>("remote_agent_string_interner_size_bytes")?
.unwrap_or(DEFAULT_STRING_INTERNER_SIZE_BYTES);
let string_interner = GenericMapInterner::new(string_interner_size_bytes);

provider_bounds
.subcomponent("string_interner")
.firm()
.with_fixed_amount(string_interner_size_bytes.get());

// Construct our aggregator, and add any collectors based on the detected features we've been given.
let aggregator_health = health_registry
.register_component("env_provider.workload.remote_agent.aggregator")
.ok_or_else(|| {
generic_error!(
"Component 'env_provider.workload.remote_agent.aggregator' already registered in health registry."
)
})?;
let mut aggregator = MetadataAggregator::new(aggregator_health);

let mut collector_bounds = provider_bounds.subcomponent("collectors");

// Add the containerd collector if the feature is available.
let feature_detector = FeatureDetector::automatic(config);
if feature_detector.is_feature_available(Feature::Containerd) {
let cri_collector = build_collector("containerd", health_registry, &mut collector_bounds, |health| {
ContainerdMetadataCollector::from_configuration(config, health, string_interner.clone())
})
.await?;

aggregator.add_collector(cri_collector);
}

// Add the cgroups collector if we're on Linux.
#[cfg(target_os = "linux")]
{
let cgroups_collector = build_collector("cgroups", health_registry, &mut collector_bounds, |health| {
CgroupsMetadataCollector::from_configuration(
config,
feature_detector.clone(),
health,
string_interner.clone(),
)
})
.await?;

aggregator.add_collector(cgroups_collector);
}

// Finally, add the Remote Agent collectors: one for the tagger, and one for workloadmeta.
let ra_tags_collector =
build_collector("remote-agent-tags", health_registry, &mut collector_bounds, |health| {
RemoteAgentTaggerMetadataCollector::from_configuration(config, health, string_interner.clone())
})
.await?;

aggregator.add_collector(ra_tags_collector);

let ra_wmeta_collector =
build_collector("remote-agent-wmeta", health_registry, &mut collector_bounds, |health| {
RemoteAgentWorkloadMetadataCollector::from_configuration(config, health, string_interner.clone())
})
.await?;

aggregator.add_collector(ra_wmeta_collector);

// Create and attach the various metadata stores.
let tag_store = TagStore::with_entity_limit(DEFAULT_TAG_STORE_ENTITY_LIMIT);
let tags_querier = tag_store.querier();

aggregator.add_store(tag_store);

let external_data_store = ExternalDataStore::with_entity_limit(DEFAULT_EXTERNAL_DATA_STORE_ENTITY_LIMIT);
let external_data_resolver = external_data_store.resolver();

aggregator.add_store(external_data_store);

let origin_enrichment_config = config
.as_typed::<OriginEnrichmentConfiguration>()
.error_context("Failed to load origin enrichment configuration.")?;
let origin_tags_querier =
OriginTagsQuerier::new(origin_enrichment_config, tags_querier.clone(), external_data_resolver);

// With the aggregator configured, update the memory bounds and spawn the aggregator.
provider_bounds.with_subcomponent("aggregator", &aggregator);

tokio::spawn(aggregator.run());

Ok(Self {
tags_querier,
origin_tags_querier,
})
}

/// Returns an API handler for dumping the contents of the underlying data stores.
///
/// This handler can be used to register routes on an [`APIBuilder`][saluki_api::APIBuilder] for dumping the
/// contents of the underlying data stores powering this workload provider. See [`RemoteAgentWorkloadAPIHandler`]
/// for more information about routes and responses.
pub fn api_handler(&self) -> RemoteAgentWorkloadAPIHandler {
RemoteAgentWorkloadAPIHandler::from_state(self.tags_querier.clone())
}
}

#[async_trait]
impl WorkloadProvider for RemoteAgentWorkloadProvider {
fn get_tags_for_entity(&self, entity_id: &EntityId, cardinality: OriginTagCardinality) -> Option<SharedTagSet> {
self.tags_querier.get_entity_tags(entity_id, cardinality)
}
}

impl OriginTagsResolver for RemoteAgentWorkloadProvider {
fn resolve_origin_key(&self, origin_info: OriginInfo<'_>) -> Option<OriginKey> {
self.origin_tags_querier.resolve_origin_key_from_info(origin_info)
}

fn visit_origin_tags(&self, origin_key: OriginKey, visitor: &mut dyn OriginTagVisitor) {
self.origin_tags_querier.visit_origin_tags(origin_key, visitor)
}
}

async fn build_collector<F, Fut, O>(
collector_name: &str, health_registry: &HealthRegistry, bounds_builder: &mut MemoryBoundsBuilder<'_>, build: F,
) -> Result<O, GenericError>
where
F: FnOnce(Health) -> Fut,
Fut: Future<Output = Result<O, GenericError>>,
O: MemoryBounds,
{
let health = health_registry
.register_component(format!(
"env_provider.workload.remote_agent.collector.{}",
collector_name
))
.ok_or_else(|| {
generic_error!(
"Component 'env_provider.workload.remote_agent.collector.{}' already registered in health registry.",
collector_name
)
})?;
let collector = build(health).await?;
bounds_builder.with_subcomponent(collector_name, &collector);

Ok(collector)
}
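
A hedged end-to-end sketch of constructing and querying the provider, assuming a `GenericConfiguration`, `ComponentRegistry`, and `HealthRegistry` already exist in the embedding application, and that `entity_id` is an `EntityId` obtained elsewhere (all four names are placeholders):

// Builds the tag and external data stores, spawns the metadata aggregator, and returns the provider.
let provider =
    RemoteAgentWorkloadProvider::from_configuration(&config, component_registry, &health_registry).await?;

// Tag lookups go through the underlying tag store querier; `None` means the entity isn't tracked (yet).
let low_tags: Option<SharedTagSet> = provider.get_tags_for_entity(&entity_id, OriginTagCardinality::Low);

// The provider also implements `OriginTagsResolver`, so the same value can be handed to the context
// resolution layer for origin enrichment.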
