Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-24834: [C#] Support writing compressed IPC data #39871

Merged
merged 11 commits into from
Feb 7, 2024
8 changes: 4 additions & 4 deletions csharp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,10 @@ for currently available features.

### Compression

- Buffer compression is not supported when writing IPC files or streams
- Buffer decompression is supported, but requires installing the `Apache.Arrow.Compression` package,
and passing an `Apache.Arrow.Compression.CompressionCodecFactory` instance to the
`ArrowFileReader` or `ArrowStreamReader` constructor.
- Buffer compression and decompression is supported, but requires installing the `Apache.Arrow.Compression` package.
When reading compressed data, you must pass an `Apache.Arrow.Compression.CompressionCodecFactory` instance to the
`ArrowFileReader` or `ArrowStreamReader` constructor, and when writing compressed data a
`CompressionCodecFactory` must be set in the `IpcOptions`.
Alternatively, a custom implementation of `ICompressionCodecFactory` can be used.

## Not Implemented
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,16 @@ namespace Apache.Arrow.Compression
public sealed class CompressionCodecFactory : ICompressionCodecFactory
{
public ICompressionCodec CreateCodec(CompressionCodecType compressionCodecType)
{
return CreateCodec(compressionCodecType, null);
}

public ICompressionCodec CreateCodec(CompressionCodecType compressionCodecType, int? compressionLevel)
{
return compressionCodecType switch
{
CompressionCodecType.Lz4Frame => Lz4CompressionCodec.Instance,
CompressionCodecType.Zstd => new ZstdCompressionCodec(),
CompressionCodecType.Lz4Frame => new Lz4CompressionCodec(compressionLevel),
CompressionCodecType.Zstd => new ZstdCompressionCodec(compressionLevel),
_ => throw new NotImplementedException($"Compression type {compressionCodecType} is not supported")
};
}
Expand Down
32 changes: 28 additions & 4 deletions csharp/src/Apache.Arrow.Compression/Lz4CompressionCodec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,48 @@
// limitations under the License.

using System;
using System.IO;
using Apache.Arrow.Ipc;
using K4os.Compression.LZ4;
using K4os.Compression.LZ4.Streams;

namespace Apache.Arrow.Compression
{
internal sealed class Lz4CompressionCodec : ICompressionCodec
{
/// <summary>
/// Singleton instance, used as this class doesn't need to be disposed and has no state
/// </summary>
public static readonly Lz4CompressionCodec Instance = new Lz4CompressionCodec();
private readonly LZ4EncoderSettings _settings = null;

public Lz4CompressionCodec(int? compressionLevel = null)
{
if (compressionLevel.HasValue)
{
if (Enum.IsDefined(typeof(LZ4Level), compressionLevel))
{
_settings = new LZ4EncoderSettings
{
CompressionLevel = (LZ4Level) compressionLevel,
};
}
else
{
throw new ArgumentException(
$"Invalid LZ4 compression level ({compressionLevel})", nameof(compressionLevel));
}
}
}

public int Decompress(ReadOnlyMemory<byte> source, Memory<byte> destination)
{
using var decoder = LZ4Frame.Decode(source);
return decoder.ReadManyBytes(destination.Span);
}

public void Compress(ReadOnlyMemory<byte> source, Stream destination)
{
using var encoder = LZ4Frame.Encode(destination, _settings, leaveOpen: true);
encoder.WriteManyBytes(source.Span);
}

public void Dispose()
{
}
Expand Down
22 changes: 21 additions & 1 deletion csharp/src/Apache.Arrow.Compression/ZstdCompressionCodec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// limitations under the License.

using System;
using System.IO;
using Apache.Arrow.Ipc;
using ZstdSharp;

Expand All @@ -22,20 +23,39 @@ namespace Apache.Arrow.Compression
internal sealed class ZstdCompressionCodec : ICompressionCodec
{
private readonly Decompressor _decompressor;
private readonly Compressor _compressor;

public ZstdCompressionCodec()
public ZstdCompressionCodec(int? compressionLevel = null)
{
if (compressionLevel.HasValue &&
(compressionLevel.Value < Compressor.MinCompressionLevel ||
compressionLevel.Value > Compressor.MaxCompressionLevel))
{
throw new ArgumentException(
$"Zstd compression level must be between {Compressor.MinCompressionLevel} and {Compressor.MaxCompressionLevel}",
nameof(compressionLevel));
}

_decompressor = new Decompressor();
_compressor = new Compressor(compressionLevel ?? Compressor.DefaultCompressionLevel);
}

public int Decompress(ReadOnlyMemory<byte> source, Memory<byte> destination)
{
return _decompressor.Unwrap(source.Span, destination.Span);
}

public void Compress(ReadOnlyMemory<byte> source, Stream destination)
{
using var compressor = new CompressionStream(
destination, _compressor, preserveCompressor: true, leaveOpen: true);
compressor.Write(source.Span);
}

public void Dispose()
{
_decompressor.Dispose();
_compressor.Dispose();
}
}
}
10 changes: 8 additions & 2 deletions csharp/src/Apache.Arrow/Ipc/ArrowFileWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Apache.Arrow.Memory;

namespace Apache.Arrow.Ipc
{
Expand All @@ -37,12 +38,17 @@ public ArrowFileWriter(Stream stream, Schema schema)
}

public ArrowFileWriter(Stream stream, Schema schema, bool leaveOpen)
: this(stream, schema, leaveOpen, options: null)
: this(stream, schema, leaveOpen, options: null, allocator: null)
{
}

public ArrowFileWriter(Stream stream, Schema schema, bool leaveOpen, IpcOptions options)
: base(stream, schema, leaveOpen, options)
: this(stream, schema, leaveOpen, options, allocator: null)
{
}

public ArrowFileWriter(Stream stream, Schema schema, bool leaveOpen, IpcOptions options, MemoryAllocator allocator)
: base(stream, schema, leaveOpen, options, allocator)
{
if (!stream.CanWrite)
{
Expand Down
Loading
Loading