diff --git a/csrc/host_ir/executor.cpp b/csrc/host_ir/executor.cpp
index 1f8dc97bbfa..d825a2a941f 100644
--- a/csrc/host_ir/executor.cpp
+++ b/csrc/host_ir/executor.cpp
@@ -467,6 +467,22 @@ void HostIrEvaluator::handle(MatmulOp* matmul) {
   }
 }
 
+void HostIrEvaluator::handle(kir::Allocate* allocate) {
+  NVF_ERROR(
+      allocate->buffer()->isA<TensorView>(),
+      "Allocation must be on a TensorView but got ",
+      allocate->buffer());
+  TensorView* tv = allocate->buffer()->as<TensorView>();
+  GlobalBufferInfo info =
+      getBufferInfos(expr_evaluator_, PrimDataType::Int, {tv}).at(0);
+  AliasInfo alias_info = {
+      .type = AllocationType::New, .aliased_io = nullptr, .hide_output = false};
+  c10::Device device =
+      communicator_ ? communicator_->device() : at::Device("cuda:0");
+  at::Tensor tensor = allocateTensor(info, alias_info, device, expr_evaluator_);
+  expr_evaluator_.bind(tv, tensor);
+}
+
 void HostIrEvaluator::unhandled(Statement* stmt) {
   NVF_ERROR(stmt->isA<Expr>(), stmt, " must be an Expr");
   auto* expr = stmt->as<Expr>();
diff --git a/csrc/host_ir/executor.h b/csrc/host_ir/executor.h
index 53d37d7560b..76a27c2f5d1 100644
--- a/csrc/host_ir/executor.h
+++ b/csrc/host_ir/executor.h
@@ -114,6 +114,7 @@ class HostIrEvaluator final : public OptOutDispatch {
   void handle(EndCoalescing* end_coalescing) override;
   void handle(kir::IfThenElse* if_then_else) override;
   void handle(MatmulOp* matmul) override;
+  void handle(kir::Allocate* allocate) override;
   void unhandled(Statement* stmt) override;
 
   c10::cuda::CUDAStream getCUDAStream(Stream* stream);
diff --git a/csrc/kernel_ir.cpp b/csrc/kernel_ir.cpp
index 32bd5b8fe2a..fc464eac315 100644
--- a/csrc/kernel_ir.cpp
+++ b/csrc/kernel_ir.cpp
@@ -142,8 +142,8 @@ Allocate::Allocate(
     : Expr(passkey) {
   NVF_ERROR(passkey.ir_container_ != nullptr);
   NVF_ERROR(
-      passkey.ir_container_->isA<Kernel>(),
-      "IR type only valid for Kernel container.");
+      (passkey.ir_container_->isOneOf<Kernel, hir::HostIrContainer>()),
+      "IR type only valid for Kernel or HostIr container.");
   if (!shape.empty()) {
     NVF_ERROR(
         (shape.size() == 1 && shape[0]->isOneInt()) ||
diff --git a/csrc/multidevice/executor.cpp b/csrc/multidevice/executor.cpp
index 235f51a27c1..157dd6b99dc 100644
--- a/csrc/multidevice/executor.cpp
+++ b/csrc/multidevice/executor.cpp
@@ -27,51 +27,6 @@
 
 namespace nvfuser {
 
-namespace {
-
-// returns a copied fusion where the original outputs have been replaced by
-// the ones given as argument
-std::unique_ptr<Fusion> copyFusionAndChangeOutputs(
-    Fusion* fusion,
-    const std::vector<Val*>& outputs) {
-  std::unique_ptr<Fusion> fusion_copy = std::make_unique<Fusion>();
-  std::unordered_map<Val*, Val*> copy_to_original_map;
-  auto original_to_copy_cloner = Fusion::copy(fusion, fusion_copy.get());
-
-  auto original_outputs = fusion_copy->outputs();
-
-  // Remove original outputs
-  std::for_each(
-      original_outputs.begin(), original_outputs.end(), [&](auto& output) {
-        fusion_copy->removeOutput(output);
-      });
-
-  // Add new outputs
-  std::for_each(outputs.begin(), outputs.end(), [&](Val* const& output) {
-    fusion_copy->addOutput(original_to_copy_cloner.clone(output));
-  });
-
-  return fusion_copy;
-}
-
-// Used in distributed setting where we only want to allocate output space and
-// receive output data from a different rank instead of computing them.
-std::vector<at::Tensor> allocateOutputSpace(
-    const at::ArrayRef<c10::IValue>& inputs,
-    Fusion* fusion,
-    const c10::Device& device) {
-  FUSER_PERF_SCOPE("multidevice::executor::allocateOutputSpace");
-  auto fusion_inputs = KernelArgumentHolder::createKernelArgumentHolder(inputs);
-  auto expr_eval = executor_utils::bindInputs(fusion_inputs, fusion);
-
-  auto output_info =
-      getBufferInfos(expr_eval, PrimDataType::Int, fusion->outputs());
-
-  return allocateOutputs(fusion, output_info, device, expr_eval);
-}
-
-} // namespace
-
 MultiDeviceExecutor::MultiDeviceExecutor(
     std::unique_ptr<Fusion> fusion,
     Communicator& comm,
@@ -138,8 +93,15 @@ MultiDeviceExecutor::MultiDeviceExecutor(
       std::vector<Communication*> communications =
           lowerCommunication(ir_cloner.clone(group->exprs().at(0)));
       for (Communication* communication : communications) {
-        auto wait = IrBuilder::create<hir::Wait>(communication);
+        // Allocate the recv buffers of communications
+        TensorView* tv = communication->out();
+        if (tv->getDeviceMesh().has(comm_.deviceId())) {
+          auto* allocate =
+              IrBuilder::create<kir::Allocate>(tv, MemoryType::Global);
+          hic->pushBackTopLevelExprs(allocate);
+        }
         hic->pushBackTopLevelExprs(communication);
+        auto wait = IrBuilder::create<hir::Wait>(communication);
         hic->pushBackTopLevelExprs(wait);
       }
     } else {
@@ -160,27 +122,6 @@ MultiDeviceExecutor::MultiDeviceExecutor(
   // Create the HostIrEvaluator representing the host program
   host_ir_executor_ =
       std::make_unique<hir::HostIrEvaluator>(std::move(hic), &comm, params);
-
-  // Allocator setup
-  // vals_to_allocate_ stores the tensors that need to be allocated at runtime,
-  // which correspond to the destination buffers of interdevice communications.
-  // TODO: reuse allocated buffers and support inplace collectives
-  // TODO: handle allocation as Host Ir
-  for (SegmentedGroup* group : staged_fusion->groups()) {
-    if (isResharding(group->exprs().at(0))) {
-      NVF_ERROR(group->exprs().at(0)->outputs().size() == 1);
-      auto val = group->exprs().at(0)->outputs().at(0);
-      NVF_ERROR(val->isA<TensorView>());
-      auto tv = val->as<TensorView>();
-      NVF_ERROR(tv->hasDeviceMesh());
-      if (tv->getDeviceMesh().has(comm_.deviceId())) {
-        vals_to_allocate_.push_back(val);
-      }
-    }
-  }
-  allocator_fusion_ = copyFusionAndChangeOutputs(
-      staged_fusion->completeFusion(), vals_to_allocate_);
-  vals_to_allocate_ = clone(vals_to_allocate_);
 }
 
 std::vector<at::Tensor> MultiDeviceExecutor::runWithInput(
@@ -203,13 +144,6 @@ std::vector<at::Tensor> MultiDeviceExecutor::runWithInput(
         inputs.at(input_idx);
   }
 
-  auto allocations =
-      allocateOutputSpace(inputs, allocator_fusion_.get(), comm()->device());
-  NVF_ERROR(vals_to_allocate_.size() == allocations.size());
-  for (auto i : c10::irange(allocations.size())) {
-    val_to_IValue[vals_to_allocate_.at(i)] = allocations.at(i);
-  }
-
   return host_ir_executor_->runWithInput(val_to_IValue);
 }
 
diff --git a/csrc/multidevice/executor.h b/csrc/multidevice/executor.h
index 3f1181a79e4..4ff8065099c 100644
--- a/csrc/multidevice/executor.h
+++ b/csrc/multidevice/executor.h
@@ -107,12 +107,6 @@ class MultiDeviceExecutor {
   std::unique_ptr<Fusion> complete_fusion_;
   // holds the HostIrEvaluator used for execution
   std::unique_ptr<hir::HostIrEvaluator> host_ir_executor_;
-  // Cached objects used for MultiDevice allocation
-  // TODO: remove and handle the allocation through Host Irs
-  std::unique_ptr<Fusion> allocator_fusion_;
-  // Cache the tensors that need to be allocated at runtime, which correspond to
-  // the destination buffers of interdevice communications.
-  std::vector<Val*> vals_to_allocate_;
 };
 
 } // namespace nvfuser
diff --git a/csrc/multidevice/lower_communication.cpp b/csrc/multidevice/lower_communication.cpp
index 4b878ac7376..c9f410041da 100644
--- a/csrc/multidevice/lower_communication.cpp
+++ b/csrc/multidevice/lower_communication.cpp
@@ -238,6 +238,9 @@ std::vector<Communication*> lowerCommunication(Expr* c) {
   auto* input_tv = c->input(0)->as<TensorView>();
   auto* output_tv = c->output(0)->as<TensorView>();
 
+  input_tv->setMemoryType(MemoryType::Global);
+  output_tv->setMemoryType(MemoryType::Global);
+
   const DeviceMesh& sender_mesh = input_tv->getDeviceMesh();
   const DeviceMesh& receiver_mesh = output_tv->getDeviceMesh();
   const bool same_mesh = sender_mesh == receiver_mesh;
diff --git a/csrc/runtime/allocations.cpp b/csrc/runtime/allocations.cpp
index 29fa52461e6..45bfe431c3f 100644
--- a/csrc/runtime/allocations.cpp
+++ b/csrc/runtime/allocations.cpp
@@ -243,14 +243,12 @@ void fillTensorWithNan(at::Tensor& t) {
   }
 }
 
-namespace {
-// Allocate an `at::Tensor` for `out_info` or compute it as an alias.
-at::Tensor allocateOutput(
+at::Tensor allocateTensor(
     const GlobalBufferInfo& out_info,
     const AliasInfo& alias_info,
     const c10::Device& device,
     ExpressionEvaluator& ee) {
-  FUSER_PERF_SCOPE("fusion_executor::allocations::allocateOutput");
+  FUSER_PERF_SCOPE("fusion_executor::allocations::allocateTensor");
   // Handle a fusion with duplicated outputs.
   TensorView* out_tv = out_info.tv;
   if (ee.isKnown(out_tv)) {
@@ -312,7 +310,6 @@ at::Tensor allocateOutput(
     NVF_THROW("Unrecognized AllocationType.");
   }
 }
-} // namespace
 
 std::vector<at::Tensor> allocateOutputs(
     const Fusion* fusion,
@@ -354,7 +351,7 @@ std::vector<at::Tensor> allocateOutputs(
   std::vector<at::Tensor> out_tensors(num_outs);
 
   for (const auto& [out_index, out] : sorted_outs) {
-    at::Tensor out_tensor = allocateOutput(
+    at::Tensor out_tensor = allocateTensor(
         output_info[out_index], fusion->getOutputAlias(out), device, ee);
     // Bind `out_tensor` so
     // 1. duplicated outputs map to the same tensor,
diff --git a/csrc/runtime/allocations.h b/csrc/runtime/allocations.h
index 1ec77eb3ce2..64bdb50a9bd 100644
--- a/csrc/runtime/allocations.h
+++ b/csrc/runtime/allocations.h
@@ -61,6 +61,13 @@ std::pair<std::vector<int64_t>, std::vector<int64_t>> inferShapeOfOutput(
     TensorView* tv,
     ExpressionEvaluator& expr_eval);
 
+// Allocate an `at::Tensor` for `out_info` or compute it as an alias.
+at::Tensor allocateTensor(
+    const GlobalBufferInfo& out_info,
+    const AliasInfo& alias_info,
+    const c10::Device& device,
+    ExpressionEvaluator& ee);
+
 // Allocate output tensors for a given fusion. Outputs may alias inputs, in
 // that case output tensors are shallow copies of the aliased inputs
 std::vector<at::Tensor> allocateOutputs(
diff --git a/tests/cpp/test_host_irs.cpp b/tests/cpp/test_host_irs.cpp
index f7edd4685d5..cb44550b883 100644
--- a/tests/cpp/test_host_irs.cpp
+++ b/tests/cpp/test_host_irs.cpp
@@ -1012,6 +1012,64 @@ TEST_F(IfThenElseTest, HostIr) {
   }
 }
 
+using AllocationTest = NVFuserTest;
+
+TEST_F(AllocationTest, HostIr) {
+  const std::vector<int64_t> sizes = {8, 64};
+
+  auto hic = std::make_unique<HostIrContainer>();
+  FusionGuard fg(hic.get());
+
+  auto* tv = makeConcreteTensor(sizes);
+  tv->setMemoryType(MemoryType::Global);
+  auto* allocate = IrBuilder::create<kir::Allocate>(tv, MemoryType::Global);
+  hic->addOutput(tv);
+  hic->pushBackTopLevelExprs(allocate);
+
+  HostIrEvaluator hie(std::move(hic));
+
+  auto outputs = hie.runWithInput({});
+
+  EXPECT_EQ(sizes, outputs.at(0).sizes());
+}
+
+TEST_F(AllocationTest, inHostForLoop) {
+  constexpr int64_t kForLoopStop = 4;
+  const std::vector<int64_t> sizes = {8, 64};
+
+  auto hic = std::make_unique<HostIrContainer>();
+  FusionGuard fg(hic.get());
+
+  auto* for_loop = IrBuilder::create<ForLoop>(
+      /*IterDomain=*/makeContigConcreteTensor({0})->axis(0), // unused
+      /*index=*/IrBuilder::create<Val>(DataType::Index),
+      /*start=*/hic->zeroVal(),
+      /*stop=*/IrBuilder::create<Val>(kForLoopStop, DataType::Index),
+      /*step=*/hic->oneVal(),
+      /*vectorize=*/false,
+      /*vectorize_shift=*/nullptr,
+      /*unroll_required=*/false,
+      CircularBufferLoopStage::NotApplicable,
+      /*circular_buffer_loop_stage_depth=*/0);
+
+  TensorView* tv0 = makeConcreteTensor(sizes);
+  tv0->setMemoryType(MemoryType::Global);
+  auto* allocate = IrBuilder::create<kir::Allocate>(tv0, MemoryType::Global);
+  TensorView* tv1 = abs(tv0);
+
+  for_loop->body().push_back(allocate);
+  for_loop->body().push_back(tv1->definition());
+
+  hic->pushBackTopLevelExprs(for_loop);
+  hic->addOutput(tv1);
+
+  HostIrEvaluator hie(std::move(hic));
+
+  auto outputs = hie.runWithInput({});
+
+  EXPECT_EQ(sizes, outputs.at(0).sizes());
+}
+
 } // namespace hir
 } // namespace nvfuser
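
Usage sketch (not part of the diff): since `allocateTensor` is now declared in
csrc/runtime/allocations.h, the evaluator-side allocation performed by the new
`HostIrEvaluator::handle(kir::Allocate*)` can also be done by other callers. A
minimal sketch of that call sequence, assuming a `TensorView* tv` whose extents
an `ExpressionEvaluator ee` can already evaluate, and a placeholder device
choice; it mirrors the handler above rather than defining any new API:

    // Build the buffer descriptor for `tv` (same helper the handler uses).
    GlobalBufferInfo info =
        getBufferInfos(ee, PrimDataType::Int, {tv}).at(0);
    // Request a fresh allocation, i.e. no aliasing of fusion inputs/outputs.
    AliasInfo no_alias = {
        .type = AllocationType::New, .aliased_io = nullptr, .hide_output = false};
    // Materialize the at::Tensor and bind it to `tv`, so subsequent host
    // exprs (e.g. a Communication posting into a recv buffer) can use it.
    at::Tensor buffer = allocateTensor(info, no_alias, at::Device("cuda:0"), ee);
    ee.bind(tv, buffer);

This is the sequence MultiDeviceExecutor now relies on: it emits a
kir::Allocate immediately before each Communication, and the evaluator binds
the recv buffer at run time, replacing the old pre-run allocateOutputSpace()
path.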