-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathbuffer.go
249 lines (204 loc) · 7.52 KB
/
buffer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
package tiledb
/*
#include <tiledb/tiledb.h>
#include <stdlib.h>
*/
import "C"
import (
"bytes"
"errors"
"fmt"
"io"
"math"
"runtime"
"unsafe"
)
// Buffer is a generic buffer object used by some TileDB APIs.
type Buffer struct {
	tiledbBuffer *C.tiledb_buffer_t
	context      *Context
	// data is a reference to the memory that this Buffer refers to.
	// If this is nil, the Buffer was allocated by TileDB and its memory is
	// owned by TileDB internals.
	//
	// Buffer technically violates the CGo pointer-passing rules by passing
	// []byte slices to C code, which holds onto them long after the CGo call
	// has returned. Without keeping this reference around, Go could collect
	// the storage that we've passed in:
	//
	//	someBytes := getSomeBytes()
	//	buf.SetBuffer(someBytes)
	//	// if it's not referenced later, someBytes might now be collected!
	//
	// By holding onto this reference here, we shield the caller from this
	// happening to them. This is still unsafe per the language spec, but because
	// the Go garbage collector (as of v1.18) does not move objects around,
	// this is not THAT dangerous at runtime.
	data byteBuffer
}
// NewBuffer allocates a new TileDB buffer bound to the given context.
//
// The returned Buffer is registered with freeOnGC so that its C-side
// resources are released when it is garbage collected (see Free). A nil
// context is rejected before anything is allocated.
func NewBuffer(context *Context) (*Buffer, error) {
	if context == nil {
		return nil, errors.New("error creating tiledb buffer, context is nil")
	}

	b := &Buffer{context: context}
	if rc := C.tiledb_buffer_alloc(context.tiledbContext, &b.tiledbBuffer); rc != C.TILEDB_OK {
		return nil, fmt.Errorf("error creating tiledb buffer: %w", context.LastError())
	}

	freeOnGC(b)
	return b, nil
}
// Free releases the TileDB buffer that lives on the C heap and drops the
// reference to any Go-owned memory the buffer was pointing at.
//
// It runs automatically when the Buffer is garbage collected, but may be
// called earlier to release memory promptly. Free is idempotent: once the
// underlying handle has been freed, further calls do nothing.
func (b *Buffer) Free() {
	b.data = nil
	if b.tiledbBuffer == nil {
		// Already released (or never allocated); nothing left to free.
		return
	}
	C.tiledb_buffer_free(&b.tiledbBuffer)
}
// Context returns the TileDB context this buffer was created with.
func (b *Buffer) Context() *Context {
	ctx := b.context
	return ctx
}
// SetType records the datatype of the values held in the buffer. An error
// reported by TileDB is wrapped together with the context's last error.
func (b *Buffer) SetType(datatype Datatype) error {
	ctx := b.context
	if rc := C.tiledb_buffer_set_type(ctx.tiledbContext, b.tiledbBuffer, C.tiledb_datatype_t(datatype)); rc != C.TILEDB_OK {
		return fmt.Errorf("error setting datatype for tiledb buffer: %w", ctx.LastError())
	}
	return nil
}
// Type reports the datatype currently associated with the buffer. On failure
// it returns the zero Datatype together with the wrapped TileDB error.
func (b *Buffer) Type() (Datatype, error) {
	var dt C.tiledb_datatype_t
	if rc := C.tiledb_buffer_get_type(b.context.tiledbContext, b.tiledbBuffer, &dt); rc != C.TILEDB_OK {
		return 0, fmt.Errorf("error getting tiledb buffer type: %w", b.context.LastError())
	}
	return Datatype(dt), nil
}
// Serialize returns a copy of the bytes held by the buffer, adjusted for the
// given serialization format.
//
// For TILEDB_CAPNP the bytes are returned verbatim. For TILEDB_JSON a single
// trailing NUL terminator, if present, is stripped. Any other serialization
// type yields an error.
//
// Deprecated: Use WriteTo or ReadAt instead for increased performance.
func (b *Buffer) Serialize(serializationType SerializationType) ([]byte, error) {
	payload, err := b.dataCopy()
	if err != nil {
		return nil, err
	}

	if serializationType == TILEDB_JSON {
		// JSON may arrive as a NUL-terminated C string; drop the terminator.
		return bytes.TrimSuffix(payload, []byte{0}), nil
	}
	if serializationType != TILEDB_CAPNP {
		return nil, fmt.Errorf("unsupported serialization type: %v", serializationType)
	}
	// Cap'n Proto data is binary and must be passed through untouched.
	return payload, nil
}
// ReadAt copies up to len(p) bytes from the buffer, starting at byte offset
// off, into p. It implements io.ReaderAt.
//
// A negative offset is an error. io.EOF is returned when the buffer has no
// data, when off is at or past the end of the buffer, or when the copy reaches
// the end of the buffer. The last case always applies when fewer than len(p)
// bytes are returned.
func (b *Buffer) ReadAt(p []byte, off int64) (int, error) {
	if off < 0 {
		return 0, errors.New("offset cannot be negative")
	}
	// cbuffer points into memory owned by b: either TileDB-allocated memory
	// freed by b's GC finalizer, or the Go slice retained in b.data. Keep b
	// reachable until the copy below has finished so that memory cannot be
	// released mid-read.
	defer runtime.KeepAlive(b)

	var cbuffer unsafe.Pointer
	var csize C.uint64_t
	ret := C.tiledb_buffer_get_data(b.context.tiledbContext, b.tiledbBuffer, &cbuffer, &csize)
	if ret != C.TILEDB_OK {
		return 0, fmt.Errorf("error getting tiledb buffer data: %w", b.context.LastError())
	}

	size := uint64(csize)
	start := uint64(off)
	if cbuffer == nil || start >= size {
		// Match ReaderAt behavior of os.File and fail with io.EOF if the offset is greater or equal to the size.
		return 0, io.EOF
	}

	// Do the arithmetic in uint64 so nothing is truncated on 32-bit platforms.
	// The result is bounded by len(p), so the int conversion cannot overflow.
	toRead := min(size-start, uint64(len(p)))
	n := copy(p, unsafe.Slice((*byte)(unsafe.Add(cbuffer, off)), int(toRead)))
	if start+uint64(n) == size {
		return n, io.EOF
	}
	return n, nil
}
// WriteTo writes the entire contents of the buffer to w and returns the
// number of bytes written. It implements io.WriterTo.
//
// A buffer with no data writes nothing and returns (0, nil). If w returns an
// error, or reports a short write without one (treated as io.ErrShortWrite),
// WriteTo stops. It then returns the number of bytes written so far along
// with the wrapped error.
func (b *Buffer) WriteTo(w io.Writer) (int64, error) {
	// cbuffer points into memory owned by b (TileDB-allocated, or the Go
	// slice retained in b.data). Keep b reachable until every write has
	// completed so its finalizer cannot free that memory underneath us.
	defer runtime.KeepAlive(b)

	var cbuffer unsafe.Pointer
	var csize C.uint64_t
	ret := C.tiledb_buffer_get_data(b.context.tiledbContext, b.tiledbBuffer, &cbuffer, &csize)
	if ret != C.TILEDB_OK {
		return 0, fmt.Errorf("error getting tiledb buffer data: %w", b.context.LastError())
	}
	if cbuffer == nil || csize == 0 {
		return 0, nil
	}

	total := uint64(csize)
	var written uint64
	// A single Go slice cannot hold more than math.MaxInt elements, so larger
	// buffers (possible on 32-bit platforms) are written in chunks.
	for written < total {
		chunkLen := int(min(total-written, uint64(math.MaxInt)))
		// Construct a slice over the buffer's data without copying it.
		chunk := unsafe.Slice((*byte)(unsafe.Add(cbuffer, written)), chunkLen)
		n, err := w.Write(chunk)
		// Clamp n so a misbehaving writer cannot corrupt the byte count.
		n = max(0, min(n, chunkLen))
		written += uint64(n)
		if err == nil && n < chunkLen {
			// io.Writer must return an error on a short write. Treat a silent
			// short write like io.Copy does instead of retrying; retrying could
			// spin forever on a writer that keeps returning (0, nil).
			err = io.ErrShortWrite
		}
		if err != nil {
			return int64(written), fmt.Errorf("error writing buffer to writer: %w", err)
		}
	}
	return int64(written), nil
}
// Compile-time assertions that *Buffer implements io.WriterTo and io.ReaderAt.
var (
	_ io.WriterTo = (*Buffer)(nil)
	_ io.ReaderAt = (*Buffer)(nil)
)
// SetBuffer points the buffer at the given Go slice, so its memory becomes
// Go-managed. The Buffer keeps a reference to the slice in b.data so that the
// garbage collector does not reclaim it while TileDB may still read from it.
//
// The new slice is retained only after TileDB accepts it. If the call fails,
// the previously retained memory stays referenced, because TileDB presumably
// still points at it after a failed set.
func (b *Buffer) SetBuffer(buffer []byte) error {
	data := byteBuffer(buffer)
	ret := C.tiledb_buffer_set_data(b.context.tiledbContext, b.tiledbBuffer, data.start(), C.uint64_t(data.lenBytes()))
	if ret != C.TILEDB_OK {
		return fmt.Errorf("error setting tiledb buffer: %w", b.context.LastError())
	}
	b.data = data
	return nil
}
// dataCopy returns a Go-owned copy of the bytes currently held by the buffer.
// It returns (nil, nil) if TileDB reports no data pointer.
//
// TileDB-managed memory is copied with C.GoBytes, which limits it to
// math.MaxInt32 bytes. Go-managed memory (the slice retained in b.data) is
// copied directly.
func (b *Buffer) dataCopy() ([]byte, error) {
	var ptr unsafe.Pointer
	var size C.uint64_t
	if rc := C.tiledb_buffer_get_data(b.context.tiledbContext, b.tiledbBuffer, &ptr, &size); rc != C.TILEDB_OK {
		return nil, fmt.Errorf("error getting tiledb buffer data: %w", b.context.LastError())
	}

	switch {
	case ptr == nil:
		return nil, nil
	case b.data != nil:
		// Go-managed: the bytes live inside the slice retained in b.data.
		view := b.data.subSlice(ptr, uintptr(size))
		out := make([]byte, len(view))
		copy(out, view)
		return out, nil
	}

	// TileDB-managed: copy the data into Go memory. This assumes that once a
	// buffer points at user-provided memory, TileDB never repoints it at its
	// own memory. TileDB-owned memory should only appear when TileDB allocates
	// a fresh buffer, e.g. as an out parameter of a serialization function.
	// Keep b alive so it is not GC'd (and freed) before we finish reading.
	defer runtime.KeepAlive(b)
	if size > math.MaxInt32 {
		return nil, fmt.Errorf("TileDB's buffer (%d) larger than maximum allowed CGo buffer (%d)", size, math.MaxInt32)
	}
	return C.GoBytes(ptr, C.int(size)), nil
}
// Len returns the size, in bytes, of the data currently held by the buffer.
func (b *Buffer) Len() (uint64, error) {
	var (
		data unsafe.Pointer
		size C.uint64_t
	)
	if rc := C.tiledb_buffer_get_data(b.context.tiledbContext, b.tiledbBuffer, &data, &size); rc != C.TILEDB_OK {
		return 0, fmt.Errorf("error getting tiledb buffer data: %w", b.context.LastError())
	}
	return uint64(size), nil
}