Skip to content

Commit

Permalink
ORC-1388: [C++] Support schema evolution from decimal to timestamp/st…
Browse files Browse the repository at this point in the history
…ring group

### What changes were proposed in this pull request?
Support conversion from
{decimal}
to
{string, char, varchar, timestamp, timestamp_instant}

### Why are the changes needed?
To support schema evolution in the C++ reader.

### How was this patch tested?
Unit tests were added and pass.

### Was this patch authored or co-authored using generative AI tooling?
NO

Closes #1761 from ffacs/ORC-1388.

Authored-by: ffacs <ffacs520@gmail.com>
Signed-off-by: Gang Wu <ustcwg@gmail.com>
  • Loading branch information
ffacs authored and wgtmac committed Jan 30, 2024
1 parent 8f22732 commit f326f6a
Show file tree
Hide file tree
Showing 4 changed files with 287 additions and 13 deletions.
138 changes: 126 additions & 12 deletions c++/src/ConvertColumnReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,107 @@ namespace orc {
int32_t toScale;
};

template <typename FileTypeBatch>
class DecimalToTimestampColumnReader : public ConvertToTimestampColumnReader {
public:
DecimalToTimestampColumnReader(const Type& _readType, const Type& fileType,
StripeStreams& stripe, bool _throwOnOverflow)
: ConvertToTimestampColumnReader(_readType, fileType, stripe, _throwOnOverflow),
precision(static_cast<int32_t>(fileType.getPrecision())),
scale(static_cast<int32_t>(fileType.getScale())) {}

void next(ColumnVectorBatch& rowBatch, uint64_t numValues, char* notNull) override {
ConvertColumnReader::next(rowBatch, numValues, notNull);
const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
auto& dstBatch = *SafeCastBatchTo<TimestampVectorBatch*>(&rowBatch);
for (uint64_t i = 0; i < rowBatch.numElements; ++i) {
if (!rowBatch.hasNulls || rowBatch.notNull[i]) {
convertDecimalToTimestamp(dstBatch, i, srcBatch);
}
}
}

private:
void convertDecimalToTimestamp(TimestampVectorBatch& dstBatch, uint64_t idx,
const FileTypeBatch& srcBatch) {
constexpr int SecondToNanoFactor = 9;
// Following constant comes from java.time.Instant
// '-1000000000-01-01T00:00Z'
constexpr int64_t MIN_EPOCH_SECONDS = -31557014167219200L;
// '1000000000-12-31T23:59:59.999999999Z'
constexpr int64_t MAX_EPOCH_SECONDS = 31556889864403199L;
// dummy variable, there's no risk of overflow
bool overflow = false;

Int128 i128(srcBatch.values[idx]);
Int128 integerPortion = scaleDownInt128ByPowerOfTen(i128, scale);
if (integerPortion < MIN_EPOCH_SECONDS || integerPortion > MAX_EPOCH_SECONDS) {
handleOverflow<Decimal, int64_t>(dstBatch, idx, throwOnOverflow);
return;
}
i128 -= scaleUpInt128ByPowerOfTen(integerPortion, scale, overflow);
Int128 fractionPortion = std::move(i128);
if (scale < SecondToNanoFactor) {
fractionPortion =
scaleUpInt128ByPowerOfTen(fractionPortion, SecondToNanoFactor - scale, overflow);
} else {
fractionPortion = scaleDownInt128ByPowerOfTen(fractionPortion, scale - SecondToNanoFactor);
}
if (fractionPortion < 0) {
fractionPortion += 1e9;
integerPortion -= 1;
}
// line 630 has guaranteed toLong() will not overflow
dstBatch.data[idx] = integerPortion.toLong();
dstBatch.nanoseconds[idx] = fractionPortion.toLong();

if (needConvertTimezone) {
dstBatch.data[idx] = readerTimezone.convertFromUTC(dstBatch.data[idx]);
}
}

const int32_t precision;
const int32_t scale;
};

/**
 * Column reader that converts a decimal column read from the file into a
 * string, char or varchar column.
 *
 * @tparam FileTypeBatch vector batch type holding the decoded decimals; its
 *         values[] elements must be convertible to Int128.
 */
template <typename FileTypeBatch>
class DecimalToStringVariantColumnReader : public ConvertToStringVariantColumnReader {
 public:
  DecimalToStringVariantColumnReader(const Type& _readType, const Type& fileType,
                                     StripeStreams& stripe, bool _throwOnOverflow)
      : ConvertToStringVariantColumnReader(_readType, fileType, stripe, _throwOnOverflow),
        scale(static_cast<int32_t>(fileType.getScale())) {}

  /**
   * Formats every non-null decimal of the decoded batch into strBuffer.
   *
   * When the read type is not STRING, each formatted value is cut to the read
   * type's maximum length.
   *
   * @param rowBatch  destination batch; only its null information and
   *                  numElements are read here
   * @param numValues number of slots strBuffer is resized to
   * @return total number of bytes of the strings written for non-null rows
   */
  uint64_t convertToStrBuffer(ColumnVectorBatch& rowBatch, uint64_t numValues) override {
    uint64_t size = 0;
    strBuffer.resize(numValues);
    const auto& srcBatch = *SafeCastBatchTo<const FileTypeBatch*>(data.get());
    const bool bounded = readType.getKind() != STRING;
    const uint64_t maxLength = bounded ? readType.getMaximumLength() : 0;

    for (uint64_t i = 0; i < rowBatch.numElements; ++i) {
      // Skip null rows entirely. resize() keeps existing elements, so a null
      // slot may still hold a string from an earlier call (assuming strBuffer
      // is reused between calls); it must not be truncated or counted.
      if (rowBatch.hasNulls && !rowBatch.notNull[i]) {
        continue;
      }
      auto& str = strBuffer[i];
      str = Int128(srcBatch.values[i]).toDecimalString(scale, true);
      if (bounded && str.size() > maxLength) {
        str.resize(maxLength);
      }
      size += str.size();
    }
    return size;
  }

 private:
  const int32_t scale;
};

#define DEFINE_NUMERIC_CONVERT_READER(FROM, TO, TYPE) \
using FROM##To##TO##ColumnReader = \
NumericConvertColumnReader<FROM##VectorBatch, TO##VectorBatch, TYPE>;
Expand Down Expand Up @@ -621,6 +722,14 @@ namespace orc {
using Decimal128##To##TO##ColumnReader = \
DecimalConvertColumnReader<Decimal128VectorBatch, TO##VectorBatch>;

#define DEFINE_DECIMAL_CONVERT_TO_TIMESTAMP_READER \
using Decimal64ToTimestampColumnReader = DecimalToTimestampColumnReader<Decimal64VectorBatch>; \
using Decimal128ToTimestampColumnReader = DecimalToTimestampColumnReader<Decimal128VectorBatch>;

#define DEFINE_DECIMAL_CONVERT_TO_STRING_VARINT_READER(TO) \
using Decimal64To##TO##ColumnReader = DecimalToStringVariantColumnReader<Decimal64VectorBatch>; \
using Decimal128To##TO##ColumnReader = DecimalToStringVariantColumnReader<Decimal128VectorBatch>;

DEFINE_NUMERIC_CONVERT_READER(Boolean, Byte, int8_t)
DEFINE_NUMERIC_CONVERT_READER(Boolean, Short, int16_t)
DEFINE_NUMERIC_CONVERT_READER(Boolean, Int, int32_t)
Expand Down Expand Up @@ -720,6 +829,11 @@ namespace orc {
DEFINE_DECIMAL_CONVERT_TO_DECIMAL_READER(Decimal64)
DEFINE_DECIMAL_CONVERT_TO_DECIMAL_READER(Decimal128)

// Reader aliases for decimal -> timestamp and decimal -> string/char/varchar.
DEFINE_DECIMAL_CONVERT_TO_TIMESTAMP_READER
DEFINE_DECIMAL_CONVERT_TO_STRING_VARINT_READER(String)
DEFINE_DECIMAL_CONVERT_TO_STRING_VARINT_READER(Char)
DEFINE_DECIMAL_CONVERT_TO_STRING_VARINT_READER(Varchar)

// Expands to a return statement that constructs reader type NAME from the
// _readType, fileType, stripe and throwOnOverflow names visible at the
// expansion site.
#define CREATE_READER(NAME) \
return std::make_unique<NAME>(_readType, fileType, stripe, throwOnOverflow);

Expand Down Expand Up @@ -935,13 +1049,6 @@ namespace orc {
CASE_EXCEPTION
}
}
case STRING:
case BINARY:
case TIMESTAMP:
case LIST:
case MAP:
case STRUCT:
case UNION:
case DECIMAL: {
switch (_readType.getKind()) {
CASE_CREATE_FROM_DECIMAL_READER(BOOLEAN, Boolean)
Expand All @@ -951,6 +1058,11 @@ namespace orc {
CASE_CREATE_FROM_DECIMAL_READER(LONG, Long)
CASE_CREATE_FROM_DECIMAL_READER(FLOAT, Float)
CASE_CREATE_FROM_DECIMAL_READER(DOUBLE, Double)
CASE_CREATE_FROM_DECIMAL_READER(STRING, String)
CASE_CREATE_FROM_DECIMAL_READER(CHAR, Char)
CASE_CREATE_FROM_DECIMAL_READER(VARCHAR, Varchar)
CASE_CREATE_FROM_DECIMAL_READER(TIMESTAMP, Timestamp)
CASE_CREATE_FROM_DECIMAL_READER(TIMESTAMP_INSTANT, Timestamp)
case DECIMAL: {
if (isDecimal64(fileType)) {
if (isDecimal64(_readType)) {
Expand All @@ -966,11 +1078,6 @@ namespace orc {
}
}
}
case STRING:
case CHAR:
case VARCHAR:
case TIMESTAMP:
case TIMESTAMP_INSTANT:
case BINARY:
case LIST:
case MAP:
Expand All @@ -980,6 +1087,13 @@ namespace orc {
CASE_EXCEPTION
}
}
case STRING:
case BINARY:
case TIMESTAMP:
case LIST:
case MAP:
case STRUCT:
case UNION:
case DATE:
case VARCHAR:
case CHAR:
Expand Down
3 changes: 2 additions & 1 deletion c++/src/SchemaEvolution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,8 @@ namespace orc {
break;
}
case DECIMAL: {
ret.isValid = ret.needConvert = isNumeric(readType);
ret.isValid = ret.needConvert =
isNumeric(readType) || isStringVariant(readType) || isTimestamp(readType);
break;
}
case STRING:
Expand Down
143 changes: 143 additions & 0 deletions c++/test/TestConvertColumnReader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
* limitations under the License.
*/

#include "Timezone.hh"
#include "orc/Type.hh"
#include "wrap/gtest-wrapper.h"

Expand Down Expand Up @@ -672,4 +673,146 @@ namespace orc {
}
}

// Writes decimal(14,4) and decimal(25,10) columns holding epoch seconds plus a
// fraction, reads them back as timestamp / timestamp with local time zone, and
// checks the converted seconds and nanoseconds.
TEST(ConvertColumnReader, TestConvertDecimalToTimestamp) {
  constexpr int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024;
  constexpr int TEST_CASES = 1024;
  std::string writerTimezoneName = "America/New_York";
  std::string readerTimezoneName = "Australia/Sydney";
  MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
  std::unique_ptr<Type> fileType(
      Type::buildTypeFromString("struct<c1:decimal(14,4),c2:decimal(25,10)>"));
  std::shared_ptr<Type> readType(
      Type::buildTypeFromString("struct<c1:timestamp,c2:timestamp with local time zone>"));
  WriterOptions options;
  options.setUseTightNumericVector(true);
  options.setTimezoneName(writerTimezoneName);
  auto writer = createWriter(*fileType, &memStream, options);
  auto batch = writer->createRowBatch(TEST_CASES);
  // Reference casts throw on a type mismatch instead of yielding a null pointer.
  auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
  auto& c1 = dynamic_cast<Decimal64VectorBatch&>(*structBatch.fields[0]);
  auto& c2 = dynamic_cast<Decimal128VectorBatch&>(*structBatch.fields[1]);

  auto convertToSeconds = [](const Timezone& timezone, const std::string& date) {
    // Zero-initialize so fields strptime() does not fill are not indeterminate.
    tm timeStruct{};
    if (strptime(date.c_str(), "%Y-%m-%d %H:%M:%S", &timeStruct) == nullptr) {
      throw TimezoneError("bad time " + date);
    }
    return timezone.convertFromUTC(timegm(&timeStruct));
  };

  // One timestamp per month starting in January 1960.
  std::vector<std::string> timeStrings;
  for (int i = 0; i < TEST_CASES; i++) {
    int64_t year = 1960 + (i / 12);
    int64_t month = i % 12 + 1;
    int64_t day = 27;
    std::string others = "23:45:56";
    std::stringstream ss;
    ss << year << "-";
    ss << std::setfill('0') << std::setw(2) << month << "-" << day << " " << others;
    timeStrings.push_back(ss.str());
  }

  // ts[0]: seconds computed with GMT; ts[1]: seconds computed with the writer timezone.
  const Timezone& gmtTimezone = getTimezoneByName("GMT");
  const Timezone& writerTimezone = getTimezoneByName(writerTimezoneName);
  std::vector<int64_t> ts[2];
  for (auto& time : timeStrings) {
    ts[0].emplace_back(convertToSeconds(gmtTimezone, time));
    ts[1].emplace_back(convertToSeconds(writerTimezone, time));
  }

  bool overflow = false;
  for (int i = 0; i < TEST_CASES; i++) {
    c1.values[i] = ts[0][i] * 10000 + 1234;
    Int128 scaled = scaleUpInt128ByPowerOfTen(Int128(ts[1][i]), 10, overflow);
    scaled += Int128("1234567895");
    c2.values[i] = scaled;
    // A gtest assertion is still checked when NDEBUG disables assert().
    ASSERT_FALSE(overflow) << timeStrings[i];
  }

  structBatch.numElements = c1.numElements = c2.numElements = TEST_CASES;
  structBatch.hasNulls = c1.hasNulls = c2.hasNulls = false;
  writer->add(*batch);
  writer->close();
  auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength());
  auto pool = getDefaultPool();
  auto reader = createReader(*pool, std::move(inStream));
  RowReaderOptions rowReaderOptions;
  rowReaderOptions.setUseTightNumericVector(true);
  rowReaderOptions.setReadType(readType);
  rowReaderOptions.setTimezoneName(readerTimezoneName);
  auto rowReader = reader->createRowReader(rowReaderOptions);
  auto readBatch = rowReader->createRowBatch(TEST_CASES);
  // Nothing below is meaningful if no batch was read.
  ASSERT_TRUE(rowReader->next(*readBatch));

  auto& readStructBatch = dynamic_cast<StructVectorBatch&>(*readBatch);
  auto& readC1 = dynamic_cast<TimestampVectorBatch&>(*readStructBatch.fields[0]);
  auto& readC2 = dynamic_cast<TimestampVectorBatch&>(*readStructBatch.fields[1]);
  const Timezone& readerTimezone = getTimezoneByName(readerTimezoneName);
  for (int i = 0; i < TEST_CASES; i++) {
    size_t idx = static_cast<size_t>(i);
    EXPECT_TRUE(readC1.notNull[idx]) << i;
    EXPECT_TRUE(readC2.notNull[idx]) << i;
    EXPECT_EQ(readerTimezone.convertToUTC(readC1.data[idx]), ts[0][idx]) << timeStrings[idx];
    EXPECT_EQ(readC1.nanoseconds[idx], 123400000) << timeStrings[idx];
    EXPECT_EQ(readC2.data[idx], ts[1][idx]) << timeStrings[idx];
    // The tenth fractional digit is dropped; for negative seconds the dropped
    // digit ends up rounding the nanoseconds up by one.
    if (readC2.data[idx] < 0) {
      EXPECT_EQ(readC2.nanoseconds[idx], 123456790) << timeStrings[idx];
    } else {
      EXPECT_EQ(readC2.nanoseconds[idx], 123456789) << timeStrings[idx];
    }
  }
}

// Writes three decimal(14,3) columns and reads them back as char(5),
// varchar(5) and string, checking that the bounded types are cut to five
// characters while string keeps the full text.
TEST(ConvertColumnReader, TestConvertDecimalToStringVariant) {
  constexpr int DEFAULT_MEM_STREAM_SIZE = 10 * 1024 * 1024;
  constexpr int TEST_CASES = 1024;
  constexpr size_t MAX_LENGTH = 5;  // length of char(5) / varchar(5) in readType
  MemoryOutputStream memStream(DEFAULT_MEM_STREAM_SIZE);
  std::unique_ptr<Type> fileType(
      Type::buildTypeFromString("struct<c1:decimal(14,3),c2:decimal(14,3),c3:decimal(14,3)>"));
  std::shared_ptr<Type> readType(
      Type::buildTypeFromString("struct<c1:char(5),c2:varchar(5),c3:string>"));
  WriterOptions options;
  auto writer = createWriter(*fileType, &memStream, options);
  auto batch = writer->createRowBatch(TEST_CASES);
  // Reference casts throw on a type mismatch instead of yielding a null pointer.
  auto& structBatch = dynamic_cast<StructVectorBatch&>(*batch);
  auto& c1 = dynamic_cast<Decimal64VectorBatch&>(*structBatch.fields[0]);
  auto& c2 = dynamic_cast<Decimal64VectorBatch&>(*structBatch.fields[1]);
  auto& c3 = dynamic_cast<Decimal64VectorBatch&>(*structBatch.fields[2]);

  for (int i = 0; i < TEST_CASES; i++) {
    c1.values[i] = i * 1000 + 123;
    c2.values[i] = i * 1000 + 456;
    c3.values[i] = i * 1000 + 789;
  }
  // All three columns, including c3, carry TEST_CASES non-null values.
  structBatch.numElements = c1.numElements = c2.numElements = c3.numElements = TEST_CASES;
  structBatch.hasNulls = c1.hasNulls = c2.hasNulls = c3.hasNulls = false;
  writer->add(*batch);
  writer->close();
  auto inStream = std::make_unique<MemoryInputStream>(memStream.getData(), memStream.getLength());
  auto pool = getDefaultPool();
  auto reader = createReader(*pool, std::move(inStream));
  RowReaderOptions rowReaderOptions;
  rowReaderOptions.setUseTightNumericVector(true);
  rowReaderOptions.setReadType(readType);
  auto rowReader = reader->createRowReader(rowReaderOptions);
  auto readBatch = rowReader->createRowBatch(TEST_CASES);
  // Nothing below is meaningful if no batch was read.
  ASSERT_TRUE(rowReader->next(*readBatch));

  auto& readStructBatch = dynamic_cast<StructVectorBatch&>(*readBatch);
  auto& readC1 = dynamic_cast<StringVectorBatch&>(*readStructBatch.fields[0]);
  auto& readC2 = dynamic_cast<StringVectorBatch&>(*readStructBatch.fields[1]);
  auto& readC3 = dynamic_cast<StringVectorBatch&>(*readStructBatch.fields[2]);
  for (int i = 0; i < TEST_CASES; i++) {
    const std::string integerPart = std::to_string(i);
    // Bounded types keep only the first MAX_LENGTH characters, e.g. "12.123"
    // becomes "12.12" and "1000.123" becomes "1000.".
    EXPECT_EQ((integerPart + ".123").substr(0, MAX_LENGTH),
              std::string(readC1.data[i], readC1.length[i]));
    EXPECT_EQ((integerPart + ".456").substr(0, MAX_LENGTH),
              std::string(readC2.data[i], readC2.length[i]));
    EXPECT_EQ(integerPart + ".789", std::string(readC3.data[i], readC3.length[i]));
  }
}

} // namespace orc
16 changes: 16 additions & 0 deletions c++/test/TestSchemaEvolution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,22 @@ namespace orc {
}
}

// conversion from decimal to string/char/varchar
for (size_t i = 12; i <= 13; i++) {
for (size_t j = 7; j <= 11; j++) {
canConvert[i][j] = true;
needConvert[i][j] = true;
}
}

// conversion from decimal to timestamp
for (size_t i = 12; i <= 13; i++) {
for (size_t j = 14; j <= 15; j++) {
canConvert[i][j] = true;
needConvert[i][j] = true;
}
}

for (size_t i = 0; i < typesSize; i++) {
for (size_t j = 0; j < typesSize; j++) {
testConvertReader(types[i], types[j], canConvert[i][j], needConvert[i][j]);
Expand Down

0 comments on commit f326f6a

Please sign in to comment.