Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-6540): Add c++ zstd compression API #30

Merged
merged 21 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions .github/workflows/lint.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
name: Lint

on:
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]

jobs:
build:
runs-on: ubuntu-latest

name: ${{ matrix.lint-target }}
strategy:
matrix:
lint-target: ["c++", "typescript"]

steps:
- uses: actions/checkout@v4

- name: Use Node.js LTS
uses: actions/setup-node@v4
with:
node-version: 'lts/*'
cache: 'npm'

- name: Install dependencies
shell: bash
run: npm i --ignore-scripts

- if: matrix.lint-target == 'c++'
shell: bash
run: |
npm run check:clang-format
- if: matrix.lint-target == 'typescript'
shell: bash
run: |
npm run check:eslint
8 changes: 6 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,12 @@ jobs:
cache: 'npm'
registry-url: 'https://registry.npmjs.org'

- name: Build with Node.js ${{ matrix.node }} on ${{ matrix.os }}
run: npm install && npm run compile
- name: Install zstd
run: npm run install-zstd
shell: bash

- name: install dependencies and compmile
run: npm install --loglevel verbose
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
shell: bash

- name: Test ${{ matrix.os }}
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ node_modules
build

npm-debug.log
deps
71 changes: 71 additions & 0 deletions addon/compression.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@


addaleax marked this conversation as resolved.
Show resolved Hide resolved
#include "compression.h"

std::vector<uint8_t> Compression::compress(const std::vector<uint8_t>& data,
size_t compression_level) {
size_t output_buffer_size = ZSTD_compressBound(data.size());
std::vector<uint8_t> output(output_buffer_size);

size_t result_code =
ZSTD_compress(output.data(), output.size(), data.data(), data.size(), compression_level);

if (ZSTD_isError(result_code)) {
throw std::runtime_error(ZSTD_getErrorName(result_code));
}

output.resize(result_code);

return output;
}

std::vector<uint8_t> Compression::decompress(const std::vector<uint8_t>& compressed) {
std::vector<uint8_t> decompressed;

using DCTX_Deleter = void (*)(ZSTD_DCtx*);

std::unique_ptr<ZSTD_DCtx, DCTX_Deleter> decompression_context(
ZSTD_createDCtx(), [](ZSTD_DCtx* ctx) { ZSTD_freeDCtx(ctx); });

ZSTD_inBuffer input = {compressed.data(), compressed.size(), 0};
std::vector<uint8_t> output_buffer(ZSTD_DStreamOutSize());
ZSTD_outBuffer output = {output_buffer.data(), output_buffer.size(), 0};

// Source: https://facebook.github.io/zstd/zstd_manual.html#Chapter9
//
// Use ZSTD_decompressStream() repetitively to consume your input.
// The function will update both `pos` fields.
// If `input.pos < input.size`, some input has not been consumed.
// It's up to the caller to present again remaining data.
// The function tries to flush all data decoded immediately, respecting output buffer size.
// If `output.pos < output.size`, decoder has flushed everything it could.
// But if `output.pos == output.size`, there might be some data left within internal buffers.,
// In which case, call ZSTD_decompressStream() again to flush whatever remains in the buffer.
// Note : with no additional input provided, amount of data flushed is necessarily <=
// ZSTD_BLOCKSIZE_MAX.
// @return : 0 when a frame is completely decoded and fully flushed,
// or an error code, which can be tested using ZSTD_isError(),
// or any other value > 0, which means there is still some decoding or flushing to do to
// complete current frame :
// the return value is a suggested next input size (just a hint
// for better latency) that will never request more than the
// remaining frame size.
auto inputRemains = [](ZSTD_inBuffer& input) { return input.pos < input.size; };
auto isOutputBufferFlushed = [](ZSTD_outBuffer& output) { return output.pos < output.size; };

while (inputRemains(input) || !isOutputBufferFlushed(output)) {
size_t const ret = ZSTD_decompressStream(decompression_context.get(), &output, &input);
if (ZSTD_isError(ret)) {
throw std::runtime_error(ZSTD_getErrorName(ret));
}

for (size_t i = 0; i < output.pos; ++i) {
decompressed.push_back(output_buffer[i]);
}

// move the position back go 0, to indicate that we are ready for more data
output.pos = 0;
}

return decompressed;
}
15 changes: 15 additions & 0 deletions addon/compression.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#ifndef MONGODB_ZSTD_COMPRESSION
#define MONGODB_ZSTD_COMPRESSION

#include <exception>
#include <vector>

#include "compression_worker.h"
#include "zstd.h"

namespace Compression {
std::vector<uint8_t> compress(const std::vector<uint8_t>& data, size_t compression_level);
std::vector<uint8_t> decompress(const std::vector<uint8_t>& data);
} // namespace Compression

#endif
43 changes: 43 additions & 0 deletions addon/compression_worker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#ifndef COMPRESSION_WORKER_H
#define COMPRESSION_WORKER_H
addaleax marked this conversation as resolved.
Show resolved Hide resolved
#include <napi.h>

#include <optional>
#include <variant>

using namespace Napi;
addaleax marked this conversation as resolved.
Show resolved Hide resolved

/**
* @brief An asynchronous Napi::Worker that can be with any function that produces
* CompressionResults.
* */
class CompressionWorker final : public Napi::AsyncWorker {
public:
CompressionWorker(const Napi::Function& callback, std::function<std::vector<uint8_t>()> worker)
: Napi::AsyncWorker{callback, "compression worker"}, m_worker(worker), m_result{} {}

protected:
void Execute() final {
W-A-James marked this conversation as resolved.
Show resolved Hide resolved
m_result = m_worker();
}

void OnOK() final {
if (!m_result.has_value()) {
Callback().Call({Napi::Error::New(Env(),
"zstd runtime error - async worker finished without "
"a compression or decompression result.")
.Value()});
}
addaleax marked this conversation as resolved.
Show resolved Hide resolved

std::vector<uint8_t> data = *m_result;
addaleax marked this conversation as resolved.
Show resolved Hide resolved
Buffer result = Buffer<uint8_t>::Copy(Env(), data.data(), data.size());

Callback().Call({Env().Undefined(), result});
}

private:
std::function<std::vector<uint8_t>()> m_worker;
std::optional<std::vector<uint8_t>> m_result;
};

#endif
50 changes: 44 additions & 6 deletions addon/zstd.cpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,52 @@
#include "zstd.h"

#include <napi.h>

#include <string>
#include <vector>

#include "compression.h"
#include "compression_worker.h"

using namespace Napi;

Napi::String Compress(const Napi::CallbackInfo& info) {
auto string = Napi::String::New(info.Env(), "compress()");
return string;
void Compress(const Napi::CallbackInfo& info) {
// Argument handling happens in JS
if (info.Length() != 3) {
std::string error_message = "Expected three arguments.";
throw TypeError::New(info.Env(), error_message);
}

Uint8Array to_compress = info[0].As<Uint8Array>();
std::vector<uint8_t> data(to_compress.Data(), to_compress.Data() + to_compress.ElementLength());

size_t compression_level = static_cast<size_t>(info[1].ToNumber().Int32Value());
const Napi::Function& callback = info[2].As<Function>();
addaleax marked this conversation as resolved.
Show resolved Hide resolved

CompressionWorker* worker =
new CompressionWorker(callback, [data = std::move(data), compression_level] {
return Compression::compress(data, compression_level);
});

worker->Queue();
}
Napi::String Decompress(const Napi::CallbackInfo& info) {
auto string = Napi::String::New(info.Env(), "decompress()");
return string;

void Decompress(const CallbackInfo& info) {
// Argument handling happens in JS
if (info.Length() != 2) {
std::string error_message = "Expected two argument.";
throw TypeError::New(info.Env(), error_message);
}

Napi::Uint8Array compressed_data = info[0].As<Uint8Array>();
std::vector<uint8_t> data(compressed_data.Data(),
compressed_data.Data() + compressed_data.ElementLength());
const Napi::Function& callback = info[1].As<Function>();

CompressionWorker* worker = new CompressionWorker(
callback, [data = std::move(data)] { return Compression::decompress(data); });

worker->Queue();
}

Napi::Object Init(Napi::Env env, Napi::Object exports) {
Expand Down
18 changes: 14 additions & 4 deletions binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,35 @@
'type': 'loadable_module',
'defines': ['ZSTD_STATIC_LINKING_ONLY'],
'include_dirs': [
"<!(node -p \"require('node-addon-api').include_dir\")"
"<!(node -p \"require('node-addon-api').include_dir\")",
"<(module_root_dir)/deps/zstd/lib",
],
'variables': {
'ARCH': '<(host_arch)',
'built_with_electron%': 0
},
'sources': [
'addon/zstd.cpp'
'addon/zstd.cpp',
'addon/compression_worker.h',
'addon/compression.h',
'addon/compression.cpp'
],
'xcode_settings': {
'GCC_ENABLE_CPP_EXCEPTIONS': 'YES',
'CLANG_CXX_LIBRARY': 'libc++',
'MACOSX_DEPLOYMENT_TARGET': '10.12',
'MACOSX_DEPLOYMENT_TARGET': '11',
'GCC_SYMBOLS_PRIVATE_EXTERN': 'YES', # -fvisibility=hidden
},
'cflags!': [ '-fno-exceptions' ],
'cflags_cc!': [ '-fno-exceptions' ],
'cflags_cc': ['-std=c++17'],
'msvs_settings': {
'VCCLCompilerTool': { 'ExceptionHandling': 1 },
}
},
'link_settings': {
addaleax marked this conversation as resolved.
Show resolved Hide resolved
'libraries': [
'<(module_root_dir)/deps/zstd/build/cmake/lib/libzstd.a',
]
},
}]
}
26 changes: 26 additions & 0 deletions etc/install-zstd.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@

addaleax marked this conversation as resolved.
Show resolved Hide resolved
set -o xtrace

clean_deps() {
rm -rf deps
}

download_zstd() {
rm -rf deps
mkdir -p deps/zstd

curl -L "https://github.com/facebook/zstd/releases/download/v1.5.6/zstd-1.5.6.tar.gz" \
| tar -zxf - -C deps/zstd --strip-components 1
}

build_zstd() {
export MACOSX_DEPLOYMENT_TARGET=10.12
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
cd deps/zstd/build/cmake

cmake .
make
}

clean_deps
download_zstd
build_zstd
20 changes: 17 additions & 3 deletions lib/index.js
Original file line number Diff line number Diff line change
@@ -1,16 +1,30 @@
const { compress: _compress, decompress: _decompress } = require('bindings')('zstd');
const zstd = require('bindings')('zstd');
addaleax marked this conversation as resolved.
Show resolved Hide resolved
const { promisify } = require('util');

const _compress = promisify(zstd.compress);
const _decompress = promisify(zstd.decompress);
// Error objects created via napi don't have JS stacks; wrap them so .stack is present
// https://github.com/nodejs/node/issues/25318#issuecomment-451068073

exports.compress = async function compress(data) {
exports.compress = async function compress(data, compressionLevel) {
if (!Buffer.isBuffer(data)) {
throw new TypeError(`parameter 'data' must be a Buffer.`);
}

if (compressionLevel != null && typeof compressionLevel !== 'number') {
throw new TypeError(`parameter 'compressionLevel' must be a number.`);
}

try {
return await _compress(data);
return await _compress(data, compressionLevel ?? 3);
} catch (e) {
throw new Error(`zstd: ${e.message}`);
}
};
exports.decompress = async function decompress(data) {
if (!Buffer.isBuffer(data)) {
throw new TypeError(`parameter 'data' must be a Buffer.`);
}
nbbeeken marked this conversation as resolved.
Show resolved Hide resolved
try {
return await _decompress(data);
} catch (e) {
Expand Down
Loading