From fd5ed36e7bd2c4d306206db60d9e1ca7be3b0bc5 Mon Sep 17 00:00:00 2001 From: Baodi Shi Date: Tue, 5 Nov 2024 17:37:12 +0800 Subject: [PATCH] Support include publish time to metadata --- .../io/jcloud/BlobStoreAbstractConfig.java | 1 + .../pulsar/io/jcloud/format/AvroFormat.java | 9 ++-- .../pulsar/io/jcloud/format/JsonFormat.java | 6 ++- .../io/jcloud/format/ParquetFormat.java | 12 +++-- .../pulsar/io/jcloud/util/MetadataUtil.java | 51 ++++++++++++------- src/main/proto/ProtobufMessageMetadata.proto | 2 + .../io/jcloud/utils/MetadataUtilTest.java | 9 ++-- 7 files changed, 60 insertions(+), 30 deletions(-) diff --git a/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java b/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java index 4b4f4a4f..5772230f 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/BlobStoreAbstractConfig.java @@ -112,6 +112,7 @@ public class BlobStoreAbstractConfig implements Serializable { private boolean useHumanReadableMessageId; private boolean useHumanReadableSchemaVersion; private boolean includeTopicToMetadata; + private boolean includePublishTimeToMetadata; public void validate() { checkNotNull(provider, "provider not set."); diff --git a/src/main/java/org/apache/pulsar/io/jcloud/format/AvroFormat.java b/src/main/java/org/apache/pulsar/io/jcloud/format/AvroFormat.java index 74bb7301..dcd1caf2 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/format/AvroFormat.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/format/AvroFormat.java @@ -54,6 +54,7 @@ public class AvroFormat implements Format , InitConfiguration schema internalSchema = schema; rootAvroSchema = AvroRecordUtil.convertToAvroSchema(schema); if (useMetadata){ - rootAvroSchema = MetadataUtil.setMetadataSchema(rootAvroSchema, - useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata); + rootAvroSchema = MetadataUtil.setMetadataSchema(rootAvroSchema, useHumanReadableMessageId, + useHumanReadableSchemaVersion, includeTopicToMetadata, includePublishTimeToMetadata); } LOGGER.debug("Using avro schema: {}", rootAvroSchema); @@ -126,7 +128,8 @@ public ByteBuffer recordWriterBuf(Iterator> records) throw if (useMetadata) { org.apache.avro.generic.GenericRecord metadataRecord = MetadataUtil.extractedMetadataRecord(next, - useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata); + useHumanReadableMessageId, useHumanReadableSchemaVersion, + includeTopicToMetadata, includePublishTimeToMetadata); writeRecord.put(MetadataUtil.MESSAGE_METADATA_KEY, metadataRecord); } fileWriter.append(writeRecord); diff --git a/src/main/java/org/apache/pulsar/io/jcloud/format/JsonFormat.java b/src/main/java/org/apache/pulsar/io/jcloud/format/JsonFormat.java index f56a7e50..d077b6d3 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/format/JsonFormat.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/format/JsonFormat.java @@ -70,6 +70,7 @@ public class JsonFormat implements Format, InitConfiguration> record) throws GenericRecord val = next.getValue(); final Schema schema = next.getSchema(); log.debug("next record {} schema {} val {}", next, schema, val); - Map writeValue = convertRecordToObject(next.getValue(), schema); + Map writeValue = convertRecordToObject(val, schema); if (useMetadata) { writeValue.put(MetadataUtil.MESSAGE_METADATA_KEY, MetadataUtil.extractedMetadata(next, useHumanReadableMessageId, useHumanReadableSchemaVersion, - includeTopicToMetadata)); + includeTopicToMetadata, includePublishTimeToMetadata)); } String recordAsString = JSON_MAPPER.get().writeValueAsString(writeValue); stringBuilder.append(recordAsString).append("\n"); diff --git a/src/main/java/org/apache/pulsar/io/jcloud/format/ParquetFormat.java b/src/main/java/org/apache/pulsar/io/jcloud/format/ParquetFormat.java index 0b8ba94f..c1abbdd4 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/format/ParquetFormat.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/format/ParquetFormat.java @@ -64,6 +64,7 @@ public class ParquetFormat implements Format, InitConfiguration schema } else { rootAvroSchema = AvroRecordUtil.convertToAvroSchema(schema); if (useMetadata) { - rootAvroSchema = MetadataUtil.setMetadataSchema(rootAvroSchema, - useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata); + rootAvroSchema = MetadataUtil.setMetadataSchema(rootAvroSchema, useHumanReadableMessageId, + useHumanReadableSchemaVersion, includeTopicToMetadata, includePublishTimeToMetadata); } log.info("Using avro schema: {}", rootAvroSchema); } @@ -249,7 +251,8 @@ public ByteBuffer recordWriterBuf(Iterator> records) throw // Add metadata to the record DynamicMessage.Builder messageBuilder = DynamicMessage.newBuilder(descriptor); Metadata.PulsarIOCSCProtobufMessageMetadata metadata = getMetadataFromMessage(next, - useHumanReadableMessageId, useHumanReadableSchemaVersion); + useHumanReadableMessageId, useHumanReadableSchemaVersion, + includeTopicToMetadata, includePublishTimeToMetadata); for (Descriptors.FieldDescriptor field : descriptor.getFields()) { if (field.getName().equals(MESSAGE_METADATA_KEY)) { messageBuilder.setField(field, metadata); @@ -272,7 +275,8 @@ public ByteBuffer recordWriterBuf(Iterator> records) throw MetadataUtil.extractedMetadataRecord(next, useHumanReadableMessageId, useHumanReadableSchemaVersion, - includeTopicToMetadata); + includeTopicToMetadata, + includePublishTimeToMetadata); writeRecord.put(MESSAGE_METADATA_KEY, metadataRecord); } if (parquetWriter != null) { diff --git a/src/main/java/org/apache/pulsar/io/jcloud/util/MetadataUtil.java b/src/main/java/org/apache/pulsar/io/jcloud/util/MetadataUtil.java index a46b9a89..69f9a5eb 100644 --- a/src/main/java/org/apache/pulsar/io/jcloud/util/MetadataUtil.java +++ b/src/main/java/org/apache/pulsar/io/jcloud/util/MetadataUtil.java @@ -42,6 +42,7 @@ public class MetadataUtil { public static final String METADATA_SCHEMA_VERSION_KEY = "schemaVersion"; public static final String METADATA_MESSAGE_ID_KEY = "messageId"; public static final String METADATA_TOPIC = "topic"; + public static final String METADATA_PUBLISH_TIME = "publishTime"; public static final String MESSAGE_METADATA_KEY = "__message_metadata__"; public static final String MESSAGE_METADATA_NAME = "messageMetadata"; public static final Schema MESSAGE_METADATA = buildMetadataSchema(); @@ -51,11 +52,12 @@ public class MetadataUtil { public static org.apache.avro.generic.GenericRecord extractedMetadataRecord(Record next, boolean useHumanReadableMessageId, boolean useHumanReadableSchemaVersion, - boolean includeTopicToMetadata) { + boolean includeTopicToMetadata, + boolean includePublishTimeToMetadata) { final Message message = next.getMessage().get(); - GenericData.Record record = new GenericData.Record(buildMetadataSchema( - useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata)); + GenericData.Record record = new GenericData.Record(buildMetadataSchema(useHumanReadableMessageId, + useHumanReadableSchemaVersion, includeTopicToMetadata, includePublishTimeToMetadata)); record.put(METADATA_PROPERTIES_KEY, message.getProperties()); if (useHumanReadableSchemaVersion) { record.put(METADATA_SCHEMA_VERSION_KEY, @@ -71,21 +73,17 @@ public static org.apache.avro.generic.GenericRecord extractedMetadataRecord(Reco if (includeTopicToMetadata) { record.put(METADATA_TOPIC, message.getTopicName()); } + if (includePublishTimeToMetadata) { + record.put(METADATA_PUBLISH_TIME, message.getPublishTime()); + } return record; } - public static org.apache.avro.generic.GenericRecord extractedMetadataRecord(Record next) { - return extractedMetadataRecord(next, false, false, false); - } - - public static Map extractedMetadata(Record next) { - return extractedMetadata(next, false, false, false); - } - public static Map extractedMetadata(Record next, boolean useHumanReadableMessageId, boolean useHumanReadableSchemaVersion, - boolean includeTopicToMetadata) { + boolean includeTopicToMetadata, + boolean includePublishTimeToMetadata) { Map metadata = new HashMap<>(); final Message message = next.getMessage().get(); metadata.put(METADATA_PROPERTIES_KEY, message.getProperties()); @@ -103,6 +101,9 @@ public static Map extractedMetadata(Record next, if (includeTopicToMetadata) { metadata.put(METADATA_TOPIC, message.getTopicName()); } + if (includePublishTimeToMetadata) { + metadata.put(METADATA_PUBLISH_TIME, message.getPublishTime()); + } return metadata; } @@ -124,14 +125,15 @@ public static Schema setMetadataSchema(Schema schema) { public static Schema setMetadataSchema(Schema schema, boolean useHumanReadableMessageId, boolean useHumanReadableSchemaVersion, - boolean includeTopicToMetadata) { + boolean includeTopicToMetadata, + boolean includePublishTimeToMetadata) { final List fieldWithMetadata = schemaFieldThreadLocal.get(); fieldWithMetadata.clear(); schema.getFields().forEach(f -> { fieldWithMetadata.add(new Schema.Field(f, f.schema())); }); - fieldWithMetadata.add(new Schema.Field(MESSAGE_METADATA_KEY, buildMetadataSchema( - useHumanReadableMessageId, useHumanReadableSchemaVersion, includeTopicToMetadata))); + fieldWithMetadata.add(new Schema.Field(MESSAGE_METADATA_KEY, buildMetadataSchema(useHumanReadableMessageId, + useHumanReadableSchemaVersion, includeTopicToMetadata, includePublishTimeToMetadata))); return Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(), @@ -141,12 +143,13 @@ public static Schema setMetadataSchema(Schema schema, } private static Schema buildMetadataSchema(){ - return buildMetadataSchema(false, false, false); + return buildMetadataSchema(false, false, false, false); } private static Schema buildMetadataSchema(boolean useHumanReadableMessageId, boolean useHumanReadableSchemaVersion, - boolean includeTopicToMetadata) { + boolean includeTopicToMetadata, + boolean includePublishTimeToMetadata) { List fields = new ArrayList<>(); fields.add(new Schema.Field(METADATA_PROPERTIES_KEY, Schema.createUnion( @@ -172,6 +175,10 @@ private static Schema buildMetadataSchema(boolean useHumanReadableMessageId, fields.add(new Schema.Field(METADATA_TOPIC, Schema.createUnion( Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.STRING)))); } + if (includePublishTimeToMetadata) { + fields.add(new Schema.Field(METADATA_PUBLISH_TIME, Schema.createUnion( + Schema.create(Schema.Type.NULL), Schema.create(Schema.Type.LONG)))); + } return Schema.createRecord(MESSAGE_METADATA_NAME, null, null, @@ -187,7 +194,9 @@ public static long parseSchemaVersionFromBytes(byte[] schemaVersion) { public static Metadata.PulsarIOCSCProtobufMessageMetadata getMetadataFromMessage(Record next, boolean useHumanReadableMessageId, - boolean useHumanReadableSchemaVersion) { + boolean useHumanReadableSchemaVersion, + boolean includeTopicToMetadata, + boolean includePublishTimeToMetadata) { Metadata.PulsarIOCSCProtobufMessageMetadata.Builder metadataBuilder = Metadata.PulsarIOCSCProtobufMessageMetadata.newBuilder(); final Message message = next.getMessage().get(); @@ -204,6 +213,12 @@ public static Metadata.PulsarIOCSCProtobufMessageMetadata getMetadataFromMessage } else { metadataBuilder.setMessageIdBytes(ByteString.copyFrom(message.getMessageId().toByteArray())); } + if (includeTopicToMetadata) { + metadataBuilder.setTopic(message.getTopicName()); + } + if (includePublishTimeToMetadata) { + metadataBuilder.setPublishTime(message.getPublishTime()); + } return metadataBuilder.build(); } } diff --git a/src/main/proto/ProtobufMessageMetadata.proto b/src/main/proto/ProtobufMessageMetadata.proto index ea054d16..955de9c5 100644 --- a/src/main/proto/ProtobufMessageMetadata.proto +++ b/src/main/proto/ProtobufMessageMetadata.proto @@ -26,5 +26,7 @@ message PulsarIOCSCProtobufMessageMetadata { map properties = 1; string schema_version = 2; string message_id = 3; + string topic = 4; + int64 publish_time = 5; } diff --git a/src/test/java/org/apache/pulsar/io/jcloud/utils/MetadataUtilTest.java b/src/test/java/org/apache/pulsar/io/jcloud/utils/MetadataUtilTest.java index 251d26cb..63271c41 100644 --- a/src/test/java/org/apache/pulsar/io/jcloud/utils/MetadataUtilTest.java +++ b/src/test/java/org/apache/pulsar/io/jcloud/utils/MetadataUtilTest.java @@ -45,8 +45,10 @@ public void testExtractedMetadata() throws IOException { byte[] schemaVersionBytes = new byte[]{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0A}; byte[] messageIdBytes = new byte[]{0x00, 0x01, 0x02, 0x03}; String topicName = "test-topic"; + long publishTime = System.currentTimeMillis(); Record mockRecord = mock(Record.class); Message mockMessage = mock(Message.class); + when(mockMessage.getPublishTime()).thenReturn(publishTime); MessageId mockMessageId = mock(MessageId.class); when(mockRecord.getMessage()).thenReturn(Optional.of(mockMessage)); when(mockMessage.getMessageId()).thenReturn(mockMessageId); @@ -57,22 +59,23 @@ public void testExtractedMetadata() throws IOException { when(mockMessage.getTopicName()).thenReturn(topicName); Map metadataWithHumanReadableMetadata = - MetadataUtil.extractedMetadata(mockRecord, true, true, true); + MetadataUtil.extractedMetadata(mockRecord, true, true, true, true); Assert.assertEquals(metadataWithHumanReadableMetadata.get(METADATA_MESSAGE_ID_KEY), messageIdString); Assert.assertNotEquals(metadataWithHumanReadableMetadata.get(METADATA_MESSAGE_ID_KEY), ByteBuffer.wrap(messageIdBytes)); Assert.assertEquals(metadataWithHumanReadableMetadata.get(METADATA_SCHEMA_VERSION_KEY), MetadataUtil.parseSchemaVersionFromBytes(schemaVersionBytes)); Assert.assertEquals(metadataWithHumanReadableMetadata.get(MetadataUtil.METADATA_TOPIC), topicName); + Assert.assertEquals(metadataWithHumanReadableMetadata.get(MetadataUtil.METADATA_PUBLISH_TIME), publishTime); Map metadataWithHumanReadableMessageId = - MetadataUtil.extractedMetadata(mockRecord, true, false, false); + MetadataUtil.extractedMetadata(mockRecord, true, false, false, false); Assert.assertEquals(metadataWithHumanReadableMessageId.get(METADATA_MESSAGE_ID_KEY), messageIdString); Assert.assertNotEquals(metadataWithHumanReadableMessageId.get(METADATA_MESSAGE_ID_KEY), ByteBuffer.wrap(messageIdBytes)); - Map metadata = MetadataUtil.extractedMetadata(mockRecord, false, false, false); + Map metadata = MetadataUtil.extractedMetadata(mockRecord, false, false, false, false); Assert.assertEquals(metadata.get(METADATA_MESSAGE_ID_KEY), ByteBuffer.wrap(messageIdBytes)); Assert.assertEquals(metadata.get(METADATA_SCHEMA_VERSION_KEY), schemaVersionBytes); Assert.assertNotEquals(metadata.get(METADATA_MESSAGE_ID_KEY), messageIdString);