diff --git a/cpp/examples/basic_io.cpp b/cpp/examples/basic_io.cpp index 39bfc315cd..9fed0cee6a 100644 --- a/cpp/examples/basic_io.cpp +++ b/cpp/examples/basic_io.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021-2024, NVIDIA CORPORATION. + * Copyright (c) 2021-2025, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -124,7 +124,7 @@ int main() check(a[i] == b[i]); } } - kvikio::defaults::thread_pool_nthreads_reset(16); + kvikio::defaults::set_thread_pool_nthreads(16); { std::cout << std::endl; Timer timer; diff --git a/cpp/include/kvikio/defaults.hpp b/cpp/include/kvikio/defaults.hpp index 4334549d23..adb3a3b3f7 100644 --- a/cpp/include/kvikio/defaults.hpp +++ b/cpp/include/kvikio/defaults.hpp @@ -99,14 +99,14 @@ class defaults { [[nodiscard]] static CompatMode compat_mode(); /** - * @brief Reset the value of `kvikio::defaults::compat_mode()`. + * @brief Set the value of `kvikio::defaults::compat_mode()`. * * Changing the compatibility mode affects all the new FileHandles whose `compat_mode` argument is * not explicitly set, but it never affects existing FileHandles. * * @param compat_mode Compatibility mode. */ - static void compat_mode_reset(CompatMode compat_mode); + static void set_compat_mode(CompatMode compat_mode); /** * @brief Infer the `AUTO` compatibility mode from the system runtime. @@ -157,7 +157,7 @@ class defaults { * * Notice, it is not possible to change the default thread pool. KvikIO will * always use the same thread pool however it is possible to change number of - * threads in the pool (see `kvikio::default::thread_pool_nthreads_reset()`). + * threads in the pool (see `kvikio::defaults::set_thread_pool_nthreads()`). * * @return The default thread pool instance. */ @@ -166,7 +166,7 @@ class defaults { /** * @brief Get the number of threads in the default thread pool. 
* - * Set the default value using `kvikio::default::thread_pool_nthreads_reset()` or by + * Set the default value using `kvikio::defaults::set_thread_pool_nthreads()` or by * setting the `KVIKIO_NTHREADS` environment variable. If not set, the default value is 1. * * @return The number of threads. @@ -174,20 +174,19 @@ class defaults { [[nodiscard]] static unsigned int thread_pool_nthreads(); /** - * @brief Reset the number of threads in the default thread pool. Waits for all currently running + * @brief Set the number of threads in the default thread pool. Waits for all currently running * tasks to be completed, then destroys all threads in the pool and creates a new thread pool with * the new number of threads. Any tasks that were waiting in the queue before the pool was reset - * will then be executed by the new threads. If the pool was paused before resetting it, the new - * pool will be paused as well. + * will then be executed by the new threads. * * @param nthreads The number of threads to use. */ - static void thread_pool_nthreads_reset(unsigned int nthreads); + static void set_thread_pool_nthreads(unsigned int nthreads); /** * @brief Get the default task size used for parallel IO operations. * - * Set the default value using `kvikio::default::task_size_reset()` or by setting + * Set the default value using `kvikio::defaults::set_task_size()` or by setting * the `KVIKIO_TASK_SIZE` environment variable. If not set, the default value is 4 MiB. * * @return The default task size in bytes. @@ -195,11 +194,11 @@ class defaults { [[nodiscard]] static std::size_t task_size(); /** - * @brief Reset the default task size used for parallel IO operations. + * @brief Set the default task size used for parallel IO operations. * * @param nbytes The default task size in bytes. */ - static void task_size_reset(std::size_t nbytes); + static void set_task_size(std::size_t nbytes); /** * @brief Get the default GDS threshold, which is the minimum size to use GDS (in bytes). 
@@ -207,7 +206,7 @@ class defaults { * In order to improve performance of small IO, `.pread()` and `.pwrite()` implement a shortcut * that circumvent the threadpool and use the POSIX backend directly. * - * Set the default value using `kvikio::default::gds_threshold_reset()` or by setting the + * Set the default value using `kvikio::defaults::set_gds_threshold()` or by setting the * `KVIKIO_GDS_THRESHOLD` environment variable. If not set, the default value is 1 MiB. * * @return The default GDS threshold size in bytes. @@ -215,15 +214,15 @@ class defaults { [[nodiscard]] static std::size_t gds_threshold(); /** - * @brief Reset the default GDS threshold, which is the minimum size to use GDS (in bytes). + * @brief Set the default GDS threshold, which is the minimum size to use GDS (in bytes). * @param nbytes The default GDS threshold size in bytes. */ - static void gds_threshold_reset(std::size_t nbytes); + static void set_gds_threshold(std::size_t nbytes); /** * @brief Get the size of the bounce buffer used to stage data in host memory. * - * Set the value using `kvikio::default::bounce_buffer_size_reset()` or by setting the + * Set the value using `kvikio::defaults::set_bounce_buffer_size()` or by setting the * `KVIKIO_BOUNCE_BUFFER_SIZE` environment variable. If not set, the value is 16 MiB. * * @return The bounce buffer size in bytes. @@ -231,16 +230,16 @@ class defaults { [[nodiscard]] static std::size_t bounce_buffer_size(); /** - * @brief Reset the size of the bounce buffer used to stage data in host memory. + * @brief Set the size of the bounce buffer used to stage data in host memory. * * @param nbytes The bounce buffer size in bytes. */ - static void bounce_buffer_size_reset(std::size_t nbytes); + static void set_bounce_buffer_size(std::size_t nbytes); /** * @brief Get the maximum number of attempts per remote IO read. 
* - * Set the value using `kvikio::default::http_max_attempts_reset()` or by setting + * Set the value using `kvikio::defaults::set_http_max_attempts()` or by setting * the `KVIKIO_HTTP_MAX_ATTEMPTS` environment variable. If not set, the value is 3. * * @return The maximum number of remote IO reads to attempt before raising an @@ -249,16 +248,16 @@ class defaults { [[nodiscard]] static std::size_t http_max_attempts(); /** - * @brief Reset the maximum number of attempts per remote IO read. + * @brief Set the maximum number of attempts per remote IO read. * * @param attempts The maximum number of attempts to try before raising an error. */ - static void http_max_attempts_reset(std::size_t attempts); + static void set_http_max_attempts(std::size_t attempts); /** * @brief The list of HTTP status codes to retry. * - * Set the value using `kvikio::default::http_status_codes()` or by setting the + * Set the value using `kvikio::defaults::set_http_status_codes()` or by setting the * `KVIKIO_HTTP_STATUS_CODES` environment variable. If not set, the default value is * * - 429 @@ -272,11 +271,11 @@ class defaults { [[nodiscard]] static std::vector const& http_status_codes(); /** - * @brief Reset the list of HTTP status codes to retry. + * @brief Set the list of HTTP status codes to retry. * * @param status_codes The HTTP status codes to retry. 
*/ - static void http_status_codes_reset(std::vector status_codes); + static void set_http_status_codes(std::vector status_codes); }; } // namespace kvikio diff --git a/cpp/src/defaults.cpp b/cpp/src/defaults.cpp index e0a908cf4d..bb0849d337 100644 --- a/cpp/src/defaults.cpp +++ b/cpp/src/defaults.cpp @@ -143,7 +143,7 @@ defaults* defaults::instance() } CompatMode defaults::compat_mode() { return instance()->_compat_mode; } -void defaults::compat_mode_reset(CompatMode compat_mode) { instance()->_compat_mode = compat_mode; } +void defaults::set_compat_mode(CompatMode compat_mode) { instance()->_compat_mode = compat_mode; } CompatMode defaults::infer_compat_mode_if_auto(CompatMode compat_mode) noexcept { @@ -169,7 +169,7 @@ BS_thread_pool& defaults::thread_pool() { return instance()->_thread_pool; } unsigned int defaults::thread_pool_nthreads() { return thread_pool().get_thread_count(); } -void defaults::thread_pool_nthreads_reset(unsigned int nthreads) +void defaults::set_thread_pool_nthreads(unsigned int nthreads) { if (nthreads == 0) { throw std::invalid_argument("number of threads must be a positive integer greater than zero"); @@ -179,7 +179,7 @@ void defaults::thread_pool_nthreads_reset(unsigned int nthreads) std::size_t defaults::task_size() { return instance()->_task_size; } -void defaults::task_size_reset(std::size_t nbytes) +void defaults::set_task_size(std::size_t nbytes) { if (nbytes == 0) { throw std::invalid_argument("task size must be a positive integer greater than zero"); @@ -189,11 +189,11 @@ void defaults::task_size_reset(std::size_t nbytes) std::size_t defaults::gds_threshold() { return instance()->_gds_threshold; } -void defaults::gds_threshold_reset(std::size_t nbytes) { instance()->_gds_threshold = nbytes; } +void defaults::set_gds_threshold(std::size_t nbytes) { instance()->_gds_threshold = nbytes; } std::size_t defaults::bounce_buffer_size() { return instance()->_bounce_buffer_size; } -void defaults::bounce_buffer_size_reset(std::size_t 
nbytes) +void defaults::set_bounce_buffer_size(std::size_t nbytes) { if (nbytes == 0) { throw std::invalid_argument( @@ -204,7 +204,7 @@ void defaults::bounce_buffer_size_reset(std::size_t nbytes) std::size_t defaults::http_max_attempts() { return instance()->_http_max_attempts; } -void defaults::http_max_attempts_reset(std::size_t attempts) +void defaults::set_http_max_attempts(std::size_t attempts) { if (attempts == 0) { throw std::invalid_argument("attempts must be a positive integer"); } instance()->_http_max_attempts = attempts; @@ -212,7 +212,7 @@ void defaults::http_max_attempts_reset(std::size_t attempts) std::vector const& defaults::http_status_codes() { return instance()->_http_status_codes; } -void defaults::http_status_codes_reset(std::vector status_codes) +void defaults::set_http_status_codes(std::vector status_codes) { instance()->_http_status_codes = std::move(status_codes); } diff --git a/docs/source/api.rst b/docs/source/api.rst index fd34367a00..b7907fae9e 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -31,8 +31,16 @@ Defaults .. autofunction:: compat_mode -.. autofunction:: compat_mode_reset +.. autofunction:: num_threads -.. autofunction:: get_num_threads +.. autofunction:: task_size -.. autofunction:: num_threads_reset +.. autofunction:: gds_threshold + +.. autofunction:: bounce_buffer_size + +.. autofunction:: http_status_codes + +.. autofunction:: http_max_attempts + +.. autofunction:: set diff --git a/docs/source/runtime_settings.rst b/docs/source/runtime_settings.rst index 5847c1ffbe..b488c10146 100644 --- a/docs/source/runtime_settings.rst +++ b/docs/source/runtime_settings.rst @@ -17,37 +17,37 @@ Under ``AUTO``, KvikIO falls back to the compatibility mode: * when running in Windows Subsystem for Linux (WSL). * when ``/run/udev`` isn't readable, which typically happens when running inside a docker image not launched with ``--volume /run/udev:/run/udev:ro``. 
-This setting can also be programmatically controlled by :py:func:`kvikio.defaults.set_compat_mode` and :py:func:`kvikio.defaults.compat_mode_reset`. +This setting can also be programmatically accessed using :py:func:`kvikio.defaults.compat_mode` (getter) and :py:func:`kvikio.defaults.set` (setter). Thread Pool ``KVIKIO_NTHREADS`` ------------------------------- KvikIO can use multiple threads for IO automatically. Set the environment variable ``KVIKIO_NTHREADS`` to the number of threads in the thread pool. If not set, the default value is 1. -This setting can also be controlled by :py:func:`kvikio.defaults.get_num_threads`, :py:func:`kvikio.defaults.num_threads_reset`, and :py:func:`kvikio.defaults.set_num_threads`. +This setting can also be accessed using :py:func:`kvikio.defaults.num_threads` (getter) and :py:func:`kvikio.defaults.set` (setter). Task Size ``KVIKIO_TASK_SIZE`` ------------------------------ KvikIO splits parallel IO operations into multiple tasks. Set the environment variable ``KVIKIO_TASK_SIZE`` to the maximum task size (in bytes). If not set, the default value is 4194304 (4 MiB). -This setting can also be controlled by :py:func:`kvikio.defaults.task_size`, :py:func:`kvikio.defaults.task_size_reset`, and :py:func:`kvikio.defaults.set_task_size`. +This setting can also be accessed using :py:func:`kvikio.defaults.task_size` (getter) and :py:func:`kvikio.defaults.set` (setter). GDS Threshold ``KVIKIO_GDS_THRESHOLD`` -------------------------------------- In order to improve performance of small IO, ``.pread()`` and ``.pwrite()`` implement a shortcut that circumvent the threadpool and use the POSIX backend directly. Set the environment variable ``KVIKIO_GDS_THRESHOLD`` to the minimum size (in bytes) to use GDS. If not set, the default value is 1048576 (1 MiB). -This setting can also be controlled by :py:func:`kvikio.defaults.gds_threshold`, :py:func:`kvikio.defaults.gds_threshold_reset`, and :py:func:`kvikio.defaults.set_gds_threshold`. 
+This setting can also be accessed using :py:func:`kvikio.defaults.gds_threshold` (getter) and :py:func:`kvikio.defaults.set` (setter). Size of the Bounce Buffer ``KVIKIO_BOUNCE_BUFFER_SIZE`` ------------------------------------------------------- KvikIO might have to use intermediate host buffers (one per thread) when copying between files and device memory. Set the environment variable ``KVIKIO_BOUNCE_BUFFER_SIZE`` to the size (in bytes) of these "bounce" buffers. If not set, the default value is 16777216 (16 MiB). -This setting can also be controlled by :py:func:`kvikio.defaults.bounce_buffer_size`, :py:func:`kvikio.defaults.bounce_buffer_size_reset`, and :py:func:`kvikio.defaults.set_bounce_buffer_size`. +This setting can also be accessed using :py:func:`kvikio.defaults.bounce_buffer_size` (getter) and :py:func:`kvikio.defaults.set` (setter). -#### HTTP Retries ------------------ +HTTP Retries ``KVIKIO_HTTP_STATUS_CODES``, ``KVIKIO_HTTP_MAX_ATTEMPTS`` +------------------------------------------------------------------------ -The behavior when a remote IO read returns a error can be controlled through the `KVIKIO_HTTP_STATUS_CODES` and `KVIKIO_HTTP_MAX_ATTEMPTS` environment variables. +The behavior when a remote I/O read returns an error can be controlled through the ``KVIKIO_HTTP_STATUS_CODES`` and ``KVIKIO_HTTP_MAX_ATTEMPTS`` environment variables. -`KVIKIO_HTTP_STATUS_CODES` controls the status codes to retry and can be controlled by :py:func:`kvikio.defaults.http_status_codes`, :py:func:`kvikio.defaults.http_status_codes_reset`, and :py:func:`kvikio.defaults.set_http_status_codes`. +KvikIO will retry a request if any of the HTTP status codes in ``KVIKIO_HTTP_STATUS_CODES`` is received. The default values are ``429, 500, 502, 503, 504``. This setting can also be accessed using :py:func:`kvikio.defaults.http_status_codes` (getter) and :py:func:`kvikio.defaults.set` (setter). 
-`KVIKIO_HTTP_MAX_ATTEMPTS` controls the maximum number of attempts to make before throwing an exception and can be controlled by :py:func:`kvikio.defaults.http_max_attempts`, :py:func:`kvikio.defaults.http_max_attempts_reset`, and :py:func:`kvikio.defaults.set_http_max_attempts`. +The maximum number of attempts to make before throwing an exception is controlled by ``KVIKIO_HTTP_MAX_ATTEMPTS``. The default value is 3. This setting can also be accessed using :py:func:`kvikio.defaults.http_max_attempts` (getter) and :py:func:`kvikio.defaults.set` (setter). diff --git a/python/kvikio/kvikio/_lib/defaults.pyx b/python/kvikio/kvikio/_lib/defaults.pyx index 0770cb557a..16f208391f 100644 --- a/python/kvikio/kvikio/_lib/defaults.pyx +++ b/python/kvikio/kvikio/_lib/defaults.pyx @@ -15,79 +15,79 @@ cdef extern from "" namespace "kvikio" nogil: ON = 1 AUTO = 2 CompatMode cpp_compat_mode "kvikio::defaults::compat_mode"() except + - void cpp_compat_mode_reset \ - "kvikio::defaults::compat_mode_reset"(CompatMode compat_mode) except + + void cpp_set_compat_mode \ + "kvikio::defaults::set_compat_mode"(CompatMode compat_mode) except + unsigned int cpp_thread_pool_nthreads \ "kvikio::defaults::thread_pool_nthreads"() except + - void cpp_thread_pool_nthreads_reset \ - "kvikio::defaults::thread_pool_nthreads_reset" (unsigned int nthreads) except + + void cpp_set_thread_pool_nthreads \ + "kvikio::defaults::set_thread_pool_nthreads" (unsigned int nthreads) except + size_t cpp_task_size "kvikio::defaults::task_size"() except + - void cpp_task_size_reset "kvikio::defaults::task_size_reset"(size_t nbytes) except + + void cpp_set_task_size "kvikio::defaults::set_task_size"(size_t nbytes) except + size_t cpp_gds_threshold "kvikio::defaults::gds_threshold"() except + - void cpp_gds_threshold_reset \ - "kvikio::defaults::gds_threshold_reset"(size_t nbytes) except + + void cpp_set_gds_threshold \ + "kvikio::defaults::set_gds_threshold"(size_t nbytes) except + size_t cpp_bounce_buffer_size 
"kvikio::defaults::bounce_buffer_size"() except + - void cpp_bounce_buffer_size_reset \ - "kvikio::defaults::bounce_buffer_size_reset"(size_t nbytes) except + + void cpp_set_bounce_buffer_size \ + "kvikio::defaults::set_bounce_buffer_size"(size_t nbytes) except + size_t cpp_http_max_attempts "kvikio::defaults::http_max_attempts"() except + - void cpp_http_max_attempts_reset \ - "kvikio::defaults::http_max_attempts_reset"(size_t attempts) except + + void cpp_set_http_max_attempts \ + "kvikio::defaults::set_http_max_attempts"(size_t attempts) except + vector[int] cpp_http_status_codes "kvikio::defaults::http_status_codes"() except + - void cpp_http_status_codes_reset \ - "kvikio::defaults::http_status_codes_reset"(vector[int] status_codes) except + + void cpp_set_http_status_codes \ + "kvikio::defaults::set_http_status_codes"(vector[int] status_codes) except + def compat_mode() -> CompatMode: return cpp_compat_mode() -def compat_mode_reset(compat_mode: CompatMode) -> None: - cpp_compat_mode_reset(compat_mode) +def set_compat_mode(compat_mode: CompatMode) -> None: + cpp_set_compat_mode(compat_mode) def thread_pool_nthreads() -> int: return cpp_thread_pool_nthreads() -def thread_pool_nthreads_reset(nthreads: int) -> None: - cpp_thread_pool_nthreads_reset(nthreads) +def set_thread_pool_nthreads(nthreads: int) -> None: + cpp_set_thread_pool_nthreads(nthreads) def task_size() -> int: return cpp_task_size() -def task_size_reset(nbytes: int) -> None: - cpp_task_size_reset(nbytes) +def set_task_size(nbytes: int) -> None: + cpp_set_task_size(nbytes) def gds_threshold() -> int: return cpp_gds_threshold() -def gds_threshold_reset(nbytes: int) -> None: - cpp_gds_threshold_reset(nbytes) +def set_gds_threshold(nbytes: int) -> None: + cpp_set_gds_threshold(nbytes) def bounce_buffer_size() -> int: return cpp_bounce_buffer_size() -def bounce_buffer_size_reset(nbytes: int) -> None: - cpp_bounce_buffer_size_reset(nbytes) +def set_bounce_buffer_size(nbytes: int) -> None: + 
cpp_set_bounce_buffer_size(nbytes) def http_max_attempts() -> int: return cpp_http_max_attempts() -def http_max_attempts_reset(attempts: int) -> None: - cpp_http_max_attempts_reset(attempts) +def set_http_max_attempts(attempts: int) -> None: + cpp_set_http_max_attempts(attempts) def http_status_codes() -> list[int]: return cpp_http_status_codes() -def http_status_codes_reset(status_codes: list[int]) -> None: - return cpp_http_status_codes_reset(status_codes) +def set_http_status_codes(status_codes: list[int]) -> None: + return cpp_set_http_status_codes(status_codes) diff --git a/python/kvikio/kvikio/benchmarks/http_io.py b/python/kvikio/kvikio/benchmarks/http_io.py index 68d4643004..af4e44b973 100644 --- a/python/kvikio/kvikio/benchmarks/http_io.py +++ b/python/kvikio/kvikio/benchmarks/http_io.py @@ -1,4 +1,4 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. import argparse @@ -47,7 +47,7 @@ def main(args): cupy.cuda.set_allocator(None) # Disable CuPy's default memory pool cupy.arange(10) # Make sure CUDA is initialized - kvikio.defaults.num_threads_reset(args.nthreads) + kvikio.defaults.set("num_threads", args.nthreads) print("Roundtrip benchmark") print("--------------------------------------") print(f"nelem | {args.nelem} ({format_bytes(args.nbytes)})") @@ -68,7 +68,8 @@ def main(args): res.append(elapsed) def pprint_api_res(name, samples): - samples = [args.nbytes / s for s in samples] # Convert to throughput + # Convert to throughput + samples = [args.nbytes / s for s in samples] mean = statistics.harmonic_mean(samples) if len(samples) > 1 else samples[0] ret = f"{api}-{name}".ljust(18) ret += f"| {format_bytes(mean).rjust(10)}/s".ljust(14) diff --git a/python/kvikio/kvikio/benchmarks/s3_io.py b/python/kvikio/kvikio/benchmarks/s3_io.py index 5e1846a1e5..08bdfc93a0 100644 --- a/python/kvikio/kvikio/benchmarks/s3_io.py +++ 
b/python/kvikio/kvikio/benchmarks/s3_io.py @@ -1,4 +1,4 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. import argparse @@ -134,7 +134,7 @@ def main(args): cupy.arange(10) # Make sure CUDA is initialized os.environ["KVIKIO_NTHREADS"] = str(args.nthreads) - kvikio.defaults.num_threads_reset(args.nthreads) + kvikio.defaults.set("num_threads", args.nthreads) print("Remote S3 benchmark") print("--------------------------------------") @@ -157,7 +157,8 @@ def main(args): res.append(elapsed) def pprint_api_res(name, samples): - samples = [args.nbytes / s for s in samples] # Convert to throughput + # Convert to throughput + samples = [args.nbytes / s for s in samples] mean = statistics.harmonic_mean(samples) if len(samples) > 1 else samples[0] ret = f"{api}-{name}".ljust(18) ret += f"| {format_bytes(mean).rjust(10)}/s".ljust(14) diff --git a/python/kvikio/kvikio/benchmarks/single_node_io.py b/python/kvikio/kvikio/benchmarks/single_node_io.py index bca29ef90d..f5fc9057d1 100644 --- a/python/kvikio/kvikio/benchmarks/single_node_io.py +++ b/python/kvikio/kvikio/benchmarks/single_node_io.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2021-2025, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. 
import argparse @@ -259,7 +259,7 @@ def main(args): cupy.cuda.set_allocator(None) # Disable CuPy's default memory pool cupy.arange(10) # Make sure CUDA is initialized - kvikio.defaults.num_threads_reset(args.nthreads) + kvikio.defaults.set("num_threads", args.nthreads) print("Roundtrip benchmark") print("----------------------------------") diff --git a/python/kvikio/kvikio/benchmarks/utils.py b/python/kvikio/kvikio/benchmarks/utils.py index fa25c361a4..02d97991de 100644 --- a/python/kvikio/kvikio/benchmarks/utils.py +++ b/python/kvikio/kvikio/benchmarks/utils.py @@ -1,4 +1,4 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. from __future__ import annotations @@ -28,7 +28,7 @@ def pprint_sys_info() -> None: """Pretty print system information""" version = kvikio.cufile_driver.libcufile_version() - props = kvikio.cufile_driver.DriverProperties() + props = kvikio.cufile_driver.properties try: import pynvml diff --git a/python/kvikio/kvikio/benchmarks/zarr_io.py b/python/kvikio/kvikio/benchmarks/zarr_io.py index fc226c2263..7882fcad8c 100644 --- a/python/kvikio/kvikio/benchmarks/zarr_io.py +++ b/python/kvikio/kvikio/benchmarks/zarr_io.py @@ -1,4 +1,4 @@ -# Copyright (c) 2023-2024, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2023-2025, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. 
import argparse @@ -118,7 +118,7 @@ def main(args): cupy.cuda.set_allocator(None) # Disable CuPy's default memory pool cupy.arange(10) # Make sure CUDA is initialized - kvikio.defaults.num_threads_reset(args.nthreads) + kvikio.defaults.set("num_threads", args.nthreads) drop_vm_cache_msg = str(args.drop_vm_cache) if not args.drop_vm_cache: drop_vm_cache_msg += " (use --drop-vm-cache for better accuracy!)" diff --git a/python/kvikio/kvikio/cufile_driver.py b/python/kvikio/kvikio/cufile_driver.py index fb32be347a..8018415191 100644 --- a/python/kvikio/kvikio/cufile_driver.py +++ b/python/kvikio/kvikio/cufile_driver.py @@ -1,14 +1,116 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. import atexit -from typing import Tuple +from typing import Any, Tuple, overload +import kvikio.utils from kvikio._lib import cufile_driver # type: ignore -# TODO: Wrap nicely, maybe as a dataclass? 
-# -DriverProperties = cufile_driver.DriverProperties +properties = cufile_driver.DriverProperties() + + +class ConfigContextManager: + def __init__(self, config: dict[str, str]): + ( + self._property_getters, + self._property_setters, + ) = self._property_getter_and_setter() + self._old_properties = {} + + for key, value in config.items(): + self._old_properties[key] = self._get_property(key) + self._set_property(key, value) + + def __enter__(self): + return None + + def __exit__(self, type_unused, value, traceback_unused): + for key, value in self._old_properties.items(): + self._set_property(key, value) + + def _get_property(self, property: str) -> Any: + func = self._property_getters[property] + + # getter signature: object.__get__(self, instance, owner=None) + return func(properties) + + def _set_property(self, property: str, value: Any): + func = self._property_setters[property] + + # setter signature: object.__set__(self, instance, value) + func(properties, value) + + @kvikio.utils.call_once + def _property_getter_and_setter(self) -> tuple[dict[str, Any], dict[str, Any]]: + class_dict = vars(cufile_driver.DriverProperties) + + property_getter_names = [ + "poll_mode", + "poll_thresh_size", + "max_device_cache_size", + "max_pinned_memory_size", + ] + + property_getters = {} + property_setters = {} + + for name in property_getter_names: + property_getters[name] = class_dict[name].__get__ + property_setters[name] = class_dict[name].__set__ + return property_getters, property_setters + + +@overload +def set(config: dict[str, Any], /) -> ConfigContextManager: + ... + + +@overload +def set(key: str, value: Any, /) -> ConfigContextManager: + ... + + +def set(*config) -> ConfigContextManager: + """Set cuFile driver configurations. + + Examples: + + - To set one or more properties + + .. code-block:: python + + kvikio.cufile_driver.set({"prop1": value1, "prop2": value2}) + + - To set a single property + + .. 
code-block:: python + + kvikio.cufile_driver.set("prop", value) + + Parameters + ---------- + config + The configurations. Can either be a single parameter (dict) consisting of one + or more properties, or two parameters key (string) and value (Any) + indicating a single property. + """ + + err_msg = ( + "Valid arguments are kvikio.cufile_driver.properties.set(config: dict) or " + "kvikio.cufile_driver.properties.set(key: str, value: Any)" + ) + + if len(config) == 1: + if not isinstance(config[0], dict): + raise ValueError(err_msg) + return ConfigContextManager(config[0]) + elif len(config) == 2: + if not isinstance(config[0], str): + raise ValueError(err_msg) + return ConfigContextManager({config[0]: config[1]}) + else: + raise ValueError(err_msg) def libcufile_version() -> Tuple[int, int]: diff --git a/python/kvikio/kvikio/defaults.py b/python/kvikio/kvikio/defaults.py index 4201cc29a3..3688be6a6e 100644 --- a/python/kvikio/kvikio/defaults.py +++ b/python/kvikio/kvikio/defaults.py @@ -1,10 +1,117 @@ # Copyright (c) 2021-2025, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. 
- -import contextlib +import warnings +from typing import Any, Callable, overload import kvikio._lib.defaults +import kvikio.utils + + +class ConfigContextManager: + def __init__(self, config: dict[str, str]): + ( + self._property_getters, + self._property_setters, + ) = self._property_getter_and_setter() + self._old_properties = {} + + for key, value in config.items(): + self._old_properties[key] = self._get_property(key) + self._set_property(key, value) + + def __enter__(self): + return None + + def __exit__(self, type_unused, value, traceback_unused): + for key, value in self._old_properties.items(): + self._set_property(key, value) + + def _get_property(self, property: str) -> Any: + if property == "num_threads": + property = "thread_pool_nthreads" + func = self._property_getters[property] + return func() + + def _set_property(self, property: str, value: Any): + if property == "num_threads": + property = "thread_pool_nthreads" + func = self._property_setters[property] + func(value) + + @kvikio.utils.call_once + def _property_getter_and_setter(self) -> tuple[dict[str, Any], dict[str, Any]]: + module_dict = vars(kvikio._lib.defaults) + + property_getter_names = [ + "compat_mode", + "thread_pool_nthreads", + "task_size", + "gds_threshold", + "bounce_buffer_size", + "http_max_attempts", + "http_status_codes", + ] + + property_getters = {} + property_setters = {} + + for name in property_getter_names: + property_getters[name] = module_dict[name] + property_setters[name] = module_dict["set_" + name] + return property_getters, property_setters + + +@overload +def set(config: dict[str, Any], /) -> ConfigContextManager: + ... + + +@overload +def set(key: str, value: Any, /) -> ConfigContextManager: + ... + + +def set(*config) -> ConfigContextManager: + """Set KvikIO configurations. + + Examples: + + - To set one or more properties + + .. code-block:: python + + kvikio.defaults.set({"prop1": value1, "prop2": value2}) + + - To set a single property + + .. 
code-block:: python + + kvikio.defaults.set("prop", value) + + Parameters + ---------- + config + The configurations. Can either be a single parameter (dict) consisting of one + or more properties, or two parameters key (string) and value (Any) + indicating a single property. + """ + + err_msg = ( + "Valid arguments are kvikio.defaults.set(config: dict) or " + "kvikio.defaults.set(key: str, value: Any)" + ) + + if len(config) == 1: + if not isinstance(config[0], dict): + raise ValueError(err_msg) + return ConfigContextManager(config[0]) + elif len(config) == 2: + if not isinstance(config[0], str): + raise ValueError(err_msg) + return ConfigContextManager({config[0]: config[1]}) + else: + raise ValueError(err_msg) def compat_mode() -> kvikio.CompatMode: @@ -32,45 +139,10 @@ def compat_mode() -> kvikio.CompatMode: return kvikio._lib.defaults.compat_mode() -def compat_mode_reset(compatmode: kvikio.CompatMode) -> None: - """Reset the compatibility mode. - - Use this function to enable/disable compatibility mode explicitly. - - Parameters - ---------- - compatmode : kvikio.CompatMode - Set to kvikio.CompatMode.ON to enable and kvikio.CompatMode.OFF to disable - compatibility mode, or kvikio.CompatMode.AUTO to let KvikIO determine: try - OFF first, and upon failure, fall back to ON. - """ - kvikio._lib.defaults.compat_mode_reset(compatmode) - - -@contextlib.contextmanager -def set_compat_mode(compatmode: kvikio.CompatMode): - """Context for resetting the compatibility mode. - - Parameters - ---------- - compatmode : kvikio.CompatMode - Set to kvikio.CompatMode.ON to enable and kvikio.CompatMode.OFF to disable - compatibility mode, or kvikio.CompatMode.AUTO to let KvikIO determine: try - OFF first, and upon failure, fall back to ON. 
- """ - num_threads_reset(get_num_threads()) # Sync all running threads - old_value = compat_mode() - try: - compat_mode_reset(compatmode) - yield - finally: - compat_mode_reset(old_value) - - -def get_num_threads() -> int: +def num_threads() -> int: """Get the number of threads of the thread pool. - Set the default value using `num_threads_reset()` or by setting the + Set the default value using `set("num_threads", value)` or by setting the `KVIKIO_NTHREADS` environment variable. If not set, the default value is 1. Returns @@ -81,46 +153,10 @@ def get_num_threads() -> int: return kvikio._lib.defaults.thread_pool_nthreads() -def num_threads_reset(nthreads: int) -> None: - """Reset the number of threads in the default thread pool. - - Waits for all currently running tasks to be completed, then destroys all threads - in the pool and creates a new thread pool with the new number of threads. Any - tasks that were waiting in the queue before the pool was reset will then be - executed by the new threads. If the pool was paused before resetting it, the new - pool will be paused as well. - - Parameters - ---------- - nthreads : int - The number of threads to use. The default value can be specified by setting - the `KVIKIO_NTHREADS` environment variable. If not set, the default value - is 1. - """ - kvikio._lib.defaults.thread_pool_nthreads_reset(nthreads) - - -@contextlib.contextmanager -def set_num_threads(nthreads: int): - """Context for resetting the number of threads in the default thread pool. - - Parameters - ---------- - nthreads : int - The number of threads to use. - """ - old_value = get_num_threads() - try: - num_threads_reset(nthreads) - yield - finally: - num_threads_reset(old_value) - - def task_size() -> int: """Get the default task size used for parallel IO operations. - Set the default value using `task_size_reset()` or by setting + Set the default value using `set("task_size", value)` or by setting the `KVIKIO_TASK_SIZE` environment variable. 
If not set, the default value is 4 MiB. @@ -132,34 +168,6 @@ def task_size() -> int: return kvikio._lib.defaults.task_size() -def task_size_reset(nbytes: int) -> None: - """Reset the default task size used for parallel IO operations. - - Parameters - ---------- - nbytes : int - The default task size in bytes. - """ - kvikio._lib.defaults.task_size_reset(nbytes) - - -@contextlib.contextmanager -def set_task_size(nbytes: int): - """Context for resetting the task size used for parallel IO operations. - - Parameters - ---------- - nbytes : int - The default task size in bytes. - """ - old_value = task_size() - try: - task_size_reset(nbytes) - yield - finally: - task_size_reset(old_value) - - def gds_threshold() -> int: """Get the default GDS threshold, which is the minimum size to use GDS. @@ -167,7 +175,7 @@ def gds_threshold() -> int: implements a shortcut that circumvent the threadpool and use the POSIX backend directly. - Set the default value using `gds_threshold_reset()` or by setting the + Set the default value using `set("gds_threshold", value)` or by setting the `KVIKIO_GDS_THRESHOLD` environment variable. If not set, the default value is 1 MiB. @@ -179,38 +187,10 @@ def gds_threshold() -> int: return kvikio._lib.defaults.gds_threshold() -def gds_threshold_reset(nbytes: int) -> None: - """Reset the default GDS threshold, which is the minimum size to use GDS. - - Parameters - ---------- - nbytes : int - The default GDS threshold size in bytes. - """ - kvikio._lib.defaults.gds_threshold_reset(nbytes) - - -@contextlib.contextmanager -def set_gds_threshold(nbytes: int): - """Context for resetting the default GDS threshold. - - Parameters - ---------- - nbytes : int - The default GDS threshold size in bytes. - """ - old_value = gds_threshold() - try: - gds_threshold_reset(nbytes) - yield - finally: - gds_threshold_reset(old_value) - - def bounce_buffer_size() -> int: """Get the size of the bounce buffer used to stage data in host memory. 
- Set the value using `bounce_buffer_size_reset()` or by setting the + Set the value using `set("bounce_buffer_size", value)` or by setting the `KVIKIO_BOUNCE_BUFFER_SIZE` environment variable. If not set, the value is 16 MiB. @@ -222,41 +202,13 @@ def bounce_buffer_size() -> int: return kvikio._lib.defaults.bounce_buffer_size() -def bounce_buffer_size_reset(nbytes: int) -> None: - """Reset the size of the bounce buffer used to stage data in host memory. - - Parameters - ---------- - nbytes : int - The bounce buffer size in bytes. - """ - kvikio._lib.defaults.bounce_buffer_size_reset(nbytes) - - -@contextlib.contextmanager -def set_bounce_buffer_size(nbytes: int): - """Context for resetting the size of the bounce buffer. - - Parameters - ---------- - nbytes : int - The bounce buffer size in bytes. - """ - old_value = bounce_buffer_size() - try: - bounce_buffer_size_reset(nbytes) - yield - finally: - bounce_buffer_size_reset(old_value) - - def http_max_attempts() -> int: """Get the maximum number of attempts per remote IO read. Reads are retried up until ``http_max_attempts`` when the response has certain HTTP status codes. - Set the value using `http_max_attempts_reset()` or by setting the + Set the value using `set("http_max_attempts", value)` or by setting the ``KVIKIO_HTTP_MAX_ATTEMPTS`` environment variable. If not set, the value is 3. @@ -269,38 +221,10 @@ def http_max_attempts() -> int: return kvikio._lib.defaults.http_max_attempts() -def http_max_attempts_reset(attempts: int) -> None: - """Reset the maximum number of attempts per remote IO read. - - Parameters - ---------- - attempts : int - The maximum number of attempts to try before raising an error. - """ - kvikio._lib.defaults.http_max_attempts_reset(attempts) - - -@contextlib.contextmanager -def set_http_max_attempts(attempts: int): - """Context for resetting the maximum number of HTTP attempts. 
- - Parameters - ---------- - attempts : int - The maximum number of attempts to try before raising an error. - """ - old_value = http_max_attempts() - try: - http_max_attempts_reset(attempts) - yield - finally: - http_max_attempts_reset(old_value) - - def http_status_codes() -> list[int]: """Get the list of HTTP status codes to retry. - Set the value using ``set_http_status_codes`` or by setting the + Set the value using ``set("http_status_codes", value)`` or by setting the ``KVIKIO_HTTP_STATUS_CODES`` environment variable. If not set, the default value is @@ -318,29 +242,164 @@ def http_status_codes() -> list[int]: return kvikio._lib.defaults.http_status_codes() -def http_status_codes_reset(status_codes: list[int]) -> None: - """Reset the list of HTTP status codes to retry. +def kvikio_deprecation_notice(msg: str): + def decorator(func: Callable): + def wrapper(*args, **kwargs): + warnings.warn(msg, category=FutureWarning, stacklevel=2) + return func(*args, **kwargs) + + return wrapper + + return decorator + + +@kvikio_deprecation_notice('Use kvikio.defaults.set("compat_mode", value) instead') +def compat_mode_reset(compatmode: kvikio.CompatMode) -> None: + """(Deprecated) Reset the compatibility mode. + + Use this function to enable/disable compatibility mode explicitly. Parameters ---------- - status_codes : list[int] - The HTTP status codes to retry. + compatmode : kvikio.CompatMode + Set to kvikio.CompatMode.ON to enable and kvikio.CompatMode.OFF to disable + compatibility mode, or kvikio.CompatMode.AUTO to let KvikIO determine: try + OFF first, and upon failure, fall back to ON. """ - kvikio._lib.defaults.http_status_codes_reset(status_codes) + set("compat_mode", compatmode) -@contextlib.contextmanager -def set_http_status_codes(status_codes: list[int]): - """Context for resetting the HTTP status codes to retry. 
+@kvikio_deprecation_notice('Use kvikio.defaults.set("compat_mode", value) instead') +def set_compat_mode(compatmode: kvikio.CompatMode): + """(Deprecated) Same with compat_mode_reset.""" + compat_mode_reset(compatmode) + + +@kvikio_deprecation_notice('Use kvikio.defaults.set("num_threads", value) instead') +def num_threads_reset(nthreads: int) -> None: + """(Deprecated) Reset the number of threads in the default thread pool. + + Waits for all currently running tasks to be completed, then destroys all threads + in the pool and creates a new thread pool with the new number of threads. Any + tasks that were waiting in the queue before the pool was reset will then be + executed by the new threads. If the pool was paused before resetting it, the new + pool will be paused as well. + + Parameters + ---------- + nthreads : int + The number of threads to use. The default value can be specified by setting + the `KVIKIO_NTHREADS` environment variable. If not set, the default value + is 1. + """ + set("num_threads", nthreads) + + +@kvikio_deprecation_notice('Use kvikio.defaults.set("num_threads", value) instead') +def set_num_threads(nthreads: int): + """(Deprecated) Same with num_threads_reset.""" + set("num_threads", nthreads) + + +@kvikio_deprecation_notice('Use kvikio.defaults.set("task_size", value) instead') +def task_size_reset(nbytes: int) -> None: + """(Deprecated) Reset the default task size used for parallel IO operations. + + Parameters + ---------- + nbytes : int + The default task size in bytes. + """ + set("task_size", nbytes) + + +@kvikio_deprecation_notice('Use kvikio.defaults.set("task_size", value) instead') +def set_task_size(nbytes: int): + """(Deprecated) Same with task_size_reset.""" + set("task_size", nbytes) + + +@kvikio_deprecation_notice('Use kvikio.defaults.set("gds_threshold", value) instead') +def gds_threshold_reset(nbytes: int) -> None: + """(Deprecated) Reset the default GDS threshold, which is the minimum size to + use GDS. 
+ + Parameters + ---------- + nbytes : int + The default GDS threshold size in bytes. + """ + set("gds_threshold", nbytes) + + +@kvikio_deprecation_notice('Use kvikio.defaults.set("gds_threshold", value) instead') +def set_gds_threshold(nbytes: int): + """(Deprecated) Same with gds_threshold_reset.""" + set("gds_threshold", nbytes) + + +@kvikio_deprecation_notice( + 'Use kvikio.defaults.set("bounce_buffer_size", value) instead' +) +def bounce_buffer_size_reset(nbytes: int) -> None: + """(Deprecated) Reset the size of the bounce buffer used to stage data in host + memory. + + Parameters + ---------- + nbytes : int + The bounce buffer size in bytes. + """ + set("bounce_buffer_size", nbytes) + + +@kvikio_deprecation_notice( + 'Use kvikio.defaults.set("bounce_buffer_size", value) instead' +) +def set_bounce_buffer_size(nbytes: int): + """(Deprecated) Same with bounce_buffer_size_reset.""" + set("bounce_buffer_size", nbytes) + + +@kvikio_deprecation_notice( + 'Use kvikio.defaults.set("http_max_attempts", value) instead' +) +def http_max_attempts_reset(attempts: int) -> None: + """(Deprecated) Reset the maximum number of attempts per remote IO read. + + Parameters + ---------- + attempts : int + The maximum number of attempts to try before raising an error. + """ + set("http_max_attempts", attempts) + + +@kvikio_deprecation_notice( + 'Use kvikio.defaults.set("http_max_attempts", value) instead' +) +def set_http_max_attempts(attempts: int): + """(Deprecated) Same with http_max_attempts_reset.""" + set("http_max_attempts", attempts) + + +@kvikio_deprecation_notice( + 'Use kvikio.defaults.set("http_status_codes", value) instead' +) +def http_status_codes_reset(status_codes: list[int]) -> None: + """(Deprecated) Reset the list of HTTP status codes to retry. Parameters ---------- status_codes : list[int] - THe HTTP status codes to retry. + The HTTP status codes to retry. 
""" - old_value = http_status_codes() - try: - http_status_codes_reset(status_codes) - yield - finally: - http_status_codes_reset(old_value) + set("http_status_codes", status_codes) + + +@kvikio_deprecation_notice( + 'Use kvikio.defaults.set("http_status_codes", value) instead' +) +def set_http_status_codes(status_codes: list[int]): + """(Deprecated) Same with http_status_codes_reset.""" + set("http_status_codes", status_codes) diff --git a/python/kvikio/kvikio/utils.py b/python/kvikio/kvikio/utils.py index fc88e321a5..e79386023c 100644 --- a/python/kvikio/kvikio/utils.py +++ b/python/kvikio/kvikio/utils.py @@ -11,7 +11,7 @@ SimpleHTTPRequestHandler, ThreadingHTTPServer, ) -from typing import Any +from typing import Any, Callable class LocalHttpServer: @@ -94,3 +94,32 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.process.kill() + + +def call_once(func: Callable): + """Decorate a function such that it is only called once + + Examples: + + .. code-block:: python + + @call_once + foo(args) + + Parameters + ---------- + func: Callable + The function to be decorated. + """ + once_flag = True + cached_result = None + + def wrapper(*args, **kwargs): + nonlocal once_flag + nonlocal cached_result + if once_flag: + once_flag = False + cached_result = func(*args, **kwargs) + return cached_result + + return wrapper diff --git a/python/kvikio/tests/conftest.py b/python/kvikio/tests/conftest.py index c1cc77026e..120f862ae6 100644 --- a/python/kvikio/tests/conftest.py +++ b/python/kvikio/tests/conftest.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022-2023, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2022-2025, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. 
import contextlib @@ -94,5 +94,5 @@ def xp(request): def gds_threshold(request): """Fixture to parametrize over GDS threshold values""" - with kvikio.defaults.set_gds_threshold(request.param): + with kvikio.defaults.set("gds_threshold", request.param): yield request.param diff --git a/python/kvikio/tests/test_basic_io.py b/python/kvikio/tests/test_basic_io.py index e1e9932e23..7d5577d6c5 100644 --- a/python/kvikio/tests/test_basic_io.py +++ b/python/kvikio/tests/test_basic_io.py @@ -1,4 +1,4 @@ -# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2021-2025, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. import os @@ -27,18 +27,17 @@ def test_write(tmp_path, xp, gds_threshold, size, nthreads, tasksize): """Test basic read/write""" filename = tmp_path / "test-file" - with kvikio.defaults.set_num_threads(nthreads): - with kvikio.defaults.set_task_size(tasksize): - a = xp.arange(size) - f = kvikio.CuFile(filename, "w") - assert not f.closed - assert check_bit_flags(f.open_flags(), os.O_WRONLY) - assert f.write(a) == a.nbytes - f.close() - assert f.closed + with kvikio.defaults.set({"num_threads": nthreads, "task_size": tasksize}): + a = xp.arange(size) + f = kvikio.CuFile(filename, "w") + assert not f.closed + assert check_bit_flags(f.open_flags(), os.O_WRONLY) + assert f.write(a) == a.nbytes + f.close() + assert f.closed - b = numpy.fromfile(filename, dtype=a.dtype) - xp.testing.assert_array_equal(a, b) + b = numpy.fromfile(filename, dtype=a.dtype) + xp.testing.assert_array_equal(a, b) @pytest.mark.parametrize("size", [1, 10, 100, 1000, 1024, 4096, 4096 * 10]) @@ -48,17 +47,16 @@ def test_read(tmp_path, xp, gds_threshold, size, nthreads, tasksize): """Test basic read/write""" filename = tmp_path / "test-file" - with kvikio.defaults.set_num_threads(nthreads): - with kvikio.defaults.set_task_size(tasksize): - a = numpy.arange(size) - a.tofile(filename) - os.sync() + with kvikio.defaults.set({"num_threads": 
nthreads, "task_size": tasksize}): + a = numpy.arange(size) + a.tofile(filename) + os.sync() - b = xp.empty_like(a) - f = kvikio.CuFile(filename, "r") - assert check_bit_flags(f.open_flags(), os.O_RDONLY) - assert f.read(b) == b.nbytes - xp.testing.assert_array_equal(a, b) + b = xp.empty_like(a) + f = kvikio.CuFile(filename, "r") + assert check_bit_flags(f.open_flags(), os.O_RDONLY) + assert f.read(b) == b.nbytes + xp.testing.assert_array_equal(a, b) def test_file_handle_context(tmp_path): @@ -106,11 +104,11 @@ def test_incorrect_open_mode_error(tmp_path, xp): def test_set_compat_mode_between_io(tmp_path): """Test changing `compat_mode`""" - with kvikio.defaults.set_compat_mode(False): + with kvikio.defaults.set("compat_mode", kvikio.CompatMode.OFF): f = kvikio.CuFile(tmp_path / "test-file", "w") assert not f.closed assert f.open_flags() & os.O_WRONLY != 0 - with kvikio.defaults.set_compat_mode(True): + with kvikio.defaults.set("compat_mode", kvikio.CompatMode.ON): a = cupy.arange(10) assert f.write(a) == a.nbytes @@ -155,17 +153,16 @@ def test_write_to_files_in_chunks(tmp_path, xp, gds_threshold): def test_read_write_slices(tmp_path, xp, gds_threshold, nthreads, tasksize, start, end): """Read and write different slices""" - with kvikio.defaults.set_num_threads(nthreads): - with kvikio.defaults.set_task_size(tasksize): - filename = tmp_path / "test-file" - a = xp.arange(10 * 4096) # 10 page-sizes - b = a.copy() - a[start:end] = 42 - with kvikio.CuFile(filename, "w") as f: - assert f.write(a[start:end]) == a[start:end].nbytes - with kvikio.CuFile(filename, "r") as f: - assert f.read(b[start:end]) == b[start:end].nbytes - xp.testing.assert_array_equal(a, b) + with kvikio.defaults.set({"num_threads": nthreads, "task_size": tasksize}): + filename = tmp_path / "test-file" + a = xp.arange(10 * 4096) # 10 page-sizes + b = a.copy() + a[start:end] = 42 + with kvikio.CuFile(filename, "w") as f: + assert f.write(a[start:end]) == a[start:end].nbytes + with 
kvikio.CuFile(filename, "r") as f: + assert f.read(b[start:end]) == b[start:end].nbytes + xp.testing.assert_array_equal(a, b) @pytest.mark.parametrize("size", [1, 10, 100, 1000, 1024, 4096, 4096 * 10]) @@ -232,31 +229,30 @@ def test_multiple_gpus(tmp_path, xp, gds_threshold): """Test IO from two different GPUs""" filename = tmp_path / "test-file" - with kvikio.defaults.set_num_threads(10): - with kvikio.defaults.set_task_size(10): - # Allocate an array on each device + with kvikio.defaults.set({"num_threads": 10, "task_size": 10}): + # Allocate an array on each device + with cupy.cuda.Device(0): + a0 = xp.arange(200) + with cupy.cuda.Device(1): + a1 = xp.zeros(200, dtype=a0.dtype) + + # Test when the device match the allocation + with kvikio.CuFile(filename, "w") as f: with cupy.cuda.Device(0): - a0 = xp.arange(200) + assert f.write(a0) == a0.nbytes + with kvikio.CuFile(filename, "r") as f: with cupy.cuda.Device(1): - a1 = xp.zeros(200, dtype=a0.dtype) - - # Test when the device match the allocation - with kvikio.CuFile(filename, "w") as f: - with cupy.cuda.Device(0): - assert f.write(a0) == a0.nbytes - with kvikio.CuFile(filename, "r") as f: - with cupy.cuda.Device(1): - assert f.read(a1) == a1.nbytes - assert bytes(a0) == bytes(a1) - - # Test when the device doesn't match the allocation - with kvikio.CuFile(filename, "w") as f: - with cupy.cuda.Device(1): - assert f.write(a0) == a0.nbytes - with kvikio.CuFile(filename, "r") as f: - with cupy.cuda.Device(0): - assert f.read(a1) == a1.nbytes - assert bytes(a0) == bytes(a1) + assert f.read(a1) == a1.nbytes + assert bytes(a0) == bytes(a1) + + # Test when the device doesn't match the allocation + with kvikio.CuFile(filename, "w") as f: + with cupy.cuda.Device(1): + assert f.write(a0) == a0.nbytes + with kvikio.CuFile(filename, "r") as f: + with cupy.cuda.Device(0): + assert f.read(a1) == a1.nbytes + assert bytes(a0) == bytes(a1) @pytest.mark.parametrize("size", [1, 10, 100, 1000]) @@ -265,31 +261,35 @@ def 
test_multiple_gpus(tmp_path, xp, gds_threshold): def test_different_bounce_buffer_sizes(tmp_path, size, tasksize, buffer_size): """Test different bounce buffer sizes""" filename = tmp_path / "test-file" - with kvikio.defaults.set_compat_mode(True), kvikio.defaults.set_num_threads(10): - with kvikio.defaults.set_task_size(tasksize): - with kvikio.defaults.set_bounce_buffer_size(buffer_size): - with kvikio.CuFile(filename, "w+") as f: - a = cupy.arange(size) - b = cupy.empty_like(a) - f.write(a) - assert f.read(b) == b.nbytes - cupy.testing.assert_array_equal(a, b) + with kvikio.defaults.set( + { + "compat_mode": kvikio.CompatMode.ON, + "num_threads": 10, + "bounce_buffer_size": buffer_size, + } + ): + with kvikio.CuFile(filename, "w+") as f: + a = cupy.arange(size) + b = cupy.empty_like(a) + f.write(a) + assert f.read(b) == b.nbytes + cupy.testing.assert_array_equal(a, b) def test_bounce_buffer_free(tmp_path): """Test freeing the bounce buffer allocations""" filename = tmp_path / "test-file" kvikio.buffer.bounce_buffer_free() - with kvikio.defaults.set_compat_mode(True), kvikio.defaults.set_num_threads(1): + with kvikio.defaults.set({"compat_mode": kvikio.CompatMode.ON, "num_threads": 1}): with kvikio.CuFile(filename, "w") as f: - with kvikio.defaults.set_bounce_buffer_size(1024): + with kvikio.defaults.set({"bounce_buffer_size": 1024}): # Notice, since the bounce buffer size is only checked when the buffer # is used, we populate the bounce buffer in between we clear it. 
f.write(cupy.arange(10)) assert kvikio.buffer.bounce_buffer_free() == 1024 assert kvikio.buffer.bounce_buffer_free() == 0 f.write(cupy.arange(10)) - with kvikio.defaults.set_bounce_buffer_size(2048): + with kvikio.defaults.set({"bounce_buffer_size": 2048}): f.write(cupy.arange(10)) assert kvikio.buffer.bounce_buffer_free() == 2048 assert kvikio.buffer.bounce_buffer_free() == 0 diff --git a/python/kvikio/tests/test_cufile_driver.py b/python/kvikio/tests/test_cufile_driver.py index a1dc3a6454..fcc95c6cbc 100644 --- a/python/kvikio/tests/test_cufile_driver.py +++ b/python/kvikio/tests/test_cufile_driver.py @@ -1,4 +1,4 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. import pytest @@ -16,3 +16,37 @@ def test_version(): def test_open_and_close(): kvikio.cufile_driver.driver_open() kvikio.cufile_driver.driver_close() + + +def test_property_setter(): + """Test the method `set`""" + + # Attempt to set a nonexistent property + with pytest.raises(KeyError): + kvikio.cufile_driver.set("nonexistent_property", 123) + + # Nested context managers + poll_thresh_size_default = kvikio.cufile_driver.properties.poll_thresh_size + with kvikio.cufile_driver.set("poll_thresh_size", 1024): + assert kvikio.cufile_driver.properties.poll_thresh_size == 1024 + with kvikio.cufile_driver.set("poll_thresh_size", 2048): + assert kvikio.cufile_driver.properties.poll_thresh_size == 2048 + with kvikio.cufile_driver.set("poll_thresh_size", 4096): + assert kvikio.cufile_driver.properties.poll_thresh_size == 4096 + assert kvikio.cufile_driver.properties.poll_thresh_size == 2048 + assert kvikio.cufile_driver.properties.poll_thresh_size == 1024 + assert kvikio.cufile_driver.properties.poll_thresh_size == poll_thresh_size_default + + # Multiple context managers + poll_mode_default = kvikio.cufile_driver.properties.poll_mode + max_device_cache_size_default = ( + 
kvikio.cufile_driver.properties.max_device_cache_size + ) + with kvikio.cufile_driver.set({"poll_mode": True, "max_device_cache_size": 2048}): + assert kvikio.cufile_driver.properties.poll_mode and ( + kvikio.cufile_driver.properties.max_device_cache_size == 2048 + ) + assert (kvikio.cufile_driver.properties.poll_mode == poll_mode_default) and ( + kvikio.cufile_driver.properties.max_device_cache_size + == max_device_cache_size_default + ) diff --git a/python/kvikio/tests/test_defaults.py b/python/kvikio/tests/test_defaults.py index 5e2d6675f8..82c6327f5e 100644 --- a/python/kvikio/tests/test_defaults.py +++ b/python/kvikio/tests/test_defaults.py @@ -7,6 +7,49 @@ import kvikio.defaults +def test_property_setter(): + """Test the method `set`""" + + # Attempt to set a nonexistent property + with pytest.raises(KeyError): + kvikio.defaults.set("nonexistent_property", 123) + + # Attempt to set a property whose name is mistakenly prefixed by "set_" + # (coinciding with the setter method). + with pytest.raises(KeyError): + kvikio.defaults.set("set_task_size", 123) + + # Nested context managers + task_size_default = kvikio.defaults.task_size() + with kvikio.defaults.set("task_size", 1024): + assert kvikio.defaults.task_size() == 1024 + with kvikio.defaults.set("task_size", 2048): + assert kvikio.defaults.task_size() == 2048 + with kvikio.defaults.set("task_size", 4096): + assert kvikio.defaults.task_size() == 4096 + assert kvikio.defaults.task_size() == 2048 + assert kvikio.defaults.task_size() == 1024 + assert kvikio.defaults.task_size() == task_size_default + + # Multiple context managers + task_size_default = kvikio.defaults.task_size() + num_threads_default = kvikio.defaults.num_threads() + bounce_buffer_size_default = kvikio.defaults.bounce_buffer_size() + with kvikio.defaults.set( + {"task_size": 1024, "num_threads": 16, "bounce_buffer_size": 1024} + ): + assert ( + (kvikio.defaults.task_size() == 1024) + and (kvikio.defaults.num_threads() == 16) + and 
(kvikio.defaults.bounce_buffer_size() == 1024) + ) + assert ( + (kvikio.defaults.task_size() == task_size_default) + and (kvikio.defaults.num_threads() == num_threads_default) + and (kvikio.defaults.bounce_buffer_size() == bounce_buffer_size_default) + ) + + @pytest.mark.skipif( kvikio.defaults.compat_mode() == kvikio.CompatMode.ON, reason="cannot test `compat_mode` when already running in compatibility mode", @@ -15,11 +58,11 @@ def test_compat_mode(): """Test changing `compat_mode`""" before = kvikio.defaults.compat_mode() - with kvikio.defaults.set_compat_mode(kvikio.CompatMode.ON): + with kvikio.defaults.set("compat_mode", kvikio.CompatMode.ON): assert kvikio.defaults.compat_mode() == kvikio.CompatMode.ON - kvikio.defaults.compat_mode_reset(kvikio.CompatMode.OFF) + kvikio.defaults.set("compat_mode", kvikio.CompatMode.OFF) assert kvikio.defaults.compat_mode() == kvikio.CompatMode.OFF - kvikio.defaults.compat_mode_reset(kvikio.CompatMode.AUTO) + kvikio.defaults.set("compat_mode", kvikio.CompatMode.AUTO) assert kvikio.defaults.compat_mode() == kvikio.CompatMode.AUTO assert before == kvikio.defaults.compat_mode() @@ -27,91 +70,91 @@ def test_compat_mode(): def test_num_threads(): """Test changing `num_threads`""" - before = kvikio.defaults.get_num_threads() - with kvikio.defaults.set_num_threads(3): - assert kvikio.defaults.get_num_threads() == 3 - kvikio.defaults.num_threads_reset(4) - assert kvikio.defaults.get_num_threads() == 4 - assert before == kvikio.defaults.get_num_threads() + before = kvikio.defaults.num_threads() + with kvikio.defaults.set("num_threads", 3): + assert kvikio.defaults.num_threads() == 3 + kvikio.defaults.set("num_threads", 4) + assert kvikio.defaults.num_threads() == 4 + assert before == kvikio.defaults.num_threads() with pytest.raises(ValueError, match="positive integer greater than zero"): - kvikio.defaults.num_threads_reset(0) + kvikio.defaults.set("num_threads", 0) with pytest.raises(OverflowError, match="negative value"): - 
kvikio.defaults.num_threads_reset(-1) + kvikio.defaults.set("num_threads", -1) def test_task_size(): """Test changing `task_size`""" before = kvikio.defaults.task_size() - with kvikio.defaults.set_task_size(3): + with kvikio.defaults.set("task_size", 3): assert kvikio.defaults.task_size() == 3 - kvikio.defaults.task_size_reset(4) + kvikio.defaults.set("task_size", 4) assert kvikio.defaults.task_size() == 4 assert before == kvikio.defaults.task_size() with pytest.raises(ValueError, match="positive integer greater than zero"): - kvikio.defaults.task_size_reset(0) + kvikio.defaults.set("task_size", 0) with pytest.raises(OverflowError, match="negative value"): - kvikio.defaults.task_size_reset(-1) + kvikio.defaults.set("task_size", -1) def test_gds_threshold(): """Test changing `gds_threshold`""" before = kvikio.defaults.gds_threshold() - with kvikio.defaults.set_gds_threshold(3): + with kvikio.defaults.set("gds_threshold", 3): assert kvikio.defaults.gds_threshold() == 3 - kvikio.defaults.gds_threshold_reset(4) + kvikio.defaults.set("gds_threshold", 4) assert kvikio.defaults.gds_threshold() == 4 assert before == kvikio.defaults.gds_threshold() with pytest.raises(OverflowError, match="negative value"): - kvikio.defaults.gds_threshold_reset(-1) + kvikio.defaults.set("gds_threshold", -1) def test_bounce_buffer_size(): """Test changing `bounce_buffer_size`""" before = kvikio.defaults.bounce_buffer_size() - with kvikio.defaults.set_bounce_buffer_size(3): + with kvikio.defaults.set("bounce_buffer_size", 3): assert kvikio.defaults.bounce_buffer_size() == 3 - kvikio.defaults.bounce_buffer_size_reset(4) + kvikio.defaults.set("bounce_buffer_size", 4) assert kvikio.defaults.bounce_buffer_size() == 4 assert before == kvikio.defaults.bounce_buffer_size() with pytest.raises(ValueError, match="positive integer greater than zero"): - kvikio.defaults.bounce_buffer_size_reset(0) + kvikio.defaults.set("bounce_buffer_size", 0) with pytest.raises(OverflowError, match="negative value"): - 
kvikio.defaults.bounce_buffer_size_reset(-1) + kvikio.defaults.set("bounce_buffer_size", -1) def test_http_max_attempts(): before = kvikio.defaults.http_max_attempts() - with kvikio.defaults.set_http_max_attempts(5): + with kvikio.defaults.set("http_max_attempts", 5): assert kvikio.defaults.http_max_attempts() == 5 - kvikio.defaults.http_max_attempts_reset(4) + kvikio.defaults.set("http_max_attempts", 4) assert kvikio.defaults.http_max_attempts() == 4 assert kvikio.defaults.http_max_attempts() == before with pytest.raises(ValueError, match="positive integer"): - kvikio.defaults.http_max_attempts_reset(0) + kvikio.defaults.set("http_max_attempts", 0) with pytest.raises(OverflowError, match="negative value"): - kvikio.defaults.http_max_attempts_reset(-1) + kvikio.defaults.set("http_max_attempts", -1) def test_http_status_codes(): before = kvikio.defaults.http_status_codes() - with kvikio.defaults.set_http_status_codes([500]): + with kvikio.defaults.set("http_status_codes", [500]): assert kvikio.defaults.http_status_codes() == [500] - kvikio.defaults.http_status_codes_reset([429, 500]) + kvikio.defaults.set("http_status_codes", [429, 500]) assert kvikio.defaults.http_status_codes() == [429, 500] assert kvikio.defaults.http_status_codes() == before with pytest.raises(TypeError): - kvikio.defaults.http_status_codes_reset(0) + kvikio.defaults.set("http_status_codes", 0) with pytest.raises(TypeError): - kvikio.defaults.http_status_codes_reset(["a"]) + kvikio.defaults.set("http_status_codes", ["a"]) diff --git a/python/kvikio/tests/test_http_io.py b/python/kvikio/tests/test_http_io.py index e62dbb81af..d05df02397 100644 --- a/python/kvikio/tests/test_http_io.py +++ b/python/kvikio/tests/test_http_io.py @@ -99,14 +99,13 @@ def test_read(http_server, tmpdir, xp, size, nthreads, tasksize): a = xp.arange(size) a.tofile(tmpdir / "a") - with kvikio.defaults.set_num_threads(nthreads): - with kvikio.defaults.set_task_size(tasksize): - with 
kvikio.RemoteFile.open_http(f"{http_server}/a") as f: - assert f.nbytes() == a.nbytes - assert f"{http_server}/a" in str(f) - b = xp.empty_like(a) - assert f.read(b) == a.nbytes - xp.testing.assert_array_equal(a, b) + with kvikio.defaults.set({"num_threads": nthreads, "task_size": tasksize}): + with kvikio.RemoteFile.open_http(f"{http_server}/a") as f: + assert f.nbytes() == a.nbytes + assert f"{http_server}/a" in str(f) + b = xp.empty_like(a) + assert f.read(b) == a.nbytes + xp.testing.assert_array_equal(a, b) @pytest.mark.parametrize("nthreads", [1, 10]) @@ -114,7 +113,7 @@ def test_large_read(http_server, tmpdir, xp, nthreads): a = xp.arange(16_000_000) a.tofile(tmpdir / "a") - with kvikio.defaults.set_num_threads(nthreads): + with kvikio.defaults.set("num_threads", nthreads): with kvikio.RemoteFile.open_http(f"{http_server}/a") as f: assert f.nbytes() == a.nbytes assert f"{http_server}/a" in str(f) @@ -187,7 +186,9 @@ def test_retry_http_503_fails(tmpdir, xp, capfd): a.tofile(tmpdir / "a") b = xp.empty_like(a) - with pytest.raises(RuntimeError) as m, kvikio.defaults.set_http_max_attempts(2): + with pytest.raises(RuntimeError) as m, kvikio.defaults.set( + "http_max_attempts", 2 + ): with kvikio.RemoteFile.open_http(f"{server.url}/a") as f: f.read(b) @@ -212,7 +213,7 @@ def test_no_retries_ok(tmpdir): ) as server: http_server = server.url b = np.empty_like(a) - with kvikio.defaults.set_http_max_attempts(1): + with kvikio.defaults.set("http_max_attempts", 1): with kvikio.RemoteFile.open_http(f"{http_server}/a") as f: assert f.nbytes() == a.nbytes assert f"{http_server}/a" in str(f) @@ -227,7 +228,7 @@ def test_set_http_status_code(tmpdir): handler_options={"error_counter": ErrorCounter()}, ) as server: http_server = server.url - with kvikio.defaults.set_http_status_codes([429]): + with kvikio.defaults.set("http_status_codes", [429]): # this raises on the first 503 error, since it's not in the list. 
assert kvikio.defaults.http_status_codes() == [429] with pytest.raises(RuntimeError, match="503"): diff --git a/python/kvikio/tests/test_s3_io.py b/python/kvikio/tests/test_s3_io.py index 1f2bae95d0..45997b1e71 100644 --- a/python/kvikio/tests/test_s3_io.py +++ b/python/kvikio/tests/test_s3_io.py @@ -1,4 +1,4 @@ -# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# Copyright (c) 2024-2025, NVIDIA CORPORATION. All rights reserved. # See file LICENSE for terms. import multiprocessing as mp @@ -124,16 +124,20 @@ def test_read(s3_base, xp, size, nthreads, tasksize, buffer_size): with s3_context( s3_base=s3_base, bucket=bucket_name, files={object_name: bytes(a)} ) as server_address: - with kvikio.defaults.set_num_threads(nthreads): - with kvikio.defaults.set_task_size(tasksize): - with kvikio.defaults.set_bounce_buffer_size(buffer_size): - with kvikio.RemoteFile.open_s3_url( - f"{server_address}/{bucket_name}/{object_name}" - ) as f: - assert f.nbytes() == a.nbytes - b = xp.empty_like(a) - assert f.read(buf=b) == a.nbytes - xp.testing.assert_array_equal(a, b) + with kvikio.defaults.set( + { + "num_threads": nthreads, + "task_size": tasksize, + "bounce_buffer_size": buffer_size, + } + ): + with kvikio.RemoteFile.open_s3_url( + f"{server_address}/{bucket_name}/{object_name}" + ) as f: + assert f.nbytes() == a.nbytes + b = xp.empty_like(a) + assert f.read(buf=b) == a.nbytes + xp.testing.assert_array_equal(a, b) @pytest.mark.parametrize(