Skip to content

Commit

Permalink
feat: error! output from behavior (#913)
Browse files Browse the repository at this point in the history
* feat: `error!` output from behavior
* submodule: update `template`
* fix: `minter` example
* fix: clippy
  • Loading branch information
Autoparallel authored Feb 26, 2024
1 parent 509b624 commit 8211c6e
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 9 deletions.
26 changes: 22 additions & 4 deletions engine/src/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use anyhow::Result;
use arbiter_core::middleware::ArbiterMiddleware;
use futures_util::{Stream, StreamExt};
use tokio::task::JoinHandle;
use tracing::error;

use super::*;

Expand Down Expand Up @@ -216,23 +217,40 @@ where
async fn execute(&mut self, instruction: MachineInstruction) -> Result<()> {
// NOTE: The unwraps here are safe because the `Behavior` in an engine is only
// accessed here and it is private.
let id: Option<String>;
match instruction {
MachineInstruction::Start(client, messager) => {
id = messager.id.clone();
let id_clone = id.clone();
self.state = State::Starting;
let mut behavior = self.behavior.take().unwrap();
let behavior_task: JoinHandle<Result<(Option<EventStream<E>>, B)>> =
tokio::spawn(async move {
let id = messager.id.clone();
let stream = behavior.startup(client, messager).await?;
debug!("startup complete for behavior {:?}", id);
let stream = match behavior.startup(client, messager).await {
Ok(stream) => stream,
Err(e) => {
error!(
"startup failed for behavior {:?}: \n reason: {:?}",
id_clone, e
);
// Throw a panic as we cannot recover from this for now.
panic!();
}
};
debug!("startup complete for behavior {:?}", id_clone);
Ok((stream, behavior))
});
let (stream, behavior) = behavior_task.await??;
match stream {
Some(stream) => {
self.event_stream = Some(stream);
self.behavior = Some(behavior);
self.execute(MachineInstruction::Process).await?;
match self.execute(MachineInstruction::Process).await {
Ok(_) => {}
Err(e) => {
error!("process failed for behavior {:?}: \n reason: {:?}", id, e);
}
}
Ok(())
}
None => {
Expand Down
4 changes: 2 additions & 2 deletions examples/minter/behaviors/token_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl Behavior<Message> for TokenAdmin {
&mut self,
client: Arc<ArbiterMiddleware>,
messager: Messager,
) -> Result<EventStream<Message>> {
) -> Result<Option<EventStream<Message>>> {
self.messager = Some(messager.clone());
self.client = Some(client.clone());
for token_data in self.token_data.values_mut() {
Expand All @@ -74,7 +74,7 @@ impl Behavior<Message> for TokenAdmin {
.get_or_insert_with(HashMap::new)
.insert(token_data.name.clone(), token.clone());
}
Ok(messager.stream()?)
Ok(Some(messager.stream()?))
}

#[tracing::instrument(skip(self), fields(id =
Expand Down
4 changes: 2 additions & 2 deletions examples/minter/behaviors/token_requester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ impl Behavior<TransferFilter> for TokenRequester {
&mut self,
client: Arc<ArbiterMiddleware>,
mut messager: Messager,
) -> Result<EventStream<TransferFilter>> {
) -> Result<Option<EventStream<TransferFilter>>> {
messager
.send(
To::Agent(self.request_to.clone()),
Expand All @@ -59,7 +59,7 @@ impl Behavior<TransferFilter> for TokenRequester {
self.messager = Some(messager.clone());
self.client = Some(client.clone());
let transfer_stream = stream_event(token.transfer_filter());
Ok(transfer_stream)
Ok(Some(transfer_stream))
}

#[tracing::instrument(skip(self), fields(id =
Expand Down
2 changes: 1 addition & 1 deletion examples/template

0 comments on commit 8211c6e

Please sign in to comment.