feat: Add self monitoring to log gateway (#1813)
Co-authored-by: Korbinian Stoemmer <korbinian.stoemmer@sap.com>
rakesh-garimella and k15r authored Feb 19, 2025
1 parent c1d5703 commit debbb0d
Showing 28 changed files with 982 additions and 59 deletions.
49 changes: 47 additions & 2 deletions .github/workflows/pr-integration.yml
@@ -148,7 +148,7 @@ jobs:
- name: Run tests without Istio
if: ${{ matrix.scenario == 'healthy' }}
run: |
bin/ginkgo run ${{ runner.debug && '-v' || '' }} --tags e2e --label-filter="self-mon-${{ matrix.signal-type }}-${{ matrix.scenario }}" test/e2e
bin/ginkgo run ${{ runner.debug && '-v' || '' }} --tags e2e --label-filter="self-mon-${{ matrix.signal-type }}-${{ matrix.scenario }} && !experimental" test/e2e
# we need Istio for fault injection to simulate backpressure and outages
- name: Deploy Istio Module
@@ -158,7 +158,52 @@ jobs:
- name: Run tests with Istio
if: ${{ matrix.scenario != 'healthy' }}
run: |
bin/ginkgo run ${{ runner.debug && '-v' || '' }} --tags istio --label-filter="self-mon-${{ matrix.signal-type }}-${{ matrix.scenario }}" test/integration/istio
bin/ginkgo run ${{ runner.debug && '-v' || '' }} --tags istio --label-filter="self-mon-${{ matrix.signal-type }}-${{ matrix.scenario }} && !experimental" test/integration/istio
- name: Finalize Test
uses: "./.github/template/finalize-test"
if: success() || failure()
with:
failure: failure()
job-name: ${{ github.job }}-${{ matrix.signal-type }}-${{ matrix.scenario }}

e2e-experimental-self-mon:
needs: setup
strategy:
fail-fast: false
matrix:
signal-type:
- logs
- metrics
- traces
scenario:
- healthy
- backpressure
- outage
runs-on: ubuntu-latest
steps:
- name: Checkout repo
uses: actions/checkout@v4

- name: Prepare Test
uses: "./.github/template/prepare-test"
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

- name: Run tests without Istio
if: ${{ matrix.scenario == 'healthy' }}
run: |
bin/ginkgo run ${{ runner.debug && '-v' || '' }} --tags e2e --label-filter="self-mon-${{ matrix.signal-type }}-${{ matrix.scenario }} && experimental" test/e2e
# we need Istio for fault injection to simulate backpressure and outages
- name: Deploy Istio Module
if: ${{ matrix.scenario != 'healthy' }}
run: hack/deploy-istio.sh

- name: Run tests with Istio
if: ${{ matrix.scenario != 'healthy' }}
run: |
bin/ginkgo run ${{ runner.debug && '-v' || '' }} --tags istio --label-filter="self-mon-${{ matrix.signal-type }}-${{ matrix.scenario }} && experimental" test/integration/istio
- name: Finalize Test
uses: "./.github/template/finalize-test"
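To illustrate the label filters above: the existing self-mon jobs now run only specs matching "self-mon-<signal>-<scenario> && !experimental", while the new e2e-experimental-self-mon job requires the experimental label. A minimal Ginkgo sketch of a spec that opts into the experimental job (the build tag, suite bootstrap, and spec body are illustrative assumptions, not code from this commit):

//go:build e2e

package e2e

import (
	"testing"

	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
)

func TestE2E(t *testing.T) {
	RegisterFailHandler(Fail)
	RunSpecs(t, "E2E Suite")
}

// Only the experimental job (filter "... && experimental") selects this spec;
// the regular self-mon job excludes it via "... && !experimental".
var _ = Describe("Log gateway self monitoring", Label("self-mon-logs-healthy", "experimental"), func() {
	It("reports a healthy log flow", func() {
		Expect(true).To(BeTrue()) // placeholder assertion
	})
})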
1 change: 1 addition & 0 deletions .mockery.yaml
@@ -14,6 +14,7 @@ packages:
FlowHealthProber:
github.com/kyma-project/telemetry-manager/internal/reconciler/logpipeline/otel:
interfaces:
FlowHealthProber:
GatewayApplierDeleter:
GatewayConfigBuilder:
AgentApplierDeleter:
10 changes: 8 additions & 2 deletions controllers/telemetry/logpipeline_controller.go
@@ -101,12 +101,17 @@ func NewLogPipelineController(client client.Client, reconcileTriggerChan <-chan
return nil, err
}

otelFlowHealthProber, err := prober.NewOtelLogPipelineProber(types.NamespacedName{Name: config.SelfMonitorName, Namespace: config.TelemetryNamespace})
if err != nil {
return nil, err
}

fbReconciler, err := configureFluentBitReconciler(client, config, flowHealthProber)
if err != nil {
return nil, err
}

otelReconciler, err := configureOtelReconciler(client, config, flowHealthProber)
otelReconciler, err := configureOtelReconciler(client, config, otelFlowHealthProber)
if err != nil {
return nil, err
}
@@ -223,7 +228,7 @@ func configureFluentBitReconciler(client client.Client, config LogPipelineContro
}

//nolint:unparam // error is always nil: An error could be returned after implementing the IstioStatusChecker (TODO)
func configureOtelReconciler(client client.Client, config LogPipelineControllerConfig, _ *prober.LogPipelineProber) (*logpipelineotel.Reconciler, error) {
func configureOtelReconciler(client client.Client, config LogPipelineControllerConfig, flowHealthProber *prober.OTelPipelineProber) (*logpipelineotel.Reconciler, error) {
pipelineValidator := &logpipelineotel.Validator{
// TODO: Add validators
}
@@ -243,6 +248,7 @@ func configureOtelReconciler(client client.Client, config LogPipelineControllerC
client,
config.TelemetryNamespace,
config.ModuleVersion,
flowHealthProber,
agentConfigBuilder,
otelcollector.NewLogAgentApplierDeleter(config.OTelCollectorImage, config.TelemetryNamespace, config.LogAgentPriorityClassName),
&workloadstatus.DaemonSetProber{Client: client},
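For readers unfamiliar with the wiring: the controller now builds a prober pointed at the self-monitor and hands it to the OTel log pipeline reconciler via its constructor. A condensed sketch of that injection pattern (the interface, result fields, and resource names below are illustrative assumptions, not the repository's actual prober API):

package main

import (
	"context"
	"fmt"

	"k8s.io/apimachinery/pkg/types"
)

// FlowHealthProber is an illustrative stand-in for the prober dependency.
type FlowHealthProber interface {
	Probe(ctx context.Context, pipelineName string) (ProbeResult, error)
}

// ProbeResult sketches the kind of signals a self-monitor-backed prober returns.
type ProbeResult struct {
	AllDataDropped  bool
	SomeDataDropped bool
	Healthy         bool
}

// Reconciler receives the prober at construction time, mirroring how
// configureOtelReconciler forwards otelFlowHealthProber into the OTel reconciler.
type Reconciler struct {
	selfMonitor types.NamespacedName
	prober      FlowHealthProber
}

func NewReconciler(selfMonitor types.NamespacedName, prober FlowHealthProber) *Reconciler {
	return &Reconciler{selfMonitor: selfMonitor, prober: prober}
}

func (r *Reconciler) logFlowHealth(ctx context.Context, pipelineName string) {
	result, err := r.prober.Probe(ctx, pipelineName)
	if err != nil {
		fmt.Println("probe failed:", err)
		return
	}
	fmt.Printf("pipeline %q healthy: %v\n", pipelineName, result.Healthy)
}

func main() {
	// Hypothetical usage; a real prober would query the self-monitor's alert state.
	_ = NewReconciler(types.NamespacedName{Name: "telemetry-self-monitor", Namespace: "kyma-system"}, nil)
}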
55 changes: 44 additions & 11 deletions internal/conditions/conditions.go
@@ -65,22 +65,35 @@ var commonMessages = map[string]string{
ReasonValidationFailed: "Pipeline validation failed due to an error from the Kubernetes API server",
}

var logPipelineMessages = map[string]string{
ReasonAgentConfigured: "LogPipeline specification is successfully applied to the configuration of Log agent",
ReasonAgentNotReady: "Log agent DaemonSet is not ready",
ReasonAgentReady: "Log agent DaemonSet is ready",
ReasonComponentsRunning: "All log components are running",
ReasonEndpointInvalid: "HTTP output host invalid: %s",
ReasonGatewayConfigured: "LogPipeline specification is successfully applied to the configuration of Log gateway",
ReasonGatewayNotReady: "Log gateway Deployment is not ready",
ReasonGatewayReady: "Log gateway Deployment is ready",
var commonLogPipelineMessages = map[string]string{
ReasonAgentConfigured: "LogPipeline specification is successfully applied to the configuration of Log agent",
ReasonAgentNotReady: "Log agent DaemonSet is not ready",
ReasonAgentReady: "Log agent DaemonSet is ready",
ReasonComponentsRunning: "All log components are running",
}

var fluentBitLogPipelineMessages = map[string]string{
ReasonEndpointInvalid: "HTTP output host invalid: %s",

ReasonSelfMonAllDataDropped: "Backend is not reachable or rejecting logs. All logs are dropped. See troubleshooting: https://kyma-project.io/#/telemetry-manager/user/02-logs?id=no-logs-arrive-at-the-backend",
ReasonSelfMonBufferFillingUp: "Buffer nearing capacity. Incoming log rate exceeds export rate. See troubleshooting: https://kyma-project.io/#/telemetry-manager/user/02-logs?id=agent-buffer-filling-up",
ReasonSelfMonConfigNotGenerated: "No logs delivered to backend because LogPipeline specification is not applied to the configuration of Log agent. Check the 'ConfigurationGenerated' condition for more details",
ReasonSelfMonNoLogsDelivered: "Backend is not reachable or rejecting logs. Logs are buffered and not yet dropped. See troubleshooting: https://kyma-project.io/#/telemetry-manager/user/02-logs?id=no-logs-arrive-at-the-backend",
ReasonSelfMonSomeDataDropped: "Backend is reachable, but rejecting logs. Some logs are dropped. See troubleshooting: https://kyma-project.io/#/telemetry-manager/user/02-logs?id=not-all-logs-arrive-at-the-backend",
}

var otelLogPipelineMessages = map[string]string{
ReasonGatewayConfigured: "LogPipeline specification is successfully applied to the configuration of Log gateway",
ReasonGatewayNotReady: "Log gateway Deployment is not ready",
ReasonGatewayReady: "Log gateway Deployment is ready",

ReasonSelfMonAllDataDropped: "Backend is not reachable or rejecting logs. All logs are dropped.",
ReasonSelfMonBufferFillingUp: "Buffer nearing capacity. Incoming log rate exceeds export rate.",
ReasonSelfMonConfigNotGenerated: "No logs delivered to backend because LogPipeline specification is not applied to the configuration of Log agent. Check the 'ConfigurationGenerated' condition for more details",
ReasonSelfMonGatewayThrottling: "Log gateway is unable to receive logs at current rate.",
ReasonSelfMonSomeDataDropped: "Backend is reachable, but rejecting logs. Some logs are dropped.",
}

var tracePipelineMessages = map[string]string{
ReasonComponentsRunning: "All trace components are running",
ReasonEndpointInvalid: "OTLP output endpoint invalid: %s",
@@ -109,8 +122,28 @@ var metricPipelineMessages = map[string]string{
ReasonSelfMonSomeDataDropped: "Backend is reachable, but rejecting metrics. Some metrics are dropped. See troubleshooting: https://kyma-project.io/#/telemetry-manager/user/04-metrics?id=metrics-not-arriving-at-the-destination",
}

func MessageForLogPipeline(reason string) string {
return message(reason, logPipelineMessages)
func MessageForOtelLogPipeline(reason string) string {
return messageForLogPipelines(reason, otelLogPipelineMessages)
}

func MessageForFluentBitLogPipeline(reason string) string {
return messageForLogPipelines(reason, fluentBitLogPipelineMessages)
}

func messageForLogPipelines(reason string, specializedMessages map[string]string) string {
if condMessage, found := commonMessages[reason]; found {
return condMessage
}

if condMessage, found := commonLogPipelineMessages[reason]; found {
return condMessage
}

if condMessage, found := specializedMessages[reason]; found {
return condMessage
}

return ""
}

func MessageForTracePipeline(reason string) string {
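The new lookup is layered: a reason is resolved first against the messages common to all pipelines, then against the messages shared by both LogPipeline flavors, and finally against the FluentBit- or OTel-specific map. A self-contained sketch of that cascade with abbreviated reason keys (the real maps and constants live in the conditions package):

package main

import "fmt"

func main() {
	common := map[string]string{
		"ValidationFailed": "Pipeline validation failed due to an error from the Kubernetes API server",
	}
	commonLog := map[string]string{
		"AgentReady": "Log agent DaemonSet is ready",
	}
	otelLog := map[string]string{
		"GatewayReady": "Log gateway Deployment is ready",
	}

	// Mirrors messageForLogPipelines: common first, then shared log messages,
	// then the flavor-specific map; unknown reasons yield "".
	lookup := func(reason string, specialized map[string]string) string {
		if msg, ok := common[reason]; ok {
			return msg
		}
		if msg, ok := commonLog[reason]; ok {
			return msg
		}
		if msg, ok := specialized[reason]; ok {
			return msg
		}
		return ""
	}

	fmt.Println(lookup("ValidationFailed", otelLog)) // resolved from the common tier
	fmt.Println(lookup("GatewayReady", otelLog))     // resolved from the OTel-specific tier
	fmt.Println(lookup("GatewayReady", nil))         // not defined for the other flavor: ""
}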
9 changes: 6 additions & 3 deletions internal/conditions/conditions_test.go
@@ -9,13 +9,16 @@ import (

func TestMessageFor(t *testing.T) {
t.Run("should return correct message which is common to all pipelines", func(t *testing.T) {
message := MessageForLogPipeline(ReasonReferencedSecretMissing)
message := MessageForFluentBitLogPipeline(ReasonReferencedSecretMissing)
require.Equal(t, commonMessages[ReasonReferencedSecretMissing], message)
})

t.Run("should return correct message which is unique to each pipeline", func(t *testing.T) {
logsDaemonSetNotReadyMessage := MessageForLogPipeline(ReasonAgentNotReady)
require.Equal(t, logPipelineMessages[ReasonAgentNotReady], logsDaemonSetNotReadyMessage)
logsDaemonSetNotReadyMessage := MessageForFluentBitLogPipeline(ReasonEndpointInvalid)
require.Equal(t, fluentBitLogPipelineMessages[ReasonEndpointInvalid], logsDaemonSetNotReadyMessage)

logDeploymentNotReadyMessage := MessageForOtelLogPipeline(ReasonGatewayNotReady)
require.Equal(t, otelLogPipelineMessages[ReasonGatewayNotReady], logDeploymentNotReadyMessage)

tracesDeploymentNotReadyMessage := MessageForTracePipeline(ReasonGatewayNotReady)
require.Equal(t, tracePipelineMessages[ReasonGatewayNotReady], tracesDeploymentNotReadyMessage)
12 changes: 6 additions & 6 deletions internal/conditions/tls_cert_conditions_test.go
@@ -24,7 +24,7 @@ func Test_EvaluateTLSCertCondition(t *testing.T) {
given: tlscert.ErrMissingCertKeyPair,
expectedStatus: metav1.ConditionFalse,
expectedReason: ReasonTLSConfigurationInvalid,
expectedMessage: fmt.Sprintf(MessageForLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrMissingCertKeyPair),
expectedMessage: fmt.Sprintf(MessageForFluentBitLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrMissingCertKeyPair),
},
{
name: "cert decode failed",
@@ -45,28 +45,28 @@ func Test_EvaluateTLSCertCondition(t *testing.T) {
given: tlscert.ErrKeyDecodeFailed,
expectedStatus: metav1.ConditionFalse,
expectedReason: ReasonTLSConfigurationInvalid,
expectedMessage: fmt.Sprintf(MessageForLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrKeyDecodeFailed),
expectedMessage: fmt.Sprintf(MessageForFluentBitLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrKeyDecodeFailed),
},
{
name: "private key parse failed",
given: tlscert.ErrKeyParseFailed,
expectedStatus: metav1.ConditionFalse,
expectedReason: ReasonTLSConfigurationInvalid,
expectedMessage: fmt.Sprintf(MessageForLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrKeyParseFailed),
expectedMessage: fmt.Sprintf(MessageForFluentBitLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrKeyParseFailed),
},
{
name: "ca decode failed",
given: tlscert.ErrCADecodeFailed,
expectedStatus: metav1.ConditionFalse,
expectedReason: ReasonTLSConfigurationInvalid,
expectedMessage: fmt.Sprintf(MessageForLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrCADecodeFailed),
expectedMessage: fmt.Sprintf(MessageForFluentBitLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrCADecodeFailed),
},
{
name: "ca parse failed",
given: tlscert.ErrCAParseFailed,
expectedStatus: metav1.ConditionFalse,
expectedReason: ReasonTLSConfigurationInvalid,
expectedMessage: fmt.Sprintf(MessageForLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrCAParseFailed),
expectedMessage: fmt.Sprintf(MessageForFluentBitLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrCAParseFailed),
},
{
name: "cert expired",
@@ -101,7 +101,7 @@ func Test_EvaluateTLSCertCondition(t *testing.T) {
given: tlscert.ErrInvalidCertificateKeyPair,
expectedStatus: metav1.ConditionFalse,
expectedReason: ReasonTLSConfigurationInvalid,
expectedMessage: fmt.Sprintf(MessageForLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrInvalidCertificateKeyPair),
expectedMessage: fmt.Sprintf(MessageForFluentBitLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrInvalidCertificateKeyPair),
},
}

23 changes: 13 additions & 10 deletions internal/reconciler/commonstatus/checker.go
@@ -12,9 +12,10 @@ import (
)

const (
SignalTypeTraces = "traces"
SignalTypeMetrics = "metrics"
SignalTypeLogs = "logs"
SignalTypeTraces = "traces"
SignalTypeMetrics = "metrics"
SignalTypeLogs = "logs"
SignalTypeOtelLogs = "otel-logs"
)

type Prober interface {
@@ -31,12 +32,11 @@ func GetGatewayHealthyCondition(ctx context.Context, prober Prober, namespacedNa
reason := conditions.ReasonGatewayReady
msg := conditions.MessageForTracePipeline(reason)

if signalType == SignalTypeMetrics {
switch signalType {
case SignalTypeMetrics:
msg = conditions.MessageForMetricPipeline(reason)
}

if signalType == SignalTypeLogs {
msg = conditions.MessageForLogPipeline(reason)
case SignalTypeOtelLogs:
msg = conditions.MessageForOtelLogPipeline(reason)
}

err := prober.IsReady(ctx, namespacedName)
@@ -66,10 +66,13 @@ func GetGatewayHealthyCondition(ctx context.Context, prober Prober, namespacedNa
func GetAgentHealthyCondition(ctx context.Context, prober Prober, namespacedName types.NamespacedName, errToMsgCon ErrorToMessageConverter, signalType string) *metav1.Condition {
status := metav1.ConditionTrue
reason := conditions.ReasonAgentReady
msg := conditions.MessageForLogPipeline(reason)
msg := conditions.MessageForFluentBitLogPipeline(reason)

if signalType == SignalTypeMetrics {
switch signalType {
case SignalTypeMetrics:
msg = conditions.MessageForMetricPipeline(reason)
case SignalTypeOtelLogs:
msg = conditions.MessageForOtelLogPipeline(reason)
}

err := prober.IsReady(ctx, namespacedName)
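The probers handed to these helpers only need an IsReady method; judging by the calls above, its signature is IsReady(ctx, types.NamespacedName) error (an inference, since the full Prober interface is truncated in this view). A minimal hand-rolled stub in that shape, as one might use in a status test:

package main

import (
	"context"
	"errors"
	"fmt"

	"k8s.io/apimachinery/pkg/types"
)

// stubProber is an illustrative test double for the commonstatus Prober interface.
type stubProber struct {
	err error
}

func (s stubProber) IsReady(ctx context.Context, name types.NamespacedName) error {
	return s.err
}

func main() {
	gateway := types.NamespacedName{Name: "telemetry-log-gateway", Namespace: "kyma-system"} // assumed names
	for _, p := range []stubProber{{}, {err: errors.New("deployment not ready")}} {
		if err := p.IsReady(context.Background(), gateway); err != nil {
			fmt.Println("gateway not ready:", err)
			continue
		}
		fmt.Println("gateway ready")
	}
}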
2 changes: 1 addition & 1 deletion internal/reconciler/commonstatus/checker_test.go
@@ -173,7 +173,7 @@ func TestLogsGetHealthCondition(t *testing.T) {
Type: conditions.TypeAgentHealthy,
Status: metav1.ConditionTrue,
Reason: conditions.ReasonAgentReady,
Message: conditions.MessageForLogPipeline(conditions.ReasonAgentReady),
Message: conditions.MessageForFluentBitLogPipeline(conditions.ReasonAgentReady),
},
},
{
2 changes: 1 addition & 1 deletion internal/reconciler/logparser/status_test.go
@@ -95,7 +95,7 @@ func TestUpdateStatus(t *testing.T) {
require.NotNil(t, agentHealthyCond, "could not find condition of type %s", conditions.TypeAgentHealthy)
require.Equal(t, metav1.ConditionTrue, agentHealthyCond.Status)
require.Equal(t, conditions.ReasonAgentReady, agentHealthyCond.Reason)
require.Equal(t, conditions.MessageForLogPipeline(conditions.ReasonAgentReady), agentHealthyCond.Message)
require.Equal(t, conditions.MessageForFluentBitLogPipeline(conditions.ReasonAgentReady), agentHealthyCond.Message)
require.Equal(t, updatedParser.Generation, agentHealthyCond.ObservedGeneration)
require.NotEmpty(t, agentHealthyCond.LastTransitionTime)
})
8 changes: 4 additions & 4 deletions internal/reconciler/logpipeline/fluentbit/status.go
@@ -90,7 +90,7 @@ func (r *Reconciler) setFluentBitConfigGeneratedCondition(ctx context.Context, p
func (r *Reconciler) evaluateConfigGeneratedCondition(ctx context.Context, pipeline *telemetryv1alpha1.LogPipeline) (status metav1.ConditionStatus, reason string, message string) {
err := r.pipelineValidator.validate(ctx, pipeline)
if err == nil {
return metav1.ConditionTrue, conditions.ReasonAgentConfigured, conditions.MessageForLogPipeline(conditions.ReasonAgentConfigured)
return metav1.ConditionTrue, conditions.ReasonAgentConfigured, conditions.MessageForFluentBitLogPipeline(conditions.ReasonAgentConfigured)
}

if errors.Is(err, secretref.ErrSecretRefNotFound) || errors.Is(err, secretref.ErrSecretKeyNotFound) || errors.Is(err, secretref.ErrSecretRefMissingFields) {
@@ -100,12 +100,12 @@ func (r *Reconciler) evaluateConfigGeneratedCondition(ctx context.Context, pipel
if endpoint.IsEndpointInvalidError(err) {
return metav1.ConditionFalse,
conditions.ReasonEndpointInvalid,
fmt.Sprintf(conditions.MessageForLogPipeline(conditions.ReasonEndpointInvalid), err.Error())
fmt.Sprintf(conditions.MessageForFluentBitLogPipeline(conditions.ReasonEndpointInvalid), err.Error())
}

var APIRequestFailed *errortypes.APIRequestFailedError
if errors.As(err, &APIRequestFailed) {
return metav1.ConditionFalse, conditions.ReasonValidationFailed, conditions.MessageForLogPipeline(conditions.ReasonValidationFailed)
return metav1.ConditionFalse, conditions.ReasonValidationFailed, conditions.MessageForFluentBitLogPipeline(conditions.ReasonValidationFailed)
}

return conditions.EvaluateTLSCertCondition(err)
Expand All @@ -118,7 +118,7 @@ func (r *Reconciler) setFlowHealthCondition(ctx context.Context, pipeline *telem
Type: conditions.TypeFlowHealthy,
Status: status,
Reason: reason,
Message: conditions.MessageForLogPipeline(reason),
Message: conditions.MessageForFluentBitLogPipeline(reason),
ObservedGeneration: pipeline.Generation,
}

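The condition built in setFlowHealthCondition is a standard metav1.Condition; how it then lands on the pipeline status is not shown in this excerpt, but the usual apimachinery pattern looks like the following sketch (condition type, reason, and message strings here are examples, not the exact constants):

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	// Status conditions of a hypothetical pipeline resource.
	var conds []metav1.Condition

	// A flow-health condition in the shape built by setFlowHealthCondition.
	meta.SetStatusCondition(&conds, metav1.Condition{
		Type:               "TelemetryFlowHealthy",
		Status:             metav1.ConditionFalse,
		Reason:             "SelfMonAllDataDropped",
		Message:            "Backend is not reachable or rejecting logs. All logs are dropped.",
		ObservedGeneration: 3,
	})

	for _, c := range conds {
		fmt.Printf("%s=%s (%s): %s\n", c.Type, c.Status, c.Reason, c.Message)
	}
}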