diff --git a/README.md b/README.md
index ea9c295..92b5bbc 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
 # Datadog Kafka Connect Logs
 
-`datadog-kafka-connect-logs` is a [Kafka Connector](http://kafka.apache.org/documentation.html#connect) for sending
+`datadog-kafka-connect-logs` is a [Kafka Connector](http://kafka.apache.org/documentation.html#connect) for sending
 records from Kafka as logs to the [Datadog Logs Intake API](https://docs.datadoghq.com/api/v1/logs/).
 
 It is a plugin meant to be installed on a [Kafka Connect Cluster](https://docs.confluent.io/current/connect/) running
@@ -12,7 +12,7 @@ besides a [Kafka Broker](https://www.confluent.io/what-is-apache-kafka/).
 2. Java 8 and above.
 3. Confluent Platform 4.0.x and above (optional).
 
-To install the plugin, one must have a working instance of Kafka Connect connected to a Kafka Broker. See also
+To install the plugin, one must have a working instance of Kafka Connect connected to a Kafka Broker. See also
 [Confluent's](https://www.confluent.io/product/confluent-platform/) documentation for easily setting this up.
 
 ## Installation and Setup
@@ -24,25 +24,24 @@ See [Confluent's documentation](https://docs.confluent.io/current/connect/managi
 ### Download from Github
 
 Download the latest version from the GitHub [releases page](https://github.com/DataDog/datadog-kafka-connect-logs/releases).
-Also see [Confluent's documentation](https://docs.confluent.io/current/connect/managing/community.html) on installing
+Also see [Confluent's documentation](https://docs.confluent.io/current/connect/managing/community.html) on installing
 community connectors.
 
 ### Build from Source
 
 1. Clone the repo from https://github.com/DataDog/datadog-kafka-connect-logs
 2. Verify that Java8 JRE or JDK is installed.
-3. Run `mvn clean compile package`. This will build the jar in the `/target` directory. The name will be
-`datadog-kafka-connect-logs-[VERSION].jar`.
+3. Run `mvn clean compile package`. This builds the jar in the `/target` directory. The file name has the format `datadog-kafka-connect-logs-[VERSION].jar`.
 4. The zip file for use on [Confluent Hub](https://www.confluent.io/hub/) can be found in `target/components/packages`.
 
 ## Quick Start
 
 1. To install the plugin, place the plugin's jar file (see [previous section](#installation-and-setup) on how to download or build it)
-in or under the location specified in `plugin.path` . If you use Confluent Platform, simply run
-`confluent-hub install target/components/packages/`.
+   in or under the location specified in `plugin.path`. If you use Confluent Platform, run
+   `confluent-hub install target/components/packages/`.
 2. Restart your Kafka Connect instance.
-3. Run the following command to manually create connector tasks. Adjust `topics` to configure the Kafka topic to be
-ingested and set your Datadog `api_key`.
+3. Run the following command to manually create connector tasks. Adjust `topics` to configure the Kafka topic to be
+   ingested and set your Datadog `api_key`.
 
 ```
 curl localhost:8083/connectors -X POST -H "Content-Type: application/json" -d '{
@@ -56,8 +55,8 @@ ingested and set your Datadog `api_key`.
 }'
 ```
 
-4. You can verify that data is ingested to the Datadog platform by searching for `source:kafka-connect` in the Log
-Explorer tab
+4. You can verify that data is ingested into the Datadog platform by searching for `source:kafka-connect` in the Log
+   Explorer tab.
 5. Use the following commands to check status, and manage connectors and tasks:
 
 ```
@@ -95,18 +94,19 @@ A REST call can be executed against one of the cluster instances, and the config
 | `topics` | Comma separated list of Kafka topics for Datadog to consume. `prod-topic1,prod-topic2,prod-topic3`||
 | `datadog.api_key` | The API key of your Datadog platform.||
 #### General Optional Parameters
-| Name | Description | Default Value |
-|-------- |----------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------|
-| `datadog.site` | The site of the Datadog intake to send logs to (for example 'datadoghq.eu' to send data to the EU site) | `datadoghq.com` |
+| Name | Description | Default Value |
+|-------- |----------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------|
+| `datadog.site` | The site of the Datadog intake to send logs to (for example 'datadoghq.eu' to send data to the EU site). | `datadoghq.com` |
 | `datadog.url` | Custom Datadog URL endpoint where your logs will be sent. `datadog.url` takes precedence over `datadog.site`. Example: `http-intake.logs.datadoghq.com:443` ||
-| `datadog.tags` | Tags associated with your logs in a comma separated tag:value format. ||
-| `datadog.service` | The name of the application or service generating the log events. ||
-| `datadog.hostname` | The name of the originating host of the log. ||
-| `datadog.proxy.url` | Proxy endpoint when logs are not directly forwarded to Datadog. ||
-| `datadog.proxy.port` | Proxy port when logs are not directly forwarded to Datadog. ||
-| `datadog.retry.max` | The number of retries before the output plugin stops. | `5` ||
-| `datadog.retry.backoff_ms` | The time in milliseconds to wait following an error before a retry attempt is made. | `3000` ||
-| `datadog.add_published_date` | Valid settings are true or false. When set to `true`, The timestamp is retrieved from the Kafka record and passed to Datadog as `published_date` ||
+| `datadog.tags` | Tags associated with your logs in a comma separated tag:value format. ||
+| `datadog.service` | The name of the application or service generating the log events. ||
+| `datadog.hostname` | The name of the originating host of the log. ||
+| `datadog.proxy.url` | Proxy endpoint when logs are not directly forwarded to Datadog. ||
+| `datadog.proxy.port` | Proxy port when logs are not directly forwarded to Datadog. ||
+| `datadog.retry.max` | The number of retries before the output plugin stops. | `5` |
+| `datadog.retry.backoff_ms` | The time in milliseconds to wait following an error before a retry attempt is made. | `3000` |
+| `datadog.add_published_date` | Valid settings are true or false. When set to `true`, the timestamp is retrieved from the Kafka record and passed to Datadog as `published_date`. ||
+| `datadog.parse_record_headers` | Valid settings are true or false. When set to `true`, Kafka Record Headers are parsed and passed to Datadog as a `kafkaheaders` object. | `false` |
 
 ### Troubleshooting performance
 
@@ -126,7 +126,7 @@ To improve performance of the connector, you can try the following options:
 
 ## Single Message Transforms
 
-Kafka Connect supports Single Message Transforms that let you change the structure or content of a message. To
+Kafka Connect supports Single Message Transforms that let you change the structure or content of a message. To
 experiment with this feature, try adding these lines to your sink connector configuration:
 
 ```properties
@@ -135,7 +135,7 @@ transforms.addExtraField.type=org.apache.kafka.connect.transforms.InsertField$Va
 transforms.addExtraField.static.field=extraField
 transforms.addExtraField.static.value=extraValue
 ```
-Now if you restart the sink connector and send some more test messages, each new record should have a `extraField` field
+If you restart the sink connector and send some more test messages, each new record should have an `extraField` field
 with value `value`. For more in-depth video, see [confluent's documentation](https://docs.confluent.io/current/connect/transforms/index.html).
 
 ## Testing
@@ -146,14 +146,14 @@ To run the supplied unit tests, run `mvn test` from the root of the project.
 
 ### System Tests
 
-We use Confluent Platform for a batteries-included Kafka environment for local testing. Follow the guide
+We use Confluent Platform for a batteries-included Kafka environment for local testing. Follow the guide
 [here](https://docs.confluent.io/current/quickstart/ce-quickstart.html) to install the Confluent Platform.
 
-Then, install the [Confluent Kafka Datagen Connector](https://github.com/confluentinc/kafka-connect-datagen) to create
-sample data of arbitrary types. Install this Datadog Logs Connector by running
+Then, install the [Confluent Kafka Datagen Connector](https://github.com/confluentinc/kafka-connect-datagen) to create
+sample data of arbitrary types. Install this Datadog Logs Connector by running
 `confluent-hub install target/components/packages/`.
 
-In the `/test` directory there are some `.json` configuration files to make it easy to create Connectors. There are
+In the `/test` directory, there are some `.json` configuration files to make it easy to create Connectors. There are
 configurations for both the Datagen Connector with various datatypes, as well as the Datadog Logs Connector. To
 the latter, you will need to add a valid Datadog API Key for once you upload the `.json` to Confluent Platform.
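As a usage sketch for the new `datadog.parse_record_headers` option documented above: it slots into the same connector configuration shown in the Quick Start. The `name`, `connector.class`, topic, and API key values below are illustrative assumptions rather than part of this change; only the `datadog.parse_record_headers` key and its `false` default come from the table.

```json
{
  "name": "datadog-logs-connector",
  "config": {
    "connector.class": "com.datadoghq.connect.logs.DatadogLogsSinkConnector",
    "topics": "prod-topic1",
    "datadog.api_key": "<DATADOG_API_KEY>",
    "datadog.parse_record_headers": "true"
  }
}
```

When the option is left at its default of `false`, record headers are ignored and no `kafkaheaders` field is added to the log payload.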
diff --git a/src/main/java/com/datadoghq/connect/logs/sink/DatadogLogsApiWriter.java b/src/main/java/com/datadoghq/connect/logs/sink/DatadogLogsApiWriter.java
index f81f765..17b1520 100644
--- a/src/main/java/com/datadoghq/connect/logs/sink/DatadogLogsApiWriter.java
+++ b/src/main/java/com/datadoghq/connect/logs/sink/DatadogLogsApiWriter.java
@@ -6,25 +6,42 @@ This product includes software developed at Datadog (https://www.datadoghq.com/)
 package com.datadoghq.connect.logs.sink;
 
 import com.datadoghq.connect.logs.util.Project;
-import com.google.gson.*;
+import com.google.gson.Gson;
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.*;
+import javax.ws.rs.core.Response;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.HttpURLConnection;
 import java.net.InetSocketAddress;
 import java.net.Proxy;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
 import java.util.zip.GZIPOutputStream;
-import javax.ws.rs.core.Response;
+
+import static java.util.stream.Collectors.toMap;
+import static java.util.stream.StreamSupport.stream;
 
 public class DatadogLogsApiWriter {
-    private final DatadogLogsSinkConnectorConfig config;
     private static final Logger log = LoggerFactory.getLogger(DatadogLogsApiWriter.class);
+    private final DatadogLogsSinkConnectorConfig config;
     private final Map<String, List<SinkRecord>> batches;
     private final JsonConverter jsonConverter;
 
@@ -33,7 +50,7 @@ public DatadogLogsApiWriter(DatadogLogsSinkConnectorConfig config) {
         this.batches = new HashMap<>();
         this.jsonConverter = new JsonConverter();
 
-        Map<String, String> jsonConverterConfig = new HashMap<String, String>();
+        Map<String, String> jsonConverterConfig = new HashMap<>();
         jsonConverterConfig.put("schemas.enable", "false");
         jsonConverterConfig.put("decimal.format", "NUMERIC");
 
@@ -42,13 +59,14 @@ public DatadogLogsApiWriter(DatadogLogsSinkConnectorConfig config) {
 
     /**
      * Writes records to the Datadog Logs API.
+     *
      * @param records to be written from the Source Broker to the Datadog Logs API.
     * @throws IOException may be thrown if the connection to the API fails.
      */
    public void write(Collection<SinkRecord> records) throws IOException {
         for (SinkRecord record : records) {
             if (!batches.containsKey(record.topic())) {
-                batches.put(record.topic(), new ArrayList<> (Collections.singletonList(record)));
+                batches.put(record.topic(), new ArrayList<>(Collections.singletonList(record)));
             } else {
                 batches.get(record.topic()).add(record);
             }
@@ -64,7 +82,7 @@ public void write(Collection<SinkRecord> records) throws IOException {
 
     private void flushBatches() throws IOException {
         // send any outstanding batches
-        for(Map.Entry<String, List<SinkRecord>> entry: batches.entrySet()) {
+        for (Map.Entry<String, List<SinkRecord>> entry : batches.entrySet()) {
             sendBatch(entry.getKey());
         }
 
@@ -97,20 +115,31 @@ private JsonArray formatBatch(String topic) {
             }
 
             JsonElement recordJSON = recordToJSON(record);
-            JsonObject message = populateMetadata(topic, recordJSON, record.timestamp());
+            JsonObject message = populateMetadata(topic, recordJSON, record.timestamp(), () -> kafkaHeadersToJsonElement(record));
             batchRecords.add(message);
         }
 
         return batchRecords;
     }
 
+    private JsonElement kafkaHeadersToJsonElement(SinkRecord sinkRecord) {
+        Map<String, Object> headerMap = stream(sinkRecord.headers().spliterator(), false)
+                .collect(toMap(Header::key, Header::value));
+
+        Gson gson = new Gson();
+
+        String jsonString = gson.toJson(headerMap);
+
+        return gson.fromJson(jsonString, JsonElement.class);
+    }
+
     private JsonElement recordToJSON(SinkRecord record) {
         byte[] rawJSONPayload = jsonConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
         String jsonPayload = new String(rawJSONPayload, StandardCharsets.UTF_8);
         return new Gson().fromJson(jsonPayload, JsonElement.class);
     }
 
-    private JsonObject populateMetadata(String topic, JsonElement message, Long timestamp) {
+    private JsonObject populateMetadata(String topic, JsonElement message, Long timestamp, Supplier<JsonElement> kafkaHeaders) {
         JsonObject content = new JsonObject();
         String tags = "topic:" + topic;
         content.add("message", message);
@@ -119,6 +148,10 @@ private JsonObject populateMetadata(String topic, JsonElement message, Long time
             content.add("published_date", new JsonPrimitive(timestamp));
         }
 
+        if (config.parseRecordHeaders) {
+            content.add("kafkaheaders", kafkaHeaders.get());
+        }
+
         if (config.ddTags != null) {
             tags += "," + config.ddTags;
         }
@@ -161,7 +194,7 @@ private void sendRequest(JsonArray content, URL url) throws IOException {
         if (Response.Status.Family.familyOf(status) != Response.Status.Family.SUCCESSFUL) {
             InputStream stream = con.getErrorStream();
             String error = "";
-            if (stream != null ) {
+            if (stream != null) {
                 error = getOutput(stream);
             }
             con.disconnect();
diff --git a/src/main/java/com/datadoghq/connect/logs/sink/DatadogLogsSinkConnectorConfig.java b/src/main/java/com/datadoghq/connect/logs/sink/DatadogLogsSinkConnectorConfig.java
index d7af4cf..f774ea6 100644
--- a/src/main/java/com/datadoghq/connect/logs/sink/DatadogLogsSinkConnectorConfig.java
+++ b/src/main/java/com/datadoghq/connect/logs/sink/DatadogLogsSinkConnectorConfig.java
@@ -34,6 +34,7 @@ public class DatadogLogsSinkConnectorConfig extends AbstractConfig {
     private static final String DEFAULT_DD_SITE = "datadoghq.com";
     public static final String DEFAULT_DD_URL = String.format(DD_URL_FORMAT_FROM_SITE, DEFAULT_DD_SITE);
     public static final String ADD_PUBLISHED_DATE = "datadog.add_published_date";
+    public static final String PARSE_RECORD_HEADERS = "datadog.parse_record_headers";
 
     // Respect limit documented at https://docs.datadoghq.com/api/?lang=bash#logs
     public final Integer ddMaxBatchLength;
@@ -53,6 +54,7 @@ public class DatadogLogsSinkConnectorConfig extends AbstractConfig {
     public final Integer retryMax;
     public final Integer retryBackoffMs;
     public final boolean addPublishedDate;
+    public final boolean parseRecordHeaders;
 
     public static final ConfigDef CONFIG_DEF = baseConfigDef();
 
@@ -75,6 +77,7 @@ public DatadogLogsSinkConnectorConfig(Boolean useSSL, Integer ddMaxBatchLength,
         this.ddSite = getString(DD_SITE);
         this.ddMaxBatchLength = ddMaxBatchLength;
         this.addPublishedDate = getBoolean(ADD_PUBLISHED_DATE);
+        this.parseRecordHeaders = getBoolean(PARSE_RECORD_HEADERS);
 
         validateConfig();
     }
@@ -175,7 +178,13 @@ private static void addMetadataConfigs(ConfigDef configDef) {
                 false,
                 null,
                 Importance.MEDIUM,
-                "Valid settings are true or false. When set to `true`, The timestamp is retrieved from the Kafka record and passed to Datadog as `published_date`");
+                "Valid settings are true or false. When set to `true`, The timestamp is retrieved from the Kafka record and passed to Datadog as `published_date`"
+        ).define(PARSE_RECORD_HEADERS,
+                Type.BOOLEAN,
+                false,
+                null,
+                Importance.MEDIUM,
+                "Valid settings are true or false. When set to `true`, Kafka Record Headers will be parsed and passed to Datadog as a `kafkaheaders` object");
     }
 
     private static void addProxyConfigs(ConfigDef configDef) {
@@ -250,4 +259,4 @@ private String getTags(String key) {
 
         return null;
     }
-}
\ No newline at end of file
+}
diff --git a/src/test/java/com/datadoghq/connect/logs/sink/DatadogLogsApiWriterTest.java b/src/test/java/com/datadoghq/connect/logs/sink/DatadogLogsApiWriterTest.java
index f6e910f..d4962c1 100644
--- a/src/test/java/com/datadoghq/connect/logs/sink/DatadogLogsApiWriterTest.java
+++ b/src/test/java/com/datadoghq/connect/logs/sink/DatadogLogsApiWriterTest.java
@@ -8,31 +8,34 @@ This product includes software developed at Datadog (https://www.datadoghq.com/)
 import com.datadoghq.connect.logs.sink.util.RequestInfo;
 import com.datadoghq.connect.logs.sink.util.RestHelper;
 import com.datadoghq.connect.logs.util.Project;
-
 import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.math.BigDecimal;
-import java.math.BigInteger;
 
 public class DatadogLogsApiWriterTest {
+    private static String apiKey = "API_KEY";
     private Map<String, String> props;
     private List<SinkRecord> records;
     private RestHelper restHelper;
-    private static String apiKey = "API_KEY";
 
     @Before
     public void setUp() throws Exception {
@@ -153,7 +156,7 @@ public void writer_IOException_for_status_429() throws Exception {
         DatadogLogsApiWriter writer = new DatadogLogsApiWriter(config);
         restHelper.setHttpStatusCode(429);
 
-        records.add(new SinkRecord("someTopic", 0, null, "someKey", null, "someValue1", 0));
+        records.add(new SinkRecord("someTopic", 0, null, "someKey", null, "someValue1", 0));
 
         writer.write(records);
     }
@@ -194,4 +197,50 @@ public void writer_withUseRecordTimeStampEnabled_shouldPopulateRecordTimestamp()
         System.out.println(request.getBody());
         Assert.assertEquals("[{\"message\":\"someValue1\",\"ddsource\":\"kafka-connect\",\"published_date\":1713974401224,\"ddtags\":\"topic:someTopic\"},{\"message\":\"someValue2\",\"ddsource\":\"kafka-connect\",\"published_date\":1713974401224,\"ddtags\":\"topic:someTopic\"}]", request.getBody());
     }
+
+    @Test
+    public void writer_parse_record_headers_enabled() throws IOException {
+        props.put(DatadogLogsSinkConnectorConfig.PARSE_RECORD_HEADERS, "true");
+        DatadogLogsSinkConnectorConfig config = new DatadogLogsSinkConnectorConfig(false, 2, props);
+        DatadogLogsApiWriter writer = new DatadogLogsApiWriter(config);
+
+
+        Schema keySchema = Schema.INT32_SCHEMA;
+        Schema valueSchema = SchemaBuilder.struct()
+                .field("field1", Schema.STRING_SCHEMA)
+                .field("field2", Schema.INT32_SCHEMA)
+                .build();
+
+        Integer key = 123;
+        Struct value = new Struct(valueSchema)
+                .put("field1", "value1")
+                .put("field2", 456);
+
+        Headers headers = new ConnectHeaders();
+        headers.addString("headerKey", "headerValue");
+
+        long recordTime = 1713974401224L;
+
+        SinkRecord sinkRecord = new SinkRecord("topicName", 0, keySchema, key, valueSchema, value,
+                100L, recordTime, null, headers);
+
+        records.add(sinkRecord);
+        records.add(new SinkRecord("someTopic", 0, null, "someKey", null,
+                "someValue1", 0, recordTime, TimestampType.CREATE_TIME));
+        writer.write(records);
+
+        Assert.assertEquals(2, restHelper.getCapturedRequests().size());
+
+        RequestInfo requestWithHeaders = restHelper.getCapturedRequests().get(0);
+        RequestInfo requestWithoutHeaders = restHelper.getCapturedRequests().get(1);
+
+        Set<String> requestBodySetActual = new HashSet<>();
+        requestBodySetActual.add(requestWithHeaders.getBody());
+        requestBodySetActual.add(requestWithoutHeaders.getBody());
+        Set<String> requestBodySetExpected = new HashSet<>();
+        requestBodySetExpected.add("[{\"message\":{\"field1\":\"value1\",\"field2\":456},\"ddsource\":\"kafka-connect\",\"kafkaheaders\":{\"headerKey\":\"headerValue\"},\"ddtags\":\"topic:topicName\"}]");
+        requestBodySetExpected.add("[{\"message\":\"someValue1\",\"ddsource\":\"kafka-connect\",\"kafkaheaders\":{},\"ddtags\":\"topic:someTopic\"}]");
+        Assert.assertEquals(requestBodySetExpected, requestBodySetActual);
+        props.remove(DatadogLogsSinkConnectorConfig.PARSE_RECORD_HEADERS);
+    }
 }
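The expected bodies in the new writer test pin down the payload shape when `datadog.parse_record_headers` is enabled: header key/value pairs are grouped under a single `kafkaheaders` object next to `message`, `ddsource`, and `ddtags`, and a record without headers gets an empty `kafkaheaders` object. The same assertion from the test, pretty-printed here for readability:

```json
[
  {
    "message": { "field1": "value1", "field2": 456 },
    "ddsource": "kafka-connect",
    "kafkaheaders": { "headerKey": "headerValue" },
    "ddtags": "topic:topicName"
  }
]
```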