Skip to content

Commit

Permalink
fix: get custom event listeners building
Browse files Browse the repository at this point in the history
  • Loading branch information
drewstone authored and Serial-ATA committed Feb 14, 2025
1 parent 77daa97 commit f75ebea
Show file tree
Hide file tree
Showing 15 changed files with 454 additions and 272 deletions.
2 changes: 1 addition & 1 deletion blueprints/examples/src/raw_tangle_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ pub struct MyContext {

pub async fn constructor(
env: GadgetConfiguration,
) -> color_eyre::Result<impl InitializableEventHandler> {
) -> color_eyre::Result<impl InitializableEventHandler + Clone + Send + Sync + 'static> {
let signer = env
.clone()
.keystore()
Expand Down
40 changes: 23 additions & 17 deletions blueprints/examples/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,7 @@ use blueprint_sdk::testing::tempfile;
use blueprint_sdk::testing::utils::anvil::keys::{inject_anvil_key, ANVIL_PRIVATE_KEYS};
use blueprint_sdk::testing::utils::anvil::{get_receipt, start_default_anvil_testnet};
use blueprint_sdk::testing::utils::harness::TestHarness;
use blueprint_sdk::testing::utils::runner::TestEnv;
use blueprint_sdk::testing::utils::tangle::{InputValue, OutputValue, TangleTestHarness};
use blueprint_sdk::testing::utils::tangle::{InputValue, TangleTestHarness};
use blueprint_sdk::tokio;
use blueprint_sdk::tokio::task::JoinHandle;
use blueprint_sdk::tokio::time::timeout;
Expand Down Expand Up @@ -154,17 +153,22 @@ async fn test_periodic_web_poller() -> Result<()> {
let harness = TangleTestHarness::setup(temp_dir).await?;

// Setup service
let (mut test_env, service_id, _blueprint_id) = harness.setup_services(false).await?;
let (mut test_env, service_id, _blueprint_id) = harness.setup_services::<1>(false).await?;

// Add the web poller job
test_env.add_job(crate::periodic_web_poller::constructor("*/5 * * * * *"));
test_env
.add_job(|_env| async move {
Ok::<_, ()>(crate::periodic_web_poller::constructor("*/5 * * * * *"))
})
.await
.unwrap();

// Run the test environment
test_env.run_runner().await.unwrap();
test_env.start().await?;

// Execute job and verify result
let result = tokio::select! {
result = harness.execute_job(service_id, 1, vec![], vec![OutputValue::Uint64(1)]) => {
result = harness.submit_job(service_id, 1, vec![]) => {
match result {
Ok(_) => {Ok(())},
Err(e) => Err(e),
Expand Down Expand Up @@ -199,26 +203,26 @@ async fn test_services_context() -> Result<()> {
// Initialize test harness
let temp_dir = tempfile::TempDir::new()?;
let harness = TangleTestHarness::setup(temp_dir).await?;
let env = harness.env().clone();

// Setup service
let (mut test_env, service_id, _blueprint_id) = harness.setup_services(false).await?;
let (mut test_env, service_id, _blueprint_id) = harness.setup_services::<1>(false).await?;

// Add the raw tangle events job
test_env.add_job(crate::services_context::constructor(env.clone()).await?);
test_env
.add_job(|env| async move { crate::services_context::constructor(env).await })
.await?;

// Run the test environment
let _test_handle = tokio::spawn(async move {
test_env.run_runner().await.unwrap();
test_env.start().await.unwrap();
});

// Execute job and verify result
let results = harness
.execute_job(
.submit_job(
service_id,
3,
vec![InputValue::List(BoundedVec(vec![InputValue::Uint8(0)]))],
vec![OutputValue::Uint64(1)],
)
.await?;

Expand All @@ -236,20 +240,22 @@ async fn test_raw_tangle_events() -> Result<()> {
let env = harness.env().clone();

// Setup service
let (mut test_env, service_id, _blueprint_id) = harness.setup_services(false).await?;
let (mut test_env, service_id, _blueprint_id) = harness.setup_services::<1>(false).await?;

// Add the raw tangle events job
test_env.add_job(crate::raw_tangle_events::constructor(env.clone()).await?);
test_env
.add_job(|env| async move { crate::raw_tangle_events::constructor(env).await })
.await?;

// Spawn the balance transfer task
let _handle = balance_transfer_event(env.clone()).await.unwrap();
let _handle = balance_transfer_event(env.clone()).await?;

// Run the test environment
test_env.run_runner().await.unwrap();
test_env.start().await?;

// Execute job and verify result
let result = tokio::select! {
result = harness.execute_job(service_id, 2, vec![], vec![OutputValue::Uint64(1)]) => {
result = harness.submit_job(service_id, 2, vec![]) => {
match result {
Ok(_) => {Ok(())},
Err(e) => Err(e),
Expand Down
3 changes: 0 additions & 3 deletions blueprints/incredible-squaring/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
use crate::{MyContext, XsquareEventHandler};
use blueprint_sdk::config::GadgetConfiguration;
use blueprint_sdk::logging::setup_log;
use blueprint_sdk::testing::tempfile;
use blueprint_sdk::testing::utils::harness::TestHarness;
use blueprint_sdk::testing::utils::runner::TestEnv;
use blueprint_sdk::testing::utils::tangle::{InputValue, OutputValue, TangleTestHarness};
use color_eyre::Result;
use std::time::Duration;

#[tokio::test]
async fn test_incredible_squaring() -> Result<()> {
Expand Down
66 changes: 41 additions & 25 deletions crates/event-listeners/core/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,36 +159,44 @@ pub struct EventFlowFilter<O: Send + 'static> {

impl<
Ctx: Clone + Send + 'static,
Event: Send + 'static,
Creator: Send + 'static,
Event: Send + 'static,
PreProcessOut: Send + 'static,
JobOutput: Send + 'static,
ProcessorError: core::error::Error + Send + Sync + 'static,
> EventFlowWrapper<Ctx, Creator, Event, PreProcessOut, JobOutput, ProcessorError>
{
pub fn new<T, Pre, PreFut, Job, JobFut, Post, PostFut>(
pub fn new<
PreProcessor,
PreProcessorFut,
JobProcessor,
JobProcessorFut,
PostProcessor,
PostProcessorFut,
L,
>(
context: Ctx,
event_listener: T,
preprocessor: Pre,
job_processor: Job,
mut postprocessor: Post,
event_listener: L,
preprocessor: PreProcessor,
job_processor: JobProcessor,
mut postprocessor: PostProcessor,
) -> Self
where
T: EventListener<Event, Ctx, Creator, ProcessorError = ProcessorError>,
Pre: Fn(Event) -> PreFut + Send + 'static,
PreFut:
PreProcessor: Fn(Event) -> PreProcessorFut + Send + 'static,
PreProcessorFut:
Future<Output = Result<Option<PreProcessOut>, Error<ProcessorError>>> + Send + 'static,
Job: Fn((PreProcessOut, Ctx)) -> JobFut + Send + 'static,
JobFut: Future<Output = Result<JobOutput, Error<ProcessorError>>> + Send + 'static,
Post: FnMut(JobOutput) -> PostFut + Send + 'static,
PostFut: Future<Output = Result<(), Error<ProcessorError>>> + Send + 'static,
JobProcessor: Fn((PreProcessOut, Ctx)) -> JobProcessorFut + Send + 'static,
JobProcessorFut: Future<Output = Result<JobOutput, Error<ProcessorError>>> + Send + 'static,
PostProcessor: FnMut(JobOutput) -> PostProcessorFut + Send + 'static,
PostProcessorFut: Future<Output = Result<(), Error<ProcessorError>>> + Send + 'static,
L: EventListener<Event, Ctx, Creator, ProcessorError = ProcessorError> + 'static,
{
Self {
context,
event_listener: Box::new(event_listener),
preprocessor: Box::new(move |event| Box::pin(preprocessor(event))),
job_processor: Box::new(move |(event, ctx)| Box::pin(job_processor((event, ctx)))),
postprocessor: Box::new(move |event| Box::pin(postprocessor(event))),
preprocessor: Box::new(move |evt| Box::pin(preprocessor(evt))),
job_processor: Box::new(move |input| Box::pin(job_processor(input))),
postprocessor: Box::new(move |output| Box::pin(postprocessor(output))),
_pd: PhantomData,
}
}
Expand Down Expand Up @@ -290,20 +298,28 @@ mod tests {
}
}

async fn preprocess(event: TestEvent) -> Result<Option<(u64, TestEvent)>, Error<Infallible>> {
let amount = event.fetch_add(1, Ordering::SeqCst) + 1;
Ok(Some((amount, event)))
fn preprocess(
event: TestEvent,
) -> impl Future<Output = Result<Option<(u64, TestEvent)>, Error<Infallible>>> + Send {
async move {
let amount = event.fetch_add(1, Ordering::SeqCst) + 1;
Ok(Some((amount, event)))
}
}

async fn job_processor(
fn job_processor(
preprocessed_event: ((u64, TestEvent), Arc<AtomicU64>),
) -> Result<u64, Error<Infallible>> {
let amount = preprocessed_event.1.fetch_add(1, Ordering::SeqCst) + 1;
Ok(amount)
) -> impl Future<Output = Result<u64, Error<Infallible>>> + Send {
async move {
let amount = preprocessed_event.1.fetch_add(1, Ordering::SeqCst) + 1;
Ok(amount)
}
}

async fn post_process(_job_output: u64) -> Result<(), Error<Infallible>> {
Ok(())
fn post_process(
_job_output: u64,
) -> impl Future<Output = Result<(), Error<Infallible>>> + Send {
async move { Ok(()) }
}

#[tokio::test]
Expand Down
121 changes: 93 additions & 28 deletions crates/macros/blueprint-proc-macro/src/job/args/event_listener.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[cfg(feature = "evm")]
use crate::job::evm::EvmArgs;
use crate::job::ParameterType;
use crate::shared;
#[cfg(feature = "evm")]
use proc_macro2::Ident;
#[cfg(feature = "tangle")]
Expand All @@ -11,7 +12,7 @@ use std::str::FromStr;
use syn::parse::{Parse, ParseBuffer, ParseStream};
#[cfg(any(not(feature = "evm"), not(feature = "tangle")))]
use syn::spanned::Spanned;
use syn::{Index, Token, Type};
use syn::{GenericArgument, Index, PathArguments, Token, Type};

const EVM_EVENT_LISTENER_TAG: &str = "EvmContractEventListener";
const TANGLE_EVENT_LISTENER_TAG: &str = "TangleEventListener";
Expand Down Expand Up @@ -64,7 +65,8 @@ pub enum ListenerType {
Custom,
}

pub(crate) struct SingleListener {
#[derive(Debug, Clone)]
pub struct SingleListener {
pub listener: Type,
#[cfg(feature = "evm")]
pub evm_args: Option<EvmArgs>,
Expand All @@ -87,7 +89,7 @@ impl SingleListener {

/// `#[job(event_listener(MyCustomListener, MyCustomListener2)]`
/// Accepts an optional argument that specifies the event listener to use that implements EventListener
pub(crate) struct EventListenerArgs {
pub struct EventListenerArgs {
pub(crate) listeners: Vec<SingleListener>,
}

Expand Down Expand Up @@ -172,8 +174,8 @@ impl Parse for EventListenerArgs {
}
} else {
return Err(content.error(
"Unexpected field parsed. Expected one of `listener`, `event`, `pre_processor`, `post_processor`",
));
"Unexpected field parsed. Expected one of `listener`, `event`, `pre_processor`, `post_processor`",
));
}
}

Expand Down Expand Up @@ -253,43 +255,106 @@ impl EventListenerArgs {
let listener_type = self.get_event_listener().listener_type;

params
.iter()
.enumerate()
.map(|(i, param_ty)| {
let ident = format_ident!("param{i}");
let index = Index::from(i);
match listener_type {
#[cfg(feature = "tangle")]
ListenerType::Tangle => {
let ty_token_stream = proc_macro2::TokenStream::from_str(&param_ty.ty.as_rust_type()).expect("should be valid");
let ty_tokens = quote_spanned! {param_ty.span.expect("should always be available")=>
.iter()
.enumerate()
.map(|(i, param_ty)| {
let ident = format_ident!("param{i}");
let index = Index::from(i);
match listener_type {
#[cfg(feature = "tangle")]
ListenerType::Tangle => {
let ty_token_stream = proc_macro2::TokenStream::from_str(&param_ty.ty.as_rust_type()).expect("should be valid");
let ty_tokens = quote_spanned! {param_ty.span.expect("should always be available")=>
#ty_token_stream
};
quote! {
quote! {
let __arg = args.next().expect("parameter count checked before");
let Ok(#ident) = ::blueprint_sdk::macros::ext::blueprint_serde::from_field::<#ty_tokens>(__arg) else {
return Err(::blueprint_sdk::macros::ext::event_listeners::core::Error::BadArgumentDecoding(format!("Failed to decode the field `{}` to `{}`", stringify!(#ident), stringify!(#ty_tokens))));
};
}
}
}

#[cfg(feature = "evm")]
ListenerType::Evm => {
#[cfg(feature = "evm")]
ListenerType::Evm => {
let _ = param_ty;
quote! {
quote! {
let #ident = inputs.#index;
}
}
}

// All other event listeners will return just one type
ListenerType::Custom => {
quote! {
ListenerType::Custom => {
quote! {
let #ident = inputs.#index;
}
}
}
})
.collect::<Vec<_>>()
}
}
})
.collect::<Vec<_>>()
}

pub fn infer_types(&mut self, sig: &syn::Signature) -> syn::Result<()> {
for listener in &mut self.listeners {
if let Type::Path(path) = &mut listener.listener {
if let Some(last_segment) = path.path.segments.last_mut() {
// Don't try to infer types if they're explicitly specified
let has_explicit_types = match &last_segment.arguments {
PathArguments::AngleBracketed(args) => !args.args.is_empty(),
_ => false,
};

if !has_explicit_types {
// Get all parameter types from the function signature
let param_types = shared::param_types(sig)?;

// Find the context parameter - it's usually the one with a type containing "Context" or ending with "Ctx"
let mut context_type = None;
let mut event_type = None;
for ty in param_types.values() {
if let Type::Path(type_path) = ty {
let type_name = type_path
.path
.segments
.last()
.map(|seg| seg.ident.to_string())
.unwrap_or_default();
if type_name.contains("Context") || type_name.ends_with("Ctx") {
context_type = Some(ty.clone());
} else {
// If it's not a context type, it's probably an event type
event_type = Some(ty.clone());
}
}
}

// If we couldn't find a context type, default to unit type
if context_type.is_none() {
context_type = Some(syn::parse_quote! { () });
}

// If we couldn't find an event type, default to unit type
if event_type.is_none() {
event_type = Some(syn::parse_quote! { () });
}

// Create the generic arguments based on what we found
let generic_args = syn::AngleBracketedGenericArguments {
colon2_token: None,
lt_token: syn::token::Lt::default(),
args: syn::punctuated::Punctuated::from_iter(vec![
GenericArgument::Type(event_type.unwrap()),
GenericArgument::Type(context_type.unwrap()),
]),
gt_token: syn::token::Gt::default(),
};

// Set the generic arguments directly
last_segment.arguments = PathArguments::AngleBracketed(generic_args);
}
}
}
}
Ok(())
}

#[cfg(feature = "tangle")]
Expand Down
Loading

0 comments on commit f75ebea

Please sign in to comment.