diff --git a/cpp/arcae/base_column_map.h b/cpp/arcae/base_column_map.h index a5789d82..cabbee6d 100644 --- a/cpp/arcae/base_column_map.h +++ b/cpp/arcae/base_column_map.h @@ -28,7 +28,7 @@ enum MapOrder {C_ORDER=0, F_ORDER}; // A return of < 0 indicates a non-existent selection std::ptrdiff_t SelectDim(std::size_t dim, std::size_t sdims, std::size_t ndims); -using RowIds = std::vector; +using RowIds = absl::Span; using ColumnSelection = std::vector; // Describes a mapping between disk and memory diff --git a/cpp/arcae/safe_table_proxy.cc b/cpp/arcae/safe_table_proxy.cc index cb68ec05..3c203e14 100644 --- a/cpp/arcae/safe_table_proxy.cc +++ b/cpp/arcae/safe_table_proxy.cc @@ -7,19 +7,18 @@ #include #include "arcae/safe_table_proxy.h" +#include "arcae/column_convert_visitor.h" +#include "arcae/column_read_map.h" +#include "arcae/column_read_visitor.h" +#include "arrow/result.h" -using ::arrow::DataType; -using ::arrow::Buffer; -using ::arrow::Future; using ::arrow::Result; using ::arrow::Status; using ::casacore::Record; -using ::casacore::Table; using ::casacore::TableColumn; using ::casacore::TableIterProxy; -using ::casacore::TableLock; using ::casacore::TableProxy; namespace arcae { @@ -102,6 +101,27 @@ SafeTableProxy::GetColumn(const std::string & column, casacore::uInt startrow, c }); } +arrow::Result> +SafeTableProxy::GetColumn2(const std::string & column, const ColumnSelection & selection) const { + ARROW_RETURN_NOT_OK(FailIfClosed()); + + return run_isolated([this, &column, &selection]() -> arrow::Result> { + auto & casa_table = this->table_proxy->table(); + + if(!casa_table.tableDesc().isColumn(column)) { + return arrow::Status::UnknownError(column, " does not exist"); + } + + auto table_column = TableColumn(casa_table, column); + const auto & column_desc = table_column.columnDesc(); + ARROW_ASSIGN_OR_RAISE(auto map, ColumnReadMap::Make(table_column, selection)); + auto visitor = ColumnReadVisitor(map); + ARROW_RETURN_NOT_OK(visitor.Visit(column_desc.dataType())); + return std::move(visitor.array_); + }); +} + + arrow::Result> SafeTableProxy::ToArrow(casacore::uInt startrow, casacore::uInt nrow, const std::vector & columns) const { ARROW_RETURN_NOT_OK(FailIfClosed()); diff --git a/cpp/arcae/safe_table_proxy.h b/cpp/arcae/safe_table_proxy.h index 89cd5b8d..12593ef6 100644 --- a/cpp/arcae/safe_table_proxy.h +++ b/cpp/arcae/safe_table_proxy.h @@ -8,10 +8,10 @@ #include #include +#include #include -#include "arcae/column_convert_visitor.h" - +#include "arcae/base_column_map.h" namespace arcae { @@ -109,6 +109,11 @@ class SafeTableProxy { casacore::uInt startrow, casacore::uInt nrow) const; + arrow::Result> GetColumn2( + const std::string & column, + const ColumnSelection & selection) const; + + arrow::Result GetTableDescriptor() const; arrow::Result GetColumnDescriptor(const std::string & column) const; diff --git a/cpp/tests/column_map_test.cc b/cpp/tests/column_map_test.cc index 3c6a5c4d..1d67198f 100644 --- a/cpp/tests/column_map_test.cc +++ b/cpp/tests/column_map_test.cc @@ -3,6 +3,8 @@ #include #include + +#include #include #include #include @@ -186,7 +188,7 @@ TEST_F(ColumnConvertTest, SelectFromRange) { { // Select some rows from the VAR_DATA column auto data_column = GetArrayColumn(proxy.table(), "VAR_DATA"); - auto row_ids = arcae::RowIds{0, 1, 2, 3, 6, 7, 8, 9}; + auto row_ids = std::vector{0, 1, 2, 3, 6, 7, 8, 9}; ASSERT_OK_AND_ASSIGN(auto map, ColumnReadMap::Make(data_column, arcae::ColumnSelection{{row_ids}})); ASSERT_TRUE(map.shape_provider_.IsVarying()); @@ -224,8 +226,8 @@ TEST_F(ColumnConvertTest, SelectFromRange) { { // Select some rows and a channel from the VAR_DATA column auto data_column = GetArrayColumn(proxy.table(), "VAR_DATA"); - auto row_ids = arcae::RowIds{0, 1, 2, 3, 6, 7, 8, 9}; - auto chan_ids = arcae::RowIds{0}; + auto row_ids = std::vector{0, 1, 2, 3, 6, 7, 8, 9}; + auto chan_ids = std::vector{0}; ASSERT_OK_AND_ASSIGN(auto map, ColumnReadMap::Make(data_column, arcae::ColumnSelection{{row_ids, chan_ids}})); ASSERT_TRUE(map.shape_provider_.IsVarying()); @@ -264,8 +266,8 @@ TEST_F(ColumnConvertTest, SelectFromRange) { { // Select some rows and a corr from the VAR_DATA column auto data_column = GetArrayColumn(proxy.table(), "VAR_DATA"); - auto row_ids = arcae::RowIds{0, 1, 2, 3, 6, 7, 8, 9}; - auto corr_ids = arcae::RowIds{0}; + auto row_ids = std::vector{0, 1, 2, 3, 6, 7, 8, 9}; + auto corr_ids = std::vector{0}; ASSERT_OK_AND_ASSIGN(auto map, ColumnReadMap::Make(data_column, arcae::ColumnSelection{{row_ids, {}, corr_ids}})); ASSERT_TRUE(map.shape_provider_.IsVarying()); @@ -303,7 +305,7 @@ TEST_F(ColumnConvertTest, SelectFromRange) { { // Select some rows a channel and a correlation in the VAR_DATA column auto data_column = GetArrayColumn(proxy.table(), "VAR_DATA"); - auto row_ids = arcae::RowIds{0, 1, 2, 3, 6, 7, 8, 9}; + auto row_ids = std::vector{0, 1, 2, 3, 6, 7, 8, 9}; auto selection = arcae::ColumnSelection{row_ids, {0}, {0}}; ASSERT_OK_AND_ASSIGN(auto map, ColumnReadMap::Make(data_column, selection)); diff --git a/src/arcae/CMakeLists.txt b/src/arcae/CMakeLists.txt index c51150a3..ff841c26 100644 --- a/src/arcae/CMakeLists.txt +++ b/src/arcae/CMakeLists.txt @@ -1,4 +1,5 @@ find_package(Python COMPONENTS Interpreter Development.Module REQUIRED) +find_package(absl CONFIG REQUIRED) find_package(PkgConfig REQUIRED) find_program(CYTHON_FOUND "cython" REQUIRED) @@ -17,6 +18,7 @@ target_link_libraries(arrow_tables PUBLIC arcae PkgConfig::casacore) target_link_directories(arrow_tables PUBLIC ${PYARROW_LIBDIRS}) target_include_directories(arrow_tables PUBLIC PkgConfig::casacore + absl::span ${PYARROW_INCLUDE} ${NUMPY_INCLUDE} ${CMAKE_SOURCE_DIR}/cpp) diff --git a/src/arcae/arrow_tables.pxd b/src/arcae/arrow_tables.pxd index a1ae0db1..fee0f4d9 100644 --- a/src/arcae/arrow_tables.pxd +++ b/src/arcae/arrow_tables.pxd @@ -12,6 +12,20 @@ from pyarrow.includes.libarrow cimport * cdef extern from "" nogil: cdef unsigned int UINT_MAX +cdef extern from "" namespace "casacore" nogil: + ctypedef unsigned long long uInt64 + ctypedef uInt64 rownr_t + +cdef extern from "" namespace "absl" nogil: + cdef cppclass Span[T]: + Span() except + + Span(T * array, size_t length) except + + Span(Span&) except + + size() + +cdef extern from "arcae/base_column_map.h" namespace "arcae" nogil: + ctypedef Span[rownr_t] RowIds + ctypedef vector[RowIds] ColumnSelection cdef extern from "arcae/service_locator.h" namespace "arcae" nogil: cdef cppclass CServiceLocator" arcae::ServiceLocator": @@ -36,6 +50,7 @@ cdef extern from "arcae/safe_table_proxy.h" namespace "arcae" nogil: CResult[shared_ptr[CTable]] ToArrow " SafeTableProxy::ToArrow"(unsigned int startrow, unsigned int nrow, const vector[string] & columns) CResult[shared_ptr[CArray]] GetColumn " SafeTableProxy::GetColumn"(const string & column, unsigned int startrow, unsigned int nrow) + CResult[shared_ptr[CArray]] GetColumn2 " SafeTableProxy::GetColumn2"(const string & column, const ColumnSelection & selection) CResult[string] GetTableDescriptor " SafeTableProxy::GetTableDescriptor"() CResult[string] GetColumnDescriptor "SafeTableProxy::GetColumnDescriptor"(const string & column) CResult[unsigned int] nRow " SafeTableProxy::nRow"() diff --git a/src/arcae/arrow_tables.pyx b/src/arcae/arrow_tables.pyx index aa6f3533..a91bf0cc 100644 --- a/src/arcae/arrow_tables.pyx +++ b/src/arcae/arrow_tables.pyx @@ -2,6 +2,7 @@ # cython: language_level = 3 from collections.abc import Iterable, MutableMapping +import ctypes import cython import json from typing import Optional @@ -10,10 +11,14 @@ from libcpp cimport bool from libcpp.map cimport map from libcpp.memory cimport dynamic_pointer_cast, shared_ptr from libcpp.string cimport string +from libcpp.utility cimport move from libcpp.vector cimport vector import numpy as np import pyarrow as pa + +cimport numpy as cnp + from pyarrow.includes.common cimport * from pyarrow.includes.libarrow cimport * @@ -44,6 +49,10 @@ from arcae.arrow_tables cimport (CCasaTable, CTaql, complex64, complex128, + ColumnSelection, + RowIds, + Span, + rownr_t, UINT_MAX) def ms_descriptor(table: str, complete: bool = False) -> dict: @@ -91,6 +100,49 @@ cdef class ComplexFloatType(ComplexType): cdef class ComplexFloatArray(ExtensionArray): pass + +cdef class SelectionObj: + cdef vector[vector[rownr_t]] vec_store + cdef ColumnSelection selection + + def __init__(self, index: tuple = None): + cdef rownr_t[::1] dim_memview + + if index is None: + pass + elif isinstance(index, (tuple, list)): + for d, dim_index in enumerate(index): + if dim_index is None: + self.selection.push_back(RowIds()) + elif isinstance(dim_index, slice): + # Convert a slice object into a vector, and then a span of rownr_t + if dim_index.step is not None and dim_index.step != 1: + raise ValueError(f"slice step {dim_index.step} is not 1") + + vec_index = vector[rownr_t](dim_index.stop - dim_index.start, 0) + + for i, r in enumerate(range(dim_index.start, dim_index.stop)): + vec_index[i] = r + + span = RowIds(&vec_index[0], vec_index.size()) + self.vec_store.push_back(move(vec_index)) + self.selection.push_back(move(span)) + elif isinstance(dim_index, np.ndarray): + if dim_index.ndim != 1: + raise ValueError(f"Multi-dimensional ndarray received as index " + f"in dimension {d}") + # Cast to uint64 if necessary + dim_memview = np.require(dim_index, dtype=np.uint64, requirements=["C"]) + span = RowIds(&dim_memview[0], dim_memview.shape[0]) + self.selection.push_back(move(span)) + else: + raise TypeError(f"Invalid index type {type(dim_index)} " + f"for dimension {d}") + else: + raise TypeError(f"index must be a tuple of " + f"(None, slices or numpy arrays), or None") + + # Create a Cython extension type around the CCasaTable C++ instance cdef class Table: cdef shared_ptr[CCasaTable] c_table @@ -147,7 +199,57 @@ cdef class Table: return pyarrow_wrap_table(ctable) - def getcol(self, column: str, unsigned int startrow=0, unsigned int nrow=UINT_MAX): + def getcol2(self, column: str, index: tuple = None) -> np.ndarray: + cdef string cpp_column = tobytes(column) + cdef SelectionObj selobj = SelectionObj(index) + + carray = GetResultValue(self.c_table.get().GetColumn2(cpp_column, selobj.selection)) + py_column = pyarrow_wrap_array(carray) + + if isinstance(py_column, (pa.ListArray, pa.LargeListArray)): + raise TypeError(f"Can't convert variably shaped column {column} to numpy array") + + if isinstance(py_column, pa.NumericArray): + return py_column.to_numpy(zero_copy_only=True) + + if isinstance(py_column, pa.StringArray): + return py_column.to_numpy(zero_copy_only=False) + + if isinstance(py_column, pa.FixedSizeListArray): + shape = [len(py_column)] + nested_column = py_column + zero_copy_only = True + + while True: + if pa.types.is_primitive(nested_column.type): + break + elif isinstance(nested_column, pa.StringArray): + zero_copy_only = False + break + elif isinstance(nested_column, pa.FixedSizeListArray): + shape.append(nested_column.type.list_size) + nested_column = nested_column.flatten() + else: + raise TypeError(f"Encountered invalid type {nested_column.type} " + f"when converting column {column} from a " + f"nested FixedSizeListArray to a numpy array. " + f"Only FixedSizeListArrays or PrimitiveArrays " + f"are supported") + + nested_column = nested_column.to_numpy(zero_copy_only=zero_copy_only) + array = nested_column.reshape(tuple(shape)) + + # Convert to complex if necessary + if "COMPLEX" in self.getcoldesc(column)["valueType"].upper(): + complex_dtype = np.result_type(array.dtype, np.complex64) + array = array.view(complex_dtype)[..., 0] + + return array + + raise TypeError(f"Unhandled column type {py_column.type}") + + + def getcol(self, column: str, unsigned int startrow=0, unsigned int nrow=UINT_MAX) -> np.ndarray: cdef: shared_ptr[CArray] carray string cpp_column = tobytes(column) diff --git a/src/arcae/tests/test_pytable.py b/src/arcae/tests/test_pytable.py index 2a4c06ac..d4b755ab 100644 --- a/src/arcae/tests/test_pytable.py +++ b/src/arcae/tests/test_pytable.py @@ -148,6 +148,25 @@ def test_getcol(getcol_table): with pytest.raises(pa.lib.ArrowException, match="NONEXISTENT does not exist"): T.getcol("NONEXISTENT") +def test_getcol2(getcol_table): + T = arcae.table(getcol_table) + + assert_array_equal(T.getcol2("TIME"), [0, 1, 2]) + assert_array_equal(T.getcol2("TIME", (slice(0, 2),)), [0, 1]) + assert_array_equal(T.getcol2("TIME", (np.array([0, 1]),)), [0, 1]) + assert_array_equal(T.getcol2("TIME", (np.array([0, 2]),)), [0, 2]) + assert_array_equal(T.getcol2("STRING"), ["0", "1", "2"]) + + assert_array_equal(T.getcol2("FLOAT_DATA"), [ + [[0, 0, 0, 0], [0, 0, 0, 0]], + [[1, 1, 1, 1], [1, 1, 1, 1]], + [[2, 2, 2, 2], [2, 2, 2, 2]]]) + + assert_array_equal(T.getcol2("FLOAT_DATA", (slice(0, 2), slice(0, 2))), [ + [[0, 0, 0, 0], [0, 0, 0, 0]], + [[1, 1, 1, 1], [1, 1, 1, 1]]]) + + def test_partial_read(sorting_table): """ Tests that partial reads work """ T = arcae.table(sorting_table)