Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add optional lowering to Host IR in FusionKernelRuntime #3835

Merged
merged 57 commits into from
Feb 14, 2025
Merged
Show file tree
Hide file tree
Changes from 44 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
2e8c459
save
Jan 23, 2025
781cbf9
savE
Jan 23, 2025
fc7dbe5
save
Jan 23, 2025
2053256
save
Jan 24, 2025
acae480
save
Jan 28, 2025
b077593
save
Jan 28, 2025
8ed7a99
save
Jan 28, 2025
8543fe0
save
Jan 28, 2025
3bc4611
save
Jan 29, 2025
e825350
save
Jan 31, 2025
e99b427
update
Jan 31, 2025
af9e319
update
Feb 4, 2025
eed20a5
save
Feb 4, 2025
86d6312
update
Feb 4, 2025
9b6b88a
update
Feb 4, 2025
775bb13
save
Feb 5, 2025
8e768da
change to test_p
Feb 5, 2025
6d5a5f3
lint
Feb 5, 2025
92b52d7
remove unused code
Feb 5, 2025
967065c
Merge branch 'main' into nsarka/hostir-integration-2
nsarka Feb 5, 2025
e7191e7
remove unused code
Feb 5, 2025
f69d133
Merge branch 'main' into nsarka/hostir-integration-2
nsarka Feb 5, 2025
935c97f
fix merge issue and lint
Feb 5, 2025
75dbe21
remove resizeKernelExecutors
Feb 6, 2025
3feca07
rename
Feb 6, 2025
e7ff0c3
Update csrc/runtime/fusion_kernel_runtime.cpp
nsarka Feb 6, 2025
fef998a
remove args to gtests
Feb 6, 2025
69f7ce0
fix
Feb 6, 2025
2cd1fb8
add check
nsarka Feb 7, 2025
68ccf58
replace overloaded runWithInput with runWithPolymorphicValues
nsarka Feb 7, 2025
b2a1970
save
nsarka Feb 7, 2025
2a81b26
const
nsarka Feb 7, 2025
a0f0924
change to int64_t
nsarka Feb 7, 2025
930f902
Merge branch 'main' into nsarka/hostir-integration-2
nsarka Feb 7, 2025
37202f5
const ref
wujingyue Feb 7, 2025
66af390
Update csrc/host_ir/container.cpp
nsarka Feb 7, 2025
2852174
add data attributes
nsarka Feb 7, 2025
3712028
Merge branch 'nsarka/hostir-integration-2' of github.com:nsarka/Fuser…
nsarka Feb 7, 2025
6d830d1
Communication scheduler
nsarka Feb 10, 2025
f9a136f
Revert "Communication scheduler"
nsarka Feb 11, 2025
3f79ce6
Sam's feedback
nsarka Feb 11, 2025
0b73469
remove argumentmanager
nsarka Feb 11, 2025
ab7147d
linter
nsarka Feb 11, 2025
fb16dac
Merge branch 'main' into nsarka/hostir-integration-2
nsarka Feb 11, 2025
a1446ee
Update csrc/host_ir/container.h
nsarka Feb 12, 2025
5d289a5
remove members
nsarka Feb 12, 2025
f9160ca
print host ir evaluator run end
nsarka Feb 12, 2025
de252d4
review feedback
nsarka Feb 12, 2025
6ab548d
update
nsarka Feb 12, 2025
90f5d85
update
nsarka Feb 12, 2025
2936e9d
Merge branch 'main' into nsarka/hostir-integration-2
nsarka Feb 12, 2025
6f78354
lint
nsarka Feb 12, 2025
f543414
revert to make_unique
nsarka Feb 13, 2025
47ed1be
Merge branch 'main' into nsarka/hostir-integration-2
nsarka Feb 13, 2025
3645308
lint
nsarka Feb 13, 2025
23498fa
Merge branch 'main' into nsarka/hostir-integration-2
nsarka Feb 13, 2025
34b9970
Merge branch 'main' into nsarka/hostir-integration-2
nsarka Feb 14, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions csrc/host_ir/container.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ namespace nvfuser {

namespace hir {

HostIrContainer::HostIrContainer(int64_t num_kernel_executors)
: kernel_executors_(num_kernel_executors) {}

HostIrContainer::~HostIrContainer() = default;

Stream* HostIrContainer::getDefaultStream() {
Expand All @@ -41,12 +44,14 @@ const std::vector<Expr*>& HostIrContainer::topLevelExprs() const {

void HostIrContainer::pushBackTopLevelExprs(Expr* expr) {
assertInContainer(expr, "Cannot add expr, ");
return top_level_exprs_.push_back(expr);
top_level_exprs_.push_back(expr);
}

void HostIrContainer::pushBackKernelExecutor(
void HostIrContainer::setKernelExecutor(
int64_t index,
std::unique_ptr<KernelExecutor> ke) {
return kernel_executors_.push_back(std::move(ke));
NVF_ERROR(kernel_executors_.at(index) == nullptr);
kernel_executors_.at(index) = std::move(ke);
}

KernelExecutor* HostIrContainer::getKernelExecutor(int64_t index) const {
Expand Down
7 changes: 6 additions & 1 deletion csrc/host_ir/container.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ programs. Later, it should support non-linear programs having a DAG structure.
class HostIrContainer final : public Fusion {
public:
HostIrContainer() = default;
HostIrContainer(int64_t sz);
nsarka marked this conversation as resolved.
Show resolved Hide resolved
HostIrContainer(const HostIrContainer&) = delete;
HostIrContainer& operator=(const HostIrContainer&) = delete;

Expand All @@ -43,7 +44,11 @@ class HostIrContainer final : public Fusion {

void pushBackTopLevelExprs(Expr* expr);

void pushBackKernelExecutor(std::unique_ptr<KernelExecutor> ke);
void setKernelExecutor(int64_t index, std::unique_ptr<KernelExecutor> ke);
nsarka marked this conversation as resolved.
Show resolved Hide resolved

bool hasKernelExecutor(int64_t index) const {
return kernel_executors_.at(index) != nullptr;
}

KernelExecutor* getKernelExecutor(int64_t index) const;

Expand Down
34 changes: 26 additions & 8 deletions csrc/host_ir/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -204,13 +204,7 @@ HostIrEvaluator::HostIrEvaluator(
expr_evaluator_.bind("numberOfStreams", params_.number_of_streams);
}

std::vector<at::Tensor> HostIrEvaluator::runWithInput(
std::unordered_map<Val*, c10::IValue> val_to_IValue) {
// process input values
for (const auto& [val, ivalue] : val_to_IValue) {
expr_evaluator_.bind(val, IValueToPolymorphicValue(ivalue));
}

std::vector<at::Tensor> HostIrEvaluator::dispatchAndCollectOutputs() {
// Interpret each instruction in an "eager" way by iterating over the Host Ir
// Container's top level expression list
for (auto expr : container_->topLevelExprs()) {
Expand All @@ -221,6 +215,26 @@ std::vector<at::Tensor> HostIrEvaluator::runWithInput(
return getKnownTensorOrUndefined(container_->outputs(), expr_evaluator_);
}

std::vector<at::Tensor> HostIrEvaluator::runWithInput(
std::unordered_map<Val*, c10::IValue> val_to_IValue) {
// process input values, converting IValue to PolymorphicValue
for (const auto& [val, ivalue] : val_to_IValue) {
expr_evaluator_.bind(val, IValueToPolymorphicValue(ivalue));
}

return dispatchAndCollectOutputs();
}

std::vector<at::Tensor> HostIrEvaluator::runWithPolymorphicValues(
std::unordered_map<Val*, const PolymorphicValue*> val_to_PValue) {
// process input values
for (const auto& [val, pvalue] : val_to_PValue) {
expr_evaluator_.bind(val, *pvalue);
}

return dispatchAndCollectOutputs();
}

std::string HostIrEvaluator::canRun() const {
const int64_t requested_n_gpus = requestedNumberOfDevices(container_.get());

Expand Down Expand Up @@ -315,7 +329,11 @@ void HostIrEvaluator::handle(LaunchKernel* launch_kernel) {

// run the compiled kernel
std::vector<at::Tensor> outputs =
container_->getKernelExecutor(launch_kernel->getIndex())->run(args);
container_->getKernelExecutor(launch_kernel->getIndex())
->run(
args,
launch_kernel->launch_params(),
launch_kernel->compile_params());

// Store the outputs in the context
for (auto output_idx : c10::irange(outputs.size())) {
Expand Down
5 changes: 5 additions & 0 deletions csrc/host_ir/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,11 @@ class HostIrEvaluator final : public OptOutDispatch {
std::unique_ptr<HostIrContainer> container,
Communicator* communicator = nullptr,
HostIrEvaluatorParams = HostIrEvaluatorParams());

std::vector<at::Tensor> runWithInput(
std::unordered_map<Val*, c10::IValue> val_to_IValue);
std::vector<at::Tensor> runWithPolymorphicValues(
std::unordered_map<Val*, const PolymorphicValue*> val_to_PValue);

const std::vector<Val*>& inputs() {
return container_->inputs();
Expand Down Expand Up @@ -133,6 +136,8 @@ class HostIrEvaluator final : public OptOutDispatch {

c10::cuda::CUDAStream getCUDAStream(Stream* stream);

std::vector<at::Tensor> dispatchAndCollectOutputs();

std::unique_ptr<HostIrContainer> container_;
Communicator* communicator_;
HostIrEvaluatorParams params_;
Expand Down
8 changes: 7 additions & 1 deletion csrc/host_ir/host_ir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,16 @@ bool PostOnStream::sameAs(const Statement* other) const {
LaunchKernel::LaunchKernel(
IrBuilderPasskey passkey,
int64_t hic_executor_index,
const LaunchParams& launch_constraints,
const CompileParams& compile_params,
const std::vector<Val*>& inputs,
const std::vector<Val*>& outputs)
: Expr(passkey, inputs, outputs, {}) {
: Expr(passkey, inputs, outputs, {}),
launch_constraints_(launch_constraints),
compile_params_(compile_params) {
nsarka marked this conversation as resolved.
Show resolved Hide resolved
addDataAttribute(hic_executor_index);
nsarka marked this conversation as resolved.
Show resolved Hide resolved
addDataAttribute(launch_constraints);
addDataAttribute(compile_params);
}

NVFUSER_DEFINE_CLONE_AND_CREATE(LaunchKernel)
Expand Down
15 changes: 15 additions & 0 deletions csrc/host_ir/host_ir.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <ir/base_nodes.h>
#include <ir/builder.h>
#include <multidevice/communication.h>
#include <scheduler/heuristic.h>
#include <atomic>

namespace nvfuser {
Expand Down Expand Up @@ -123,6 +124,8 @@ class LaunchKernel : public Expr {
int64_t hic_executor_index, // Index into the HostIrContainer's vector of
// KernelExecutors--i.e., the kernel this IR
// should launch
const LaunchParams& launch_constraints,
const CompileParams& compile_params,
const std::vector<Val*>& inputs,
const std::vector<Val*>& outputs);

Expand All @@ -142,6 +145,18 @@ class LaunchKernel : public Expr {
int64_t getIndex() const {
return attribute<int64_t>(0);
}

const auto& launch_params() const {
return attribute<LaunchParams>(1);
}

const auto& compile_params() const {
return attribute<CompileParams>(2);
}

private:
const LaunchParams launch_constraints_;
CompileParams compile_params_;
nsarka marked this conversation as resolved.
Show resolved Hide resolved
};

class Stream : public Val {
Expand Down
1 change: 1 addition & 0 deletions csrc/options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ const std::unordered_map<std::string, EnableOption>& getEnableOptions() {
{"static_fusion_count", EnableOption::StaticFusionCount},
{"wait_debugger", EnableOption::WaitDebugger},
{"warn_register_spill", EnableOption::WarnRegisterSpill},
{"host_ir_lowering", EnableOption::HostIrLowering},
};
return available_options;
}
Expand Down
1 change: 1 addition & 0 deletions csrc/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ enum class EnableOption {
WaitDebugger, // Used for debugging multi-GPU. The rank given in the argument
// will wait for `gdb attach` at the start.
WarnRegisterSpill, //! Enable warnings of register spill
HostIrLowering, //! Enable FusionKernelRuntime lowering to host IR
EndOfOption //! Placeholder for counting the number of elements
};

Expand Down
110 changes: 95 additions & 15 deletions csrc/runtime/fusion_kernel_runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,19 @@ std::vector<at::Tensor> FusionKernelRuntime::runWithInputs(
KernelArgumentHolder& args) {
FUSER_PERF_SCOPE("FusionKernelRuntime::runWithInputs");

if (isOptionEnabled(EnableOption::HostIrLowering)) {
if (isDebugDumpEnabled(DebugDumpOption::PerfDebugVerbose)) {
debug() << "=================RUNNING HOSTIR EVALUATOR================="
nsarka marked this conversation as resolved.
Show resolved Hide resolved
<< std::endl;
}

std::unordered_map<Val*, const PolymorphicValue*> tensor_map;
for (const auto i : c10::irange(args.size())) {
tensor_map.emplace(hie_->inputs()[i], args[i]);
}
return hie_->runWithPolymorphicValues(tensor_map);
}

if (isDebugDumpEnabled(DebugDumpOption::PerfDebugVerbose)) {
debug() << "=================RUNNING FUSION SEGMENTS================="
<< std::endl;
Expand Down Expand Up @@ -333,6 +346,13 @@ void FusionKernelRuntime::compileFusionParallel(KernelArgumentHolder args) {
FusionProfiler::startCompile();
}

// host ir
std::unique_ptr<hir::HostIrContainer> hic;
if (isOptionEnabled(EnableOption::HostIrLowering)) {
hic = std::make_unique<hir::HostIrContainer>(
num_groups); // Some indices will be empty
}

std::atomic<bool> detect_exception_in_thread_pool{false};
std::string thread_pool_error_message;
std::mutex thread_pool_error_message_mutex;
Expand Down Expand Up @@ -362,21 +382,23 @@ void FusionKernelRuntime::compileFusionParallel(KernelArgumentHolder args) {
FUSER_PERF_SCOPE("FusionKernelRuntime::compileFusionParallel");
c10::cuda::CUDAGuard dg(args.getDeviceIndex());
c10::Device device(c10::DeviceType::CUDA, args.getDeviceIndex());
compileKernel(group_runtime_inputs, group_to_run);
compileKernel(group_runtime_inputs, group_to_run, hic.get());
} else {
hir::HostIrContainer* hic_p = hic.get();
// launch compileKernel thread here
getThreadPool()->run([this,
args,
group_runtime_inputs,
group_to_run,
&detect_exception_in_thread_pool,
&thread_pool_error_message,
&thread_pool_error_message_mutex]() {
&thread_pool_error_message_mutex,
hic_p]() {
FUSER_PERF_SCOPE("FusionKernelRuntime::compileFusionParallel");
try {
c10::cuda::CUDAGuard dg(args.getDeviceIndex());
c10::Device device(c10::DeviceType::CUDA, args.getDeviceIndex());
compileKernel(group_runtime_inputs, group_to_run);
compileKernel(group_runtime_inputs, group_to_run, hic_p);
} catch (const std::exception& e) {
// Set flag inside lambda so we can throw an exception after thread
// pool completes its work.
Expand All @@ -401,6 +423,39 @@ void FusionKernelRuntime::compileFusionParallel(KernelArgumentHolder args) {
num_live_args_after_segment_runs_.push_back((int64_t)args.size());
}

// add all expressions and compiled kernels to the host ir container
if (isOptionEnabled(EnableOption::HostIrLowering)) {
IrCloner ir_cloner(hic.get());
FusionGuard::setCurFusion(hic.get());
for (int64_t run_order_id = 0; run_order_id < num_groups; ++run_order_id) {
auto group_to_run = runtime_workspace_.group_run_order.at(run_order_id);
auto in_clone = ir_cloner.clone(group_to_run->inputs());
auto out_clone = ir_cloner.clone(group_to_run->outputs());
nsarka marked this conversation as resolved.
Show resolved Hide resolved
if (hic->hasKernelExecutor(run_order_id)) {
nsarka marked this conversation as resolved.
Show resolved Hide resolved
auto heuristic_params = schedulers().at(run_order_id).get();
auto launch_kernel = IrBuilder::create<hir::LaunchKernel>(
run_order_id,
heuristic_params->lparams,
heuristic_params->cparams,
std::vector<Val*>{in_clone},
std::vector<Val*>{out_clone});
hic->pushBackTopLevelExprs(launch_kernel);
} else {
// push back segment's exprs into the container as top level expressions
for (auto* expr : group_to_run->exprs()) {
auto cloned_expr = ir_cloner.clone(expr);
hic->pushBackTopLevelExprs(cloned_expr);
}
}
}
for (const Val* in : segmented_fusion_->inputs()) {
hic->addInput(ir_cloner.clone(in));
}
for (const Val* out : segmented_fusion_->outputs()) {
hic->addOutput(ir_cloner.clone(out));
}
}

if (num_groups != 1 && !isOptionDisabled(DisableOption::ParallelCompile)) {
// Wait until all segments finish compiling
getThreadPool()->waitWorkComplete();
Expand All @@ -411,6 +466,11 @@ void FusionKernelRuntime::compileFusionParallel(KernelArgumentHolder args) {
thread_pool_error_message,
"\nUse NVFUSER_DISABLE=parallel_compile to simplify error message.");
}

if (isOptionEnabled(EnableOption::HostIrLowering)) {
hie_ = std::make_unique<hir::HostIrEvaluator>(std::move(hic));
wujingyue marked this conversation as resolved.
Show resolved Hide resolved
}

if (isProfilerEnabled()) {
FusionProfiler::stopCompile();
}
Expand Down Expand Up @@ -661,7 +721,8 @@ std::vector<at::Tensor> FusionKernelRuntime::runKernelWithInput(

void FusionKernelRuntime::compileKernel(
const KernelArgumentHolder& args,
SegmentedGroup* sg) {
SegmentedGroup* sg,
hir::HostIrContainer* hic) {
FUSER_PERF_SCOPE("FusionKernelRuntime::compileKernel");
auto group_id = sg->groupId();
auto heuristic_params = schedulers().at(group_id).get();
Expand All @@ -684,17 +745,36 @@ void FusionKernelRuntime::compileKernel(
heuristic_params->cparams.index_type.has_value(),
"Kernel index type is not defined.");

// Initialize associated executors
executors_[group_id] = ExecutorDispatch::makeExecutor(
fusion_to_run.get(), fusion_id_, concrete_id_, runtime_id_, group_id);

ExecutorDispatch::compile(
executors_.at(group_id).get(),
fusion_to_run.get(),
args,
heuristic_params->lparams,
heuristic_params->cparams,
heuristic_params->scheduler_type);
if (hic != nullptr) {
// if it's a kernel executor, compile the segment and append to hic
// otherwise, push the segment's exprs directly to the hic
if (!HostIrExecutor::supported(fusion_to_run.get()) &&
!ExprEvalExecutor::supported(fusion_to_run.get())) {
nsarka marked this conversation as resolved.
Show resolved Hide resolved
NVF_ERROR(
KernelExecutor::supported(fusion_to_run.get()),
"Fusion not supported by any executor type");
auto ke = std::make_unique<KernelExecutor>();
ke->compile(
fusion_to_run.get(),
args,
heuristic_params->lparams,
heuristic_params->cparams,
heuristic_params->scheduler_type);
hic->setKernelExecutor(group_id, std::move(ke));
}
} else {
// Initialize associated executors
executors_[group_id] = ExecutorDispatch::makeExecutor(
fusion_to_run.get(), fusion_id_, concrete_id_, runtime_id_, group_id);

ExecutorDispatch::compile(
executors_.at(group_id).get(),
fusion_to_run.get(),
args,
heuristic_params->lparams,
heuristic_params->cparams,
heuristic_params->scheduler_type);
}
}

std::pair<LaunchParams, CompileParams> FusionKernelRuntime::getKernelConfig(
Expand Down
9 changes: 8 additions & 1 deletion csrc/runtime/fusion_kernel_runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#include <c10/util/ArrayRef.h>

#include <fusion_segmenter.h>
#include <host_ir/executor.h>
#include <polymorphic_value.h>
#include <runtime/executor.h>
#include <runtime/executor_kernel_arg.h>
Expand Down Expand Up @@ -160,7 +161,10 @@ class FusionKernelRuntime {
//! Interface to compile a single kernel. It is either a single kernel for a
//! fusion or a kernel for a SegmentedGroup in a segmented fusion. Returns
//! launch and compile parameters for kernel.
void compileKernel(const KernelArgumentHolder& args, SegmentedGroup* sg);
void compileKernel(
const KernelArgumentHolder& args,
SegmentedGroup* sg,
hir::HostIrContainer* hic);

std::pair<LaunchParams, CompileParams> getKernelConfig(
const KernelArgumentHolder& args,
Expand All @@ -175,6 +179,9 @@ class FusionKernelRuntime {
//! Executors holding compiled kernels
std::vector<std::unique_ptr<ExecutorAbstract>> executors_;

//! Host IR Evaluator
std::unique_ptr<hir::HostIrEvaluator> hie_;

// A metadata copy of initial arguments used to construct this
// FusionKernelRuntime. Used during deserialization to schedule the fusion
// rather than storing the scheduled fusion directly.
Expand Down
Loading
Loading