diff --git a/README.md b/README.md index 8d63a00..5ddb820 100644 --- a/README.md +++ b/README.md @@ -3,41 +3,42 @@ Self hosted API gateway to easily interact with Drift V2 Protocol ## Table of Contents + 1. [Build & Run](#build--run) - - [From Source](#from-source) - - [From Docker](#from-docker) + - [From Source](#from-source) + - [From Docker](#from-docker) 2. [Usage](#usage) - - [Environment Variables](#environment-variables) - - [Delegated Signing Mode](#delegated-signing-mode) - - [Sub-account Switching](#sub-account-switching) - - [Emulation Mode](#emulation-mode) - - [Transaction Confirmation](#transaction-confirmtaion-and-ttl) - - [CU price/limits](#cu-price--limits) + - [Environment Variables](#environment-variables) + - [Delegated Signing Mode](#delegated-signing-mode) + - [Sub-account Switching](#sub-account-switching) + - [Emulation Mode](#emulation-mode) + - [Transaction Confirmation](#transaction-confirmtaion-and-ttl) + - [CU price/limits](#cu-price--limits) 3. [API Examples](#api-examples) - - [HTTP API](#http-api) - - [`GET` Market Info](#get-market-info) - - [`GET` Orderbook](#get-orderbook) - - [`GET` Orders](#get-orders) - - [`GET` Positions](#get-positions) - - [`GET` Perp Position Info](#get-position-info-perps-only) - - [`GET` Transaction Events](#get-transaction-events) - - [`GET` SOL Balance](#get-sol-balance) - - [`GET` Authority](#get-authority) - - [`GET` Margin Info](#get-margin-info) - - [`GET` Leverage](#get-leverage) - - [`POST` Leverage](#set-leverage) - - [`GET` Collateral](#get-collateral) - - [`POST` Place Orders](#place-orders) - - [`PATCH` Modify Orders](#modify-orders) - - [`DELETE` Cancel Orders](#cancel-orders) - - [`POST` Swap](#swap-orders) - - [`POST` Titan Swap](#titan-swap-orders) - - [`PUT` Atomic Cancel/Modify/Place Orders](#atomic-cancelmodifyplace-orders) - - [Websocket API](#websocket-api) - - [Subscribing](#subscribing) - - [Event Payloads](#event-payloads) - 4. [Errors](#errors) - 5. [FAQ](#faq) + - [HTTP API](#http-api) + - [`GET` Market Info](#get-market-info) + - [`GET` Orderbook](#get-orderbook) + - [`GET` Orders](#get-orders) + - [`GET` Positions](#get-positions) + - [`GET` Perp Position Info](#get-position-info-perps-only) + - [`GET` Transaction Events](#get-transaction-events) + - [`GET` SOL Balance](#get-sol-balance) + - [`GET` Authority](#get-authority) + - [`GET` Margin Info](#get-margin-info) + - [`GET` Leverage](#get-leverage) + - [`POST` Leverage](#set-leverage) + - [`GET` Collateral](#get-collateral) + - [`POST` Place Orders](#place-orders) + - [`PATCH` Modify Orders](#modify-orders) + - [`DELETE` Cancel Orders](#cancel-orders) + - [`POST` Swap](#swap-orders) + - [`POST` Titan Swap](#titan-swap-orders) + - [`PUT` Atomic Cancel/Modify/Place Orders](#atomic-cancelmodifyplace-orders) + - [Websocket API](#websocket-api) + - [Subscribing](#subscribing) + - [Event Payloads](#event-payloads) +4. [Errors](#errors) +5. [FAQ](#faq) ## Build & Run @@ -46,6 +47,7 @@ Self hosted API gateway to easily interact with Drift V2 Protocol ### From Docker Use prebuilt image, ghcr.io: + ```bash # authenticate to github container registry docker login -u -P @@ -95,6 +97,7 @@ ldconfig ``` Run: + ```bash # configure the gateway signing key export DRIFT_GATEWAY_KEY= @@ -109,7 +112,9 @@ drift-gateway https://rpc-provider.example.com --markets sol-perp,sol,weth ``` ## gRPC mode + Run gateway subscriptions with geyser gRPC updates + ```bash export GRPC_HOST="grpc.example.com" export GRPC_X_TOKEN="aabbccddeeff112233" @@ -124,16 +129,16 @@ drift-gateway https://rpc-provider.example.com --grpc These runtime environment variables are required: -| Variable | Description | Example Value | -|---------------------|-------------------------------------------|------------------------------| -| `DRIFT_GATEWAY_KEY` | Path to your key file or seed in Base58. Transactions will be signed with this keypair | `` or `seedBase58` | -| `GRPC_HOST` | endpoint for gRPC subscription mode | `https://grpc.example.com` -| `GRPC_X_TOKEN` | authentication token for gRPC subscription mode | `aabbccddeeff112233` -| `INIT_RPC_THROTTLE` | Adds a delay (seconds) between RPC bursts during gateway startup. Useful to avoid 429/rate-limit errors. Can be set to `0`, if RPC node is highspec | `1` | -| `JUPITER_API_KEY` | **Required for `/v2/swap`**. Jupiter API key for swap operations. Get a free key at [portal.jup.ag](https://portal.jup.ag). See [migration guide](https://dev.jup.ag/portal/migrate-from-lite-api) | `your-jupiter-api-key` | -| `JUPITER_API_URL` | (Optional) Override Jupiter API base URL | `https://api.jup.ag/swap/v1` | -| `TITAN_AUTH_TOKEN` | **Required for `/v2/titan-swap`**. Authentication token for Titan API | `your-titan-auth-token` | -| `TITAN_BASE_URL` | (Optional) Titan API base URL | `https://api.titan.exchange` | +| Variable | Description | Example Value | +| ------------------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------------------------------------- | +| `DRIFT_GATEWAY_KEY` | Path to your key file or seed in Base58. Transactions will be signed with this keypair | `` or `seedBase58` | +| `GRPC_HOST` | endpoint for gRPC subscription mode | `https://grpc.example.com` | +| `GRPC_X_TOKEN` | authentication token for gRPC subscription mode | `aabbccddeeff112233` | +| `INIT_RPC_THROTTLE` | Adds a delay (seconds) between RPC bursts during gateway startup. Useful to avoid 429/rate-limit errors. Can be set to `0`, if RPC node is highspec | `1` | +| `JUPITER_API_KEY` | **Required for `/v2/swap`**. Jupiter API key for swap operations. Get a free key at [portal.jup.ag](https://portal.jup.ag). See [migration guide](https://dev.jup.ag/portal/migrate-from-lite-api) | `your-jupiter-api-key` | +| `JUPITER_API_URL` | (Optional) Override Jupiter API base URL | `https://api.jup.ag/swap/v1` | +| `TITAN_AUTH_TOKEN` | **Required for `/v2/titan-swap`**. Authentication token for Titan API | `your-titan-auth-token` | +| `TITAN_BASE_URL` | (Optional) Titan API base URL | `https://api.titan.exchange` | ```bash ./target/release/drift-gateway --help @@ -201,7 +206,7 @@ note therefore `DRIFT_GATEWAY_KEY` is not required to be set. **CU limit** may be set on transaction request with the query parameter `computeUnitLimit=300000`, the default if unset is `200000`. -**CU price** in micro-lamports may be set on transaction request with the query parameter `computeUnitPrice=1000`, the default if unset is a dynamic value from chain set at 90-th percentile of the local fee market. +**CU price** in micro-lamports may be set on transaction request with the query parameter `computeUnitPrice=1000`, the default if unset is a dynamic value from chain set at 90-th percentile of the local fee market. The following error is logged when a tx does not have enough CU limit, increasing the cu limit can fix it or reducing number complexity of the order e..g number of orders/markets per batch. @@ -220,18 +225,19 @@ $ curl 'localhost:8080/v2/orders?computeUnitLimit=300000&computeUnitPrice=1000' ## Transaction Confirmation and TTLs Gateway endpoints that place network transactions will return the signature as a base64 string. -User's can poll `transactionEvent` to confirm success by signature or watch Ws events for e.g. confirmation by order Ids instead. +User's can poll `transactionEvent` to confirm success by signature or watch Ws events for e.g. confirmation by order Ids instead. Gateway will resubmit txs until they are either confirmed by the network or timeout. This allows gateway txs to have a higher chance of confirmation during busy network periods. -setting `?ttl=` on a request determines how long gateway will resubmit txs for, (default: 4s/~10 slots). -e.g. `ttl?=2` means that the tx will be rebroadcast over the next 5 slots (5 * 400ms). +setting `?ttl=` on a request determines how long gateway will resubmit txs for, (default: 4s/~10 slots). +e.g. `ttl?=2` means that the tx will be rebroadcast over the next 5 slots (5 \* 400ms). ⚠️ users should take care to set either `max_order` ts or use atomic place/cancel/modify requests to prevent -double orders or orders being accepted later than intended. +double orders or orders being accepted later than intended. improving tx confirmation rates will require trial and error, try adjusting tx TTL and following parameters until results are meet requirements: + - set `--extra-rpcs=,` to broadcast tx to multiple nodes - set `--skip-tx-preflight` to disable preflight RPC checks - setting a longer `ttl` per request @@ -277,7 +283,7 @@ $ curl localhost:8080/v2/markets "symbol": "SOL", "priceStep": "0.0001", "amountStep": "0.1", - "minOrderSize": "0.1", + "minOrderSize": "0.1" } // ... ], @@ -297,6 +303,7 @@ $ curl localhost:8080/v2/markets ``` ## Get Margin Info + Returns the account margin requirements ```bash @@ -313,6 +320,7 @@ $ curl localhost:8080/v2/user/marginInfo ``` ## Get Leverage + Returns the account leverage ```bash @@ -323,11 +331,12 @@ $ curl localhost:8080/v2/leverage ```json { - "leverage" : "0.094489" + "leverage": "0.094489" } ``` ## Set Leverage + Set the max initial margin / leverage on sub-account ```bash @@ -340,6 +349,7 @@ $ curl localhost:8080/v2/leverage -d '{"leverage":"0.1"}' Returns solana tx signature on success ## Get Collateral + Returns the account's maintenance collateral ```bash @@ -350,8 +360,8 @@ $ curl localhost:8080/v2/collateral ```json { - "total":"1661.195815", - "free":"1653.531255" + "total": "1661.195815", + "free": "1653.531255" } ``` @@ -479,6 +489,7 @@ $ curl localhost:8080/v2/positionInfo/0 ``` note: + - `unrealizedPnL` is based on the oracle price at time of query - `unsettledPnl` does not include unsettled funding amounts @@ -508,11 +519,10 @@ $ curl localhost:8080/v2/transactionEvent/5JuobpnzPzwgdha4d7FpUHpvkinhyXCJhnPPkw A successful tx must bed confirm onchain and also execute successfully, the following table shows the possible responses from this endpoint: -| | execute ok | execute fail | -|-------------------|------------------------------------------|-------------------------| +| | execute ok | execute fail | +| ------------ | ------------------------------------------ | ------------------------------------------- | | confirm ok | `200, {"success": true, "events": [...] }` | `200, { "success": false, "error": "msg" }` | -| confirm fail | `404` | `404` - +| confirm fail | `404` | `404` | **Response** @@ -536,7 +546,7 @@ A response with a fill belonging to sub-account 0 } } ], - "success":true + "success": true } ``` @@ -554,7 +564,7 @@ A response for a transaction that was found, but doesn't contain any events for ```json { "events": [], - "success":true + "success": true } ``` @@ -567,10 +577,13 @@ A response for a transaction that was confirmed onchain but failed execution e.g "success": false } ``` + full list of error codes [here](https://drift-labs.github.io/v2-teacher/#errors) ### Get SOL balance + Return the on-chain SOL balance of the transaction signer (`DRIFT_GATEWAY_KEY`) + ```bash $ curl localhost:8080/v2/balance ``` @@ -580,7 +593,9 @@ $ curl localhost:8080/v2/balance ``` ### Get Authority + Return the on-chain SOL balance of the transaction signer (`DRIFT_GATEWAY_KEY`) + ```bash $ curl localhost:8080/v2/authority ``` @@ -589,7 +604,6 @@ $ curl localhost:8080/v2/authority { "pubkey": "key" } ``` - ### Place Orders - use sub-zero `amount` to indicate sell/offer order @@ -722,22 +736,25 @@ for more info see jup docs: https://dev.jup.ag/docs/api/swap-api/quote **Parameters**: Request body: + ```json { "inputMarket": number, // Input spot market index "outputMarket": number, // Output spot market index - "exactIn": boolean, // true = exactIn, false, exactOut + "exactIn": boolean, // true = exactIn, false, exactOut "amount": string, // Amount of input token to sell when exactIn=true OR amount of output token to buy when exactIn=false "slippage": number, // Max slippage in bps "useDirectRoutes": bool, // Direct Routes limits Jupiter routing to single hop routes only "excludeDexes": bool, // comma separated list of dexes to exclude } ``` + dexes: https://api.jup.ag/swap/v1/program-id-to-label **Response**: Success (200): + ```json { "signature": string, // Transaction signature @@ -746,6 +763,7 @@ Success (200): ``` Error (400/500): + ```json { "code": number, // Error code @@ -755,6 +773,7 @@ Error (400/500): **Example**: USDC to SOL + ```bash curl -X POST "http://localhost:8080/v2/swap" \ -H "Content-Type: application/json" \ @@ -784,11 +803,12 @@ Executes a spot token swap using Titan routing and liquidity aggregation. **Parameters**: Request body: + ```json { "inputMarket": number, // Input spot market index "outputMarket": number, // Output spot market index - "exactIn": boolean, // true = exactIn, false = exactOut + "exactIn": boolean, // true = exactIn, false = exactOut "amount": string, // Amount of input token to sell when exactIn=true OR amount of output token to buy when exactIn=false "slippageBps": number, // Max slippage in bps "useDirectRoutes": bool, // (Optional) Direct Routes limits routing to single hop routes only @@ -800,6 +820,7 @@ Request body: **Response**: Success (200): + ```json { "signature": string, // Transaction signature @@ -808,6 +829,7 @@ Success (200): ``` Error (400/500): + ```json { "code": number, // Error code @@ -817,6 +839,7 @@ Error (400/500): **Example**: USDC to SOL + ```bash curl -X POST "http://localhost:8080/v2/titan-swap" \ -H "Content-Type: application/json" \ @@ -1029,6 +1052,22 @@ denotes a subaccount swapped some amount of token (in) for another token (out) } ``` +**transaction not confirmed** + +Emitted when a transaction was not confirmed on-chain within the gateway TTL (e.g. dropped or expired). Clients subscribed to the same `subAccountId` as the request receive this event so they can stop polling or handle the failure without relying only on the `transactionEvent` HTTP endpoint. + +```json +{ + "data": { + "txNotConfirmed": { + "signature": "4ZCsjPSTrtGeToPzYDcd464MuhZbkJ8cg6G4qmtjgW1ZMLiAMXwMvVJtUqFPnrsN9HhrAJZyPE3Hn3QQqjfDFwAr" + } + }, + "channel": "transaction", + "subAccountId": 0 +} +``` + ### Errors error responses have the following JSON structure: @@ -1060,9 +1099,9 @@ Use the UI or Ts/Python sdk to initialize the sub-account first. #### `429`s / gateway hitting RPC rate limits this can occur during gateway startup as drift market data is pulled from the network and subscriptions are initialized. -try setting `INIT_RPC_THROTTLE=2` for e.g. 2s or longer, this allows some time between request bursts on start up. +try setting `INIT_RPC_THROTTLE=2` for e.g. 2s or longer, this allows some time between request bursts on start up. -The free \_api.mainnet-beta.solana.com_ RPC support is limited due to rate-limits +The free \_api.mainnet-beta.solana.com\_ RPC support is limited due to rate-limits ```rust Some(GetProgramAccounts), kind: Reqwest(reqwest::Error { kind: Status(429), ... @@ -1071,4 +1110,4 @@ Some(GetProgramAccounts), kind: Reqwest(reqwest::Error { kind: Status(429), ... #### Slow queries Queries longer than a few _ms_ may be due to missing market subscriptions. -Ensure gateway is properly configured with intended markets. +Ensure gateway is properly configured with intended markets. diff --git a/src/controller.rs b/src/controller.rs index 537bd95..e6e2486 100644 --- a/src/controller.rs +++ b/src/controller.rs @@ -1,3 +1,5 @@ +#![cfg_attr(test, allow(dead_code, unused_imports))] + use std::{ borrow::Cow, collections::HashSet, @@ -6,6 +8,8 @@ use std::{ time::{Duration, SystemTime}, }; +use tokio::sync::broadcast; + use base64::Engine as _; use drift_rs::{ constants::{ProgramData, DEFAULT_PUBKEY}, @@ -57,7 +61,7 @@ use crate::{ TxEventsResponse, TxResponse, UserCollateralResponse, UserLeverageResponse, UserMarginResponse, PRICE_DECIMALS, }, - websocket::map_drift_event_for_account, + websocket::{map_drift_event_for_account, TxNotConfirmedMessage}, Context, LOG_TARGET, }; @@ -96,6 +100,8 @@ pub struct AppState { extra_rpcs: Vec>, /// swift node url swift_node: String, + /// broadcast sender for tx-not-confirmed WS events (None when not wired) + tx_not_confirmed_tx: Option>, } impl AppState { @@ -133,6 +139,7 @@ impl AppState { skip_tx_preflight: bool, extra_rpcs: Vec<&str>, swift_node: String, + tx_not_confirmed_tx: Option>, ) -> Self { let (state_commitment, tx_commitment) = commitment.unwrap_or((CommitmentConfig::confirmed(), CommitmentConfig::confirmed())); @@ -198,6 +205,7 @@ impl AppState { .map(|u| Arc::new(RpcClient::new(get_http_url(u).expect("valid RPC url")))) .collect(), swift_node, + tx_not_confirmed_tx, } } @@ -391,7 +399,9 @@ impl AppState { ) .with_priority_fee(priority_fee, ctx.cu_limit); let tx = build_cancel_ix(builder, req)?.build(); - self.send_tx(tx, "cancel_orders", ctx.ttl).await + let sub_account_id = ctx.sub_account_id.unwrap_or(self.default_sub_account_id()); + self.send_tx(tx, "cancel_orders", ctx.ttl, Some(sub_account_id)) + .await } /// Return position for market if given, otherwise return all positions @@ -619,7 +629,9 @@ impl AppState { .place_orders(orders) .build(); - self.send_tx(tx, "cancel_and_place", ctx.ttl).await + let sub_account_id = ctx.sub_account_id.unwrap_or(self.default_sub_account_id()); + self.send_tx(tx, "cancel_and_place", ctx.ttl, Some(sub_account_id)) + .await } pub async fn place_orders( @@ -653,7 +665,10 @@ impl AppState { .place_orders(orders) .build(); - let tx_res = self.send_tx(tx, "place_orders", ctx.ttl).await; + let sub_account_id = ctx.sub_account_id.unwrap_or(self.default_sub_account_id()); + let tx_res = self + .send_tx(tx, "place_orders", ctx.ttl, Some(sub_account_id)) + .await; match tx_res { Ok(tx_res) => Ok(PlaceOrderResponse::Tx(tx_res)), Err(e) => Err(e), @@ -764,7 +779,9 @@ impl AppState { ) .with_priority_fee(ctx.cu_price.unwrap_or(pf), ctx.cu_limit); let tx = build_modify_ix(builder, req, self.client.program_data())?.build(); - self.send_tx(tx, "modify_orders", ctx.ttl).await + let sub_account_id = ctx.sub_account_id.unwrap_or(self.default_sub_account_id()); + self.send_tx(tx, "modify_orders", ctx.ttl, Some(sub_account_id)) + .await } pub async fn swap(&self, ctx: Context, req: SwapRequest) -> GatewayResult { @@ -828,7 +845,9 @@ impl AppState { .with_priority_fee(ctx.cu_price.unwrap_or(pf), ctx.cu_limit) .build(); - self.send_tx(tx, "swap", ctx.ttl).await + let sub_account_id = ctx.sub_account_id.unwrap_or(self.default_sub_account_id()); + self.send_tx(tx, "swap", ctx.ttl, Some(sub_account_id)) + .await } pub async fn titan_swap( @@ -897,7 +916,9 @@ impl AppState { .with_priority_fee(ctx.cu_price.unwrap_or(pf), ctx.cu_limit) .build(); - self.send_tx(tx, "titan_swap", ctx.ttl).await + let sub_account_id = ctx.sub_account_id.unwrap_or(self.default_sub_account_id()); + self.send_tx(tx, "titan_swap", ctx.ttl, Some(sub_account_id)) + .await } pub async fn get_tx_events_for_subaccount_id( @@ -1010,7 +1031,8 @@ impl AppState { sub_account_id, ) .build(); - self.send_tx(tx, "set_margin_ratio", ctx.ttl).await + self.send_tx(tx, "set_margin_ratio", ctx.ttl, Some(sub_account_id)) + .await } pub fn default_sub_account_id(&self) -> u16 { @@ -1028,6 +1050,7 @@ impl AppState { tx: VersionedMessage, reason: &'static str, _ttl: Option, + _sub_account_id: Option, ) -> GatewayResult { match self.client.simulate_tx(tx).await?.err { Some(err) => { @@ -1047,6 +1070,7 @@ impl AppState { tx: VersionedMessage, reason: &'static str, ttl: Option, + sub_account_id: Option, ) -> GatewayResult { let recent_block_hash = self.client.get_latest_blockhash().await?; let tx = self.wallet.sign_tx(tx, recent_block_hash)?; @@ -1080,6 +1104,8 @@ impl AppState { let primary_rpc = Arc::clone(&self.client).rpc(); let tx_signature = sig; let extra_rpcs = self.extra_rpcs.clone(); + let tx_not_confirmed_tx = self.tx_not_confirmed_tx.clone(); + let sub_account_id_for_ws = sub_account_id; tokio::spawn(async move { let start = SystemTime::now(); let ttl = Duration::from_secs(ttl.unwrap_or(DEFAULT_TX_TTL) as u64); @@ -1115,6 +1141,9 @@ impl AppState { } if !confirmed { warn!(target: LOG_TARGET, "tx was not confirmed: {tx_signature:?}"); + if let (Some(tx), Some(sid)) = (tx_not_confirmed_tx, sub_account_id_for_ws) { + let _ = tx.send((sid, tx_signature.to_string())); + } } }); diff --git a/src/main.rs b/src/main.rs index d1bd372..76be422 100644 --- a/src/main.rs +++ b/src/main.rs @@ -302,6 +302,8 @@ async fn main() -> std::io::Result<()> { ); sub_account_ids.dedup(); + let (tx_not_confirmed_tx, tx_not_confirmed_rx) = tokio::sync::broadcast::channel(64); + let state = AppState::new( &config.rpc_host, config.dev, @@ -313,6 +315,7 @@ async fn main() -> std::io::Result<()> { .map(|s| s.split(",").collect()) .unwrap_or_default(), config.swift_node, + Some(tx_not_confirmed_tx), ) .await; @@ -378,6 +381,7 @@ async fn main() -> std::io::Result<()> { client.ws(), Arc::clone(&state.wallet), client.program_data(), + Some(tx_not_confirmed_rx), ) .await; @@ -606,12 +610,14 @@ mod tests { false, vec![], "https://master.swift.drift.trade".to_string(), + None, ) .await } // likely safe to ignore during development, mainly regression test for CI #[actix_web::test] + #[ignore = "network/rpc dependent; run with --ignored when TEST_DELEGATED_SIGNER set"] async fn delegated_signing_ok() { let _ = env_logger::try_init(); let delegated_seed = @@ -637,6 +643,7 @@ mod tests { false, vec![], "https://master.swift.drift.trade".to_string(), + None, ) .await; @@ -660,6 +667,7 @@ mod tests { // likely safe to ignore during development, mainly regression test for CI #[actix_web::test] + #[ignore = "requires JUPITER_API_KEY; run with --ignored when key set"] async fn delegated_swap_works() { let _ = env_logger::try_init(); let delegated_seed = @@ -685,6 +693,7 @@ mod tests { false, vec![], "https://master.swift.drift.trade".to_string(), + None, ) .await; @@ -713,6 +722,7 @@ mod tests { // likely safe to ignore during development, mainly regression test for CI #[actix_web::test] + #[ignore = "requires JUPITER_API_KEY; run with --ignored when key set"] async fn swap_works() { let _ = env_logger::try_init(); let wallet = create_wallet(Some(get_seed()), None, None); @@ -728,6 +738,7 @@ mod tests { false, vec![], "https://master.swift.drift.trade".to_string(), + None, ) .await; @@ -864,6 +875,7 @@ mod tests { } #[actix_web::test] + #[ignore = "network/rpc dependent; run with --ignored when RPC returns expected tx shape"] async fn get_tx_events_works() { let _ = env_logger::try_init(); let controller = setup_controller(Some( @@ -916,6 +928,7 @@ mod tests { } #[actix_web::test] + #[ignore = "network/rpc dependent; run with --ignored when RPC returns expected tx shape"] async fn get_tx_events_works_for_wrong_subaccount() { let _ = env_logger::try_init(); let controller = setup_controller(Some( diff --git a/src/websocket.rs b/src/websocket.rs index 279fbc4..8f3cc97 100644 --- a/src/websocket.rs +++ b/src/websocket.rs @@ -16,7 +16,7 @@ use serde_json::json; use tokio::{ io::AsyncWriteExt, net::{TcpListener, TcpStream}, - sync::Mutex, + sync::{broadcast, Mutex}, task::JoinHandle, }; use tokio_tungstenite::{accept_async, tungstenite::Message}; @@ -26,12 +26,17 @@ use crate::{ LOG_TARGET, }; +/// Message sent when a tx was not confirmed within the gateway TTL. +/// (sub_account_id, signature) +pub type TxNotConfirmedMessage = (u16, String); + /// Start the websocket server pub async fn start_ws_server( listen_address: &str, ws_client: Arc, wallet: Arc, program_data: &'static ProgramData, + tx_not_confirmed_rx: Option>, ) { // Create the event loop and TCP listener we'll accept connections on. let listener = TcpListener::bind(&listen_address) @@ -40,11 +45,13 @@ pub async fn start_ws_server( info!("Ws server listening at: ws://{}", listen_address); tokio::spawn(async move { while let Ok((stream, _)) = listener.accept().await { + let tx_rx = tx_not_confirmed_rx.as_ref().map(|r| r.resubscribe()); tokio::spawn(accept_connection( stream, Arc::clone(&ws_client), Arc::clone(&wallet), program_data, + tx_rx, )); } }); @@ -55,6 +62,7 @@ async fn accept_connection( ws_client: Arc, wallet: Arc, program_data: &'static ProgramData, + tx_not_confirmed_rx: Option>, ) { let addr = stream.peer_addr().expect("peer address"); @@ -92,6 +100,45 @@ async fn accept_connection( let (message_tx, mut message_rx) = tokio::sync::mpsc::channel::(64); let subscriptions = Arc::new(Mutex::new(HashMap::>::default())); + // forward tx-not-confirmed events to this connection when subscribed to the matching sub_account + if let Some(mut rx) = tx_not_confirmed_rx { + let message_tx = message_tx.clone(); + let subscriptions = Arc::clone(&subscriptions); + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok((sub_account_id, signature)) => { + let subs = subscriptions.lock().await; + if subs.contains_key(&(sub_account_id as u8)) { + let event = WsEvent { + data: AccountEvent::TxNotConfirmed { signature }, + channel: Channel::Transaction, + sub_account_id: sub_account_id as u8, + }; + if message_tx + .send(Message::text( + serde_json::to_string(&event).expect("serializes"), + )) + .await + .is_err() + { + break; + } + } + } + Err(broadcast::error::RecvError::Lagged(n)) => { + log::warn!( + target: LOG_TARGET, + "tx not confirmed broadcast lagged, dropped {} messages", + n + ); + } + Err(broadcast::error::RecvError::Closed) => break, + } + } + }); + } + // writes messages to the connection tokio::spawn(async move { while let Some(msg) = message_rx.recv().await { @@ -235,13 +282,14 @@ enum Method { Unsubscribe, } -#[derive(Serialize, Deserialize, Debug)] +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] #[serde(rename_all = "lowercase")] pub(crate) enum Channel { Fills, Orders, Funding, Swap, + Transaction, } #[derive(Deserialize, Debug)] @@ -334,6 +382,9 @@ pub(crate) enum AccountEvent { tx_idx: usize, signature: String, }, + /// Emitted when a transaction was not confirmed within the gateway TTL (e.g. dropped or expired). + #[serde(rename_all = "camelCase")] + TxNotConfirmed { signature: String }, #[serde(rename_all = "camelCase")] Trigger { order_id: u32, oracle_price: u64 }, } @@ -708,12 +759,12 @@ pub(crate) fn map_drift_event_for_account( }), ), DriftEvent::Swap { - user, + user: _, amount_in, amount_out, market_in, market_out, - fee, + fee: _, ts, signature, tx_idx, @@ -735,3 +786,28 @@ pub(crate) fn map_drift_event_for_account( } } } + +#[cfg(test)] +mod tests { + use super::{AccountEvent, Channel, WsEvent}; + + #[test] + fn tx_not_confirmed_serializes_with_expected_shape() { + let event = WsEvent { + data: AccountEvent::TxNotConfirmed { + signature: "4ZCsjPSTrtGeToPzYDcd464MuhZbkJ8cg6G4qmtjgW1Z".to_string(), + }, + channel: Channel::Transaction, + sub_account_id: 0, + }; + let json = serde_json::to_string(&event).expect("serializes"); + let parsed: serde_json::Value = serde_json::from_str(&json).expect("valid json"); + assert_eq!(parsed["channel"], "transaction"); + assert_eq!(parsed["subAccountId"], 0); + assert!(parsed["data"]["txNotConfirmed"].is_object()); + assert_eq!( + parsed["data"]["txNotConfirmed"]["signature"], + "4ZCsjPSTrtGeToPzYDcd464MuhZbkJ8cg6G4qmtjgW1Z" + ); + } +}