
Commit

Merge branch 'minor/enhance-style-for-compressed' of github.com:mapleFU/arrow into minor/enhance-style-for-compressed
mapleFU committed Apr 11, 2024
2 parents 430742a + 582e9d4 commit 84b8ffd
Showing 33 changed files with 814 additions and 383 deletions.
32 changes: 24 additions & 8 deletions cpp/src/arrow/io/compressed.cc
@@ -269,7 +269,7 @@ class CompressedInputStream::Impl {

// Read compressed data if necessary
Status EnsureCompressedData() {
int64_t compressed_avail = compressed_ ? compressed_->size() - compressed_pos_ : 0;
int64_t compressed_avail = compressed_buffer_available();
if (compressed_avail == 0) {
// Ensure compressed_ buffer is allocated with kChunkSize.
if (!supports_zero_copy_from_raw_) {
@@ -297,10 +297,14 @@ class CompressedInputStream::Impl {
return Status::OK();
}

// Decompress some data from the compressed_ buffer.
// Call this function only if the decompressed_ buffer is empty.
// Decompress some data from the compressed_ buffer into decompressor_.
// Call this function only if the decompressed_ buffer is fully consumed.
Status DecompressData() {
// compressed_buffer_available() could be 0 here because there might
// still be some decompressed data left to emit even though the compressed
// data was entirely consumed (especially if the expansion factor is large)
DCHECK_NE(compressed_->data(), nullptr);
DCHECK_EQ(0, decompressed_buffer_available());

int64_t decompress_size = kDecompressSize;

@@ -353,7 +357,8 @@ class CompressedInputStream::Impl {

// Try to feed more data into the decompressed_ buffer.
Status RefillDecompressed(bool* has_data) {
// First try to read data from the decompressor
// First try to read data from the decompressor, unless we haven't read any
// compressed data yet.
if (compressed_ && compressed_->size() != 0) {
if (decompressor_->IsFinished()) {
// We just went over the end of a previous compressed stream.
@@ -362,10 +367,12 @@
}
RETURN_NOT_OK(DecompressData());
}
if (!decompressed_ || decompressed_->size() == 0) {
// Got nothing, need to read more compressed data
int64_t decompress_avail = decompressed_buffer_available();
if (decompress_avail == 0) {
// Got nothing from existing `compressed_` and `decompressor_`,
// need to read more compressed data.
RETURN_NOT_OK(EnsureCompressedData());
if (compressed_pos_ == compressed_->size()) {
if (compressed_buffer_available() == 0) {
// No more data to decompress
if (!fresh_decompressor_ && !decompressor_->IsFinished()) {
return Status::IOError("Truncated compressed stream");
@@ -405,13 +412,22 @@ class CompressedInputStream::Impl {
ARROW_ASSIGN_OR_RAISE(auto buf, AllocateResizableBuffer(nbytes, pool_));
ARROW_ASSIGN_OR_RAISE(int64_t bytes_read, Read(nbytes, buf->mutable_data()));
RETURN_NOT_OK(buf->Resize(bytes_read));
// Using std::move because the some compiler might has issue below:
// Using std::move because some compilers might have the issue below:
// https://wg21.cmeerw.net/cwg/issue1579
return std::move(buf);
}

const std::shared_ptr<InputStream>& raw() const { return raw_; }

private:
int64_t compressed_buffer_available() const {
return compressed_ ? compressed_->size() - compressed_pos_ : 0;
}

int64_t decompressed_buffer_available() const {
return decompressed_ ? decompressed_->size() - decompressed_pos_ : 0;
}

private:
// Read 64 KB compressed data at a time
static const int64_t kChunkSize = 64 * 1024;
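For context on the helpers added above, a minimal usage sketch of CompressedInputStream (the wrapper function and the GZIP codec choice are illustrative, not part of this commit). Each Read() on the wrapper drives the EnsureCompressedData(), DecompressData() and RefillDecompressed() paths shown above.

#include "arrow/buffer.h"
#include "arrow/io/compressed.h"
#include "arrow/io/interfaces.h"
#include "arrow/result.h"
#include "arrow/util/compression.h"

// Read up to 64 KB of decompressed bytes from a GZIP-compressed input stream.
arrow::Result<std::shared_ptr<arrow::Buffer>> ReadSomeDecompressed(
    std::shared_ptr<arrow::io::InputStream> raw) {
  ARROW_ASSIGN_OR_RAISE(auto codec,
                        arrow::util::Codec::Create(arrow::Compression::GZIP));
  ARROW_ASSIGN_OR_RAISE(auto stream,
                        arrow::io::CompressedInputStream::Make(codec.get(), raw));
  // The stream refills its decompressed buffer from the compressed one on demand.
  return stream->Read(64 * 1024);
}
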
30 changes: 4 additions & 26 deletions cpp/src/arrow/ipc/json_simple_test.cc
@@ -189,21 +189,6 @@ class TestIntegers : public ::testing::Test {

TYPED_TEST_SUITE_P(TestIntegers);

template <typename DataType>
std::vector<typename DataType::c_type> TestIntegersMutateIfNeeded(
std::vector<typename DataType::c_type> data) {
return data;
}

// TODO: This works, but is it the right way to do this?
template <>
std::vector<HalfFloatType::c_type> TestIntegersMutateIfNeeded<HalfFloatType>(
std::vector<HalfFloatType::c_type> data) {
std::for_each(data.begin(), data.end(),
[](HalfFloatType::c_type& value) { value = Float16(value).bits(); });
return data;
}

TYPED_TEST_P(TestIntegers, Basics) {
using T = TypeParam;
using c_type = typename T::c_type;
Expand All @@ -212,17 +197,16 @@ TYPED_TEST_P(TestIntegers, Basics) {
auto type = this->type();

AssertJSONArray<T>(type, "[]", {});
AssertJSONArray<T>(type, "[4, 0, 5]", TestIntegersMutateIfNeeded<T>({4, 0, 5}));
AssertJSONArray<T>(type, "[4, null, 5]", {true, false, true},
TestIntegersMutateIfNeeded<T>({4, 0, 5}));
AssertJSONArray<T>(type, "[4, 0, 5]", {4, 0, 5});
AssertJSONArray<T>(type, "[4, null, 5]", {true, false, true}, {4, 0, 5});

// Test limits
const auto min_val = std::numeric_limits<c_type>::min();
const auto max_val = std::numeric_limits<c_type>::max();
std::string json_string = JSONArray(0, 1, min_val);
AssertJSONArray<T>(type, json_string, TestIntegersMutateIfNeeded<T>({0, 1, min_val}));
AssertJSONArray<T>(type, json_string, {0, 1, min_val});
json_string = JSONArray(0, 1, max_val);
AssertJSONArray<T>(type, json_string, TestIntegersMutateIfNeeded<T>({0, 1, max_val}));
AssertJSONArray<T>(type, json_string, {0, 1, max_val});
}

TYPED_TEST_P(TestIntegers, Errors) {
@@ -289,12 +273,6 @@ INSTANTIATE_TYPED_TEST_SUITE_P(TestUInt8, TestIntegers, UInt8Type);
INSTANTIATE_TYPED_TEST_SUITE_P(TestUInt16, TestIntegers, UInt16Type);
INSTANTIATE_TYPED_TEST_SUITE_P(TestUInt32, TestIntegers, UInt32Type);
INSTANTIATE_TYPED_TEST_SUITE_P(TestUInt64, TestIntegers, UInt64Type);
// FIXME: I understand that HalfFloatType is backed by a uint16_t, but does it
// make sense to run this test over it?
// The way ConvertNumber for HalfFloatType is currently written, it allows the
// conversion of floating point notation to a half float, which causes failures
// in this test, one example is asserting 0.0 cannot be parsed as a half float.
// INSTANTIATE_TYPED_TEST_SUITE_P(TestHalfFloat, TestIntegers, HalfFloatType);

template <typename T>
class TestStrings : public ::testing::Test {
28 changes: 28 additions & 0 deletions cpp/src/arrow/util/formatting_util_test.cc
@@ -26,11 +26,13 @@
#include "arrow/testing/gtest_util.h"
#include "arrow/type.h"
#include "arrow/util/decimal.h"
#include "arrow/util/float16.h"
#include "arrow/util/formatting.h"

namespace arrow {

using internal::StringFormatter;
using util::Float16;

class StringAppender {
public:
@@ -280,6 +282,32 @@ TEST(Formatting, Double) {
AssertFormatting(formatter, -HUGE_VAL, "-inf");
}

TEST(Formatting, HalfFloat) {
StringFormatter<HalfFloatType> formatter;

AssertFormatting(formatter, Float16(0.0f).bits(), "0");
AssertFormatting(formatter, Float16(-0.0f).bits(), "-0");
AssertFormatting(formatter, Float16(1.5f).bits(), "1.5");

// Slightly adapted from values present here
// https://blogs.mathworks.com/cleve/2017/05/08/half-precision-16-bit-floating-point-arithmetic/
AssertFormatting(formatter, 0x3c00, "1");
AssertFormatting(formatter, 0x3c01, "1.0009765625");
AssertFormatting(formatter, 0x0400, "0.00006103515625");
AssertFormatting(formatter, 0x0001, "5.960464477539063e-8");

// Can't avoid loss of precision here.
AssertFormatting(formatter, Float16(1234.567f).bits(), "1235");
AssertFormatting(formatter, Float16(1e3f).bits(), "1000");
AssertFormatting(formatter, Float16(1e4f).bits(), "10000");
AssertFormatting(formatter, Float16(1e10f).bits(), "inf");
AssertFormatting(formatter, Float16(1e15f).bits(), "inf");

AssertFormatting(formatter, 0xffff, "nan");
AssertFormatting(formatter, 0x7c00, "inf");
AssertFormatting(formatter, 0xfc00, "-inf");
}

template <typename T>
void TestDecimalFormatter() {
struct TestParam {
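For readers checking the bit patterns above: binary16 has 1 sign bit, 5 exponent bits (bias 15) and 10 mantissa bits, so the expected strings can be verified with a small standalone sketch (not part of this change; the helper below is illustrative and only covers the cases exercised by the test).

#include <cmath>
#include <cstdint>
#include <cstdio>

// Decode an IEEE 754 binary16 bit pattern by hand.
double DecodeHalf(uint16_t bits) {
  int sign = (bits >> 15) & 0x1;
  int exponent = (bits >> 10) & 0x1f;
  int mantissa = bits & 0x3ff;
  double value;
  if (exponent == 0) {
    value = std::ldexp(mantissa / 1024.0, -14);  // subnormal, no implicit leading 1
  } else if (exponent == 0x1f) {
    value = mantissa == 0 ? INFINITY : NAN;  // inf / nan
  } else {
    value = std::ldexp(1.0 + mantissa / 1024.0, exponent - 15);  // normal
  }
  return sign ? -value : value;
}

int main() {
  std::printf("%.17g\n", DecodeHalf(0x3c01));  // 1.0009765625 = 1 + 1/1024
  std::printf("%.17g\n", DecodeHalf(0x0400));  // 6.103515625e-05 = 2^-14
  std::printf("%.17g\n", DecodeHalf(0x0001));  // 5.9604644775390625e-08 = 2^-24
  return 0;
}
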
72 changes: 57 additions & 15 deletions cpp/src/arrow/util/macros.h
@@ -36,41 +36,83 @@
TypeName& operator=(TypeName&&) = default
#endif

#define ARROW_UNUSED(x) (void)(x)
#define ARROW_ARG_UNUSED(x)
// With ARROW_PREDICT_FALSE, GCC and clang can be told that a certain branch is
// not likely to be taken (for instance, a CHECK failure), and use that information in
// static analysis. Giving the compiler this information can affect the generated code
// layout in the absence of better information (i.e. -fprofile-arcs). [1] explains how
// this feature can be used to improve code generation. It was written as a positive
// comment to a negative article about the use of these annotations.
//
// GCC can be told that a certain branch is not likely to be taken (for
// instance, a CHECK failure), and use that information in static analysis.
// Giving it this information can help it optimize for the common case in
// the absence of better information (ie. -fprofile-arcs).
// ARROW_COMPILER_ASSUME allows the compiler to assume that a given expression is
// true, without evaluating it, and to optimise based on this assumption [2]. If this
// condition is violated at runtime, the behavior is undefined. This can be useful to
// generate both faster and smaller code in compute kernels.
//
#if defined(__GNUC__)
#define ARROW_PREDICT_FALSE(x) (__builtin_expect(!!(x), 0))
#define ARROW_PREDICT_TRUE(x) (__builtin_expect(!!(x), 1))
// IMPORTANT: Different optimisers are likely to react differently to this annotation!
// It should be used with care when we can prove by some means that the assumption
// is (1) guaranteed to always hold and (2) is useful for optimization [3]. If the
// assumption is pessimistic, it might even block the compiler from decisions that
// could lead to better code [4]. If you have a good intuition for what the compiler
// can do with assumptions [5], you can use this macro to guide it and end up with
// results you would only get with more complex code transformations.
// `clang -S -emit-llvm` can be used to check how the generated code changes with
// your specific use of this macro.
//
// [1] https://lobste.rs/s/uwgtkt/don_t_use_likely_unlikely_attributes#c_xi3wmc
// [2] "Portable assumptions"
// https://www.open-std.org/jtc1/sc22/wg21/docs/papers/2021/p1774r4.pdf
// [3] "Assertions Are Pessimistic, Assumptions Are Optimistic"
// https://blog.regehr.org/archives/1096
// [4] https://discourse.llvm.org/t/llvm-assume-blocks-optimization/71609
// [5] J. Doerfert et al. 2019. "Performance Exploration Through Optimistic Static
// Program Annotations". https://github.com/jdoerfert/PETOSPA/blob/master/ISC19.pdf
#define ARROW_UNUSED(x) (void)(x)
#define ARROW_ARG_UNUSED(x)
#if defined(__GNUC__) // GCC and compatible compilers (clang, Intel ICC)
#define ARROW_NORETURN __attribute__((noreturn))
#define ARROW_NOINLINE __attribute__((noinline))
#define ARROW_FORCE_INLINE __attribute__((always_inline))
#define ARROW_PREDICT_FALSE(x) (__builtin_expect(!!(x), 0))
#define ARROW_PREDICT_TRUE(x) (__builtin_expect(!!(x), 1))
#define ARROW_PREFETCH(addr) __builtin_prefetch(addr)
#elif defined(_MSC_VER)
#define ARROW_RESTRICT __restrict
#if defined(__clang__) // clang-specific
#define ARROW_COMPILER_ASSUME(expr) __builtin_assume(expr)
#else // GCC-specific
#if __GNUC__ >= 13
#define ARROW_COMPILER_ASSUME(expr) __attribute__((assume(expr)))
#else
// GCC does not have a built-in assume intrinsic before GCC 13, so we use an
// if statement and __builtin_unreachable() to achieve the same effect [2].
// Unlike clang's __builtin_assume and C++23's [[assume(expr)]], using this
// on GCC won't warn about side-effects in the expression, so make sure expr
// is side-effect free when working with GCC versions before 13 (Jan-2024),
// otherwise clang/MSVC builds will fail in CI.
#define ARROW_COMPILER_ASSUME(expr) \
if (expr) { \
} else { \
__builtin_unreachable(); \
}
#endif // __GNUC__ >= 13
#endif
#elif defined(_MSC_VER) // MSVC
#define ARROW_NORETURN __declspec(noreturn)
#define ARROW_NOINLINE __declspec(noinline)
#define ARROW_FORCE_INLINE __declspec(forceinline)
#define ARROW_PREDICT_FALSE(x) (x)
#define ARROW_PREDICT_TRUE(x) (x)
#define ARROW_PREFETCH(addr)
#define ARROW_RESTRICT __restrict
#define ARROW_COMPILER_ASSUME(expr) __assume(expr)
#else
#define ARROW_NORETURN
#define ARROW_NOINLINE
#define ARROW_FORCE_INLINE
#define ARROW_PREDICT_FALSE(x) (x)
#define ARROW_PREDICT_TRUE(x) (x)
#define ARROW_PREFETCH(addr)
#endif

#if defined(__GNUC__) || defined(__clang__) || defined(_MSC_VER)
#define ARROW_RESTRICT __restrict
#else
#define ARROW_RESTRICT
#define ARROW_COMPILER_ASSUME(expr)
#endif

// ----------------------------------------------------------------------
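A hedged usage sketch of the two annotations documented above; the function and its precondition are hypothetical, not taken from the Arrow codebase.

#include <cstdint>

#include "arrow/util/macros.h"

// Sum a buffer whose length the caller guarantees to be a multiple of 8.
int64_t SumSmallValues(const uint8_t* values, int64_t length) {
  // The error path is marked unlikely so codegen favors the fall-through path.
  if (ARROW_PREDICT_FALSE(values == nullptr)) return -1;
  // Promise the optimizer the padding invariant; violating it at runtime is
  // undefined behavior, as the comment above warns.
  ARROW_COMPILER_ASSUME(length % 8 == 0);
  int64_t sum = 0;
  for (int64_t i = 0; i < length; ++i) sum += values[i];
  return sum;
}
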
36 changes: 36 additions & 0 deletions cpp/src/arrow/util/value_parsing_test.cc
@@ -24,9 +24,13 @@

#include "arrow/testing/gtest_util.h"
#include "arrow/type.h"
#include "arrow/util/float16.h"
#include "arrow/util/value_parsing.h"

namespace arrow {

using util::Float16;

namespace internal {

template <typename T>
@@ -152,6 +156,25 @@ TEST(StringConversion, ToDouble) {
AssertConversionFails(&converter, "1.5");
}

TEST(StringConversion, ToHalfFloat) {
AssertConversion<HalfFloatType>("1.5", Float16(1.5f).bits());
AssertConversion<HalfFloatType>("0", Float16(0.0f).bits());
AssertConversion<HalfFloatType>("-0.0", Float16(-0.0f).bits());
AssertConversion<HalfFloatType>("-1e15", Float16(-1e15).bits());
AssertConversion<HalfFloatType>("+Infinity", 0x7c00);
AssertConversion<HalfFloatType>("-Infinity", 0xfc00);
AssertConversion<HalfFloatType>("Infinity", 0x7c00);

AssertConversionFails<HalfFloatType>("");
AssertConversionFails<HalfFloatType>("e");
AssertConversionFails<HalfFloatType>("1,5");

StringConverter<HalfFloatType> converter(/*decimal_point=*/',');
AssertConversion(&converter, "1,5", Float16(1.5f).bits());
AssertConversion(&converter, "0", Float16(0.0f).bits());
AssertConversionFails(&converter, "1.5");
}

#if !defined(_WIN32) || defined(NDEBUG)

TEST(StringConversion, ToFloatLocale) {
@@ -180,6 +203,19 @@ TEST(StringConversion, ToDoubleLocale) {
AssertConversionFails(&converter, "1,5");
}

TEST(StringConversion, ToHalfFloatLocale) {
// French locale uses the comma as decimal point
LocaleGuard locale_guard("fr_FR.UTF-8");

AssertConversion<HalfFloatType>("1.5", Float16(1.5).bits());
AssertConversionFails<HalfFloatType>("1,5");

StringConverter<HalfFloatType> converter(/*decimal_point=*/'#');
AssertConversion(&converter, "1#5", Float16(1.5).bits());
AssertConversionFails(&converter, "1.5");
AssertConversionFails(&converter, "1,5");
}

#endif // _WIN32

TEST(StringConversion, ToInt8) {
15 changes: 9 additions & 6 deletions cpp/src/parquet/encoding.cc
@@ -1192,10 +1192,12 @@ int PlainBooleanDecoder::DecodeArrow(
int values_decoded = num_values - null_count;
if (ARROW_PREDICT_FALSE(num_values_ < values_decoded)) {
// A too large `num_values` was requested.
ParquetException::EofException();
ParquetException::EofException(
"A too large `num_values` was requested in PlainBooleanDecoder: remain " +
std::to_string(num_values_) + ", requested: " + std::to_string(values_decoded));
}
if (ARROW_PREDICT_FALSE(!bit_reader_->Advance(values_decoded))) {
ParquetException::EofException();
ParquetException::EofException("PlainDecoder doesn't have enough values in page");
}

if (null_count == 0) {
@@ -1208,7 +1210,7 @@
BitBlockCounter bit_counter(valid_bits, valid_bits_offset, num_values);
int64_t value_position = 0;
int64_t valid_bits_offset_position = valid_bits_offset;
int64_t previous_value_offset = 0;
int64_t previous_value_offset = total_num_values_ - num_values_;
while (value_position < num_values) {
auto block = bit_counter.NextWord();
if (block.AllSet()) {
@@ -1224,8 +1226,7 @@
} else {
for (int64_t i = 0; i < block.length; ++i) {
if (bit_util::GetBit(valid_bits, valid_bits_offset_position + i)) {
bool value = bit_util::GetBit(
data_, total_num_values_ - num_values_ + previous_value_offset);
bool value = bit_util::GetBit(data_, previous_value_offset);
builder->UnsafeAppend(value);
previous_value_offset += 1;
} else {
@@ -3177,7 +3178,9 @@ class RleBooleanDecoder : public DecoderImpl, virtual public BooleanDecoder {
PARQUET_THROW_NOT_OK(
out->AppendValues(values.begin(), values.begin() + current_batch_size));
num_values -= current_batch_size;
current_index_in_batch = 0;
// Setting current_index_in_batch to current_batch_size means
// the whole batch has been fully consumed.
current_index_in_batch = current_batch_size;
} while (num_values > 0);
return num_non_null_values;
}
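A standalone toy sketch of the offset bug addressed in PlainBooleanDecoder::DecodeArrow above (simplified, no Parquet types; all names hypothetical): the read position into the page data must resume from the decoder's absolute offset rather than zero, otherwise a second call re-reads bits from the start of the page.

#include <cstdint>
#include <vector>

// Toy stand-in for the decoder state: `page` holds the plain-encoded booleans,
// `total_num_values` is the page size, `num_values_remaining` counts values
// not yet decoded.
struct ToyBooleanDecoder {
  std::vector<bool> page;
  int64_t total_num_values = 0;
  int64_t num_values_remaining = 0;

  // Decode one spaced batch: append a page value where valid[i] is true and a
  // placeholder `false` where the slot is null.
  void DecodeSpaced(const std::vector<bool>& valid, std::vector<bool>* out) {
    // The fix: resume from the absolute position already consumed in the page.
    int64_t offset = total_num_values - num_values_remaining;
    for (bool is_valid : valid) {
      out->push_back(is_valid ? page[offset++] : false);
    }
    num_values_remaining = total_num_values - offset;
  }
};

Calling DecodeSpaced twice, first with valid = {true, false, true} and then with {true, true}, reads page bits 0, 1 and then 2, 3; with the old initialization to zero, the second call would have re-read bits 0 and 1.
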
