Skip to content

Commit

Permalink
Add SchemaConformingTransformerV2 to enhance text search abilities (apache#12788)
Browse files Browse the repository at this point in the history

* Add SchemaConformingTransformerV2 to enhance text search abilities

* Fix style

* Update __mergedTextIndex field logics

* Fix UT

* Resolve comments and add fieldPathsToPreserveInput config
  • Loading branch information
lnbest0707-uber authored Apr 9, 2024
1 parent c93de37 commit 13673f1
Show file tree
Hide file tree
Showing 14 changed files with 2,120 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
LAST_REALTIME_SEGMENT_CATCHUP_DURATION_SECONDS("seconds", false),
LAST_REALTIME_SEGMENT_COMPLETION_DURATION_SECONDS("seconds", false),
REALTIME_OFFHEAP_MEMORY_USED("bytes", false),
REALTIME_MERGED_TEXT_IDX_DOCUMENT_AVG_LEN("bytes", false),
REALTIME_SEGMENT_NUM_PARTITIONS("realtimeSegmentNumPartitions", false),
LLC_SIMULTANEOUS_SEGMENT_BUILDS("llcSimultaneousSegmentBuilds", true),
// Upsert metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
INVALID_REALTIME_ROWS_DROPPED("rows", false),
INCOMPLETE_REALTIME_ROWS_CONSUMED("rows", false),
REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true),
REALTIME_MERGED_TEXT_IDX_TRUNCATED_DOCUMENT_SIZE("bytes", false),
REALTIME_OFFSET_COMMITS("commits", true),
REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false),
STREAM_CONSUMER_CREATE_EXCEPTIONS("exceptions", false),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ protected void buildSegment()
.setIngestionConfig(new IngestionConfig(null, null, null, null,
Arrays.asList(new TransformConfig(M1_V2, "Groovy({INT_COL1_V3 == null || "
+ "INT_COL1_V3 == Integer.MIN_VALUE ? INT_COL1 : INT_COL1_V3 }, INT_COL1, INT_COL1_V3)")),
null, null, null))
null, null, null, null))
.build();
Schema schema =
new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).addSingleValueDimension(D1, FieldSpec.DataType.STRING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,13 @@ public class CompositeTransformer implements RecordTransformer {
* records that have varying fields to a fixed schema without dropping any fields
* </li>
* <li>
* {@link DataTypeTransformer} after {@link SchemaConformingTransformer} to convert values to comply with the
* schema
* Optional {@link SchemaConformingTransformerV2} after {@link FilterTransformer}, so that we can transform
* input records that have varying fields to a fixed schema and keep or drop other fields by configuration. We
* could also gain enhanced text search capabilities from it.
* </li>
* <li>
* {@link DataTypeTransformer} after {@link SchemaConformingTransformer} or {@link SchemaConformingTransformerV2}
* to convert values to comply with the schema
* </li>
* <li>
* Optional {@link TimeValidationTransformer} after {@link DataTypeTransformer} so that time value is converted to
Expand All @@ -78,7 +83,8 @@ public class CompositeTransformer implements RecordTransformer {
*/
public static List<RecordTransformer> getDefaultTransformers(TableConfig tableConfig, Schema schema) {
return Stream.of(new ExpressionTransformer(tableConfig, schema), new FilterTransformer(tableConfig),
new SchemaConformingTransformer(tableConfig, schema), new DataTypeTransformer(tableConfig, schema),
new SchemaConformingTransformer(tableConfig, schema),
new SchemaConformingTransformerV2(tableConfig, schema), new DataTypeTransformer(tableConfig, schema),
new TimeValidationTransformer(tableConfig, schema), new SpecialValueTransformer(schema),
new NullValueTransformer(tableConfig, schema), new SanitizationTransformer(schema)).filter(t -> !t.isNoOp())
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ private static void validateSchemaFieldNames(Set<String> schemaFields,
/**
* @return The field type for the given extras field
*/
private static DataType getAndValidateExtrasFieldType(Schema schema, @Nonnull String extrasFieldName) {
static DataType getAndValidateExtrasFieldType(Schema schema, @Nonnull String extrasFieldName) {
FieldSpec fieldSpec = schema.getFieldSpecFor(extrasFieldName);
Preconditions.checkState(null != fieldSpec, "Field '%s' doesn't exist in schema", extrasFieldName);
DataType fieldDataType = fieldSpec.getDataType();
Expand Down Expand Up @@ -250,7 +250,7 @@ private static Map<String, Object> validateSchemaAndCreateTree(@Nonnull Schema s
* @param subKeys Returns the sub-keys
* @throws IllegalArgumentException if any sub-key is empty
*/
private static void getAndValidateSubKeys(String key, int firstKeySeparatorIdx, List<String> subKeys)
static void getAndValidateSubKeys(String key, int firstKeySeparatorIdx, List<String> subKeys)
throws IllegalArgumentException {
int subKeyBeginIdx = 0;
int subKeyEndIdx = firstKeySeparatorIdx;
Expand Down Expand Up @@ -511,7 +511,16 @@ public void addIndexableEntry(String key, Object value) {
if (null == _indexableExtras) {
_indexableExtras = new HashMap<>();
}
_indexableExtras.put(key, value);
if (key == null && value instanceof Map) {
// If the key is null, it means that the value is a map that should be merged with the indexable extras
_indexableExtras.putAll((Map<String, Object>) value);
} else if (_indexableExtras.containsKey(key) && _indexableExtras.get(key) instanceof Map && value instanceof Map) {
// If the key already exists in the indexable extras and both the existing value and the new value are maps,
// merge the two maps
((Map<String, Object>) _indexableExtras.get(key)).putAll((Map<String, Object>) value);
} else {
_indexableExtras.put(key, value);
}
}

/**
Expand All @@ -524,7 +533,17 @@ public void addUnindexableEntry(String key, Object value) {
if (null == _unindexableExtras) {
_unindexableExtras = new HashMap<>();
}
_unindexableExtras.put(key, value);
if (key == null && value instanceof Map) {
// If the key is null, it means that the value is a map that should be merged with the unindexable extras
_unindexableExtras.putAll((Map<String, Object>) value);
} else if (_unindexableExtras.containsKey(key) && _unindexableExtras.get(key) instanceof Map
&& value instanceof Map) {
// If the key already exists in the uindexable extras and both the existing value and the new value are maps,
// merge the two maps
((Map<String, Object>) _unindexableExtras.get(key)).putAll((Map<String, Object>) value);
} else {
_unindexableExtras.put(key, value);
}
}

/**
Expand All @@ -542,4 +561,8 @@ public void addChild(String key, ExtraFieldsContainer child) {
addUnindexableEntry(key, childUnindexableFields);
}
}

/**
 * Merges the given child's extras fields directly into this container's top level, by
 * delegating to {@link #addChild(String, ExtraFieldsContainer)} with a null key (a null key
 * merges map entries in place rather than nesting them under a key — see the null-key branch
 * of the add*Entry methods).
 */
public void addChild(ExtraFieldsContainer child) {
  addChild(null, child);
}
}
Loading

0 comments on commit 13673f1

Please sign in to comment.