From 50f949e427b69631d7c3c49c2f5e6607e8bb3fb1 Mon Sep 17 00:00:00 2001 From: Antoniu Pop Date: Wed, 16 Oct 2024 13:33:11 +0100 Subject: [PATCH] fix comments from DK --- fhevm-engine/executor/src/cli.rs | 3 ++ fhevm-engine/executor/src/dfg/scheduler.rs | 33 ++++++++++--------- fhevm-engine/executor/src/server.rs | 3 +- .../executor/tests/scheduling_mapping.rs | 7 ++-- 4 files changed, 27 insertions(+), 19 deletions(-) diff --git a/fhevm-engine/executor/src/cli.rs b/fhevm-engine/executor/src/cli.rs index 27b86d1a..7fc813c4 100644 --- a/fhevm-engine/executor/src/cli.rs +++ b/fhevm-engine/executor/src/cli.rs @@ -9,6 +9,9 @@ pub struct Args { #[arg(long, default_value_t = 96)] pub fhe_compute_threads: usize, + #[arg(long, default_value_t = 8)] + pub fhe_operation_threads: usize, + #[arg(long, default_value = "127.0.0.1:50051")] pub server_addr: String, } diff --git a/fhevm-engine/executor/src/dfg/scheduler.rs b/fhevm-engine/executor/src/dfg/scheduler.rs index dbf4b21e..286e8e79 100644 --- a/fhevm-engine/executor/src/dfg/scheduler.rs +++ b/fhevm-engine/executor/src/dfg/scheduler.rs @@ -76,18 +76,19 @@ impl<'a> Scheduler<'a> { pub async fn schedule(&mut self) -> Result<(), SyncComputeError> { let schedule_type = std::env::var("FHEVM_DF_SCHEDULE"); match schedule_type { - Ok(val) if val == "MAX_PARALLELISM" => { - self.schedule_coarse_grain(PartitionStrategy::MaxParallelism) - .await - } - Ok(val) if val == "MAX_LOCALITY" => { - self.schedule_coarse_grain(PartitionStrategy::MaxLocality) - .await - } - Ok(val) if val == "LOOP" => self.schedule_component_loop().await, - Ok(val) if val == "FINE_GRAIN" => self.schedule_fine_grain().await, - Ok(unhandled) => panic!("Scheduling strategy {:?} does not exist", unhandled), - + Ok(val) => match val.as_str() { + "MAX_PARALLELISM" => { + self.schedule_coarse_grain(PartitionStrategy::MaxParallelism) + .await + } + "MAX_LOCALITY" => { + self.schedule_coarse_grain(PartitionStrategy::MaxLocality) + .await + } + "LOOP" => self.schedule_component_loop().await, + "FINE_GRAIN" => self.schedule_fine_grain().await, + unhandled => panic!("Scheduling strategy {:?} does not exist", unhandled), + }, _ => self.schedule_component_loop().await, } } @@ -151,14 +152,14 @@ impl<'a> Scheduler<'a> { Result<(Vec<(usize, InMemoryCiphertext)>, NodeIndex), SyncComputeError>, > = JoinSet::new(); let mut execution_graph: Dag = Dag::default(); - match strategy { + let _ = match strategy { PartitionStrategy::MaxLocality => { - let _ = partition_components(self.graph, &mut execution_graph); + partition_components(self.graph, &mut execution_graph) } PartitionStrategy::MaxParallelism => { - let _ = partition_preserving_parallelism(self.graph, &mut execution_graph); + partition_preserving_parallelism(self.graph, &mut execution_graph) } - } + }; let task_dependences = execution_graph.map(|_, _| (), |_, edge| *edge); // Prime the scheduler with all nodes without dependences diff --git a/fhevm-engine/executor/src/server.rs b/fhevm-engine/executor/src/server.rs index 0e450e92..9014c274 100644 --- a/fhevm-engine/executor/src/server.rs +++ b/fhevm-engine/executor/src/server.rs @@ -36,6 +36,7 @@ thread_local! { pub fn start(args: &crate::cli::Args) -> Result<()> { let keys: FhevmKeys = SerializedFhevmKeys::load_from_disk().into(); let executor = FhevmExecutorService::new(); + let rayon_threads = args.fhe_operation_threads; rayon::broadcast(|_| { set_server_key(keys.server_key.clone()); }); @@ -45,7 +46,7 @@ pub fn start(args: &crate::cli::Args) -> Result<()> { .on_thread_start(move || { set_server_key(keys.server_key.clone()); let rayon_pool = rayon::ThreadPoolBuilder::new() - .num_threads(8) + .num_threads(rayon_threads) .build() .unwrap(); rayon_pool.broadcast(|_| { diff --git a/fhevm-engine/executor/tests/scheduling_mapping.rs b/fhevm-engine/executor/tests/scheduling_mapping.rs index 43a0308a..48f1169b 100644 --- a/fhevm-engine/executor/tests/scheduling_mapping.rs +++ b/fhevm-engine/executor/tests/scheduling_mapping.rs @@ -14,8 +14,11 @@ use utils::get_test; mod utils; fn get_handle(h: u32) -> Vec { - let tmp = [h; HANDLE_LEN / 4]; - let res: [u8; HANDLE_LEN] = unsafe { std::mem::transmute(tmp) }; + let mut res: Vec = Vec::with_capacity(HANDLE_LEN); + let slice: [u8; 4] = h.to_be_bytes(); + for _i in 0..HANDLE_LEN / 4 { + res.extend_from_slice(&slice); + } res.to_vec() }