Arrow IPC serializers and parsers #968

Open
wants to merge 5 commits into
base: main
Changes from 2 commits
1 change: 1 addition & 0 deletions NEWS.md
@@ -1,5 +1,6 @@
# plumber (development version)

* Adds support for [Arrow IPC Streams](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc) (@josiahparry)
gadenbuie marked this conversation as resolved.
* Added support for graphic devices provided by ragg and svglite (@thomasp85 #964)

38 changes: 25 additions & 13 deletions R/parse-body.R
@@ -491,6 +491,17 @@ parser_feather <- function(...) {
})
}

#' @describeIn parsers Arrow IPC parser. See [arrow::read_ipc_stream()] for more details.
#' @export
parser_arrow_ipc <- function(...) {
Member

@gadenbuie gadenbuie Jan 13, 2025

I'm wondering if this should be named parser_arrow_ipc_stream() or something similar? From the arrow R package docs page for read_ipc_stream() ("Read Arrow IPC stream format"), it seems that there are two IPC formats, "stream" and "file":

Apache Arrow defines two formats for serializing data for interprocess communication (IPC): a "stream" format and a "file" format, known as Feather.

Since the "file" format is synonymous with "feather" (which already has parser_feather()), my takeaway is that "stream" is an important aspect we should include in the name. I also like trying to match the naming of the underlying functions (arrow::{read,write}_ipc_stream()) while staying within plumber's naming patterns.

Contributor Author

Gotcha! That's a good point. I believe, though, that Feather specifically refers to the V1 format, and V2 is the IPC file format, though I think the terminology is a bit vague there.

So, to be crystal clear before merge:

  • Change parser_arrow_ipc() to parser_arrow_ipc_stream()

Member

@gadenbuie gadenbuie Jan 13, 2025

Yeah, it sounds like there's some room for confusion around which variant of IPC this is, and parser_arrow_ipc_stream() would clear that up, so I'm in favor of making that change before merging. Otherwise the PR looks great, thanks @JosiahParry!
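
To make the stream/file distinction discussed in this thread concrete, here is a minimal base R sketch. It assumes the framing described in the Arrow columnar format documentation: the IPC file format starts with the magic string `ARROW1`, while a stream in the current format starts with the 4-byte continuation marker `0xFFFFFFFF`. Streams written by Arrow releases before 0.15 omit that marker, so this sketch would report them as unknown. `guess_arrow_ipc_variant()` is a hypothetical helper for illustration, not part of plumber or arrow.

```r
# Hypothetical helper (not part of plumber or arrow): guess which Arrow IPC
# variant a raw body uses by looking at its leading bytes.
guess_arrow_ipc_variant <- function(body) {
  stopifnot(is.raw(body))
  file_magic <- charToRaw("ARROW1")         # start of the IPC "file" format
  stream_marker <- as.raw(rep(0xFF, 4))     # continuation marker of a stream message

  if (length(body) >= 6 && identical(body[1:6], file_magic)) {
    "file"
  } else if (length(body) >= 4 && identical(body[1:4], stream_marker)) {
    "stream"
  } else {
    "unknown"
  }
}

# Synthetic leading bytes, not real Arrow payloads.
guess_arrow_ipc_variant(c(charToRaw("ARROW1"), as.raw(c(0, 0))))
guess_arrow_ipc_variant(as.raw(c(255, 255, 255, 255, 16, 0, 0, 0)))
```

The first call returns "file" and the second returns "stream". A check like this is only a heuristic, which is one more reason a distinct alias and content type per variant keeps things unambiguous.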

parser_read_file(function(tmpfile) {
if (!requireNamespace("arrow", quietly = TRUE)) {
stop("`arrow` must be installed for `parser_arrow_ipc` to work")
}
arrow::read_ipc_stream(tmpfile, ...)
})
}

#' @describeIn parsers parquet parser. See [arrow::read_parquet()] for more details.
#' @export
parser_parquet <- function(...) {
@@ -572,20 +583,21 @@ parser_none <- function() {

register_parsers_onLoad <- function() {
# parser alias names for plumbing
register_parser("csv", parser_csv, fixed = c("application/csv", "application/x-csv", "text/csv", "text/x-csv"))
register_parser("json", parser_json, fixed = c("application/json", "text/json"))
register_parser("multi", parser_multi, fixed = "multipart/form-data", regex = "^multipart/")
register_parser("octet", parser_octet, fixed = "application/octet-stream")
register_parser("form", parser_form, fixed = "application/x-www-form-urlencoded")
register_parser("rds", parser_rds, fixed = "application/rds")
register_parser("feather", parser_feather, fixed = c("application/vnd.apache.arrow.file", "application/feather"))
register_parser("parquet", parser_parquet, fixed = "application/vnd.apache.parquet")
register_parser("text", parser_text, fixed = "text/plain", regex = "^text/")
register_parser("tsv", parser_tsv, fixed = c("application/tab-separated-values", "text/tab-separated-values"))
register_parser("csv", parser_csv, fixed = c("application/csv", "application/x-csv", "text/csv", "text/x-csv"))
register_parser("json", parser_json, fixed = c("application/json", "text/json"))
register_parser("multi", parser_multi, fixed = "multipart/form-data", regex = "^multipart/")
register_parser("octet", parser_octet, fixed = "application/octet-stream")
register_parser("form", parser_form, fixed = "application/x-www-form-urlencoded")
register_parser("rds", parser_rds, fixed = "application/rds")
register_parser("feather", parser_feather, fixed = c("application/vnd.apache.arrow.file", "application/feather"))
register_parser("arrow_ipc", parser_arrow_ipc, fixed = c("application/vnd.apache.arrow.stream"))
register_parser("parquet", parser_parquet, fixed = "application/vnd.apache.parquet")
register_parser("text", parser_text, fixed = "text/plain", regex = "^text/")
register_parser("tsv", parser_tsv, fixed = c("application/tab-separated-values", "text/tab-separated-values"))
# yaml types: https://stackoverflow.com/a/38000954/591574
register_parser("yaml", parser_yaml, fixed = c("text/vnd.yaml", "application/yaml", "application/x-yaml", "text/yaml", "text/x-yaml"))
register_parser("none", parser_none, regex = "*")
register_parser("geojson", parser_geojson, fixed = c("application/geo+json", "application/vdn.geo+json"))
register_parser("yaml", parser_yaml, fixed = c("text/vnd.yaml", "application/yaml", "application/x-yaml", "text/yaml", "text/x-yaml"))
register_parser("none", parser_none, regex = "*")
register_parser("geojson", parser_geojson, fixed = c("application/geo+json", "application/vdn.geo+json"))

parser_all <- function() {
stop("This function should never be called. It should be handled by `make_parser('all')`")
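
The registrations in this hunk map Content-Type values to parser aliases through `fixed` and `regex` arguments. The following base R sketch shows one way such a lookup could work. It is not plumber's actual implementation: the `parsers` list, the `find_parser_alias()` function, and the rule that exact matches take priority over regex matches are all assumptions made for illustration.

```r
# Simplified, hypothetical registry; plumber keeps its real registry internally.
parsers <- list(
  list(alias = "feather",
       fixed = c("application/vnd.apache.arrow.file", "application/feather"),
       regex = NULL),
  list(alias = "arrow_ipc",
       fixed = "application/vnd.apache.arrow.stream",
       regex = NULL),
  list(alias = "text", fixed = "text/plain", regex = "^text/")
)

find_parser_alias <- function(content_type, registry = parsers) {
  # Drop parameters such as "; charset=utf-8" and normalise case.
  type <- tolower(trimws(strsplit(content_type, ";", fixed = TRUE)[[1]][1]))

  # Assumption: exact ("fixed") matches are tried before regex matches.
  for (p in registry) {
    if (type %in% p$fixed) return(p$alias)
  }
  for (p in registry) {
    if (!is.null(p$regex) && grepl(p$regex, type)) return(p$alias)
  }
  NA_character_
}

find_parser_alias("application/vnd.apache.arrow.stream")
find_parser_alias("text/html; charset=utf-8")
```

The first call returns "arrow_ipc" through the exact match; the second returns "text" through the `^text/` regex fallback.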
16 changes: 16 additions & 0 deletions R/serializer.R
@@ -291,6 +291,21 @@ serializer_feather <- function(type = "application/vnd.apache.arrow.file") {
)
}

#' @describeIn serializers Arrow IPC serializer. See also: [arrow::write_ipc_stream()]
#' @export
serializer_arrow_ipc <- function(type = "application/vnd.apache.arrow.stream") {
if (!requireNamespace("arrow", quietly = TRUE)) {
stop("`arrow` must be installed for `serializer_arrow_ipc` to work")
}
serializer_write_file(
fileext = "",
type = type,
write_fn = function(val, tmpfile) {
arrow::write_ipc_stream(val, tmpfile)
}
)
}
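
The serializer above passes a `write_fn(val, tmpfile)` callback to `serializer_write_file()`. A rough base R sketch of that write-to-tempfile pattern follows. It is an assumption about the general technique, not a copy of plumber's internals, and `write_to_raw_via_tempfile()` is a hypothetical name.

```r
# Hypothetical helper illustrating the pattern: let a writer function produce
# a temporary file, then read the file back as a raw vector for the response body.
write_to_raw_via_tempfile <- function(val, write_fn, fileext = "") {
  tmpfile <- tempfile(fileext = fileext)
  on.exit(unlink(tmpfile), add = TRUE)   # always clean up the temp file
  write_fn(val, tmpfile)
  # Read exactly as many bytes as were written instead of guessing a size.
  readBin(tmpfile, what = "raw", n = file.info(tmpfile)$size)
}

# Base R stand-in for a writer so the sketch runs without arrow installed.
body <- write_to_raw_via_tempfile(
  "hi",
  function(val, tmpfile) writeBin(charToRaw(val), tmpfile)
)
rawToChar(body)
```

With arrow installed, the same helper could be given `function(val, tmpfile) arrow::write_ipc_stream(val, tmpfile)` as the writer, mirroring the `write_fn` in the diff.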

#' @describeIn serializers parquet serializer. See also: [arrow::write_parquet()]
#' @export
serializer_parquet <- function(type = "application/vnd.apache.parquet") {
@@ -692,6 +707,7 @@ add_serializers_onLoad <- function() {
register_serializer("csv", serializer_csv)
register_serializer("tsv", serializer_tsv)
register_serializer("feather", serializer_feather)
register_serializer("arrow_ipc", serializer_arrow_ipc)
register_serializer("parquet", serializer_parquet)
register_serializer("yaml", serializer_yaml)
register_serializer("geojson", serializer_geojson)
20 changes: 20 additions & 0 deletions tests/testthat/test-parse-body.R
@@ -109,6 +109,26 @@ test_that("Test feather parser", {
expect_equal(parsed, r_object)
})

test_that("Test Arrow IPC parser", {
skip_if_not_installed("arrow")

tmp <- tempfile()
on.exit({
file.remove(tmp)
}, add = TRUE)

r_object <- iris
arrow::write_ipc_stream(r_object, tmp)
val <- readBin(tmp, "raw", 10000)

parsed <- parse_body(val, "application/vnd.apache.arrow.stream", make_parser("arrow_ipc"))
# convert from the tibble returned by arrow to data.frame
parsed <- as.data.frame(parsed, stringsAsFactors = FALSE)
attr(parsed, "spec") <- NULL

expect_equal(parsed, r_object)
})

test_that("Test parquet parser", {
skip_if_not_installed("arrow")

41 changes: 41 additions & 0 deletions tests/testthat/test-serializer-arrow-ipc.R
@@ -0,0 +1,41 @@
context("Arrow IPC serializer")

test_that("Arrow IPC serializes properly", {
skip_if_not_installed("arrow")

d <- data.frame(a=1, b=2, c="hi")
val <- serializer_arrow_ipc()(d, data.frame(), PlumberResponse$new(), stop)
expect_equal(val$status, 200L)
expect_equal(val$headers$`Content-Type`, "application/vnd.apache.arrow.stream")

# can test by doing a full round trip if we believe the parser works via `test-parse-body.R`
parsed <- parse_body(val$body, "application/vnd.apache.arrow.stream", make_parser("arrow_ipc"))
# convert from the tibble returned by arrow to data.frame
parsed <- as.data.frame(parsed, stringsAsFactors = FALSE)
attr(parsed, "spec") <- NULL

expect_equal(parsed, d)
})

test_that("Errors call error handler", {
skip_if_not_installed("arrow")

errors <- 0
errHandler <- function(req, res, err){
errors <<- errors + 1
}

expect_equal(errors, 0)
serializer_arrow_ipc()(parse(text="hi"), data.frame(), PlumberResponse$new("csv"), errorHandler = errHandler)
expect_equal(errors, 1)
})

test_that("Errors are rendered correctly with debug TRUE", {
skip_if_not_installed("arrow")

pr <- pr() %>% pr_get("/", function() stop("myerror"), serializer = serializer_arrow_ipc()) %>% pr_set_debug(TRUE)
capture.output(res <- pr$serve(make_req(pr = pr), PlumberResponse$new("csv")))

expect_match(res$body, "Error in (function () : myerror", fixed = TRUE)
})

1 change: 1 addition & 0 deletions tests/testthat/test-serializer-feather.R
@@ -38,3 +38,4 @@ test_that("Errors are rendered correctly with debug TRUE", {

expect_match(res$body, "Error in (function () : myerror", fixed = TRUE)
})
