Skip to content

Commit

Permalink
DRILL-8421: Parquet microsecond columns (#2793)
Browse files Browse the repository at this point in the history
* Read parquet TIME_MICROS columns as 64-bit values before truncating to 32-bits
* Truncate parquet min and max metadata values for microsecond columns to milliseconds
* Express parquet TIME_MICROS metadata as Integer values
  • Loading branch information
handmadecode authored and jnturton committed Apr 18, 2023
1 parent 20f9f2c commit fa22d67
Show file tree
Hide file tree
Showing 6 changed files with 379 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,11 @@ public static Object getValue(Object value, PrimitiveType.PrimitiveTypeName prim
case INT64:
if (originalType == OriginalType.DECIMAL) {
return BigInteger.valueOf(getLong(value));
} else if (originalType == OriginalType.TIME_MICROS) {
return getInt(value);
} else {
return getLong(value);
}

case FLOAT:
return getFloat(value);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ static class NullableDictionaryTimeMicrosReader extends NullableColumnReader<Nul
protected void readField(long recordsToReadInThisPass) {
ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
for (int i = 0; i < recordsToReadInThisPass; i++) {
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, valReader.readInteger() / 1000);
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, (int) (valReader.readLong() / 1000));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,13 +168,13 @@ protected void readField(long recordsToReadInThisPass) {
if (recordsRequireDecoding()) {
ValuesReader valReader = usingDictionary ? pageReader.getDictionaryValueReader() : pageReader.getValueReader();
for (int i = 0; i < recordsReadInThisIteration; i++) {
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, valReader.readInteger() / 1000);
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, (int) (valReader.readLong() / 1000));
}
} else {
int dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
for (int i = 0; i < recordsReadInThisIteration; i++) {
int value = pageReader.pageData.getInt((int) readStartInBytes + i * dataTypeLengthInBytes);
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, value / 1000);
long value = pageReader.pageData.getLong((int) readStartInBytes + i * dataTypeLengthInBytes);
valueVec.getMutator().setSafe(valuesReadInCurrentPass + i, (int) (value / 1000));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,12 @@ private void addColumnMetadata(String[] columnName,
minValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) minValue);
maxValue = ParquetReaderUtility.autoCorrectCorruptedDate((Integer) maxValue);
}
if (isMicrosecondColumnType(columnTypeMetadata.originalType)) {
// DRILL-8241: truncate the min/max of microsecond columns to milliseconds, otherwise the
// initial scanning of files when filtering will compare to the wrong values.
minValue = truncateMicros(minValue);
maxValue = truncateMicros(maxValue);
}
}
long numNulls = stats.getNumNulls();
Metadata_V4.ColumnMetadata_v4 columnMetadata = new Metadata_V4.ColumnMetadata_v4(columnTypeMetadata.name,
Expand All @@ -218,6 +224,18 @@ private void addColumnMetadata(String[] columnName,
columnTypeInfo.put(columnTypeMetadataKey, columnTypeMetadata);
}

private static boolean isMicrosecondColumnType(OriginalType columnType) {
return columnType == OriginalType.TIME_MICROS || columnType == OriginalType.TIMESTAMP_MICROS;
}

private static Object truncateMicros(Object microSeconds) {
if (microSeconds instanceof Number) {
return Long.valueOf(((Number) microSeconds).longValue() / 1000);
} else {
return microSeconds;
}
}

/**
* Get the host affinity for a row group.
*
Expand Down
Loading

0 comments on commit fa22d67

Please sign in to comment.