Skip to content

Commit

Permalink
Initial implementation of both sns-executor and sns-worker (#277)
Browse files Browse the repository at this point in the history
* feat: duplicate switch_and_squash algorithm

* feat: implement a scalable sns executor

* feat: implement a sns-executor

* feat: support to_regular_ciphertext for all types

* chore: add sns-executor to the workspace

* fix: enable to_regular_ciphertext for FheBool

* fix: update SQL queries to use is_allowed, is_sent columns

 * fix: make retry-acquire conn cancellable

* fix: add db migration for large_ct, is_sent, is_allowed columns

* docs: add readme.md

* fix: minor improvements and fixes

- add decrypt_128 feature
- support polling_interval, max_connections CLI args
- rollback db txn, if no tasks available
- use pg_notify to avoid SQL inject

* chore: rename to_regular_ciphertext to to_ciphertext64

* fix: handle sigint signal
  • Loading branch information
goshawk-3 authored Feb 4, 2025
1 parent 685e253 commit 95b43ee
Show file tree
Hide file tree
Showing 12 changed files with 868 additions and 1 deletion.
3 changes: 2 additions & 1 deletion fhevm-engine/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[workspace]
resolver = "2"
members = ["coprocessor", "executor", "fhevm-engine-common", "listener"]
members = ["coprocessor", "executor", "fhevm-engine-common", "listener", "sns-executor"]


[workspace.package]
authors = ["Zama"]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ALTER TABLE ciphertexts
ADD COLUMN IF NOT EXISTS large_ct BYTEA,
ADD COLUMN IF NOT EXISTS is_sent BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS is_allowed BOOLEAN NOT NULL DEFAULT FALSE;
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
ALTER TABLE ciphertexts
ADD COLUMN IF NOT EXISTS large_ct BYTEA,
ADD COLUMN IF NOT EXISTS is_sent BOOLEAN NOT NULL DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS is_allowed BOOLEAN NOT NULL DEFAULT FALSE;
24 changes: 24 additions & 0 deletions fhevm-engine/fhevm-engine-common/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use anyhow::Result;
use bigdecimal::num_bigint::BigInt;
use tfhe::integer::bigint::StaticUnsignedBigInt;
use tfhe::integer::ciphertext::BaseRadixCiphertext;
use tfhe::integer::U256;
use tfhe::prelude::{CiphertextList, FheDecrypt};
use tfhe::shortint::Ciphertext;
use tfhe::{CompressedCiphertextList, CompressedCiphertextListBuilder};

use crate::utils::{safe_deserialize, safe_serialize};
Expand Down Expand Up @@ -389,6 +391,28 @@ impl SupportedFheCiphertexts {
}
}

pub fn to_ciphertext64(self) -> BaseRadixCiphertext<Ciphertext> {
match self {
SupportedFheCiphertexts::FheBool(v) => {
BaseRadixCiphertext::from(vec![v.into_raw_parts()])
}
SupportedFheCiphertexts::FheUint4(v) => v.into_raw_parts().0,
SupportedFheCiphertexts::FheUint8(v) => v.into_raw_parts().0,
SupportedFheCiphertexts::FheUint16(v) => v.into_raw_parts().0,
SupportedFheCiphertexts::FheUint32(v) => v.into_raw_parts().0,
SupportedFheCiphertexts::FheUint64(v) => v.into_raw_parts().0,
SupportedFheCiphertexts::FheUint128(v) => v.into_raw_parts().0,
SupportedFheCiphertexts::FheUint160(v) => v.into_raw_parts().0,
SupportedFheCiphertexts::FheUint256(v) => v.into_raw_parts().0,
SupportedFheCiphertexts::FheBytes64(v) => v.into_raw_parts().0,
SupportedFheCiphertexts::FheBytes128(v) => v.into_raw_parts().0,
SupportedFheCiphertexts::FheBytes256(v) => v.into_raw_parts().0,
SupportedFheCiphertexts::Scalar(_) => {
panic!("scalar cannot be converted to regular ciphertext")
}
}
}

pub fn type_num(&self) -> i16 {
match self {
// values taken to match with solidity library
Expand Down
45 changes: 45 additions & 0 deletions fhevm-engine/sns-executor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
[package]
name = "sns-executor"
version = "0.1.0"
authors.workspace = true
edition.workspace = true
license.workspace = true

[dependencies]
# workspace dependencies
bincode = { workspace = true }
clap = { workspace = true }
prometheus = { workspace = true }
prost = { workspace = true }
rayon = { workspace = true }
sha3 = { workspace = true }
tokio = { workspace = true }
tonic = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
anyhow = { workspace = true }
serde = { workspace = true }
hex = "0.4"

aligned-vec = "0.5.0"
num-traits = "=0.2.19"
sqlx = { version = "0.7", features = ["runtime-tokio", "tls-rustls", "postgres", "uuid"] }

serde_json = "=1.0"

# local dependencies
fhevm-engine-common = { path = "../fhevm-engine-common" }

# arch-specific dependencies
[target.'cfg(target_arch = "x86_64")'.dependencies]
tfhe = { workspace = true, features = ["x86_64-unix"] }
[target.'cfg(target_arch = "aarch64")'.dependencies]
tfhe = { workspace = true, features = ["aarch64-unix"] }

[[bin]]
name = "sns_worker"
path = "src/bin/sns_worker.rs"


[features]
decrypt_128 = []
29 changes: 29 additions & 0 deletions fhevm-engine/sns-executor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# SnS executor

## Description

### Library crate (sns-executor)

Executes a loop that:
- Retrieves `(handle, compressed_ct)` pairs from PG table.ciphertexts marked as `allowed`.
- Computes `large_ct` using the SnS algorithm.
- Updates the `large_ct` column corresponding to the specified handle.
- Sends a signal indicating the availability of newly computed `large_ct`.

#### Features
**decrypt_128** - Decrypt each `large_ct` and print it as a plaintext (for testing purposes only).

### Binary (sns-worker)

Runs sns-executor. See also `src/bin/utils/daemon_cli.rs`


## How to run a sns-worker

```
DATABASE_URL=postgresql://postgres:postgres@localhost:5432/coprocessor \
cargo run --release -- \
--pg-listen-channel "allowed_handles" \
--pg-notify-channel "computed_handles" \
--keys-file-path "./default_keys.bin"
```
56 changes: 56 additions & 0 deletions fhevm-engine/sns-executor/src/bin/sns_worker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
use serde::{de::DeserializeOwned, Serialize};
use sns_executor::DBConfig;
use std::fs;
use tokio::{signal::unix, sync::broadcast};

mod utils;

fn read_element<T: DeserializeOwned + Serialize>(file_path: String) -> anyhow::Result<T> {
let read_element = fs::read(file_path.clone())?;
Ok(bincode::deserialize_from(read_element.as_slice())?)
}

fn handle_sigint(cancel_tx: broadcast::Sender<()>) {
tokio::spawn(async move {
let mut signal = unix::signal(unix::SignalKind::interrupt()).unwrap();
signal.recv().await;
cancel_tx.send(()).unwrap();
});
}

#[tokio::main]
async fn main() {
let args = utils::daemon_cli::parse_args();

// Read keys from the file path, if specified
let mut keys = None;
if let Some(path) = args.keys_file_path {
keys = Some(read_element(path).expect("Failed to read keys."));
}

let db_url = args
.database_url
.clone()
.unwrap_or_else(|| std::env::var("DATABASE_URL").expect("DATABASE_URL is undefined"));

tracing_subscriber::fmt().json().with_level(true).init();

let conf = sns_executor::Config {
db: DBConfig {
url: db_url,
listen_channel: args.pg_listen_channel,
notify_channel: args.pg_notify_channel,
batch_limit: args.work_items_batch_size,
polling_interval: args.pg_polling_interval,
max_connections: args.pg_pool_connections,
},
};

// Handle SIGINIT signals
let (cancel_tx, cancel_rx) = broadcast::channel(1);
handle_sigint(cancel_tx);

if let Err(err) = sns_executor::run(keys, &conf, cancel_rx).await {
tracing::error!("Worker failed: {:?}", err);
}
}
41 changes: 41 additions & 0 deletions fhevm-engine/sns-executor/src/bin/utils/daemon_cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
use clap::{command, Parser};

#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
pub struct Args {
/// Work items batch size
#[arg(long, default_value_t = 4)]
pub work_items_batch_size: u32,

/// NOTIFY/LISTEN channel for database that the worker listen to
#[arg(long)]
pub pg_listen_channel: String,

/// NOTIFY/LISTEN channel for database that the worker notify to
#[arg(long)]
pub pg_notify_channel: String,

/// Polling interval in seconds
#[arg(long, default_value_t = 60)]
pub pg_polling_interval: u32,

/// Postgres pool connections
#[arg(long, default_value_t = 10)]
pub pg_pool_connections: u32,

/// Postgres database url. If unspecified DATABASE_URL environment variable is used
#[arg(long)]
pub database_url: Option<String>,

/// KeySet file. If unspecified the the keys are read from the database (not implemented)
#[arg(long)]
pub keys_file_path: Option<String>,

/// sns-executor service name in OTLP traces (not implemented)
#[arg(long, default_value = "sns-executor")]
pub service_name: String,
}

pub fn parse_args() -> Args {
Args::parse()
}
1 change: 1 addition & 0 deletions fhevm-engine/sns-executor/src/bin/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod daemon_cli;
Loading

0 comments on commit 95b43ee

Please sign in to comment.