Skip to content

Commit

Permalink
chore: clean up eventd and service check types (#130)
Browse files Browse the repository at this point in the history
  • Loading branch information
tobz authored Jul 23, 2024
1 parent 8612664 commit adf91b2
Show file tree
Hide file tree
Showing 14 changed files with 177 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ where
debug!(events_len = event_buffer.len(), "Processing event buffer.");

for event in event_buffer {
if let Some(metric) = event.into_metric() {
if let Some(metric) = event.try_into_metric() {
let request_builder = match MetricsEndpoint::from_metric(&metric) {
MetricsEndpoint::Series => &mut series_request_builder,
MetricsEndpoint::Sketches => &mut sketches_request_builder,
Expand Down
2 changes: 1 addition & 1 deletion lib/saluki-components/src/destinations/prometheus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl Destination for Prometheus {
// Process each metric event in the batch, either merging it with the existing value or
// inserting it for the first time.
for event in events {
if let Some(metric) = event.into_metric() {
if let Some(metric) = event.try_into_metric() {
let (context, value, _) = metric.into_parts();

// Skip any metric types we can't handle.
Expand Down
2 changes: 1 addition & 1 deletion lib/saluki-components/src/sources/dogstatsd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ async fn drive_stream(
if origin_detection {
if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
for event in &mut event_buffer {
if let Some(metric) = event.as_metric_mut() {
if let Some(metric) = event.try_as_metric_mut() {
if metric.metadata().origin_entity().is_none() {
metric
.metadata_mut()
Expand Down
4 changes: 2 additions & 2 deletions lib/saluki-components/src/transforms/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ impl Transform for Aggregate {
let event_buffer_len = event_buffer.len();

for event in events {
if let Some(metric) = event.into_metric() {
if let Some(metric) = event.try_into_metric() {
if self.forward_timestamped_metrics && metric.metadata().timestamp().is_some() {
event_buffer.push(Event::Metric(metric));
} else if !state.insert(metric) {
Expand Down Expand Up @@ -523,7 +523,7 @@ mod tests {

let mut metrics = event_buffer
.into_iter()
.filter_map(|event| event.into_metric())
.filter_map(|event| event.try_into_metric())
.collect::<Vec<_>>();
metrics.sort_by(|a, b| a.context().name().cmp(b.context().name()));
metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl HostEnrichment {
impl SynchronousTransform for HostEnrichment {
fn transform_buffer(&self, event_buffer: &mut EventBuffer) {
for event in event_buffer {
if let Some(metric) = event.as_metric_mut() {
if let Some(metric) = event.try_as_metric_mut() {
self.enrich_metric(metric)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ where
{
fn transform_buffer(&self, event_buffer: &mut EventBuffer) {
for event in event_buffer {
if let Some(metric) = event.as_metric_mut() {
if let Some(metric) = event.try_as_metric_mut() {
self.enrich_metric(metric)
}
}
Expand Down
110 changes: 55 additions & 55 deletions lib/saluki-core/src/topology/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,9 +447,9 @@ mod test {
fn invalid_input_id() {
let mut graph = Graph::default();
let result = graph
.with_source("in_log", DataType::Log)
.with_destination("out_log", DataType::Log)
.with_edge_fallible("in log", "out_log")
.with_source("in_eventd", DataType::EventD)
.with_destination("out_eventd", DataType::EventD)
.with_edge_fallible("in log", "out_eventd")
.map(|_| ()); // ditch mutable self ref to allow for equality check

assert!(result.is_err());
Expand All @@ -460,18 +460,18 @@ mod test {
fn nonexistent_input_id() {
let mut graph = Graph::default();
let result = graph
.with_source("in_log", DataType::Log)
.with_destination("out_log", DataType::Log)
.with_edge_fallible("in_loog", "out_log")
.with_source("in_eventd", DataType::EventD)
.with_destination("out_eventd", DataType::EventD)
.with_edge_fallible("in_loog", "out_eventd")
.map(|_| ()); // ditch mutable self ref to allow for equality check

assert_eq!(result, Err(component_id_doesnt_exist("in_loog")));

let mut graph = Graph::default();
let result = graph
.with_source("in_all_but_logs", DataType::all_bits())
.with_destination("out_log", DataType::Log)
.with_edge_fallible("in_all_but_logs.logs", "out_log")
.with_destination("out_eventd", DataType::EventD)
.with_edge_fallible("in_all_but_logs.logs", "out_eventd")
.map(|_| ()); // ditch mutable self ref to allow for equality check

assert_eq!(result, Err(output_id_doesnt_exist("in_all_but_logs.logs")));
Expand All @@ -481,17 +481,17 @@ mod test {
fn multiple_outputs() {
let mut graph = Graph::default();
graph
.with_source("in_log", DataType::Log)
.with_source("in_eventd", DataType::EventD)
.with_transform_multiple_outputs(
"log_to_log",
DataType::Log,
&[(None, DataType::Log), (Some("errors"), DataType::Log)],
"eventd_to_eventd",
DataType::EventD,
&[(None, DataType::EventD), (Some("errors"), DataType::EventD)],
)
.with_destination("out_log", DataType::Log)
.with_destination("out_errored_log", DataType::Log)
.with_edge("in_log", "log_to_log")
.with_edge("log_to_log", "out_log")
.with_edge("log_to_log.errors", "out_errored_log");
.with_destination("out_eventd", DataType::EventD)
.with_destination("out_errored_eventd", DataType::EventD)
.with_edge("in_eventd", "eventd_to_eventd")
.with_edge("eventd_to_eventd", "out_eventd")
.with_edge("eventd_to_eventd.errors", "out_errored_eventd");

assert_eq!(Ok(()), graph.check_data_types());
}
Expand All @@ -500,11 +500,11 @@ mod test {
fn detect_cycles() {
let mut graph = Graph::default();
graph
.with_source("in", DataType::Log)
.with_transform("one", DataType::Log, DataType::Log)
.with_transform("two", DataType::Log, DataType::Log)
.with_transform("three", DataType::Log, DataType::Log)
.with_destination("out", DataType::Log)
.with_source("in", DataType::EventD)
.with_transform("one", DataType::EventD, DataType::EventD)
.with_transform("two", DataType::EventD, DataType::EventD)
.with_transform("three", DataType::EventD, DataType::EventD)
.with_destination("out", DataType::EventD)
.with_multi_edge(&["in", "three"], "one")
.with_edge("one", "two")
.with_edge("two", "three")
Expand All @@ -519,11 +519,11 @@ mod test {

let mut graph = Graph::default();
graph
.with_source("in", DataType::Log)
.with_transform("one", DataType::Log, DataType::Log)
.with_transform("two", DataType::Log, DataType::Log)
.with_transform("three", DataType::Log, DataType::Log)
.with_destination("out", DataType::Log)
.with_source("in", DataType::EventD)
.with_transform("one", DataType::EventD, DataType::EventD)
.with_transform("two", DataType::EventD, DataType::EventD)
.with_transform("three", DataType::EventD, DataType::EventD)
.with_destination("out", DataType::EventD)
.with_multi_edge(&["in", "three"], "one")
.with_edge("one", "two")
.with_edge("two", "three")
Expand All @@ -541,10 +541,10 @@ mod test {
fn disconnected_components() {
let mut graph = Graph::default();
graph
.with_source("in", DataType::Log)
.with_transform("one", DataType::Log, DataType::Log)
.with_transform("two", DataType::Log, DataType::Log)
.with_destination("out", DataType::Log)
.with_source("in", DataType::EventD)
.with_transform("one", DataType::EventD, DataType::EventD)
.with_transform("two", DataType::EventD, DataType::EventD)
.with_destination("out", DataType::EventD)
.with_edge("in", "out");

assert_eq!(
Expand All @@ -559,11 +559,11 @@ mod test {
fn diamond_edge_formation() {
let mut graph = Graph::default();
graph
.with_source("in", DataType::Log)
.with_transform("one", DataType::Log, DataType::Log)
.with_transform("two", DataType::Log, DataType::Log)
.with_transform("three", DataType::Log, DataType::Log)
.with_destination("out", DataType::Log)
.with_source("in", DataType::EventD)
.with_transform("one", DataType::EventD, DataType::EventD)
.with_transform("two", DataType::EventD, DataType::EventD)
.with_transform("three", DataType::EventD, DataType::EventD)
.with_destination("out", DataType::EventD)
.with_edge("in", "one")
.with_edge("in", "two")
.with_multi_edge(&["one", "two"], "three")
Expand All @@ -576,14 +576,14 @@ mod test {
fn datatype_disjoint_sets() {
let mut graph = Graph::default();
graph
.with_source("in", DataType::Log)
.with_source("in", DataType::EventD)
.with_destination("out", DataType::Metric)
.with_edge("in", "out");

assert_eq!(
Err(GraphError::DataTypeMismatch {
from_component_output_id: try_into_component_output_id("in").unwrap(),
from_ty: DataType::Log,
from_ty: DataType::EventD,
to_component_id: try_into_component_id("out").unwrap(),
to_ty: DataType::Metric,
}),
Expand All @@ -595,10 +595,10 @@ mod test {
fn datatype_subset_into_superset() {
let mut graph = Graph::default();
graph
.with_source("in_log", DataType::Log)
.with_source("in_eventd", DataType::EventD)
.with_source("in_metric", DataType::Metric)
.with_destination("out", DataType::all_bits())
.with_multi_edge(&["in_log", "in_metric"], "out");
.with_multi_edge(&["in_eventd", "in_metric"], "out");

assert_eq!(Ok(()), graph.check_data_types());
}
Expand All @@ -608,14 +608,14 @@ mod test {
let mut graph = Graph::default();
graph
.with_source("in", DataType::all_bits())
.with_transform("log_to_any", DataType::Log, DataType::all_bits())
.with_transform("any_to_log", DataType::all_bits(), DataType::Log)
.with_destination("out_log", DataType::Log)
.with_transform("eventd_to_any", DataType::EventD, DataType::all_bits())
.with_transform("any_to_eventd", DataType::all_bits(), DataType::EventD)
.with_destination("out_eventd", DataType::EventD)
.with_destination("out_metric", DataType::Metric)
.with_edge("in", "log_to_any")
.with_edge("in", "any_to_log")
.with_multi_edge(&["in", "log_to_any", "any_to_log"], "out_log")
.with_multi_edge(&["in", "log_to_any"], "out_metric");
.with_edge("in", "eventd_to_any")
.with_edge("in", "any_to_eventd")
.with_multi_edge(&["in", "eventd_to_any", "any_to_eventd"], "out_eventd")
.with_multi_edge(&["in", "eventd_to_any"], "out_metric");

assert_eq!(Ok(()), graph.check_data_types());
}
Expand All @@ -624,21 +624,21 @@ mod test {
fn allows_both_directions_for_metrics() {
let mut graph = Graph::default();
graph
.with_source("in_log", DataType::Log)
.with_source("in_eventd", DataType::EventD)
.with_source("in_metric", DataType::Metric)
.with_transform("log_to_log", DataType::Log, DataType::Log)
.with_transform("eventd_to_eventd", DataType::EventD, DataType::EventD)
.with_transform("metric_to_metric", DataType::Metric, DataType::Metric)
.with_transform("any_to_any", DataType::all_bits(), DataType::all_bits())
.with_transform("any_to_log", DataType::all_bits(), DataType::Log)
.with_transform("any_to_eventd", DataType::all_bits(), DataType::EventD)
.with_transform("any_to_metric", DataType::all_bits(), DataType::Metric)
.with_destination("out_log", DataType::Log)
.with_destination("out_eventd", DataType::EventD)
.with_destination("out_metric", DataType::Metric)
.with_edge("in_log", "log_to_log")
.with_edge("in_eventd", "eventd_to_eventd")
.with_edge("in_metric", "metric_to_metric")
.with_multi_edge(&["log_to_log", "metric_to_metric"], "any_to_any")
.with_edge("any_to_any", "any_to_log")
.with_multi_edge(&["eventd_to_eventd", "metric_to_metric"], "any_to_any")
.with_edge("any_to_any", "any_to_eventd")
.with_edge("any_to_any", "any_to_metric")
.with_edge("any_to_log", "out_log")
.with_edge("any_to_eventd", "out_eventd")
.with_edge("any_to_metric", "out_metric");

assert_eq!(Ok(()), graph.check_data_types());
Expand Down
12 changes: 6 additions & 6 deletions lib/saluki-core/src/topology/interconnect/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ mod tests {
let forwarded_metric = forwarded_ebuf
.into_iter()
.next()
.and_then(|event| event.into_metric())
.and_then(|event| event.try_into_metric())
.expect("should be single metric in the buffer");
assert_eq!(forwarded_metric.context(), metric.context());
}
Expand Down Expand Up @@ -242,7 +242,7 @@ mod tests {
let forwarded_metric = forwarded_ebuf
.into_iter()
.next()
.and_then(|event| event.into_metric())
.and_then(|event| event.try_into_metric())
.expect("should be single metric in the buffer");
assert_eq!(forwarded_metric.context(), metric.context());
}
Expand Down Expand Up @@ -274,14 +274,14 @@ mod tests {
let forwarded_metric1 = forwarded_ebuf1
.into_iter()
.next()
.and_then(|event| event.into_metric())
.and_then(|event| event.try_into_metric())
.expect("should be single metric in the buffer");
assert_eq!(forwarded_metric1.context(), metric.context());

let forwarded_metric2 = forwarded_ebuf2
.into_iter()
.next()
.and_then(|event| event.into_metric())
.and_then(|event| event.try_into_metric())
.expect("should be single metric in the buffer");
assert_eq!(forwarded_metric2.context(), metric.context());
}
Expand Down Expand Up @@ -313,14 +313,14 @@ mod tests {
let forwarded_metric1 = forwarded_ebuf1
.into_iter()
.next()
.and_then(|event| event.into_metric())
.and_then(|event| event.try_into_metric())
.expect("should be single metric in the buffer");
assert_eq!(forwarded_metric1.context(), metric.context());

let forwarded_metric2 = forwarded_ebuf2
.into_iter()
.next()
.and_then(|event| event.into_metric())
.and_then(|event| event.try_into_metric())
.expect("should be single metric in the buffer");
assert_eq!(forwarded_metric2.context(), metric.context());
}
Expand Down
15 changes: 0 additions & 15 deletions lib/saluki-event/src/eventd/alert.rs

This file was deleted.

Loading

0 comments on commit adf91b2

Please sign in to comment.