Skip to content

Commit

Permalink
Merge pull request #41 from Overmuse/SR/actors
Browse files Browse the repository at this point in the history
refactor order_sender into actor
  • Loading branch information
SebRollen authored Jul 20, 2021
2 parents bd497aa + aaf4783 commit 8ec16cb
Show file tree
Hide file tree
Showing 10 changed files with 55 additions and 27 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
[package]
name = "order-manager"
version = "7.0.0"
version = "7.0.1"
authors = ["Sebastian Rollen <seb@overmu.se>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
alpaca = {git = "ssh://git@github.com/Overmuse/alpaca.git", tag = "v0.7.0", default-features = false, features = ["rest", "ws"]}
alpaca = {git = "ssh://git@github.com/Overmuse/alpaca.git", tag = "v0.7.1", default-features = false, features = ["rest", "ws"]}
anyhow = "1.0.39"
chrono = "0.4.19"
config = "0.10.1"
Expand Down
11 changes: 11 additions & 0 deletions src/db/positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,14 @@ pub async fn get_positions_by_ticker(
.map(TryInto::try_into)
.collect()
}

#[tracing::instrument(skip(client))]
pub async fn get_positions(client: Arc<Client>) -> Result<Vec<Position>, Error> {
trace!("Getting positions");
client
.query("SELECT * FROM allocations", &[])
.await?
.into_iter()
.map(TryInto::try_into)
.collect()
}
12 changes: 5 additions & 7 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ use tracing::error;

mod db;
mod intent_scheduler;
pub mod manager;
pub mod order_manager;
mod order_sender;
mod settings;
pub mod types;
mod webserver;

use crate::order_manager::OrderManager;
use intent_scheduler::IntentScheduler;
use manager::OrderManager;
use order_sender::OrderSender;
use order_sender::OrderSenderHandle;
pub use settings::Settings;

mod embedded {
Expand All @@ -26,10 +26,9 @@ mod embedded {
pub async fn run(settings: Settings) -> Result<()> {
let consumer = consumer(&settings.kafka).context("Failed to create kafka consumer")?;
let producer = producer(&settings.kafka).context("Failed to create kafka producer")?;
let (order_tx, order_rx) = unbounded_channel();
let (scheduled_intents_tx1, scheduled_intents_rx1) = unbounded_channel();
let (scheduled_intents_tx2, scheduled_intents_rx2) = unbounded_channel();
let order_sender = OrderSender::new(producer, order_rx);
let order_sender_handle = OrderSenderHandle::new(producer);
let intent_scheduler = IntentScheduler::new(scheduled_intents_tx1, scheduled_intents_rx2);
let (mut client, connection) = connect(
&format!("{}/{}", settings.database.url, settings.database.name,),
Expand All @@ -49,13 +48,12 @@ pub async fn run(settings: Settings) -> Result<()> {
consumer,
scheduled_intents_tx2,
scheduled_intents_rx1,
order_tx,
order_sender_handle,
client.clone(),
);
tokio::join!(
webserver::run(settings.webserver.port, client),
order_manager.run(),
order_sender.run(),
intent_scheduler.run()
);
Ok(())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ impl OrderManager {
for order in orders {
self.order_sender
.send(order)
.await
.context("Failed to send dependent-order to OrderSender")?
}
}
Expand Down
File renamed without changes.
3 changes: 3 additions & 0 deletions src/manager/intents.rs → src/order_manager/intents.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ impl OrderManager {
Ok(self
.order_sender
.send(sent)
.await
.context("Failed to send order")?)
}
(Some(claim), Some(sent), Some(saved)) => {
Expand Down Expand Up @@ -159,6 +160,7 @@ impl OrderManager {
Ok(self
.order_sender
.send(sent)
.await
.context("Failed to send order")?)
}
_ => unreachable!(),
Expand Down Expand Up @@ -234,6 +236,7 @@ impl OrderManager {
Ok(self
.order_sender
.send(order_intent)
.await
.context("Failed to send order")?)
} else {
Err(anyhow!("Can't close position of house account"))
Expand Down
6 changes: 3 additions & 3 deletions src/manager/mod.rs → src/order_manager/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::db;
use alpaca::orders::OrderIntent;
use crate::OrderSenderHandle;
use alpaca::AlpacaMessage;
use anyhow::{Context, Result};
use rdkafka::consumer::StreamConsumer;
Expand All @@ -20,7 +20,7 @@ pub struct OrderManager {
kafka_consumer: StreamConsumer,
scheduler_sender: UnboundedSender<PositionIntent>,
scheduler_receiver: UnboundedReceiver<PositionIntent>,
order_sender: UnboundedSender<OrderIntent>,
order_sender: OrderSenderHandle,
db_client: Arc<Client>,
}

Expand All @@ -29,7 +29,7 @@ impl OrderManager {
kafka_consumer: StreamConsumer,
scheduler_sender: UnboundedSender<PositionIntent>,
scheduler_receiver: UnboundedReceiver<PositionIntent>,
order_sender: UnboundedSender<OrderIntent>,
order_sender: OrderSenderHandle,
db_client: Arc<Client>,
) -> Self {
Self {
Expand Down
File renamed without changes.
39 changes: 27 additions & 12 deletions src/order_sender.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,32 @@
use alpaca::orders::OrderIntent;
use anyhow::Result;
use rdkafka::producer::{FutureProducer, FutureRecord};
use std::time::Duration;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc;
use tracing::{error, info};

pub struct OrderSender {
struct OrderSender {
producer: FutureProducer,
order_queue: UnboundedReceiver<OrderIntent>,
receiver: mpsc::Receiver<OrderIntent>,
}

impl OrderSender {
pub fn new(producer: FutureProducer, order_queue: UnboundedReceiver<OrderIntent>) -> Self {
Self {
producer,
order_queue,
}
fn new(producer: FutureProducer, receiver: mpsc::Receiver<OrderIntent>) -> Self {
Self { producer, receiver }
}

#[tracing::instrument(skip(self))]
pub async fn run(mut self) {
pub async fn run(&mut self) {
info!("Starting OrderSender");
let producer = self.producer;
while let Some(oi) = self.order_queue.recv().await {
while let Some(oi) = self.receiver.recv().await {
info!("Sending order_intent {:?}", oi);
let payload = serde_json::to_string(&oi);
match payload {
Ok(payload) => {
let record = FutureRecord::to("order-intents")
.key(&oi.symbol)
.payload(&payload);
let send = producer.send(record, Duration::from_secs(0)).await;
let send = self.producer.send(record, Duration::from_secs(0)).await;
if let Err((e, m)) = send {
error!("Error: {:?}\nMessage: {:?}", e, m)
}
Expand All @@ -42,3 +39,21 @@ impl OrderSender {
info!("Ending OrderSender");
}
}

pub struct OrderSenderHandle {
sender: mpsc::Sender<OrderIntent>,
}

impl OrderSenderHandle {
pub fn new(producer: FutureProducer) -> Self {
let (sender, receiver) = mpsc::channel(8);
let mut actor = OrderSender::new(producer, receiver);
tokio::spawn(async move { actor.run().await });
Self { sender }
}

pub async fn send(&self, msg: OrderIntent) -> Result<()> {
self.sender.send(msg).await?;
Ok(())
}
}

0 comments on commit 8ec16cb

Please sign in to comment.