Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(arbiter-engine): Run method for agents and messager echo example #746

Merged
merged 7 commits into from
Dec 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ arbiter-core = { path = "./arbiter-core" }
crossbeam-channel = { version = "=0.5.8" }
futures-util = { version = "=0.3.29" }
async-trait = { version = "0.1.74" }
tracing = "0.1.40"

# Dependencies for the release build
[dependencies]
Expand Down
2 changes: 0 additions & 2 deletions arbiter-bindings/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,4 @@ license = "Apache-2.0"

[dependencies]
ethers.workspace = true
revm.workspace = true
revm-primitives.workspace = true
serde.workspace = true
2 changes: 1 addition & 1 deletion arbiter-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ thiserror.workspace = true

# Logging
futures-util.workspace = true
tracing = "0.1.40"
tracing.workspace = true

# File types
polars = { version = "0.35.4", features = ["parquet", "csv", "json"] }
Expand Down
1 change: 0 additions & 1 deletion arbiter-core/src/environment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use revm::{
},
EVM,
};
// use hashbrown::{hash_map, HashMap as HashMapBrown};
use serde::{Deserialize, Serialize};
use thiserror::Error;

Expand Down
5 changes: 0 additions & 5 deletions arbiter-core/src/middleware/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@
use rand::{rngs::StdRng, SeedableRng};
use revm::primitives::{CreateScheme, Output, TransactTo, TxEnv, U256};
use serde::{de::DeserializeOwned, Serialize};
// use revm::primitives::{ExecutionResult, Output};
// use super::cast::revm_logs_to_ethers_logs;
// use super::errors::RevmMiddlewareError;

// use async_trait::async_trait;
use thiserror::Error;

use super::*;
Expand Down Expand Up @@ -173,8 +168,8 @@

let provider = Provider::new(connection);
info!(
"Created new `RevmMiddleware` instance attached to environment labeled:
{:?}",

Check warning on line 172 in arbiter-core/src/middleware/mod.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-core/src/middleware/mod.rs#L171-L172

Added lines #L171 - L172 were not covered by tests
environment.parameters.label
);
Ok(Arc::new(Self {
Expand Down
8 changes: 6 additions & 2 deletions arbiter-engine/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ readme = "../README.md"
ethers.workspace = true
arbiter-core.workspace = true
arbiter-bindings = { path = "../arbiter-bindings" }
artemis-core = { git = "https://github.com/paradigmxyz/artemis.git"}
artemis-core = { git = "https://github.com/paradigmxyz/artemis.git" }
crossbeam-channel.workspace = true
futures-util.workspace = true
async-trait.workspace = true
serde_json.workspace = true
serde.workspace = true
tokio.workspace = true
anyhow = { version = "=1.0.75" }
async-stream = "0.3.5"
tracing.workspace = true
flume = "0.11.0"

[dev-dependencies]
tracing-subscriber = "0.3.18"
36 changes: 26 additions & 10 deletions arbiter-engine/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@

use artemis_core::{
engine::Engine,
types::{Collector, Executor},
types::{Collector, Executor, Strategy},
};

/// An agent is an entity capable of processing events and producing actions.
Expand All @@ -22,17 +22,18 @@
pub struct Agent<E, A> {
/// Identifier for this agent.
/// Used for routing messages.
_id: String,
pub id: String,

/// The engine that this agent uses to process events and produce actions.
engine: Engine<E, A>, /* Note, agent shouldn't NEED a client as a field as the engine can
* handle this. */
pub(crate) engine: Option<Engine<E, A>>, /* Note, agent shouldn't NEED a client as a field
* as the engine can
* handle this. */

/// Agents that this agent depends on.
dependencies: Vec<String>,
pub dependencies: Vec<String>,

/// Agents that depend on this agent.
dependents: Vec<String>,
pub dependents: Vec<String>,
}

impl<E, A> Agent<E, A>
Expand All @@ -42,36 +43,50 @@
{
#[allow(clippy::new_without_default)]
/// Produces a new agent with the given identifier.
pub fn new(id: &str) -> Self {
Self {
_id: id.to_owned(),
engine: Engine::new(),
id: id.to_owned(),
engine: Some(Engine::new()),
dependencies: vec![],
dependents: vec![],
}
}

Check warning on line 53 in arbiter-engine/src/agent.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/agent.rs#L46-L53

Added lines #L46 - L53 were not covered by tests

/// Adds a collector to the agent's engine.
pub fn add_collector(&mut self, collector: impl Collector<E> + 'static) {
self.engine.add_collector(Box::new(collector));
self.engine
.as_mut()
.expect("Engine has already been taken by the `World::run()` method.")
.add_collector(Box::new(collector));
}

Check warning on line 61 in arbiter-engine/src/agent.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/agent.rs#L56-L61

Added lines #L56 - L61 were not covered by tests

/// Adds an executor to the agent's engine.
pub fn add_executor(&mut self, executor: impl Executor<A> + 'static) {
self.engine.add_executor(Box::new(executor));
self.engine
.as_mut()
.expect("Engine has already been taken by the `World::run()` method.")
.add_executor(Box::new(executor));
}

Check warning on line 69 in arbiter-engine/src/agent.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/agent.rs#L64-L69

Added lines #L64 - L69 were not covered by tests

/// Adds a strategy to the agent's engine.
pub fn add_strategy(&mut self, strategy: impl Strategy<E, A> + 'static) {
self.engine
.as_mut()
.expect("Engine has already been taken by the `World::run()` method.")
.add_strategy(Box::new(strategy));
}

Check warning on line 77 in arbiter-engine/src/agent.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/agent.rs#L72-L77

Added lines #L72 - L77 were not covered by tests

/// Adds a dependency to the agent.
/// Dependencies are agents that this agent depends on.
pub fn add_dependency(&mut self, dependency: &str) {
self.dependencies.push(dependency.to_owned());
}

Check warning on line 83 in arbiter-engine/src/agent.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/agent.rs#L81-L83

Added lines #L81 - L83 were not covered by tests

/// Adds a dependent to the agent.
/// Dependents are agents that depend on this agent.
pub fn add_dependent(&mut self, dependent: &str) {
self.dependents.push(dependent.to_owned());
}

Check warning on line 89 in arbiter-engine/src/agent.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/agent.rs#L87-L89

Added lines #L87 - L89 were not covered by tests
}

#[cfg(test)]
Expand All @@ -86,31 +101,32 @@

use super::*;

#[ignore]
#[tokio::test]
async fn test_agent() {
// Startup
let environment = EnvironmentBuilder::new().build();
let client = RevmMiddleware::new(&environment, None).unwrap();
let arb = ArbiterToken::deploy(
client.clone(),
("Arbiter Token".to_string(), "ARB".to_string(), 18),
)
.unwrap()
.send()
.await
.unwrap();

// Build the agent
let mut agent = Agent::new("test");
let collector = LogCollector::new(client.clone(), arb.transfer_filter().filter);
agent.add_collector(collector);
let executor = MempoolExecutor::new(client.clone());
agent.add_executor(executor);

let tx = arb.mint(client.address(), U256::from(1)).tx;
let _submit_tx = SubmitTxToMempool {
tx,
gas_bid_info: None,
};

Check warning on line 130 in arbiter-engine/src/agent.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/agent.rs#L105-L130

Added lines #L105 - L130 were not covered by tests
}
}
87 changes: 87 additions & 0 deletions arbiter-engine/src/examples.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
}

/// Used as an action to set new block number and timestamp.
#[derive(Clone, Debug, Deserialize, Serialize)]

Check warning on line 31 in arbiter-engine/src/examples.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/examples.rs#L31

Added line #L31 was not covered by tests
pub struct NewBlock {
timestamp: u64,
number: u64,
Expand All @@ -37,12 +37,12 @@
// TODO: Consider replacing this with a cheatcode executor.
#[async_trait::async_trait]
impl Executor<NewBlock> for BlockExecutor {
async fn execute(&self, new_block: NewBlock) -> Result<()> {
let _receipt_data = self
.client
.update_block(new_block.number, new_block.timestamp)?;
Ok(())
}

Check warning on line 45 in arbiter-engine/src/examples.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/examples.rs#L40-L45

Added lines #L40 - L45 were not covered by tests
}

// TODO: This may not be necessary in this way.
Expand All @@ -55,18 +55,18 @@

#[async_trait::async_trait]
impl Strategy<Message, NewBlock> for BlockAdmin {
async fn sync_state(&mut self) -> Result<()> {
Ok(())
}

Check warning on line 60 in arbiter-engine/src/examples.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/examples.rs#L58-L60

Added lines #L58 - L60 were not covered by tests

async fn process_event(&mut self, event: Message) -> Vec<NewBlock> {
if event.to == self.id {
let new_block: NewBlock = serde_json::from_str(&event.data).unwrap();
vec![new_block]

Check warning on line 65 in arbiter-engine/src/examples.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/examples.rs#L62-L65

Added lines #L62 - L65 were not covered by tests
} else {
vec![]

Check warning on line 67 in arbiter-engine/src/examples.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/examples.rs#L67

Added line #L67 was not covered by tests
}
}

Check warning on line 69 in arbiter-engine/src/examples.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/examples.rs#L69

Added line #L69 was not covered by tests
}

/// The token admin is responsible for handling token minting requests.
Expand All @@ -79,7 +79,7 @@
}

/// Used as an action to mint tokens.
#[derive(Clone, Debug, Deserialize, Serialize)]

Check warning on line 82 in arbiter-engine/src/examples.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/examples.rs#L82

Added line #L82 was not covered by tests
pub struct TokenRequest {
/// The token to mint.
pub token: String,
Expand All @@ -93,23 +93,110 @@

#[async_trait::async_trait]
impl Strategy<Message, SubmitTxToMempool> for TokenAdmin {
async fn sync_state(&mut self) -> Result<()> {
Ok(())
}

Check warning on line 98 in arbiter-engine/src/examples.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/examples.rs#L96-L98

Added lines #L96 - L98 were not covered by tests

async fn process_event(&mut self, event: Message) -> Vec<SubmitTxToMempool> {
if event.to == self.id {
let token_request: TokenRequest = serde_json::from_str(&event.data).unwrap();
let token = self.tokens.get(&token_request.token).unwrap();
let tx = SubmitTxToMempool {
tx: token
.mint(token_request.mint_to, U256::from(token_request.mint_amount))
.tx,
gas_bid_info: None,
};
vec![tx]

Check warning on line 110 in arbiter-engine/src/examples.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/examples.rs#L100-L110

Added lines #L100 - L110 were not covered by tests
} else {
vec![]

Check warning on line 112 in arbiter-engine/src/examples.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/examples.rs#L112

Added line #L112 was not covered by tests
}
}

Check warning on line 114 in arbiter-engine/src/examples.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/examples.rs#L114

Added line #L114 was not covered by tests
}

#[cfg(test)]
mod tests {

use arbiter_core::{
environment::builder::EnvironmentBuilder, middleware::connection::Connection,
};
use ethers::providers::Provider;

use super::*;
use crate::{agent::Agent, messager::Messager, world::World};

struct TimedMessage {
delay: u64,
message: Message,
}

#[async_trait::async_trait]
impl Strategy<Message, Message> for TimedMessage {
#[tracing::instrument(skip(self), level = "trace")]
async fn sync_state(&mut self) -> Result<()> {
trace!("Syncing state.");
Ok(())
}

Check warning on line 139 in arbiter-engine/src/examples.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/examples.rs#L135-L139

Added lines #L135 - L139 were not covered by tests

#[tracing::instrument(skip(self, event), level = "trace")]
async fn process_event(&mut self, event: Message) -> Vec<Message> {
trace!("Processing event.");
if event.to == self.message.to {
let message = Message {
from: "agent1".to_owned(),
to: "agent1".to_owned(),
data: "Hello, world!".to_owned(),
};
if event.data == "Start" {
vec![message]

Check warning on line 151 in arbiter-engine/src/examples.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/examples.rs#L141-L151

Added lines #L141 - L151 were not covered by tests
} else {
tokio::time::sleep(std::time::Duration::from_secs(self.delay)).await;
vec![message]

Check warning on line 154 in arbiter-engine/src/examples.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/examples.rs#L153-L154

Added lines #L153 - L154 were not covered by tests
}
} else {
vec![]

Check warning on line 157 in arbiter-engine/src/examples.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/examples.rs#L157

Added line #L157 was not covered by tests
}
}

Check warning on line 159 in arbiter-engine/src/examples.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/examples.rs#L159

Added line #L159 was not covered by tests
}

#[ignore]
#[tokio::test]
async fn base_simulation() {
let subscriber = tracing_subscriber::FmtSubscriber::builder()
.with_max_level(tracing::Level::TRACE) // Set the maximum level to TRACE
.finish();

let _guard = tracing::subscriber::set_default(subscriber);
let environment = EnvironmentBuilder::new().build();
let connection = Connection::from(&environment);
let provider = Provider::new(connection);
let mut world = World::new("test_world", provider);

let mut agent = Agent::new("agent1");
let messager = Messager::new();
agent.add_collector(messager.clone());
agent.add_executor(messager.clone());

let strategy = TimedMessage {
delay: 1,
message: Message {
from: "agent1".to_owned(),
to: "agent1".to_owned(),
data: "Hello, world!".to_owned(),
},
};
agent.add_strategy(strategy);

world.add_agent(agent);
let world_task = tokio::spawn(async move { world.run().await });

let message = Message {
from: "agent1".to_owned(),
to: "agent1".to_owned(),
data: "Start".to_owned(),
};
let send_result = messager.execute(message).await;

Check warning on line 198 in arbiter-engine/src/examples.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/examples.rs#L163-L198

Added lines #L163 - L198 were not covered by tests

world_task.await.unwrap();

Check warning on line 200 in arbiter-engine/src/examples.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/examples.rs#L200

Added line #L200 was not covered by tests
}
}
1 change: 1 addition & 0 deletions arbiter-engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashMap;

use anyhow::Result;
use serde::{Deserialize, Serialize};
use tracing::{debug, trace, warn};

pub mod agent;
pub mod examples;
Expand Down
33 changes: 14 additions & 19 deletions arbiter-engine/src/messager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@
//! The messager module contains the core messager layer for the Arbiter Engine.

use artemis_core::types::{Collector, CollectorStream, Executor};
use tokio::sync::broadcast::Sender;
use flume::{unbounded, Receiver, Sender};

use super::*;

/// A message that can be sent between agents.
#[derive(Clone, Debug, Deserialize, Serialize)]

Check warning on line 11 in arbiter-engine/src/messager.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/messager.rs#L11

Added line #L11 was not covered by tests
pub struct Message {
/// The sender of the message.
pub from: String,
Expand All @@ -22,42 +22,37 @@
}

/// A messager that can be used to send messages between agents.
#[derive(Clone, Debug)]

Check warning on line 25 in arbiter-engine/src/messager.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/messager.rs#L25

Added line #L25 was not covered by tests
pub struct Messager {
broadcaster: Sender<Message>,
sender: Sender<Message>,
receiver: Receiver<Message>,
}

impl Messager {
/// Creates a new messager with the given capacity.
pub fn new(capacity: usize) -> Self {
Self {
broadcaster: Sender::new(capacity),
}
}
}

impl Default for Messager {
fn default() -> Self {
Self::new(32)
#[allow(clippy::new_without_default)]
pub fn new() -> Self {
let (sender, receiver) = unbounded();
Self { sender, receiver }
}

Check warning on line 37 in arbiter-engine/src/messager.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/messager.rs#L34-L37

Added lines #L34 - L37 were not covered by tests
}

#[async_trait::async_trait]
impl Collector<Message> for Messager {
#[tracing::instrument(skip(self), level = "debug", target = "messager")]
async fn get_event_stream(&self) -> Result<CollectorStream<'_, Message>> {
let mut subscription = self.broadcaster.subscribe();
let stream = async_stream::stream! {
while let Ok(message) = subscription.recv().await {
yield message;
}
};
debug!("Getting the event stream for the messager.");
let stream = self.receiver.clone().into_stream();
Ok(Box::pin(stream))
}

Check warning on line 47 in arbiter-engine/src/messager.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/messager.rs#L42-L47

Added lines #L42 - L47 were not covered by tests
}

#[async_trait::async_trait]
impl Executor<Message> for Messager {
#[tracing::instrument(skip(self), level = "trace", target = "messager")]
async fn execute(&self, message: Message) -> Result<()> {
let _buf_len = self.broadcaster.send(message)?;
trace!("Broadcasting message.");
self.sender.send(message)?;
Ok(())
}

Check warning on line 57 in arbiter-engine/src/messager.rs

View check run for this annotation

Codecov / codecov/patch

arbiter-engine/src/messager.rs#L52-L57

Added lines #L52 - L57 were not covered by tests
}
Loading
Loading