Skip to content

Commit

Permalink
refactor: allow multiple reducer types (#152)
Browse files Browse the repository at this point in the history
  • Loading branch information
scarmuega authored Dec 15, 2023
1 parent b075a3d commit c3e5902
Show file tree
Hide file tree
Showing 22 changed files with 243 additions and 107 deletions.
13 changes: 4 additions & 9 deletions src/bin/scrolls/daemon.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
use clap;

use gasket::runtime::Tether;
use scrolls::{
enrich,
framework::*,
reducers::{self, ConfigTrait},
sources, storage,
};
use scrolls::{enrich, framework::*, reducers, sources, storage};
use serde::Deserialize;
use std::{collections::VecDeque, time::Duration};
use tracing::{info, warn};
Expand All @@ -19,7 +14,7 @@ use crate::console;
struct ConfigRoot {
source: sources::Config,
enrich: Option<enrich::Config>,
reducers: Vec<reducers::Config>,
reducer: reducers::Config,
storage: storage::Config,
intersect: IntersectConfig,
finalize: Option<FinalizeConfig>,
Expand Down Expand Up @@ -126,7 +121,7 @@ fn chain_stages<'a>(
fn bootstrap(
mut source: sources::Bootstrapper,
mut enrich: enrich::Bootstrapper,
mut reducer: reducers::Stage,
mut reducer: reducers::Bootstrapper,
mut storage: storage::Bootstrapper,
policy: gasket::runtime::Policy,
) -> Result<Runtime, Error> {
Expand Down Expand Up @@ -168,7 +163,7 @@ pub fn run(args: &Args) -> Result<(), Error> {
.unwrap_or(enrich::Config::default())
.bootstrapper(&ctx)?;

let reducer = config.reducers.bootstrapper(&ctx)?;
let reducer = config.reducer.bootstrapper(&ctx)?;
let storage = config.storage.bootstrapper(&ctx)?;

let retries = define_gasket_policy(config.retries.as_ref());
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
123 changes: 123 additions & 0 deletions src/reducers/builtin/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use gasket::framework::*;
use gasket::{
messaging::{RecvPort, SendPort},
runtime::Tether,
};
use pallas::ledger::traverse::MultiEraBlock;
use serde::Deserialize;

use crate::framework::model::CRDTCommand;
use crate::framework::*;

mod full_utxos_by_address;

#[derive(Deserialize)]
#[serde(tag = "type")]
pub enum ReducerConfig {
FullUtxosByAddress(full_utxos_by_address::Config),
}

impl ReducerConfig {
pub fn into_reducer(self) -> Box<dyn ReducerTrait> {
match self {
ReducerConfig::FullUtxosByAddress(x) => x.plugin(),
}
}
}

#[derive(Deserialize)]
pub struct Config {
reducers: Vec<ReducerConfig>,
}

impl Config {
pub fn bootstrapper(self, _ctx: &Context) -> Result<Stage, Error> {
let stage = Stage {
reducers: self
.reducers
.into_iter()
.map(|x| x.into_reducer())
.collect(),
..Default::default()
};

Ok(stage)
}
}

#[derive(Default, Stage)]
#[stage(name = "reducer", unit = "ChainEvent", worker = "Worker")]
pub struct Stage {
reducers: Vec<Box<dyn ReducerTrait>>,

pub input: ReducerInputPort,
pub output: ReducerOutputPort,

#[metric]
ops_count: gasket::metrics::Counter,
}

impl StageBootstrapper for Stage {
fn connect_input(&mut self, adapter: InputAdapter) {
self.input.connect(adapter)
}

fn connect_output(&mut self, adapter: OutputAdapter) {
self.output.connect(adapter)
}

fn spawn(self, policy: gasket::runtime::Policy) -> Tether {
gasket::runtime::spawn_stage(self, policy)
}
}

#[derive(Default)]
pub struct Worker;

impl From<&Stage> for Worker {
fn from(_: &Stage) -> Self {
Self
}
}

gasket::impl_splitter!(|_worker: Worker, stage: Stage, unit: ChainEvent| => {
let record = unit.record();
if record.is_none() {
return Ok(());
}

let record = record.unwrap();

let commands = match record {
Record::EnrichedBlockPayload(block, ctx) => {
let block = MultiEraBlock::decode(block)
.map_err(Error::cbor)
// .apply_policy(&self.policy)
.or_panic()?;

let mut commands: Vec<CRDTCommand> = Vec::new();

for x in stage.reducers.iter_mut() {
commands.append(&mut x.reduce_block(&block, ctx).await.or_retry()?)
}

Ok(commands)
},
_ => todo!(),
}?;

Some(ChainEvent::apply(unit.point().clone(), Record::CRDTCommand(commands)))
});

#[async_trait::async_trait]
pub trait ReducerTrait: Send + Sync {
async fn reduce_block<'b>(
&mut self,
block: &'b MultiEraBlock<'b>,
ctx: &model::BlockContext,
) -> Result<Vec<CRDTCommand>, Error>;
}

trait ReducerConfigTrait {
fn plugin(self) -> Box<dyn ReducerTrait>;
}
86 changes: 86 additions & 0 deletions src/reducers/deno/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use gasket::framework::*;
use gasket::{
messaging::{RecvPort, SendPort},
runtime::Tether,
};
use pallas::ledger::traverse::MultiEraBlock;
use serde::Deserialize;

use crate::framework::model::CRDTCommand;
use crate::framework::*;

#[derive(Deserialize)]

pub struct Config {
// TODO: specify javascript file
}

impl Config {
pub fn bootstrapper(self, _ctx: &Context) -> Result<Stage, Error> {
let stage = Stage {
..Default::default()
};

Ok(stage)
}
}

#[derive(Default, Stage)]
#[stage(name = "reducer", unit = "ChainEvent", worker = "Worker")]
pub struct Stage {
pub input: ReducerInputPort,
pub output: ReducerOutputPort,

#[metric]
ops_count: gasket::metrics::Counter,
}

impl StageBootstrapper for Stage {
fn connect_input(&mut self, adapter: InputAdapter) {
self.input.connect(adapter)
}

fn connect_output(&mut self, adapter: OutputAdapter) {
self.output.connect(adapter)
}

fn spawn(self, policy: gasket::runtime::Policy) -> Tether {
gasket::runtime::spawn_stage(self, policy)
}
}

#[derive(Default)]
pub struct Worker;

impl From<&Stage> for Worker {
fn from(_: &Stage) -> Self {
Self
}
}

gasket::impl_splitter!(|_worker: Worker, stage: Stage, unit: ChainEvent| => {
let record = unit.record();
if record.is_none() {
return Ok(());
}

let record = record.unwrap();

let commands = match record {
Record::EnrichedBlockPayload(block, ctx) => {
let block = MultiEraBlock::decode(block)
.map_err(Error::cbor)
// .apply_policy(&self.policy)
.or_panic()?;

let commands = vec![];

// TODO: call deno runtime

Ok(commands)
},
_ => todo!(),
}?;

Some(ChainEvent::apply(unit.point().clone(), Record::CRDTCommand(commands)))
});
128 changes: 30 additions & 98 deletions src/reducers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,116 +1,48 @@
use gasket::framework::*;
use gasket::{
messaging::{RecvPort, SendPort},
runtime::Tether,
};
use pallas::ledger::traverse::MultiEraBlock;
use gasket::{messaging::SendPort, runtime::Tether};
use serde::Deserialize;

use crate::framework::model::CRDTCommand;
use crate::framework::*;
use crate::framework::{errors::Error, *};

mod full_utxos_by_address;
pub mod builtin;
pub mod deno;

#[derive(Deserialize)]
#[serde(tag = "type")]
pub enum Config {
FullUtxosByAddress(full_utxos_by_address::Config),
pub enum Bootstrapper {
BuiltIn(builtin::Stage),
Deno(deno::Stage),
}
impl Config {
pub fn plugin(self) -> Box<dyn ReducerTrait> {

impl StageBootstrapper for Bootstrapper {
fn connect_output(&mut self, adapter: OutputAdapter) {
match self {
Config::FullUtxosByAddress(c) => c.plugin(),
Bootstrapper::BuiltIn(p) => p.output.connect(adapter),
Bootstrapper::Deno(p) => p.output.connect(adapter),
}
}
}

pub trait ConfigTrait {
fn bootstrapper(self, ctx: &Context) -> Result<Stage, Error>;
}
impl ConfigTrait for Vec<Config> {
fn bootstrapper(self, _ctx: &Context) -> Result<Stage, Error> {
let reducers: Vec<Box<dyn ReducerTrait>> =
self.into_iter().map(|c: Config| c.plugin()).collect();

let stage = Stage {
reducers,
..Default::default()
};

Ok(stage)
}
}

#[derive(Default, Stage)]
#[stage(name = "reducer", unit = "ChainEvent", worker = "Worker")]
pub struct Stage {
reducers: Vec<Box<dyn ReducerTrait>>,

pub input: ReducerInputPort,
pub output: ReducerOutputPort,

#[metric]
ops_count: gasket::metrics::Counter,
}
impl StageBootstrapper for Stage {
fn connect_input(&mut self, adapter: InputAdapter) {
self.input.connect(adapter)
}

fn connect_output(&mut self, adapter: OutputAdapter) {
self.output.connect(adapter)
fn connect_input(&mut self, _: InputAdapter) {
panic!("attempted to use source stage as receiver");
}

fn spawn(self, policy: gasket::runtime::Policy) -> Tether {
gasket::runtime::spawn_stage(self, policy)
}
}

#[derive(Default)]
pub struct Worker;
impl From<&Stage> for Worker {
fn from(_: &Stage) -> Self {
Self
match self {
Bootstrapper::BuiltIn(s) => gasket::runtime::spawn_stage(s, policy),
Bootstrapper::Deno(s) => gasket::runtime::spawn_stage(s, policy),
}
}
}
gasket::impl_splitter!(|_worker: Worker, stage: Stage, unit: ChainEvent| => {
let record = unit.record();
if record.is_none() {
return Ok(());
}

let record = record.unwrap();

let commands = match record {
Record::EnrichedBlockPayload(block, ctx) => {
let block = MultiEraBlock::decode(block)
.map_err(Error::cbor)
// .apply_policy(&self.policy)
.or_panic()?;

let mut commands: Vec<CRDTCommand> = Vec::new();

for x in stage.reducers.iter_mut() {
commands.append(&mut x.reduce_block(&block, ctx).await.or_retry()?)
}

Ok(commands)
},
_ => todo!(),
}?;

Some(ChainEvent::apply(unit.point().clone(), Record::CRDTCommand(commands)))
});

#[async_trait::async_trait]
pub trait ReducerTrait: Send + Sync {
async fn reduce_block<'b>(
&mut self,
block: &'b MultiEraBlock<'b>,
ctx: &model::BlockContext,
) -> Result<Vec<CRDTCommand>, Error>;
#[derive(Deserialize)]
#[serde(tag = "type")]
pub enum Config {
BuiltIn(builtin::Config),
Deno(deno::Config),
}

trait ReducerConfigTrait {
fn plugin(self) -> Box<dyn ReducerTrait>;
impl Config {
pub fn bootstrapper(self, ctx: &Context) -> Result<Bootstrapper, Error> {
match self {
Config::BuiltIn(c) => Ok(Bootstrapper::BuiltIn(c.bootstrapper(ctx)?)),
Config::Deno(c) => Ok(Bootstrapper::Deno(c.bootstrapper(ctx)?)),
}
}
}

0 comments on commit c3e5902

Please sign in to comment.