Skip to content

Commit

Permalink
Added timeout to kvikio requests
Browse files Browse the repository at this point in the history
  • Loading branch information
TomAugspurger committed Feb 24, 2025
1 parent 25051e6 commit 5fb529b
Show file tree
Hide file tree
Showing 7 changed files with 206 additions and 23 deletions.
18 changes: 18 additions & 0 deletions cpp/include/kvikio/defaults.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class defaults {
std::size_t _gds_threshold;
std::size_t _bounce_buffer_size;
std::size_t _http_max_attempts;
long _http_timeout;
std::vector<int> _http_status_codes;

static unsigned int get_num_threads_from_env();
Expand Down Expand Up @@ -255,6 +256,23 @@ class defaults {
*/
static void http_max_attempts_reset(std::size_t attempts);

/**
* @brief The maximum time, in seconds, that an HTTP transfer is allowed to take.
*
* Set the value using `kvikio::defaults::http_timeout_reset()` or by setting the
* `KVIKIO_HTTP_TIMEOUT` environment variable. If not set, the value is 60.
*
* @return The maximum time, in seconds, that a transfer is allowed to take.
*/
[[nodiscard]] static long http_timeout();

/**
* @brief Reset the http timeout.
*
* @param timeout_seconds The maximum time, in seconds, that a transfer is allowed to take. Must be positive.
*/
static void http_timeout_reset(long timeout_seconds);

/**
* @brief The list of HTTP status codes to retry.
*
Expand Down
20 changes: 19 additions & 1 deletion cpp/src/defaults.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,16 @@ defaults::defaults()
}
_http_max_attempts = env;
}

// Determine the default value of `http_timeout`
{
const long env = getenv_or("KVIKIO_HTTP_TIMEOUT", 60);
if (env <= 0) {
throw std::invalid_argument("KVIKIO_HTTP_TIMEOUT has to be a positive integer");
}
_http_timeout = env;
}

// Determine the default value of `http_status_codes`
{
_http_status_codes =
Expand Down Expand Up @@ -211,10 +221,18 @@ void defaults::http_max_attempts_reset(std::size_t attempts)
}

// Accessor for the list of HTTP status codes to retry, as stored on the
// `defaults` instance. Returns a const reference, so no copy is made here.
std::vector<int> const& defaults::http_status_codes()
{
  return instance()->_http_status_codes;
}

// Replace the stored list of HTTP status codes to retry.
//
// `status_codes` is taken by value and moved into the `defaults` instance, so
// the caller's argument is consumed rather than copied a second time.
void defaults::http_status_codes_reset(std::vector<int> status_codes)
{
  auto& stored_codes = instance()->_http_status_codes;
  stored_codes       = std::move(status_codes);
}

// Accessor for the HTTP timeout (in seconds) stored on the `defaults` instance.
long defaults::http_timeout()
{
  return instance()->_http_timeout;
}
// Set a new HTTP timeout, in seconds, on the `defaults` instance.
//
// Throws `std::invalid_argument` when `timeout_seconds` is zero or negative;
// in that case the stored value is left untouched.
void defaults::http_timeout_reset(long timeout_seconds)
{
  bool const is_positive = timeout_seconds > 0;
  if (!is_positive) {
    throw std::invalid_argument("timeout_seconds must be a positive integer");
  }

  instance()->_http_timeout = timeout_seconds;
}

} // namespace kvikio
28 changes: 20 additions & 8 deletions cpp/src/shim/libcurl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,9 @@ CurlHandle::CurlHandle(LibCurl::UniqueHandlePtr handle,

// Make curl_easy_perform() fail when receiving HTTP code errors.
setopt(CURLOPT_FAILONERROR, 1L);

// Make requests time out after `kvikio::defaults::http_timeout()` seconds.
setopt(CURLOPT_TIMEOUT, kvikio::defaults::http_timeout());
}

// On destruction, move `_handle` into `LibCurl::instance().retain_handle()`.
// NOTE(review): presumably this keeps the curl handle around for later reuse
// instead of destroying it -- confirm against `LibCurl::retain_handle`.
CurlHandle::~CurlHandle() noexcept
{
  LibCurl::instance().retain_handle(std::move(_handle));
}
Expand All @@ -125,9 +128,10 @@ void CurlHandle::perform()
auto max_delay = 4000; // milliseconds
auto http_max_attempts = kvikio::defaults::http_max_attempts();
auto& http_status_codes = kvikio::defaults::http_status_codes();
CURLcode err;

while (attempt_count++ < http_max_attempts) {
auto err = curl_easy_perform(handle());
err = curl_easy_perform(handle());

if (err == CURLE_OK) {
// We set CURLE_HTTP_RETURNED_ERROR, so >= 400 status codes are considered
Expand All @@ -141,7 +145,7 @@ void CurlHandle::perform()
(std::find(http_status_codes.begin(), http_status_codes.end(), http_code) !=
http_status_codes.end());

if (is_retryable_response) {
if ((err == CURLE_OPERATION_TIMEDOUT) || is_retryable_response) {
// backoff and retry again. With a base value of 500ms, we retry after
// 500ms, 1s, 2s, 4s, ...
auto const backoff_delay = base_delay * (1 << std::min(attempt_count - 1, 4));
Expand All @@ -150,9 +154,14 @@ void CurlHandle::perform()

// Only print this message out and sleep if we're actually going to retry again.
if (attempt_count < http_max_attempts) {
std::cout << "KvikIO: Got HTTP code " << http_code << ". Retrying after " << delay
<< "ms (attempt " << attempt_count << " of " << http_max_attempts << ")."
<< std::endl;
if (err == CURLE_OPERATION_TIMEDOUT) {
std::cout << "KvikIO: Timeout error. Retrying after " << delay << "ms (attempt "
<< attempt_count << " of " << http_max_attempts << ")." << std::endl;
} else {
std::cout << "KvikIO: Got HTTP code " << http_code << ". Retrying after " << delay
<< "ms (attempt " << attempt_count << " of " << http_max_attempts << ")."
<< std::endl;
}
std::this_thread::sleep_for(std::chrono::milliseconds(delay));
}
} else {
Expand All @@ -170,11 +179,14 @@ void CurlHandle::perform()
}
}

// We've exceeded the maximum number of requests. Fail with a good error
// message.
std::stringstream ss;
ss << "KvikIO: HTTP request reached maximum number of attempts (" << http_max_attempts
<< "). Got HTTP code " << http_code << ".";
<< "). Reason: ";
if (err == CURLE_OPERATION_TIMEDOUT) {
ss << "Operation timed out.";
} else {
ss << "Got HTTP code " << http_code << ".";
}
throw std::runtime_error(ss.str());
}
} // namespace kvikio
4 changes: 3 additions & 1 deletion docs/source/runtime_settings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,10 @@ This setting can also be controlled by :py:func:`kvikio.defaults.bounce_buffer_s
#### HTTP Retries
-----------------

The behavior when a remote IO read returns a error can be controlled through the `KVIKIO_HTTP_STATUS_CODES` and `KVIKIO_HTTP_MAX_ATTEMPTS` environment variables.
The behavior when a remote IO read returns an error can be controlled through the `KVIKIO_HTTP_STATUS_CODES`, `KVIKIO_HTTP_MAX_ATTEMPTS`, and `KVIKIO_HTTP_TIMEOUT` environment variables.

`KVIKIO_HTTP_STATUS_CODES` controls the status codes to retry and can be controlled by :py:func:`kvikio.defaults.http_status_codes`, :py:func:`kvikio.defaults.http_status_codes_reset`, and :py:func:`kvikio.defaults.set_http_status_codes`.

`KVIKIO_HTTP_MAX_ATTEMPTS` controls the maximum number of attempts to make before throwing an exception and can be controlled by :py:func:`kvikio.defaults.http_max_attempts`, :py:func:`kvikio.defaults.http_max_attempts_reset`, and :py:func:`kvikio.defaults.set_http_max_attempts`.

`KVIKIO_HTTP_TIMEOUT` controls the maximum duration of the HTTP request and can be controlled by :py:func:`kvikio.defaults.http_timeout`, :py:func:`kvikio.defaults.http_timeout_reset`, and :py:func:`kvikio.defaults.set_http_timeout`.
14 changes: 14 additions & 0 deletions python/kvikio/kvikio/_lib/defaults.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,15 @@ cdef extern from "<kvikio/defaults.hpp>" namespace "kvikio" nogil:
size_t cpp_bounce_buffer_size "kvikio::defaults::bounce_buffer_size"() except +
void cpp_bounce_buffer_size_reset \
"kvikio::defaults::bounce_buffer_size_reset"(size_t nbytes) except +

size_t cpp_http_max_attempts "kvikio::defaults::http_max_attempts"() except +
void cpp_http_max_attempts_reset \
"kvikio::defaults::http_max_attempts_reset"(size_t attempts) except +

long cpp_http_timeout "kvikio::defaults::http_timeout"() except +
void cpp_http_timeout_reset \
"kvikio::defaults::http_timeout_reset"(long timeout_seconds) except +

vector[int] cpp_http_status_codes "kvikio::defaults::http_status_codes"() except +
void cpp_http_status_codes_reset \
"kvikio::defaults::http_status_codes_reset"(vector[int] status_codes) except +
Expand Down Expand Up @@ -85,6 +91,14 @@ def http_max_attempts_reset(attempts: int) -> None:
cpp_http_max_attempts_reset(attempts)


def http_timeout() -> int:
    """Return the value reported by ``kvikio::defaults::http_timeout``."""
    timeout = cpp_http_timeout()
    return timeout


def http_timeout_reset(timeout_seconds: int) -> None:
    """Forward ``timeout_seconds`` to ``kvikio::defaults::http_timeout_reset``."""
    cpp_http_timeout_reset(timeout_seconds)


def http_status_codes() -> list[int]:
    """Return the value reported by ``kvikio::defaults::http_status_codes``."""
    status_codes = cpp_http_status_codes()
    return status_codes

Expand Down
43 changes: 43 additions & 0 deletions python/kvikio/kvikio/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,49 @@ def set_http_max_attempts(attempts: int):
http_max_attempts_reset(old_value)


def http_timeout() -> int:
    """Return the maximum number of seconds an HTTP request is allowed to take.

    The value can be changed with :py:func:`kvikio.defaults.set_http_timeout`
    or by setting the ``KVIKIO_HTTP_TIMEOUT`` environment variable. When
    neither is used, the default value is 60.

    Returns
    -------
    timeout : int
        The maximum duration, in seconds, HTTP requests are allowed to take.
    """
    timeout = kvikio._lib.defaults.http_timeout()
    return timeout


def http_timeout_reset(timeout_seconds: int) -> None:
    """Set a new maximum duration for HTTP requests.

    Parameters
    ----------
    timeout_seconds : int
        The maximum duration, in seconds, HTTP requests are allowed to take.
        Must be positive.
    """
    kvikio._lib.defaults.http_timeout_reset(timeout_seconds)


@contextlib.contextmanager
def set_http_timeout(timeout_seconds: int):
    """Context manager that temporarily changes the HTTP request timeout.

    The timeout in effect on entry is captured first and is restored when the
    context exits, including when the body (or the reset itself) raises.

    Parameters
    ----------
    timeout_seconds : int
        The maximum duration, in seconds, HTTP requests are allowed to take.
    """
    previous_timeout = http_timeout()
    try:
        http_timeout_reset(timeout_seconds)
        yield
    finally:
        # Always put the previous timeout back.
        http_timeout_reset(previous_timeout)


def http_status_codes() -> list[int]:
"""Get the list of HTTP status codes to retry.
Expand Down
Loading

0 comments on commit 5fb529b

Please sign in to comment.