From 5fb529b556d1869d3d627daa7e4793cbf8c4282f Mon Sep 17 00:00:00 2001 From: Tom Augspurger Date: Tue, 11 Feb 2025 23:05:57 +0000 Subject: [PATCH] Added timeout to kvikio requests --- cpp/include/kvikio/defaults.hpp | 18 +++++ cpp/src/defaults.cpp | 20 ++++- cpp/src/shim/libcurl.cpp | 28 +++++-- docs/source/runtime_settings.rst | 4 +- python/kvikio/kvikio/_lib/defaults.pyx | 14 ++++ python/kvikio/kvikio/defaults.py | 43 +++++++++++ python/kvikio/tests/test_http_io.py | 102 +++++++++++++++++++++---- 7 files changed, 206 insertions(+), 23 deletions(-) diff --git a/cpp/include/kvikio/defaults.hpp b/cpp/include/kvikio/defaults.hpp index 4334549d23..8746628b32 100644 --- a/cpp/include/kvikio/defaults.hpp +++ b/cpp/include/kvikio/defaults.hpp @@ -69,6 +69,7 @@ class defaults { std::size_t _gds_threshold; std::size_t _bounce_buffer_size; std::size_t _http_max_attempts; + long _http_timeout; std::vector _http_status_codes; static unsigned int get_num_threads_from_env(); @@ -255,6 +256,23 @@ class defaults { */ static void http_max_attempts_reset(std::size_t attempts); + /** + * @brief The maximum time, in seconds, an HTTP transfer is allowed to take. + * + * Set the value using `kvikio::defaults::http_timeout_reset()` or by setting the + * `KVIKIO_HTTP_TIMEOUT` environment variable. If not set, the value is 60. + * + * @return The maximum time, in seconds, an HTTP transfer is allowed to take. + */ + [[nodiscard]] static long http_timeout(); + + /** + * @brief Reset the HTTP timeout. + * + * @param timeout_seconds The maximum time, in seconds, an HTTP transfer is allowed to take. + */ + static void http_timeout_reset(long timeout_seconds); + /** * @brief The list of HTTP status codes to retry.
* diff --git a/cpp/src/defaults.cpp b/cpp/src/defaults.cpp index e0a908cf4d..6ff3b20f21 100644 --- a/cpp/src/defaults.cpp +++ b/cpp/src/defaults.cpp @@ -129,6 +129,16 @@ defaults::defaults() } _http_max_attempts = env; } + + // Determine the default value of `http_timeout` + { + const long env = getenv_or("KVIKIO_HTTP_TIMEOUT", 60); + if (env <= 0) { + throw std::invalid_argument("KVIKIO_HTTP_TIMEOUT has to be a positive integer"); + } + _http_timeout = env; + } + // Determine the default value of `http_status_codes` { _http_status_codes = @@ -211,10 +221,18 @@ void defaults::http_max_attempts_reset(std::size_t attempts) } std::vector const& defaults::http_status_codes() { return instance()->_http_status_codes; } - void defaults::http_status_codes_reset(std::vector status_codes) { instance()->_http_status_codes = std::move(status_codes); } +long defaults::http_timeout() { return instance()->_http_timeout; } +void defaults::http_timeout_reset(long timeout_seconds) +{ + if (timeout_seconds <= 0) { + throw std::invalid_argument("timeout_seconds must be a positive integer"); + } + instance()->_http_timeout = timeout_seconds; +} + } // namespace kvikio diff --git a/cpp/src/shim/libcurl.cpp b/cpp/src/shim/libcurl.cpp index 05b6e02d10..77c05135ff 100644 --- a/cpp/src/shim/libcurl.cpp +++ b/cpp/src/shim/libcurl.cpp @@ -111,6 +111,9 @@ CurlHandle::CurlHandle(LibCurl::UniqueHandlePtr handle, // Make curl_easy_perform() fail when receiving HTTP code errors. setopt(CURLOPT_FAILONERROR, 1L); + + // Make requests time out after `value` seconds. 
+ setopt(CURLOPT_TIMEOUT, kvikio::defaults::http_timeout()); } CurlHandle::~CurlHandle() noexcept { LibCurl::instance().retain_handle(std::move(_handle)); } @@ -125,9 +128,10 @@ void CurlHandle::perform() auto max_delay = 4000; // milliseconds auto http_max_attempts = kvikio::defaults::http_max_attempts(); auto& http_status_codes = kvikio::defaults::http_status_codes(); + CURLcode err; while (attempt_count++ < http_max_attempts) { - auto err = curl_easy_perform(handle()); + err = curl_easy_perform(handle()); if (err == CURLE_OK) { // We set CURLE_HTTP_RETURNED_ERROR, so >= 400 status codes are considered @@ -141,7 +145,7 @@ void CurlHandle::perform() (std::find(http_status_codes.begin(), http_status_codes.end(), http_code) != http_status_codes.end()); - if (is_retryable_response) { + if ((err == CURLE_OPERATION_TIMEDOUT) || is_retryable_response) { // backoff and retry again. With a base value of 500ms, we retry after // 500ms, 1s, 2s, 4s, ... auto const backoff_delay = base_delay * (1 << std::min(attempt_count - 1, 4)); @@ -150,9 +154,14 @@ void CurlHandle::perform() // Only print this message out and sleep if we're actually going to retry again. if (attempt_count < http_max_attempts) { - std::cout << "KvikIO: Got HTTP code " << http_code << ". Retrying after " << delay - << "ms (attempt " << attempt_count << " of " << http_max_attempts << ")." - << std::endl; + if (err == CURLE_OPERATION_TIMEDOUT) { + std::cout << "KvikIO: Timeout error. Retrying after " << delay << "ms (attempt " + << attempt_count << " of " << http_max_attempts << ")." << std::endl; + } else { + std::cout << "KvikIO: Got HTTP code " << http_code << ". Retrying after " << delay + << "ms (attempt " << attempt_count << " of " << http_max_attempts << ")." + << std::endl; + } std::this_thread::sleep_for(std::chrono::milliseconds(delay)); } } else { @@ -170,11 +179,14 @@ void CurlHandle::perform() } } - // We've exceeded the maximum number of requests. Fail with a good error - // message. 
std::stringstream ss; ss << "KvikIO: HTTP request reached maximum number of attempts (" << http_max_attempts - << "). Got HTTP code " << http_code << "."; + << "). Reason: "; + if (err == CURLE_OPERATION_TIMEDOUT) { + ss << "Operation timed out."; + } else { + ss << "Got HTTP code " << http_code << "."; + } throw std::runtime_error(ss.str()); } } // namespace kvikio diff --git a/docs/source/runtime_settings.rst b/docs/source/runtime_settings.rst index 5847c1ffbe..52fa41a4bf 100644 --- a/docs/source/runtime_settings.rst +++ b/docs/source/runtime_settings.rst @@ -46,8 +46,10 @@ This setting can also be controlled by :py:func:`kvikio.defaults.bounce_buffer_s #### HTTP Retries ----------------- -The behavior when a remote IO read returns a error can be controlled through the `KVIKIO_HTTP_STATUS_CODES` and `KVIKIO_HTTP_MAX_ATTEMPTS` environment variables. +The behavior when a remote IO read returns an error can be controlled through the `KVIKIO_HTTP_STATUS_CODES`, `KVIKIO_HTTP_MAX_ATTEMPTS`, and `KVIKIO_HTTP_TIMEOUT` environment variables. `KVIKIO_HTTP_STATUS_CODES` controls the status codes to retry and can be controlled by :py:func:`kvikio.defaults.http_status_codes`, :py:func:`kvikio.defaults.http_status_codes_reset`, and :py:func:`kvikio.defaults.set_http_status_codes`. `KVIKIO_HTTP_MAX_ATTEMPTS` controls the maximum number of attempts to make before throwing an exception and can be controlled by :py:func:`kvikio.defaults.http_max_attempts`, :py:func:`kvikio.defaults.http_max_attempts_reset`, and :py:func:`kvikio.defaults.set_http_max_attempts`. + +`KVIKIO_HTTP_TIMEOUT` controls the maximum duration, in seconds, of an HTTP request and can be controlled by :py:func:`kvikio.defaults.http_timeout`, :py:func:`kvikio.defaults.http_timeout_reset`, and :py:func:`kvikio.defaults.set_http_timeout`.
diff --git a/python/kvikio/kvikio/_lib/defaults.pyx b/python/kvikio/kvikio/_lib/defaults.pyx index 0770cb557a..638967dda2 100644 --- a/python/kvikio/kvikio/_lib/defaults.pyx +++ b/python/kvikio/kvikio/_lib/defaults.pyx @@ -29,9 +29,15 @@ cdef extern from "" namespace "kvikio" nogil: size_t cpp_bounce_buffer_size "kvikio::defaults::bounce_buffer_size"() except + void cpp_bounce_buffer_size_reset \ "kvikio::defaults::bounce_buffer_size_reset"(size_t nbytes) except + + size_t cpp_http_max_attempts "kvikio::defaults::http_max_attempts"() except + void cpp_http_max_attempts_reset \ "kvikio::defaults::http_max_attempts_reset"(size_t attempts) except + + + long cpp_http_timeout "kvikio::defaults::http_timeout"() except + + void cpp_http_timeout_reset \ + "kvikio::defaults::http_timeout_reset"(long timeout_seconds) except + + vector[int] cpp_http_status_codes "kvikio::defaults::http_status_codes"() except + void cpp_http_status_codes_reset \ "kvikio::defaults::http_status_codes_reset"(vector[int] status_codes) except + @@ -85,6 +91,14 @@ def http_max_attempts_reset(attempts: int) -> None: cpp_http_max_attempts_reset(attempts) +def http_timeout() -> int: + return cpp_http_timeout() + + +def http_timeout_reset(timeout_seconds: int) -> None: + cpp_http_timeout_reset(timeout_seconds) + + def http_status_codes() -> list[int]: return cpp_http_status_codes() diff --git a/python/kvikio/kvikio/defaults.py b/python/kvikio/kvikio/defaults.py index 4201cc29a3..4bfb5646fb 100644 --- a/python/kvikio/kvikio/defaults.py +++ b/python/kvikio/kvikio/defaults.py @@ -297,6 +297,49 @@ def set_http_max_attempts(attempts: int): http_max_attempts_reset(old_value) +def http_timeout() -> int: + """Get the maximum duration, in seconds, HTTP requests are allowed to take. + + Set the value using :py:func:``kvikio.defaults.set_http_timeout`` or by + setting the ``KVIKIO_HTTP_TIMEOUT`` environment variable. If not set, the + default value is 60. 
+ + Returns + ------- + timeout : int + The maximum duration, in seconds, HTTP requests are allowed to take. + """ + return kvikio._lib.defaults.http_timeout() + + +def http_timeout_reset(timeout_seconds: int) -> None: + """Reset the maximum duration HTTP requests are allowed to take. + + Parameters + ---------- + timeout_seconds : int + The maximum duration, in seconds, HTTP requests are allowed to take. + """ + kvikio._lib.defaults.http_timeout_reset(timeout_seconds) + + +@contextlib.contextmanager +def set_http_timeout(timeout_seconds: int): + """Context for resetting the maximum duration of HTTP requests. + + Parameters + ---------- + timeout_seconds : int + The maximum duration, in seconds, HTTP requests are allowed to take. + """ + old_value = http_timeout() + try: + http_timeout_reset(timeout_seconds) + yield + finally: + http_timeout_reset(old_value) + + def http_status_codes() -> list[int]: """Get the list of HTTP status codes to retry. diff --git a/python/kvikio/tests/test_http_io.py b/python/kvikio/tests/test_http_io.py index e62dbb81af..05a61831fa 100644 --- a/python/kvikio/tests/test_http_io.py +++ b/python/kvikio/tests/test_http_io.py @@ -3,6 +3,7 @@ import http +import time from http.server import SimpleHTTPRequestHandler from typing import Literal @@ -22,11 +23,12 @@ ) -class ErrorCounter: +class RequestCounter: # ThreadedHTTPServer creates a new handler per request. # This lets us share some state between requests. def __init__(self): - self.value = 0 + self.error_count = 0 + self.delay_count = 0 class HTTP503Handler(SimpleHTTPRequestHandler): @@ -35,34 +37,49 @@ class HTTP503Handler(SimpleHTTPRequestHandler): Parameters ---------- - error_counter : ErrorCounter - A class with a mutable `value` for the number of 503 errors that have - been returned. + request_counter : RequestCounter + A class with mutable values to track the number of 503 errors and delayed + responses that have been returned.
max_error_count : int The number of times to respond with a 503 before responding normally. + delay_duration : int + The duration, in seconds, to sleep before responding to a GET request + when this handler has handled fewer than `max_delay_count` requests. + max_delay_count : int + The maximum number of requests to delay for `delay_duration`. """ def __init__( self, *args, directory=None, - error_counter: ErrorCounter = ErrorCounter(), + request_counter: RequestCounter = RequestCounter(), max_error_count: int = 1, + delay_duration: int = 0, + max_delay_count: int = 1, **kwargs, ): self.max_error_count = max_error_count - self.error_counter = error_counter + self.request_counter = request_counter + self.delay_duration = delay_duration + self.max_delay_count = max_delay_count super().__init__(*args, directory=directory, **kwargs) def _do_with_error_count(self, method: Literal["GET", "HEAD"]) -> None: - if self.error_counter.value < self.max_error_count: - self.error_counter.value += 1 + if self.request_counter.error_count < self.max_error_count: + self.request_counter.error_count += 1 self.send_error(http.HTTPStatus.SERVICE_UNAVAILABLE) - self.send_header("CurrentErrorCount", str(self.error_counter.value)) + self.send_header("CurrentErrorCount", str(self.request_counter.error_count)) self.send_header("MaxErrorCount", str(self.max_error_count)) return None else: if method == "GET": + if ( + self.delay_duration > 0 + and self.request_counter.delay_count < self.max_delay_count + ): + self.request_counter.delay_count += 1 + time.sleep(self.delay_duration) return super().do_GET() else: return super().do_HEAD() @@ -166,7 +183,7 @@ def test_retry_http_503_ok(tmpdir, xp): tmpdir, max_lifetime=60, handler=HTTP503Handler, - handler_options={"error_counter": ErrorCounter()}, + handler_options={"request_counter": RequestCounter()}, ) as server: http_server = server.url b = xp.empty_like(a) @@ -181,7 +198,7 @@ def test_retry_http_503_fails(tmpdir, xp, capfd): tmpdir, 
max_lifetime=60, handler=HTTP503Handler, - handler_options={"error_counter": ErrorCounter(), "max_error_count": 100}, + handler_options={"request_counter": RequestCounter(), "max_error_count": 100}, ) as server: a = xp.arange(100, dtype="uint8") a.tofile(tmpdir / "a") @@ -219,12 +236,34 @@ def test_no_retries_ok(tmpdir): f.read(b) +def test_retry_timeout_ok(tmpdir): + a = np.arange(100, dtype="uint8") + a.tofile(tmpdir / "a") + with LocalHttpServer( + tmpdir, + max_lifetime=60, + handler=HTTP503Handler, + handler_options={ + "request_counter": RequestCounter(), + "delay_duration": 2, + "max_error_count": 0, + }, + ) as server: + http_server = server.url + b = np.empty_like(a) + with kvikio.defaults.set_http_timeout(1): + with kvikio.RemoteFile.open_http(f"{http_server}/a") as f: + assert f.nbytes() == a.nbytes + assert f"{http_server}/a" in str(f) + f.read(b) + + def test_set_http_status_code(tmpdir): with LocalHttpServer( tmpdir, max_lifetime=60, handler=HTTP503Handler, - handler_options={"error_counter": ErrorCounter()}, + handler_options={"request_counter": RequestCounter()}, ) as server: http_server = server.url with kvikio.defaults.set_http_status_codes([429]): @@ -233,3 +272,40 @@ def test_set_http_status_code(tmpdir): with pytest.raises(RuntimeError, match="503"): with kvikio.RemoteFile.open_http(f"{http_server}/a"): pass + + +def test_timeout_raises(tmpdir, capfd): + # This server / settings setup will make 2 requests. + # The first request times out after 1s and is retried. + # The second request times out after 1s and throws an exception. 
+ a = np.arange(100, dtype="uint8") + a.tofile(tmpdir / "a") + + with LocalHttpServer( + tmpdir, + max_lifetime=60, + handler=HTTP503Handler, + handler_options={ + "request_counter": RequestCounter(), + "max_error_count": 0, + "delay_duration": 2, + "max_delay_count": 10, + }, + ) as server: + http_server = server.url + b = np.empty_like(a) + with kvikio.defaults.set_http_max_attempts(2), kvikio.defaults.set_http_timeout( + 1 + ): + # TODO: this should raise a TimeoutError + with pytest.raises(RuntimeError) as m: + with kvikio.RemoteFile.open_http(f"{http_server}/a") as f: + assert f.nbytes() == a.nbytes + f.read(b) + assert m.match("KvikIO: HTTP request reached maximum number of attempts") + assert m.match("Operation timed out.") + + captured = capfd.readouterr() + records = captured.out.strip().split("\n") + assert len(records) == 1 + assert records[0] == "KvikIO: Timeout error. Retrying after 500ms (attempt 1 of 2)."