Commit 052939a

Added new config to allow the Kafka record timestamp to be set as the `published_date` event attribute (#47)

* added a new config to allow the Kafka record timestamp to be set as the `published_date` event attribute

---------

Co-authored-by: nyk0322 <nyk0322@dexcom.com>
nyk0322 authored Apr 26, 2024
1 parent 8ad004d commit 052939a
Showing 4 changed files with 47 additions and 13 deletions.
21 changes: 11 additions & 10 deletions README.md
@@ -95,17 +95,18 @@ A REST call can be executed against one of the cluster instances, and the config
| `topics` | Comma separated list of Kafka topics for Datadog to consume. `prod-topic1,prod-topic2,prod-topic3`||
| `datadog.api_key` | The API key of your Datadog platform.||
#### General Optional Parameters
| Name | Description | Default Value |
|-------- |------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------------------|
| `datadog.site` | The site of the Datadog intake to send logs to (for example 'datadoghq.eu' to send data to the EU site) | `datadoghq.com` |
| `datadog.url` | Custom Datadog URL endpoint where your logs will be sent. `datadog.url` takes precedence over `datadog.site`. Example: `http-intake.logs.datadoghq.com:443` ||
| `datadog.tags` | Tags associated with your logs in a comma separated tag:value format. ||
| `datadog.service` | The name of the application or service generating the log events. ||
| `datadog.hostname` | The name of the originating host of the log. ||
| `datadog.proxy.url` | Proxy endpoint when logs are not directly forwarded to Datadog. ||
| `datadog.proxy.port` | Proxy port when logs are not directly forwarded to Datadog. ||
| `datadog.retry.max` | The number of retries before the output plugin stops. | `5` |
| `datadog.retry.backoff_ms` | The time in milliseconds to wait following an error before a retry attempt is made. | `3000` |
| `datadog.add_published_date` | Valid settings are true or false. When set to `true`, the timestamp is retrieved from the Kafka record and passed to Datadog as `published_date`. See the example below. | `false` |
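For reference, a minimal sketch of a connector configuration that opts in to the new behavior, written in the same `props`-map style the unit tests use; the connector class name and API key here are illustrative assumptions, not taken from this commit:

```java
import java.util.HashMap;
import java.util.Map;

public class AddPublishedDateConfigExample {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Assumed connector class name and placeholder API key, for illustration only.
        props.put("connector.class", "com.datadoghq.connect.logs.DatadogLogsSinkConnector");
        props.put("topics", "prod-topic1,prod-topic2");
        props.put("datadog.api_key", "<YOUR_API_KEY>");
        // New in this commit: forward the Kafka record timestamp as published_date.
        props.put("datadog.add_published_date", "true");
        System.out.println(props);
    }
}
```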

### Troubleshooting performance

@@ -97,7 +97,7 @@ private JsonArray formatBatch(String topic) {
}

JsonElement recordJSON = recordToJSON(record);
JsonObject message = populateMetadata(topic, recordJSON, record.timestamp());
batchRecords.add(message);
}

@@ -110,11 +110,14 @@ private JsonElement recordToJSON(SinkRecord record) {
return new Gson().fromJson(jsonPayload, JsonElement.class);
}

private JsonObject populateMetadata(String topic, JsonElement message, Long timestamp) {
JsonObject content = new JsonObject();
String tags = "topic:" + topic;
content.add("message", message);
content.add("ddsource", new JsonPrimitive(config.ddSource));
if (config.addPublishedDate && timestamp != null) {
content.add("published_date", new JsonPrimitive(timestamp));
}

if (config.ddTags != null) {
tags += "," + config.ddTags;
@@ -33,6 +33,7 @@ public class DatadogLogsSinkConnectorConfig extends AbstractConfig {
private static final String DD_URL_FORMAT_FROM_SITE = "http-intake.logs.%s:443";
private static final String DEFAULT_DD_SITE = "datadoghq.com";
public static final String DEFAULT_DD_URL = String.format(DD_URL_FORMAT_FROM_SITE, DEFAULT_DD_SITE);
public static final String ADD_PUBLISHED_DATE = "datadog.add_published_date";

// Respect limit documented at https://docs.datadoghq.com/api/?lang=bash#logs
public final Integer ddMaxBatchLength;
@@ -51,6 +52,7 @@ public class DatadogLogsSinkConnectorConfig extends AbstractConfig {
public final Integer proxyPort;
public final Integer retryMax;
public final Integer retryBackoffMs;
public final boolean addPublishedDate;

public static final ConfigDef CONFIG_DEF = baseConfigDef();

@@ -72,6 +74,7 @@ public DatadogLogsSinkConnectorConfig(Boolean useSSL, Integer ddMaxBatchLength,
this.ddUrl = getString(DD_URL);
this.ddSite = getString(DD_SITE);
this.ddMaxBatchLength = ddMaxBatchLength;
this.addPublishedDate = getBoolean(ADD_PUBLISHED_DATE);
validateConfig();
}

@@ -166,7 +169,13 @@ private static void addMetadataConfigs(ConfigDef configDef) {
++orderInGroup,
Width.LONG,
"Datadog logs site"
).define(
ADD_PUBLISHED_DATE,
Type.BOOLEAN,
false,
null,
Importance.MEDIUM,
"Valid settings are true or false. When set to `true`, The timestamp is retrieved from the Kafka record and passed to Datadog as `published_date`");
}

private static void addProxyConfigs(ConfigDef configDef) {
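As a standalone sketch of how the new key parses, using Kafka's `ConfigDef` API directly (this uses a shorter `define` overload that omits the group and width metadata the connector registers above):

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

import java.util.HashMap;
import java.util.Map;

public class AddPublishedDateParseExample {
    public static void main(String[] args) {
        ConfigDef def = new ConfigDef().define(
                "datadog.add_published_date",
                Type.BOOLEAN,
                false, // default: feature stays off unless explicitly enabled
                Importance.MEDIUM,
                "Forward the Kafka record timestamp to Datadog as published_date.");

        // Key omitted: falls back to the default of false.
        Map<String, String> props = new HashMap<>();
        System.out.println(def.parse(props).get("datadog.add_published_date")); // false

        // Key set to "true": parsed to Boolean.TRUE.
        props.put("datadog.add_published_date", "true");
        System.out.println(def.parse(props).get("datadog.add_published_date")); // true
    }
}
```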
@@ -9,6 +9,7 @@ This product includes software developed at Datadog (https://www.datadoghq.com/)
import com.datadoghq.connect.logs.sink.util.RestHelper;
import com.datadoghq.connect.logs.util.Project;

import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;
@@ -173,4 +174,24 @@ public void metadata_asOneBatch_shouldPopulatePerBatch() throws IOException {

Assert.assertEquals("[{\"message\":\"someValue1\",\"ddsource\":\"kafka-connect\",\"ddtags\":\"topic:someTopic,team:agent-core,author:berzan\",\"hostname\":\"test-host\",\"service\":\"test-service\"},{\"message\":\"someValue2\",\"ddsource\":\"kafka-connect\",\"ddtags\":\"topic:someTopic,team:agent-core,author:berzan\",\"hostname\":\"test-host\",\"service\":\"test-service\"}]", request.getBody());
}

@Test
public void writer_withUseRecordTimeStampEnabled_shouldPopulateRecordTimestamp() throws IOException {
props.put(DatadogLogsSinkConnectorConfig.ADD_PUBLISHED_DATE, "true");
DatadogLogsSinkConnectorConfig config = new DatadogLogsSinkConnectorConfig(false, 2, props);
DatadogLogsApiWriter writer = new DatadogLogsApiWriter(config);

long recordTime = 1713974401224L;

records.add(new SinkRecord("someTopic", 0, null, "someKey", null, "someValue1", 0, recordTime, TimestampType.CREATE_TIME));
records.add(new SinkRecord("someTopic", 0, null, "someKey", null, "someValue2", 0, recordTime, TimestampType.CREATE_TIME));
writer.write(records);

Assert.assertEquals(1, restHelper.getCapturedRequests().size());

RequestInfo request = restHelper.getCapturedRequests().get(0);
Assert.assertEquals("[{\"message\":\"someValue1\",\"ddsource\":\"kafka-connect\",\"published_date\":1713974401224,\"ddtags\":\"topic:someTopic\"},{\"message\":\"someValue2\",\"ddsource\":\"kafka-connect\",\"published_date\":1713974401224,\"ddtags\":\"topic:someTopic\"}]", request.getBody());
}
}
