diff --git a/Cargo.lock b/Cargo.lock index 0c229484..4fdff181 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -241,8 +241,6 @@ name = "arbiter-bindings" version = "0.1.1" dependencies = [ "ethers", - "revm", - "revm-primitives", "serde", ] @@ -297,14 +295,16 @@ dependencies = [ "arbiter-bindings", "arbiter-core", "artemis-core", - "async-stream", "async-trait", "crossbeam-channel", "ethers", + "flume", "futures-util", "serde", "serde_json", "tokio", + "tracing", + "tracing-subscriber", ] [[package]] @@ -1934,6 +1934,18 @@ dependencies = [ "num-traits", ] +[[package]] +name = "flume" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55ac459de2512911e4b674ce33cf20befaba382d05b62b008afc1c8b57cbf181" +dependencies = [ + "futures-core", + "futures-sink", + "nanorand", + "spin 0.9.8", +] + [[package]] name = "fnv" version = "1.0.7" @@ -3240,6 +3252,15 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "nanorand" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a51313c5820b0b02bd422f4b44776fbf47961755c74ce64afc73bfad10226c3" +dependencies = [ + "getrandom 0.2.11", +] + [[package]] name = "native-tls" version = "0.2.11" @@ -5430,6 +5451,9 @@ name = "spin" version = "0.9.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] [[package]] name = "spki" diff --git a/Cargo.toml b/Cargo.toml index b0f1a3cf..cd4bb8b9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ arbiter-core = { path = "./arbiter-core" } crossbeam-channel = { version = "=0.5.8" } futures-util = { version = "=0.3.29" } async-trait = { version = "0.1.74" } +tracing = "0.1.40" # Dependencies for the release build [dependencies] diff --git a/arbiter-bindings/Cargo.toml b/arbiter-bindings/Cargo.toml index df370441..ca326108 100644 --- a/arbiter-bindings/Cargo.toml +++ b/arbiter-bindings/Cargo.toml @@ -9,6 +9,4 @@ license = "Apache-2.0" [dependencies] ethers.workspace = true -revm.workspace = true -revm-primitives.workspace = true serde.workspace = true diff --git a/arbiter-core/Cargo.toml b/arbiter-core/Cargo.toml index 1e3944a4..19322cd3 100644 --- a/arbiter-core/Cargo.toml +++ b/arbiter-core/Cargo.toml @@ -37,7 +37,7 @@ thiserror.workspace = true # Logging futures-util.workspace = true -tracing = "0.1.40" +tracing.workspace = true # File types polars = { version = "0.35.4", features = ["parquet", "csv", "json"] } diff --git a/arbiter-core/src/environment/mod.rs b/arbiter-core/src/environment/mod.rs index 8cbf0d9d..e987a686 100644 --- a/arbiter-core/src/environment/mod.rs +++ b/arbiter-core/src/environment/mod.rs @@ -45,7 +45,6 @@ use revm::{ }, EVM, }; -// use hashbrown::{hash_map, HashMap as HashMapBrown}; use serde::{Deserialize, Serialize}; use thiserror::Error; diff --git a/arbiter-core/src/middleware/mod.rs b/arbiter-core/src/middleware/mod.rs index 5017bc06..a97a581d 100644 --- a/arbiter-core/src/middleware/mod.rs +++ b/arbiter-core/src/middleware/mod.rs @@ -42,11 +42,6 @@ use futures_timer::Delay; use rand::{rngs::StdRng, SeedableRng}; use revm::primitives::{CreateScheme, Output, TransactTo, TxEnv, U256}; use serde::{de::DeserializeOwned, Serialize}; -// use revm::primitives::{ExecutionResult, Output}; -// use super::cast::revm_logs_to_ethers_logs; -// use super::errors::RevmMiddlewareError; - -// use async_trait::async_trait; use thiserror::Error; use super::*; diff --git a/arbiter-engine/Cargo.toml b/arbiter-engine/Cargo.toml index cf3c902b..a7cef2df 100644 --- a/arbiter-engine/Cargo.toml +++ b/arbiter-engine/Cargo.toml @@ -12,7 +12,7 @@ readme = "../README.md" ethers.workspace = true arbiter-core.workspace = true arbiter-bindings = { path = "../arbiter-bindings" } -artemis-core = { git = "https://github.com/paradigmxyz/artemis.git"} +artemis-core = { git = "https://github.com/paradigmxyz/artemis.git" } crossbeam-channel.workspace = true futures-util.workspace = true async-trait.workspace = true @@ -20,4 +20,8 @@ serde_json.workspace = true serde.workspace = true tokio.workspace = true anyhow = { version = "=1.0.75" } -async-stream = "0.3.5" \ No newline at end of file +tracing.workspace = true +flume = "0.11.0" + +[dev-dependencies] +tracing-subscriber = "0.3.18" \ No newline at end of file diff --git a/arbiter-engine/src/agent.rs b/arbiter-engine/src/agent.rs index 116bd44c..2ef6b526 100644 --- a/arbiter-engine/src/agent.rs +++ b/arbiter-engine/src/agent.rs @@ -12,7 +12,7 @@ use artemis_core::{ engine::Engine, - types::{Collector, Executor}, + types::{Collector, Executor, Strategy}, }; /// An agent is an entity capable of processing events and producing actions. @@ -22,17 +22,18 @@ use artemis_core::{ pub struct Agent { /// Identifier for this agent. /// Used for routing messages. - _id: String, + pub id: String, /// The engine that this agent uses to process events and produce actions. - engine: Engine, /* Note, agent shouldn't NEED a client as a field as the engine can - * handle this. */ + pub(crate) engine: Option>, /* Note, agent shouldn't NEED a client as a field + * as the engine can + * handle this. */ /// Agents that this agent depends on. - dependencies: Vec, + pub dependencies: Vec, /// Agents that depend on this agent. - dependents: Vec, + pub dependents: Vec, } impl Agent @@ -44,8 +45,8 @@ where /// Produces a new agent with the given identifier. pub fn new(id: &str) -> Self { Self { - _id: id.to_owned(), - engine: Engine::new(), + id: id.to_owned(), + engine: Some(Engine::new()), dependencies: vec![], dependents: vec![], } @@ -53,12 +54,26 @@ where /// Adds a collector to the agent's engine. pub fn add_collector(&mut self, collector: impl Collector + 'static) { - self.engine.add_collector(Box::new(collector)); + self.engine + .as_mut() + .expect("Engine has already been taken by the `World::run()` method.") + .add_collector(Box::new(collector)); } /// Adds an executor to the agent's engine. pub fn add_executor(&mut self, executor: impl Executor + 'static) { - self.engine.add_executor(Box::new(executor)); + self.engine + .as_mut() + .expect("Engine has already been taken by the `World::run()` method.") + .add_executor(Box::new(executor)); + } + + /// Adds a strategy to the agent's engine. + pub fn add_strategy(&mut self, strategy: impl Strategy + 'static) { + self.engine + .as_mut() + .expect("Engine has already been taken by the `World::run()` method.") + .add_strategy(Box::new(strategy)); } /// Adds a dependency to the agent. @@ -86,6 +101,7 @@ mod tests { use super::*; + #[ignore] #[tokio::test] async fn test_agent() { // Startup diff --git a/arbiter-engine/src/examples.rs b/arbiter-engine/src/examples.rs index 84221a99..986218b4 100644 --- a/arbiter-engine/src/examples.rs +++ b/arbiter-engine/src/examples.rs @@ -113,3 +113,90 @@ impl Strategy for TokenAdmin { } } } + +#[cfg(test)] +mod tests { + + use arbiter_core::{ + environment::builder::EnvironmentBuilder, middleware::connection::Connection, + }; + use ethers::providers::Provider; + + use super::*; + use crate::{agent::Agent, messager::Messager, world::World}; + + struct TimedMessage { + delay: u64, + message: Message, + } + + #[async_trait::async_trait] + impl Strategy for TimedMessage { + #[tracing::instrument(skip(self), level = "trace")] + async fn sync_state(&mut self) -> Result<()> { + trace!("Syncing state."); + Ok(()) + } + + #[tracing::instrument(skip(self, event), level = "trace")] + async fn process_event(&mut self, event: Message) -> Vec { + trace!("Processing event."); + if event.to == self.message.to { + let message = Message { + from: "agent1".to_owned(), + to: "agent1".to_owned(), + data: "Hello, world!".to_owned(), + }; + if event.data == "Start" { + vec![message] + } else { + tokio::time::sleep(std::time::Duration::from_secs(self.delay)).await; + vec![message] + } + } else { + vec![] + } + } + } + + #[ignore] + #[tokio::test] + async fn base_simulation() { + let subscriber = tracing_subscriber::FmtSubscriber::builder() + .with_max_level(tracing::Level::TRACE) // Set the maximum level to TRACE + .finish(); + + let _guard = tracing::subscriber::set_default(subscriber); + let environment = EnvironmentBuilder::new().build(); + let connection = Connection::from(&environment); + let provider = Provider::new(connection); + let mut world = World::new("test_world", provider); + + let mut agent = Agent::new("agent1"); + let messager = Messager::new(); + agent.add_collector(messager.clone()); + agent.add_executor(messager.clone()); + + let strategy = TimedMessage { + delay: 1, + message: Message { + from: "agent1".to_owned(), + to: "agent1".to_owned(), + data: "Hello, world!".to_owned(), + }, + }; + agent.add_strategy(strategy); + + world.add_agent(agent); + let world_task = tokio::spawn(async move { world.run().await }); + + let message = Message { + from: "agent1".to_owned(), + to: "agent1".to_owned(), + data: "Start".to_owned(), + }; + let send_result = messager.execute(message).await; + + world_task.await.unwrap(); + } +} diff --git a/arbiter-engine/src/lib.rs b/arbiter-engine/src/lib.rs index 4e90955a..d345fd34 100644 --- a/arbiter-engine/src/lib.rs +++ b/arbiter-engine/src/lib.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use anyhow::Result; use serde::{Deserialize, Serialize}; +use tracing::{debug, trace, warn}; pub mod agent; pub mod examples; diff --git a/arbiter-engine/src/messager.rs b/arbiter-engine/src/messager.rs index da8af0af..f702cb0c 100644 --- a/arbiter-engine/src/messager.rs +++ b/arbiter-engine/src/messager.rs @@ -3,7 +3,7 @@ //! The messager module contains the core messager layer for the Arbiter Engine. use artemis_core::types::{Collector, CollectorStream, Executor}; -use tokio::sync::broadcast::Sender; +use flume::{unbounded, Receiver, Sender}; use super::*; @@ -22,42 +22,37 @@ pub struct Message { } /// A messager that can be used to send messages between agents. +#[derive(Clone, Debug)] pub struct Messager { - broadcaster: Sender, + sender: Sender, + receiver: Receiver, } impl Messager { /// Creates a new messager with the given capacity. - pub fn new(capacity: usize) -> Self { - Self { - broadcaster: Sender::new(capacity), - } - } -} - -impl Default for Messager { - fn default() -> Self { - Self::new(32) + #[allow(clippy::new_without_default)] + pub fn new() -> Self { + let (sender, receiver) = unbounded(); + Self { sender, receiver } } } #[async_trait::async_trait] impl Collector for Messager { + #[tracing::instrument(skip(self), level = "debug", target = "messager")] async fn get_event_stream(&self) -> Result> { - let mut subscription = self.broadcaster.subscribe(); - let stream = async_stream::stream! { - while let Ok(message) = subscription.recv().await { - yield message; - } - }; + debug!("Getting the event stream for the messager."); + let stream = self.receiver.clone().into_stream(); Ok(Box::pin(stream)) } } #[async_trait::async_trait] impl Executor for Messager { + #[tracing::instrument(skip(self), level = "trace", target = "messager")] async fn execute(&self, message: Message) -> Result<()> { - let _buf_len = self.broadcaster.send(message)?; + trace!("Broadcasting message."); + self.sender.send(message)?; Ok(()) } } diff --git a/arbiter-engine/src/world.rs b/arbiter-engine/src/world.rs index 8801fd3f..39a67eee 100644 --- a/arbiter-engine/src/world.rs +++ b/arbiter-engine/src/world.rs @@ -45,6 +45,8 @@ pub struct Interconnect { impl World where P: PubsubClient, + E: Send + Clone + 'static + std::fmt::Debug, + A: Send + Clone + 'static + std::fmt::Debug, { // TODO: May not need to take in the provider here, but rather get it from the // agents. @@ -62,6 +64,18 @@ where pub fn add_agent(&mut self, agent: Agent) { self.agents.push(agent); } + + /// Runs the agents in the world. + pub async fn run(&mut self) { + for agent in self.agents.iter_mut() { + let mut joinset = agent.engine.take().unwrap().run().await.unwrap(); + while let Some(next) = joinset.join_next().await { + if let Err(e) = next { + panic!("Error: {:?}", e); + } + } + } + } } #[cfg(test)] @@ -93,7 +107,7 @@ mod tests { let client = RevmMiddleware::new(&environment, Some("testname")).unwrap(); let mut agent = Agent::new("agent1"); - let messager = Messager::default(); + let messager = Messager::new(); agent.add_collector(messager); agent.add_executor(MempoolExecutor::new(client.clone())); world.add_agent(agent);