[I/O] Port to pyarrow filesystems by default. (#942)
This PR ports our filesystem inference to create `pyarrow`
`FileSystem` instances by default, falling back to fsspec only when
pyarrow does not support the URI protocol. We also change our reading
narrow waist to pyarrow `NativeFile` handles.
clarkzinzow authored May 22, 2023
1 parent 69f4f0a commit e50b3ac
Showing 3 changed files with 248 additions and 56 deletions.
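
For orientation, here is a minimal sketch (not code from this commit) of the pyarrow-first inference with fsspec fallback that the description refers to; infer_filesystem is a hypothetical name and the example path is illustrative.

from __future__ import annotations

import fsspec
import pyarrow as pa
from pyarrow.fs import FileSystem, FSSpecHandler, PyFileSystem


def infer_filesystem(uri: str) -> tuple[FileSystem, str]:
    """Try pyarrow's native filesystem inference first, then fall back to fsspec."""
    try:
        # pyarrow natively understands file://, s3://, gs://, hdfs://, ...
        return FileSystem.from_uri(uri)
    except pa.lib.ArrowInvalid:
        # Protocol unknown to pyarrow: wrap the matching fsspec filesystem so
        # callers still receive a pyarrow FileSystem and NativeFile handles.
        protocol = uri.split("://", 1)[0] if "://" in uri else "file"
        return PyFileSystem(FSSpecHandler(fsspec.filesystem(protocol))), uri


fs, path = infer_filesystem("file:///tmp/example.parquet")
with fs.open_input_stream(path) as f:  # f is a pyarrow NativeFile
    header = f.read(4)
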
192 changes: 189 additions & 3 deletions daft/filesystem.py
@@ -1,10 +1,11 @@
from __future__ import annotations

import dataclasses
import pathlib
import sys
import urllib.parse
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse

if sys.version_info < (3, 8):
from typing_extensions import Literal
@@ -15,10 +16,38 @@

import fsspec
import pyarrow as pa
from fsspec.implementations.http import HTTPFileSystem
from fsspec.registry import get_filesystem_class
from loguru import logger
from pyarrow.fs import (
FileSystem,
FSSpecHandler,
PyFileSystem,
_resolve_filesystem_and_path,
)

from daft.datasources import ParquetSourceInfo, SourceInfo

_CACHED_FSES: dict[str, FileSystem] = {}


def _get_fs_from_cache(protocol: str) -> FileSystem | None:
"""
Get an instantiated pyarrow filesystem from the cache based on the URI protocol.
Returns None if no such cache entry exists.
"""
global _CACHED_FSES

return _CACHED_FSES.get(protocol)


def _put_fs_in_cache(protocol: str, fs: FileSystem) -> None:
"""Put pyarrow filesystem in cache under provided protocol."""
global _CACHED_FSES

_CACHED_FSES[protocol] = fs


@dataclasses.dataclass(frozen=True)
class ListingInfo:
@@ -71,18 +100,175 @@ def get_filesystem(protocol: str, **kwargs) -> fsspec.AbstractFileSystem:


def get_protocol_from_path(path: str) -> str:
parsed_scheme = urlparse(path).scheme
parsed_scheme = urllib.parse.urlparse(path, allow_fragments=False).scheme
if parsed_scheme == "" or parsed_scheme is None:
return "file"
return parsed_scheme


_CANONICAL_PROTOCOLS = {
"gcs": "gs",
"https": "http",
"s3a": "s3",
"ssh": "sftp",
"arrow_hdfs": "hdfs",
"az": "abfs",
"blockcache": "cached",
"jlab": "jupyter",
}


def canonicalize_protocol(protocol: str) -> str:
"""
Return the canonical protocol from the provided protocol, such that there's a 1:1
mapping between protocols and pyarrow/fsspec filesystem implementations.
"""
return _CANONICAL_PROTOCOLS.get(protocol, protocol)


def get_filesystem_from_path(path: str, **kwargs) -> fsspec.AbstractFileSystem:
protocol = get_protocol_from_path(path)
fs = get_filesystem(protocol, **kwargs)
return fs


def _resolve_paths_and_filesystem(
paths: str | pathlib.Path | list[str],
filesystem: FileSystem | fsspec.AbstractFileSystem | None = None,
) -> tuple[list[str], FileSystem]:
"""
Resolves and normalizes all provided paths, infers a filesystem from the
paths, and ensures that all paths use the same filesystem.
Args:
paths: A single file/directory path or a list of file/directory paths.
A list of paths can contain both files and directories.
filesystem: The filesystem implementation that should be used for
reading these files. If None, a filesystem will be inferred. If not
None, the provided filesystem will still be validated against all
filesystems inferred from the provided paths to ensure
compatibility.
"""
if isinstance(paths, pathlib.Path):
paths = str(paths)
if isinstance(paths, str):
paths = [paths]
assert isinstance(paths, list), paths
assert all(isinstance(p, str) for p in paths), paths
assert len(paths) > 0, paths

# Ensure that protocols for all paths are consistent, i.e. that they would map to the
# same filesystem.
protocols = {get_protocol_from_path(path) for path in paths}
canonicalized_protocols = {canonicalize_protocol(protocol) for protocol in protocols}
if len(canonicalized_protocols) > 1:
raise ValueError(
"All paths must have the same canonical protocol to ensure that they are all "
f"hitting the same storage backend, but got protocols {protocols} with canonical "
f"protocols - {canonicalized_protocols} and full paths - {paths}"
)

# Canonical protocol shared by all paths.
protocol = next(iter(canonicalized_protocols))

if filesystem is None:
# Try to get filesystem from protocol -> fs cache.
filesystem = _get_fs_from_cache(protocol)
elif isinstance(filesystem, fsspec.AbstractFileSystem):
# Wrap fsspec filesystems so they are valid pyarrow filesystems.
filesystem = PyFileSystem(FSSpecHandler(filesystem))

# Resolve path and filesystem for the first path.
# We use this first resolved filesystem for validation on all other paths.
resolved_path, resolved_filesystem = _resolve_path_and_filesystem(paths[0], filesystem)

if filesystem is None:
filesystem = resolved_filesystem
# Put resolved filesystem in cache under these paths' canonical protocol.
_put_fs_in_cache(protocol, filesystem)

# filesystem should be a non-None pyarrow FileSystem at this point, either
# user-provided, taken from the cache, or inferred from the first path.
assert filesystem is not None and isinstance(filesystem, FileSystem)

# Resolve all other paths and validate with the user-provided/cached/inferred filesystem.
resolved_paths = [resolved_path]
for path in paths[1:]:
resolved_path, _ = _resolve_path_and_filesystem(path, filesystem)
resolved_paths.append(resolved_path)

return resolved_paths, filesystem


def _resolve_path_and_filesystem(
path: str,
filesystem: FileSystem | fsspec.AbstractFileSystem | None,
) -> tuple[str, FileSystem]:
"""
Resolves and normalizes the provided path, infers a filesystem from the
path, and ensures that the inferred filesystem is compatible with the passed
filesystem, if provided.
Args:
path: A single file/directory path.
filesystem: The filesystem implementation that should be used for
reading these files. If None, a filesystem will be inferred. If not
None, the provided filesystem will still be validated against the
filesystem inferred from the provided path to ensure compatibility.
"""
# Use pyarrow utility to resolve filesystem and this particular path.
# If a non-None filesystem is provided to this utility, it will ensure that
# it is compatible with the provided path.
# A non-None filesystem will be provided if:
# - a user-provided filesystem was passed to _resolve_paths_and_filesystem.
# - a filesystem for the paths' protocol exists in the protocol -> fs cache.
# - a filesystem was resolved for a previous path; i.e., filesystem is
# guaranteed to be non-None for all but the first path.
try:
resolved_filesystem, resolved_path = _resolve_filesystem_and_path(path, filesystem)
except pa.lib.ArrowInvalid as e:
if "Unrecognized filesystem type in URI" in str(e):
# Fall back to fsspec.
protocol = get_protocol_from_path(path)
logger.debug(f"pyarrow doesn't support paths with protocol {protocol}, falling back to fsspec.")
try:
fsspec_fs_cls = get_filesystem_class(protocol)
except ValueError:
raise ValueError(f"pyarrow and fsspec don't recognize protocol {protocol} for path {path}.")
fsspec_fs = fsspec_fs_cls()
resolved_filesystem, resolved_path = _resolve_filesystem_and_path(path, fsspec_fs)
else:
raise

# If filesystem is fsspec HTTPFileSystem, the protocol/scheme of paths
# should not be unwrapped/removed, because HTTPFileSystem expects full file
# paths including protocol/scheme. This is different behavior compared to
# pyarrow filesystems.
if not _is_http_fs(resolved_filesystem):
resolved_path = _unwrap_protocol(resolved_path)

resolved_path = resolved_filesystem.normalize_path(resolved_path)
return resolved_path, resolved_filesystem


def _is_http_fs(fs: FileSystem) -> bool:
"""Returns whether the provided pyarrow filesystem is an HTTP filesystem."""
return (
isinstance(fs, PyFileSystem)
and isinstance(fs.handler, FSSpecHandler)
and isinstance(fs.handler.fs, HTTPFileSystem)
)


def _unwrap_protocol(path):
"""
Slice off any protocol prefixes on path.
"""
parsed = urllib.parse.urlparse(path, allow_fragments=False) # support '#' in path
query = "?" + parsed.query if parsed.query else "" # support '?' in path
return parsed.netloc + parsed.path + query


###
# File globbing
###
@@ -92,7 +278,7 @@ def _ensure_path_protocol(protocol: str, returned_path: str) -> str:
"""This function adds the protocol that fsspec strips from returned results"""
if protocol == "file":
return returned_path
parsed_scheme = urlparse(returned_path).scheme
parsed_scheme = urllib.parse.urlparse(returned_path).scheme
if parsed_scheme == "" or parsed_scheme is None:
return f"{protocol}://{returned_path}"
return returned_path
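
For reference, a hypothetical usage sketch of the helpers added in this file; the local paths and the use of these private functions outside the module are illustrative only.

from daft.filesystem import _resolve_paths_and_filesystem, canonicalize_protocol

assert canonicalize_protocol("s3a") == "s3"  # s3a:// and s3:// map to one backend

# The first resolution infers a LocalFileSystem and caches it under "file" ...
paths, fs1 = _resolve_paths_and_filesystem("/tmp/data/a.parquet")
# ... so later resolutions for the same protocol reuse the cached instance.
_, fs2 = _resolve_paths_and_filesystem(["/tmp/data/b.parquet", "/tmp/data/c.parquet"])
assert fs1 is fs2
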
95 changes: 46 additions & 49 deletions daft/table/table_io.py
@@ -4,7 +4,6 @@
import pathlib
from collections.abc import Generator
from typing import IO, Union
from urllib.parse import urlparse
from uuid import uuid4

import fsspec
@@ -13,9 +12,10 @@
from pyarrow import dataset as pads
from pyarrow import json as pajson
from pyarrow import parquet as papq
from pyarrow.fs import FileSystem

from daft.expressions import ExpressionsProjection
from daft.filesystem import get_filesystem_from_path
from daft.filesystem import _resolve_paths_and_filesystem
from daft.runners.partitioning import (
vPartitionParseCSVOptions,
vPartitionReadOptions,
@@ -26,33 +26,23 @@
FileInput = Union[pathlib.Path, str, IO[bytes]]


# The number of rows to read per batch. This is sized to generate roughly 100MiB batches
# for rows about 1KiB in size.
_PARQUET_FRAGMENT_BATCH_SIZE = 100000


@contextlib.contextmanager
def _get_file(
def _open_stream(
file: FileInput,
fs: fsspec.AbstractFileSystem | None,
) -> Generator[FileInput, None, None]:
"""Helper method to return an appropriate file handle
1. If `fs` is not None, we fall-back onto the provided fsspec FileSystem and return an fsspec file handle
2. If `file` is a pathlib, we stringify it
3. If `file` is a string, we leave it unmodified
"""
if isinstance(file, pathlib.Path):
file = str(file)

if isinstance(file, str):
# Use provided fsspec filesystem, slow but necessary for backward-compatibility
if fs is not None:
with fs.open(file, compression="infer") as f:
yield f
# Corner-case to handle `http` filepaths using fsspec because PyArrow cannot handle it
elif urlparse(file).scheme in {"http", "https"}:
fsspec_fs = get_filesystem_from_path(file)
with fsspec_fs.open(file, compression="infer") as f:
yield f
# Safely yield a string path, which can be correctly interpreted by PyArrow filesystem
else:
yield file
fs: FileSystem | fsspec.AbstractFileSystem | None,
) -> Generator[pa.NativeFile, None, None]:
"""Opens the provided file for reading, yield a pyarrow file handle."""
if isinstance(file, (pathlib.Path, str)):
paths, fs = _resolve_paths_and_filesystem(file, fs)
assert len(paths) == 1
path = paths[0]
with fs.open_input_stream(path) as f:
yield f
else:
yield file

@@ -73,7 +63,7 @@ def read_json(
Returns:
Table: Parsed Table from JSON
"""
with _get_file(file, fs) as f:
with _open_stream(file, fs) as f:
table = pajson.read_json(f)

if read_options.column_names is not None:
@@ -102,28 +92,35 @@ def read_parquet(
Returns:
Table: Parsed Table from Parquet
"""
with _get_file(file, fs) as f:
pqf = papq.ParquetFile(f)
# If no rows required, we manually construct an empty table with the right schema
if read_options.num_rows == 0:
arrow_schema = pqf.metadata.schema.to_arrow_schema()
table = pa.Table.from_arrays([pa.array([], type=field.type) for field in arrow_schema], schema=arrow_schema)
elif read_options.num_rows is not None:
# Read the file by rowgroup.
tables = []
rows_read = 0
for i in range(pqf.metadata.num_row_groups):
tables.append(pqf.read_row_group(i, columns=read_options.column_names))
rows_read += len(tables[i])
if not isinstance(file, (str, pathlib.Path)):
# BytesIO path.
return Table.from_arrow(papq.read_table(file, columns=read_options.column_names))

paths, fs = _resolve_paths_and_filesystem(file, fs)
assert len(paths) == 1
path = paths[0]
fragment = pads.ParquetFileFormat().make_fragment(path, filesystem=fs)
schema = fragment.metadata.schema.to_arrow_schema()
# If no rows required, we manually construct an empty table with the right schema
if read_options.num_rows == 0:
table = pa.Table.from_arrays([pa.array([], type=field.type) for field in schema], schema=schema)
elif read_options.num_rows is not None:
# Read the file by row group.
frags = fragment.split_by_row_group()
tables = []
rows_read = 0
for frag in frags:
for batch in frag.to_batches(columns=read_options.column_names, batch_size=_PARQUET_FRAGMENT_BATCH_SIZE):
tables.append(pa.Table.from_batches([batch], schema=schema))
rows_read += len(batch)
if rows_read >= read_options.num_rows:
break
table = pa.concat_tables(tables)
table = table.slice(length=read_options.num_rows)
else:
table = papq.read_table(
f,
columns=read_options.column_names,
)
if rows_read >= read_options.num_rows:
break
table = pa.concat_tables(tables)
table = table.slice(length=read_options.num_rows)
else:
table = fragment.to_table(columns=read_options.column_names)

return Table.from_arrow(table)
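
For context, a standalone sketch of the row-group-bounded read that read_parquet now performs through pyarrow datasets; the file path, row limit, and batch size below are illustrative, not taken from this diff.

import pyarrow as pa
import pyarrow.dataset as pads
from pyarrow.fs import LocalFileSystem

fs = LocalFileSystem()
fragment = pads.ParquetFileFormat().make_fragment("/tmp/data/a.parquet", filesystem=fs)
schema = fragment.metadata.schema.to_arrow_schema()

num_rows, rows_read, tables = 1_000, 0, []
for frag in fragment.split_by_row_group():
    # Stream bounded batches so we never materialize more rows than needed.
    for batch in frag.to_batches(batch_size=100_000):
        tables.append(pa.Table.from_batches([batch], schema=schema))
        rows_read += len(batch)
        if rows_read >= num_rows:
            break
    if rows_read >= num_rows:
        break
table = pa.concat_tables(tables).slice(length=num_rows)
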

@@ -158,7 +155,7 @@ def read_csv(
skip_header_row = full_column_names is not None and csv_options.has_headers
pyarrow_skip_rows_after_names = (1 if skip_header_row else 0) + csv_options.skip_rows_after_header

with _get_file(file, fs) as f:
with _open_stream(file, fs) as f:
table = pacsv.read_csv(
f,
parse_options=pacsv.ParseOptions(
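
Finally, a hypothetical call into the new reading narrow waist; the module path and file path are assumed for illustration.

from pyarrow import csv as pacsv

from daft.table.table_io import _open_stream

with _open_stream("/tmp/data/a.csv", None) as f:
    table = pacsv.read_csv(f)  # f is a pyarrow NativeFile handle
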