Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ORC-1388: [C++] Support schema evolution from decimal to timestamp/string group #1761

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 124 additions & 12 deletions c++/src/ConvertColumnReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,105 @@ namespace orc {
int32_t toScale;
};

template <typename FileTypeBatch>
class DecimalToTimestampColumnReader : public ConvertToTimestampColumnReader {
public:
DecimalToTimestampColumnReader(const Type& _readType, const Type& fileType,
StripeStreams& stripe, bool _throwOnOverflow)
: ConvertToTimestampColumnReader(_readType, fileType, stripe, _throwOnOverflow),
precision(static_cast<int32_t>(fileType.getPrecision())),
scale(static_cast<int32_t>(fileType.getScale())) {}

void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
ConvertColumnReader::next(rowBatch, numValues, notNull);
const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
auto& dstBatch = *SafeCastBatchTo<TimestampVectorBatch*>(&rowBatch);
for (uint64_t i = 0; i < rowBatch.numElements; ++i) {
if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
convertDecimalToTimestamp(dstBatch, i, srcBatch);
}
}
}

private:
void convertDecimalToTimestamp(TimestampVectorBatch& dstBatch, uint64_t idx,
const FileTypeBatch& srcBatch) {
constexpr int SecondToNanoFactor = 9;
// '-1000000000-01-01T00:00Z'
constexpr int64_t MIN_EPOCH_SECONDS = -31557014167219200L;
// '1000000000-12-31T23:59:59.999999999Z'
constexpr int64_t MAX_EPOCH_SECONDS = 31556889864403199L;
// dummy variable, there's no risk of overflow

Int128 i128(srcBatch.values[idx]);
bool overflow = false;
Int128 integerPortion = scaleDownInt128ByPowerOfTen(i128, scale);
if (integerPortion < MIN_EPOCH_SECONDS || integerPortion > MAX_EPOCH_SECONDS) {
handleOverflow<Decimal, int64_t>(dstBatch, idx, throwOnOverflow);
return;
}
i128 -= scaleUpInt128ByPowerOfTen(integerPortion, scale, overflow);
Int128 fractionPortion = std::move(i128);
if (scale < SecondToNanoFactor) {
fractionPortion =
scaleUpInt128ByPowerOfTen(fractionPortion, SecondToNanoFactor - scale, overflow);
} else {
fractionPortion = scaleDownInt128ByPowerOfTen(fractionPortion, scale - SecondToNanoFactor);
}
if (fractionPortion < 0) {
fractionPortion += 1e9;
integerPortion -= 1;
}
dstBatch.data[idx] = integerPortion.toLong();
dstBatch.nanoseconds[idx] = fractionPortion.toLong();

if (needConvertTimezone) {
dstBatch.data[idx] = readerTimezone.convertFromUTC(dstBatch.data[idx]);
}
}

const int32_t precision;
const int32_t scale;
};

template <typename FileTypeBatch>
class DecimalToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
public:
DecimalToStringVariantColumnReader(const Type& _readType, const Type& fileType,
StripeStreams& stripe, bool _throwOnOverflow)
: ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to this PR: we have many weird _xxx variable names appearing in the function signature. I'm thinking to do something like apache/arrow, which only use xxx_ as the name of private class member variables. In this way, we can get rid of the weird case like this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unrelated to this PR: we have many weird _xxx variable names appearing in the function signature. I'm thinking to do something like apache/arrow, which only use xxx_ as the name of private class member variables. In this way, we can get rid of the weird case like this.

Maybe we can use clang-tidy with option "readability-identifier-naming.ClassMemberSuffix" to do this. I can make a PR to fix the identifiers naming style this or next week.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you!

scale(fileType.getScale()) {}

uint64_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override {
uint64_t size = 0;
strBuffer.resize(numValues);
const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
if (readType.getKind() == STRING) {
for (uint64_t i = 0; i < rowBatch.numElements; ++i) {
if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
strBuffer[i] = Int128(srcBatch.values[i]).toDecimalString(scale, true);
size += strBuffer[i].size();
}
}
} else {
const auto maxLength = readType.getMaximumLength();
for (uint64_t i = 0; i < rowBatch.numElements; ++i) {
if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
strBuffer[i] = Int128(srcBatch.values[i]).toDecimalString(scale, true);
}
if (strBuffer[i].size() > maxLength) {
strBuffer[i].resize(maxLength);
}
size += strBuffer[i].size();
}
}
return size;
}

private:
const int32_t scale;
};

#define DEFINE_NUMERIC_CONVERT_READER(FROM, TO, TYPE) \
using FROM##To##TO##ColumnReader = \
NumericConvertColumnReader<FROM##VectorBatch, TO##VectorBatch, TYPE>;
Expand Down Expand Up @@ -621,6 +720,14 @@ namespace orc {
using Decimal128##To##TO##ColumnReader = \
DecimalConvertColumnReader<Decimal128VectorBatch, TO##VectorBatch>;

#define DEFINE_DECIMAL_CONVERT_TO_TIMESTAMP_READER \
using Decimal64ToTimestampColumnReader = DecimalToTimestampColumnReader<Decimal64VectorBatch>; \
using Decimal128ToTimestampColumnReader = DecimalToTimestampColumnReader<Decimal128VectorBatch>;

#define DEFINE_DECIMAL_CONVERT_TO_STRING_VARINT_READER(TO) \
using Decimal64To##TO##ColumnReader = DecimalToStringVariantColumnReader<Decimal64VectorBatch>; \
using Decimal128To##TO##ColumnReader = DecimalToStringVariantColumnReader<Decimal128VectorBatch>;

DEFINE_NUMERIC_CONVERT_READER(Boolean, Byte, int8_t)
DEFINE_NUMERIC_CONVERT_READER(Boolean, Short, int16_t)
DEFINE_NUMERIC_CONVERT_READER(Boolean, Int, int32_t)
Expand Down Expand Up @@ -720,6 +827,11 @@ namespace orc {
DEFINE_DECIMAL_CONVERT_TO_DECIMAL_READER(Decimal64)
DEFINE_DECIMAL_CONVERT_TO_DECIMAL_READER(Decimal128)

DEFINE_DECIMAL_CONVERT_TO_TIMESTAMP_READER
DEFINE_DECIMAL_CONVERT_TO_STRING_VARINT_READER(String)
DEFINE_DECIMAL_CONVERT_TO_STRING_VARINT_READER(Char)
DEFINE_DECIMAL_CONVERT_TO_STRING_VARINT_READER(Varchar)

#define CREATE_READER(NAME) \
return std::make_unique<NAME>(_readType, fileType, stripe, throwOnOverflow);

Expand Down Expand Up @@ -935,13 +1047,6 @@ namespace orc {
CASE_EXCEPTION
}
}
case STRING:
case BINARY:
case TIMESTAMP:
case LIST:
case MAP:
case STRUCT:
case UNION:
case DECIMAL: {
switch (_readType.getKind()) {
CASE_CREATE_FROM_DECIMAL_READER(BOOLEAN, Boolean)
Expand All @@ -951,6 +1056,11 @@ namespace orc {
CASE_CREATE_FROM_DECIMAL_READER(LONG, Long)
CASE_CREATE_FROM_DECIMAL_READER(FLOAT, Float)
CASE_CREATE_FROM_DECIMAL_READER(DOUBLE, Double)
CASE_CREATE_FROM_DECIMAL_READER(STRING, String)
CASE_CREATE_FROM_DECIMAL_READER(CHAR, Char)
CASE_CREATE_FROM_DECIMAL_READER(VARCHAR, Varchar)
CASE_CREATE_FROM_DECIMAL_READER(TIMESTAMP, Timestamp)
CASE_CREATE_FROM_DECIMAL_READER(TIMESTAMP_INSTANT, Timestamp)
case DECIMAL: {
if (isDecimal64(fileType)) {
if (isDecimal64(_readType)) {
Expand All @@ -966,11 +1076,6 @@ namespace orc {
}
}
}
case STRING:
case CHAR:
case VARCHAR:
case TIMESTAMP:
case TIMESTAMP_INSTANT:
case BINARY:
case LIST:
case MAP:
Expand All @@ -980,6 +1085,13 @@ namespace orc {
CASE_EXCEPTION
}
}
case STRING:
case BINARY:
case TIMESTAMP:
case LIST:
case MAP:
case STRUCT:
case UNION:
case DATE:
case VARCHAR:
case CHAR:
Expand Down
3 changes: 2 additions & 1 deletion c++/src/SchemaEvolution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ namespace orc {
break;
}
case DECIMAL: {
ret.isValid = ret.needConvert = isNumeric(readType);
ret.isValid = ret.needConvert =
isNumeric(readType) || isStringVariant(readType) || isTimestamp(readType);
break;
}
case STRING:
Expand Down
143 changes: 143 additions & 0 deletions c++/test/TestConvertColumnReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* limitations under the License.
*/

#include "Timezone.hh"
#include "orc/Type.hh"
#include "wrap/gtest-wrapper.h"

Expand Down Expand Up @@ -672,4 +673,146 @@ namespace orc {
}
}

TEST(ConvertColumnReader, TestConvertDecimalToTimestamp) {
constexpr int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024;
constexpr int TEST_CASES = 1024;
std::string writerTimezoneName = "America/New_York";
std::string readerTimezoneName = "Australia/Sydney";
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
std::unique_ptr<Type> fileType(
Type::buildTypeFromString("struct<c1:decimal(14,4),c2:decimal(25,10)>"));
std::shared_ptr<Type> readType(
Type::buildTypeFromString("struct<c1:timestamp,c2:timestamp with local time zone>"));
WriterOptions options;
options.setUseTightNumericVector(true);
options.setTimezoneName(writerTimezoneName);
auto writer = createWriter(*fileType, &memStream, options);
auto batch = writer->createRowBatch(TEST_CASES);
auto structBatch = dynamic_cast<StructVectorBatch*>(batch.get());
auto& c1 = dynamic_cast<Decimal64VectorBatch&>(*structBatch->fields[0]);
auto& c2 = dynamic_cast<Decimal128VectorBatch&>(*structBatch->fields[1]);

auto convertToSeconds = [](const Timezone& writerTimezone, const std::string& date) {
tm timeStruct;
if (strptime(date.c_str(), "%Y-%m-%d %H:%M:%S", &timeStruct) == nullptr) {
throw TimezoneError("bad time " + date);
}
return writerTimezone.convertFromUTC(timegm(&timeStruct));
};

std::vector<std::string> timeStrings;
for (int i = 0; i < TEST_CASES; i++) {
int64_t year = 1960 + (i / 12);
int64_t month = i % 12 + 1;
int64_t day = 27;
std::string others = "23:45:56";
std::stringstream ss;
ss << year << "-";
ss << std::setfill('0') << std::setw(2) << month << "-" << day << " " << others;
timeStrings.push_back(ss.str());
}
std::vector<int64_t> ts[2];
for (auto& time : timeStrings) {
ts[0].emplace_back(convertToSeconds(getTimezoneByName("GMT"), time));
ts[1].emplace_back(convertToSeconds(getTimezoneByName(writerTimezoneName), time));
}
bool overflow = false;

for (int i = 0; i < TEST_CASES; i++) {
c1.values[i] = ts[0][i] * 10000 + 1234;
c2.values[i] = scaleUpInt128ByPowerOfTen(Int128(ts[1][i]), 10, overflow) +=
Int128("1234567895");
assert(!overflow);
}

structBatch->numElements = c1.numElements = c2.numElements = TEST_CASES;
structBatch->hasNulls = c1.hasNulls = c2.hasNulls = false;
writer->add(*batch);
writer->close();
auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength());
auto pool = getDefaultPool();
auto reader = createReader(*pool, std::move(inStream));
RowReaderOptions rowReaderOptions;
rowReaderOptions.setUseTightNumericVector(true);
rowReaderOptions.setReadType(readType);
rowReaderOptions.setTimezoneName(readerTimezoneName);
auto rowReader = reader->createRowReader(rowReaderOptions);
auto readBatch = rowReader->createRowBatch(TEST_CASES);
EXPECT_EQ(true, rowReader->next(*readBatch));

auto& readSturctBatch = dynamic_cast<StructVectorBatch&>(*readBatch);
auto& readC1 = dynamic_cast<TimestampVectorBatch&>(*readSturctBatch.fields[0]);
auto& readC2 = dynamic_cast<TimestampVectorBatch&>(*readSturctBatch.fields[1]);
for (int i = 0; i < TEST_CASES; i++) {
size_t idx = static_cast<size_t>(i);
EXPECT_TRUE(readC1.notNull[idx]) << i;
EXPECT_TRUE(readC2.notNull[idx]) << i;
EXPECT_EQ(getTimezoneByName(readerTimezoneName).convertToUTC(readC1.data[i]), ts[0][i]);
EXPECT_TRUE(readC1.nanoseconds[i] == 123400000);
EXPECT_EQ(readC2.data[i], ts[1][i]);
if (readC2.data[i] < 0) {
EXPECT_EQ(readC2.nanoseconds[i], 123456790) << timeStrings[i];
} else {
EXPECT_EQ(readC2.nanoseconds[i], 123456789) << timeStrings[i];
}
}
}

TEST(ConvertColumnReader, TestConvertDecimalToStringVariant) {
constexpr int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024;
constexpr int TEST_CASES = 1024;
MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
std::unique_ptr<Type> fileType(
Type::buildTypeFromString("struct<c1:decimal(14,3),c2:decimal(14,3),c3:decimal(14,3)>"));
std::shared_ptr<Type> readType(
Type::buildTypeFromString("struct<c1:char(5),c2:varchar(5),c3:string>"));
WriterOptions options;
auto writer = createWriter(*fileType, &memStream, options);
auto batch = writer->createRowBatch(TEST_CASES);
auto structBatch = dynamic_cast<StructVectorBatch*>(batch.get());
auto& c1 = dynamic_cast<Decimal64VectorBatch&>(*structBatch->fields[0]);
auto& c2 = dynamic_cast<Decimal64VectorBatch&>(*structBatch->fields[1]);
auto& c3 = dynamic_cast<Decimal64VectorBatch&>(*structBatch->fields[2]);

for (int i = 0; i < TEST_CASES; i++) {
c1.values[i] = i * 1000 + 123;
c2.values[i] = i * 1000 + 456;
c3.values[i] = i * 1000 + 789;
}
structBatch->numElements = c1.numElements = c2.numElements = TEST_CASES;
structBatch->hasNulls = c1.hasNulls = c2.hasNulls = false;
writer->add(*batch);
writer->close();
auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength());
auto pool = getDefaultPool();
auto reader = createReader(*pool, std::move(inStream));
RowReaderOptions rowReaderOptions;
rowReaderOptions.setUseTightNumericVector(true);
rowReaderOptions.setReadType(readType);
auto rowReader = reader->createRowReader(rowReaderOptions);
auto readBatch = rowReader->createRowBatch(TEST_CASES);
EXPECT_EQ(true, rowReader->next(*readBatch));

auto& readSturctBatch = dynamic_cast<StructVectorBatch&>(*readBatch);
auto& readC1 = dynamic_cast<StringVectorBatch&>(*readSturctBatch.fields[0]);
auto& readC2 = dynamic_cast<StringVectorBatch&>(*readSturctBatch.fields[1]);
auto& readC3 = dynamic_cast<StringVectorBatch&>(*readSturctBatch.fields[2]);
for (int i = 0; i < TEST_CASES; i++) {
if (i < 10) {
EXPECT_EQ(std::to_string(i) + ".123", std::string(readC1.data[i], readC1.length[i]));
EXPECT_EQ(std::to_string(i) + ".456", std::string(readC2.data[i], readC2.length[i]));
} else if (i >= 10 && i < 100) {
EXPECT_EQ(std::to_string(i) + ".12", std::string(readC1.data[i], readC1.length[i]));
EXPECT_EQ(std::to_string(i) + ".45", std::string(readC2.data[i], readC2.length[i]));
} else if (i >= 100 && i < 1000) {
EXPECT_EQ(std::to_string(i) + ".1", std::string(readC1.data[i], readC1.length[i]));
EXPECT_EQ(std::to_string(i) + ".4", std::string(readC2.data[i], readC2.length[i]));
} else {
EXPECT_EQ(std::to_string(i) + ".", std::string(readC1.data[i], readC1.length[i]));
EXPECT_EQ(std::to_string(i) + ".", std::string(readC2.data[i], readC2.length[i]));
}
EXPECT_EQ(std::to_string(i) + ".789", std::string(readC3.data[i], readC3.length[i]));
}
}

} // namespace orc
16 changes: 16 additions & 0 deletions c++/test/TestSchemaEvolution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,22 @@ namespace orc {
}
}

// conversion from decimal to string/char/varchar
for (size_t i = 12; i <= 13; i++) {
for (size_t j = 7; j <= 11; j++) {
canConvert[i][j] = true;
needConvert[i][j] = true;
}
}

// conversion from decimal to timestamp
for (size_t i = 12; i <= 13; i++) {
for (size_t j = 14; j <= 15; j++) {
canConvert[i][j] = true;
needConvert[i][j] = true;
}
}

for (size_t i = 0; i < typesSize; i++) {
for (size_t j = 0; j < typesSize; j++) {
testConvertReader(types[i], types[j], canConvert[i][j], needConvert[i][j]);
Expand Down
Loading