From 1ae9605be4021eb3e0d066b15cb6516892705f16 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Wed, 12 Feb 2025 17:38:23 -0800 Subject: [PATCH] perf(swordfish): Parallel expression evaluation (#3593) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Addresses: https://github.com/Eventual-Inc/Daft/issues/3389. More generally, this PR optimizes for projections with many expressions, particularly memory intensive expressions like UDFs. **Problem:** Currently, swordfish parallelizes projections across morsels, with 1 CPU per morsel. However, if each projection has many memory intensive expressions, we could experience a massive inflation in memory because we will have many materialized morsels living in memory at once. **Proposed solution:** Instead, we can parallelize the expressions within the projection (but only for expressions that require compute). This way, we still have good CPU utilization, but we keep a lower number of materialized morsels in memory. In the linked issue above, we see that a 128cpu machine will parallelize morsels across the cores, each doing multiple udfs, resulting in "317GB allocations and duration 351 secs". This PR reduces that to 7.8GB peak memory and runtime of 66 seconds. Screenshot 2024-12-17 at 3 54 06 PM **Notes:** - Found a bug with the loole channels where an `async` send to a `sync` receive was not respecting capacity constraints, and was allowing sends even though the receive did not happen. Moved over to https://github.com/fereidani/kanal, which worked much better. Todos for next time: - We should also be able to parallelize expression evaluation within a single expression, since it is a tree. We can calculate max width of the tree and set that as max parallel tasks. --------- Co-authored-by: EC2 Default User Co-authored-by: Colin Ho Co-authored-by: Colin Ho --- Cargo.lock | 82 +++++++++-------- src/daft-dsl/src/expr/mod.rs | 29 ++++++ src/daft-local-execution/Cargo.toml | 2 +- src/daft-local-execution/src/channel.rs | 20 ++-- src/daft-local-execution/src/dispatcher.rs | 4 +- .../src/intermediate_ops/intermediate_op.rs | 2 +- .../src/intermediate_ops/project.rs | 92 +++++++++++++------ src/daft-local-execution/src/pipeline.rs | 6 +- src/daft-local-execution/src/run.rs | 79 +++++++--------- src/daft-local-execution/src/runtime_stats.rs | 7 +- .../src/sinks/blocking_sink.rs | 2 +- .../src/sinks/streaming_sink.rs | 2 +- .../src/sources/source.rs | 2 +- .../src/ops/eval_expressions.rs | 30 ++++++ src/daft-recordbatch/Cargo.toml | 2 + src/daft-recordbatch/src/lib.rs | 55 +++++++++++ 16 files changed, 281 insertions(+), 135 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c67ba07d17..7ce8714f46 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -367,7 +367,7 @@ version = "3.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff6e472cdea888a4bd64f342f09b3f50e1886d32afe8df3d663c01140b811b18" dependencies = [ - "event-listener 5.3.1", + "event-listener 5.4.0", "event-listener-strategy", "pin-project-lite", ] @@ -1351,9 +1351,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.23" +version = "4.5.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3135e7ec2ef7b10c6ed8950f0f792ed96ee093fa088608f1c76e569722700c84" +checksum = "9560b07a799281c7e0958b9296854d6fafd4c5f31444a7e5bb1ad6dde5ccf1bd" dependencies = [ "clap_builder", "clap_derive", @@ -1361,9 +1361,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.5.23" +version = "4.5.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "30582fc632330df2bd26877bde0c1f4470d57c582bbc070376afcd04d8cb4838" +checksum = "874e0dd3eb68bf99058751ac9712f622e61e6f393a94f7128fa26e3f02f5c7cd" dependencies = [ "anstream", "anstyle", @@ -1373,9 +1373,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.18" +version = "4.5.24" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4ac6a0c7b1a9e9a5186361f67dfa1b88213572f427fb9ab038efb2bd8c582dab" +checksum = "54b755194d6389280185988721fffba69495eed5ee9feeee9a599b53db80318c" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -2340,9 +2340,9 @@ dependencies = [ "indexmap 2.7.0", "indicatif", "itertools 0.11.0", + "kanal", "lazy_static", "log", - "loole", "num-format", "pin-project", "pyo3", @@ -2518,10 +2518,12 @@ dependencies = [ "common-arrow-ffi", "common-display", "common-error", + "common-runtime", "daft-core", "daft-dsl", "daft-image", "daft-logical-plan", + "futures", "html-escape", "indexmap 2.7.0", "num-traits", @@ -2973,9 +2975,9 @@ checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" [[package]] name = "event-listener" -version = "5.3.1" +version = "5.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6032be9bd27023a771701cc49f9f053c751055f71efb2e0ae5c15809093675ba" +checksum = "3492acde4c3fc54c845eaab3eed8bd00c7a7d881f78bfc801e43a93dec1331ae" dependencies = [ "concurrent-queue", "parking", @@ -2988,7 +2990,7 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c3e4e0dd3673c1139bf041f3008816d9cf2946bbfac2945c09e523b8d7b05b2" dependencies = [ - "event-listener 5.3.1", + "event-listener 5.4.0", "pin-project-lite", ] @@ -3985,9 +3987,9 @@ dependencies = [ [[package]] name = "inventory" -version = "0.3.16" +version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5d80fade88dd420ce0d9ab6f7c58ef2272dde38db874657950f827d4982c817" +checksum = "3b31349d02fe60f80bbbab1a9402364cad7460626d6030494b08ac4a2075bf81" dependencies = [ "rustversion", ] @@ -4152,6 +4154,16 @@ dependencies = [ "simple_asn1", ] +[[package]] +name = "kanal" +version = "0.1.0-pre8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05d55519627edaf7fd0f29981f6dc03fb52df3f5b257130eb8d0bf2801ea1d7" +dependencies = [ + "futures-core", + "lock_api", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -4330,9 +4342,9 @@ checksum = "8355be11b20d696c8f18f6cc018c4e372165b1fa8126cef092399c9951984ffa" [[package]] name = "libz-ng-sys" -version = "1.1.20" +version = "1.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f0f7295a34685977acb2e8cc8b08ee4a8dffd6cf278eeccddbe1ed55ba815d5" +checksum = "7cee1488e961a80d172564fd6fcda11d8a4ac6672c06fe008e9213fa60520c2b" dependencies = [ "cmake", "libc", @@ -4340,9 +4352,9 @@ dependencies = [ [[package]] name = "linux-raw-sys" -version = "0.4.14" +version = "0.4.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" [[package]] name = "litemap" @@ -4366,21 +4378,11 @@ version = "0.4.22" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" -[[package]] -name = "loole" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2998397c725c822c6b2ba605fd9eb4c6a7a0810f1629ba3cc232ef4f0308d96" -dependencies = [ - "futures-core", - "futures-sink", -] - [[package]] name = "lz4" -version = "1.28.0" +version = "1.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4d1febb2b4a79ddd1980eede06a8f7902197960aa0383ffcfdd62fe723036725" +checksum = "a20b523e860d03443e98350ceaac5e71c6ba89aea7d960769ec3ce37f4de5af4" dependencies = [ "lz4-sys", ] @@ -5838,9 +5840,9 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.42" +version = "0.38.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f93dc38ecbab2eb790ff964bb77fa94faf256fd3e73285fd7ba0903b76bedb85" +checksum = "a78891ee6bf2340288408954ac787aa063d8e8817e9f53abb37c695c6d834ef6" dependencies = [ "bitflags 2.6.0", "errno", @@ -6024,9 +6026,9 @@ dependencies = [ [[package]] name = "security-framework-sys" -version = "2.13.0" +version = "2.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1863fd3768cd83c56a7f60faa4dc0d403f1b6df0a38c3c25f44b7894e45370d5" +checksum = "49db231d56a190491cb4aeda9527f1ad45345af50b0851622a7adb8c03b01c32" dependencies = [ "core-foundation-sys", "libc", @@ -6079,9 +6081,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.134" +version = "1.0.135" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d00f4175c42ee48b15416f6193a959ba3a0d67fc699a0db9ad12df9f83991c7d" +checksum = "2b0d7ba2887406110130a978386c4e1befb98c674b4fba677954e4db976630d9" dependencies = [ "indexmap 2.7.0", "itoa", @@ -6564,7 +6566,7 @@ checksum = "257822358c6f206fed78bfe6369cf959063b0644d70f88df6b19f2dadc93423e" dependencies = [ "alloca", "anyhow", - "clap 4.5.23", + "clap 4.5.24", "colorz", "glob-match", "goblin", @@ -6802,9 +6804,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.42.0" +version = "1.43.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cec9b21b0450273377fc97bd4c33a8acffc8c996c987a7c5b319a0083707551" +checksum = "3d61fa4ffa3de412bfea335c6ecff681de2b609ba3c77ef3e00e521813a9ed9e" dependencies = [ "backtrace", "bytes", @@ -6820,9 +6822,9 @@ dependencies = [ [[package]] name = "tokio-macros" -version = "2.4.0" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" dependencies = [ "proc-macro2", "quote", diff --git a/src/daft-dsl/src/expr/mod.rs b/src/daft-dsl/src/expr/mod.rs index c1966cfd5a..878cd50082 100644 --- a/src/daft-dsl/src/expr/mod.rs +++ b/src/daft-dsl/src/expr/mod.rs @@ -1286,6 +1286,35 @@ impl Expr { } } + pub fn has_compute(&self) -> bool { + match self { + Self::Column(..) => false, + Self::Literal(..) => false, + Self::Subquery(..) => false, + Self::Exists(..) => false, + Self::OuterReferenceColumn(..) => false, + Self::Function { .. } => true, + Self::ScalarFunction(..) => true, + Self::Agg(_) => true, + Self::IsIn(..) => true, + Self::Between(..) => true, + Self::BinaryOp { .. } => true, + Self::Alias(expr, ..) => expr.has_compute(), + Self::Cast(expr, ..) => expr.has_compute(), + Self::Not(expr) => expr.has_compute(), + Self::IsNull(expr) => expr.has_compute(), + Self::NotNull(expr) => expr.has_compute(), + Self::FillNull(expr, fill_value) => expr.has_compute() || fill_value.has_compute(), + Self::IfElse { + if_true, + if_false, + predicate, + } => if_true.has_compute() || if_false.has_compute() || predicate.has_compute(), + Self::InSubquery(expr, _) => expr.has_compute(), + Self::List(..) => true, + } + } + pub fn eq_null_safe(self: ExprRef, other: ExprRef) -> ExprRef { binary_op(Operator::EqNullSafe, self, other) } diff --git a/src/daft-local-execution/Cargo.toml b/src/daft-local-execution/Cargo.toml index e235b2d667..cacaf11c9d 100644 --- a/src/daft-local-execution/Cargo.toml +++ b/src/daft-local-execution/Cargo.toml @@ -26,9 +26,9 @@ futures = {workspace = true} indexmap = {workspace = true} indicatif = "0.17.9" itertools = {workspace = true} +kanal = "0.1.0-pre8" lazy_static = {workspace = true} log = {workspace = true} -loole = "0.4.0" num-format = {workspace = true} pin-project = "1" pyo3 = {workspace = true, optional = true} diff --git a/src/daft-local-execution/src/channel.rs b/src/daft-local-execution/src/channel.rs index 8adaae0616..9db656fe53 100644 --- a/src/daft-local-execution/src/channel.rs +++ b/src/daft-local-execution/src/channel.rs @@ -1,29 +1,25 @@ #[derive(Clone)] -pub(crate) struct Sender(loole::Sender); +pub(crate) struct Sender(kanal::AsyncSender); impl Sender { - pub(crate) async fn send(&self, val: T) -> Result<(), loole::SendError> { - self.0.send_async(val).await + pub(crate) async fn send(&self, val: T) -> Result<(), kanal::SendError> { + self.0.send(val).await } } #[derive(Clone)] -pub(crate) struct Receiver(loole::Receiver); +pub(crate) struct Receiver(kanal::AsyncReceiver); impl Receiver { pub(crate) async fn recv(&self) -> Option { - self.0.recv_async().await.ok() + self.0.recv().await.ok() } - pub(crate) fn blocking_recv(&self) -> Option { - self.0.recv().ok() - } - - pub(crate) fn into_inner(self) -> loole::Receiver { + pub(crate) fn into_inner(self) -> kanal::AsyncReceiver { self.0 } } pub(crate) fn create_channel(buffer_size: usize) -> (Sender, Receiver) { - let (tx, rx) = loole::bounded(buffer_size); + let (tx, rx) = kanal::bounded_async::(buffer_size); (Sender(tx), Receiver(rx)) } @@ -36,7 +32,7 @@ pub(crate) fn create_ordering_aware_receiver_channel( ) -> (Vec>, OrderingAwareReceiver) { match ordered { true => { - let (senders, receiver) = (0..buffer_size).map(|_| create_channel::(1)).unzip(); + let (senders, receiver) = (0..buffer_size).map(|_| create_channel::(0)).unzip(); ( senders, OrderingAwareReceiver::InOrder(RoundRobinReceiver::new(receiver)), diff --git a/src/daft-local-execution/src/dispatcher.rs b/src/daft-local-execution/src/dispatcher.rs index 9e487028f1..05fdaf0b00 100644 --- a/src/daft-local-execution/src/dispatcher.rs +++ b/src/daft-local-execution/src/dispatcher.rs @@ -95,7 +95,7 @@ impl DispatchSpawner for RoundRobinDispatcher { runtime_handle: &mut RuntimeHandle, ) -> SpawnedDispatchResult { let (worker_senders, worker_receivers): (Vec<_>, Vec<_>) = - (0..num_workers).map(|_| create_channel(1)).unzip(); + (0..num_workers).map(|_| create_channel(0)).unzip(); let morsel_size = self.morsel_size; let task = runtime_handle.spawn(async move { Self::dispatch_inner(worker_senders, input_receivers, morsel_size).await @@ -213,7 +213,7 @@ impl DispatchSpawner for PartitionedDispatcher { runtime_handle: &mut RuntimeHandle, ) -> SpawnedDispatchResult { let (worker_senders, worker_receivers): (Vec<_>, Vec<_>) = - (0..num_workers).map(|_| create_channel(1)).unzip(); + (0..num_workers).map(|_| create_channel(0)).unzip(); let partition_by = self.partition_by.clone(); let dispatch_task = runtime_handle.spawn(async move { Self::dispatch_inner(worker_senders, input_receivers, partition_by).await diff --git a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs index accc41eccb..317fcbc2ed 100644 --- a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs +++ b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs @@ -241,7 +241,7 @@ impl PipelineNode for IntermediateNode { let num_workers = op.max_concurrency().context(PipelineExecutionSnafu { node_name: self.name(), })?; - let (destination_sender, destination_receiver) = create_channel(1); + let (destination_sender, destination_receiver) = create_channel(0); let counting_sender = CountingSender::new(destination_sender, self.runtime_stats.clone(), progress_bar); diff --git a/src/daft-local-execution/src/intermediate_ops/project.rs b/src/daft-local-execution/src/intermediate_ops/project.rs index c41977e829..e5763b8493 100644 --- a/src/daft-local-execution/src/intermediate_ops/project.rs +++ b/src/daft-local-execution/src/intermediate_ops/project.rs @@ -1,4 +1,4 @@ -use std::sync::Arc; +use std::{cmp::max, sync::Arc}; use common_error::{DaftError, DaftResult}; use daft_dsl::{functions::python::get_resource_request, ExprRef}; @@ -12,21 +12,74 @@ use super::intermediate_op::{ }; use crate::{ExecutionTaskSpawner, NUM_CPUS}; +fn num_parallel_exprs(projection: &[ExprRef]) -> usize { + max( + projection.iter().filter(|expr| expr.has_compute()).count(), + 1, + ) +} + pub struct ProjectOperator { projection: Arc>, + max_concurrency: usize, + parallel_exprs: usize, memory_request: u64, } impl ProjectOperator { - pub fn new(projection: Vec) -> Self { + pub fn new(projection: Vec) -> DaftResult { let memory_request = get_resource_request(&projection) .and_then(|req| req.memory_bytes()) .map(|m| m as u64) .unwrap_or(0); - Self { + let (max_concurrency, parallel_exprs) = Self::get_optimal_allocation(&projection)?; + Ok(Self { projection: Arc::new(projection), memory_request, - } + max_concurrency, + parallel_exprs, + }) + } + + // This function is used to determine the optimal allocation of concurrency and expression parallelism + fn get_optimal_allocation(projection: &[ExprRef]) -> DaftResult<(usize, usize)> { + let resource_request = get_resource_request(projection); + // The number of CPUs available for the operator. + let available_cpus = match resource_request { + // If the resource request specifies a number of CPUs, the available cpus is the number of actual CPUs + // divided by the requested number of CPUs, clamped to (1, NUM_CPUS). + // E.g. if the resource request specifies 2 CPUs and NUM_CPUS is 4, the number of available cpus is 2. + Some(resource_request) if resource_request.num_cpus().is_some() => { + let requested_num_cpus = resource_request.num_cpus().unwrap(); + if requested_num_cpus > *NUM_CPUS as f64 { + Err(DaftError::ValueError(format!( + "Requested {} CPUs but found only {} available", + requested_num_cpus, *NUM_CPUS + ))) + } else { + Ok( + (*NUM_CPUS as f64 / requested_num_cpus).clamp(1.0, *NUM_CPUS as f64) + as usize, + ) + } + } + _ => Ok(*NUM_CPUS), + }?; + + let max_parallel_exprs = num_parallel_exprs(projection); + + // Calculate optimal concurrency using ceiling division + // Example: For 128 CPUs and 60 parallel expressions: + // max_concurrency = 128.div_ceil(60) = 3 concurrent tasks + let max_concurrency = available_cpus.div_ceil(max_parallel_exprs); + + // Calculate actual parallel expressions per task using floor division + // Example: For 128 CPUs and 3 concurrent tasks: + // num_parallel_exprs = 128 / 3 = 42 parallel expressions per task + // This ensures even distribution across concurrent tasks + let num_parallel_exprs = available_cpus / max_concurrency; + + Ok((max_concurrency, num_parallel_exprs)) } } @@ -39,12 +92,19 @@ impl IntermediateOperator for ProjectOperator { task_spawner: &ExecutionTaskSpawner, ) -> IntermediateOpExecuteResult { let projection = self.projection.clone(); + let num_parallel_exprs = self.parallel_exprs; let memory_request = self.memory_request; task_spawner .spawn_with_memory_request( memory_request, async move { - let out = input.eval_expression_list(&projection)?; + let out = if num_parallel_exprs > 1 { + input + .par_eval_expression_list(&projection, num_parallel_exprs) + .await? + } else { + input.eval_expression_list(&projection)? + }; Ok(( state, IntermediateOperatorResult::NeedMoreInput(Some(Arc::new(out))), @@ -78,26 +138,6 @@ impl IntermediateOperator for ProjectOperator { } fn max_concurrency(&self) -> DaftResult { - let resource_request = get_resource_request(&self.projection); - match resource_request { - // If the resource request specifies a number of CPUs, the max concurrency is the number of CPUs - // divided by the requested number of CPUs, clamped to (1, NUM_CPUS). - // E.g. if the resource request specifies 2 CPUs and NUM_CPUS is 4, the max concurrency is 2. - Some(resource_request) if resource_request.num_cpus().is_some() => { - let requested_num_cpus = resource_request.num_cpus().unwrap(); - if requested_num_cpus > *NUM_CPUS as f64 { - Err(DaftError::ValueError(format!( - "Requested {} CPUs but found only {} available", - requested_num_cpus, *NUM_CPUS - ))) - } else { - Ok( - (*NUM_CPUS as f64 / requested_num_cpus).clamp(1.0, *NUM_CPUS as f64) - as usize, - ) - } - } - _ => Ok(*NUM_CPUS), - } + Ok(self.max_concurrency) } } diff --git a/src/daft-local-execution/src/pipeline.rs b/src/daft-local-execution/src/pipeline.rs index 49844f2c61..54aa623d48 100644 --- a/src/daft-local-execution/src/pipeline.rs +++ b/src/daft-local-execution/src/pipeline.rs @@ -140,7 +140,11 @@ pub fn physical_plan_to_pipeline( stats_state, .. }) => { - let proj_op = ProjectOperator::new(projection.clone()); + let proj_op = ProjectOperator::new(projection.clone()).with_context(|_| { + PipelineCreationSnafu { + plan_name: physical_plan.name(), + } + })?; let child_node = physical_plan_to_pipeline(input, psets, cfg)?; IntermediateNode::new(Arc::new(proj_op), vec![child_node], stats_state.clone()).boxed() } diff --git a/src/daft-local-execution/src/run.rs b/src/daft-local-execution/src/run.rs index cddcf28248..a3b486125c 100644 --- a/src/daft-local-execution/src/run.rs +++ b/src/daft-local-execution/src/run.rs @@ -16,8 +16,7 @@ use daft_micropartition::{ partitioning::{InMemoryPartitionSetCache, MicroPartitionSet, PartitionSetCache}, MicroPartition, MicroPartitionRef, }; -use futures::{FutureExt, Stream}; -use loole::RecvFuture; +use futures::Stream; use tokio_util::sync::CancellationToken; #[cfg(feature = "python")] use { @@ -201,7 +200,7 @@ impl NativeExecutor { refresh_chrome_trace(); let cancel = self.cancel.clone(); let pipeline = physical_plan_to_pipeline(&physical_plan, psets, &cfg)?; - let (tx, rx) = create_channel(results_buffer_size.unwrap_or(1)); + let (tx, rx) = create_channel(results_buffer_size.unwrap_or(0)); let rt = self.runtime.clone(); let pb_manager = self.pb_manager.clone(); @@ -357,7 +356,7 @@ fn should_enable_progress_bar() -> bool { } pub struct ExecutionEngineReceiverIterator { - receiver: Receiver>, + receiver: kanal::Receiver>, handle: Option>>, } @@ -365,7 +364,7 @@ impl Iterator for ExecutionEngineReceiverIterator { type Item = DaftResult>; fn next(&mut self) -> Option { - match self.receiver.blocking_recv() { + match self.receiver.recv().ok() { Some(part) => Some(Ok(part)), None => { if self.handle.is_some() { @@ -387,41 +386,6 @@ impl Iterator for ExecutionEngineReceiverIterator { } } -pub struct ExecutionEngineReceiverStream { - receive_fut: RecvFuture>, - handle: Option>>, -} - -impl Stream for ExecutionEngineReceiverStream { - type Item = DaftResult>; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - match self.receive_fut.poll_unpin(cx) { - std::task::Poll::Ready(Ok(part)) => std::task::Poll::Ready(Some(Ok(part))), - std::task::Poll::Ready(Err(_)) => { - if self.handle.is_some() { - let join_result = self - .handle - .take() - .unwrap() - .join() - .expect("Execution engine thread panicked"); - match join_result { - Ok(()) => std::task::Poll::Ready(None), - Err(e) => std::task::Poll::Ready(Some(Err(e))), - } - } else { - std::task::Poll::Ready(None) - } - } - std::task::Poll::Pending => std::task::Poll::Pending, - } - } -} - pub struct ExecutionEngineResult { handle: std::thread::JoinHandle>, receiver: Receiver>, @@ -429,10 +393,37 @@ pub struct ExecutionEngineResult { impl ExecutionEngineResult { pub fn into_stream(self) -> impl Stream>> { - ExecutionEngineReceiverStream { - receive_fut: self.receiver.into_inner().recv_async(), - handle: Some(self.handle), + struct StreamState { + receiver: Receiver>, + handle: Option>>, } + + let state = StreamState { + receiver: self.receiver, + handle: Some(self.handle), + }; + + futures::stream::unfold(state, |mut state| async { + match state.receiver.recv().await { + Some(part) => Some((Ok(part), state)), + None => { + if state.handle.is_some() { + let join_result = state + .handle + .take() + .unwrap() + .join() + .expect("Execution engine thread panicked"); + match join_result { + Ok(()) => None, + Err(e) => Some((Err(e), state)), + } + } else { + None + } + } + } + }) } } @@ -442,7 +433,7 @@ impl IntoIterator for ExecutionEngineResult { fn into_iter(self) -> Self::IntoIter { ExecutionEngineReceiverIterator { - receiver: self.receiver, + receiver: self.receiver.into_inner().to_sync(), handle: Some(self.handle), } } diff --git a/src/daft-local-execution/src/runtime_stats.rs b/src/daft-local-execution/src/runtime_stats.rs index 835997c5b7..7cc6e77e2a 100644 --- a/src/daft-local-execution/src/runtime_stats.rs +++ b/src/daft-local-execution/src/runtime_stats.rs @@ -9,7 +9,7 @@ use std::{ }; use daft_micropartition::MicroPartition; -use loole::SendError; +use kanal::SendError; use tracing::{instrument::Instrumented, Instrument}; use crate::{ @@ -174,10 +174,7 @@ impl CountingSender { } } #[inline] - pub(crate) async fn send( - &self, - v: Arc, - ) -> Result<(), SendError>> { + pub(crate) async fn send(&self, v: Arc) -> Result<(), SendError> { self.rt.mark_rows_emitted(v.len() as u64); if let Some(ref pb) = self.progress_bar { pb.render(); diff --git a/src/daft-local-execution/src/sinks/blocking_sink.rs b/src/daft-local-execution/src/sinks/blocking_sink.rs index efb09c2be5..ed68042be7 100644 --- a/src/daft-local-execution/src/sinks/blocking_sink.rs +++ b/src/daft-local-execution/src/sinks/blocking_sink.rs @@ -184,7 +184,7 @@ impl PipelineNode for BlockingSinkNode { progress_bar.clone(), ); - let (destination_sender, destination_receiver) = create_channel(1); + let (destination_sender, destination_receiver) = create_channel(0); let counting_sender = CountingSender::new(destination_sender, self.runtime_stats.clone(), progress_bar); diff --git a/src/daft-local-execution/src/sinks/streaming_sink.rs b/src/daft-local-execution/src/sinks/streaming_sink.rs index d054be142b..b8dd330195 100644 --- a/src/daft-local-execution/src/sinks/streaming_sink.rs +++ b/src/daft-local-execution/src/sinks/streaming_sink.rs @@ -234,7 +234,7 @@ impl PipelineNode for StreamingSinkNode { )); } - let (destination_sender, destination_receiver) = create_channel(1); + let (destination_sender, destination_receiver) = create_channel(0); let counting_sender = CountingSender::new(destination_sender, self.runtime_stats.clone(), progress_bar); diff --git a/src/daft-local-execution/src/sources/source.rs b/src/daft-local-execution/src/sources/source.rs index 6fad499f57..bad39d4070 100644 --- a/src/daft-local-execution/src/sources/source.rs +++ b/src/daft-local-execution/src/sources/source.rs @@ -117,7 +117,7 @@ impl PipelineNode for SourceNode { ); let source = self.source.clone(); let io_stats = self.io_stats.clone(); - let (destination_sender, destination_receiver) = create_channel(1); + let (destination_sender, destination_receiver) = create_channel(0); let counting_sender = CountingSender::new(destination_sender, self.runtime_stats.clone(), progress_bar); runtime_handle.spawn( diff --git a/src/daft-micropartition/src/ops/eval_expressions.rs b/src/daft-micropartition/src/ops/eval_expressions.rs index 14baff4c37..29a6be0531 100644 --- a/src/daft-micropartition/src/ops/eval_expressions.rs +++ b/src/daft-micropartition/src/ops/eval_expressions.rs @@ -54,6 +54,36 @@ impl MicroPartition { )) } + pub async fn par_eval_expression_list( + &self, + exprs: &[ExprRef], + num_parallel_tasks: usize, + ) -> DaftResult { + let io_stats = IOStatsContext::new("MicroPartition::eval_expression_list"); + + let expected_schema = infer_schema(exprs, &self.schema)?; + + let tables = self.tables_or_read(io_stats)?; + + let evaluated_table_futs = tables + .iter() + .map(|table| table.par_eval_expression_list(exprs, num_parallel_tasks)); + + let evaluated_tables = futures::future::try_join_all(evaluated_table_futs).await?; + + let eval_stats = self + .statistics + .as_ref() + .map(|table_statistics| table_statistics.eval_expression_list(exprs, &expected_schema)) + .transpose()?; + + Ok(Self::new_loaded( + expected_schema.into(), + Arc::new(evaluated_tables), + eval_stats, + )) + } + pub fn explode(&self, exprs: &[ExprRef]) -> DaftResult { let io_stats = IOStatsContext::new("MicroPartition::explode"); diff --git a/src/daft-recordbatch/Cargo.toml b/src/daft-recordbatch/Cargo.toml index c6dc83e282..5770e04d10 100644 --- a/src/daft-recordbatch/Cargo.toml +++ b/src/daft-recordbatch/Cargo.toml @@ -4,10 +4,12 @@ comfy-table = {workspace = true} common-arrow-ffi = {path = "../common/arrow-ffi", default-features = false} common-display = {path = "../common/display", default-features = false} common-error = {path = "../common/error", default-features = false} +common-runtime = {path = "../common/runtime", default-features = false} daft-core = {path = "../daft-core", default-features = false} daft-dsl = {path = "../daft-dsl", default-features = false} daft-image = {path = "../daft-image", default-features = false} daft-logical-plan = {path = "../daft-logical-plan", default-features = false} +futures = {workspace = true} html-escape = {workspace = true} indexmap = {workspace = true} num-traits = {workspace = true} diff --git a/src/daft-recordbatch/src/lib.rs b/src/daft-recordbatch/src/lib.rs index 8107c2df5b..44cb9dcc5d 100644 --- a/src/daft-recordbatch/src/lib.rs +++ b/src/daft-recordbatch/src/lib.rs @@ -13,6 +13,7 @@ use std::{ use arrow2::array::Array; use common_display::table_display::{make_comfy_table, StrValue}; use common_error::{DaftError, DaftResult}; +use common_runtime::get_compute_runtime; use daft_core::{ array::ops::{ full::FullNull, DaftApproxCountDistinctAggable, DaftHllSketchAggable, GroupIndices, @@ -24,6 +25,7 @@ use daft_dsl::{ LiteralValue, SketchType, }; use daft_logical_plan::FileInfos; +use futures::{StreamExt, TryStreamExt}; use num_traits::ToPrimitive; #[cfg(feature = "python")] pub mod ffi; @@ -675,6 +677,59 @@ impl RecordBatch { .map(|e| self.eval_expression(e)) .try_collect()?; + self.process_eval_results(exprs, result_series) + } + + pub async fn par_eval_expression_list( + &self, + exprs: &[ExprRef], + num_parallel_tasks: usize, + ) -> DaftResult { + // Partition the expressions into compute and non-compute + let (compute_exprs, non_compute_exprs): (Vec<_>, Vec<_>) = exprs + .iter() + .cloned() + .enumerate() + .partition(|(_, e)| e.has_compute()); + + // Evaluate non-compute expressions + let non_compute_results = non_compute_exprs + .into_iter() + .map(|(i, e)| (i, self.eval_expression(&e))) + .collect::>(); + + // Spawn tasks for the compute expressions + let compute_runtime = get_compute_runtime(); + let compute_futures = compute_exprs.into_iter().map(|(i, e)| { + let table = self.clone(); + compute_runtime.spawn(async move { (i, table.eval_expression(&e)) }) + }); + + // Collect the results of the compute expressions + let compute_results = futures::stream::iter(compute_futures) + .buffered(num_parallel_tasks) + .try_collect::>() + .await?; + + // Combine and sort by original index + let mut all_results = non_compute_results; + all_results.extend(compute_results); + all_results.sort_by_key(|(i, _)| *i); + + // Extract just the results in order + let result_series = all_results + .into_iter() + .map(|(_, result)| result) + .collect::>>()?; + + self.process_eval_results(exprs, result_series) + } + + fn process_eval_results( + &self, + exprs: &[ExprRef], + result_series: Vec, + ) -> DaftResult { let fields: Vec<_> = result_series.iter().map(|s| s.field().clone()).collect(); let mut seen = HashSet::new();