From eb040ce981babbb07fe799d77d7bc594caf525f0 Mon Sep 17 00:00:00 2001 From: Colin Ho Date: Tue, 16 Jul 2024 17:48:31 -0700 Subject: [PATCH] [FEAT] Add concat to new execution model + buffered intermediate ops (#2519) This PR adds Concat and buffered intermediate ops to the new execution model, in addition to some refactors to support this change. - There are now two types of sinks, single input (limit, agg) and double input (concat, join). - Intermediate ops can now buffer their outputs via OperatorTaskState. - Add a channel abstraction for in-order vs out-of-order channels. The in-order channel uses a round robin implementation over a vec of channels with capacity 1, while the out-of-order channel just uses a single MPSC channel with capacity `n`. - Removed Pipelines and opt for Actors. Each actor has control over a single source / op / sink, can control its own parallelism, and can also configure its own input channel. --------- Co-authored-by: Colin Ho --- Cargo.lock | 1 - src/daft-local-execution/Cargo.toml | 1 - src/daft-local-execution/src/channel.rs | 157 +++++++++++ .../src/create_pipeline.rs | 93 ------- .../src/intermediate_ops/intermediate_op.rs | 142 +++++++++- src/daft-local-execution/src/lib.rs | 16 +- src/daft-local-execution/src/pipeline.rs | 256 ------------------ src/daft-local-execution/src/run.rs | 115 +++++++- .../src/sinks/aggregate.rs | 6 +- src/daft-local-execution/src/sinks/concat.rs | 55 ++++ src/daft-local-execution/src/sinks/limit.rs | 6 +- src/daft-local-execution/src/sinks/mod.rs | 1 + src/daft-local-execution/src/sinks/sink.rs | 136 +++++++++- .../src/sources/in_memory.rs | 4 +- .../src/sources/scan_task.rs | 26 +- .../src/sources/source.rs | 34 ++- src/daft-physical-plan/src/lib.rs | 4 +- src/daft-physical-plan/src/local_plan.rs | 27 +- src/daft-physical-plan/src/translate.rs | 6 + 19 files changed, 686 insertions(+), 400 deletions(-) create mode 100644 src/daft-local-execution/src/channel.rs delete mode 100644 
src/daft-local-execution/src/create_pipeline.rs delete mode 100644 src/daft-local-execution/src/pipeline.rs create mode 100644 src/daft-local-execution/src/sinks/concat.rs diff --git a/Cargo.lock b/Cargo.lock index 9ca82bdc9b..467baffdc8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1817,7 +1817,6 @@ dependencies = [ name = "daft-local-execution" version = "0.2.0-dev0" dependencies = [ - "async-trait", "common-error", "daft-core", "daft-dsl", diff --git a/src/daft-local-execution/Cargo.toml b/src/daft-local-execution/Cargo.toml index 57033c6a6b..d0761e8b71 100644 --- a/src/daft-local-execution/Cargo.toml +++ b/src/daft-local-execution/Cargo.toml @@ -1,5 +1,4 @@ [dependencies] -async-trait = {workspace = true} common-error = {path = "../common/error", default-features = false} daft-core = {path = "../daft-core", default-features = false} daft-dsl = {path = "../daft-dsl", default-features = false} diff --git a/src/daft-local-execution/src/channel.rs b/src/daft-local-execution/src/channel.rs new file mode 100644 index 0000000000..7b05376451 --- /dev/null +++ b/src/daft-local-execution/src/channel.rs @@ -0,0 +1,157 @@ +use std::sync::Arc; + +use common_error::DaftResult; +use daft_micropartition::MicroPartition; + +pub type SingleSender = tokio::sync::mpsc::Sender>>; +pub type SingleReceiver = tokio::sync::mpsc::Receiver>>; + +pub fn spawn_compute_task(future: F) +where + F: std::future::Future> + Send + 'static, +{ + tokio::spawn(async move { + let _ = future.await; + }); +} + +pub fn create_single_channel(buffer_size: usize) -> (SingleSender, SingleReceiver) { + tokio::sync::mpsc::channel(buffer_size) +} + +pub fn create_channel(buffer_size: usize, in_order: bool) -> (MultiSender, MultiReceiver) { + if in_order { + let (senders, receivers) = (0..buffer_size).map(|_| create_single_channel(1)).unzip(); + let sender = MultiSender::InOrder(InOrderSender::new(senders)); + let receiver = MultiReceiver::InOrder(InOrderReceiver::new(receivers)); + (sender, receiver) + } else 
{ + let (sender, receiver) = create_single_channel(buffer_size); + let sender = MultiSender::OutOfOrder(OutOfOrderSender::new(sender)); + let receiver = MultiReceiver::OutOfOrder(OutOfOrderReceiver::new(receiver)); + (sender, receiver) + } +} + +pub enum MultiSender { + InOrder(InOrderSender), + OutOfOrder(OutOfOrderSender), +} + +impl MultiSender { + pub fn get_next_sender(&mut self) -> SingleSender { + match self { + Self::InOrder(sender) => sender.get_next_sender(), + Self::OutOfOrder(sender) => sender.get_sender(), + } + } + + pub fn buffer_size(&self) -> usize { + match self { + Self::InOrder(sender) => sender.senders.len(), + Self::OutOfOrder(sender) => sender.sender.max_capacity(), + } + } + + pub fn in_order(&self) -> bool { + match self { + Self::InOrder(_) => true, + Self::OutOfOrder(_) => false, + } + } +} +pub struct InOrderSender { + senders: Vec, + curr_sender_idx: usize, +} + +impl InOrderSender { + pub fn new(senders: Vec) -> Self { + Self { + senders, + curr_sender_idx: 0, + } + } + + pub fn get_next_sender(&mut self) -> SingleSender { + let next_idx = self.curr_sender_idx; + self.curr_sender_idx = (next_idx + 1) % self.senders.len(); + self.senders[next_idx].clone() + } +} + +pub struct OutOfOrderSender { + sender: SingleSender, +} + +impl OutOfOrderSender { + pub fn new(sender: SingleSender) -> Self { + Self { sender } + } + + pub fn get_sender(&self) -> SingleSender { + self.sender.clone() + } +} + +pub enum MultiReceiver { + InOrder(InOrderReceiver), + OutOfOrder(OutOfOrderReceiver), +} + +impl MultiReceiver { + pub async fn recv(&mut self) -> Option>> { + match self { + Self::InOrder(receiver) => receiver.recv().await, + Self::OutOfOrder(receiver) => receiver.recv().await, + } + } +} + +pub struct InOrderReceiver { + receivers: Vec, + curr_receiver_idx: usize, + is_done: bool, +} + +impl InOrderReceiver { + pub fn new(receivers: Vec) -> Self { + Self { + receivers, + curr_receiver_idx: 0, + is_done: false, + } + } + + pub async fn recv(&mut 
self) -> Option>> { + if self.is_done { + return None; + } + for i in 0..self.receivers.len() { + let next_idx = (i + self.curr_receiver_idx) % self.receivers.len(); + if let Some(val) = self.receivers[next_idx].recv().await { + self.curr_receiver_idx = (next_idx + 1) % self.receivers.len(); + return Some(val); + } + } + self.is_done = true; + None + } +} + +pub struct OutOfOrderReceiver { + receiver: SingleReceiver, +} + +impl OutOfOrderReceiver { + pub fn new(receiver: SingleReceiver) -> Self { + Self { receiver } + } + + pub async fn recv(&mut self) -> Option>> { + if let Some(val) = self.receiver.recv().await { + return Some(val); + } + None + } +} diff --git a/src/daft-local-execution/src/create_pipeline.rs b/src/daft-local-execution/src/create_pipeline.rs deleted file mode 100644 index 46faebe059..0000000000 --- a/src/daft-local-execution/src/create_pipeline.rs +++ /dev/null @@ -1,93 +0,0 @@ -use std::{collections::HashMap, sync::Arc}; - -use daft_dsl::Expr; -use daft_micropartition::MicroPartition; -use daft_physical_plan::{ - Filter, InMemoryScan, Limit, LocalPhysicalPlan, PhysicalScan, Project, UnGroupedAggregate, -}; -use daft_plan::populate_aggregation_stages; - -use crate::{ - intermediate_ops::{ - aggregate::AggregateOperator, filter::FilterOperator, project::ProjectOperator, - }, - pipeline::Pipeline, - sinks::{aggregate::AggregateSink, limit::LimitSink}, - sources::{in_memory::InMemorySource, scan_task::ScanTaskSource}, -}; - -pub fn physical_plan_to_pipeline( - physical_plan: &LocalPhysicalPlan, - psets: &HashMap>>, -) -> Pipeline { - match physical_plan { - LocalPhysicalPlan::PhysicalScan(PhysicalScan { scan_tasks, .. }) => { - Pipeline::new(Box::new(ScanTaskSource::new(scan_tasks.clone()))) - } - LocalPhysicalPlan::InMemoryScan(InMemoryScan { info, .. 
}) => { - let partitions = psets.get(&info.cache_key).expect("Cache key not found"); - Pipeline::new(Box::new(InMemorySource::new(partitions.clone()))) - } - LocalPhysicalPlan::Project(Project { - input, projection, .. - }) => { - let current_pipeline = physical_plan_to_pipeline(input, psets); - let proj_op = ProjectOperator::new(projection.clone()); - current_pipeline.with_intermediate_operator(Box::new(proj_op)) - } - LocalPhysicalPlan::Filter(Filter { - input, predicate, .. - }) => { - let current_pipeline = physical_plan_to_pipeline(input, psets); - let filter_op = FilterOperator::new(predicate.clone()); - current_pipeline.with_intermediate_operator(Box::new(filter_op)) - } - LocalPhysicalPlan::Limit(Limit { - input, num_rows, .. - }) => { - let current_pipeline = physical_plan_to_pipeline(input, psets); - let sink = LimitSink::new(*num_rows as usize); - let current_pipeline = current_pipeline.with_sink(Box::new(sink)); - - Pipeline::new(Box::new(current_pipeline)) - } - LocalPhysicalPlan::UnGroupedAggregate(UnGroupedAggregate { - input, - aggregations, - schema, - .. 
- }) => { - let current_pipeline = physical_plan_to_pipeline(input, psets); - let (first_stage_aggs, second_stage_aggs, final_exprs) = - populate_aggregation_stages(aggregations, schema, &[]); - - let first_stage_agg_op = AggregateOperator::new( - first_stage_aggs - .values() - .cloned() - .map(|e| Arc::new(Expr::Agg(e.clone()))) - .collect(), - vec![], - ); - let current_pipeline = - current_pipeline.with_intermediate_operator(Box::new(first_stage_agg_op)); - - let second_stage_agg_sink = AggregateSink::new( - second_stage_aggs - .values() - .cloned() - .map(|e| Arc::new(Expr::Agg(e.clone()))) - .collect(), - vec![], - ); - let current_pipeline = current_pipeline.with_sink(Box::new(second_stage_agg_sink)); - - let final_stage_project = ProjectOperator::new(final_exprs); - let new_pipeline = Pipeline::new(Box::new(current_pipeline)); - new_pipeline.with_intermediate_operator(Box::new(final_stage_project)) - } - _ => { - unimplemented!("Physical plan not supported: {}", physical_plan.name()); - } - } -} diff --git a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs index 3d8e5dd52a..f2af72c9ca 100644 --- a/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs +++ b/src/daft-local-execution/src/intermediate_ops/intermediate_op.rs @@ -1,11 +1,151 @@ -use std::sync::Arc; +use std::{env, sync::Arc}; use common_error::DaftResult; use daft_micropartition::MicroPartition; +use crate::{ + channel::{ + create_channel, create_single_channel, spawn_compute_task, MultiReceiver, MultiSender, + SingleReceiver, SingleSender, + }, + NUM_CPUS, +}; + pub trait IntermediateOperator: dyn_clone::DynClone + Send + Sync { fn execute(&self, input: &Arc) -> DaftResult>; fn name(&self) -> &'static str; } dyn_clone::clone_trait_object!(IntermediateOperator); + +/// The number of rows that will trigger an intermediate operator to output its data. 
+fn get_output_threshold() -> usize { + env::var("OUTPUT_THRESHOLD") + .unwrap_or_else(|_| "1000".to_string()) + .parse() + .expect("OUTPUT_THRESHOLD must be a number") +} + +/// State of an operator task, used to buffer data and output it when a threshold is reached. +pub struct OperatorTaskState { + pub buffer: Vec>, + pub curr_len: usize, + pub threshold: usize, +} + +impl OperatorTaskState { + pub fn new() -> Self { + Self { + buffer: vec![], + curr_len: 0, + threshold: get_output_threshold(), + } + } + + // Add a micro partition to the buffer. + pub fn add(&mut self, part: Arc) { + self.curr_len += part.len(); + self.buffer.push(part); + } + + // Try to clear the buffer if the threshold is reached. + pub fn try_clear(&mut self) -> Option>> { + if self.curr_len >= self.threshold { + self.clear() + } else { + None + } + } + + // Clear the buffer and return the concatenated MicroPartition. + pub fn clear(&mut self) -> Option>> { + if self.buffer.is_empty() { + return None; + } + let concated = + MicroPartition::concat(&self.buffer.iter().map(|x| x.as_ref()).collect::>()) + .map(Arc::new); + self.buffer.clear(); + self.curr_len = 0; + Some(concated) + } +} + +/// An actor that runs an intermediate operator. +/// The actor can run multiple tasks in parallel, depending on the buffer size of the sender. +/// Each parallel task is mapped to a single sender. +pub struct IntermediateOpActor { + sender: MultiSender, + receiver: MultiReceiver, + op: Box, +} + +impl IntermediateOpActor { + pub fn new( + op: Box, + receiver: MultiReceiver, + sender: MultiSender, + ) -> Self { + Self { + op, + receiver, + sender, + } + } + + // Run a single instance of the operator. 
+ async fn run_single( + mut receiver: SingleReceiver, + sender: SingleSender, + op: Box, + ) -> DaftResult<()> { + let mut state = OperatorTaskState::new(); + while let Some(morsel) = receiver.recv().await { + let result = op.execute(&morsel?)?; + state.add(result); + if let Some(part) = state.try_clear() { + let _ = sender.send(part).await; + } + } + if let Some(part) = state.clear() { + let _ = sender.send(part).await; + } + Ok(()) + } + + // Create and run parallel tasks for the operator. + pub async fn run_parallel(&mut self) -> DaftResult<()> { + // Initialize senders to send data to parallel tasks. + let mut inner_task_senders: Vec = + Vec::with_capacity(self.sender.buffer_size()); + let mut curr_task_idx = 0; + + while let Some(morsel) = self.receiver.recv().await { + // If the task sender already exists for the current index, send the data to it. + if let Some(s) = inner_task_senders.get(curr_task_idx) { + let _ = s.send(morsel).await; + } + // Otherwise, create a new task and send the data to it. 
+ else { + let (task_sender, task_receiver) = create_single_channel(1); + let op = self.op.clone(); + let next_sender = self.sender.get_next_sender(); + spawn_compute_task(Self::run_single(task_receiver, next_sender, op)); + let _ = task_sender.send(morsel).await; + + inner_task_senders.push(task_sender); + } + curr_task_idx = (curr_task_idx + 1) % self.sender.buffer_size(); + } + Ok(()) + } +} + +pub fn run_intermediate_op(op: Box, send_to: MultiSender) -> MultiSender { + let (sender, receiver) = create_channel(*NUM_CPUS, send_to.in_order()); + let mut actor = IntermediateOpActor::new(op, receiver, send_to); + tokio::spawn(async move { + let _ = actor.run_parallel().await; + }); + sender +} diff --git a/src/daft-local-execution/src/lib.rs b/src/daft-local-execution/src/lib.rs index 7f70f11fed..8db6b11957 100644 --- a/src/daft-local-execution/src/lib.rs +++ b/src/daft-local-execution/src/lib.rs @@ -1,22 +1,16 @@ -mod create_pipeline; +mod channel; mod intermediate_ops; -mod pipeline; mod run; mod sinks; mod sources; -use std::sync::Arc; - -use common_error::{DaftError, DaftResult}; -use daft_micropartition::MicroPartition; +use common_error::DaftError; pub use run::NativeExecutor; use snafu::Snafu; -type Sender = tokio::sync::mpsc::Sender>>; -type Receiver = tokio::sync::mpsc::Receiver>>; - -pub fn create_channel() -> (Sender, Receiver) { - tokio::sync::mpsc::channel(1) +use lazy_static::lazy_static; +lazy_static! 
{ + pub static ref NUM_CPUS: usize = std::thread::available_parallelism().unwrap().get(); } #[cfg(feature = "python")] diff --git a/src/daft-local-execution/src/pipeline.rs b/src/daft-local-execution/src/pipeline.rs deleted file mode 100644 index bf1b34292a..0000000000 --- a/src/daft-local-execution/src/pipeline.rs +++ /dev/null @@ -1,256 +0,0 @@ -use std::{borrow::BorrowMut, sync::Arc}; - -use async_trait::async_trait; -use common_error::DaftResult; -use daft_micropartition::MicroPartition; -use futures::{ - pin_mut, - stream::{FuturesOrdered, FuturesUnordered}, - StreamExt, -}; -use lazy_static::lazy_static; -use snafu::ResultExt; -use tokio::task::JoinHandle; -use tokio_stream::wrappers::ReceiverStream; - -use crate::{ - create_channel, - intermediate_ops::intermediate_op::IntermediateOperator, - sinks::sink::{Sink, SinkResultType}, - sources::source::{Source, SourceStream}, - JoinSnafu, Receiver, Sender, -}; - -lazy_static! { - pub static ref NUM_CPUS: usize = std::thread::available_parallelism().unwrap().get(); -} - -pub struct InnerPipelineManager { - senders: Vec, - curr_idx: usize, - handles: Vec>>, -} - -impl InnerPipelineManager { - pub fn new( - intermediate_operators: Vec>, - sink_senders: Vec, - ) -> Self { - let mut senders = vec![]; - let mut handles = vec![]; - - for sink_sender in sink_senders.into_iter() { - let (source_sender, source_receiver) = create_channel(); - let handle = tokio::spawn(Self::run_single_inner_pipeline( - intermediate_operators.clone(), - sink_sender, - source_receiver, - )); - - handles.push(handle); - senders.push(source_sender); - } - Self { - senders, - curr_idx: 0, - handles, - } - } - - async fn run_single_inner_pipeline( - intermediate_operators: Vec>, - sender: Sender, - mut receiver: Receiver, - ) -> DaftResult<()> { - log::debug!( - "Running intermediate operators: {}", - intermediate_operators - .iter() - .fold(String::new(), |acc, op| { acc + op.name() + " -> " }) - ); - - while let Some(morsel) = 
receiver.recv().await { - let mut result = morsel?; - for op in intermediate_operators.iter() { - result = op.execute(&result)?; - } - let _ = sender.send(Ok(result)).await; - } - - log::debug!("Intermediate operators finished"); - Ok(()) - } - - pub fn get_next_sender(&mut self) -> Sender { - let idx = self.curr_idx; - self.curr_idx = (self.curr_idx + 1) % self.senders.len(); - self.senders[idx].clone() - } - - pub fn retrieve_handles(self) -> Vec>> { - self.handles - } -} - -pub struct SinkManager { - sink: Option>, - send_to_next_source: Sender, -} - -impl SinkManager { - pub fn new(sink: Option>, send_to_next_source: Sender) -> Self { - Self { - sink, - send_to_next_source, - } - } - - async fn process_value( - &mut self, - val: DaftResult>, - ) -> DaftResult { - if let Some(sink) = self.sink.borrow_mut() { - sink.sink(&val?) - } else { - let _ = self.send_to_next_source.send(Ok(val?)).await; - Ok(SinkResultType::NeedMoreInput) - } - } - - async fn finalize_values(&mut self) -> DaftResult<()> { - if let Some(sink) = self.sink.borrow_mut() { - for part in sink.finalize()? 
{ - let _ = self.send_to_next_source.send(Ok(part)).await; - } - } - Ok(()) - } - - pub async fn run(&mut self, mut receivers: Vec) -> DaftResult<()> { - let in_order = match self.sink { - Some(ref sink) => sink.in_order(), - None => false, - }; - log::debug!("Launching sink with in_order: {}", in_order); - - if in_order { - let mut in_order_receivers = receivers - .iter_mut() - .map(|r| r.recv()) - .collect::>(); - while let Some(val) = in_order_receivers.next().await { - if let Some(val) = val { - if matches!(self.process_value(val).await?, SinkResultType::Finished) { - break; - } - } - } - } else { - let mut unordered_receivers = receivers - .iter_mut() - .map(|r| r.recv()) - .collect::>(); - while let Some(val) = unordered_receivers.next().await { - if let Some(val) = val { - if matches!(self.process_value(val).await?, SinkResultType::Finished) { - break; - } - } - } - } - - self.finalize_values().await - } -} - -pub struct Pipeline { - source: Box, - intermediate_operators: Vec>, - sink: Option>, -} - -impl Pipeline { - pub fn new(source: Box) -> Self { - Self { - source, - intermediate_operators: vec![], - sink: None, - } - } - - pub fn with_intermediate_operator(mut self, op: Box) -> Self { - self.intermediate_operators.push(op); - self - } - - pub fn with_sink(mut self, sink: Box) -> Self { - self.sink = Some(sink); - self - } - - pub async fn run(&self, send_to_next_source: Sender) -> DaftResult<()> { - log::debug!("Running pipeline"); - - let mut all_handles = vec![]; - - // Initialize the channels to send morsels from the source to the inner pipelines, and from the inner pipelines to the sink - let (sink_senders, sink_receivers) = (0..*NUM_CPUS) - .map(|_| create_channel()) - .unzip::<_, _, Vec<_>, Vec<_>>(); - - // Spawn the sink manager - let sink = self.sink.clone(); - let sink_job = tokio::spawn(async move { - let mut sink_manager = SinkManager::new(sink, send_to_next_source); - sink_manager.run(sink_receivers).await - }); - 
all_handles.push(sink_job); - - // Create the inner pipeline manager and send the data from the source to the inner pipelines - let mut inner_pipeline_manager = - InnerPipelineManager::new(self.intermediate_operators.clone(), sink_senders); - - // Get the data from the source - let source_stream = self.source.get_data().await; - pin_mut!(source_stream); - while let Some(morsel) = source_stream.next().await { - let inner_pipeline_sender = inner_pipeline_manager.get_next_sender(); - let _ = inner_pipeline_sender.send(morsel).await; - } - let inner_pipeline_jobs = inner_pipeline_manager.retrieve_handles(); - all_handles.extend(inner_pipeline_jobs); - - // Wait for all the tasks to finish - let awaiting_all_handles = async { - for handle in all_handles.iter_mut() { - let _ = handle.await.context(JoinSnafu {}); - } - Ok(()) - }; - tokio::select! { - _ = tokio::signal::ctrl_c() => { - for handle in all_handles { - handle.abort(); - } - } - result = awaiting_all_handles => { - result.context(JoinSnafu {})?; - } - } - - log::debug!("Pipeline finished"); - Ok(()) - } -} - -#[async_trait] -impl Source for Pipeline { - async fn get_data(&self) -> SourceStream { - log::debug!("Pipeline::get_data"); - let (tx, rx) = create_channel(); - - let _ = self.run(tx.clone()).await; - - ReceiverStream::new(rx).boxed() - } -} diff --git a/src/daft-local-execution/src/run.rs b/src/daft-local-execution/src/run.rs index 3b1193da3b..96cc4e1c9a 100644 --- a/src/daft-local-execution/src/run.rs +++ b/src/daft-local-execution/src/run.rs @@ -1,9 +1,13 @@ use std::{collections::HashMap, sync::Arc}; use common_error::DaftResult; +use daft_dsl::Expr; use daft_micropartition::MicroPartition; -use daft_physical_plan::{translate, LocalPhysicalPlan}; -use futures::StreamExt; +use daft_physical_plan::{ + translate, Concat, Filter, InMemoryScan, Limit, LocalPhysicalPlan, PhysicalScan, Project, + UnGroupedAggregate, +}; +use daft_plan::populate_aggregation_stages; #[cfg(feature = "python")] use { @@ -12,7 
+16,20 @@ use { pyo3::{pyclass, pymethods, IntoPy, PyObject, PyRef, PyRefMut, PyResult, Python}, }; -use crate::{create_pipeline::physical_plan_to_pipeline, sources::source::Source}; +use crate::{ + channel::{create_channel, MultiSender}, + intermediate_ops::{ + aggregate::AggregateOperator, filter::FilterOperator, intermediate_op::run_intermediate_op, + project::ProjectOperator, + }, + sinks::{ + aggregate::AggregateSink, + concat::ConcatSink, + limit::LimitSink, + sink::{run_double_input_sink, run_single_input_sink}, + }, + sources::{in_memory::InMemorySource, scan_task::ScanTaskSource, source::run_source}, +}; #[cfg(feature = "python")] #[pyclass] @@ -86,9 +103,93 @@ pub fn run_local( ) -> DaftResult>> + Send>> { let runtime = tokio::runtime::Runtime::new().expect("Failed to create tokio runtime"); let res = runtime.block_on(async { - let pipeline = physical_plan_to_pipeline(physical_plan, &psets); - let stream = pipeline.get_data().await; - stream.collect::>().await + let (sender, mut receiver) = create_channel(1, true); + run_physical_plan(physical_plan, &psets, sender); + let mut result = vec![]; + while let Some(val) = receiver.recv().await { + result.push(val); + } + result.into_iter() }); - Ok(Box::new(res.into_iter())) + Ok(Box::new(res)) +} + +pub fn run_physical_plan( + physical_plan: &LocalPhysicalPlan, + psets: &HashMap>>, + sender: MultiSender, +) { + match physical_plan { + LocalPhysicalPlan::PhysicalScan(PhysicalScan { scan_tasks, .. }) => { + run_source(Arc::new(ScanTaskSource::new(scan_tasks.clone())), sender); + } + LocalPhysicalPlan::InMemoryScan(InMemoryScan { info, .. }) => { + let partitions = psets.get(&info.cache_key).expect("Cache key not found"); + run_source(Arc::new(InMemorySource::new(partitions.clone())), sender); + } + LocalPhysicalPlan::Project(Project { + input, projection, .. 
+ }) => { + let proj_op = ProjectOperator::new(projection.clone()); + let next_sender = run_intermediate_op(Box::new(proj_op), sender); + run_physical_plan(input, psets, next_sender); + } + LocalPhysicalPlan::Filter(Filter { + input, predicate, .. + }) => { + let filter_op = FilterOperator::new(predicate.clone()); + let next_sender = run_intermediate_op(Box::new(filter_op), sender); + run_physical_plan(input, psets, next_sender); + } + LocalPhysicalPlan::Limit(Limit { + input, num_rows, .. + }) => { + let sink = LimitSink::new(*num_rows as usize); + let sink_sender = run_single_input_sink(Box::new(sink), sender); + run_physical_plan(input, psets, sink_sender); + } + LocalPhysicalPlan::Concat(Concat { input, other, .. }) => { + let sink = ConcatSink::new(); + let (left_sender, right_sender) = run_double_input_sink(Box::new(sink), sender); + + run_physical_plan(input, psets, left_sender); + run_physical_plan(other, psets, right_sender); + } + LocalPhysicalPlan::UnGroupedAggregate(UnGroupedAggregate { + input, + aggregations, + schema, + .. 
+ }) => { + let (first_stage_aggs, second_stage_aggs, final_exprs) = + populate_aggregation_stages(aggregations, schema, &[]); + + let final_stage_project = ProjectOperator::new(final_exprs); + let next_sender = run_intermediate_op(Box::new(final_stage_project), sender); + + let second_stage_agg_sink = AggregateSink::new( + second_stage_aggs + .values() + .cloned() + .map(|e| Arc::new(Expr::Agg(e.clone()))) + .collect(), + vec![], + ); + let next_sender = run_single_input_sink(Box::new(second_stage_agg_sink), next_sender); + + let first_stage_agg_op = AggregateOperator::new( + first_stage_aggs + .values() + .cloned() + .map(|e| Arc::new(Expr::Agg(e.clone()))) + .collect(), + vec![], + ); + let next_sender = run_intermediate_op(Box::new(first_stage_agg_op), next_sender); + run_physical_plan(input, psets, next_sender); + } + _ => { + unimplemented!("Physical plan not supported: {}", physical_plan.name()); + } + } } diff --git a/src/daft-local-execution/src/sinks/aggregate.rs b/src/daft-local-execution/src/sinks/aggregate.rs index 61da649f9c..5dce7e6c8d 100644 --- a/src/daft-local-execution/src/sinks/aggregate.rs +++ b/src/daft-local-execution/src/sinks/aggregate.rs @@ -4,7 +4,7 @@ use common_error::DaftResult; use daft_dsl::ExprRef; use daft_micropartition::MicroPartition; -use super::sink::{Sink, SinkResultType}; +use super::sink::{SingleInputSink, SinkResultType}; #[derive(Clone)] pub struct AggregateSink { @@ -23,7 +23,7 @@ impl AggregateSink { } } -impl Sink for AggregateSink { +impl SingleInputSink for AggregateSink { fn sink(&mut self, input: &Arc) -> DaftResult { log::debug!("AggregateSink::sink"); @@ -32,7 +32,7 @@ impl Sink for AggregateSink { } fn in_order(&self) -> bool { - false + true } fn finalize(&mut self) -> DaftResult>> { diff --git a/src/daft-local-execution/src/sinks/concat.rs b/src/daft-local-execution/src/sinks/concat.rs new file mode 100644 index 0000000000..d0531a041f --- /dev/null +++ b/src/daft-local-execution/src/sinks/concat.rs @@ -0,0 
+1,55 @@ +use std::sync::Arc; + +use common_error::DaftResult; +use daft_micropartition::MicroPartition; + +use super::sink::{DoubleInputSink, SinkResultType}; + +#[derive(Clone)] +pub struct ConcatSink { + result_left: Vec>, + result_right: Vec>, +} + +impl ConcatSink { + pub fn new() -> Self { + Self { + result_left: Vec::new(), + result_right: Vec::new(), + } + } +} + +impl DoubleInputSink for ConcatSink { + fn sink_left(&mut self, input: &Arc) -> DaftResult { + log::debug!("Concat::sink_left"); + + self.result_left.push(input.clone()); + Ok(SinkResultType::NeedMoreInput) + } + + fn sink_right(&mut self, input: &Arc) -> DaftResult { + log::debug!("Concat::sink_right"); + + self.result_right.push(input.clone()); + Ok(SinkResultType::NeedMoreInput) + } + + fn in_order(&self) -> bool { + true + } + + fn finalize(&mut self) -> DaftResult>> { + log::debug!("Concat::finalize"); + Ok(self + .result_left + .clone() + .into_iter() + .chain(self.result_right.clone()) + .collect()) + } + + fn name(&self) -> &'static str { + "Concat" + } +} diff --git a/src/daft-local-execution/src/sinks/limit.rs b/src/daft-local-execution/src/sinks/limit.rs index b768daf378..44f2667243 100644 --- a/src/daft-local-execution/src/sinks/limit.rs +++ b/src/daft-local-execution/src/sinks/limit.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use common_error::DaftResult; use daft_micropartition::MicroPartition; -use super::sink::{Sink, SinkResultType}; +use super::sink::{SingleInputSink, SinkResultType}; #[derive(Clone)] pub struct LimitSink { @@ -22,7 +22,7 @@ impl LimitSink { } } -impl Sink for LimitSink { +impl SingleInputSink for LimitSink { fn sink(&mut self, input: &Arc) -> DaftResult { log::debug!("LimitSink::sink"); let input_num_rows = input.len(); @@ -45,7 +45,7 @@ impl Sink for LimitSink { } fn in_order(&self) -> bool { - true + false } fn finalize(&mut self) -> DaftResult>> { diff --git a/src/daft-local-execution/src/sinks/mod.rs b/src/daft-local-execution/src/sinks/mod.rs index 
16ff97803c..e160aab9c0 100644 --- a/src/daft-local-execution/src/sinks/mod.rs +++ b/src/daft-local-execution/src/sinks/mod.rs @@ -1,3 +1,4 @@ pub mod aggregate; +pub mod concat; pub mod limit; pub mod sink; diff --git a/src/daft-local-execution/src/sinks/sink.rs b/src/daft-local-execution/src/sinks/sink.rs index bea706aeb8..62192b4c56 100644 --- a/src/daft-local-execution/src/sinks/sink.rs +++ b/src/daft-local-execution/src/sinks/sink.rs @@ -3,15 +3,147 @@ use std::sync::Arc; use common_error::DaftResult; use daft_micropartition::MicroPartition; +use crate::{ + channel::{create_channel, MultiReceiver, MultiSender}, + NUM_CPUS, +}; + pub enum SinkResultType { NeedMoreInput, Finished, } -pub trait Sink: dyn_clone::DynClone + Send + Sync { +pub trait SingleInputSink: Send + Sync + dyn_clone::DynClone { fn sink(&mut self, input: &Arc) -> DaftResult; fn in_order(&self) -> bool; fn finalize(&mut self) -> DaftResult>>; } -dyn_clone::clone_trait_object!(Sink); +dyn_clone::clone_trait_object!(SingleInputSink); + +pub struct SingleInputSinkActor { + sink: Box, + receiver: MultiReceiver, + sender: MultiSender, +} + +impl SingleInputSinkActor { + pub fn new( + sink: Box, + receiver: MultiReceiver, + sender: MultiSender, + ) -> Self { + Self { + sink, + receiver, + sender, + } + } + + pub async fn run(&mut self) -> DaftResult<()> { + while let Some(val) = self.receiver.recv().await { + let sink_result = self.sink.sink(&val?)?; + match sink_result { + SinkResultType::NeedMoreInput => { + continue; + } + SinkResultType::Finished => { + break; + } + } + } + + let finalized_values = self.sink.finalize()?; + for val in finalized_values { + let _ = self.sender.get_next_sender().send(Ok(val)).await; + } + Ok(()) + } +} + +pub fn run_single_input_sink(sink: Box, send_to: MultiSender) -> MultiSender { + let (sender, receiver) = create_channel(*NUM_CPUS, sink.in_order()); + let mut actor = SingleInputSinkActor::new(sink, receiver, send_to); + tokio::spawn(async move { + let _ = 
actor.run().await; + }); + sender +} + +pub trait DoubleInputSink: Send + Sync + dyn_clone::DynClone { + fn sink_left(&mut self, input: &Arc) -> DaftResult; + fn sink_right(&mut self, input: &Arc) -> DaftResult; + fn in_order(&self) -> bool; + fn finalize(&mut self) -> DaftResult>>; + fn name(&self) -> &'static str; +} + +dyn_clone::clone_trait_object!(DoubleInputSink); + +pub struct DoubleInputSinkActor { + sink: Box, + left_receiver: MultiReceiver, + right_receiver: MultiReceiver, + sender: MultiSender, +} + +impl DoubleInputSinkActor { + pub fn new( + sink: Box, + left_receiver: MultiReceiver, + right_receiver: MultiReceiver, + sender: MultiSender, + ) -> Self { + Self { + sink, + left_receiver, + right_receiver, + sender, + } + } + + pub async fn run(&mut self) -> DaftResult<()> { + while let Some(val) = self.left_receiver.recv().await { + let sink_result = self.sink.sink_left(&val?)?; + match sink_result { + SinkResultType::NeedMoreInput => { + continue; + } + SinkResultType::Finished => { + break; + } + } + } + + while let Some(val) = self.right_receiver.recv().await { + let sink_result = self.sink.sink_right(&val?)?; + match sink_result { + SinkResultType::NeedMoreInput => { + continue; + } + SinkResultType::Finished => { + break; + } + } + } + + let finalized_values = self.sink.finalize()?; + for val in finalized_values { + let _ = self.sender.get_next_sender().send(Ok(val)).await; + } + Ok(()) + } +} + +pub fn run_double_input_sink( + sink: Box, + send_to: MultiSender, +) -> (MultiSender, MultiSender) { + let (left_sender, left_receiver) = create_channel(*NUM_CPUS, sink.in_order()); + let (right_sender, right_receiver) = create_channel(*NUM_CPUS, sink.in_order()); + let mut actor = DoubleInputSinkActor::new(sink, left_receiver, right_receiver, send_to); + tokio::spawn(async move { + let _ = actor.run().await; + }); + (left_sender, right_sender) +} diff --git a/src/daft-local-execution/src/sources/in_memory.rs 
b/src/daft-local-execution/src/sources/in_memory.rs index b81cdf351d..aa19b7aae4 100644 --- a/src/daft-local-execution/src/sources/in_memory.rs +++ b/src/daft-local-execution/src/sources/in_memory.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use async_trait::async_trait; use daft_micropartition::MicroPartition; use futures::{stream, StreamExt}; @@ -16,9 +15,8 @@ impl InMemorySource { } } -#[async_trait] impl Source for InMemorySource { - async fn get_data(&self) -> SourceStream { + fn get_data(&self) -> SourceStream { log::debug!("InMemorySource::get_data"); stream::iter(self.data.clone().into_iter().map(Ok)).boxed() } diff --git a/src/daft-local-execution/src/sources/scan_task.rs b/src/daft-local-execution/src/sources/scan_task.rs index e45dd4df36..8203c7b4c0 100644 --- a/src/daft-local-execution/src/sources/scan_task.rs +++ b/src/daft-local-execution/src/sources/scan_task.rs @@ -1,10 +1,12 @@ use std::sync::Arc; -use async_trait::async_trait; use daft_io::IOStatsContext; use daft_micropartition::MicroPartition; use daft_scan::ScanTask; use futures::{stream, StreamExt}; +use snafu::ResultExt; + +use crate::JoinSnafu; use super::source::{Source, SourceStream}; @@ -18,20 +20,20 @@ impl ScanTaskSource { } } -#[async_trait] impl Source for ScanTaskSource { - async fn get_data(&self) -> SourceStream { + fn get_data(&self) -> SourceStream { log::debug!("ScanTaskSource::get_data"); - let stream = stream::iter(self.scan_tasks.clone().into_iter().map(|scan_task| { - let io_stats = IOStatsContext::new("MicroPartition::from_scan_task"); - let out = - std::thread::spawn(move || MicroPartition::from_scan_task(scan_task, io_stats)) - .join() - .expect("Failed to join thread")?; - + let stream = stream::iter(self.scan_tasks.clone().into_iter().map(|scan_task| async { + tokio::task::spawn_blocking(move || { + let io_stats = IOStatsContext::new("MicroPartition::from_scan_task"); + MicroPartition::from_scan_task(scan_task, io_stats) + .map(Arc::new) + .map_err(Into::into) + }) + .await 
+ .context(JoinSnafu {})? // TODO: Implement dynamic splitting / merging of MicroPartition from scan task - Ok(Arc::new(out)) })); - stream.boxed() + stream.buffered(self.scan_tasks.len()).boxed() } } diff --git a/src/daft-local-execution/src/sources/source.rs b/src/daft-local-execution/src/sources/source.rs index 0f84780a4c..f8465fba6f 100644 --- a/src/daft-local-execution/src/sources/source.rs +++ b/src/daft-local-execution/src/sources/source.rs @@ -1,13 +1,39 @@ use std::sync::Arc; -use async_trait::async_trait; use common_error::DaftResult; use daft_micropartition::MicroPartition; -use futures::stream::BoxStream; +use futures::{stream::BoxStream, StreamExt}; + +use crate::channel::MultiSender; pub type SourceStream<'a> = BoxStream<'a, DaftResult>>; -#[async_trait] pub trait Source: Send + Sync { - async fn get_data(&self) -> SourceStream; + fn get_data(&self) -> SourceStream; +} + +pub struct SourceActor { + source: Arc, + sender: MultiSender, +} + +impl SourceActor { + pub fn new(source: Arc, sender: MultiSender) -> Self { + Self { source, sender } + } + + pub async fn run(&mut self) -> DaftResult<()> { + let mut source_stream = self.source.get_data(); + while let Some(val) = source_stream.next().await { + let _ = self.sender.get_next_sender().send(val).await; + } + Ok(()) + } +} + +pub fn run_source(source: Arc, sender: MultiSender) { + let mut actor = SourceActor::new(source, sender); + tokio::spawn(async move { + let _ = actor.run().await; + }); } diff --git a/src/daft-physical-plan/src/lib.rs b/src/daft-physical-plan/src/lib.rs index f82c6796f1..195ea5457b 100644 --- a/src/daft-physical-plan/src/lib.rs +++ b/src/daft-physical-plan/src/lib.rs @@ -3,7 +3,7 @@ mod local_plan; mod translate; pub use local_plan::{ - Filter, HashAggregate, HashJoin, InMemoryScan, Limit, LocalPhysicalPlan, LocalPhysicalPlanRef, - PhysicalScan, PhysicalWrite, Project, Sort, UnGroupedAggregate, + Concat, Filter, HashAggregate, HashJoin, InMemoryScan, Limit, LocalPhysicalPlan, + 
LocalPhysicalPlanRef, PhysicalScan, PhysicalWrite, Project, Sort, UnGroupedAggregate, }; pub use translate::translate; diff --git a/src/daft-physical-plan/src/local_plan.rs b/src/daft-physical-plan/src/local_plan.rs index a0013b7492..92d54ef1dd 100644 --- a/src/daft-physical-plan/src/local_plan.rs +++ b/src/daft-physical-plan/src/local_plan.rs @@ -30,7 +30,7 @@ pub enum LocalPhysicalPlan { UnGroupedAggregate(UnGroupedAggregate), HashAggregate(HashAggregate), // Pivot(Pivot), - // Concat(Concat), + Concat(Concat), HashJoin(HashJoin), // SortMergeJoin(SortMergeJoin), // BroadcastJoin(BroadcastJoin), @@ -142,6 +142,21 @@ impl LocalPhysicalPlan { }) .arced() } + + pub(crate) fn concat( + input: LocalPhysicalPlanRef, + other: LocalPhysicalPlanRef, + schema: SchemaRef, + ) -> LocalPhysicalPlanRef { + LocalPhysicalPlan::Concat(Concat { + input, + other, + schema, + plan_stats: PlanStats {}, + }) + .arced() + } + pub(crate) fn schema(&self) -> &SchemaRef { match self { LocalPhysicalPlan::PhysicalScan(PhysicalScan { schema, .. }) @@ -216,6 +231,16 @@ pub struct HashAggregate { #[derive(Debug)] pub struct HashJoin {} + +#[derive(Debug)] + +pub struct Concat { + pub input: LocalPhysicalPlanRef, + pub other: LocalPhysicalPlanRef, + pub schema: SchemaRef, + pub plan_stats: PlanStats, +} + #[derive(Debug)] pub struct PhysicalWrite {} diff --git a/src/daft-physical-plan/src/translate.rs b/src/daft-physical-plan/src/translate.rs index 786aefe02b..805960fc0a 100644 --- a/src/daft-physical-plan/src/translate.rs +++ b/src/daft-physical-plan/src/translate.rs @@ -56,6 +56,12 @@ pub fn translate(plan: &LogicalPlanRef) -> DaftResult { )) } } + LogicalPlan::Concat(concat) => { + let schema = concat.input.schema().clone(); + let input = translate(&concat.input)?; + let other = translate(&concat.other)?; + Ok(LocalPhysicalPlan::concat(input, other, schema)) + } _ => todo!("{} not yet implemented", plan.name()), } }