diff --git a/README.md b/README.md
index 13fdaea..ea9c295 100644
--- a/README.md
+++ b/README.md
@@ -95,17 +95,29 @@ A REST call can be executed against one of the cluster instances, and the config
 | `topics` | Comma separated list of Kafka topics for Datadog to consume. `prod-topic1,prod-topic2,prod-topic3`||
 | `datadog.api_key` | The API key of your Datadog platform.||
 #### General Optional Parameters
-| Name | Description | Default Value |
-|-------- |----------------------------|-----------------------|
-| `datadog.site` | The site of the Datadog intake to send logs to (for example 'datadoghq.eu' to send data to the EU site) | `datadoghq.com` |
+| Name | Description | Default Value |
+|-------- |------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------|
+| `datadog.site` | The site of the Datadog intake to send logs to (for example 'datadoghq.eu' to send data to the EU site) | `datadoghq.com` |
 | `datadog.url` | Custom Datadog URL endpoint where your logs will be sent. `datadog.url` takes precedence over `datadog.site`. Example: `http-intake.logs.datadoghq.com:443` ||
-| `datadog.tags` | Tags associated with your logs in a comma separated tag:value format.||
-| `datadog.service` | The name of the application or service generating the log events.||
-| `datadog.hostname` | The name of the originating host of the log.||
-| `datadog.proxy.url` | Proxy endpoint when logs are not directly forwarded to Datadog.||
-| `datadog.proxy.port` | Proxy port when logs are not directly forwarded to Datadog.||
-| `datadog.retry.max` | The number of retries before the output plugin stops.| `5` ||
-| `datadog.retry.backoff_ms` | The time in milliseconds to wait following an error before a retry attempt is made.| `3000` ||
+| `datadog.tags` | Tags associated with your logs in a comma separated tag:value format. ||
+| `datadog.service` | The name of the application or service generating the log events. ||
+| `datadog.hostname` | The name of the originating host of the log. ||
+| `datadog.proxy.url` | Proxy endpoint when logs are not directly forwarded to Datadog. ||
+| `datadog.proxy.port` | Proxy port when logs are not directly forwarded to Datadog. ||
+| `datadog.retry.max` | The number of retries before the output plugin stops. | `5` |
+| `datadog.retry.backoff_ms` | The time in milliseconds to wait following an error before a retry attempt is made. | `3000` |
+| `datadog.add_published_date` | Valid settings are `true` or `false`. When set to `true`, the timestamp is retrieved from the Kafka record and passed to Datadog as `published_date`. | `false` |
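+
+For example, a minimal standalone-worker configuration enabling this option might look like the following (connector name, topic, and API key are illustrative; the connector class name is inferred from this repository's package layout):
+
+```properties
+name=datadog-logs-connector
+connector.class=com.datadoghq.connect.logs.DatadogLogsSinkConnector
+tasks.max=1
+topics=prod-topic1
+datadog.api_key=<your_api_key>
+datadog.add_published_date=true
+```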
 
 ### Troubleshooting performance
 
diff --git a/src/main/java/com/datadoghq/connect/logs/sink/DatadogLogsApiWriter.java b/src/main/java/com/datadoghq/connect/logs/sink/DatadogLogsApiWriter.java
index 4c54031..f81f765 100644
--- a/src/main/java/com/datadoghq/connect/logs/sink/DatadogLogsApiWriter.java
+++ b/src/main/java/com/datadoghq/connect/logs/sink/DatadogLogsApiWriter.java
@@ -97,7 +97,7 @@ private JsonArray formatBatch(String topic) {
         }
 
         JsonElement recordJSON = recordToJSON(record);
-        JsonObject message = populateMetadata(topic, recordJSON);
+        JsonObject message = populateMetadata(topic, recordJSON, record.timestamp());
         batchRecords.add(message);
     }
 
@@ -110,11 +110,14 @@ private JsonElement recordToJSON(SinkRecord record) {
         return new Gson().fromJson(jsonPayload, JsonElement.class);
     }
 
-    private JsonObject populateMetadata(String topic, JsonElement message) {
+    private JsonObject populateMetadata(String topic, JsonElement message, Long timestamp) {
         JsonObject content = new JsonObject();
         String tags = "topic:" + topic;
         content.add("message", message);
         content.add("ddsource", new JsonPrimitive(config.ddSource));
+        if (config.addPublishedDate && timestamp != null) {
+            content.add("published_date", new JsonPrimitive(timestamp));
+        }
 
         if (config.ddTags != null) {
             tags += "," + config.ddTags;
diff --git a/src/main/java/com/datadoghq/connect/logs/sink/DatadogLogsSinkConnectorConfig.java b/src/main/java/com/datadoghq/connect/logs/sink/DatadogLogsSinkConnectorConfig.java
index 54664bb..d7af4cf 100644
--- a/src/main/java/com/datadoghq/connect/logs/sink/DatadogLogsSinkConnectorConfig.java
+++ b/src/main/java/com/datadoghq/connect/logs/sink/DatadogLogsSinkConnectorConfig.java
@@ -33,6 +33,7 @@ public class DatadogLogsSinkConnectorConfig extends AbstractConfig {
     private static final String DD_URL_FORMAT_FROM_SITE = "http-intake.logs.%s:443";
     private static final String DEFAULT_DD_SITE = "datadoghq.com";
     public static final String DEFAULT_DD_URL = String.format(DD_URL_FORMAT_FROM_SITE, DEFAULT_DD_SITE);
+    public static final String ADD_PUBLISHED_DATE = "datadog.add_published_date";
 
     // Respect limit documented at https://docs.datadoghq.com/api/?lang=bash#logs
     public final Integer ddMaxBatchLength;
@@ -51,6 +52,7 @@ public class DatadogLogsSinkConnectorConfig extends AbstractConfig {
     public final Integer proxyPort;
     public final Integer retryMax;
     public final Integer retryBackoffMs;
+    public final boolean addPublishedDate;
 
     public static final ConfigDef CONFIG_DEF = baseConfigDef();
 
@@ -72,6 +74,7 @@ public DatadogLogsSinkConnectorConfig(Boolean useSSL, Integer ddMaxBatchLength,
         this.ddUrl = getString(DD_URL);
         this.ddSite = getString(DD_SITE);
         this.ddMaxBatchLength = ddMaxBatchLength;
+        this.addPublishedDate = getBoolean(ADD_PUBLISHED_DATE);
 
         validateConfig();
     }
@@ -166,7 +169,13 @@ private static void addMetadataConfigs(ConfigDef configDef) {
                 ++orderInGroup,
                 Width.LONG,
                 "Datadog logs site"
-        );
+        ).define(
+                ADD_PUBLISHED_DATE,
+                Type.BOOLEAN,
+                false,
+                null,
+                Importance.MEDIUM,
+                "Valid settings are true or false. When set to `true`, the timestamp is retrieved from the Kafka record and passed to Datadog as `published_date`.");
     }
 
     private static void addProxyConfigs(ConfigDef configDef) {
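A note for reviewers on the guard above: `SinkRecord.timestamp()` can be `null` (for example, for records with `TimestampType.NO_TIMESTAMP_TYPE`), so `populateMetadata` attaches `published_date` only when the flag is enabled and a timestamp is actually present. The sketch below mirrors that logic with plain Gson objects; the class and method names are illustrative and not part of the patch.

```java
import com.google.gson.JsonObject;
import com.google.gson.JsonPrimitive;

public class PublishedDateSketch {

    // Mirrors the guard in populateMetadata: attach published_date only when
    // the feature flag is on AND the record actually carries a timestamp.
    static JsonObject attachPublishedDate(JsonObject content, boolean addPublishedDate, Long timestamp) {
        if (addPublishedDate && timestamp != null) {
            content.add("published_date", new JsonPrimitive(timestamp));
        }
        return content;
    }

    public static void main(String[] args) {
        JsonObject base = new JsonObject();
        base.add("ddsource", new JsonPrimitive("kafka-connect"));

        // Record with a timestamp: the field is added.
        System.out.println(attachPublishedDate(base.deepCopy(), true, 1713974401224L));
        // -> {"ddsource":"kafka-connect","published_date":1713974401224}

        // Record without a timestamp: the field is silently omitted.
        System.out.println(attachPublishedDate(base.deepCopy(), true, null));
        // -> {"ddsource":"kafka-connect"}
    }
}
```

With the flag enabled but no timestamp available, the payload simply omits `published_date` rather than sending a null value.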
diff --git a/src/test/java/com/datadoghq/connect/logs/sink/DatadogLogsApiWriterTest.java b/src/test/java/com/datadoghq/connect/logs/sink/DatadogLogsApiWriterTest.java
index fc12d1f..f6e910f 100644
--- a/src/test/java/com/datadoghq/connect/logs/sink/DatadogLogsApiWriterTest.java
+++ b/src/test/java/com/datadoghq/connect/logs/sink/DatadogLogsApiWriterTest.java
@@ -9,6 +9,7 @@ This product includes software developed at Datadog (https://www.datadoghq.com/)
 
 import com.datadoghq.connect.logs.sink.util.RestHelper;
 import com.datadoghq.connect.logs.util.Project;
+import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
@@ -173,4 +174,22 @@ public void metadata_asOneBatch_shouldPopulatePerBatch() throws IOException {
 
         Assert.assertEquals("[{\"message\":\"someValue1\",\"ddsource\":\"kafka-connect\",\"ddtags\":\"topic:someTopic,team:agent-core,author:berzan\",\"hostname\":\"test-host\",\"service\":\"test-service\"},{\"message\":\"someValue2\",\"ddsource\":\"kafka-connect\",\"ddtags\":\"topic:someTopic,team:agent-core,author:berzan\",\"hostname\":\"test-host\",\"service\":\"test-service\"}]", request.getBody());
     }
+
+    @Test
+    public void writer_withAddPublishedDateEnabled_shouldPopulatePublishedDate() throws IOException {
+        props.put(DatadogLogsSinkConnectorConfig.ADD_PUBLISHED_DATE, "true");
+        DatadogLogsSinkConnectorConfig config = new DatadogLogsSinkConnectorConfig(false, 2, props);
+        DatadogLogsApiWriter writer = new DatadogLogsApiWriter(config);
+
+        long recordTime = 1713974401224L;
+
+        records.add(new SinkRecord("someTopic", 0, null, "someKey", null, "someValue1", 0, recordTime, TimestampType.CREATE_TIME));
+        records.add(new SinkRecord("someTopic", 0, null, "someKey", null, "someValue2", 0, recordTime, TimestampType.CREATE_TIME));
+        writer.write(records);
+
+        Assert.assertEquals(1, restHelper.getCapturedRequests().size());
+
+        RequestInfo request = restHelper.getCapturedRequests().get(0);
+        Assert.assertEquals("[{\"message\":\"someValue1\",\"ddsource\":\"kafka-connect\",\"published_date\":1713974401224,\"ddtags\":\"topic:someTopic\"},{\"message\":\"someValue2\",\"ddsource\":\"kafka-connect\",\"published_date\":1713974401224,\"ddtags\":\"topic:someTopic\"}]", request.getBody());
+    }
 }
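To exercise the new code path locally, the added test can be run on its own with Maven Surefire (assuming the project's standard Maven setup; the method name matches the test added above):

```sh
mvn test -Dtest=DatadogLogsApiWriterTest#writer_withAddPublishedDateEnabled_shouldPopulatePublishedDate
```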