Skip to content

Commit

Permalink
feat(arrow/cdata): Add Implementation of Async C Data interface (#169)
Browse files Browse the repository at this point in the history
This adds a basic implementation of helpers for managing an
ArrowAsyncDeviceStreamHandler for using the Async Arrow C Device
interface. The corresponding C++ helper implementation can be found at
apache/arrow#44495 with the discusson on the
actual C structures located at
apache/arrow#43632.

---------

Co-authored-by: Sutou Kouhei <kou@cozmixng.org>
  • Loading branch information
zeroshade and kou authored Nov 12, 2024
1 parent 892038c commit d10a859
Show file tree
Hide file tree
Showing 9 changed files with 948 additions and 20 deletions.
341 changes: 336 additions & 5 deletions arrow/cdata/arrow/c/abi.h

Large diffs are not rendered by default.

91 changes: 76 additions & 15 deletions arrow/cdata/arrow/c/helpers.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,97 +18,158 @@
#pragma once

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "arrow/c/abi.h"

#define ARROW_C_ASSERT(condition, msg) \
do { \
if (!(condition)) { \
fprintf(stderr, "%s:%d:: %s", __FILE__, __LINE__, (msg)); \
abort(); \
} \
} while (0)

#ifdef __cplusplus
extern "C" {
#endif

/// Query whether the C schema is released
static inline int ArrowSchemaIsReleased(const struct ArrowSchema* schema) {
inline int ArrowSchemaIsReleased(const struct ArrowSchema* schema) {
return schema->release == NULL;
}

/// Mark the C schema released (for use in release callbacks)
static inline void ArrowSchemaMarkReleased(struct ArrowSchema* schema) {
inline void ArrowSchemaMarkReleased(struct ArrowSchema* schema) {
schema->release = NULL;
}

/// Move the C schema from `src` to `dest`
///
/// Note `dest` must *not* point to a valid schema already, otherwise there
/// will be a memory leak.
static inline void ArrowSchemaMove(struct ArrowSchema* src, struct ArrowSchema* dest) {
inline void ArrowSchemaMove(struct ArrowSchema* src, struct ArrowSchema* dest) {
assert(dest != src);
assert(!ArrowSchemaIsReleased(src));
memcpy(dest, src, sizeof(struct ArrowSchema));
ArrowSchemaMarkReleased(src);
}

/// Release the C schema, if necessary, by calling its release callback
static inline void ArrowSchemaRelease(struct ArrowSchema* schema) {
inline void ArrowSchemaRelease(struct ArrowSchema* schema) {
if (!ArrowSchemaIsReleased(schema)) {
schema->release(schema);
assert(ArrowSchemaIsReleased(schema));
ARROW_C_ASSERT(ArrowSchemaIsReleased(schema),
"ArrowSchemaRelease did not cleanup release callback");
}
}

/// Query whether the C array is released
static inline int ArrowArrayIsReleased(const struct ArrowArray* array) {
inline int ArrowArrayIsReleased(const struct ArrowArray* array) {
return array->release == NULL;
}

inline int ArrowDeviceArrayIsReleased(const struct ArrowDeviceArray* array) {
return ArrowArrayIsReleased(&array->array);
}

/// Mark the C array released (for use in release callbacks)
static inline void ArrowArrayMarkReleased(struct ArrowArray* array) { array->release = NULL; }
inline void ArrowArrayMarkReleased(struct ArrowArray* array) { array->release = NULL; }

inline void ArrowDeviceArrayMarkReleased(struct ArrowDeviceArray* array) {
ArrowArrayMarkReleased(&array->array);
}

/// Move the C array from `src` to `dest`
///
/// Note `dest` must *not* point to a valid array already, otherwise there
/// will be a memory leak.
static inline void ArrowArrayMove(struct ArrowArray* src, struct ArrowArray* dest) {
inline void ArrowArrayMove(struct ArrowArray* src, struct ArrowArray* dest) {
assert(dest != src);
assert(!ArrowArrayIsReleased(src));
memcpy(dest, src, sizeof(struct ArrowArray));
ArrowArrayMarkReleased(src);
}

inline void ArrowDeviceArrayMove(struct ArrowDeviceArray* src,
struct ArrowDeviceArray* dest) {
assert(dest != src);
assert(!ArrowDeviceArrayIsReleased(src));
memcpy(dest, src, sizeof(struct ArrowDeviceArray));
ArrowDeviceArrayMarkReleased(src);
}

/// Release the C array, if necessary, by calling its release callback
static inline void ArrowArrayRelease(struct ArrowArray* array) {
inline void ArrowArrayRelease(struct ArrowArray* array) {
if (!ArrowArrayIsReleased(array)) {
array->release(array);
assert(ArrowArrayIsReleased(array));
ARROW_C_ASSERT(ArrowArrayIsReleased(array),
"ArrowArrayRelease did not cleanup release callback");
}
}

inline void ArrowDeviceArrayRelease(struct ArrowDeviceArray* array) {
if (!ArrowDeviceArrayIsReleased(array)) {
array->array.release(&array->array);
ARROW_C_ASSERT(ArrowDeviceArrayIsReleased(array),
"ArrowDeviceArrayRelease did not cleanup release callback");
}
}

/// Query whether the C array stream is released
static inline int ArrowArrayStreamIsReleased(const struct ArrowArrayStream* stream) {
inline int ArrowArrayStreamIsReleased(const struct ArrowArrayStream* stream) {
return stream->release == NULL;
}

inline int ArrowDeviceArrayStreamIsReleased(const struct ArrowDeviceArrayStream* stream) {
return stream->release == NULL;
}

/// Mark the C array stream released (for use in release callbacks)
static inline void ArrowArrayStreamMarkReleased(struct ArrowArrayStream* stream) {
inline void ArrowArrayStreamMarkReleased(struct ArrowArrayStream* stream) {
stream->release = NULL;
}

inline void ArrowDeviceArrayStreamMarkReleased(struct ArrowDeviceArrayStream* stream) {
stream->release = NULL;
}

/// Move the C array stream from `src` to `dest`
///
/// Note `dest` must *not* point to a valid stream already, otherwise there
/// will be a memory leak.
static inline void ArrowArrayStreamMove(struct ArrowArrayStream* src,
inline void ArrowArrayStreamMove(struct ArrowArrayStream* src,
struct ArrowArrayStream* dest) {
assert(dest != src);
assert(!ArrowArrayStreamIsReleased(src));
memcpy(dest, src, sizeof(struct ArrowArrayStream));
ArrowArrayStreamMarkReleased(src);
}

inline void ArrowDeviceArrayStreamMove(struct ArrowDeviceArrayStream* src,
struct ArrowDeviceArrayStream* dest) {
assert(dest != src);
assert(!ArrowDeviceArrayStreamIsReleased(src));
memcpy(dest, src, sizeof(struct ArrowDeviceArrayStream));
ArrowDeviceArrayStreamMarkReleased(src);
}

/// Release the C array stream, if necessary, by calling its release callback
static inline void ArrowArrayStreamRelease(struct ArrowArrayStream* stream) {
inline void ArrowArrayStreamRelease(struct ArrowArrayStream* stream) {
if (!ArrowArrayStreamIsReleased(stream)) {
stream->release(stream);
assert(ArrowArrayStreamIsReleased(stream));
ARROW_C_ASSERT(ArrowArrayStreamIsReleased(stream),
"ArrowArrayStreamRelease did not cleanup release callback");
}
}

inline void ArrowDeviceArrayStreamRelease(struct ArrowDeviceArrayStream* stream) {
if (!ArrowDeviceArrayStreamIsReleased(stream)) {
stream->release(stream);
ARROW_C_ASSERT(ArrowDeviceArrayStreamIsReleased(stream),
"ArrowDeviceArrayStreamRelease did not cleanup release callback");
}
}

Expand Down
5 changes: 5 additions & 0 deletions arrow/cdata/cdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ type (
CArrowArray = C.struct_ArrowArray
// CArrowArrayStream is the C Stream Interface object for handling streams of record batches.
CArrowArrayStream = C.struct_ArrowArrayStream

CArrowAsyncDeviceStreamHandler = C.struct_ArrowAsyncDeviceStreamHandler
CArrowAsyncProducer = C.struct_ArrowAsyncProducer
CArrowAsyncTask = C.struct_ArrowAsyncTask
CArrowDeviceArray = C.struct_ArrowDeviceArray
)

// Map from the defined strings to their corresponding arrow.DataType interface
Expand Down
80 changes: 80 additions & 0 deletions arrow/cdata/cdata_exports.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,23 @@ package cdata
// void goReleaseSchema(struct ArrowSchema* schema) {
// releaseExportedSchema(schema);
// }
//
// void goCallCancel(struct ArrowAsyncProducer* producer) {
// producer->cancel(producer);
// }
//
// int goExtractTaskData(struct ArrowAsyncTask* task, struct ArrowDeviceArray* out) {
// return task->extract_data(task, out);
// }
//
// static void goCallRequest(struct ArrowAsyncProducer* producer, int64_t n) {
// producer->request(producer, n);
// }
import "C"

import (
"bytes"
"context"
"encoding/binary"
"fmt"
"runtime/cgo"
Expand Down Expand Up @@ -489,3 +502,70 @@ func (rr cRecordReader) release() {
}
rr.rdr.Release()
}

type cAsyncStreamHandler struct {
producer *CArrowAsyncProducer
taskQueue chan taskState
ctx context.Context
}

func asyncTaskQueue(ctx context.Context, schema *arrow.Schema, recordStream chan<- RecordMessage, taskQueue <-chan taskState, producer *CArrowAsyncProducer) {
defer close(recordStream)
for {
select {
case <-ctx.Done():
C.goCallCancel(producer)
return
case task, ok := <-taskQueue:
// if the queue closes or we receive a nil task, we're done
if !ok || (task.err == nil && task.task.extract_data == nil) {
return
}

if task.err != nil {
recordStream <- RecordMessage{Err: task.err}
continue
}

// request another batch now that we've processed this one
C.goCallRequest(producer, C.int64_t(1))

var out CArrowDeviceArray
if C.goExtractTaskData(&task.task, &out) != C.int(0) {
continue
}

rec, err := ImportCRecordBatchWithSchema(&out.array, schema)
if err != nil {
recordStream <- RecordMessage{Err: err}
} else {
recordStream <- RecordMessage{Record: rec, AdditionalMetadata: task.meta}
}
}
}
}

func (h *cAsyncStreamHandler) onNextTask(task *CArrowAsyncTask, metadata *C.char) C.int {
if task == nil {
h.taskQueue <- taskState{}
return 0
}

ts := taskState{task: *task}
if metadata != nil {
ts.meta = decodeCMetadata(metadata)
}
h.taskQueue <- ts
return 0
}

func (h *cAsyncStreamHandler) onError(code C.int, message, metadata *C.char) {
h.taskQueue <- taskState{err: AsyncStreamError{
Code: int(code), Msg: C.GoString(message), Metadata: C.GoString(metadata)}}
}

func (h *cAsyncStreamHandler) release() {
close(h.taskQueue)
h.taskQueue, h.producer = nil, nil
h.producer = nil
}
Loading

0 comments on commit d10a859

Please sign in to comment.