Skip to content

Commit

Permalink
Enable tracing of thread pool tasks using NVTX (#630)
Browse files Browse the repository at this point in the history
This PR implements the basic feature outlined in #631. 
The two good-to-haves are currently blocked.

Authors:
  - Tianyu Liu (https://github.com/kingcrimsontianyu)

Approvers:
  - Mads R. B. Kristensen (https://github.com/madsbk)
  - Lawrence Mitchell (https://github.com/wence-)

URL: #630
  • Loading branch information
kingcrimsontianyu authored Feb 18, 2025
1 parent 887679b commit 096ac0f
Show file tree
Hide file tree
Showing 10 changed files with 368 additions and 135 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ set(SOURCES
"src/error.cpp"
"src/file_handle.cpp"
"src/file_utils.cpp"
"src/nvtx.cpp"
"src/posix_io.cpp"
"src/shim/cuda.cpp"
"src/shim/cufile.cpp"
Expand Down
200 changes: 200 additions & 0 deletions cpp/include/kvikio/nvtx.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
/*
* Copyright (c) 2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <cstdint>
#include <string_view>

#ifdef KVIKIO_CUDA_FOUND
#include <nvtx3/nvtx3.hpp>
#endif

#include <kvikio/shim/cuda.hpp>
#include <kvikio/utils.hpp>

namespace kvikio {

#ifdef KVIKIO_CUDA_FOUND
/**
 * @brief Tag type that identifies the NVTX domain used by libkvikio.
 *
 * The static `name` member holds the domain name string, "libkvikio".
 */
struct libkvikio_domain {
  static constexpr const char* name = "libkvikio";
};

// NVTX3 scoped-range type bound to the libkvikio domain.
using nvtx_scoped_range_type = nvtx3::scoped_range_in<libkvikio_domain>;
// NVTX3 registered-string type bound to the libkvikio domain.
using nvtx_registered_string_type = nvtx3::registered_string_in<libkvikio_domain>;

// Token-pasting helpers. KVIKIO_CONCAT(lhs, rhs) goes through KVIKIO_CONCAT_HELPER so that
// macro arguments (e.g. __LINE__) are expanded first and only then pasted together.
#define KVIKIO_CONCAT_HELPER(lhs, rhs) lhs##rhs
#define KVIKIO_CONCAT(lhs, rhs)        KVIKIO_CONCAT_HELPER(lhs, rhs)

// Macro yielding a reference to a static NVTX registered string built from `message`.
// Every expansion introduces its own lambda, and therefore its own function-local static, so
// several registered strings can be created in the same scope without any name conflict.
// The static is initialized the first time a given expansion is evaluated; later evaluations of
// that same expansion return the already-constructed object.
#define KVIKIO_REGISTER_STRING(message)                              \
  [](char const* msg) -> auto& {                                     \
    static kvikio::nvtx_registered_string_type registered{msg};      \
    return registered;                                               \
  }(message)

// Implementation of KVIKIO_NVTX_FUNC_RANGE(): forwards to NVTX3's NVTX3_FUNC_RANGE_IN macro,
// passing the libkvikio domain tag type.
#define KVIKIO_NVTX_FUNC_RANGE_IMPL() NVTX3_FUNC_RANGE_IN(kvikio::libkvikio_domain)

// Implementation of KVIKIO_NVTX_SCOPED_RANGE(...)
//
// KVIKIO_NVTX_SCOPED_RANGE_IMPL_3(message, payload_v, color) declares a scoped-range object whose
// variable name is `_kvikio_nvtx_range` followed by the current line number. Its event attributes
// are the registered `message`, the payload passed through kvikio::convert_to_64bit(), and `color`.
// The payload parameter is spelled `payload_v`: a macro parameter named `payload` would also be
// substituted into the `nvtx3::payload` token of the replacement list.
// Because the variable name depends only on __LINE__, two expansions written on the same source
// line in the same scope would declare the same name twice.
#define KVIKIO_NVTX_SCOPED_RANGE_IMPL_3(message, payload_v, color) \
kvikio::nvtx_scoped_range_type KVIKIO_CONCAT(_kvikio_nvtx_range, __LINE__) \
{ \
nvtx3::event_attributes \
{ \
KVIKIO_REGISTER_STRING(message), nvtx3::payload{kvikio::convert_to_64bit(payload_v)}, color \
} \
}
// Two-argument form: delegates to the three-argument form with
// kvikio::nvtx_manager::default_color() as the color.
#define KVIKIO_NVTX_SCOPED_RANGE_IMPL_2(message, payload) \
KVIKIO_NVTX_SCOPED_RANGE_IMPL_3(message, payload, kvikio::nvtx_manager::default_color())
// Argument-count dispatch: the selector expands to its 4th argument. Called with
// (message, payload) that is KVIKIO_NVTX_SCOPED_RANGE_IMPL_2; called with
// (message, payload, color) it is KVIKIO_NVTX_SCOPED_RANGE_IMPL_3. The selected macro is then
// invoked with the original argument list.
// NOTE(review): in the two-argument case the selector's `...` receives no argument, which strict
// pre-C++20 conformance modes may diagnose -- confirm the supported compiler flags.
#define KVIKIO_NVTX_SCOPED_RANGE_SELECTOR(_1, _2, _3, NAME, ...) NAME
#define KVIKIO_NVTX_SCOPED_RANGE_IMPL(...) \
KVIKIO_NVTX_SCOPED_RANGE_SELECTOR( \
__VA_ARGS__, KVIKIO_NVTX_SCOPED_RANGE_IMPL_3, KVIKIO_NVTX_SCOPED_RANGE_IMPL_2) \
(__VA_ARGS__)

// Implementation of KVIKIO_NVTX_MARKER(message, payload)
//
// Emits a marker in the libkvikio domain via nvtx3::mark_in. The event attributes are the
// registered `message` and the payload passed through kvikio::convert_to_64bit().
// The payload parameter is spelled `payload_v`: a macro parameter named `payload` would also be
// substituted into the `nvtx3::payload` token of the replacement list.
#define KVIKIO_NVTX_MARKER_IMPL(message, payload_v) \
nvtx3::mark_in<kvikio::libkvikio_domain>(nvtx3::event_attributes{ \
KVIKIO_REGISTER_STRING(message), nvtx3::payload{kvikio::convert_to_64bit(payload_v)}})

#endif

// Color type used by the NVTX-related interfaces. When KVIKIO_CUDA_FOUND is not defined the NVTX3
// header is not included, so a plain `int` stands in for `nvtx3::color`.
#if defined(KVIKIO_CUDA_FOUND)
using nvtx_color_type = nvtx3::color;
#else
using nvtx_color_type = int;
#endif

/**
 * @brief Singleton utility class for NVTX annotation.
 *
 * Every public operation is a static member function. The class can be neither copied nor
 * moved, and its constructor is private.
 */
class nvtx_manager {
 public:
  nvtx_manager(nvtx_manager const&)            = delete;
  nvtx_manager(nvtx_manager&&)                 = delete;
  nvtx_manager& operator=(nvtx_manager const&) = delete;
  nvtx_manager& operator=(nvtx_manager&&)      = delete;

  /**
   * @brief Access the singleton instance.
   *
   * @return Reference to the `nvtx_manager` instance.
   */
  static nvtx_manager& instance() noexcept;

  /**
   * @brief Return the default color.
   *
   * @return Default color.
   */
  static nvtx_color_type const& default_color() noexcept;

  /**
   * @brief Pick a color from the internal color palette.
   *
   * The palette size n is a power of 2. An index that exceeds the palette size wraps around,
   * i.e. the color at position (idx mod n) is returned.
   *
   * @param idx The index value.
   * @return The color picked from the internal color palette.
   */
  static nvtx_color_type const& get_color_by_index(std::uint64_t idx) noexcept;

  /**
   * @brief Rename the current thread under the KvikIO NVTX domain.
   *
   * @note The Nsight Systems profiler currently does not support this NVTX feature, so the OS
   * thread is not renamed in nsys-ui.
   *
   * @param new_name The new thread name.
   */
  static void rename_current_thread(std::string_view new_name) noexcept;

 private:
  nvtx_manager() = default;
};

/**
 * @brief Convenience macro that opens an NVTX range in the `libkvikio` domain covering the
 * lifetime of a function.
 *
 * No argument is needed: the message is the name of the immediately enclosing function, as
 * returned by `__func__`. When `KVIKIO_CUDA_FOUND` is not defined, the macro expands to an empty
 * `do { } while (0)`.
 *
 * Example:
 * ```
 * void some_function(){
 *    KVIKIO_NVTX_FUNC_RANGE();  // The name `some_function` is used as the message
 *    ...
 * }
 * ```
 */
#if defined(KVIKIO_CUDA_FOUND)
#define KVIKIO_NVTX_FUNC_RANGE() KVIKIO_NVTX_FUNC_RANGE_IMPL()
#else
#define KVIKIO_NVTX_FUNC_RANGE(...) do { } while (0)
#endif

/**
 * @brief Convenience macro that annotates a time duration with an NVTX scoped range in the
 * `libkvikio` domain.
 *
 * When `KVIKIO_CUDA_FOUND` is not defined, the macro expands to an empty `do { } while (0)` and
 * its arguments are discarded.
 *
 * @param message String literal used for the NVTX annotation. The literal is registered in NVTX
 * to improve profile-time performance.
 * @param payload NVTX payload.
 * @param color (Optional) NVTX color. A default NVTX color is used when it is omitted.
 *
 * Example:
 * ```
 * void some_function(){
 *    KVIKIO_NVTX_SCOPED_RANGE("my function", 42);
 *    ...
 * }
 * ```
 */
#if defined(KVIKIO_CUDA_FOUND)
#define KVIKIO_NVTX_SCOPED_RANGE(...) KVIKIO_NVTX_SCOPED_RANGE_IMPL(__VA_ARGS__)
#else
#define KVIKIO_NVTX_SCOPED_RANGE(message, payload, ...) do { } while (0)
#endif

/**
 * @brief Convenience macro that annotates a single point in time with an NVTX marker in the
 * `libkvikio` domain.
 *
 * When `KVIKIO_CUDA_FOUND` is not defined, the macro expands to an empty `do { } while (0)` and
 * its arguments are discarded.
 *
 * @param message String literal used for the NVTX annotation. The literal is registered in NVTX
 * to improve profile-time performance.
 * @param payload NVTX payload.
 *
 * Example:
 * ```
 * std::future<void> some_function(){
 *     size_t io_size{2077};
 *     KVIKIO_NVTX_MARKER("I/O operation", io_size);
 *     perform_async_io_operation(io_size);
 *     ...
 * }
 * ```
 */
#if defined(KVIKIO_CUDA_FOUND)
#define KVIKIO_NVTX_MARKER(message, payload) KVIKIO_NVTX_MARKER_IMPL(message, payload)
#else
#define KVIKIO_NVTX_MARKER(message, payload) do { } while (0)
#endif

} // namespace kvikio
58 changes: 49 additions & 9 deletions cpp/include/kvikio/parallel_operation.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2021-2024, NVIDIA CORPORATION.
* Copyright (c) 2021-2025, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,7 @@
*/
#pragma once

#include <atomic>
#include <cassert>
#include <future>
#include <numeric>
Expand All @@ -24,18 +25,51 @@

#include <kvikio/defaults.hpp>
#include <kvikio/error.hpp>
#include <kvikio/nvtx.hpp>
#include <kvikio/utils.hpp>

namespace kvikio {

namespace detail {

/**
* @brief Determine the NVTX color and call index. They are used to identify tasks from different
* pread/pwrite calls. Tasks from the same pread/pwrite call are given the same color and call
* index. The call index is atomically incremented on each pread/pwrite call, and will wrap around
* once it reaches the maximum value the integer type `std::uint64_t` can hold (this overflow
* behavior is well-defined in C++). The color is picked from an internal color palette according to
* the call index value.
*
* @return A pair of NVTX color and call index.
*/
inline const std::pair<const nvtx_color_type&, std::uint64_t> get_next_color_and_call_idx() noexcept
{
static std::atomic_uint64_t call_counter{1ull};
auto call_idx = call_counter.fetch_add(1ull, std::memory_order_relaxed);
auto& nvtx_color = nvtx_manager::get_color_by_index(call_idx);
return {nvtx_color, call_idx};
}

/**
 * @brief Submit a single I/O operation to the default thread pool and return its future.
 *
 * The submitted task opens an NVTX scoped range named "task" that carries `nvtx_payload` and
 * `nvtx_color`, asks for the executing thread to be renamed to "thread pool" (guarded by a
 * thread-local `std::once_flag`, i.e. once per thread for a given instantiation of this
 * template), and finally returns `op(buf, size, file_offset, devPtr_offset)`. All arguments are
 * captured by copy.
 *
 * NOTE(review): this listing appears to be a rendered diff with its +/- markers stripped. The
 * two-line signature and the two-line `return` right below look like the removed pre-NVTX lines,
 * each followed by its replacement -- confirm against the actual header.
 *
 * @param op Callable invoked as `op(buf, size, file_offset, devPtr_offset)`.
 * @param buf Buffer argument forwarded to `op`.
 * @param size Size argument forwarded to `op`.
 * @param file_offset File offset forwarded to `op`.
 * @param devPtr_offset Device pointer offset forwarded to `op`.
 * @param nvtx_payload Payload of the NVTX scoped range (defaults to 0).
 * @param nvtx_color Color of the NVTX scoped range (defaults to `nvtx_manager::default_color()`).
 * @return Future holding the value returned by `op`.
 */
template <typename F, typename T>
std::future<std::size_t> submit_task(
F op, T buf, std::size_t size, std::size_t file_offset, std::size_t devPtr_offset)
std::future<std::size_t> submit_task(F op,
T buf,
std::size_t size,
std::size_t file_offset,
std::size_t devPtr_offset,
std::uint64_t nvtx_payload = 0ull,
nvtx_color_type nvtx_color = nvtx_manager::default_color())
{
return defaults::thread_pool().submit_task(
[=] { return op(buf, size, file_offset, devPtr_offset); });
return defaults::thread_pool().submit_task([=] {
KVIKIO_NVTX_SCOPED_RANGE("task", nvtx_payload, nvtx_color);

// Rename the worker thread in the thread pool to improve clarity from nsys-ui.
// Note: This NVTX feature is currently not supported by nsys-ui.
thread_local std::once_flag call_once_per_thread;
std::call_once(call_once_per_thread,
[] { nvtx_manager::rename_current_thread("thread pool"); });

return op(buf, size, file_offset, devPtr_offset);
});
}

} // namespace detail
Expand All @@ -58,13 +92,15 @@ std::future<std::size_t> parallel_io(F op,
std::size_t size,
std::size_t file_offset,
std::size_t task_size,
std::size_t devPtr_offset)
std::size_t devPtr_offset,
std::uint64_t call_idx = 0,
nvtx_color_type nvtx_color = nvtx_manager::default_color())
{
if (task_size == 0) { throw std::invalid_argument("`task_size` cannot be zero"); }

// Single-task guard
if (task_size >= size || page_size >= size) {
return detail::submit_task(op, buf, size, file_offset, devPtr_offset);
return detail::submit_task(op, buf, size, file_offset, devPtr_offset, call_idx, nvtx_color);
}

// We know an upper bound of the total number of tasks
Expand All @@ -73,14 +109,18 @@ std::future<std::size_t> parallel_io(F op,

// 1) Submit `task_size` sized tasks
while (size >= task_size) {
tasks.push_back(detail::submit_task(op, buf, task_size, file_offset, devPtr_offset));
tasks.push_back(
detail::submit_task(op, buf, task_size, file_offset, devPtr_offset, call_idx, nvtx_color));
file_offset += task_size;
devPtr_offset += task_size;
size -= task_size;
}

// 2) Submit a task for the remainder
if (size > 0) { tasks.push_back(detail::submit_task(op, buf, size, file_offset, devPtr_offset)); }
if (size > 0) {
tasks.push_back(
detail::submit_task(op, buf, size, file_offset, devPtr_offset, call_idx, nvtx_color));
}

// Finally, we sum the result of all tasks.
auto gather_tasks = [](std::vector<std::future<std::size_t>>&& tasks) -> std::size_t {
Expand Down
1 change: 1 addition & 0 deletions cpp/include/kvikio/posix_io.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

#include <kvikio/bounce_buffer.hpp>
#include <kvikio/error.hpp>
#include <kvikio/nvtx.hpp>
#include <kvikio/shim/cuda.hpp>
#include <kvikio/utils.hpp>

Expand Down
Loading

0 comments on commit 096ac0f

Please sign in to comment.