Skip to content

Commit

Permalink
Support including publish time in metadata
Browse files: browse the repository at this point in the history
  • Loading branch information
shibd committed Nov 5, 2024
1 parent de86331 commit fd5ed36
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ public class BlobStoreAbstractConfig implements Serializable {
private boolean useHumanReadableMessageId;
private boolean useHumanReadableSchemaVersion;
private boolean includeTopicToMetadata;
private boolean includePublishTimeToMetadata;

public void validate() {
checkNotNull(provider, "provider not set.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public class AvroFormat implements Format<GenericRecord> , InitConfiguration<Blo
private boolean useHumanReadableMessageId;
private boolean useHumanReadableSchemaVersion;
private boolean includeTopicToMetadata;
private boolean includePublishTimeToMetadata;
private CodecFactory codecFactory;

@Override
Expand All @@ -67,6 +68,7 @@ public void configure(BlobStoreAbstractConfig configuration) {
this.useHumanReadableMessageId = configuration.isUseHumanReadableMessageId();
this.useHumanReadableSchemaVersion = configuration.isUseHumanReadableSchemaVersion();
this.includeTopicToMetadata = configuration.isIncludeTopicToMetadata();
this.includePublishTimeToMetadata = configuration.isIncludePublishTimeToMetadata();
String codecName = configuration.getAvroCodec();
if (codecName == null) {
this.codecFactory = CodecFactory.nullCodec();
Expand All @@ -87,8 +89,8 @@ public void initSchema(org.apache.pulsar.client.api.Schema<GenericRecord> schema
internalSchema = schema;
rootAvroSchema = AvroRecordUtil.convertToAvroSchema(schema);
if (useMetadata){
rootAvroSchema = MetadataUtil.setMetadataSchema(rootAvroSchema,
useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata);
rootAvroSchema = MetadataUtil.setMetadataSchema(rootAvroSchema, useHumanReadableMessageId,
useHumanReadableSchemaVersion, includeTopicToMetadata, includePublishTimeToMetadata);
}

LOGGER.debug("Using avro schema: {}", rootAvroSchema);
Expand Down Expand Up @@ -126,7 +128,8 @@ public ByteBuffer recordWriterBuf(Iterator<Record<GenericRecord>> records) throw
if (useMetadata) {
org.apache.avro.generic.GenericRecord metadataRecord =
MetadataUtil.extractedMetadataRecord(next,
useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata);
useHumanReadableMessageId, useHumanReadableSchemaVersion,
includeTopicToMetadata, includePublishTimeToMetadata);
writeRecord.put(MetadataUtil.MESSAGE_METADATA_KEY, metadataRecord);
}
fileWriter.append(writeRecord);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public class JsonFormat implements Format<GenericRecord>, InitConfiguration<Blob
private boolean useHumanReadableMessageId;
private boolean useHumanReadableSchemaVersion;
private boolean includeTopicToMetadata;
private boolean includePublishTimeToMetadata;

@Override
public String getExtension() {
Expand All @@ -82,6 +83,7 @@ public void configure(BlobStoreAbstractConfig configuration) {
this.useHumanReadableMessageId = configuration.isUseHumanReadableMessageId();
this.useHumanReadableSchemaVersion = configuration.isUseHumanReadableSchemaVersion();
this.includeTopicToMetadata = configuration.isIncludeTopicToMetadata();
this.includePublishTimeToMetadata = configuration.isIncludePublishTimeToMetadata();

if (configuration.isJsonAllowNaN()) {
JSON_MAPPER.get().enable(ALLOW_NON_NUMERIC_NUMBERS.mappedFeature());
Expand Down Expand Up @@ -117,11 +119,11 @@ public ByteBuffer recordWriterBuf(Iterator<Record<GenericRecord>> record) throws
GenericRecord val = next.getValue();
final Schema<GenericRecord> schema = next.getSchema();
log.debug("next record {} schema {} val {}", next, schema, val);
Map<String, Object> writeValue = convertRecordToObject(next.getValue(), schema);
Map<String, Object> writeValue = convertRecordToObject(val, schema);
if (useMetadata) {
writeValue.put(MetadataUtil.MESSAGE_METADATA_KEY,
MetadataUtil.extractedMetadata(next, useHumanReadableMessageId, useHumanReadableSchemaVersion,
includeTopicToMetadata));
includeTopicToMetadata, includePublishTimeToMetadata));
}
String recordAsString = JSON_MAPPER.get().writeValueAsString(writeValue);
stringBuilder.append(recordAsString).append("\n");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class ParquetFormat implements Format<GenericRecord>, InitConfiguration<B
private boolean useHumanReadableMessageId;
private boolean useHumanReadableSchemaVersion;
private boolean includeTopicToMetadata;
private boolean includePublishTimeToMetadata;

private CompressionCodecName compressionCodecName = CompressionCodecName.GZIP;

Expand All @@ -78,6 +79,7 @@ public void configure(BlobStoreAbstractConfig configuration) {
this.useHumanReadableMessageId = configuration.isUseHumanReadableMessageId();
this.useHumanReadableSchemaVersion = configuration.isUseHumanReadableSchemaVersion();
this.includeTopicToMetadata = configuration.isIncludeTopicToMetadata();
this.includePublishTimeToMetadata = configuration.isIncludePublishTimeToMetadata();
this.compressionCodecName = CompressionCodecName.fromConf(configuration.getParquetCodec());
}

Expand Down Expand Up @@ -186,8 +188,8 @@ public void initSchema(org.apache.pulsar.client.api.Schema<GenericRecord> schema
} else {
rootAvroSchema = AvroRecordUtil.convertToAvroSchema(schema);
if (useMetadata) {
rootAvroSchema = MetadataUtil.setMetadataSchema(rootAvroSchema,
useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata);
rootAvroSchema = MetadataUtil.setMetadataSchema(rootAvroSchema, useHumanReadableMessageId,
useHumanReadableSchemaVersion, includeTopicToMetadata, includePublishTimeToMetadata);
}
log.info("Using avro schema: {}", rootAvroSchema);
}
Expand Down Expand Up @@ -249,7 +251,8 @@ public ByteBuffer recordWriterBuf(Iterator<Record<GenericRecord>> records) throw
// Add metadata to the record
DynamicMessage.Builder messageBuilder = DynamicMessage.newBuilder(descriptor);
Metadata.PulsarIOCSCProtobufMessageMetadata metadata = getMetadataFromMessage(next,
useHumanReadableMessageId, useHumanReadableSchemaVersion);
useHumanReadableMessageId, useHumanReadableSchemaVersion,
includeTopicToMetadata, includePublishTimeToMetadata);
for (Descriptors.FieldDescriptor field : descriptor.getFields()) {
if (field.getName().equals(MESSAGE_METADATA_KEY)) {
messageBuilder.setField(field, metadata);
Expand All @@ -272,7 +275,8 @@ public ByteBuffer recordWriterBuf(Iterator<Record<GenericRecord>> records) throw
MetadataUtil.extractedMetadataRecord(next,
useHumanReadableMessageId,
useHumanReadableSchemaVersion,
includeTopicToMetadata);
includeTopicToMetadata,
includePublishTimeToMetadata);
writeRecord.put(MESSAGE_METADATA_KEY, metadataRecord);
}
if (parquetWriter != null) {
Expand Down
51 changes: 33 additions & 18 deletions src/main/java/org/apache/pulsar/io/jcloud/util/MetadataUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class MetadataUtil {
public static final String METADATA_SCHEMA_VERSION_KEY = "schemaVersion";
public static final String METADATA_MESSAGE_ID_KEY = "messageId";
public static final String METADATA_TOPIC = "topic";
public static final String METADATA_PUBLISH_TIME = "publishTime";
public static final String MESSAGE_METADATA_KEY = "__message_metadata__";
public static final String MESSAGE_METADATA_NAME = "messageMetadata";
public static final Schema MESSAGE_METADATA = buildMetadataSchema();
Expand All @@ -51,11 +52,12 @@ public class MetadataUtil {
public static org.apache.avro.generic.GenericRecord extractedMetadataRecord(Record<GenericRecord> next,
boolean useHumanReadableMessageId,
boolean useHumanReadableSchemaVersion,
boolean includeTopicToMetadata) {
boolean includeTopicToMetadata,
boolean includePublishTimeToMetadata) {
final Message<GenericRecord> message = next.getMessage().get();

GenericData.Record record = new GenericData.Record(buildMetadataSchema(
useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata));
GenericData.Record record = new GenericData.Record(buildMetadataSchema(useHumanReadableMessageId,
useHumanReadableSchemaVersion, includeTopicToMetadata, includePublishTimeToMetadata));
record.put(METADATA_PROPERTIES_KEY, message.getProperties());
if (useHumanReadableSchemaVersion) {
record.put(METADATA_SCHEMA_VERSION_KEY,
Expand All @@ -71,21 +73,17 @@ public static org.apache.avro.generic.GenericRecord extractedMetadataRecord(Reco
if (includeTopicToMetadata) {
record.put(METADATA_TOPIC, message.getTopicName());
}
if (includePublishTimeToMetadata) {
record.put(METADATA_PUBLISH_TIME, message.getPublishTime());
}
return record;
}

public static org.apache.avro.generic.GenericRecord extractedMetadataRecord(Record<GenericRecord> next) {
return extractedMetadataRecord(next, false, false, false);
}

public static Map<String, Object> extractedMetadata(Record<GenericRecord> next) {
return extractedMetadata(next, false, false, false);
}

public static Map<String, Object> extractedMetadata(Record<GenericRecord> next,
boolean useHumanReadableMessageId,
boolean useHumanReadableSchemaVersion,
boolean includeTopicToMetadata) {
boolean includeTopicToMetadata,
boolean includePublishTimeToMetadata) {
Map<String, Object> metadata = new HashMap<>();
final Message<GenericRecord> message = next.getMessage().get();
metadata.put(METADATA_PROPERTIES_KEY, message.getProperties());
Expand All @@ -103,6 +101,9 @@ public static Map<String, Object> extractedMetadata(Record<GenericRecord> next,
if (includeTopicToMetadata) {
metadata.put(METADATA_TOPIC, message.getTopicName());
}
if (includePublishTimeToMetadata) {
metadata.put(METADATA_PUBLISH_TIME, message.getPublishTime());
}
return metadata;
}

Expand All @@ -124,14 +125,15 @@ public static Schema setMetadataSchema(Schema schema) {
public static Schema setMetadataSchema(Schema schema,
boolean useHumanReadableMessageId,
boolean useHumanReadableSchemaVersion,
boolean includeTopicToMetadata) {
boolean includeTopicToMetadata,
boolean includePublishTimeToMetadata) {
final List<Schema.Field> fieldWithMetadata = schemaFieldThreadLocal.get();
fieldWithMetadata.clear();
schema.getFields().forEach(f -> {
fieldWithMetadata.add(new Schema.Field(f, f.schema()));
});
fieldWithMetadata.add(new Schema.Field(MESSAGE_METADATA_KEY, buildMetadataSchema(
useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata)));
fieldWithMetadata.add(new Schema.Field(MESSAGE_METADATA_KEY, buildMetadataSchema(useHumanReadableMessageId,
useHumanReadableSchemaVersion, includeTopicToMetadata, includePublishTimeToMetadata)));
return Schema.createRecord(schema.getName(),
schema.getDoc(),
schema.getNamespace(),
Expand All @@ -141,12 +143,13 @@ public static Schema setMetadataSchema(Schema schema,
}

private static Schema buildMetadataSchema(){
return buildMetadataSchema(false, false, false);
return buildMetadataSchema(false, false, false, false);
}

private static Schema buildMetadataSchema(boolean useHumanReadableMessageId,
boolean useHumanReadableSchemaVersion,
boolean includeTopicToMetadata) {
boolean includeTopicToMetadata,
boolean includePublishTimeToMetadata) {
List<Schema.Field> fields = new ArrayList<>();
fields.add(new Schema.Field(METADATA_PROPERTIES_KEY,
Schema.createUnion(
Expand All @@ -172,6 +175,10 @@ private static Schema buildMetadataSchema(boolean useHumanReadableMessageId,
fields.add(new Schema.Field(METADATA_TOPIC, Schema.createUnion(
Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING))));
}
if (includePublishTimeToMetadata) {
fields.add(new Schema.Field(METADATA_PUBLISH_TIME, Schema.createUnion(
Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.LONG))));
}
return Schema.createRecord(MESSAGE_METADATA_NAME,
null,
null,
Expand All @@ -187,7 +194,9 @@ public static long parseSchemaVersionFromBytes(byte[] schemaVersion) {

public static Metadata.PulsarIOCSCProtobufMessageMetadata getMetadataFromMessage(Record<GenericRecord> next,
boolean useHumanReadableMessageId,
boolean useHumanReadableSchemaVersion) {
boolean useHumanReadableSchemaVersion,
boolean includeTopicToMetadata,
boolean includePublishTimeToMetadata) {
Metadata.PulsarIOCSCProtobufMessageMetadata.Builder metadataBuilder =
Metadata.PulsarIOCSCProtobufMessageMetadata.newBuilder();
final Message<GenericRecord> message = next.getMessage().get();
Expand All @@ -204,6 +213,12 @@ public static Metadata.PulsarIOCSCProtobufMessageMetadata getMetadataFromMessage
} else {
metadataBuilder.setMessageIdBytes(ByteString.copyFrom(message.getMessageId().toByteArray()));
}
if (includeTopicToMetadata) {
metadataBuilder.setTopic(message.getTopicName());
}
if (includePublishTimeToMetadata) {
metadataBuilder.setPublishTime(message.getPublishTime());
}
return metadataBuilder.build();
}
}
2 changes: 2 additions & 0 deletions src/main/proto/ProtobufMessageMetadata.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,7 @@ message PulsarIOCSCProtobufMessageMetadata {
map<string, string> properties = 1;
string schema_version = 2;
string message_id = 3;
string topic = 4;
int64 publish_time = 5;
}

Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ public void testExtractedMetadata() throws IOException {
byte[] schemaVersionBytes = new byte[]{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A};
byte[] messageIdBytes = new byte[]{0x00, 0x01, 0x02, 0x03};
String topicName = "test-topic";
long publishTime = System.currentTimeMillis();
Record<GenericRecord> mockRecord = mock(Record.class);
Message<GenericRecord> mockMessage = mock(Message.class);
when(mockMessage.getPublishTime()).thenReturn(publishTime);
MessageId mockMessageId = mock(MessageId.class);
when(mockRecord.getMessage()).thenReturn(Optional.of(mockMessage));
when(mockMessage.getMessageId()).thenReturn(mockMessageId);
Expand All @@ -57,22 +59,23 @@ public void testExtractedMetadata() throws IOException {
when(mockMessage.getTopicName()).thenReturn(topicName);

Map<String, Object> metadataWithHumanReadableMetadata =
MetadataUtil.extractedMetadata(mockRecord, true, true, true);
MetadataUtil.extractedMetadata(mockRecord, true, true, true, true);
Assert.assertEquals(metadataWithHumanReadableMetadata.get(METADATA_MESSAGE_ID_KEY), messageIdString);
Assert.assertNotEquals(metadataWithHumanReadableMetadata.get(METADATA_MESSAGE_ID_KEY),
ByteBuffer.wrap(messageIdBytes));
Assert.assertEquals(metadataWithHumanReadableMetadata.get(METADATA_SCHEMA_VERSION_KEY),
MetadataUtil.parseSchemaVersionFromBytes(schemaVersionBytes));
Assert.assertEquals(metadataWithHumanReadableMetadata.get(MetadataUtil.METADATA_TOPIC), topicName);
Assert.assertEquals(metadataWithHumanReadableMetadata.get(MetadataUtil.METADATA_PUBLISH_TIME), publishTime);

Map<String, Object> metadataWithHumanReadableMessageId =
MetadataUtil.extractedMetadata(mockRecord, true, false, false);
MetadataUtil.extractedMetadata(mockRecord, true, false, false, false);
Assert.assertEquals(metadataWithHumanReadableMessageId.get(METADATA_MESSAGE_ID_KEY), messageIdString);
Assert.assertNotEquals(metadataWithHumanReadableMessageId.get(METADATA_MESSAGE_ID_KEY),
ByteBuffer.wrap(messageIdBytes));


Map<String, Object> metadata = MetadataUtil.extractedMetadata(mockRecord, false, false, false);
Map<String, Object> metadata = MetadataUtil.extractedMetadata(mockRecord, false, false, false, false);
Assert.assertEquals(metadata.get(METADATA_MESSAGE_ID_KEY), ByteBuffer.wrap(messageIdBytes));
Assert.assertEquals(metadata.get(METADATA_SCHEMA_VERSION_KEY), schemaVersionBytes);
Assert.assertNotEquals(metadata.get(METADATA_MESSAGE_ID_KEY), messageIdString);
Expand Down

0 comments on commit fd5ed36

Please sign in to comment.