From cfe261c3d2347d3890cbb4acadb18e4d58618a6f Mon Sep 17 00:00:00 2001 From: Evgenii Balakhonov Date: Fri, 8 Mar 2024 02:22:26 +0300 Subject: [PATCH] The JMX Bean metric support implemented. So far, one metric has been implemented: The number of completed retries made in the current task. --- .../connect/http/config/HttpSinkConfig.java | 8 +++ .../http/metrics/HttpConnectorMetrics.java | 55 +++++++++++++++++++ .../http/sender/AbstractHttpSender.java | 3 + 3 files changed, 66 insertions(+) create mode 100644 src/main/java/io/aiven/kafka/connect/http/metrics/HttpConnectorMetrics.java diff --git a/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java b/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java index 9f314c51..82581bd5 100644 --- a/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java +++ b/src/main/java/io/aiven/kafka/connect/http/config/HttpSinkConfig.java @@ -35,6 +35,8 @@ import org.apache.kafka.common.config.types.Password; import org.apache.kafka.connect.errors.ConnectException; +import io.aiven.kafka.connect.http.metrics.HttpConnectorMetrics; + import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; public final class HttpSinkConfig extends AbstractConfig { @@ -85,6 +87,8 @@ public final class HttpSinkConfig extends AbstractConfig { private static final String ERRORS_GROUP = "Errors Handling"; private static final String ERRORS_TOLERANCE = "errors.tolerance"; + private final HttpConnectorMetrics metrics = new HttpConnectorMetrics(this); + public static ConfigDef configDef() { final ConfigDef configDef = new ConfigDef(); addConnectionConfigGroup(configDef); @@ -786,6 +790,10 @@ public final boolean sslTrustAllCertificates() { return getBoolean(HTTP_SSL_TRUST_ALL_CERTIFICATES); } + public HttpConnectorMetrics metrics() { + return metrics; + } + public static void main(final String... args) { System.out.println("========================================="); System.out.println("HTTP Sink connector Configuration Options"); diff --git a/src/main/java/io/aiven/kafka/connect/http/metrics/HttpConnectorMetrics.java b/src/main/java/io/aiven/kafka/connect/http/metrics/HttpConnectorMetrics.java new file mode 100644 index 00000000..1e5a2635 --- /dev/null +++ b/src/main/java/io/aiven/kafka/connect/http/metrics/HttpConnectorMetrics.java @@ -0,0 +1,55 @@ +/* + * Copyright 2019 Aiven Oy and http-connector-for-apache-kafka project contributors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.aiven.kafka.connect.http.metrics; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.kafka.common.metrics.JmxReporter; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.metrics.MetricsContext; + +import io.aiven.kafka.connect.http.HttpSinkConnector; +import io.aiven.kafka.connect.http.config.HttpSinkConfig; + +public class HttpConnectorMetrics { + private static final String SOURCE_CONNECTOR_GROUP = HttpSinkConnector.class.getSimpleName(); + + private volatile AtomicInteger retryCount = new AtomicInteger(0); + + public HttpConnectorMetrics(final HttpSinkConfig config) { + final Metrics metrics = new Metrics(); + metrics.addMetric(metrics.metricName("retry-count", SOURCE_CONNECTOR_GROUP, + "The number of completed retries made in the current task."), + (metricConfig, now) -> getRetryCount()); + final JmxReporter reporter = new JmxReporter(); + reporter.contextChange(() -> Map.of(MetricsContext.NAMESPACE, "kafka.connect.http-sink")); + metrics.addReporter(reporter); + } + + protected int getRetryCount() { + return retryCount.get(); + } + + public void incrementRetryCount() { + retryCount.incrementAndGet(); + } + + public void resetRetryCount() { + retryCount.set(0); + } +} diff --git a/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java b/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java index 0fa5f520..3be71472 100644 --- a/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java +++ b/src/main/java/io/aiven/kafka/connect/http/sender/AbstractHttpSender.java @@ -63,6 +63,7 @@ protected HttpResponse sendWithRetries( final Builder requestBuilderWithPayload, final HttpResponseHandler httpResponseHandler, final int retries ) { + config.metrics().resetRetryCount(); int remainingRetries = retries; while (remainingRetries >= 0) { try { @@ -72,11 +73,13 @@ protected HttpResponse sendWithRetries( log.debug("Server replied with status code {} and body {}", response.statusCode(), response.body()); // Handle the response httpResponseHandler.onResponse(response, remainingRetries); + config.metrics().resetRetryCount(); return response; } catch (final IOException e) { log.info("Sending failed, will retry in {} ms ({} retries remain)", config.retryBackoffMs(), remainingRetries, e); remainingRetries -= 1; + config.metrics().incrementRetryCount(); TimeUnit.MILLISECONDS.sleep(config.retryBackoffMs()); } } catch (final InterruptedException e) {