Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add self monitoring to log gateway #1813

Merged
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 47 additions & 2 deletions .github/workflows/pr-integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ jobs:
- name: Run tests without Istio
if: ${{ matrix.scenario == 'healthy' }}
run: |
bin/ginkgo run ${{ runner.debug && '-v' || '' }} --tags e2e --label-filter="self-mon-${{ matrix.signal-type }}-${{ matrix.scenario }}" test/e2e
bin/ginkgo run ${{ runner.debug && '-v' || '' }} --tags e2e --label-filter="self-mon-${{ matrix.signal-type }}-${{ matrix.scenario }} && !experimental" test/e2e

# we need Istio for fault injection to simulate backpressure and outages
- name: Deploy Istio Module
Expand All @@ -157,7 +157,52 @@ jobs:
- name: Run tests with Istio
if: ${{ matrix.scenario != 'healthy' }}
run: |
bin/ginkgo run ${{ runner.debug && '-v' || '' }} --tags istio --label-filter="self-mon-${{ matrix.signal-type }}-${{ matrix.scenario }}" test/integration/istio
# Consistency fix: add the missing space before '&&' so the label-filter
# expression matches the formatting used by every other ginkgo invocation
# in this workflow (e.g. "... && !experimental").
bin/ginkgo run ${{ runner.debug && '-v' || '' }} --tags istio --label-filter="self-mon-${{ matrix.signal-type }}-${{ matrix.scenario }} && !experimental" test/integration/istio

- name: Finalize Test
uses: "./.github/template/finalize-test"
if: success() || failure()
with:
failure: failure()
job-name: ${{ github.job }}-${{ matrix.signal-type }}-${{ matrix.scenario }}

e2e-experimental-self-mon:
needs: setup
strategy:
fail-fast: false
matrix:
signal-type:
- logs
- metrics
- traces
scenario:
- healthy
- backpressure
- outage
runs-on: ubuntu-latest
steps:
- name: Checkout repo
uses: actions/checkout@v4

- name: Prepare Test
uses: "./.github/template/prepare-test"
with:
github-token: ${{ secrets.GITHUB_TOKEN }}

- name: Run tests without Istio
if: ${{ matrix.scenario == 'healthy' }}
run: |
bin/ginkgo run ${{ runner.debug && '-v' || '' }} --tags e2e --label-filter="self-mon-${{ matrix.signal-type }}-${{ matrix.scenario }} && experimental" test/e2e

# we need Istio for fault injection to simulate backpressure and outages
- name: Deploy Istio Module
if: ${{ matrix.scenario != 'healthy' }}
run: hack/deploy-istio.sh

- name: Run tests with Istio
if: ${{ matrix.scenario != 'healthy' }}
run: |
bin/ginkgo run ${{ runner.debug && '-v' || '' }} --tags istio --label-filter="self-mon-${{ matrix.signal-type }}-${{ matrix.scenario }} && experimental" test/integration/istio

- name: Finalize Test
uses: "./.github/template/finalize-test"
Expand Down
1 change: 1 addition & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ packages:
FlowHealthProber:
github.com/kyma-project/telemetry-manager/internal/reconciler/logpipeline/otel:
interfaces:
FlowHealthProber:
GatewayApplierDeleter:
GatewayConfigBuilder:
AgentApplierDeleter:
Expand Down
10 changes: 8 additions & 2 deletions controllers/telemetry/logpipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,17 @@ func NewLogPipelineController(client client.Client, reconcileTriggerChan <-chan
return nil, err
}

otelFlowHealthProber, err := prober.NewOtelLogPipelineProber(types.NamespacedName{Name: config.SelfMonitorName, Namespace: config.TelemetryNamespace})
if err != nil {
return nil, err
}

fbReconciler, err := configureFluentBitReconciler(client, config, flowHealthProber)
if err != nil {
return nil, err
}

otelReconciler, err := configureOtelReconciler(client, config, flowHealthProber)
otelReconciler, err := configureOtelReconciler(client, config, otelFlowHealthProber)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -200,7 +205,7 @@ func configureFluentBitReconciler(client client.Client, config LogPipelineContro
}

//nolint:unparam // error is always nil: An error could be returned after implementing the IstioStatusChecker (TODO)
func configureOtelReconciler(client client.Client, config LogPipelineControllerConfig, _ *prober.LogPipelineProber) (*logpipelineotel.Reconciler, error) {
func configureOtelReconciler(client client.Client, config LogPipelineControllerConfig, flowHealthProber *prober.OTelPipelineProber) (*logpipelineotel.Reconciler, error) {
pipelineValidator := &logpipelineotel.Validator{
// TODO: Add validators
}
Expand All @@ -220,6 +225,7 @@ func configureOtelReconciler(client client.Client, config LogPipelineControllerC
client,
config.TelemetryNamespace,
config.ModuleVersion,
flowHealthProber,
agentConfigBuilder,
otelcollector.NewLogAgentApplierDeleter(config.OTelCollectorImage, config.TelemetryNamespace, config.LogAgentPriorityClassName),
&workloadstatus.DaemonSetProber{Client: client},
Expand Down
55 changes: 44 additions & 11 deletions internal/conditions/conditions.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,22 +65,35 @@ var commonMessages = map[string]string{
ReasonValidationFailed: "Pipeline validation failed due to an error from the Kubernetes API server",
}

var logPipelineMessages = map[string]string{
ReasonAgentConfigured: "LogPipeline specification is successfully applied to the configuration of Log agent",
ReasonAgentNotReady: "Log agent DaemonSet is not ready",
ReasonAgentReady: "Log agent DaemonSet is ready",
ReasonComponentsRunning: "All log components are running",
ReasonEndpointInvalid: "HTTP output host invalid: %s",
ReasonGatewayConfigured: "LogPipeline specification is successfully applied to the configuration of Log gateway",
ReasonGatewayNotReady: "Log gateway Deployment is not ready",
ReasonGatewayReady: "Log gateway Deployment is ready",
// commonLogPipelineMessages maps condition reasons to human-readable messages
// shared by both LogPipeline flavors (FluentBit agent and OTel gateway).
// Flavor-specific messages live in fluentBitLogPipelineMessages and
// otelLogPipelineMessages; lookup order is handled by messageForLogPipelines.
var commonLogPipelineMessages = map[string]string{
	ReasonAgentConfigured:   "LogPipeline specification is successfully applied to the configuration of Log agent",
	ReasonAgentNotReady:     "Log agent DaemonSet is not ready",
	ReasonAgentReady:        "Log agent DaemonSet is ready",
	ReasonComponentsRunning: "All log components are running",
}

// fluentBitLogPipelineMessages maps condition reasons to messages specific to
// the FluentBit flavor of LogPipeline. These include links to the published
// troubleshooting guide for the FluentBit-based log agent.
var fluentBitLogPipelineMessages = map[string]string{
	ReasonEndpointInvalid: "HTTP output host invalid: %s",

	ReasonSelfMonAllDataDropped:     "Backend is not reachable or rejecting logs. All logs are dropped. See troubleshooting: https://kyma-project.io/#/telemetry-manager/user/02-logs?id=no-logs-arrive-at-the-backend",
	ReasonSelfMonBufferFillingUp:    "Buffer nearing capacity. Incoming log rate exceeds export rate. See troubleshooting: https://kyma-project.io/#/telemetry-manager/user/02-logs?id=agent-buffer-filling-up",
	ReasonSelfMonConfigNotGenerated: "No logs delivered to backend because LogPipeline specification is not applied to the configuration of Log agent. Check the 'ConfigurationGenerated' condition for more details",
	ReasonSelfMonNoLogsDelivered:    "Backend is not reachable or rejecting logs. Logs are buffered and not yet dropped. See troubleshooting: https://kyma-project.io/#/telemetry-manager/user/02-logs?id=no-logs-arrive-at-the-backend",
	ReasonSelfMonSomeDataDropped:    "Backend is reachable, but rejecting logs. Some logs are dropped. See troubleshooting: https://kyma-project.io/#/telemetry-manager/user/02-logs?id=not-all-logs-arrive-at-the-backend",
}

// otelLogPipelineMessages maps condition reasons to messages specific to the
// OTel flavor of LogPipeline, whose pipeline specification is applied to the
// Log gateway Deployment.
var otelLogPipelineMessages = map[string]string{
	ReasonGatewayConfigured: "LogPipeline specification is successfully applied to the configuration of Log gateway",
	ReasonGatewayNotReady:   "Log gateway Deployment is not ready",
	ReasonGatewayReady:      "Log gateway Deployment is ready",

	ReasonSelfMonAllDataDropped:  "Backend is not reachable or rejecting logs. All logs are dropped.",
	ReasonSelfMonBufferFillingUp: "Buffer nearing capacity. Incoming log rate exceeds export rate.",
	// Fixed copy-paste from the FluentBit map: every other entry in this map
	// (and the analogous metric-gateway message) refers to the gateway, so the
	// ConfigurationGenerated hint must point at the Log gateway, not the agent.
	ReasonSelfMonConfigNotGenerated: "No logs delivered to backend because LogPipeline specification is not applied to the configuration of Log gateway. Check the 'ConfigurationGenerated' condition for more details",
	ReasonSelfMonGatewayThrottling:  "Log gateway is unable to receive logs at current rate.",
	ReasonSelfMonSomeDataDropped:    "Backend is reachable, but rejecting logs. Some logs are dropped.",
}

var tracePipelineMessages = map[string]string{
ReasonComponentsRunning: "All trace components are running",
ReasonEndpointInvalid: "OTLP output endpoint invalid: %s",
Expand Down Expand Up @@ -109,8 +122,28 @@ var metricPipelineMessages = map[string]string{
ReasonSelfMonSomeDataDropped: "Backend is reachable, but rejecting metrics. Some metrics are dropped. See troubleshooting: https://kyma-project.io/#/telemetry-manager/user/04-metrics?id=metrics-not-arriving-at-the-destination",
}

func MessageForLogPipeline(reason string) string {
return message(reason, logPipelineMessages)
// MessageForOtelLogPipeline returns the status-condition message for the given
// reason, resolved against the OTel LogPipeline message set (common messages
// first, then log-pipeline-wide, then OTel-specific). Returns "" for unknown
// reasons.
func MessageForOtelLogPipeline(reason string) string {
	msg := messageForLogPipelines(reason, otelLogPipelineMessages)

	return msg
}

// MessageForFluentBitLogPipeline returns the status-condition message for the
// given reason, resolved against the FluentBit LogPipeline message set (common
// messages first, then log-pipeline-wide, then FluentBit-specific). Returns ""
// for unknown reasons.
func MessageForFluentBitLogPipeline(reason string) string {
	msg := messageForLogPipelines(reason, fluentBitLogPipelineMessages)

	return msg
}

// messageForLogPipelines resolves a condition reason to a human-readable
// message by consulting, in priority order: messages common to all pipeline
// types, messages common to all LogPipelines, and finally the flavor-specific
// message set passed by the caller. An empty string is returned when the
// reason is unknown to all three sets.
func messageForLogPipelines(reason string, specializedMessages map[string]string) string {
	lookupOrder := []map[string]string{
		commonMessages,
		commonLogPipelineMessages,
		specializedMessages,
	}

	for _, messages := range lookupOrder {
		if msg, found := messages[reason]; found {
			return msg
		}
	}

	return ""
}

func MessageForTracePipeline(reason string) string {
Expand Down
9 changes: 6 additions & 3 deletions internal/conditions/conditions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,16 @@ import (

func TestMessageFor(t *testing.T) {
t.Run("should return correct message which is common to all pipelines", func(t *testing.T) {
message := MessageForLogPipeline(ReasonReferencedSecretMissing)
message := MessageForFluentBitLogPipeline(ReasonReferencedSecretMissing)
require.Equal(t, commonMessages[ReasonReferencedSecretMissing], message)
})

t.Run("should return correct message which is unique to each pipeline", func(t *testing.T) {
logsDaemonSetNotReadyMessage := MessageForLogPipeline(ReasonAgentNotReady)
require.Equal(t, logPipelineMessages[ReasonAgentNotReady], logsDaemonSetNotReadyMessage)
logsDaemonSetNotReadyMessage := MessageForFluentBitLogPipeline(ReasonEndpointInvalid)
require.Equal(t, fluentBitLogPipelineMessages[ReasonEndpointInvalid], logsDaemonSetNotReadyMessage)

logDeploymentNotReadyMessage := MessageForOtelLogPipeline(ReasonGatewayNotReady)
require.Equal(t, otelLogPipelineMessages[ReasonGatewayNotReady], logDeploymentNotReadyMessage)

tracesDeploymentNotReadyMessage := MessageForTracePipeline(ReasonGatewayNotReady)
require.Equal(t, tracePipelineMessages[ReasonGatewayNotReady], tracesDeploymentNotReadyMessage)
Expand Down
12 changes: 6 additions & 6 deletions internal/conditions/tls_cert_conditions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func Test_EvaluateTLSCertCondition(t *testing.T) {
given: tlscert.ErrMissingCertKeyPair,
expectedStatus: metav1.ConditionFalse,
expectedReason: ReasonTLSConfigurationInvalid,
expectedMessage: fmt.Sprintf(MessageForLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrMissingCertKeyPair),
expectedMessage: fmt.Sprintf(MessageForFluentBitLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrMissingCertKeyPair),
},
{
name: "cert decode failed",
Expand All @@ -45,28 +45,28 @@ func Test_EvaluateTLSCertCondition(t *testing.T) {
given: tlscert.ErrKeyDecodeFailed,
expectedStatus: metav1.ConditionFalse,
expectedReason: ReasonTLSConfigurationInvalid,
expectedMessage: fmt.Sprintf(MessageForLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrKeyDecodeFailed),
expectedMessage: fmt.Sprintf(MessageForFluentBitLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrKeyDecodeFailed),
},
{
name: "private key parse failed",
given: tlscert.ErrKeyParseFailed,
expectedStatus: metav1.ConditionFalse,
expectedReason: ReasonTLSConfigurationInvalid,
expectedMessage: fmt.Sprintf(MessageForLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrKeyParseFailed),
expectedMessage: fmt.Sprintf(MessageForFluentBitLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrKeyParseFailed),
},
{
name: "ca decode failed",
given: tlscert.ErrCADecodeFailed,
expectedStatus: metav1.ConditionFalse,
expectedReason: ReasonTLSConfigurationInvalid,
expectedMessage: fmt.Sprintf(MessageForLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrCADecodeFailed),
expectedMessage: fmt.Sprintf(MessageForFluentBitLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrCADecodeFailed),
},
{
name: "ca parse failed",
given: tlscert.ErrCAParseFailed,
expectedStatus: metav1.ConditionFalse,
expectedReason: ReasonTLSConfigurationInvalid,
expectedMessage: fmt.Sprintf(MessageForLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrCAParseFailed),
expectedMessage: fmt.Sprintf(MessageForFluentBitLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrCAParseFailed),
},
{
name: "cert expired",
Expand Down Expand Up @@ -101,7 +101,7 @@ func Test_EvaluateTLSCertCondition(t *testing.T) {
given: tlscert.ErrInvalidCertificateKeyPair,
expectedStatus: metav1.ConditionFalse,
expectedReason: ReasonTLSConfigurationInvalid,
expectedMessage: fmt.Sprintf(MessageForLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrInvalidCertificateKeyPair),
expectedMessage: fmt.Sprintf(MessageForFluentBitLogPipeline(ReasonTLSConfigurationInvalid), tlscert.ErrInvalidCertificateKeyPair),
},
}

Expand Down
17 changes: 11 additions & 6 deletions internal/reconciler/commonstatus/checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ import (
)

const (
SignalTypeTraces = "traces"
SignalTypeMetrics = "metrics"
SignalTypeLogs = "logs"
SignalTypeTraces = "traces"
SignalTypeMetrics = "metrics"
SignalTypeLogs = "logs"
SignalTypeOtelLogs = "otel-logs"
)

type Prober interface {
Expand All @@ -35,8 +36,8 @@ func GetGatewayHealthyCondition(ctx context.Context, prober Prober, namespacedNa
msg = conditions.MessageForMetricPipeline(reason)
}

if signalType == SignalTypeLogs {
msg = conditions.MessageForLogPipeline(reason)
if signalType == SignalTypeOtelLogs {
msg = conditions.MessageForOtelLogPipeline(reason)
}

err := prober.IsReady(ctx, namespacedName)
Expand Down Expand Up @@ -66,12 +67,16 @@ func GetGatewayHealthyCondition(ctx context.Context, prober Prober, namespacedNa
func GetAgentHealthyCondition(ctx context.Context, prober Prober, namespacedName types.NamespacedName, errToMsgCon ErrorToMessageConverter, signalType string) *metav1.Condition {
status := metav1.ConditionTrue
reason := conditions.ReasonAgentReady
msg := conditions.MessageForLogPipeline(reason)
msg := conditions.MessageForFluentBitLogPipeline(reason)

if signalType == SignalTypeMetrics {
msg = conditions.MessageForMetricPipeline(reason)
}

if signalType == SignalTypeOtelLogs {
msg = conditions.MessageForOtelLogPipeline(reason)
}

err := prober.IsReady(ctx, namespacedName)
if err != nil && !workloadstatus.IsRolloutInProgressError(err) {
logf.FromContext(ctx).V(1).Error(err, "Failed to probe agent - set condition as not healthy")
Expand Down
2 changes: 1 addition & 1 deletion internal/reconciler/commonstatus/checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func TestLogsGetHealthCondition(t *testing.T) {
Type: conditions.TypeAgentHealthy,
Status: metav1.ConditionTrue,
Reason: conditions.ReasonAgentReady,
Message: conditions.MessageForLogPipeline(conditions.ReasonAgentReady),
Message: conditions.MessageForFluentBitLogPipeline(conditions.ReasonAgentReady),
},
},
{
Expand Down
2 changes: 1 addition & 1 deletion internal/reconciler/logparser/status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func TestUpdateStatus(t *testing.T) {
require.NotNil(t, agentHealthyCond, "could not find condition of type %s", conditions.TypeAgentHealthy)
require.Equal(t, metav1.ConditionTrue, agentHealthyCond.Status)
require.Equal(t, conditions.ReasonAgentReady, agentHealthyCond.Reason)
require.Equal(t, conditions.MessageForLogPipeline(conditions.ReasonAgentReady), agentHealthyCond.Message)
require.Equal(t, conditions.MessageForFluentBitLogPipeline(conditions.ReasonAgentReady), agentHealthyCond.Message)
require.Equal(t, updatedParser.Generation, agentHealthyCond.ObservedGeneration)
require.NotEmpty(t, agentHealthyCond.LastTransitionTime)
})
Expand Down
8 changes: 4 additions & 4 deletions internal/reconciler/logpipeline/fluentbit/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (r *Reconciler) setFluentBitConfigGeneratedCondition(ctx context.Context, p
func (r *Reconciler) evaluateConfigGeneratedCondition(ctx context.Context, pipeline *telemetryv1alpha1.LogPipeline) (status metav1.ConditionStatus, reason string, message string) {
err := r.pipelineValidator.validate(ctx, pipeline)
if err == nil {
return metav1.ConditionTrue, conditions.ReasonAgentConfigured, conditions.MessageForLogPipeline(conditions.ReasonAgentConfigured)
return metav1.ConditionTrue, conditions.ReasonAgentConfigured, conditions.MessageForFluentBitLogPipeline(conditions.ReasonAgentConfigured)
}

if errors.Is(err, secretref.ErrSecretRefNotFound) || errors.Is(err, secretref.ErrSecretKeyNotFound) || errors.Is(err, secretref.ErrSecretRefMissingFields) {
Expand All @@ -100,12 +100,12 @@ func (r *Reconciler) evaluateConfigGeneratedCondition(ctx context.Context, pipel
if endpoint.IsEndpointInvalidError(err) {
return metav1.ConditionFalse,
conditions.ReasonEndpointInvalid,
fmt.Sprintf(conditions.MessageForLogPipeline(conditions.ReasonEndpointInvalid), err.Error())
fmt.Sprintf(conditions.MessageForFluentBitLogPipeline(conditions.ReasonEndpointInvalid), err.Error())
}

var APIRequestFailed *errortypes.APIRequestFailedError
if errors.As(err, &APIRequestFailed) {
return metav1.ConditionFalse, conditions.ReasonValidationFailed, conditions.MessageForLogPipeline(conditions.ReasonValidationFailed)
return metav1.ConditionFalse, conditions.ReasonValidationFailed, conditions.MessageForFluentBitLogPipeline(conditions.ReasonValidationFailed)
}

return conditions.EvaluateTLSCertCondition(err)
Expand All @@ -118,7 +118,7 @@ func (r *Reconciler) setFlowHealthCondition(ctx context.Context, pipeline *telem
Type: conditions.TypeFlowHealthy,
Status: status,
Reason: reason,
Message: conditions.MessageForLogPipeline(reason),
Message: conditions.MessageForFluentBitLogPipeline(reason),
ObservedGeneration: pipeline.Generation,
}

Expand Down
Loading
Loading