Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: clean up eventd and service check types #130

Merged
merged 4 commits into from
Jul 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ where
debug!(events_len = event_buffer.len(), "Processing event buffer.");

for event in event_buffer {
if let Some(metric) = event.into_metric() {
if let Some(metric) = event.try_into_metric() {
let request_builder = match MetricsEndpoint::from_metric(&metric) {
MetricsEndpoint::Series => &mut series_request_builder,
MetricsEndpoint::Sketches => &mut sketches_request_builder,
Expand Down
2 changes: 1 addition & 1 deletion lib/saluki-components/src/destinations/prometheus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl Destination for Prometheus {
// Process each metric event in the batch, either merging it with the existing value or
// inserting it for the first time.
for event in events {
if let Some(metric) = event.into_metric() {
if let Some(metric) = event.try_into_metric() {
let (context, value, _) = metric.into_parts();

// Skip any metric types we can't handle.
Expand Down
2 changes: 1 addition & 1 deletion lib/saluki-components/src/sources/dogstatsd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ async fn drive_stream(
if origin_detection {
if let ConnectionAddress::ProcessLike(Some(creds)) = &peer_addr {
for event in &mut event_buffer {
if let Some(metric) = event.as_metric_mut() {
if let Some(metric) = event.try_as_metric_mut() {
if metric.metadata().origin_entity().is_none() {
metric
.metadata_mut()
Expand Down
4 changes: 2 additions & 2 deletions lib/saluki-components/src/transforms/aggregate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ impl Transform for Aggregate {
let event_buffer_len = event_buffer.len();

for event in events {
if let Some(metric) = event.into_metric() {
if let Some(metric) = event.try_into_metric() {
if self.forward_timestamped_metrics && metric.metadata().timestamp().is_some() {
event_buffer.push(Event::Metric(metric));
} else if !state.insert(metric) {
Expand Down Expand Up @@ -523,7 +523,7 @@ mod tests {

let mut metrics = event_buffer
.into_iter()
.filter_map(|event| event.into_metric())
.filter_map(|event| event.try_into_metric())
.collect::<Vec<_>>();
metrics.sort_by(|a, b| a.context().name().cmp(b.context().name()));
metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ impl HostEnrichment {
impl SynchronousTransform for HostEnrichment {
fn transform_buffer(&self, event_buffer: &mut EventBuffer) {
for event in event_buffer {
if let Some(metric) = event.as_metric_mut() {
if let Some(metric) = event.try_as_metric_mut() {
self.enrich_metric(metric)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ where
{
fn transform_buffer(&self, event_buffer: &mut EventBuffer) {
for event in event_buffer {
if let Some(metric) = event.as_metric_mut() {
if let Some(metric) = event.try_as_metric_mut() {
self.enrich_metric(metric)
}
}
Expand Down
110 changes: 55 additions & 55 deletions lib/saluki-core/src/topology/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,9 +447,9 @@ mod test {
fn invalid_input_id() {
let mut graph = Graph::default();
let result = graph
.with_source("in_log", DataType::Log)
.with_destination("out_log", DataType::Log)
.with_edge_fallible("in log", "out_log")
.with_source("in_eventd", DataType::EventD)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This, and all changes in this file, are just renaming to reflect the fact I removed DataType::Log and DataType::Trace.

.with_destination("out_eventd", DataType::EventD)
.with_edge_fallible("in log", "out_eventd")
.map(|_| ()); // ditch mutable self ref to allow for equality check

assert!(result.is_err());
Expand All @@ -460,18 +460,18 @@ mod test {
fn nonexistent_input_id() {
let mut graph = Graph::default();
let result = graph
.with_source("in_log", DataType::Log)
.with_destination("out_log", DataType::Log)
.with_edge_fallible("in_loog", "out_log")
.with_source("in_eventd", DataType::EventD)
.with_destination("out_eventd", DataType::EventD)
.with_edge_fallible("in_loog", "out_eventd")
.map(|_| ()); // ditch mutable self ref to allow for equality check

assert_eq!(result, Err(component_id_doesnt_exist("in_loog")));

let mut graph = Graph::default();
let result = graph
.with_source("in_all_but_logs", DataType::all_bits())
.with_destination("out_log", DataType::Log)
.with_edge_fallible("in_all_but_logs.logs", "out_log")
.with_destination("out_eventd", DataType::EventD)
.with_edge_fallible("in_all_but_logs.logs", "out_eventd")
.map(|_| ()); // ditch mutable self ref to allow for equality check

assert_eq!(result, Err(output_id_doesnt_exist("in_all_but_logs.logs")));
Expand All @@ -481,17 +481,17 @@ mod test {
fn multiple_outputs() {
let mut graph = Graph::default();
graph
.with_source("in_log", DataType::Log)
.with_source("in_eventd", DataType::EventD)
.with_transform_multiple_outputs(
"log_to_log",
DataType::Log,
&[(None, DataType::Log), (Some("errors"), DataType::Log)],
"eventd_to_eventd",
DataType::EventD,
&[(None, DataType::EventD), (Some("errors"), DataType::EventD)],
)
.with_destination("out_log", DataType::Log)
.with_destination("out_errored_log", DataType::Log)
.with_edge("in_log", "log_to_log")
.with_edge("log_to_log", "out_log")
.with_edge("log_to_log.errors", "out_errored_log");
.with_destination("out_eventd", DataType::EventD)
.with_destination("out_errored_eventd", DataType::EventD)
.with_edge("in_eventd", "eventd_to_eventd")
.with_edge("eventd_to_eventd", "out_eventd")
.with_edge("eventd_to_eventd.errors", "out_errored_eventd");

assert_eq!(Ok(()), graph.check_data_types());
}
Expand All @@ -500,11 +500,11 @@ mod test {
fn detect_cycles() {
let mut graph = Graph::default();
graph
.with_source("in", DataType::Log)
.with_transform("one", DataType::Log, DataType::Log)
.with_transform("two", DataType::Log, DataType::Log)
.with_transform("three", DataType::Log, DataType::Log)
.with_destination("out", DataType::Log)
.with_source("in", DataType::EventD)
.with_transform("one", DataType::EventD, DataType::EventD)
.with_transform("two", DataType::EventD, DataType::EventD)
.with_transform("three", DataType::EventD, DataType::EventD)
.with_destination("out", DataType::EventD)
.with_multi_edge(&["in", "three"], "one")
.with_edge("one", "two")
.with_edge("two", "three")
Expand All @@ -519,11 +519,11 @@ mod test {

let mut graph = Graph::default();
graph
.with_source("in", DataType::Log)
.with_transform("one", DataType::Log, DataType::Log)
.with_transform("two", DataType::Log, DataType::Log)
.with_transform("three", DataType::Log, DataType::Log)
.with_destination("out", DataType::Log)
.with_source("in", DataType::EventD)
.with_transform("one", DataType::EventD, DataType::EventD)
.with_transform("two", DataType::EventD, DataType::EventD)
.with_transform("three", DataType::EventD, DataType::EventD)
.with_destination("out", DataType::EventD)
.with_multi_edge(&["in", "three"], "one")
.with_edge("one", "two")
.with_edge("two", "three")
Expand All @@ -541,10 +541,10 @@ mod test {
fn disconnected_components() {
let mut graph = Graph::default();
graph
.with_source("in", DataType::Log)
.with_transform("one", DataType::Log, DataType::Log)
.with_transform("two", DataType::Log, DataType::Log)
.with_destination("out", DataType::Log)
.with_source("in", DataType::EventD)
.with_transform("one", DataType::EventD, DataType::EventD)
.with_transform("two", DataType::EventD, DataType::EventD)
.with_destination("out", DataType::EventD)
.with_edge("in", "out");

assert_eq!(
Expand All @@ -559,11 +559,11 @@ mod test {
fn diamond_edge_formation() {
let mut graph = Graph::default();
graph
.with_source("in", DataType::Log)
.with_transform("one", DataType::Log, DataType::Log)
.with_transform("two", DataType::Log, DataType::Log)
.with_transform("three", DataType::Log, DataType::Log)
.with_destination("out", DataType::Log)
.with_source("in", DataType::EventD)
.with_transform("one", DataType::EventD, DataType::EventD)
.with_transform("two", DataType::EventD, DataType::EventD)
.with_transform("three", DataType::EventD, DataType::EventD)
.with_destination("out", DataType::EventD)
.with_edge("in", "one")
.with_edge("in", "two")
.with_multi_edge(&["one", "two"], "three")
Expand All @@ -576,14 +576,14 @@ mod test {
fn datatype_disjoint_sets() {
let mut graph = Graph::default();
graph
.with_source("in", DataType::Log)
.with_source("in", DataType::EventD)
.with_destination("out", DataType::Metric)
.with_edge("in", "out");

assert_eq!(
Err(GraphError::DataTypeMismatch {
from_component_output_id: try_into_component_output_id("in").unwrap(),
from_ty: DataType::Log,
from_ty: DataType::EventD,
to_component_id: try_into_component_id("out").unwrap(),
to_ty: DataType::Metric,
}),
Expand All @@ -595,10 +595,10 @@ mod test {
fn datatype_subset_into_superset() {
let mut graph = Graph::default();
graph
.with_source("in_log", DataType::Log)
.with_source("in_eventd", DataType::EventD)
.with_source("in_metric", DataType::Metric)
.with_destination("out", DataType::all_bits())
.with_multi_edge(&["in_log", "in_metric"], "out");
.with_multi_edge(&["in_eventd", "in_metric"], "out");

assert_eq!(Ok(()), graph.check_data_types());
}
Expand All @@ -608,14 +608,14 @@ mod test {
let mut graph = Graph::default();
graph
.with_source("in", DataType::all_bits())
.with_transform("log_to_any", DataType::Log, DataType::all_bits())
.with_transform("any_to_log", DataType::all_bits(), DataType::Log)
.with_destination("out_log", DataType::Log)
.with_transform("eventd_to_any", DataType::EventD, DataType::all_bits())
.with_transform("any_to_eventd", DataType::all_bits(), DataType::EventD)
.with_destination("out_eventd", DataType::EventD)
.with_destination("out_metric", DataType::Metric)
.with_edge("in", "log_to_any")
.with_edge("in", "any_to_log")
.with_multi_edge(&["in", "log_to_any", "any_to_log"], "out_log")
.with_multi_edge(&["in", "log_to_any"], "out_metric");
.with_edge("in", "eventd_to_any")
.with_edge("in", "any_to_eventd")
.with_multi_edge(&["in", "eventd_to_any", "any_to_eventd"], "out_eventd")
.with_multi_edge(&["in", "eventd_to_any"], "out_metric");

assert_eq!(Ok(()), graph.check_data_types());
}
Expand All @@ -624,21 +624,21 @@ mod test {
fn allows_both_directions_for_metrics() {
let mut graph = Graph::default();
graph
.with_source("in_log", DataType::Log)
.with_source("in_eventd", DataType::EventD)
.with_source("in_metric", DataType::Metric)
.with_transform("log_to_log", DataType::Log, DataType::Log)
.with_transform("eventd_to_eventd", DataType::EventD, DataType::EventD)
.with_transform("metric_to_metric", DataType::Metric, DataType::Metric)
.with_transform("any_to_any", DataType::all_bits(), DataType::all_bits())
.with_transform("any_to_log", DataType::all_bits(), DataType::Log)
.with_transform("any_to_eventd", DataType::all_bits(), DataType::EventD)
.with_transform("any_to_metric", DataType::all_bits(), DataType::Metric)
.with_destination("out_log", DataType::Log)
.with_destination("out_eventd", DataType::EventD)
.with_destination("out_metric", DataType::Metric)
.with_edge("in_log", "log_to_log")
.with_edge("in_eventd", "eventd_to_eventd")
.with_edge("in_metric", "metric_to_metric")
.with_multi_edge(&["log_to_log", "metric_to_metric"], "any_to_any")
.with_edge("any_to_any", "any_to_log")
.with_multi_edge(&["eventd_to_eventd", "metric_to_metric"], "any_to_any")
.with_edge("any_to_any", "any_to_eventd")
.with_edge("any_to_any", "any_to_metric")
.with_edge("any_to_log", "out_log")
.with_edge("any_to_eventd", "out_eventd")
.with_edge("any_to_metric", "out_metric");

assert_eq!(Ok(()), graph.check_data_types());
Expand Down
12 changes: 6 additions & 6 deletions lib/saluki-core/src/topology/interconnect/forwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ mod tests {
let forwarded_metric = forwarded_ebuf
.into_iter()
.next()
.and_then(|event| event.into_metric())
.and_then(|event| event.try_into_metric())
.expect("should be single metric in the buffer");
assert_eq!(forwarded_metric.context(), metric.context());
}
Expand Down Expand Up @@ -242,7 +242,7 @@ mod tests {
let forwarded_metric = forwarded_ebuf
.into_iter()
.next()
.and_then(|event| event.into_metric())
.and_then(|event| event.try_into_metric())
.expect("should be single metric in the buffer");
assert_eq!(forwarded_metric.context(), metric.context());
}
Expand Down Expand Up @@ -274,14 +274,14 @@ mod tests {
let forwarded_metric1 = forwarded_ebuf1
.into_iter()
.next()
.and_then(|event| event.into_metric())
.and_then(|event| event.try_into_metric())
.expect("should be single metric in the buffer");
assert_eq!(forwarded_metric1.context(), metric.context());

let forwarded_metric2 = forwarded_ebuf2
.into_iter()
.next()
.and_then(|event| event.into_metric())
.and_then(|event| event.try_into_metric())
.expect("should be single metric in the buffer");
assert_eq!(forwarded_metric2.context(), metric.context());
}
Expand Down Expand Up @@ -313,14 +313,14 @@ mod tests {
let forwarded_metric1 = forwarded_ebuf1
.into_iter()
.next()
.and_then(|event| event.into_metric())
.and_then(|event| event.try_into_metric())
.expect("should be single metric in the buffer");
assert_eq!(forwarded_metric1.context(), metric.context());

let forwarded_metric2 = forwarded_ebuf2
.into_iter()
.next()
.and_then(|event| event.into_metric())
.and_then(|event| event.try_into_metric())
.expect("should be single metric in the buffer");
assert_eq!(forwarded_metric2.context(), metric.context());
}
Expand Down
15 changes: 0 additions & 15 deletions lib/saluki-event/src/eventd/alert.rs

This file was deleted.

Loading