Skip to content

Commit

Permalink
Python bindings provisionally working
Browse files Browse the repository at this point in the history
  • Loading branch information
sjperkins committed Jan 31, 2024
1 parent 78a826a commit c88c6a4
Show file tree
Hide file tree
Showing 8 changed files with 180 additions and 15 deletions.
2 changes: 1 addition & 1 deletion cpp/arcae/base_column_map.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ enum MapOrder {C_ORDER=0, F_ORDER};
// A return of < 0 indicates a non-existent selection
std::ptrdiff_t SelectDim(std::size_t dim, std::size_t sdims, std::size_t ndims);

using RowIds = std::vector<casacore::rownr_t>;
using RowIds = absl::Span<const casacore::rownr_t>;
using ColumnSelection = std::vector<RowIds>;

// Describes a mapping between disk and memory
Expand Down
30 changes: 25 additions & 5 deletions cpp/arcae/safe_table_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,18 @@
#include <casacore/tables/Tables/TableIterProxy.h>

#include "arcae/safe_table_proxy.h"
#include "arcae/column_convert_visitor.h"
#include "arcae/column_read_map.h"
#include "arcae/column_read_visitor.h"
#include "arrow/result.h"


using ::arrow::DataType;
using ::arrow::Buffer;
using ::arrow::Future;
using ::arrow::Result;
using ::arrow::Status;

using ::casacore::Record;
using ::casacore::Table;
using ::casacore::TableColumn;
using ::casacore::TableIterProxy;
using ::casacore::TableLock;
using ::casacore::TableProxy;

namespace arcae {
Expand Down Expand Up @@ -102,6 +101,27 @@ SafeTableProxy::GetColumn(const std::string & column, casacore::uInt startrow, c
});
}

arrow::Result<std::shared_ptr<arrow::Array>>
SafeTableProxy::GetColumn2(const std::string & column, const ColumnSelection & selection) const {
ARROW_RETURN_NOT_OK(FailIfClosed());

return run_isolated([this, &column, &selection]() -> arrow::Result<std::shared_ptr<arrow::Array>> {
auto & casa_table = this->table_proxy->table();

if(!casa_table.tableDesc().isColumn(column)) {
return arrow::Status::UnknownError(column, " does not exist");
}

auto table_column = TableColumn(casa_table, column);
const auto & column_desc = table_column.columnDesc();
ARROW_ASSIGN_OR_RAISE(auto map, ColumnReadMap::Make(table_column, selection));
auto visitor = ColumnReadVisitor(map);
ARROW_RETURN_NOT_OK(visitor.Visit(column_desc.dataType()));
return std::move(visitor.array_);
});
}


arrow::Result<std::shared_ptr<arrow::Table>>
SafeTableProxy::ToArrow(casacore::uInt startrow, casacore::uInt nrow, const std::vector<std::string> & columns) const {
ARROW_RETURN_NOT_OK(FailIfClosed());
Expand Down
9 changes: 7 additions & 2 deletions cpp/arcae/safe_table_proxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@
#include <casacore/tables/Tables.h>
#include <casacore/tables/Tables/TableProxy.h>

#include <arrow/util/logging.h>
#include <arrow/util/thread_pool.h>

#include "arcae/column_convert_visitor.h"

#include "arcae/base_column_map.h"

namespace arcae {

Expand Down Expand Up @@ -109,6 +109,11 @@ class SafeTableProxy {
casacore::uInt startrow,
casacore::uInt nrow) const;

arrow::Result<std::shared_ptr<arrow::Array>> GetColumn2(
const std::string & column,
const ColumnSelection & selection) const;


arrow::Result<std::string> GetTableDescriptor() const;
arrow::Result<std::string> GetColumnDescriptor(const std::string & column) const;

Expand Down
14 changes: 8 additions & 6 deletions cpp/tests/column_map_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
#include <sys/types.h>

#include <arrow/api.h>

#include <casacore/casa/aipsxtype.h>
#include <casacore/casa/Arrays/IPosition.h>
#include <casacore/casa/BasicSL/Complexfwd.h>
#include <casacore/tables/Tables/ArrColDesc.h>
Expand Down Expand Up @@ -186,7 +188,7 @@ TEST_F(ColumnConvertTest, SelectFromRange) {
{
// Select some rows from the VAR_DATA column
auto data_column = GetArrayColumn<CasaComplex>(proxy.table(), "VAR_DATA");
auto row_ids = arcae::RowIds{0, 1, 2, 3, 6, 7, 8, 9};
auto row_ids = std::vector<casacore::rownr_t>{0, 1, 2, 3, 6, 7, 8, 9};
ASSERT_OK_AND_ASSIGN(auto map, ColumnReadMap::Make(data_column, arcae::ColumnSelection{{row_ids}}));

ASSERT_TRUE(map.shape_provider_.IsVarying());
Expand Down Expand Up @@ -224,8 +226,8 @@ TEST_F(ColumnConvertTest, SelectFromRange) {
{
// Select some rows and a channel from the VAR_DATA column
auto data_column = GetArrayColumn<CasaComplex>(proxy.table(), "VAR_DATA");
auto row_ids = arcae::RowIds{0, 1, 2, 3, 6, 7, 8, 9};
auto chan_ids = arcae::RowIds{0};
auto row_ids = std::vector<casacore::rownr_t>{0, 1, 2, 3, 6, 7, 8, 9};
auto chan_ids = std::vector<casacore::rownr_t>{0};
ASSERT_OK_AND_ASSIGN(auto map, ColumnReadMap::Make(data_column, arcae::ColumnSelection{{row_ids, chan_ids}}));

ASSERT_TRUE(map.shape_provider_.IsVarying());
Expand Down Expand Up @@ -264,8 +266,8 @@ TEST_F(ColumnConvertTest, SelectFromRange) {
{
// Select some rows and a corr from the VAR_DATA column
auto data_column = GetArrayColumn<CasaComplex>(proxy.table(), "VAR_DATA");
auto row_ids = arcae::RowIds{0, 1, 2, 3, 6, 7, 8, 9};
auto corr_ids = arcae::RowIds{0};
auto row_ids = std::vector<casacore::rownr_t>{0, 1, 2, 3, 6, 7, 8, 9};
auto corr_ids = std::vector<casacore::rownr_t>{0};
ASSERT_OK_AND_ASSIGN(auto map, ColumnReadMap::Make(data_column, arcae::ColumnSelection{{row_ids, {}, corr_ids}}));

ASSERT_TRUE(map.shape_provider_.IsVarying());
Expand Down Expand Up @@ -303,7 +305,7 @@ TEST_F(ColumnConvertTest, SelectFromRange) {
{
// Select some rows a channel and a correlation in the VAR_DATA column
auto data_column = GetArrayColumn<CasaComplex>(proxy.table(), "VAR_DATA");
auto row_ids = arcae::RowIds{0, 1, 2, 3, 6, 7, 8, 9};
auto row_ids = std::vector<casacore::rownr_t>{0, 1, 2, 3, 6, 7, 8, 9};
auto selection = arcae::ColumnSelection{row_ids, {0}, {0}};
ASSERT_OK_AND_ASSIGN(auto map, ColumnReadMap::Make(data_column, selection));

Expand Down
2 changes: 2 additions & 0 deletions src/arcae/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
find_package(Python COMPONENTS Interpreter Development.Module REQUIRED)
find_package(absl CONFIG REQUIRED)
find_package(PkgConfig REQUIRED)
find_program(CYTHON_FOUND "cython" REQUIRED)

Expand All @@ -17,6 +18,7 @@ target_link_libraries(arrow_tables PUBLIC arcae PkgConfig::casacore)
target_link_directories(arrow_tables PUBLIC ${PYARROW_LIBDIRS})
target_include_directories(arrow_tables PUBLIC
PkgConfig::casacore
absl::span
${PYARROW_INCLUDE}
${NUMPY_INCLUDE}
${CMAKE_SOURCE_DIR}/cpp)
Expand Down
15 changes: 15 additions & 0 deletions src/arcae/arrow_tables.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,20 @@ from pyarrow.includes.libarrow cimport *
cdef extern from "<climits>" nogil:
cdef unsigned int UINT_MAX

cdef extern from "<casacore/casa/aipsxtype.h>" namespace "casacore" nogil:
ctypedef unsigned long long uInt64
ctypedef uInt64 rownr_t

cdef extern from "<absl/types/span.h>" namespace "absl" nogil:
cdef cppclass Span[T]:
Span() except +
Span(T * array, size_t length) except +
Span(Span&) except +
size()

cdef extern from "arcae/base_column_map.h" namespace "arcae" nogil:
ctypedef Span[rownr_t] RowIds
ctypedef vector[RowIds] ColumnSelection

cdef extern from "arcae/service_locator.h" namespace "arcae" nogil:
cdef cppclass CServiceLocator" arcae::ServiceLocator":
Expand All @@ -36,6 +50,7 @@ cdef extern from "arcae/safe_table_proxy.h" namespace "arcae" nogil:

CResult[shared_ptr[CTable]] ToArrow " SafeTableProxy::ToArrow"(unsigned int startrow, unsigned int nrow, const vector[string] & columns)
CResult[shared_ptr[CArray]] GetColumn " SafeTableProxy::GetColumn"(const string & column, unsigned int startrow, unsigned int nrow)
CResult[shared_ptr[CArray]] GetColumn2 " SafeTableProxy::GetColumn2"(const string & column, const ColumnSelection & selection)
CResult[string] GetTableDescriptor " SafeTableProxy::GetTableDescriptor"()
CResult[string] GetColumnDescriptor "SafeTableProxy::GetColumnDescriptor"(const string & column)
CResult[unsigned int] nRow " SafeTableProxy::nRow"()
Expand Down
104 changes: 103 additions & 1 deletion src/arcae/arrow_tables.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# cython: language_level = 3

from collections.abc import Iterable, MutableMapping
import ctypes
import cython
import json
from typing import Optional
Expand All @@ -10,10 +11,14 @@ from libcpp cimport bool
from libcpp.map cimport map
from libcpp.memory cimport dynamic_pointer_cast, shared_ptr
from libcpp.string cimport string
from libcpp.utility cimport move
from libcpp.vector cimport vector

import numpy as np
import pyarrow as pa

cimport numpy as cnp

from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *

Expand Down Expand Up @@ -44,6 +49,10 @@ from arcae.arrow_tables cimport (CCasaTable,
CTaql,
complex64,
complex128,
ColumnSelection,
RowIds,
Span,
rownr_t,
UINT_MAX)

def ms_descriptor(table: str, complete: bool = False) -> dict:
Expand Down Expand Up @@ -91,6 +100,49 @@ cdef class ComplexFloatType(ComplexType):
cdef class ComplexFloatArray(ExtensionArray):
pass


cdef class SelectionObj:
cdef vector[vector[rownr_t]] vec_store
cdef ColumnSelection selection

def __init__(self, index: tuple = None):
cdef rownr_t[::1] dim_memview

if index is None:
pass
elif isinstance(index, (tuple, list)):
for d, dim_index in enumerate(index):
if dim_index is None:
self.selection.push_back(RowIds())
elif isinstance(dim_index, slice):
# Convert a slice object into a vector, and then a span of rownr_t
if dim_index.step is not None and dim_index.step != 1:
raise ValueError(f"slice step {dim_index.step} is not 1")

vec_index = vector[rownr_t](dim_index.stop - dim_index.start, 0)

for i, r in enumerate(range(dim_index.start, dim_index.stop)):
vec_index[i] = r

span = RowIds(&vec_index[0], vec_index.size())
self.vec_store.push_back(move(vec_index))
self.selection.push_back(move(span))
elif isinstance(dim_index, np.ndarray):
if dim_index.ndim != 1:
raise ValueError(f"Multi-dimensional ndarray received as index "
f"in dimension {d}")
# Cast to uint64 if necessary
dim_memview = np.require(dim_index, dtype=np.uint64, requirements=["C"])
span = RowIds(&dim_memview[0], dim_memview.shape[0])
self.selection.push_back(move(span))
else:
raise TypeError(f"Invalid index type {type(dim_index)} "
f"for dimension {d}")
else:
raise TypeError(f"index must be a tuple of "
f"(None, slices or numpy arrays), or None")


# Create a Cython extension type around the CCasaTable C++ instance
cdef class Table:
cdef shared_ptr[CCasaTable] c_table
Expand Down Expand Up @@ -147,7 +199,57 @@ cdef class Table:

return pyarrow_wrap_table(ctable)

def getcol(self, column: str, unsigned int startrow=0, unsigned int nrow=UINT_MAX):
def getcol2(self, column: str, index: tuple = None) -> np.ndarray:
cdef string cpp_column = tobytes(column)
cdef SelectionObj selobj = SelectionObj(index)

carray = GetResultValue(self.c_table.get().GetColumn2(cpp_column, selobj.selection))
py_column = pyarrow_wrap_array(carray)

if isinstance(py_column, (pa.ListArray, pa.LargeListArray)):
raise TypeError(f"Can't convert variably shaped column {column} to numpy array")

if isinstance(py_column, pa.NumericArray):
return py_column.to_numpy(zero_copy_only=True)

if isinstance(py_column, pa.StringArray):
return py_column.to_numpy(zero_copy_only=False)

if isinstance(py_column, pa.FixedSizeListArray):
shape = [len(py_column)]
nested_column = py_column
zero_copy_only = True

while True:
if pa.types.is_primitive(nested_column.type):
break
elif isinstance(nested_column, pa.StringArray):
zero_copy_only = False
break
elif isinstance(nested_column, pa.FixedSizeListArray):
shape.append(nested_column.type.list_size)
nested_column = nested_column.flatten()
else:
raise TypeError(f"Encountered invalid type {nested_column.type} "
f"when converting column {column} from a "
f"nested FixedSizeListArray to a numpy array. "
f"Only FixedSizeListArrays or PrimitiveArrays "
f"are supported")

nested_column = nested_column.to_numpy(zero_copy_only=zero_copy_only)
array = nested_column.reshape(tuple(shape))

# Convert to complex if necessary
if "COMPLEX" in self.getcoldesc(column)["valueType"].upper():
complex_dtype = np.result_type(array.dtype, np.complex64)
array = array.view(complex_dtype)[..., 0]

return array

raise TypeError(f"Unhandled column type {py_column.type}")


def getcol(self, column: str, unsigned int startrow=0, unsigned int nrow=UINT_MAX) -> np.ndarray:
cdef:
shared_ptr[CArray] carray
string cpp_column = tobytes(column)
Expand Down
19 changes: 19 additions & 0 deletions src/arcae/tests/test_pytable.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,25 @@ def test_getcol(getcol_table):
with pytest.raises(pa.lib.ArrowException, match="NONEXISTENT does not exist"):
T.getcol("NONEXISTENT")

def test_getcol2(getcol_table):
T = arcae.table(getcol_table)

assert_array_equal(T.getcol2("TIME"), [0, 1, 2])
assert_array_equal(T.getcol2("TIME", (slice(0, 2),)), [0, 1])
assert_array_equal(T.getcol2("TIME", (np.array([0, 1]),)), [0, 1])
assert_array_equal(T.getcol2("TIME", (np.array([0, 2]),)), [0, 2])
assert_array_equal(T.getcol2("STRING"), ["0", "1", "2"])

assert_array_equal(T.getcol2("FLOAT_DATA"), [
[[0, 0, 0, 0], [0, 0, 0, 0]],
[[1, 1, 1, 1], [1, 1, 1, 1]],
[[2, 2, 2, 2], [2, 2, 2, 2]]])

assert_array_equal(T.getcol2("FLOAT_DATA", (slice(0, 2), slice(0, 2))), [
[[0, 0, 0, 0], [0, 0, 0, 0]],
[[1, 1, 1, 1], [1, 1, 1, 1]]])


def test_partial_read(sorting_table):
""" Tests that partial reads work """
T = arcae.table(sorting_table)
Expand Down

0 comments on commit c88c6a4

Please sign in to comment.