Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(scheduler): enable per FHE computation error return from scheduler #294

Merged
merged 3 commits into from
Feb 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 28 additions & 11 deletions fhevm-engine/coprocessor/src/tfhe_worker.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
use crate::types::CoprocessorError;
use crate::{db_queries::populate_cache_with_tenant_keys, types::TfheTenantKeys};
use fhevm_engine_common::types::{Handle, SupportedFheCiphertexts};
use fhevm_engine_common::types::{FhevmError, Handle, SupportedFheCiphertexts};
use fhevm_engine_common::{tfhe_ops::current_ciphertext_version, types::SupportedFheOperations};
use itertools::Itertools;
use lazy_static::lazy_static;
use opentelemetry::trace::{Span, TraceContextExt, Tracer};
use opentelemetry::KeyValue;
use prometheus::{register_int_counter, IntCounter};
use scheduler::dfg::types::SchedulerError;
use scheduler::dfg::{scheduler::Scheduler, types::DFGTaskInput, DFGraph};
use sqlx::{postgres::PgListener, query, Acquire};
use std::borrow::Borrow;
use std::{
collections::{BTreeSet, HashMap},
num::NonZeroUsize,
Expand Down
Expand Up
@@ -357,29 +360,43 @@ async fn tfhe_worker_cycle(
let keys = rk.get(tenant_id).expect("Can't get tenant key from cache");

// Schedule computations in parallel as dependences allow
tfhe::set_server_key(keys.sks.clone());
let mut sched = Scheduler::new(&mut graph.graph, args.coprocessor_fhe_threads);
sched.schedule(keys.sks.clone()).await?;
}
// Extract the results from the graph
let res = graph.get_results().unwrap();
let mut res = graph.get_results();

for (idx, w) in work.iter().enumerate() {
// Filter out computations that could not complete
if uncomputable.contains_key(&idx) {
continue;
}
let r = res.iter().find(|(h, _)| *h == w.output_handle).unwrap();
{
let mut rk = tenant_key_cache.write().await;
let keys = rk
.get(&w.tenant_id)
.expect("Can't get tenant key from cache");
tfhe::set_server_key(keys.sks.clone());
}
let r = &mut res
.iter_mut()
.find(|(h, _)| *h == w.output_handle)
.unwrap()
.1;

let finished_work_unit: Result<
_,
(Box<(dyn std::error::Error + Send + Sync)>, i32, Vec<u8>),
> = Ok((w, r.1 .1, &r.1 .2));
> = r
.as_mut()
.map(|rok| (w, rok.0, std::mem::take(&mut rok.1)))
.map_err(|rerr| {
(
CoprocessorError::SchedulerError(
*rerr
.downcast_ref::<SchedulerError>()
.or(Some(&SchedulerError::SchedulerError))
.unwrap(),
)
.into(),
w.tenant_id,
w.output_handle.clone(),
)
});
match finished_work_unit {
Ok((w, db_type, db_bytes)) => {
let mut s = tracer.start_with_context("insert_ct_into_db", &loop_ctx);
Expand Down
18 changes: 13 additions & 5 deletions fhevm-engine/executor/src/server.rs
Original file line number Diff line number Diff line change
Expand Up
@@ -120,13 +120,21 @@ impl FhevmExecutor for FhevmExecutorService {
now.elapsed().unwrap().as_millis()
);
// Extract the results from the graph
match graph.get_results() {
Ok(mut result_cts) => Some(Resp::ResultCiphertexts(ResultCiphertexts {
ciphertexts: result_cts
let results = graph.get_results();
let outputs: Result<Vec<(Handle, (i16, Vec<u8>))>> = results
.into_iter()
.map(|(h, output)| match output {
Ok(output) => Ok((h, output)),
Err(e) => Err(e),
})
.collect();
match outputs {
Ok(mut outputs) => Some(Resp::ResultCiphertexts(ResultCiphertexts {
ciphertexts: outputs
.iter_mut()
.map(|(h, ct)| CompressedCiphertext {
.map(|(h, output)| CompressedCiphertext {
handle: h.clone(),
serialization: std::mem::take(&mut ct.2),
serialization: std::mem::take(&mut output.1),
})
.collect(),
})),
Expand Down
19 changes: 12 additions & 7 deletions fhevm-engine/scheduler/src/dfg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,23 @@ impl DFGraph {
Ok(())
}

pub fn get_results(
&mut self,
) -> Result<Vec<(Handle, (SupportedFheCiphertexts, i16, Vec<u8>))>> {
pub fn get_results(&mut self) -> Vec<(Handle, Result<(i16, Vec<u8>)>)> {
let mut res = Vec::with_capacity(self.graph.node_count());
for index in 0..self.graph.node_count() {
let node = self.graph.node_weight_mut(NodeIndex::new(index)).unwrap();
if let Some(ct) = &node.result {
res.push((node.result_handle.clone(), ct.clone()));
if let Some(ct) = std::mem::take(&mut node.result) {
if let Ok(ct) = ct {
res.push((node.result_handle.clone(), Ok((ct.1, ct.2))));
} else {
res.push((node.result_handle.clone(), Err(ct.err().unwrap())));
}
} else {
return Err(SchedulerError::DataflowGraphError.into());
res.push((
node.result_handle.clone(),
Err(SchedulerError::DataflowGraphError.into()),
));
}
}
Ok(res)
res
}
}
Loading
Loading