From 9a29a367897c7168e0b33a3ded593bfc1c788900 Mon Sep 17 00:00:00 2001 From: Jeffrey Jiang Date: Mon, 3 Mar 2025 10:32:02 -0600 Subject: [PATCH 1/8] add borrowed tensor support for aggregate_as_tensor --- ttnn/cpp/ttnn/distributed/api.cpp | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/ttnn/cpp/ttnn/distributed/api.cpp b/ttnn/cpp/ttnn/distributed/api.cpp index b97c390a98a..c1e60e31558 100644 --- a/ttnn/cpp/ttnn/distributed/api.cpp +++ b/ttnn/cpp/ttnn/distributed/api.cpp @@ -93,6 +93,29 @@ Tensor aggregate_as_tensor( } auto storage = MultiDeviceHostStorage{config, std::move(host_owned_buffers), specs}; return Tensor(std::move(storage), reference_shard.get_tensor_spec()); +} else if (storage_type == StorageType::BORROWED) { + std::vector specs; + std::vector host_owned_buffers; + for (const auto& shard : tensor_shards) { + auto buffer = std::get(shard.get_storage()).buffer; + specs.push_back(shard.get_tensor_spec()); + + auto visitor = tt::stl::overloaded{[&shard, &host_owned_buffers](const auto& buffer) -> OwnedBuffer { + using BufferType = std::decay_t; + using ValueType = typename BufferType::value_type; + + std::vector physical_data(buffer.begin(), buffer.end()); + + std::vector logical_data = + tensor_impl::decode_tensor_data(std::move(physical_data), shard.get_tensor_spec()); + + return owned_buffer::create(std::move(logical_data)); + }}; + + host_owned_buffers.push_back(std::visit(visitor, buffer)); + } + auto storage = MultiDeviceHostStorage{config, std::move(host_owned_buffers), specs}; + return Tensor(std::move(storage), reference_shard.get_tensor_spec()); } else { std::vector ordered_device_ids; std::unordered_map specs; From 212e51b6f16a68d41b68c7091bfcb920d58acf0e Mon Sep 17 00:00:00 2001 From: Jeffrey Jiang Date: Mon, 3 Mar 2025 10:44:44 -0600 Subject: [PATCH 2/8] add TT_FATAL check --- ttnn/cpp/ttnn/distributed/api.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/ttnn/cpp/ttnn/distributed/api.cpp b/ttnn/cpp/ttnn/distributed/api.cpp index c1e60e31558..c91ce4850c1 100644 --- a/ttnn/cpp/ttnn/distributed/api.cpp +++ b/ttnn/cpp/ttnn/distributed/api.cpp @@ -117,6 +117,7 @@ Tensor aggregate_as_tensor( auto storage = MultiDeviceHostStorage{config, std::move(host_owned_buffers), specs}; return Tensor(std::move(storage), reference_shard.get_tensor_spec()); } else { + TT_FATAL(storage_type == StorageType::DEVICE, "Unexpected storage type {}", storage_type); std::vector ordered_device_ids; std::unordered_map specs; std::unordered_map> device_buffers; From 2580ef7ed29e2c5611c7ed29980c454dacc30990 Mon Sep 17 00:00:00 2001 From: Jeffrey Jiang Date: Mon, 3 Mar 2025 21:54:18 +0000 Subject: [PATCH 3/8] simplify borrowed aggregate --- ttnn/cpp/ttnn/distributed/api.cpp | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/ttnn/cpp/ttnn/distributed/api.cpp b/ttnn/cpp/ttnn/distributed/api.cpp index c91ce4850c1..5cc40d69316 100644 --- a/ttnn/cpp/ttnn/distributed/api.cpp +++ b/ttnn/cpp/ttnn/distributed/api.cpp @@ -7,14 +7,17 @@ #include #include +#include "tt-metalium/assert.hpp" #include "tt-metalium/mesh_coord.hpp" #include "ttnn/tensor/tensor.hpp" +#include "ttnn/tensor/host_buffer/functions.hpp" #include "ttnn/tensor/tensor_utils.hpp" #include "ttnn/distributed/distributed_tensor_config.hpp" #include #include #include "ttnn/distributed/distributed_tensor_config.hpp" + using namespace tt::tt_metal; namespace ttnn::distributed { @@ -101,15 +104,9 @@ Tensor aggregate_as_tensor( specs.push_back(shard.get_tensor_spec()); auto visitor = tt::stl::overloaded{[&shard, &host_owned_buffers](const auto& buffer) -> OwnedBuffer { - using BufferType = std::decay_t; - using ValueType = typename BufferType::value_type; - - std::vector physical_data(buffer.begin(), buffer.end()); - - std::vector logical_data = - tensor_impl::decode_tensor_data(std::move(physical_data), shard.get_tensor_spec()); + using BorrowedBufferType = std::vector::value_type>; - return owned_buffer::create(std::move(logical_data)); + return owned_buffer::create(BorrowedBufferType(buffer.begin(), buffer.end())); }}; host_owned_buffers.push_back(std::visit(visitor, buffer)); From bbf76cf1c1d1e31448cd75934fc2afd1f4fa92cc Mon Sep 17 00:00:00 2001 From: Jeffrey Jiang Date: Fri, 7 Mar 2025 21:09:53 +0000 Subject: [PATCH 4/8] duplicate to other file --- .../gtests/tensor/test_distributed_tensor.cpp | 64 +++++++++++++++++++ 1 file changed, 64 insertions(+) diff --git a/tests/ttnn/unit_tests/gtests/tensor/test_distributed_tensor.cpp b/tests/ttnn/unit_tests/gtests/tensor/test_distributed_tensor.cpp index 810da702d59..7e93bd5a7bd 100644 --- a/tests/ttnn/unit_tests/gtests/tensor/test_distributed_tensor.cpp +++ b/tests/ttnn/unit_tests/gtests/tensor/test_distributed_tensor.cpp @@ -22,6 +22,70 @@ TensorSpec get_tensor_spec(const ttnn::Shape& shape, DataType dtype) { return TensorSpec(shape, TensorLayout(dtype, Layout::ROW_MAJOR, MemoryConfig{})); } +TEST_F(TensorDistributionTest, DeviceAggregate) { + const int num_devices = mesh_device_->num_devices(); + std::vector> test_data(num_devices); + for (int i = 0; i < num_devices; i++) { + test_data[i].insert(test_data[i].end(), {i * 1.F, i * 2.F, i * 3.F}); + } + + std::vector tensors(num_devices); + + for(int i = 0; i < num_devices; i++) { + tensors.push_back(Tensor::from_vector( + test_data[i], get_tensor_spec(ttnn::Shape{1, num_devices, 3, 1}, DataType::FLOAT32), mesh_device_.get())); + } + + Tensor aggregated_tensor = aggregate_as_tensor(tensors, AllGatherTensor{}); + EXPECT_TRUE(aggregated_tensor.storage_type() == StorageType::MULTI_DEVICE); + + std::vector test_data_1d; + + for (std::vector device : test_data) { + for (float datum : device) { + test_data_1d.push_back(datum); + } + } + + std::vector out_vector = aggregated_tensor.to_vector(); + EXPECT_EQ(out_vector, test_data_1d); +} + +TEST_F(TensorDistributionTest, BorrowedAggregate) { + const int num_devices = mesh_device_->num_devices(); + std::vector> test_data(num_devices); + for (int i = 0; i < num_devices; i++) { + test_data[i].insert(test_data[i].end(), {i * 1.F, i * 2.F, i * 3.F}); + } + + std::vector tensors(num_devices); + auto on_creation_callback = [] {}; + auto on_destruction_callback = [] {}; + for(int i = 0; i < num_devices; i++) { + tensors.push_back(Tensor( + BorrowedStorage{ + tt::tt_metal::borrowed_buffer::Buffer(static_cast(test_data[i].data()), test_data[i].size()), + on_creation_callback, + on_destruction_callback}, + ttnn::Shape{1, num_devices, 3, 1}, + DataType::FLOAT32, + Layout::TILE)); + } + Tensor aggregated_tensor = aggregate_as_tensor(tensors, AllGatherTensor{}); + EXPECT_TRUE(aggregated_tensor.storage_type() == StorageType::MULTI_DEVICE_HOST); + + std::vector test_data_1d; + + for (std::vector device : test_data) { + for (float datum : device) { + test_data_1d.push_back(datum); + } + } + + std::vector out_vector = aggregated_tensor.to_vector(); + EXPECT_EQ(out_vector, test_data_1d); +} + TEST_F(TensorDistributionTest, DistributeToDevice) { Tensor input_tensor = Tensor::from_vector( std::vector{42.F, 13.F, -99.F}, get_tensor_spec(ttnn::Shape{1, 1, 1, 3}, DataType::FLOAT32)); From 07291e9307fbbe96cfabdbe85fa12cc2c68a22e0 Mon Sep 17 00:00:00 2001 From: Jeffrey Jiang Date: Mon, 3 Mar 2025 10:32:02 -0600 Subject: [PATCH 5/8] add borrowed tensor support for aggregate_as_tensor --- ttnn/cpp/ttnn/distributed/api.cpp | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/ttnn/cpp/ttnn/distributed/api.cpp b/ttnn/cpp/ttnn/distributed/api.cpp index 5cc40d69316..42d4ad186c1 100644 --- a/ttnn/cpp/ttnn/distributed/api.cpp +++ b/ttnn/cpp/ttnn/distributed/api.cpp @@ -104,9 +104,15 @@ Tensor aggregate_as_tensor( specs.push_back(shard.get_tensor_spec()); auto visitor = tt::stl::overloaded{[&shard, &host_owned_buffers](const auto& buffer) -> OwnedBuffer { - using BorrowedBufferType = std::vector::value_type>; + using BufferType = std::decay_t; + using ValueType = typename BufferType::value_type; - return owned_buffer::create(BorrowedBufferType(buffer.begin(), buffer.end())); + std::vector physical_data(buffer.begin(), buffer.end()); + + std::vector logical_data = + tensor_impl::decode_tensor_data(std::move(physical_data), shard.get_tensor_spec()); + + return owned_buffer::create(std::move(logical_data)); }}; host_owned_buffers.push_back(std::visit(visitor, buffer)); From 8af7cd894ebd74b3423d1d630f529ba07d3ef926 Mon Sep 17 00:00:00 2001 From: Jeffrey Jiang Date: Mon, 3 Mar 2025 21:54:18 +0000 Subject: [PATCH 6/8] simplify borrowed aggregate --- ttnn/cpp/ttnn/distributed/api.cpp | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/ttnn/cpp/ttnn/distributed/api.cpp b/ttnn/cpp/ttnn/distributed/api.cpp index 42d4ad186c1..5cc40d69316 100644 --- a/ttnn/cpp/ttnn/distributed/api.cpp +++ b/ttnn/cpp/ttnn/distributed/api.cpp @@ -104,15 +104,9 @@ Tensor aggregate_as_tensor( specs.push_back(shard.get_tensor_spec()); auto visitor = tt::stl::overloaded{[&shard, &host_owned_buffers](const auto& buffer) -> OwnedBuffer { - using BufferType = std::decay_t; - using ValueType = typename BufferType::value_type; + using BorrowedBufferType = std::vector::value_type>; - std::vector physical_data(buffer.begin(), buffer.end()); - - std::vector logical_data = - tensor_impl::decode_tensor_data(std::move(physical_data), shard.get_tensor_spec()); - - return owned_buffer::create(std::move(logical_data)); + return owned_buffer::create(BorrowedBufferType(buffer.begin(), buffer.end())); }}; host_owned_buffers.push_back(std::visit(visitor, buffer)); From a1dc6deae2da9483a808ecbb7d96e5a47a68c597 Mon Sep 17 00:00:00 2001 From: Jeffrey Jiang Date: Thu, 6 Mar 2025 23:12:50 +0000 Subject: [PATCH 7/8] add untested tests --- .../gtests/tensor/test_distributed_tensor.cpp | 40 +++++-------------- 1 file changed, 9 insertions(+), 31 deletions(-) diff --git a/tests/ttnn/unit_tests/gtests/tensor/test_distributed_tensor.cpp b/tests/ttnn/unit_tests/gtests/tensor/test_distributed_tensor.cpp index 7e93bd5a7bd..213566117d9 100644 --- a/tests/ttnn/unit_tests/gtests/tensor/test_distributed_tensor.cpp +++ b/tests/ttnn/unit_tests/gtests/tensor/test_distributed_tensor.cpp @@ -32,23 +32,15 @@ TEST_F(TensorDistributionTest, DeviceAggregate) { std::vector tensors(num_devices); for(int i = 0; i < num_devices; i++) { - tensors.push_back(Tensor::from_vector( - test_data[i], get_tensor_spec(ttnn::Shape{1, num_devices, 3, 1}, DataType::FLOAT32), mesh_device_.get())); + tensors.push_back(Tensor::from_vector(test_data[i], get_tensor_spec(ttnn::Shape{1, num_devices, 3, 1}, DataType::FLOAT32)), mesh_device_); } Tensor aggregated_tensor = aggregate_as_tensor(tensors, AllGatherTensor{}); + System.out.println(aggregated_tensor) EXPECT_TRUE(aggregated_tensor.storage_type() == StorageType::MULTI_DEVICE); - - std::vector test_data_1d; - - for (std::vector device : test_data) { - for (float datum : device) { - test_data_1d.push_back(datum); - } - } - + std::vector out_vector = aggregated_tensor.to_vector(); - EXPECT_EQ(out_vector, test_data_1d); + EXPECT_EQ(out_vector, test_data); } TEST_F(TensorDistributionTest, BorrowedAggregate) { @@ -59,31 +51,17 @@ TEST_F(TensorDistributionTest, BorrowedAggregate) { } std::vector tensors(num_devices); - auto on_creation_callback = [] {}; - auto on_destruction_callback = [] {}; + for(int i = 0; i < num_devices; i++) { - tensors.push_back(Tensor( - BorrowedStorage{ - tt::tt_metal::borrowed_buffer::Buffer(static_cast(test_data[i].data()), test_data[i].size()), - on_creation_callback, - on_destruction_callback}, - ttnn::Shape{1, num_devices, 3, 1}, - DataType::FLOAT32, - Layout::TILE)); + tensors.push_back(Tensor() Tensor::from_vector(test_data[i], get_tensor_spec(ttnn::Shape{1, num_devices, 3, 1}, DataType::FLOAT32))); } +Storage storage, const ttnn::Shape& shape, DataType dtype, Layout layout, const std::optional& tile Tensor aggregated_tensor = aggregate_as_tensor(tensors, AllGatherTensor{}); + System.out.println(aggregated_tensor) EXPECT_TRUE(aggregated_tensor.storage_type() == StorageType::MULTI_DEVICE_HOST); - std::vector test_data_1d; - - for (std::vector device : test_data) { - for (float datum : device) { - test_data_1d.push_back(datum); - } - } - std::vector out_vector = aggregated_tensor.to_vector(); - EXPECT_EQ(out_vector, test_data_1d); + EXPECT_EQ(out_vector, test_data); } TEST_F(TensorDistributionTest, DistributeToDevice) { From df8f0f584a12c770faabcf6aa8dc3e0720077639 Mon Sep 17 00:00:00 2001 From: Jeffrey Jiang Date: Fri, 7 Mar 2025 21:08:58 +0000 Subject: [PATCH 8/8] fix syntax errors on tests --- .../gtests/tensor/test_distributed_tensor.cpp | 40 ++++++++++++++----- 1 file changed, 31 insertions(+), 9 deletions(-) diff --git a/tests/ttnn/unit_tests/gtests/tensor/test_distributed_tensor.cpp b/tests/ttnn/unit_tests/gtests/tensor/test_distributed_tensor.cpp index 213566117d9..7e93bd5a7bd 100644 --- a/tests/ttnn/unit_tests/gtests/tensor/test_distributed_tensor.cpp +++ b/tests/ttnn/unit_tests/gtests/tensor/test_distributed_tensor.cpp @@ -32,15 +32,23 @@ TEST_F(TensorDistributionTest, DeviceAggregate) { std::vector tensors(num_devices); for(int i = 0; i < num_devices; i++) { - tensors.push_back(Tensor::from_vector(test_data[i], get_tensor_spec(ttnn::Shape{1, num_devices, 3, 1}, DataType::FLOAT32)), mesh_device_); + tensors.push_back(Tensor::from_vector( + test_data[i], get_tensor_spec(ttnn::Shape{1, num_devices, 3, 1}, DataType::FLOAT32), mesh_device_.get())); } Tensor aggregated_tensor = aggregate_as_tensor(tensors, AllGatherTensor{}); - System.out.println(aggregated_tensor) EXPECT_TRUE(aggregated_tensor.storage_type() == StorageType::MULTI_DEVICE); - + + std::vector test_data_1d; + + for (std::vector device : test_data) { + for (float datum : device) { + test_data_1d.push_back(datum); + } + } + std::vector out_vector = aggregated_tensor.to_vector(); - EXPECT_EQ(out_vector, test_data); + EXPECT_EQ(out_vector, test_data_1d); } TEST_F(TensorDistributionTest, BorrowedAggregate) { @@ -51,17 +59,31 @@ TEST_F(TensorDistributionTest, BorrowedAggregate) { } std::vector tensors(num_devices); - + auto on_creation_callback = [] {}; + auto on_destruction_callback = [] {}; for(int i = 0; i < num_devices; i++) { - tensors.push_back(Tensor() Tensor::from_vector(test_data[i], get_tensor_spec(ttnn::Shape{1, num_devices, 3, 1}, DataType::FLOAT32))); + tensors.push_back(Tensor( + BorrowedStorage{ + tt::tt_metal::borrowed_buffer::Buffer(static_cast(test_data[i].data()), test_data[i].size()), + on_creation_callback, + on_destruction_callback}, + ttnn::Shape{1, num_devices, 3, 1}, + DataType::FLOAT32, + Layout::TILE)); } -Storage storage, const ttnn::Shape& shape, DataType dtype, Layout layout, const std::optional& tile Tensor aggregated_tensor = aggregate_as_tensor(tensors, AllGatherTensor{}); - System.out.println(aggregated_tensor) EXPECT_TRUE(aggregated_tensor.storage_type() == StorageType::MULTI_DEVICE_HOST); + std::vector test_data_1d; + + for (std::vector device : test_data) { + for (float datum : device) { + test_data_1d.push_back(datum); + } + } + std::vector out_vector = aggregated_tensor.to_vector(); - EXPECT_EQ(out_vector, test_data); + EXPECT_EQ(out_vector, test_data_1d); } TEST_F(TensorDistributionTest, DistributeToDevice) {