Skip to content

Commit

Permalink
Add --extra-rpc to allow multiple RPCs for tx submission (#90)
Browse files Browse the repository at this point in the history
* Add --extra-rpcs flag to allow multiple RPCs for tx submission
* Tx submission will be retried by gateway until some given deadline in seconds
* add `ttl` to request parameters
* Update readme w Tx confirmation and TTL*
  • Loading branch information
jordy25519 authored Nov 11, 2024
1 parent 32b31af commit 99e84e8
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 21 deletions.
38 changes: 35 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Self hosted API gateway to easily interact with Drift V2 Protocol
- [Delegated Signing Mode](#delegated-signing-mode)
- [Sub-account Switching](#sub-account-switching)
- [Emulation Mode](#emulation-mode)
- [Transaction Confirmation](#transaction-confirmtaion-and-ttl)
- [CU price/limits](#cu-price--limits)
3. [API Examples](#api-examples)
- [HTTP API](#http-api)
Expand Down Expand Up @@ -49,7 +50,9 @@ docker run \
-e DRIFT_GATEWAY_KEY=<BASE58_SEED> \
-p 8080:8080 \
--platform linux/x86_64 \
ghcr.io/drift-labs/gateway https://rpc-provider.example.com --host 0.0.0.0 --markets wbtc,drift
ghcr.io/drift-labs/gateway https://rpc-provider.example.com --host 0.0.0.0 \
--markets wbtc,drift
--extra-rpcs https://api.mainnet-beta.solana.com
```

Build the Docker image:
Expand Down Expand Up @@ -145,6 +148,7 @@ Options:
default sub_account_id to use (default: 0)
--skip-tx-preflight
skip tx preflight checks
--extra-rpc extra solana RPC urls for improved Tx broadcast
--verbose enable debug logging
--help display usage information
```
Expand All @@ -168,7 +172,7 @@ e.g `http://<gateway>/v1/orders?subAccountId=3` will return orders for the walle

## Emulation Mode

Passing the `--emulate <EMULATED_PUBBKEY>` flag will instruct the gateway to run in read-only mode.
Passing the `--emulate <EMULATED_PUBKEY>` flag will instruct the gateway to run in read-only mode.

The gateway will receive all events, positions, etc. as normal but be unable to send transactions.

Expand All @@ -194,6 +198,34 @@ $ curl 'localhost:8080/v2/orders?computeUnitLimit=300000&computeUnitPrice=1000'
-d # { order data ...}
```

## Transaction Confirmation and TTLs

Gateway endpoints that place network transactions will return the signature as a base64 string.
User's can poll `transactionEvent` to confirm success by signature or watch Ws events for e.g. confirmation by order Ids instead.

Gateway will resubmit txs until they are either confirmed by the network or timeout.
This allows gateway txs to have a higher chance of confirmation during busy network periods.
setting `?ttl=<TIMEOUT_IN_SECS>` on a request determines how long gateway will resubmit txs for, (default: 4s/~10 slots).
e.g. `ttl?=2` means that the tx will be rebroadcast over the next 5 slots (5 * 400ms).

⚠️ users should take care to set either `max_order` ts or use atomic place/cancel/modify requests to prevent
double orders or orders being accepted later than intended.

improving tx confirmation rates will require trial and error, try adjusting tx TTL and following parameters until
results are meet requirements:
- set `--extra-rpcs=<RPC_1>,<RPC_2>` to broadcast tx to multiple nodes
- set `--skip-tx-preflight` to disable preflight RPC checks
- setting a longer `ttl` per request
- set statically higher CU prices per request (see previous section) when no ack rates increase

**example request**

```bash
$ curl 'localhost:8080/v2/orders?ttl=2' -X POST \
-H 'content-type: application/json' \
-d # { order data ...}
```

## API Examples

Please refer to https://drift-labs.github.io/v2-teacher/ for further examples and reference documentation on various types, fields, and operations available on drift.
Expand Down Expand Up @@ -819,7 +851,7 @@ Use the UI or Ts/Python sdk to initialize the sub-account first.

#### `429`s / gateway hitting RPC rate limits

this can occur during gateway startup as drift market data is pulled from the network and subscriptions are intialized.
this can occur during gateway startup as drift market data is pulled from the network and subscriptions are initialized.
try setting `INIT_RPC_THROTTLE=2` for e.g. 2s or longer, this allows some time between request bursts on start up.

The free \_api.mainnet-beta.solana.com_ RPC support is limited due to rate-limits
Expand Down
96 changes: 80 additions & 16 deletions src/controller.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,10 @@
use std::{borrow::Cow, collections::HashSet, str::FromStr, sync::Arc, time::Duration};
use std::{
borrow::Cow,
collections::HashSet,
str::FromStr,
sync::Arc,
time::{Duration, SystemTime},
};

use drift_rs::{
constants::ProgramData,
Expand All @@ -17,6 +23,7 @@ use drift_rs::{
self, accounts::SpotMarket, MarketId, MarketType, ModifyOrderParams, OrderStatus,
RpcSendTransactionConfig, SdkError, SdkResult, VersionedMessage,
},
utils::get_http_url,
DriftClient, Pubkey, TransactionBuilder, Wallet,
};
use futures_util::{stream::FuturesUnordered, StreamExt};
Expand All @@ -39,6 +46,11 @@ use crate::{
Context, LOG_TARGET,
};

/// Default TTL in seconds of gateway tx retry
/// afterwhich gateway will no longer resubmit or monitor the tx
// ~10 slots
const DEFAULT_TX_TTL: u16 = 4;

pub type GatewayResult<T> = Result<T, ControllerError>;

#[derive(Error, Debug)]
Expand Down Expand Up @@ -66,6 +78,8 @@ pub struct AppState {
/// skip tx preflight on send or not (default: false)
skip_tx_preflight: bool,
priority_fee_subscriber: Arc<PriorityFeeSubscriber>,
/// list of additional RPC endpoints for tx broadcast
extra_rpcs: Vec<Arc<RpcClient>>,
}

impl AppState {
Expand Down Expand Up @@ -93,13 +107,15 @@ impl AppState {
/// * `commitment` - Slot finalisation/commitement levels
/// * `default_subaccount_id` - by default all queries will use this subaccount
/// * `skip_tx_preflight` - submit txs without checking preflight results
/// * `extra_rpcs` - list of additional RPC endpoints for tx submission
pub async fn new(
endpoint: &str,
devnet: bool,
wallet: Wallet,
commitment: Option<(CommitmentConfig, CommitmentConfig)>,
default_subaccount_id: Option<u16>,
skip_tx_preflight: bool,
extra_rpcs: Vec<&str>,
) -> Self {
let (state_commitment, tx_commitment) =
commitment.unwrap_or((CommitmentConfig::confirmed(), CommitmentConfig::confirmed()));
Expand Down Expand Up @@ -152,6 +168,10 @@ impl AppState {
skip_tx_preflight,
priority_fee_subscriber,
wallet,
extra_rpcs: extra_rpcs
.into_iter()
.map(|u| Arc::new(RpcClient::new(get_http_url(u).expect("valid RPC url"))))
.collect(),
}
}

Expand Down Expand Up @@ -232,7 +252,7 @@ impl AppState {
)
.with_priority_fee(priority_fee, ctx.cu_limit);
let tx = build_cancel_ix(builder, req)?.build();
self.send_tx(tx, "cancel_orders").await
self.send_tx(tx, "cancel_orders", ctx.ttl).await
}

/// Return position for market if given, otherwise return all positions
Expand Down Expand Up @@ -461,7 +481,7 @@ impl AppState {
.place_orders(orders)
.build();

self.send_tx(tx, "cancel_and_place").await
self.send_tx(tx, "cancel_and_place", ctx.ttl).await
}

pub async fn place_orders(
Expand Down Expand Up @@ -493,7 +513,7 @@ impl AppState {
.place_orders(orders)
.build();

self.send_tx(tx, "place_orders").await
self.send_tx(tx, "place_orders", ctx.ttl).await
}

pub async fn modify_orders(
Expand All @@ -512,7 +532,7 @@ impl AppState {
)
.with_priority_fee(ctx.cu_price.unwrap_or(pf), ctx.cu_limit);
let tx = build_modify_ix(builder, req, self.client.program_data())?.build();
self.send_tx(tx, "modify_orders").await
self.send_tx(tx, "modify_orders", ctx.ttl).await
}

pub async fn get_tx_events_for_subaccount_id(
Expand Down Expand Up @@ -589,6 +609,7 @@ impl AppState {
&self,
tx: VersionedMessage,
reason: &'static str,
ttl: Option<u16>,
) -> GatewayResult<TxResponse> {
let recent_block_hash = self.client.get_latest_blockhash().await?;
let tx = self.wallet.sign_tx(tx, recent_block_hash)?;
Expand All @@ -598,34 +619,77 @@ impl AppState {
skip_preflight: self.skip_tx_preflight,
..Default::default()
};
let result = self

// submit to primary RPC first,
let sig = self
.client
.inner()
.send_transaction_with_config(&tx, tx_config)
.await
.map(|s| {
.inspect(|s| {
debug!(target: LOG_TARGET, "sent tx ({reason}): {s}");
TxResponse::new(s.to_string())
})
.map_err(|err| {
warn!(target: LOG_TARGET, "sending tx ({reason}) failed: {err:?}");
// tx has some program/logic error, retry won't fix
handle_tx_err(err.into())
})?;

// double send the tx to help chances of landing
let client = Arc::clone(&self.client);
// start a dedicated tx sending task
// - tx is broadcast to all available RPCs
// - retried at set intervals
// - retried upto some given deadline
// client should poll for the tx to confirm success
let primary_rpc = Arc::clone(&self.client);
let tx_signature = sig;
let extra_rpcs = self.extra_rpcs.clone();
tokio::spawn(async move {
if let Err(err) = client
.inner()
.send_transaction_with_config(&tx, tx_config)
.await
let start = SystemTime::now();
let ttl = Duration::from_secs(ttl.unwrap_or(DEFAULT_TX_TTL) as u64);
let mut confirmed = false;
while SystemTime::now()
.duration_since(start)
.is_ok_and(|x| x < ttl)
{
warn!(target: LOG_TARGET, "retry tx failed: {err:?}");
let mut futs = FuturesUnordered::new();
for rpc in extra_rpcs.iter() {
futs.push(rpc.send_transaction_with_config(&tx, tx_config));
}
futs.push(
primary_rpc
.inner()
.send_transaction_with_config(&tx, tx_config),
);

while let Some(res) = futs.next().await {
match res {
Ok(sig) => {
debug!(target: LOG_TARGET, "sent tx ({reason}): {sig}");
}
Err(err) => {
warn!(target: LOG_TARGET, "sending tx ({reason}) failed: {err:?}");
}
}
}

tokio::time::sleep(Duration::from_millis(400)).await;

if let Ok(Some(Ok(()))) = primary_rpc
.inner()
.get_signature_status(&tx_signature)
.await
{
confirmed = true;
info!(target: LOG_TARGET, "tx confirmed onchain: {tx_signature:?}");
break;
}
}
if !confirmed {
warn!(target: LOG_TARGET, "tx was not confirmed: {tx_signature:?}");
}
});

Ok(result)
Ok(TxResponse::new(sig.to_string()))
}
}

Expand Down
11 changes: 9 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ struct Context {
pub cu_limit: Option<u32>,
#[serde(default, rename = "computeUnitPrice")]
pub cu_price: Option<u64>,
/// Tx retry TTL
#[serde(default, rename = "ttl")]
pub ttl: Option<u16>,
}

#[get("/markets")]
Expand Down Expand Up @@ -240,6 +243,7 @@ async fn main() -> std::io::Result<()> {
Some((state_commitment, tx_commitment)),
Some(config.default_sub_account_id),
config.skip_tx_preflight,
config.extra_rpcs.split(",").collect(),
)
.await;

Expand Down Expand Up @@ -409,6 +413,9 @@ struct GatewayConfig {
/// skip tx preflight checks
#[argh(switch)]
skip_tx_preflight: bool,
/// extra solana RPC urls for improved Tx broadcast
#[argh(option)]
extra_rpcs: String,
/// enable debug logging
#[argh(switch)]
verbose: bool,
Expand Down Expand Up @@ -451,7 +458,7 @@ mod tests {
};
let rpc_endpoint = std::env::var("TEST_RPC_ENDPOINT")
.unwrap_or_else(|_| "https://api.devnet.solana.com".to_string());
AppState::new(&rpc_endpoint, true, wallet, None, None, false).await
AppState::new(&rpc_endpoint, true, wallet, None, None, false, vec![]).await
}

// likely safe to ignore during development, mainy regression tests for CI
Expand All @@ -472,7 +479,7 @@ mod tests {

let rpc_endpoint = std::env::var("TEST_RPC_ENDPOINT")
.unwrap_or_else(|_| "https://api.devnet.solana.com".to_string());
let state = AppState::new(&rpc_endpoint, true, wallet, None, None, false).await;
let state = AppState::new(&rpc_endpoint, true, wallet, None, None, false, vec![]).await;

let app = test::init_service(
App::new()
Expand Down

0 comments on commit 99e84e8

Please sign in to comment.