Skip to content

Commit

Permalink
Add orderbook streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
jordy25519 committed Dec 12, 2023
1 parent eb066a7 commit a9de8c9
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 12 deletions.
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ actix-web = "4.4.0"
argh = "0.1.12"
drift-sdk = { package = "drift-sdk", git = "https://github.com/circuit-research/protocol-v2", branch = "cargo-add-sdk" }
env_logger = "0.10.1"
futures-util = "0.3.29"
log = "0.4.20"
serde = { version = "1.0.193", features = ["derive"] }
thiserror = "1.0.38"
serde_json = "*"
thiserror = "1.0.38"
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ Options:
--help display usage information
```

## Example
## Examples

Place Orders
```bash
~: curl localhost:8080/v2/orders -X POST -H 'content-type: application/json' -d '{
"orders": [{
Expand All @@ -48,4 +49,9 @@ Options:
"orderType": "limit"
}]
}'
```

Stream Orderbook
```
curl localhost:8080/v2/orderbooks -N -X GET -H 'content-type: application/json' -d '{"market":{"id":3,"type":"perp"}}'
```
68 changes: 62 additions & 6 deletions src/controller.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
use std::task::Poll;

use actix_web::{web::Bytes, Error};
use drift_sdk::{
dlob::{DLOBClient, OrderbookStream},
types::{Context, MarketType, SdkError},
DriftClient, Pubkey, TransactionBuilder, Wallet,
};
use futures_util::{Stream, StreamExt};
use log::error;
use thiserror::Error;

use crate::types::{
AllMarketsResponse, CancelOrdersRequest, GetOrdersRequest, GetOrdersResponse,
GetPositionsRequest, GetPositionsResponse, PlaceOrdersRequest,
AllMarketsResponse, CancelOrdersRequest, GetOrderbookRequest, GetOrdersRequest,
GetOrdersResponse, GetPositionsRequest, GetPositionsResponse, PlaceOrdersRequest,
};

#[derive(Error, Debug)]
Expand All @@ -20,6 +25,7 @@ pub enum ControllerError {
pub struct AppState {
wallet: Wallet,
client: DriftClient,
dlob_client: DLOBClient,
}

impl AppState {
Expand All @@ -46,21 +52,40 @@ impl AppState {
.subscribe_account(wallet.user())
.await
.expect("cache on");
Self { wallet, client }

let dlob_endpoint = if devnet {
"https://master.dlob.drift.trade"
} else {
"https://dlob.drift.trade"
};
Self {
wallet,
client,
dlob_client: DLOBClient::new(dlob_endpoint),
}
}

/// Cancel orders
///
/// There are 3 intended scenarios for cancellation, in order of priority:
/// There are 4 intended scenarios for cancellation, in order of priority:
/// 1) "market" is set, cancel all orders in the market
/// 2) ids are given, cancel all orders by id
/// 3) catch all. cancel all orders
/// 2) "user ids" are set, cancel all orders by user assigned id
/// 3) ids are given, cancel all orders by id (global, exchange assigned id)
/// 4) catch all. cancel all orders
pub async fn cancel_orders(&self, req: CancelOrdersRequest) -> Result<String, ControllerError> {
let user_data = self.client.get_account_data(self.user()).await?;
let builder = TransactionBuilder::new(&self.wallet, &user_data);

let tx = if let Some(market) = req.market {
builder.cancel_orders((market.id, market.market_type).into(), None)
} else if !req.user_ids.is_empty() {
let order_ids = user_data
.orders
.iter()
.filter(|o| o.slot > 0 && req.user_ids.contains(&o.user_order_id))
.map(|o| o.order_id)
.collect();
builder.cancel_orders_by_id(order_ids)
} else if !req.ids.is_empty() {
builder.cancel_orders_by_id(req.ids)
} else {
Expand Down Expand Up @@ -149,4 +174,35 @@ impl AppState {

Ok(signature.to_string())
}

pub fn stream_orderbook(&self, req: GetOrderbookRequest) -> DlobStream {
let stream = self
.dlob_client
.subscribe(req.market.as_market_id(), Some(1)); // poll book at 1s interval
DlobStream(stream)
}
}

/// Provides JSON serialized orderbook snapshots
pub struct DlobStream(OrderbookStream);
impl Stream for DlobStream {
type Item = Result<Bytes, Error>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
match self.0.poll_next_unpin(cx) {
std::task::Poll::Pending => std::task::Poll::Pending,
std::task::Poll::Ready(result) => {
let result = result.unwrap();
if let Err(err) = result {
error!("orderbook stream: {err:?}");
return Poll::Ready(None);
}

let msg = serde_json::to_vec(&result.unwrap()).unwrap();
std::task::Poll::Ready(Some(Ok(msg.into())))
}
}
}
}
18 changes: 16 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ use argh::FromArgs;
use log::{error, info};

use controller::AppState;
use types::{CancelOrdersRequest, GetOrdersRequest, GetPositionsRequest, PlaceOrdersRequest};
use types::{
CancelOrdersRequest, GetOrderbookRequest, GetOrdersRequest, GetPositionsRequest,
PlaceOrdersRequest,
};

mod controller;
mod types;
Expand Down Expand Up @@ -74,6 +77,16 @@ async fn get_positions(
}
}

#[get("/orderbooks")]
async fn get_orderbooks(
controller: web::Data<AppState>,
req: Json<GetOrderbookRequest>,
) -> impl Responder {
let dlob = controller.stream_orderbook(req.0);
// there's no graceful shutdown for the stream: https://github.com/actix/actix-web/issues/1313
HttpResponse::Ok().streaming(dlob)
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
let config: GatewayConfig = argh::from_env();
Expand All @@ -100,7 +113,8 @@ async fn main() -> std::io::Result<()> {
.service(get_positions)
.service(get_orders)
.service(create_orders)
.service(cancel_orders),
.service(cancel_orders)
.service(get_orderbooks),
)
})
.bind((config.host, config.port))?
Expand Down
17 changes: 17 additions & 0 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ pub struct PlaceOrder {
market_type: sdk_types::MarketType,
amount: i64,
price: u64,
#[serde(default)]
user_order_id: u8,
#[serde(serialize_with = "order_type_ser", deserialize_with = "order_type_de")]
order_type: sdk_types::OrderType,
#[serde(default)]
Expand Down Expand Up @@ -173,6 +175,7 @@ impl From<PlaceOrder> for sdk_types::OrderParams {
} else {
PostOnlyParam::None
},
user_order_id: value.user_order_id,
..Default::default()
}
}
Expand All @@ -191,6 +194,12 @@ pub struct Market {
pub market_type: MarketType,
}

impl Market {
pub fn as_market_id(self) -> drift_sdk::types::MarketId {
unsafe { std::mem::transmute(self) }
}
}

#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetPositionsRequest {
Expand Down Expand Up @@ -254,4 +263,12 @@ pub struct CancelOrdersRequest {
pub market: Option<Market>,
/// order Ids to cancel
pub ids: Vec<u32>,
/// user assigned order Ids to cancel
pub user_ids: Vec<u8>,
}

#[derive(Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetOrderbookRequest {
pub market: Market,
}

0 comments on commit a9de8c9

Please sign in to comment.