From f137feb0afa18974a2d064566ada6995f34a76d1 Mon Sep 17 00:00:00 2001 From: Josiah Parry Date: Mon, 16 Dec 2024 17:07:09 -0800 Subject: [PATCH 1/4] add arrow ipc parser and serializers --- R/parse-body.R | 38 +++++++++++++------- R/serializer.R | 16 +++++++++ tests/testthat/test-parse-body.R | 20 +++++++++++ tests/testthat/test-serializer-arrow-ipc.R | 41 ++++++++++++++++++++++ tests/testthat/test-serializer-feather.R | 1 + 5 files changed, 103 insertions(+), 13 deletions(-) create mode 100644 tests/testthat/test-serializer-arrow-ipc.R diff --git a/R/parse-body.R b/R/parse-body.R index a3fc6b53..e9ac9ace 100644 --- a/R/parse-body.R +++ b/R/parse-body.R @@ -491,6 +491,17 @@ parser_feather <- function(...) { }) } +#' @describeIn parsers Arrow IPC parser. See [arrow::read_ipc_stream()] for more details. +#' @export +parser_arrow_ipc <- function(...) { + parser_read_file(function(tmpfile) { + if (!requireNamespace("arrow", quietly = TRUE)) { + stop("`arrow` must be installed for `parser_arrow_ipc` to work") + } + arrow::read_ipc_stream(tmpfile, ...) + }) +} + #' @describeIn parsers parquet parser. See [arrow::read_parquet()] for more details. #' @export parser_parquet <- function(...) { @@ -572,20 +583,21 @@ parser_none <- function() { register_parsers_onLoad <- function() { # parser alias names for plumbing - register_parser("csv", parser_csv, fixed = c("application/csv", "application/x-csv", "text/csv", "text/x-csv")) - register_parser("json", parser_json, fixed = c("application/json", "text/json")) - register_parser("multi", parser_multi, fixed = "multipart/form-data", regex = "^multipart/") - register_parser("octet", parser_octet, fixed = "application/octet-stream") - register_parser("form", parser_form, fixed = "application/x-www-form-urlencoded") - register_parser("rds", parser_rds, fixed = "application/rds") - register_parser("feather", parser_feather, fixed = c("application/vnd.apache.arrow.file", "application/feather")) - register_parser("parquet", parser_parquet, fixed = "application/vnd.apache.parquet") - register_parser("text", parser_text, fixed = "text/plain", regex = "^text/") - register_parser("tsv", parser_tsv, fixed = c("application/tab-separated-values", "text/tab-separated-values")) + register_parser("csv", parser_csv, fixed = c("application/csv", "application/x-csv", "text/csv", "text/x-csv")) + register_parser("json", parser_json, fixed = c("application/json", "text/json")) + register_parser("multi", parser_multi, fixed = "multipart/form-data", regex = "^multipart/") + register_parser("octet", parser_octet, fixed = "application/octet-stream") + register_parser("form", parser_form, fixed = "application/x-www-form-urlencoded") + register_parser("rds", parser_rds, fixed = "application/rds") + register_parser("feather", parser_feather, fixed = c("application/vnd.apache.arrow.file", "application/feather")) + register_parser("arrow_ipc", parser_arrow_ipc, fixed = c("application/vnd.apache.arrow.stream")) + register_parser("parquet", parser_parquet, fixed = "application/vnd.apache.parquet") + register_parser("text", parser_text, fixed = "text/plain", regex = "^text/") + register_parser("tsv", parser_tsv, fixed = c("application/tab-separated-values", "text/tab-separated-values")) # yaml types: https://stackoverflow.com/a/38000954/591574 - register_parser("yaml", parser_yaml, fixed = c("text/vnd.yaml", "application/yaml", "application/x-yaml", "text/yaml", "text/x-yaml")) - register_parser("none", parser_none, regex = "*") - register_parser("geojson", parser_geojson, fixed = c("application/geo+json", "application/vdn.geo+json")) + register_parser("yaml", parser_yaml, fixed = c("text/vnd.yaml", "application/yaml", "application/x-yaml", "text/yaml", "text/x-yaml")) + register_parser("none", parser_none, regex = "*") + register_parser("geojson", parser_geojson, fixed = c("application/geo+json", "application/vdn.geo+json")) parser_all <- function() { stop("This function should never be called. It should be handled by `make_parser('all')`") diff --git a/R/serializer.R b/R/serializer.R index 2617a459..0d5158cf 100644 --- a/R/serializer.R +++ b/R/serializer.R @@ -291,6 +291,21 @@ serializer_feather <- function(type = "application/vnd.apache.arrow.file") { ) } +#' @describeIn serializers Arrow IPC serializer. See also: [arrow::write_ipc_stream()] +#' @export +serializer_arrow_ipc <- function(type = "application/vnd.apache.arrow.stream") { + if (!requireNamespace("arrow", quietly = TRUE)) { + stop("`arrow` must be installed for `serializer_arrow_ipc` to work") + } + serializer_write_file( + fileext = "", + type = type, + write_fn = function(val, tmpfile) { + arrow::write_ipc_stream(val, tmpfile) + } + ) +} + #' @describeIn serializers parquet serializer. See also: [arrow::write_parquet()] #' @export serializer_parquet <- function(type = "application/vnd.apache.parquet") { @@ -692,6 +707,7 @@ add_serializers_onLoad <- function() { register_serializer("csv", serializer_csv) register_serializer("tsv", serializer_tsv) register_serializer("feather", serializer_feather) + register_serializer("arrow_ipc", serializer_arrow_ipc) register_serializer("parquet", serializer_parquet) register_serializer("yaml", serializer_yaml) register_serializer("geojson", serializer_geojson) diff --git a/tests/testthat/test-parse-body.R b/tests/testthat/test-parse-body.R index d85712e5..2dc495bf 100644 --- a/tests/testthat/test-parse-body.R +++ b/tests/testthat/test-parse-body.R @@ -109,6 +109,26 @@ test_that("Test feather parser", { expect_equal(parsed, r_object) }) +test_that("Test Arrow IPC parser", { + skip_if_not_installed("arrow") + + tmp <- tempfile() + on.exit({ + file.remove(tmp) + }, add = TRUE) + + r_object <- iris + arrow::write_ipc_stream(r_object, tmp) + val <- readBin(tmp, "raw", 10000) + + parsed <- parse_body(val, "application/vnd.apache.arrow.stream", make_parser("arrow_ipc")) + # convert from feather tibble to data.frame + parsed <- as.data.frame(parsed, stringsAsFactors = FALSE) + attr(parsed, "spec") <- NULL + + expect_equal(parsed, r_object) +}) + test_that("Test parquet parser", { skip_if_not_installed("arrow") diff --git a/tests/testthat/test-serializer-arrow-ipc.R b/tests/testthat/test-serializer-arrow-ipc.R new file mode 100644 index 00000000..839e3bd2 --- /dev/null +++ b/tests/testthat/test-serializer-arrow-ipc.R @@ -0,0 +1,41 @@ +context("Arrow IPC serializer") + +test_that("Arrow IPC serializes properly", { + skip_if_not_installed("arrow") + + d <- data.frame(a=1, b=2, c="hi") + val <- serializer_arrow_ipc()(d, data.frame(), PlumberResponse$new(), stop) + expect_equal(val$status, 200L) + expect_equal(val$headers$`Content-Type`, "application/vnd.apache.arrow.stream") + + # can test by doing a full round trip if we believe the parser works via `test-parse-body.R` + parsed <- parse_body(val$body, "application/vnd.apache.arrow.stream", make_parser("arrow_ipc")) + # convert from feather tibble to data.frame + parsed <- as.data.frame(parsed, stringsAsFactors = FALSE) + attr(parsed, "spec") <- NULL + + expect_equal(parsed, d) +}) + +test_that("Errors call error handler", { + skip_if_not_installed("arrow") + + errors <- 0 + errHandler <- function(req, res, err){ + errors <<- errors + 1 + } + + expect_equal(errors, 0) + serializer_feather()(parse(text="hi"), data.frame(), PlumberResponse$new("csv"), errorHandler = errHandler) + expect_equal(errors, 1) +}) + +test_that("Errors are rendered correctly with debug TRUE", { + skip_if_not_installed("arrow") + + pr <- pr() %>% pr_get("/", function() stop("myerror"), serializer = serializer_feather()) %>% pr_set_debug(TRUE) + capture.output(res <- pr$serve(make_req(pr = pr), PlumberResponse$new("csv"))) + + expect_match(res$body, "Error in (function () : myerror", fixed = TRUE) +}) + diff --git a/tests/testthat/test-serializer-feather.R b/tests/testthat/test-serializer-feather.R index cfa4bc59..8d5b4752 100644 --- a/tests/testthat/test-serializer-feather.R +++ b/tests/testthat/test-serializer-feather.R @@ -38,3 +38,4 @@ test_that("Errors are rendered correctly with debug TRUE", { expect_match(res$body, "Error in (function () : myerror", fixed = TRUE) }) + From f152b6fa88b4d4cc5a6baf8300847d44f4d2f66c Mon Sep 17 00:00:00 2001 From: Josiah Parry Date: Mon, 16 Dec 2024 17:08:29 -0800 Subject: [PATCH 2/4] update news --- NEWS.md | 1 + 1 file changed, 1 insertion(+) diff --git a/NEWS.md b/NEWS.md index 067e7c4e..29d5e307 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,5 +1,6 @@ # plumber (development version) +* Adds support for [Arrow IPC Streams](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc) (@josiahparry) * Added support for graphic devices provided by ragg and svglite (@thomasp85 #964) From 7012cbcdd7132d1781017719035011d5fa9af170 Mon Sep 17 00:00:00 2001 From: Barret Schloerke Date: Fri, 10 Jan 2025 13:22:53 -0500 Subject: [PATCH 3/4] document --- NAMESPACE | 2 ++ man/parsers.Rd | 5 +++++ man/serializers.Rd | 5 +++++ 3 files changed, 12 insertions(+) diff --git a/NAMESPACE b/NAMESPACE index b72b0fc8..aa381523 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -26,6 +26,7 @@ export(include_md) export(include_rmd) export(is_plumber) export(options_plumber) +export(parser_arrow_ipc) export(parser_csv) export(parser_feather) export(parser_form) @@ -75,6 +76,7 @@ export(registered_serializers) export(serializer_agg_jpeg) export(serializer_agg_png) export(serializer_agg_tiff) +export(serializer_arrow_ipc) export(serializer_bmp) export(serializer_cat) export(serializer_content_type) diff --git a/man/parsers.Rd b/man/parsers.Rd index 52089bf3..db8bb39e 100644 --- a/man/parsers.Rd +++ b/man/parsers.Rd @@ -11,6 +11,7 @@ \alias{parser_read_file} \alias{parser_rds} \alias{parser_feather} +\alias{parser_arrow_ipc} \alias{parser_parquet} \alias{parser_octet} \alias{parser_multi} @@ -37,6 +38,8 @@ parser_rds(...) parser_feather(...) +parser_arrow_ipc(...) + parser_parquet(...) parser_octet() @@ -91,6 +94,8 @@ This parser should be used when reading from a file is required. \item \code{parser_feather()}: feather parser. See \code{\link[arrow:read_feather]{arrow::read_feather()}} for more details. +\item \code{parser_arrow_ipc()}: Arrow IPC parser. See \code{\link[arrow:read_ipc_stream]{arrow::read_ipc_stream()}} for more details. + \item \code{parser_parquet()}: parquet parser. See \code{\link[arrow:read_parquet]{arrow::read_parquet()}} for more details. \item \code{parser_octet()}: Octet stream parser. Returns the raw content. diff --git a/man/serializers.Rd b/man/serializers.Rd index 8345f8c2..26f7bb3d 100644 --- a/man/serializers.Rd +++ b/man/serializers.Rd @@ -12,6 +12,7 @@ \alias{serializer_geojson} \alias{serializer_rds} \alias{serializer_feather} +\alias{serializer_arrow_ipc} \alias{serializer_parquet} \alias{serializer_yaml} \alias{serializer_text} @@ -55,6 +56,8 @@ serializer_rds(version = "2", ascii = FALSE, ..., type = "application/rds") serializer_feather(type = "application/vnd.apache.arrow.file") +serializer_arrow_ipc(type = "application/vnd.apache.arrow.stream") + serializer_parquet(type = "application/vnd.apache.parquet") serializer_yaml(..., type = "text/x-yaml; charset=UTF-8") @@ -158,6 +161,8 @@ not have a \code{"raw"} type, then an error will be thrown. \item \code{serializer_feather()}: feather serializer. See also: \code{\link[arrow:write_feather]{arrow::write_feather()}} +\item \code{serializer_arrow_ipc()}: Arrow IPC serializer. See also: \code{\link[arrow:write_ipc_stream]{arrow::write_ipc_stream()}} + \item \code{serializer_parquet()}: parquet serializer. See also: \code{\link[arrow:write_parquet]{arrow::write_parquet()}} \item \code{serializer_yaml()}: YAML serializer. See also: \code{\link[yaml:as.yaml]{yaml::as.yaml()}} From 775c51141c7fbee620db8db9a9e097117a4f4010 Mon Sep 17 00:00:00 2001 From: Garrick Aden-Buie Date: Mon, 13 Jan 2025 09:58:38 -0500 Subject: [PATCH 4/4] docs: Add pr number to news item --- NEWS.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/NEWS.md b/NEWS.md index 29d5e307..9cdbcb8e 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,6 +1,6 @@ # plumber (development version) -* Adds support for [Arrow IPC Streams](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc) (@josiahparry) +* Adds support for [Arrow IPC Streams](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc) (@josiahparry #968) * Added support for graphic devices provided by ragg and svglite (@thomasp85 #964)