diff --git a/.env.sample b/.env.sample index 9d6a6f8..c13c36f 100644 --- a/.env.sample +++ b/.env.sample @@ -1,3 +1,11 @@ ETH_NODE_URL= ETH_ACCOUNT_ADDRESS= ACCOUNT_PRIVATE_KEY= + +BINANCE_API_KEY= +BINANCE_SECRET= + +BINANCE_TEST_API_KEY= +BINANCE_TEST_SECRET= + +RUST_LOG="info" \ No newline at end of file diff --git a/barter-data-rs/examples/uniswapx_trades.rs b/barter-data-rs/examples/uniswapx_trades.rs index 4e3d378..0830604 100644 --- a/barter-data-rs/examples/uniswapx_trades.rs +++ b/barter-data-rs/examples/uniswapx_trades.rs @@ -1,6 +1,5 @@ use barter_data::dex::uniswapx; use dotenv::dotenv; -use tokio::time::{sleep, Duration}; #[tokio::main] async fn main() { diff --git a/barter-data-rs/src/exchange/binance/futures/mod.rs b/barter-data-rs/src/exchange/binance/futures/mod.rs index ecc0642..b91c173 100644 --- a/barter-data-rs/src/exchange/binance/futures/mod.rs +++ b/barter-data-rs/src/exchange/binance/futures/mod.rs @@ -14,9 +14,6 @@ pub mod l2; /// Liquidation types. pub mod liquidation; -/// Account types. -pub mod account; - /// [`BinanceFuturesUsd`] WebSocket server base url. /// /// See docs: diff --git a/barter-execution-rs/src/error.rs b/barter-execution-rs/src/error.rs index 42027d2..2bac0f4 100644 --- a/barter-execution-rs/src/error.rs +++ b/barter-execution-rs/src/error.rs @@ -25,3 +25,27 @@ pub enum ExecutionError { #[error("SocketError: {0}")] Socket(#[from] SocketError), } + +/// All errors generated in the barter::portfolio module. +#[derive(Error, Clone, Copy, Debug)] +pub enum PositionError { + #[error("Failed to build struct due to missing attributes: {0}")] + BuilderIncomplete(&'static str), + + #[error("Failed to parse Position entry Side due to ambiguous fill quantity & Decision.")] + ParseEntrySide, + + #[error("Cannot exit Position with an entry decision FillEvent.")] + CannotEnterPositionWithExitFill, + + #[error("Cannot exit Position with an entry decision FillEvent.")] + CannotExitPositionWithEntryFill, + + #[error("Cannot generate PositionExit from Position that has not been exited")] + PositionExit, + + #[error("Negative Trade Quantity")] + NegativeTradeQuantity, + // #[error("Failed to interact with repository")] + // RepositoryInteraction(#[from] RepositoryError), +} diff --git a/barter-execution-rs/src/execution/binance/connection.rs b/barter-execution-rs/src/execution/binance/connection.rs index 0c2b37d..515cc5a 100644 --- a/barter-execution-rs/src/execution/binance/connection.rs +++ b/barter-execution-rs/src/execution/binance/connection.rs @@ -11,7 +11,6 @@ use barter_integration::{ }, }; use chrono::Utc; -use dotenv::dotenv; use hmac::Hmac; use reqwest::{RequestBuilder, StatusCode}; use serde::Deserialize; @@ -19,7 +18,6 @@ use tokio::sync::mpsc; use crate::{ error::ExecutionError, - fill::Decision, model::order::{Order, OrderKind, RequestOpen}, }; @@ -258,15 +256,6 @@ impl BinanceClient { } } -pub(super) fn get_order_side(side: Decision) -> &'static str { - match side { - Decision::Long => "BUY", - Decision::Short => "SELL", - Decision::CloseLong => "SELL", - Decision::CloseShort => "BUY", - } -} - #[derive(Debug, Clone)] pub struct BinanceSigner { pub api_key: String, @@ -371,10 +360,7 @@ impl HttpParser for BinanceParser { mod tests { use super::*; use crate::{ - execution::binance::requests::FutOrderResponse, - fill::MarketMeta, - model::{order_event::OrderEventBuilder, ClientOrderId}, - ExecutionId, + execution::binance::requests::FutOrderResponse, model::ClientOrderId, ExecutionId, }; use barter_integration::model::{ instrument::{kind::InstrumentKind, symbol::Symbol, Instrument}, diff --git a/barter-execution-rs/src/execution/binance/mod.rs b/barter-execution-rs/src/execution/binance/mod.rs index 44d1195..8b8784d 100644 --- a/barter-execution-rs/src/execution/binance/mod.rs +++ b/barter-execution-rs/src/execution/binance/mod.rs @@ -1,15 +1,17 @@ use async_trait::async_trait; +use barter_data::exchange::ExchangeId; use barter_integration::model::{instrument::symbol::Symbol, Exchange}; -use futures::future::join_all; -use tracing::{error, info}; +use futures::{future::join_all, stream::BoxStream}; +use tracing::error; use crate::{ error::ExecutionError, model::{ balance::SymbolBalance, - order::{self, Cancelled, Open, Order, OrderId, RequestCancel, RequestOpen}, + order::{Cancelled, Open, Order, OrderId, RequestCancel, RequestOpen}, + AccountEventKind, }, - ExecutionClient, ExecutionId, + ExecutionClient, }; use self::{ @@ -20,13 +22,14 @@ use self::{ pub mod connection; pub mod requests; +pub mod types; pub mod websocket; /// Binance [`ExecutionClient`] implementation that integrates with the Barter -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct BinanceExecution { - client: BinanceClient, - // client_type: BinanceApi, + pub client: BinanceClient, + client_type: BinanceApi, } /// Config for initializing a [`BinanceExecution`] instance. @@ -35,25 +38,31 @@ pub struct BinanceConfig { pub client_type: BinanceApi, } +/// Binance Execution Client +impl BinanceExecution {} + #[async_trait] impl ExecutionClient for BinanceExecution { type Config = BinanceConfig; fn exchange(&self) -> Exchange { - Exchange::from(ExecutionId::Simulated) + Exchange::from(ExchangeId::BinanceFuturesUsd) } async fn init(config: Self::Config) -> Self { let client = BinanceClient::new(config.client_type); - let url = BinanceClient::get_url(config.client_type); - let (api_key, _) = BinanceClient::get_key_secret(config.client_type); - init_listener(&api_key, url).await; Self { client, - // client_type: config.client_type, + client_type: config.client_type, } } + async fn init_stream(&self) -> Option> { + let url = BinanceClient::get_url(self.client_type); + let (api_key, _) = BinanceClient::get_key_secret(self.client_type); + Some(init_listener(&api_key, url).await) + } + async fn fetch_orders_open(&self) -> Result>, ExecutionError> { todo!() } diff --git a/barter-data-rs/src/exchange/binance/futures/account.rs b/barter-execution-rs/src/execution/binance/types/account_update.rs similarity index 83% rename from barter-data-rs/src/exchange/binance/futures/account.rs rename to barter-execution-rs/src/execution/binance/types/account_update.rs index 12f74db..f2e2ade 100644 --- a/barter-data-rs/src/exchange/binance/futures/account.rs +++ b/barter-execution-rs/src/execution/binance/types/account_update.rs @@ -1,23 +1,22 @@ -use super::super::BinanceChannel; -use crate::{ - event::{MarketEvent, MarketIter}, - exchange::ExchangeId, - subscription::account_update::{AccountUpdate, BalanceUpdate, PositionUpdate}, - Identifier, -}; +use super::BinanceFuturesEventType; use barter_integration::model::{ - instrument::{symbol::Symbol, Instrument}, - Exchange, PerpSide, SubscriptionId, + instrument::{kind::InstrumentKind, symbol::Symbol, Instrument}, + PerpSide, Side, }; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; +use crate::model::{ + balance::{Balance, SymbolBalance}, + position::{Position, PositionMeta}, + AccountEventKind, +}; + /// [`BinanceFuturesUsd`](super::BinanceFuturesUsd) AccountUpdate messages. /// /// ### Raw Payload Examples /// See docs: /// ```json -//// ``` /// { /// "e": "ACCOUNT_UPDATE", // Event Type /// "E": 1564745798939, // Event Time @@ -76,10 +75,12 @@ use serde::{Deserialize, Serialize}; /// ] /// } /// } +/// ``` + #[derive(Clone, PartialEq, PartialOrd, Debug, Deserialize, Serialize)] pub struct BinanceAccountUpdate { #[serde(alias = "e")] - pub event_type: String, + pub event_type: BinanceFuturesEventType, #[serde( alias = "E", deserialize_with = "barter_integration::de::de_u64_epoch_ms_as_datetime_utc" @@ -138,6 +139,7 @@ pub struct BinanceBalanceUpdate { /// [`BinanceFuturesUsd`](super::BinanceFuturesUsd) BinancePositionUpdate. /// ### Raw Payload Examples +/// ```json /// { /// "s":"BTCUSDT", // Symbol /// "pa":"0", // Position Amount @@ -148,7 +150,7 @@ pub struct BinanceBalanceUpdate { /// "mt":"isolated", // Margin Type /// "iw":"0.00000000", // Isolated Wallet (if isolated position) /// "ps":"BOTH" // Position Side -/// }, +/// } /// ``` #[derive(Clone, PartialEq, PartialOrd, Debug, Deserialize, Serialize)] @@ -173,73 +175,80 @@ pub struct BinancePositionUpdate { pub position_side: PerpSide, } -impl Identifier> for BinanceAccountUpdate { - fn id(&self) -> Option { - Some(SubscriptionId::from(BinanceChannel::ACCOUNT_UPDATE.0)) - } -} - -impl From for BalanceUpdate { +impl From for SymbolBalance { fn from(balance_update: BinanceBalanceUpdate) -> Self { Self { - asset: balance_update.asset, - wallet_balance: balance_update.wallet_balance, - cross_wallet_balance: balance_update.cross_wallet_balance, - balance_change: balance_update.balance_change, + symbol: balance_update.asset, + balance: Balance { + // TODO not totally clear if these are total or available + total: balance_update.wallet_balance + balance_update.cross_wallet_balance, + available: balance_update.wallet_balance + balance_update.cross_wallet_balance, + }, } } } -impl From for PositionUpdate { - fn from(position_update: BinancePositionUpdate) -> Self { +impl From for Position { + fn from(update: BinancePositionUpdate) -> Self { + let mut side = Side::Sell; + if update.position_amount > 0.0 { + side = Side::Buy; + } + Self { - symbol: position_update.symbol, - position_amount: position_update.position_amount, - position_side: position_update.position_side, - unrealized_pnl: position_update.unrealized_pnl, - entry_price: position_update.entry_price, - breakeven_price: position_update.breakeven_price, + position_id: update.symbol.clone().to_string(), + meta: PositionMeta { + update_time: Utc::now(), + exit_balance: None, + }, + instrument: Instrument::from(( + update.symbol.clone(), + update.symbol.clone(), + InstrumentKind::Perpetual, + )), + side, + quantity: update.position_amount, + enter_avg_price_gross: update.entry_price, + enter_value_gross: update.entry_price * update.position_amount, + unrealised_profit_loss: update.unrealized_pnl, + realised_profit_loss: update.accumulated_realized, + + // Fees + exit_avg_price_gross: 0.0, + exit_value_gross: 0.0, + current_symbol_price: 0.0, + enter_fees: Default::default(), + exit_fees: Default::default(), + enter_fees_total: 0.0, + exit_fees_total: 0.0, + current_value_gross: 0.0, } } } -impl From<(ExchangeId, Instrument, BinanceAccountUpdate)> for MarketIter { - fn from( - (exchange_id, instrument, account_update): (ExchangeId, Instrument, BinanceAccountUpdate), - ) -> Self { - Self(vec![Ok(MarketEvent { - exchange_time: account_update.event_time, - received_time: Utc::now(), - exchange: Exchange::from(exchange_id), - instrument, - kind: AccountUpdate { - time: account_update.event_time, - balance_updates: account_update +impl From for (AccountEventKind, AccountEventKind) { + fn from(update: BinanceAccountUpdate) -> Self { + ( + AccountEventKind::Balances( + update .update_data .balance_updates .into_iter() - .map(BalanceUpdate::from) + .map(SymbolBalance::from) .collect(), - position_updates: account_update + ), + AccountEventKind::Positions( + update .update_data .position_updates .into_iter() - .map(PositionUpdate::from) + .map(Position::from) .collect(), - }, - })]) + ), + ) } } -/// Deserialize a [`BinanceAccountUpdate`] "s" as the associated [`SubscriptionId`]. -pub fn de_liquidation_subscription_id<'de, D>(deserializer: D) -> Result -where - D: serde::de::Deserializer<'de>, -{ - Deserialize::deserialize(deserializer) - .map(|_market: String| SubscriptionId::from(BinanceChannel::ACCOUNT_UPDATE.0)) -} - #[cfg(test)] mod tests { use super::*; @@ -310,7 +319,7 @@ mod tests { }"#; let expected = BinanceAccountUpdate { - event_type: "ACCOUNT_UPDATE".to_string(), + event_type: BinanceFuturesEventType::AccountUpdate, event_time: datetime_utc_from_epoch_duration(Duration::from_millis(1564745798939)), transaction_time: datetime_utc_from_epoch_duration(Duration::from_millis( 1564745798938, diff --git a/barter-execution-rs/src/execution/binance/types/mod.rs b/barter-execution-rs/src/execution/binance/types/mod.rs new file mode 100644 index 0000000..9fa613f --- /dev/null +++ b/barter-execution-rs/src/execution/binance/types/mod.rs @@ -0,0 +1,53 @@ +use serde::{Deserialize, Serialize}; + +pub mod account_update; +pub mod order_update; + +#[derive(Clone, Copy, PartialEq, PartialOrd, Debug, Serialize)] +pub enum BinanceFuturesEventType { + AccountUpdate, + OrderTradeUpdate, + // AccountConfigUpdate, + // MarginCall, + // ForceOrder, + // AccountPositionUpdate, + // LiquidationOrder, + // ContractPositionUpdate, + // ContractForceOrder, + // MarginCallForce, + // OrderBookTicker, + // Other, +} + +impl<'de> Deserialize<'de> for BinanceFuturesEventType { + fn deserialize(deserializer: D) -> Result + where + D: serde::de::Deserializer<'de>, + { + let s = String::deserialize(deserializer)?; + match s.as_str() { + "ACCOUNT_UPDATE" => Ok(Self::AccountUpdate), + "ORDER_TRADE_UPDATE" => Ok(Self::OrderTradeUpdate), + // "ACCOUNT_CONFIG_UPDATE" => Ok(Self::AccountConfigUpdate), + // "MARGIN_CALL" => Ok(Self::MarginCall), + // "FORCE_ORDER" => Ok(Self::ForceOrder), + // "ACCOUNT_POSITION_UPDATE" => Ok(Self::AccountPositionUpdate), + // "LIQUIDATION_ORDER" => Ok(Self::LiquidationOrder), + // "CONTRACT_POSITION_UPDATE" => Ok(Self::ContractPositionUpdate), + // "CONTRACT_FORCE_ORDER" => Ok(Self::ContractForceOrder), + // "MARGIN_CALL_FORCE" => Ok(Self::MarginCallForce), + // "ORDER_BOOK_TICKER" => Ok(Self::OrderBookTicker), + // "OTHER" => Ok(Self::Other), + _ => Err(serde::de::Error::custom(format!( + "unknown BinanceFuturesEventType: {}", + s + ))), + } + } +} + +#[derive(Clone, Copy, PartialEq, PartialOrd, Debug, Deserialize, Serialize)] +pub struct BinanceFutAccountEvent { + #[serde(alias = "e")] + pub event_type: BinanceFuturesEventType, +} diff --git a/barter-execution-rs/src/execution/binance/types/order_update.rs b/barter-execution-rs/src/execution/binance/types/order_update.rs new file mode 100644 index 0000000..ff96a7b --- /dev/null +++ b/barter-execution-rs/src/execution/binance/types/order_update.rs @@ -0,0 +1,375 @@ +use crate::model::{ + order::OrderId, + trade::{SymbolFees, Trade, TradeId}, + AccountEventKind, +}; + +use super::BinanceFuturesEventType; +use barter_integration::model::{ + instrument::{kind::InstrumentKind, symbol::Symbol, Instrument}, + PerpSide, Side, +}; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; + +/// enums for the order status +/// Order Type +/// MARKET +/// LIMIT +/// STOP +/// TAKE_PROFIT +/// LIQUIDATION +/// +/// Execution Type +/// NEW +/// CANCELED +/// CALCULATED - Liquidation Execution +/// EXPIRED +/// TRADE +/// AMENDMENT - Order Modified +/// +/// Order Status +/// NEW +/// PARTIALLY_FILLED +/// FILLED +/// CANCELED +/// EXPIRED +/// EXPIRED_IN_MATCH +/// +/// Time in force +/// GTC +/// IOC +/// FOK +/// GTX +/// +/// Working Type +/// MARK_PRICE +/// CONTRACT_PRICE + +//// Order status +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub enum BinanceFutOrderStatus { + #[serde(rename = "NEW")] + New, + #[serde(rename = "PARTIALLY_FILLED")] + PartiallyFilled, + #[serde(rename = "FILLED")] + Filled, + #[serde(rename = "CANCELED")] + Canceled, + #[serde(rename = "EXPIRED")] + Expired, + #[serde(rename = "EXPIRED_IN_MATCH")] + ExpiredInMatch, +} + +/// Execution type +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub enum BinFutExecutionType { + #[serde(rename = "NEW")] + New, + #[serde(rename = "CANCELED")] + Canceled, + #[serde(rename = "CALCULATED")] + Calculated, + #[serde(rename = "EXPIRED")] + Expired, + #[serde(rename = "TRADE")] + Trade, + #[serde(rename = "AMENDMENT")] + Amendment, +} + +/// Order type +#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +pub enum BinFutOrderType { + #[serde(rename = "MARKET")] + Market, + #[serde(rename = "LIMIT")] + Limit, + #[serde(rename = "STOP")] + Stop, + #[serde(rename = "TAKE_PROFIT")] + TakeProfit, + #[serde(rename = "LIQUIDATION")] + Liquidation, +} + +//// [`BinanceFuturesUsd`](super::BinanceFuturesUsd) AccountUpdate messages. +/// +/// ### Raw Payload Examples +/// See docs: +/// ```json +/// { +/// "e":"ORDER_TRADE_UPDATE", // Event Type +/// "E":1568879465651, // Event Time +/// "T":1568879465650, // Transaction Time +/// "o":{ +/// "s":"BTCUSDT", // Symbol +/// "c":"TEST", // Client Order Id +/// // special client order id: +/// // starts with "autoclose-": liquidation order +/// // "adl_autoclose": ADL auto close order +/// // "settlement_autoclose-": settlement order for delisting or delivery +/// "S":"SELL", // Side +/// "o":"TRAILING_STOP_MARKET", // Order Type +/// "f":"GTC", // Time in Force +/// "q":"0.001", // Original Quantity +/// "p":"0", // Original Price +/// "ap":"0", // Average Price +/// "sp":"7103.04", // Stop Price. Please ignore with TRAILING_STOP_MARKET order +/// "x":"NEW", // Execution Type +/// "X":"NEW", // Order Status +/// "i":8886774, // Order Id +/// "l":"0", // Order Last Filled Quantity +/// "z":"0", // Order Filled Accumulated Quantity +/// "L":"0", // Last Filled Price +/// "N":"USDT", // Commission Asset, will not push if no commission +/// "n":"0", // Commission, will not push if no commission +/// "T":1568879465650, // Order Trade Time +/// "t":0, // Trade Id +/// "b":"0", // Bids Notional +/// "a":"9.91", // Ask Notional +/// "m":false, // Is this trade the maker side? +/// "R":false, // Is this reduce only +/// "wt":"CONTRACT_PRICE", // Stop Price Working Type +/// "ot":"TRAILING_STOP_MARKET",// Original Order Type +/// "ps":"LONG", // Position Side +/// "cp":false, // If Close-All, pushed with conditional order +/// "AP":"7476.89", // Activation Price, only puhed with TRAILING_STOP_MARKET order +/// "cr":"5.0", // Callback Rate, only puhed with TRAILING_STOP_MARKET order +/// "pP": false, // If price protection is turned on +/// "si": 0, // ignore +/// "ss": 0, // ignore +/// "rp":"0", // Realized Profit of the trade +/// "V":"EXPIRE_TAKER", // STP mode +/// "pm":"OPPONENT", // Price match mode +/// "gtd":0 // TIF GTD order auto cancel time +/// } +/// } +/// ``` +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BinanceFutOrderUpdate { + /// Event type. + #[serde(rename = "e")] + pub event_type: BinanceFuturesEventType, + /// Event time. + #[serde( + rename = "E", + deserialize_with = "barter_integration::de::de_u64_epoch_ms_as_datetime_utc" + )] + pub event_time: DateTime, + /// Transaction time. + #[serde( + rename = "T", + deserialize_with = "barter_integration::de::de_u64_epoch_ms_as_datetime_utc" + )] + pub transaction_time: DateTime, + /// Order. + #[serde(rename = "o")] + pub order: BinanceFutOrder, +} + +/// Order +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BinanceFutOrder { + /// Symbol. + #[serde(rename = "s")] + pub symbol: Symbol, + /// Client order id. + #[serde(rename = "c")] + pub client_order_id: String, + /// Side. + #[serde(rename = "S")] + pub side: Side, + /// Order type. + #[serde(rename = "o")] + pub order_type: BinFutOrderType, + /// Time in force. + #[serde(rename = "f")] + pub time_in_force: String, + /// Original quantity. + #[serde(rename = "q", deserialize_with = "barter_integration::de::de_str")] + pub original_quantity: f64, + /// Original price. + #[serde(rename = "p", deserialize_with = "barter_integration::de::de_str")] + pub original_price: f64, + /// Average price. + #[serde(rename = "ap", deserialize_with = "barter_integration::de::de_str")] + pub average_price: f64, + /// Stop price. + #[serde(rename = "sp", deserialize_with = "barter_integration::de::de_str")] + pub stop_price: f64, + /// Execution type. + #[serde(rename = "x")] + pub execution_type: BinFutExecutionType, + /// Order status. + #[serde(rename = "X")] + pub order_status: BinanceFutOrderStatus, + /// Order id. + #[serde(rename = "i")] + pub order_id: u64, + /// Order last filled quantity. + #[serde(rename = "l", deserialize_with = "barter_integration::de::de_str")] + pub order_last_filled_quantity: f64, + /// Order filled accumulated quantity. + #[serde(rename = "z", deserialize_with = "barter_integration::de::de_str")] + pub order_filled_accumulated_quantity: f64, + /// Last filled price. + #[serde(rename = "L", deserialize_with = "barter_integration::de::de_str")] + pub last_filled_price: f64, + /// Commission asset. + #[serde(default, rename = "N")] + pub commission_asset: Option, + /// Commission. + #[serde( + default, + rename = "n", + deserialize_with = "barter_integration::de::de_option_f64" + )] + pub commission: Option, + /// Order trade time. + #[serde( + rename = "T", + deserialize_with = "barter_integration::de::de_u64_epoch_ms_as_datetime_utc" + )] + pub order_trade_time: DateTime, + /// Trade id. + #[serde(rename = "t")] + pub trade_id: u64, + /// Bids notional. + #[serde(rename = "b", deserialize_with = "barter_integration::de::de_str")] + pub bids_notional: f64, + /// Ask notional. + #[serde(rename = "a", deserialize_with = "barter_integration::de::de_str")] + pub ask_notional: f64, + /// Is this trade the maker side? + #[serde(rename = "m")] + pub is_maker_side: bool, + /// Is this reduce only. + #[serde(rename = "R")] + pub is_reduce_only: bool, + #[serde(rename = "cp")] + pub is_close_all: Option, + #[serde( + default, + rename = "AP", + deserialize_with = "barter_integration::de::de_option_f64" + )] + pub activation_price: Option, + #[serde( + default, + rename = "cr", + deserialize_with = "barter_integration::de::de_option_f64" + )] + pub callback_rate: Option, + #[serde(rename = "pP")] + pub is_price_protection: Option, + #[serde( + rename = "rp", + default, + deserialize_with = "barter_integration::de::de_option_f64" + )] + pub realized_profit: Option, + #[serde(rename = "V")] + pub stp_mode: String, + #[serde(rename = "pm")] + pub price_match_mode: String, + #[serde(rename = "gtd")] + pub tif_gtd_order_auto_cancel_time: i64, + #[serde(rename = "ps")] + pub position_side: PerpSide, +} + +impl From for AccountEventKind { + fn from(update: BinanceFutOrderUpdate) -> Self { + AccountEventKind::Trade({ + let order = update.order; + Trade { + time: update.event_time, + id: TradeId(order.trade_id.to_string()), + order_id: OrderId(order.order_id.to_string()), + instrument: Instrument { + base: order.symbol.clone(), + quote: order.symbol, + kind: InstrumentKind::Perpetual, + }, + side: order.side, + price: order.last_filled_price, + quantity: order.order_last_filled_quantity, + fees: SymbolFees { + symbol: order.commission_asset.unwrap_or_default(), + fees: order.commission.unwrap_or_default(), + } + .into(), + } + }) + } +} + +// Test order update +#[cfg(test)] +mod test { + use super::*; + use barter_integration::model::instrument::symbol::Symbol; + use chrono::Utc; + + #[test] + fn test_order_update() { + let order_update = BinanceFutOrderUpdate { + event_type: BinanceFuturesEventType::OrderTradeUpdate, + event_time: Utc::now(), + transaction_time: Utc::now(), + order: BinanceFutOrder { + symbol: Symbol::new("BTCUSDT"), + client_order_id: "TEST".to_string(), + side: Side::Buy, + order_type: BinFutOrderType::Limit, + time_in_force: "GTC".to_string(), + original_quantity: 0.001, + original_price: 0.0, + average_price: 0.0, + stop_price: 7103.04, + execution_type: BinFutExecutionType::New, + order_status: BinanceFutOrderStatus::New, + order_id: 8886774, + order_last_filled_quantity: 0.0, + order_filled_accumulated_quantity: 0.0, + last_filled_price: 0.0, + commission_asset: Some(Symbol::new("USDT")), + commission: Some(0.0), + order_trade_time: Utc::now(), + trade_id: 0, + bids_notional: 0.0, + ask_notional: 9.91, + is_maker_side: false, + is_reduce_only: false, + is_close_all: None, + activation_price: None, + callback_rate: None, + is_price_protection: None, + realized_profit: None, + stp_mode: "EXPIRE_TAKER".to_string(), + price_match_mode: "OPPONENT".to_string(), + tif_gtd_order_auto_cancel_time: 0, + position_side: PerpSide::Long, + }, + }; + + let account_event: AccountEventKind = order_update.into(); + match account_event { + AccountEventKind::Trade(trade) => { + assert_eq!(trade.id.0, "0"); + assert_eq!(trade.order_id.0, "8886774"); + assert_eq!(trade.instrument.base, "BTCUSDT".into()); + assert_eq!(trade.instrument.quote, "BTCUSDT".into()); + assert_eq!(trade.side, Side::Buy); + assert_eq!(trade.price, 0.0); + assert_eq!(trade.quantity, 0.0); + assert_eq!(trade.fees.clone().exchange.unwrap().symbol, "USDT".into()); + assert_eq!(trade.fees.exchange.unwrap().fees, 0.0); + } + _ => panic!("unexpected account event"), + } + } +} diff --git a/barter-execution-rs/src/execution/binance/websocket.rs b/barter-execution-rs/src/execution/binance/websocket.rs index 1ce42df..6a8a7ed 100644 --- a/barter-execution-rs/src/execution/binance/websocket.rs +++ b/barter-execution-rs/src/execution/binance/websocket.rs @@ -1,15 +1,24 @@ -use futures::StreamExt; +use crate::{ + execution::binance::types::{order_update::BinanceFutOrderStatus, BinanceFuturesEventType}, + model::AccountEventKind, +}; + +use super::types::{ + account_update::BinanceAccountUpdate, order_update::BinanceFutOrderUpdate, + BinanceFutAccountEvent, +}; +use futures::stream::{BoxStream, StreamExt}; use reqwest::Client; -use tokio::{net::TcpStream, spawn}; -use tokio_tungstenite::{connect_async, MaybeTlsStream, WebSocketStream}; +use tokio_tungstenite::{connect_async, tungstenite::Message}; use tracing::{error, info}; use url::Url; -pub async fn init_listener(api_key: &str, api_url: &str) { +pub async fn init_listener(api_key: &str, api_url: &str) -> BoxStream<'static, AccountEventKind> { let listen_key = get_listen_key(api_key, api_url) .await .expect("Failed to get listen key"); let stream_url = format!("wss://fstream.binancefuture.com/ws/{}", listen_key); + let stream_url = Url::parse(&stream_url).expect("Failed to parse stream url"); // TODO: // PUT /fapi/v1/listenKey @@ -23,11 +32,12 @@ pub async fn init_listener(api_key: &str, api_url: &str) { // loop { // interval.tick().await; - spawn(async move { - if let Err(e) = listen_to_user_data_stream(&stream_url).await { - error!("Error listening to user data stream: {}", e); - } - }); + // spawn(async move { + // if let Err(e) = listen_to_user_data_stream(&stream_url).await { + // error!("Error listening to user data stream: {}", e); + // } + // }); + listen_to_user_data_stream(stream_url).await } async fn get_listen_key( @@ -46,34 +56,60 @@ async fn get_listen_key( Ok(res["listenKey"].as_str().unwrap().to_string()) } -// async fn return_ws_stream(stream_url: &str) -> WebSocketStream> { -// let ws_stream = connect_async(Url::parse(stream_url).expect("Failed to parse url")) -// .await -// .map(|(websocket, _)| websocket) -// .expect("Failed to conenct"); +fn process_message(msg: Message) -> Vec { + match msg { + Message::Text(text) => { + let event: BinanceFutAccountEvent = serde_json::from_str(&text).unwrap(); + let mut events = Vec::new(); -// info!("Connected to Binance user data stream"); -// println!("stream_url: {}", stream_url); -// ws_stream -// } + match event.event_type { + BinanceFuturesEventType::AccountUpdate => { + match serde_json::from_str::(&text) { + Ok(account_update) => { + let (bal_event, pos_event) = + <(AccountEventKind, AccountEventKind)>::from(account_update); + events.push(bal_event); + events.push(pos_event); + } + Err(e) => { + error!("Failed to parse account update: {}", e); + } + } + } + BinanceFuturesEventType::OrderTradeUpdate => { + match serde_json::from_str::(&text) { + Ok(trade_update) => match trade_update.order.order_status { + BinanceFutOrderStatus::Filled => { + events.push(trade_update.into()); + } + BinanceFutOrderStatus::PartiallyFilled => { + events.push(trade_update.into()); + } + _ => {} + }, + Err(e) => { + error!("Failed to parse order trade update: {}", e); + } + } + } + } + events + } + _ => Vec::new(), // Non-text messages generate no events + } +} -async fn listen_to_user_data_stream(stream_url: &str) -> Result<(), Box> { - let ws_stream = connect_async(Url::parse(stream_url)?) +async fn listen_to_user_data_stream(ws_url: Url) -> BoxStream<'static, AccountEventKind> { + let ws_stream = connect_async(ws_url) .await .map(|(websocket, _)| websocket) - .expect("Failed to conenct"); + .expect("Failed to conenct to Binance websocket"); info!("Connected to Binance user data stream"); - println!("stream_url: {}", stream_url); - let (write, read) = ws_stream.split(); - - // TODO track account state here - // update account state on diff and send to account state manager - read.for_each(|message| async { - if let Ok(msg) = message { - info!("Received message from Binance: {}", msg.to_text().unwrap()); - } - }) - .await; - Ok(()) + ws_stream + .flat_map(|message| { + let events = process_message(message.unwrap()); // Safely handle unwrap in real code + futures::stream::iter(events) + }) + .boxed() } diff --git a/barter-execution-rs/src/lib.rs b/barter-execution-rs/src/lib.rs index 8c8fed6..22daac6 100644 --- a/barter-execution-rs/src/lib.rs +++ b/barter-execution-rs/src/lib.rs @@ -31,6 +31,8 @@ use crate::{ use async_trait::async_trait; use barter_integration::model::Exchange; use execution::binance::{BinanceConfig, BinanceExecution}; +use futures::stream::BoxStream; +use model::AccountEventKind; use serde::{Deserialize, Serialize}; use simulated::execution::{SimulatedExecution, SimulationConfig}; use std::fmt::{Display, Formatter}; @@ -70,6 +72,10 @@ pub trait ExecutionClient { /// from the exchange, as well as returning the HTTP client `Self`. async fn init(config: Self::Config) -> Self; + async fn init_stream(&self) -> Option> { + None + } + /// Return a [`FillEvent`] from executing the input [`OrderEvent`]. // fn generate_fill(&self, order: &OrderEvent) -> Result; @@ -145,17 +151,23 @@ impl ExecutionId { pub mod test_util { use crate::{ model::{ - trade::{SymbolFees, Trade, TradeId}, + position::Position, + trade::{Fees, SymbolFees, Trade, TradeId}, ClientOrderId, }, simulated::exchange::account::order::Orders, Open, Order, OrderId, }; - use barter_data::subscription::trade::PublicTrade; + use barter_data::{ + event::{DataKind, MarketEvent}, + exchange::ExchangeId, + subscription::trade::PublicTrade, + }; use barter_integration::model::{ instrument::{kind::InstrumentKind, Instrument}, Exchange, Side, }; + use chrono::{TimeZone, Utc}; pub fn client_orders( trade_number: u64, @@ -201,13 +213,70 @@ pub mod test_util { pub fn trade(id: TradeId, side: Side, price: f64, quantity: f64, fees: SymbolFees) -> Trade { Trade { + time: Utc.timestamp_opt(0, 0).unwrap(), id, order_id: OrderId::from("order_id"), instrument: Instrument::from(("base", "quote", InstrumentKind::Perpetual)), side, price, quantity, - fees, + fees: fees.into(), + } + } + + pub fn test_trade(sid: Side, price: f64, quantity: f64) -> Trade { + Trade { + time: Utc.timestamp_opt(0, 0).unwrap(), + id: TradeId::from("trade_id"), + order_id: OrderId::from("order_id"), + instrument: Instrument::from(("base", "quote", InstrumentKind::Perpetual)), + side: sid, + price, + quantity, + fees: Fees { + exchange: Some(SymbolFees::new("usdt", 1.0)), + slippage: Some(SymbolFees::new("usdt", 1.0)), + network: Some(SymbolFees::new("usdt", 1.0)), + }, + } + } + + /// Build a [`Position`]. + pub fn position() -> Position { + Position { + position_id: "engine_id_trader_{}_{}_position".to_owned(), + instrument: Instrument::from(("eth", "usdt", InstrumentKind::Spot)), + meta: Default::default(), + side: Side::Buy, + quantity: 1.0, + enter_fees: Default::default(), + enter_fees_total: 0.0, + enter_avg_price_gross: 100.0, + enter_value_gross: 100.0, + exit_fees: Default::default(), + exit_fees_total: 0.0, + exit_avg_price_gross: 0.0, + exit_value_gross: 0.0, + current_symbol_price: 100.0, + current_value_gross: 100.0, + unrealised_profit_loss: 0.0, + realised_profit_loss: 0.0, + } + } + + /// Build a [`MarketEvent`] of [`DataKind::PublicTrade`](DataKind) with the provided [`Side`]. + pub fn market_event_trade(side: Side) -> MarketEvent { + MarketEvent { + exchange_time: Utc::now(), + received_time: Utc::now(), + exchange: Exchange::from(ExchangeId::BinanceSpot), + instrument: Instrument::from(("btc", "usdt", InstrumentKind::Spot)), + kind: DataKind::Trade(PublicTrade { + id: "trade_id".to_string(), + price: 1000.0, + amount: 1.0, + side, + }), } } } diff --git a/barter-execution-rs/src/model/execution_event.rs b/barter-execution-rs/src/model/execution_event.rs index 4717b44..7d38c9e 100644 --- a/barter-execution-rs/src/model/execution_event.rs +++ b/barter-execution-rs/src/model/execution_event.rs @@ -14,17 +14,3 @@ pub enum ExecutionRequest { CancelOrders(Vec<(Exchange, Vec>)>), CancelOrdersAll(Vec), } - -// Todo: If we pass tuple (Exchange, Order), the OrderRequest should maybe be diff that doesn't include Exchange -#[derive(Debug)] -pub enum ExchangeRequest { - // Fetch Account State - FetchBalances, - FetchOrdersOpen, - - // Open Orders - OpenOrders(Vec>), - - CancelOrders(Vec>), - CancelOrdersAll, -} diff --git a/barter-execution-rs/src/model/mod.rs b/barter-execution-rs/src/model/mod.rs index 3d8159f..3d9fc71 100644 --- a/barter-execution-rs/src/model/mod.rs +++ b/barter-execution-rs/src/model/mod.rs @@ -1,6 +1,7 @@ use self::{ balance::SymbolBalance, order::{Cancelled, Open, Order}, + position::Position, trade::Trade, }; use barter_integration::model::Exchange; @@ -13,6 +14,7 @@ pub mod balance; pub mod execution_event; pub mod order; pub mod order_event; +pub mod position; pub mod trade; /// Normalised Barter [`AccountEvent`] containing metadata about the included @@ -34,15 +36,27 @@ pub enum AccountEventKind { // WebSocket Only Balance(SymbolBalance), + Trade(Trade), // HTTP & WebSocket Balances(Vec), + Positions(Vec), // TODO // ExecutionError(ExecutionError), // ConnectionStatus, } +impl From<(Exchange, AccountEventKind)> for AccountEvent { + fn from((exchange, kind): (Exchange, AccountEventKind)) -> Self { + Self { + received_time: Utc::now(), + exchange, + kind, + } + } +} + #[derive(Copy, Clone, Eq, PartialEq, Ord, PartialOrd, Hash, Debug, Deserialize, Serialize)] pub struct ClientOrderId(pub Uuid); diff --git a/barter-execution-rs/src/model/position.rs b/barter-execution-rs/src/model/position.rs new file mode 100644 index 0000000..993d15e --- /dev/null +++ b/barter-execution-rs/src/model/position.rs @@ -0,0 +1,1414 @@ +use barter_data::event::{DataKind, MarketEvent}; +use barter_integration::model::{instrument::Instrument, Exchange, Side}; +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::convert::TryFrom; + +use crate::error::PositionError; + +use super::{ + balance::Balance, + trade::{FeeAmount, Fees, Trade}, +}; + +/// Enters a new [`Position`]. +pub trait PositionEnterer { + /// Returns a new [`Position`], given an input [`Trade`] & an associated engine_id. + fn enter(exchange: &Exchange, fill: &Trade) -> Result; +} + +/// Updates an open [`Position`]. +pub trait PositionUpdater { + /// Updates an open [`Position`] using the latest input [`MarketEvent`], returning a + /// [`PositionUpdate`] that communicates the open [`Position`]'s change in state. + fn update(&mut self, market: &MarketEvent) -> Option; +} + +/// Exits an open [`Position`]. +pub trait PositionExiter { + /// Exits an open [`Position`], given the input Portfolio equity & the [`Trade`] returned + /// from an Execution handler. + fn exit(&mut self, balance: Balance, fill: &Trade) -> Result; +} + +/// Communicates a String represents a unique [`Position`] identifier. +pub type PositionId = String; + +/// Returns a unique identifier for a [`Position`] given an engine_id, [`Exchange`] & [`Instrument`]. +pub fn determine_position_id(exchange: &Exchange, instrument: &Instrument) -> PositionId { + format!("{}_{}_position", exchange, instrument) +} + +/// Data encapsulating the state of an ongoing or closed [`Position`]. +#[derive(Clone, PartialEq, PartialOrd, Debug, Deserialize, Serialize)] +pub struct Position { + /// Unique identifier for a [`Position`] generated from an engine_id, [`Exchange`] & [`Instrument`]. + pub position_id: PositionId, + + /// Metadata detailing trace UUIDs, timestamps & equity associated with entering, updating & exiting. + pub meta: PositionMeta, + + /// [`Instrument`] associated with this [`Position`]. + pub instrument: Instrument, + + /// Buy or Sell. + /// + /// Notes: + /// - Side::Buy considered synonymous with Long. + /// - Side::Sell considered synonymous with Short. + pub side: Side, + + /// +ve or -ve quantity of symbol contracts opened. + pub quantity: f64, + + /// All fees types incurred from entering a [`Position`], and their associated [`FeeAmount`]. + pub enter_fees: Fees, + + /// Total of enter_fees incurred. Sum of every [`FeeAmount`] in [`Fees`] when entering a [`Position`]. + pub enter_fees_total: FeeAmount, + + /// Enter average price excluding the entry_fees_total. + pub enter_avg_price_gross: f64, + + /// abs(Quantity) * enter_avg_price_gross. + pub enter_value_gross: f64, + + /// All fees types incurred from exiting a [`Position`], and their associated [`FeeAmount`]. + pub exit_fees: Fees, + + /// Total of exit_fees incurred. Sum of every [`FeeAmount`] in [`Fees`] when entering a [`Position`]. + pub exit_fees_total: FeeAmount, + + /// Exit average price excluding the exit_fees_total. + pub exit_avg_price_gross: f64, + + /// abs(Quantity) * exit_avg_price_gross. + pub exit_value_gross: f64, + + /// Symbol current close price. + pub current_symbol_price: f64, + + /// abs(Quantity) * current_symbol_price. + pub current_value_gross: f64, + + /// Unrealised P&L whilst the [`Position`] is open. + pub unrealised_profit_loss: f64, + + /// Realised P&L after the [`Position`] has closed. + pub realised_profit_loss: f64, +} + +impl PositionEnterer for Position { + fn enter(exchange: &Exchange, trade: &Trade) -> Result { + // Initialise Position Metadata + let metadata = PositionMeta { + update_time: trade.time, + exit_balance: None, + }; + + // Enter fees + let enter_fees_total = trade.fees.calculate_total_fees(); + + // Enter price + let enter_avg_price_gross = Position::calculate_avg_price_gross(trade); + + // Unreal profit & loss + let unrealised_profit_loss = -enter_fees_total * 2.0; + + if trade.quantity.is_sign_negative() { + return Err(PositionError::NegativeTradeQuantity); + } + + let quantity = Position::position_quantity(trade); + + Ok(Position { + position_id: determine_position_id(&exchange, &trade.instrument), + instrument: trade.instrument.clone(), + meta: metadata, + side: trade.side, + quantity, + enter_fees: trade.fees.clone(), + enter_fees_total, + enter_avg_price_gross, + enter_value_gross: trade.price, + exit_fees: Fees::default(), + exit_fees_total: 0.0, + exit_avg_price_gross: 0.0, + exit_value_gross: 0.0, + current_symbol_price: enter_avg_price_gross, + current_value_gross: trade.price, + unrealised_profit_loss, + realised_profit_loss: 0.0, + }) + } +} + +impl PositionUpdater for Position { + fn update(&mut self, market: &MarketEvent) -> Option { + // Determine close from MarketEvent + let close = match &market.kind { + DataKind::Trade(trade) => trade.price, + DataKind::Candle(candle) => candle.close, + DataKind::OrderBookL1(book_l1) => book_l1.volume_weighed_mid_price(), + DataKind::OrderBook(book) => book.volume_weighed_mid_price()?, + DataKind::Liquidation(_) => return None, + DataKind::IntentOrder(_) => return None, + }; + + self.meta.update_time = market.exchange_time; + + self.current_symbol_price = close; + + // Market value gross + self.current_value_gross = close * self.quantity.abs(); + + // Unreal profit & loss + self.unrealised_profit_loss = self.calculate_unrealised_profit_loss(); + + // Return a PositionUpdate event that communicates the change in state + Some(PositionUpdate::from(self)) + } +} + +impl PositionExiter for Position { + fn exit(&mut self, mut balance: Balance, trade: &Trade) -> Result { + let trade_position_quantity = Position::position_quantity(trade); + let final_quantity = self.quantity + trade_position_quantity; + + if final_quantity != 0 as f64 { + return Err(PositionError::CannotExitPositionWithEntryFill); + } + + // Exit fees + self.exit_fees = trade.fees.clone(); + self.exit_fees_total = trade.fees.calculate_total_fees(); + + // Exit value & price + self.exit_value_gross = trade.price; + self.exit_avg_price_gross = Position::calculate_avg_price_gross(trade); + + // Result profit & loss + self.realised_profit_loss = self.calculate_realised_profit_loss(); + self.unrealised_profit_loss = self.realised_profit_loss; + + // Metadata + balance.total += self.realised_profit_loss; + self.meta.update_time = trade.time; + self.meta.exit_balance = Some(balance); + + PositionExit::try_from(self) + } +} + +impl Position { + /// Returns a [`PositionBuilder`] instance. + pub fn builder() -> PositionBuilder { + PositionBuilder::new() + } + + /// Determine the [`Position`] entry [`Side`] by analysing the input [`FillEvent`]. + pub fn position_quantity(trade: &Trade) -> f64 { + match trade.side { + Side::Buy => trade.quantity, + Side::Sell => -trade.quantity, + } + } + + /// Calculates the [`Position::enter_avg_price_gross`] or [`Position::exit_avg_price_gross`] of + /// a [`Trade`]. + pub fn calculate_avg_price_gross(fill: &Trade) -> f64 { + (fill.price / fill.quantity).abs() + } + + /// Calculate the approximate [`Position::unrealised_profit_loss`] of a [`Position`]. + pub fn calculate_unrealised_profit_loss(&self) -> f64 { + let approx_total_fees = self.enter_fees_total * 2.0; + + match self.side { + Side::Buy => self.current_value_gross - self.enter_value_gross - approx_total_fees, + Side::Sell => self.enter_value_gross - self.current_value_gross - approx_total_fees, + } + } + + /// Calculate the exact [`Position::realised_profit_loss`] of a [`Position`]. + pub fn calculate_realised_profit_loss(&self) -> f64 { + let total_fees = self.enter_fees_total + self.exit_fees_total; + + match self.side { + Side::Buy => self.exit_value_gross - self.enter_value_gross - total_fees, + Side::Sell => self.enter_value_gross - self.exit_value_gross - total_fees, + } + } + + /// Calculate the PnL return of a closed [`Position`] - assumed [`Position::realised_profit_loss`] is + /// appropriately calculated. + pub fn calculate_profit_loss_return(&self) -> f64 { + self.realised_profit_loss / self.enter_value_gross + } +} + +/// Builder to construct [`Position`] instances. +#[derive(Debug, Default)] +pub struct PositionBuilder { + pub position_id: Option, + pub exchange: Option, + pub instrument: Option, + pub meta: Option, + pub side: Option, + pub quantity: Option, + pub enter_fees: Option, + pub enter_fees_total: Option, + pub enter_avg_price_gross: Option, + pub enter_value_gross: Option, + pub exit_fees: Option, + pub exit_fees_total: Option, + pub exit_avg_price_gross: Option, + pub exit_value_gross: Option, + pub current_symbol_price: Option, + pub current_value_gross: Option, + pub unrealised_profit_loss: Option, + pub realised_profit_loss: Option, +} + +impl PositionBuilder { + pub fn new() -> Self { + Self::default() + } + + pub fn position_id(self, value: PositionId) -> Self { + Self { + position_id: Some(value), + ..self + } + } + + pub fn exchange(self, value: Exchange) -> Self { + Self { + exchange: Some(value), + ..self + } + } + + pub fn instrument(self, value: Instrument) -> Self { + Self { + instrument: Some(value), + ..self + } + } + + pub fn meta(self, value: PositionMeta) -> Self { + Self { + meta: Some(value), + ..self + } + } + + pub fn side(self, value: Side) -> Self { + Self { + side: Some(value), + ..self + } + } + + pub fn quantity(self, value: f64) -> Self { + Self { + quantity: Some(value), + ..self + } + } + + pub fn enter_fees(self, value: Fees) -> Self { + Self { + enter_fees: Some(value), + ..self + } + } + + pub fn enter_fees_total(self, value: FeeAmount) -> Self { + Self { + enter_fees_total: Some(value), + ..self + } + } + + pub fn enter_avg_price_gross(self, value: f64) -> Self { + Self { + enter_avg_price_gross: Some(value), + ..self + } + } + + pub fn enter_value_gross(self, value: f64) -> Self { + Self { + enter_value_gross: Some(value), + ..self + } + } + + pub fn exit_fees(self, value: Fees) -> Self { + Self { + exit_fees: Some(value), + ..self + } + } + + pub fn exit_fees_total(self, value: FeeAmount) -> Self { + Self { + exit_fees_total: Some(value), + ..self + } + } + + pub fn exit_avg_price_gross(self, value: f64) -> Self { + Self { + exit_avg_price_gross: Some(value), + ..self + } + } + + pub fn exit_value_gross(self, value: f64) -> Self { + Self { + exit_value_gross: Some(value), + ..self + } + } + + pub fn current_symbol_price(self, value: f64) -> Self { + Self { + current_symbol_price: Some(value), + ..self + } + } + + pub fn current_value_gross(self, value: f64) -> Self { + Self { + current_value_gross: Some(value), + ..self + } + } + + pub fn unrealised_profit_loss(self, value: f64) -> Self { + Self { + unrealised_profit_loss: Some(value), + ..self + } + } + + pub fn realised_profit_loss(self, value: f64) -> Self { + Self { + realised_profit_loss: Some(value), + ..self + } + } + + pub fn build(self) -> Result { + Ok(Position { + position_id: self + .position_id + .ok_or(PositionError::BuilderIncomplete("position_id"))?, + instrument: self + .instrument + .ok_or(PositionError::BuilderIncomplete("instrument"))?, + meta: self.meta.ok_or(PositionError::BuilderIncomplete("meta"))?, + side: self.side.ok_or(PositionError::BuilderIncomplete("side"))?, + quantity: self + .quantity + .ok_or(PositionError::BuilderIncomplete("quantity"))?, + enter_fees: self + .enter_fees + .ok_or(PositionError::BuilderIncomplete("enter_fees"))?, + enter_fees_total: self + .enter_fees_total + .ok_or(PositionError::BuilderIncomplete("enter_fees_total"))?, + enter_avg_price_gross: self + .enter_avg_price_gross + .ok_or(PositionError::BuilderIncomplete("enter_avg_price_gross"))?, + enter_value_gross: self + .enter_value_gross + .ok_or(PositionError::BuilderIncomplete("enter_value_gross"))?, + exit_fees: self + .exit_fees + .ok_or(PositionError::BuilderIncomplete("exit_fees"))?, + exit_fees_total: self + .exit_fees_total + .ok_or(PositionError::BuilderIncomplete("exit_fees_total"))?, + exit_avg_price_gross: self + .exit_avg_price_gross + .ok_or(PositionError::BuilderIncomplete("exit_avg_price_gross"))?, + exit_value_gross: self + .exit_value_gross + .ok_or(PositionError::BuilderIncomplete("exit_value_gross"))?, + current_symbol_price: self + .current_symbol_price + .ok_or(PositionError::BuilderIncomplete("current_symbol_price"))?, + current_value_gross: self + .current_value_gross + .ok_or(PositionError::BuilderIncomplete("current_value_gross"))?, + unrealised_profit_loss: self + .unrealised_profit_loss + .ok_or(PositionError::BuilderIncomplete("unrealised_profit_loss"))?, + realised_profit_loss: self + .realised_profit_loss + .ok_or(PositionError::BuilderIncomplete("realised_profit_loss"))?, + }) + } +} + +/// Metadata detailing the trace UUIDs & timestamps associated with entering, updating & exiting +/// a [`Position`]. +#[derive(Copy, Clone, PartialEq, PartialOrd, Debug, Deserialize, Serialize)] +pub struct PositionMeta { + /// [`Trade`] timestamp that triggered the entering of this [`Position`]. + // pub enter_time: DateTime, + + /// Timestamp of the last event to trigger a [`Position`] state change (enter, update, exit). + pub update_time: DateTime, + + /// Portfolio [`Balance`] calculated at the point of exiting a [`Position`]. + pub exit_balance: Option, +} + +impl Default for PositionMeta { + fn default() -> Self { + Self { + // enter_time: Utc::now(), + update_time: Utc::now(), + exit_balance: None, + } + } +} + +/// [`Position`] update event. Occurs as a result of receiving new [`MarketEvent`] data. +#[derive(Clone, PartialEq, PartialOrd, Debug, Deserialize, Serialize)] +pub struct PositionUpdate { + /// Unique identifier for a [`Position`], generated from an exchange, symbol, and enter_time. + pub position_id: String, + /// Event timestamp of the last event to trigger a [`Position`] update. + pub update_time: DateTime, + /// Symbol current close price. + pub current_symbol_price: f64, + /// abs(Quantity) * current_symbol_price. + pub current_value_gross: f64, + /// Unrealised P&L whilst the [`Position`] is open. + pub unrealised_profit_loss: f64, +} + +impl From<&mut Position> for PositionUpdate { + fn from(updated_position: &mut Position) -> Self { + Self { + position_id: updated_position.position_id.clone(), + update_time: updated_position.meta.update_time, + current_symbol_price: updated_position.current_symbol_price, + current_value_gross: updated_position.current_value_gross, + unrealised_profit_loss: updated_position.unrealised_profit_loss, + } + } +} + +/// [`Position`] exit event. Occurs as a result of a [`Trade`] that exits a [`Position`]. +#[derive(Clone, PartialEq, PartialOrd, Debug, Deserialize, Serialize)] +pub struct PositionExit { + /// Unique identifier for a [`Position`], generated from an exchange, symbol, and enter_time. + pub position_id: String, + + /// [`Trade`] timestamp that triggered the exiting of this [`Position`]. + pub exit_time: DateTime, + + /// Portfolio [`Balance`] calculated at the point of exiting a [`Position`]. + pub exit_balance: Balance, + + /// All fees types incurred from exiting a [`Position`], and their associated [`FeeAmount`]. + pub exit_fees: Fees, + + /// Total of exit_fees incurred. Sum of every [`FeeAmount`] in [`Fees`] when entering a [`Position`]. + pub exit_fees_total: FeeAmount, + + /// Exit average price excluding the exit_fees_total. + pub exit_avg_price_gross: f64, + + /// abs(Quantity) * exit_avg_price_gross. + pub exit_value_gross: f64, + + /// Realised P&L after the [`Position`] has closed. + pub realised_profit_loss: f64, +} + +impl TryFrom<&mut Position> for PositionExit { + type Error = PositionError; + + fn try_from(exited_position: &mut Position) -> Result { + Ok(Self { + position_id: exited_position.position_id.clone(), + exit_time: exited_position.meta.update_time, + exit_balance: exited_position + .meta + .exit_balance + .ok_or(PositionError::PositionExit)?, + exit_fees: exited_position.exit_fees.clone(), + exit_fees_total: exited_position.exit_fees_total, + exit_avg_price_gross: exited_position.exit_avg_price_gross, + exit_value_gross: exited_position.exit_value_gross, + realised_profit_loss: exited_position.realised_profit_loss, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + model::{ + order::OrderId, + trade::{SymbolFees, TradeId}, + }, + test_util::{market_event_trade, position, test_trade}, + }; + use barter_integration::model::{instrument::kind::InstrumentKind, Side}; + + #[test] + fn enter_new_position_with_long_decision_provided() { + let trade = Trade { + time: Utc::now(), + id: TradeId::from("trade_id"), + order_id: OrderId::from("order_id"), + instrument: Instrument::from(("eth", "usdc", InstrumentKind::Perpetual)), + side: Side::Buy, + price: 1.0, + quantity: 100.0, + fees: Fees { + exchange: Some(SymbolFees::new("usdt", 1.0)), + slippage: Some(SymbolFees::new("usdt", 1.0)), + network: Some(SymbolFees::new("usdt", 1.0)), + }, + }; + + let exchange = Exchange::from("binance"); + let position = Position::enter(&exchange, &trade).unwrap(); + + assert_eq!(position.side, Side::Buy); + assert_eq!(position.quantity, trade.quantity); + assert_eq!(position.enter_fees_total, 3.0); + assert_eq!(position.enter_fees.exchange, trade.fees.exchange); + assert_eq!(position.enter_fees.slippage, trade.fees.slippage); + assert_eq!(position.enter_fees.network, trade.fees.network); + assert_eq!( + position.enter_avg_price_gross, + (trade.price / trade.quantity.abs()) + ); + assert_eq!(position.enter_value_gross, trade.price); + assert_eq!(position.exit_fees_total, 0.0); + assert_eq!(position.exit_avg_price_gross, 0.0); + assert_eq!(position.exit_value_gross, 0.0); + assert_eq!( + position.current_symbol_price, + (trade.price / trade.quantity.abs()) + ); + assert_eq!(position.current_value_gross, trade.price); + assert_eq!(position.unrealised_profit_loss, -6.0); // -2 * enter_fees_total + assert_eq!(position.realised_profit_loss, 0.0); + } + + #[test] + fn enter_new_position_with_short_decision_provided() { + let trade = Trade { + time: Utc::now(), + id: TradeId::from("trade_id"), + order_id: OrderId::from("order_id"), + instrument: Instrument::from(("eth", "usdc", InstrumentKind::Perpetual)), + side: Side::Sell, + price: 1.0, + quantity: 100.0, + fees: Fees { + exchange: Some(SymbolFees::new("usdt", 1.0)), + slippage: Some(SymbolFees::new("usdt", 1.0)), + network: Some(SymbolFees::new("usdt", 1.0)), + }, + }; + let exchange = Exchange::from("binance"); + let position = Position::enter(&exchange, &trade).unwrap(); + + assert_eq!(position.side, Side::Sell); + assert_eq!(position.quantity, -trade.quantity); + assert_eq!(position.enter_fees_total, 3.0); + assert_eq!(position.enter_fees.exchange, trade.fees.exchange); + assert_eq!(position.enter_fees.slippage, trade.fees.slippage); + assert_eq!(position.enter_fees.network, trade.fees.network); + assert_eq!( + position.enter_avg_price_gross, + (trade.price / trade.quantity.abs()) + ); + assert_eq!(position.enter_value_gross, trade.price); + assert_eq!(position.exit_fees_total, 0.0); + assert_eq!(position.exit_avg_price_gross, 0.0); + assert_eq!(position.exit_value_gross, 0.0); + assert_eq!( + position.current_symbol_price, + (trade.price / trade.quantity.abs()) + ); + assert_eq!(position.current_value_gross, trade.price); + assert_eq!(position.unrealised_profit_loss, -6.0); // -2 * enter_fees_total + assert_eq!(position.realised_profit_loss, 0.0); + } + + #[test] + fn enter_new_position_and_return_err_with_close_long_decision_provided() -> Result<(), String> { + // this is close long case trade.decision = Side::Buy; + + let trade = Trade { + time: Utc::now(), + id: TradeId::from("trade_id"), + order_id: OrderId::from("order_id"), + instrument: Instrument::from(("eth", "usdc", InstrumentKind::Perpetual)), + side: Side::Buy, + price: 100.0, + quantity: -1.0, + fees: Fees { + exchange: Some(SymbolFees::new("usdt", 1.0)), + slippage: Some(SymbolFees::new("usdt", 1.0)), + network: Some(SymbolFees::new("usdt", 1.0)), + }, + }; + + let exchange = Exchange::from("binance"); + + if let Err(_) = Position::enter(&exchange, &trade) { + Ok(()) + } else { + Err(String::from( + "Position::enter did not return an Err and it should have.", + )) + } + } + + #[test] + fn enter_new_position_and_return_err_with_negative_quantity_buy_provided() -> Result<(), String> + { + let trade = test_trade( + Side::Buy, + 100.0, // price + -1.0, // quantity + ); + let exchange = Exchange::from("binance"); + + if let Err(_) = Position::enter(&exchange, &trade) { + Ok(()) + } else { + Err(String::from( + "Position::enter did not return an Err and it should have.", + )) + } + } + + #[test] + fn update_long_position_so_unreal_pnl_increases() { + // Initial Position + let mut position = position(); + position.side = Side::Buy; + position.quantity = 1.0; + position.enter_fees_total = 3.0; + position.enter_fees = Fees::new_all("usdt", 1.0); + position.enter_avg_price_gross = 100.0; + position.enter_value_gross = 100.0; + position.current_symbol_price = 100.0; + position.current_value_gross = 100.0; + position.unrealised_profit_loss = position.enter_fees_total * -2.0; + + // Input MarketEvent + let mut input_market = market_event_trade(Side::Buy); + match input_market.kind { + // +100.0 higher than current_symbol_price + DataKind::Candle(ref mut candle) => candle.close = 200.0, + DataKind::Trade(ref mut trade) => trade.price = 200.0, + _ => todo!(), + }; + + // Update Position + position.update(&input_market); + + // Assert update hasn't changed fields that are constant after creation + assert_eq!(position.side, Side::Buy); + assert_eq!(position.quantity, 1.0); + assert_eq!(position.enter_fees_total, 3.0); + assert_eq!(position.enter_avg_price_gross, 100.0); + assert_eq!(position.enter_value_gross, 100.0); + + // Assert updated fields are correct + let close = match &input_market.kind { + DataKind::Trade(trade) => trade.price, + DataKind::Candle(candle) => candle.close, + _ => todo!(), + }; + assert_eq!(position.current_symbol_price, close); + assert_eq!( + position.current_value_gross, + close * position.quantity.abs() + ); + + // current_value_gross - enter_value_gross - approx_total_fees + assert_eq!(position.unrealised_profit_loss, (200.0 - 100.0 - 6.0)); + } + + #[test] + fn update_long_position_so_unreal_pnl_decreases() { + // Initial Position + let mut position = position(); + position.side = Side::Buy; + position.quantity = 1.0; + position.enter_fees_total = 3.0; + position.enter_fees = Fees::new_all("usdt", 1.0); + position.enter_avg_price_gross = 100.0; + position.enter_value_gross = 100.0; + position.current_symbol_price = 100.0; + position.current_value_gross = 100.0; + position.unrealised_profit_loss = position.enter_fees_total * -2.0; + + // Input MarketEvent + let mut input_market = market_event_trade(Side::Sell); + + match input_market.kind { + // -50.0 lower than current_symbol_price + DataKind::Candle(ref mut candle) => candle.close = 50.0, + DataKind::Trade(ref mut trade) => trade.price = 50.0, + _ => todo!(), + }; + + // Update Position + position.update(&input_market); + + // Assert update hasn't changed fields that are constant after creation + assert_eq!(position.side, Side::Buy); + assert_eq!(position.quantity, 1.0); + assert_eq!(position.enter_fees_total, 3.0); + assert_eq!(position.enter_fees.exchange.unwrap().fees, 1.0); + assert_eq!(position.enter_fees.slippage.unwrap().fees, 1.0); + assert_eq!(position.enter_fees.network.unwrap().fees, 1.0); + assert_eq!(position.enter_avg_price_gross, 100.0); + assert_eq!(position.enter_value_gross, 100.0); + + // Assert updated fields are correct + let close = match &input_market.kind { + DataKind::Trade(trade) => trade.price, + DataKind::Candle(candle) => candle.close, + _ => todo!(), + }; + assert_eq!(position.current_symbol_price, close); + assert_eq!( + position.current_value_gross, + close * position.quantity.abs() + ); + + // current_value_gross - enter_value_gross - approx_total_fees + assert_eq!(position.unrealised_profit_loss, (50.0 - 100.0 - 6.0)); + } + + #[test] + fn update_short_position_so_unreal_pnl_increases() { + // Initial Position + let mut position = position(); + position.side = Side::Sell; + position.quantity = -1.0; + position.enter_fees_total = 3.0; + position.enter_fees = Fees::new_all("usdt", 1.0); + position.enter_avg_price_gross = 100.0; + position.enter_value_gross = 100.0; + position.current_symbol_price = 100.0; + position.current_value_gross = 100.0; + position.unrealised_profit_loss = position.enter_fees_total * -2.0; + + // Input MarketEvent + let mut input_market = market_event_trade(Side::Buy); + + match input_market.kind { + // -50.0 lower than current_symbol_price + DataKind::Candle(ref mut candle) => candle.close = 50.0, + DataKind::Trade(ref mut trade) => trade.price = 50.0, + _ => todo!(), + }; + + // Update Position + position.update(&input_market); + + // Assert update hasn't changed fields that are constant after creation + assert_eq!(position.side, Side::Sell); + assert_eq!(position.quantity, -1.0); + assert_eq!(position.enter_fees_total, 3.0); + assert_eq!(position.enter_fees.exchange.unwrap().fees, 1.0); + assert_eq!(position.enter_fees.slippage.unwrap().fees, 1.0); + assert_eq!(position.enter_fees.network.unwrap().fees, 1.0); + assert_eq!(position.enter_avg_price_gross, 100.0); + assert_eq!(position.enter_value_gross, 100.0); + + // Assert updated fields are correct + let close = match &input_market.kind { + DataKind::Trade(trade) => trade.price, + DataKind::Candle(candle) => candle.close, + _ => todo!(), + }; + assert_eq!(position.current_symbol_price, close); + assert_eq!( + position.current_value_gross, + close * position.quantity.abs() + ); + + // enter_value_gross - current_value_gross - approx_total_fees + assert_eq!(position.unrealised_profit_loss, (100.0 - 50.0 - 6.0)); + } + + #[test] + fn update_short_position_so_unreal_pnl_decreases() { + // Initial Position + let mut position = position(); + position.side = Side::Sell; + position.quantity = -1.0; + position.enter_fees_total = 3.0; + position.enter_fees = Fees::new_all("usdt", 1.0); + position.enter_avg_price_gross = 100.0; + position.enter_value_gross = 100.0; + position.current_symbol_price = 100.0; + position.current_value_gross = 100.0; + position.unrealised_profit_loss = position.enter_fees_total * -2.0; + + // Input MarketEvent + let mut input_market = market_event_trade(Side::Sell); + + match input_market.kind { + // +100.0 higher than current_symbol_price + DataKind::Candle(ref mut candle) => candle.close = 200.0, + DataKind::Trade(ref mut trade) => trade.price = 200.0, + _ => todo!(), + }; + + // Update Position + position.update(&input_market); + + // Assert update hasn't changed fields that are constant after creation + assert_eq!(position.side, Side::Sell); + assert_eq!(position.quantity, -1.0); + assert_eq!(position.enter_fees_total, 3.0); + assert_eq!(position.enter_avg_price_gross, 100.0); + assert_eq!(position.enter_value_gross, 100.0); + + // Assert updated fields are correct + let close = match &input_market.kind { + DataKind::Trade(trade) => trade.price, + DataKind::Candle(candle) => candle.close, + _ => todo!(), + }; + assert_eq!(position.current_symbol_price, close); + assert_eq!( + position.current_value_gross, + close * position.quantity.abs() + ); + + // enter_value_gross - current_value_gross - approx_total_fees + assert_eq!(position.unrealised_profit_loss, (100.0 - 200.0 - 6.0)); + } + + #[test] + fn exit_long_position_with_positive_real_pnl() { + // Initial Position + let mut position = position(); + position.side = Side::Buy; + position.quantity = 1.0; + position.enter_fees_total = 3.0; + position.enter_fees = Fees::new_all("usdt", 1.0); + position.enter_avg_price_gross = 100.0; + position.enter_value_gross = 100.0; + position.current_symbol_price = 100.0; + position.current_value_gross = 100.0; + position.unrealised_profit_loss = position.enter_fees_total * -2.0; + + // Input Portfolio Current Balance + let current_balance = Balance { + // time: Utc::now(), + total: 10000.0, + available: 10000.0, + }; + + // Input Trade + let trade = test_trade( + Side::Sell, + 200.0, // price + position.quantity, // quantity + ); + + // Exit Position + position.exit(current_balance, &trade).unwrap(); + + // Assert exit hasn't changed fields that are constant after creation + assert_eq!(position.side, Side::Buy); + assert_eq!(position.quantity, 1.0); + assert_eq!(position.enter_fees_total, 3.0); + assert_eq!(position.enter_avg_price_gross, 100.0); + assert_eq!(position.enter_value_gross, 100.0); + + // Assert fields changed by exit are correct + assert_eq!(position.exit_fees_total, 3.0); + assert_eq!(position.exit_value_gross, trade.price); + assert_eq!( + position.exit_avg_price_gross, + trade.price / trade.quantity.abs() + ); + + // exit_value_gross - enter_value_gross - total_fees + assert_eq!(position.realised_profit_loss, (200.0 - 100.0 - 6.0)); + assert_eq!(position.unrealised_profit_loss, (200.0 - 100.0 - 6.0)); + + // Assert EquityPoint on Exit is correct + assert_eq!( + position.meta.exit_balance.unwrap().total, + current_balance.total + (200.0 - 100.0 - 6.0) + ) + } + + #[test] + fn exit_long_position_with_negative_real_pnl() { + // Initial Position + let mut position = position(); + position.side = Side::Buy; + position.quantity = 1.0; + position.enter_fees_total = 3.0; + position.enter_fees = Fees::new_all("usdt", 1.0); + position.enter_avg_price_gross = 100.0; + position.enter_value_gross = 100.0; + position.current_symbol_price = 100.0; + position.current_value_gross = 100.0; + position.unrealised_profit_loss = position.enter_fees_total * -2.0; + + // Input Portfolio Current Balance + let current_balance = Balance { + // time: Utc::now(), + total: 10000.0, + available: 10000.0, + }; + + // Input Trade + let trade = test_trade( + Side::Sell, + 50.0, // price + position.quantity, // quantity + ); + + // Exit Position + position.exit(current_balance, &trade).unwrap(); + + // Assert exit hasn't changed fields that are constant after creation + assert_eq!(position.side, Side::Buy); + assert_eq!(position.quantity, 1.0); + assert_eq!(position.enter_fees_total, 3.0); + assert_eq!(position.enter_avg_price_gross, 100.0); + assert_eq!(position.enter_value_gross, 100.0); + + // Assert fields changed by exit are correct + assert_eq!(position.exit_fees_total, 3.0); + assert_eq!(position.exit_value_gross, trade.price); + assert_eq!( + position.exit_avg_price_gross, + trade.price / trade.quantity.abs() + ); + + // exit_value_gross - enter_value_gross - total_fees + assert_eq!(position.realised_profit_loss, (50.0 - 100.0 - 6.0)); + assert_eq!(position.unrealised_profit_loss, (50.0 - 100.0 - 6.0)); + + // Assert EquityPoint on Exit is correct + assert_eq!( + position.meta.exit_balance.unwrap().total, + current_balance.total + (50.0 - 100.0 - 6.0) + ) + } + + #[test] + fn exit_short_position_with_positive_real_pnl() { + // Initial Position + let mut position = position(); + position.side = Side::Sell; + position.quantity = -1.0; + position.enter_fees_total = 3.0; + position.enter_fees = Fees::new_all("usdt", 1.0); + position.enter_avg_price_gross = 100.0; + position.enter_value_gross = 100.0; + position.current_symbol_price = 100.0; + position.current_value_gross = 100.0; + position.unrealised_profit_loss = position.enter_fees_total * -2.0; + + // Input Portfolio Current Balance + let current_balance = Balance { + // time: Utc::now(), + total: 10000.0, + available: 10000.0, + }; + + let trade = test_trade( + Side::Buy, + 50.0, // price + -position.quantity, // quantity + ); + + // Exit Position + position.exit(current_balance, &trade).unwrap(); + + // Assert exit hasn't changed fields that are constant after creation + assert_eq!(position.side, Side::Sell); + assert_eq!(position.quantity, -1.0); + assert_eq!(position.enter_fees_total, 3.0); + assert_eq!(position.enter_avg_price_gross, 100.0); + assert_eq!(position.enter_value_gross, 100.0); + + // Assert fields changed by exit are correct + assert_eq!(position.exit_fees_total, 3.0); + assert_eq!(position.exit_value_gross, trade.price); + assert_eq!( + position.exit_avg_price_gross, + trade.price / trade.quantity.abs() + ); + + // enter_value_gross - current_value_gross - approx_total_fees + assert_eq!(position.realised_profit_loss, (100.0 - 50.0 - 6.0)); + assert_eq!(position.unrealised_profit_loss, (100.0 - 50.0 - 6.0)); + + // Assert EquityPoint on Exit is correct + assert_eq!( + position.meta.exit_balance.unwrap().total, + current_balance.total + (100.0 - 50.0 - 6.0) + ) + } + + #[test] + fn exit_short_position_with_negative_real_pnl() { + // Initial Position + let mut position = position(); + position.side = Side::Sell; + position.quantity = -1.0; + position.enter_fees_total = 3.0; + position.enter_fees = Fees::new_all("usdt", 1.0); + position.enter_avg_price_gross = 100.0; + position.enter_value_gross = 100.0; + position.current_symbol_price = 100.0; + position.current_value_gross = 100.0; + position.unrealised_profit_loss = position.enter_fees_total * -2.0; + + // Input Portfolio Current Balance + let current_balance = Balance { + // time: Utc::now(), + total: 10000.0, + available: 10000.0, + }; + + // Input Trade + let trade = test_trade( + Side::Buy, + 200.0, // price + -position.quantity, // quantity + ); + + // Exit Position + position.exit(current_balance, &trade).unwrap(); + + // Assert exit hasn't changed fields that are constant after creation + assert_eq!(position.side, Side::Sell); + assert_eq!(position.quantity, -1.0); + assert_eq!(position.enter_fees_total, 3.0); + assert_eq!(position.enter_avg_price_gross, 100.0); + assert_eq!(position.enter_value_gross, 100.0); + + // Assert fields changed by exit are correct + assert_eq!(position.exit_fees_total, 3.0); + assert_eq!(position.exit_value_gross, trade.price); + assert_eq!( + position.exit_avg_price_gross, + trade.price / trade.quantity.abs() + ); + + // enter_value_gross - current_value_gross - approx_total_fees + assert_eq!(position.realised_profit_loss, (100.0 - 200.0 - 6.0)); + assert_eq!(position.unrealised_profit_loss, (100.0 - 200.0 - 6.0)); + + // Assert EquityPoint on Exit is correct + assert_eq!( + position.meta.exit_balance.unwrap().total, + current_balance.total + (100.0 - 200.0 - 6.0) + ) + } + + #[test] + fn exit_long_position_with_long_entry_fill_and_return_err() -> Result<(), String> { + // Initial Position + let mut position = position(); + position.side = Side::Sell; + position.quantity = -1.0; + position.enter_fees_total = 3.0; + position.enter_fees = Fees::new_all("usdt", 1.0); + position.enter_avg_price_gross = 100.0; + position.enter_value_gross = 100.0; + position.current_symbol_price = 100.0; + position.current_value_gross = 100.0; + position.unrealised_profit_loss = position.enter_fees_total * -2.0; + + // Input Portfolio Current Balance + let current_balance = Balance { + // time: Utc::now(), + total: 10000.0, + available: 10000.0, + }; + + // Input Trade + let trade = test_trade( + Side::Buy, + 200.0, // price + position.quantity.abs() + 1.0, // quantity + ); + + // Exit Position + if let Err(_) = position.exit(current_balance, &trade) { + Ok(()) + } else { + Err(String::from( + "Position::exit did not return an Err and it should have.", + )) + } + } + + #[test] + fn exit_short_position_with_short_entry_fill_and_return_err() -> Result<(), String> { + // Initial Position + let mut position = position(); + position.side = Side::Sell; + position.quantity = 1.0; + position.enter_fees_total = 3.0; + position.enter_fees = Fees::new_all("usdt", 1.0); + position.enter_avg_price_gross = 100.0; + position.enter_value_gross = 100.0; + position.current_symbol_price = 100.0; + position.current_value_gross = 100.0; + position.unrealised_profit_loss = position.enter_fees_total * -2.0; + + // Input Portfolio Current Balance + let current_balance = Balance { + // time: Utc::now(), + total: 10000.0, + available: 10000.0, + }; + + // Input Trade + let trade = test_trade( + Side::Sell, + 200.0, // price + -position.quantity, // quantity + ); + + // Exit Position + if let Err(_) = position.exit(current_balance, &trade) { + Ok(()) + } else { + Err(String::from( + "Position::exit did not return an Err and it should have.", + )) + } + } + + #[test] + fn calculate_avg_price_gross_correctly_with_positive_quantity() { + let trade = test_trade( + Side::Buy, + 1000.0, // price + 1.0, // quantity + ); + + let actual = Position::calculate_avg_price_gross(&trade); + + assert_eq!(actual, 1000.0) + } + + #[test] + fn calculate_avg_price_gross_correctly_with_negative_quantity() { + let trade = test_trade( + Side::Sell, + 1000.0, // price + -1.0, // quantity + ); + + let actual = Position::calculate_avg_price_gross(&trade); + + assert_eq!(actual, 1000.0) + } + + #[test] + fn calculate_unreal_profit_loss() { + let mut long_win = position(); // Expected PnL = +8.0 + long_win.side = Side::Buy; + long_win.enter_value_gross = 100.0; + long_win.enter_fees_total = 1.0; + long_win.current_value_gross = 110.0; + + let mut long_lose = position(); // Expected PnL = -12.0 + long_lose.side = Side::Buy; + long_lose.enter_value_gross = 100.0; + long_lose.enter_fees_total = 1.0; + long_lose.current_value_gross = 90.0; + + let mut short_win = position(); // Expected PnL = +8.0 + short_win.side = Side::Sell; + short_win.enter_value_gross = 100.0; + short_win.enter_fees_total = 1.0; + short_win.current_value_gross = 90.0; + + let mut short_lose = position(); // Expected PnL = -12.0 + short_lose.side = Side::Sell; + short_lose.enter_value_gross = 100.0; + short_lose.enter_fees_total = 1.0; + short_lose.current_value_gross = 110.0; + + let inputs = vec![long_win, long_lose, short_win, short_lose]; + + let expected_pnl = vec![8.0, -12.0, 8.0, -12.0]; + + for (position, expected) in inputs.into_iter().zip(expected_pnl.into_iter()) { + let actual = position.calculate_unrealised_profit_loss(); + assert_eq!(actual, expected); + } + } + + #[test] + fn calculate_realised_profit_loss() { + let mut long_win = position(); // Expected PnL = +18.0 + long_win.side = Side::Buy; + long_win.enter_value_gross = 100.0; + long_win.enter_fees_total = 1.0; + long_win.exit_value_gross = 120.0; + long_win.exit_fees_total = 1.0; + + let mut long_lose = position(); // Expected PnL = -22.0 + long_lose.side = Side::Buy; + long_lose.enter_value_gross = 100.0; + long_lose.enter_fees_total = 1.0; + long_lose.exit_value_gross = 80.0; + long_lose.exit_fees_total = 1.0; + + let mut short_win = position(); // Expected PnL = +18.0 + short_win.side = Side::Sell; + short_win.enter_value_gross = 100.0; + short_win.enter_fees_total = 1.0; + short_win.exit_value_gross = 80.0; + short_win.exit_fees_total = 1.0; + + let mut short_lose = position(); // Expected PnL = -22.0 + short_lose.side = Side::Sell; + short_lose.enter_value_gross = 100.0; + short_lose.enter_fees_total = 1.0; + short_lose.exit_value_gross = 120.0; + short_lose.exit_fees_total = 1.0; + + let inputs = vec![long_win, long_lose, short_win, short_lose]; + + let expected_pnl = vec![18.0, -22.0, 18.0, -22.0]; + + for (position, expected) in inputs.into_iter().zip(expected_pnl.into_iter()) { + let actual = position.calculate_realised_profit_loss(); + assert_eq!(actual, expected); + } + } + + #[test] + fn calculate_profit_loss_return() { + let mut long_win = position(); // Expected Return = 0.08 + long_win.side = Side::Buy; + long_win.enter_value_gross = 100.0; + long_win.realised_profit_loss = 8.0; + + let mut long_lose = position(); // Expected Return = -0.12 + long_lose.side = Side::Buy; + long_lose.enter_value_gross = 100.0; + long_lose.realised_profit_loss = -12.0; + + let mut short_win = position(); // Expected Return = 0.08 + short_win.side = Side::Sell; + short_win.enter_value_gross = 100.0; + short_win.realised_profit_loss = 8.0; + + let mut short_lose = position(); // Expected Return = -0.12 + short_lose.side = Side::Sell; + short_lose.enter_value_gross = 100.0; + short_lose.realised_profit_loss = -12.0; + + let inputs = vec![long_win, long_lose, short_win, short_lose]; + + let expected_return = vec![0.08, -0.12, 0.08, -0.12]; + + for (position, expected) in inputs.into_iter().zip(expected_return.into_iter()) { + let actual = position.calculate_profit_loss_return(); + assert_eq!(actual, expected); + } + } + + #[test] + fn position_update_from_position() { + let mut input_position = position(); + input_position.current_symbol_price = 100.0; + input_position.current_value_gross = 200.0; + input_position.unrealised_profit_loss = 150.0; + + let actual_update = PositionUpdate::from(&mut input_position); + + assert_eq!( + actual_update.current_symbol_price, + input_position.current_symbol_price + ); + assert_eq!( + actual_update.current_value_gross, + input_position.current_value_gross + ); + assert_eq!( + actual_update.unrealised_profit_loss, + input_position.unrealised_profit_loss + ); + } + + #[test] + fn position_exit_try_from_exited_position() { + let time = Utc::now(); + + let mut exited_position = position(); + exited_position.meta.update_time = time; + exited_position.meta.exit_balance = Some(Balance { + // time, + total: 0.0, + available: 0.0, + }); + + exited_position.exit_fees = Fees::default(); + exited_position.exit_fees_total = 0.0; + exited_position.exit_avg_price_gross = 100.0; + exited_position.exit_value_gross = 100.0; + exited_position.realised_profit_loss = 100.0; + + let actual_exit = PositionExit::try_from(&mut exited_position).unwrap(); + + assert_eq!( + actual_exit.exit_balance, + exited_position.meta.exit_balance.unwrap() + ); + assert_eq!(actual_exit.exit_fees, exited_position.exit_fees); + assert_eq!(actual_exit.exit_fees_total, exited_position.exit_fees_total); + assert_eq!( + actual_exit.exit_avg_price_gross, + exited_position.exit_avg_price_gross + ); + assert_eq!( + actual_exit.exit_value_gross, + exited_position.exit_value_gross + ); + assert_eq!( + actual_exit.realised_profit_loss, + exited_position.realised_profit_loss + ); + } + + #[test] + fn position_exit_try_from_open_position() { + let mut exited_position = position(); + exited_position.meta.exit_balance = None; + + assert!(PositionExit::try_from(&mut exited_position).is_err()); + } +} diff --git a/barter-execution-rs/src/model/trade.rs b/barter-execution-rs/src/model/trade.rs index d034a93..4ce0ed1 100644 --- a/barter-execution-rs/src/model/trade.rs +++ b/barter-execution-rs/src/model/trade.rs @@ -3,18 +3,20 @@ use barter_integration::model::{ instrument::{symbol::Symbol, Instrument}, Side, }; +use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; /// Normalised Barter private [`Trade`] model. #[derive(Clone, PartialEq, PartialOrd, Debug, Deserialize, Serialize)] pub struct Trade { pub id: TradeId, + pub time: DateTime, pub order_id: OrderId, pub instrument: Instrument, pub side: Side, pub price: f64, pub quantity: f64, - pub fees: SymbolFees, + pub fees: Fees, } /// Private [`Trade`] identifier generated by an exchange. Cannot assume this is unique across each @@ -52,3 +54,60 @@ impl SymbolFees { } } } + +impl From for Fees { + fn from(fees: SymbolFees) -> Self { + Self { + exchange: Some(fees), + slippage: None, + network: None, + } + } +} + +/// Communicative type alias for Fee amount as f64. +pub type FeeAmount = f64; + +// Todo: where should the fee currency type / conversion live? +/// All potential fees incurred by a [`FillEvent`]. +#[derive(Clone, PartialEq, PartialOrd, Debug, Default, Deserialize, Serialize)] +pub struct Fees { + /// Fee taken by the exchange/broker (eg/ commission). + pub exchange: Option, + /// Order book slippage modelled as a fee. + pub slippage: Option, + /// Fee incurred by any required network transactions (eg/ GAS). + pub network: Option, +} + +impl Fees { + pub fn new_ex(symbol: S, fee: f64) -> Self + where + S: Into, + { + Self { + exchange: Some(SymbolFees::new(symbol, fee)), + slippage: None, + network: None, + } + } + + pub fn new_all(symbol: S, fee: f64) -> Self + where + S: Into + Clone, + { + Self { + exchange: Some(SymbolFees::new(symbol.clone(), fee)), + slippage: Some(SymbolFees::new(symbol.clone(), fee)), + network: Some(SymbolFees::new(symbol.clone(), fee)), + } + } + + /// Calculates the sum of every [FeeAmount] in [Fees]. + pub fn calculate_total_fees(&self) -> f64 { + // Todo currency conversion? + self.exchange.as_ref().map(|f| f.fees).unwrap_or(0.0) + + self.network.as_ref().map(|f| f.fees).unwrap_or(0.0) + + self.slippage.as_ref().map(|f| f.fees).unwrap_or(0.0) + } +} diff --git a/barter-execution-rs/src/simulated/exchange/account/balance.rs b/barter-execution-rs/src/simulated/exchange/account/balance.rs index 671641e..257af28 100644 --- a/barter-execution-rs/src/simulated/exchange/account/balance.rs +++ b/barter-execution-rs/src/simulated/exchange/account/balance.rs @@ -130,7 +130,8 @@ impl ClientBalances { let (base_delta, quote_delta) = match trade.side { Side::Buy => { // Base total & available increase by trade.quantity minus base trade.fees - let base_increase = trade.quantity - trade.fees.fees; + let base_increase = trade.quantity - trade.fees.calculate_total_fees(); + let base_delta = BalanceDelta { total: base_increase, available: base_increase, @@ -154,7 +155,8 @@ impl ClientBalances { }; // Quote total & available increase by (trade.quantity * price) minus quote fees - let quote_increase = (trade.quantity * trade.price) - trade.fees.fees; + let quote_increase = + (trade.quantity * trade.price) - trade.fees.calculate_total_fees(); let quote_delta = BalanceDelta { total: quote_increase, available: quote_increase, diff --git a/barter-execution-rs/src/simulated/exchange/account/order.rs b/barter-execution-rs/src/simulated/exchange/account/order.rs index 8770f41..e8f2f5c 100644 --- a/barter-execution-rs/src/simulated/exchange/account/order.rs +++ b/barter-execution-rs/src/simulated/exchange/account/order.rs @@ -4,6 +4,8 @@ use crate::{ }; use barter_data::subscription::trade::PublicTrade; use barter_integration::model::{instrument::Instrument, Side}; +use chrono::TimeZone; +use chrono::Utc; use serde::{Deserialize, Serialize}; use std::{cmp::Ordering, collections::HashMap}; @@ -213,13 +215,14 @@ impl Orders { // Generate execution Trade from the Order match Trade { + time: Utc.timestamp_opt(0, 0).unwrap(), id: self.trade_id(), order_id: order.state.id, instrument: order.instrument, side: order.side, price: order.state.price, quantity: trade_quantity, - fees, + fees: fees.into(), } } diff --git a/barter-execution-rs/tests/simulated_exchange.rs b/barter-execution-rs/tests/simulated_exchange.rs index 5729f69..28e4f92 100644 --- a/barter-execution-rs/tests/simulated_exchange.rs +++ b/barter-execution-rs/tests/simulated_exchange.rs @@ -19,6 +19,8 @@ use barter_integration::model::{ instrument::{kind::InstrumentKind, symbol::Symbol, Instrument}, Side, }; +use chrono::TimeZone; +use chrono::Utc; use tokio::sync::mpsc; use uuid::Uuid; @@ -496,13 +498,14 @@ async fn test_7_send_market_event_that_exact_full_matches_order( .. }) => { let expected = Trade { + time: Utc.timestamp_opt(0, 0).unwrap(), id: TradeId(1.to_string()), order_id: OrderId(3.to_string()), instrument: Instrument::from(("btc", "usdt", InstrumentKind::Perpetual)), side: Side::Buy, price: 200.0, quantity: 1.0, - fees: SymbolFees::new("btc", 1.0 * fees_50_percent()), + fees: SymbolFees::new("btc", 1.0 * fees_50_percent()).into(), }; assert_eq!(trade, expected); } @@ -735,13 +738,14 @@ async fn test_10_send_market_event_that_full_and_partial_matches_orders( .. }) => { let expected = Trade { + time: Utc.timestamp_opt(0, 0).unwrap(), id: TradeId(2.to_string()), order_id: OrderId(4.to_string()), instrument: Instrument::from(("btc", "usdt", InstrumentKind::Perpetual)), side: Side::Sell, price: 500.0, quantity: 1.0, - fees: SymbolFees::new("usdt", first_full_fill_fees), + fees: SymbolFees::new("usdt", first_full_fill_fees).into(), }; assert_eq!(trade, expected); } @@ -792,13 +796,14 @@ async fn test_10_send_market_event_that_full_and_partial_matches_orders( .. }) => { let expected = Trade { + time: Utc.timestamp_opt(0, 0).unwrap(), id: TradeId(3.to_string()), order_id: OrderId(5.to_string()), instrument: Instrument::from(("btc", "usdt", InstrumentKind::Perpetual)), side: Side::Sell, price: 1000.0, quantity: 0.5, - fees: SymbolFees::new("usdt", second_partial_fill_fees), + fees: SymbolFees::new("usdt", second_partial_fill_fees).into(), }; assert_eq!(trade, expected); } diff --git a/barter-integration-rs/src/de.rs b/barter-integration-rs/src/de.rs index a39739d..1338be2 100644 --- a/barter-integration-rs/src/de.rs +++ b/barter-integration-rs/src/de.rs @@ -1,3 +1,4 @@ +use serde::Deserialize; use serde_json::Value; /// Determine the `DateTime` from the provided `Duration` since the epoch. @@ -26,6 +27,22 @@ where } } +/// Custom deserializer that tries to parse a string as f64. +/// Returns None if the input is null or cannot be parsed. +pub fn de_option_f64<'de, D>(deserializer: D) -> Result, D::Error> +where + D: serde::de::Deserializer<'de>, +{ + let opt_str: Option<&str> = Option::deserialize(deserializer)?; + match opt_str { + Some(text) => match text.parse::() { + Ok(num) => Ok(Some(num)), + Err(_) => Err(serde::de::Error::custom("Failed to parse string as f64")), + }, + None => Ok(None), + } +} + /// Deserialize a `u64` milliseconds value as `DateTime`. pub fn de_u64_epoch_ms_as_datetime_utc<'de, D>( deserializer: D, diff --git a/barter-integration-rs/src/model/instrument/symbol.rs b/barter-integration-rs/src/model/instrument/symbol.rs index e0c4c97..5a4342e 100644 --- a/barter-integration-rs/src/model/instrument/symbol.rs +++ b/barter-integration-rs/src/model/instrument/symbol.rs @@ -13,6 +13,12 @@ impl Debug for Symbol { } } +impl Default for Symbol { + fn default() -> Self { + Self::new("n/a") + } +} + impl Display for Symbol { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "{}", self.0) diff --git a/barter-rs/examples/cerebrum_binance.rs b/barter-rs/examples/cerebrum_binance.rs index 4bd8a7f..54f1fa8 100644 --- a/barter-rs/examples/cerebrum_binance.rs +++ b/barter-rs/examples/cerebrum_binance.rs @@ -1,5 +1,5 @@ use barter::cerebrum::{ - account::{Account, Accounts, Position}, + account::{Account, Accounts}, event::{Command, Event, EventFeed}, exchange::ExchangePortal, exchange_client::ClientId, @@ -16,9 +16,9 @@ use barter_execution::{ balance::Balance, execution_event::ExecutionRequest, order::{Order, OrderKind, RequestCancel, RequestOpen}, + position::Position, ClientOrderId, }, - ExecutionId, }; use dotenv::dotenv; @@ -44,7 +44,7 @@ use barter_integration::{ use std::ops::Add; use std::{collections::HashMap, time::Duration}; use tokio::sync::mpsc; -use tracing::info; +// use tracing::info; struct StrategyExample { counter: usize, @@ -79,12 +79,12 @@ impl strategy::OrderGenerator for StrategyExample { return None; } - let sim_acc = accounts.get(&Exchange::from(ExecutionId::Simulated)); - let num_open_orders = sim_acc.orders_open.len(); + let account = accounts.get(&ExchangeId::BinanceFuturesUsd.into()); + let num_open_orders = account.orders_open.len(); if self.counter > num_open_orders { return None; } - println!("accounts {:#?}", sim_acc.orders_open.len()); + println!("accounts {:#?}", accounts); let order = order_request_limit( Instrument::new("eth", "usdt", InstrumentKind::Perpetual), @@ -95,7 +95,7 @@ impl strategy::OrderGenerator for StrategyExample { ); self.counter += 1; - Some(vec![(Exchange::from(ExecutionId::Binance), vec![order])]) + Some(vec![(ExchangeId::BinanceFuturesUsd.into(), vec![order])]) } } @@ -111,7 +111,7 @@ where I: Into, { Order { - exchange: Exchange::from(ExecutionId::Simulated), + exchange: ExchangeId::BinanceFuturesUsd.into(), instrument: instrument.into(), cid, side, @@ -154,7 +154,7 @@ async fn main() { // EventFeed Component: CommandFeed init_command_feed(event_tx, terminate); - let exchange: Exchange = Exchange::from(ExchangeId::BinanceFuturesUsd); + let exchange = Exchange::from(ExchangeId::BinanceFuturesUsd); // Accounts(HashMap): let accounts = init_accounts(exchange, subscriptions); @@ -246,7 +246,10 @@ async fn init_account_feed( let execution_config = BinanceConfig { client_type: BinanceApi::Futures(LiveOrTest::Test), }; - exchanges.insert(ExecutionId::Binance, ClientId::Binance(execution_config)); + exchanges.insert( + Exchange::from(ExchangeId::BinanceFuturesUsd), + ClientId::Binance(execution_config), + ); let ex_portal = ExchangePortal::init(exchanges, exchange_rx, event_tx) .await .expect("failed to init ExchangePortal"); @@ -282,20 +285,16 @@ where let mut accounts = HashMap::new(); accounts.insert(exchange, init_account(instruments.clone())); - // we need to init instruments for simulated exchange - accounts.insert( - Exchange::from(ExecutionId::Simulated), - init_account(instruments), - ); Accounts(accounts) } fn init_account(instruments: Vec) -> Account { - let positions = instruments - .iter() - .cloned() - .map(|instrument| (instrument, Position)) - .collect(); + // let positions = instruments + // .iter() + // .cloned() + // .map(|instrument| (instrument, Position)) + // .collect(); + let positions: HashMap = HashMap::new(); let balances = instruments .into_iter() @@ -306,8 +305,8 @@ fn init_account(instruments: Vec) -> Account { ( symbol, Balance { - total: 1000.0, - available: 1000.0, + total: 0.0, + available: 0.0, }, ) }) diff --git a/barter-rs/examples/cerebrum_simulation.rs b/barter-rs/examples/cerebrum_simulation.rs index c800e7d..6146e64 100644 --- a/barter-rs/examples/cerebrum_simulation.rs +++ b/barter-rs/examples/cerebrum_simulation.rs @@ -1,5 +1,5 @@ use barter::cerebrum::{ - account::{Account, Accounts, Position}, + account::{Account, Accounts}, event::{Command, Event, EventFeed}, exchange::ExchangePortal, exchange_client::ClientId, @@ -13,6 +13,7 @@ use barter_execution::{ balance::Balance, execution_event::ExecutionRequest, order::{Order, OrderKind, RequestCancel, RequestOpen}, + position::Position, ClientOrderId, }, simulated::{execution::SimulationConfig, util::run_default_exchange, SimulatedEvent}, @@ -70,8 +71,7 @@ impl strategy::OrderGenerator for StrategyExample { &mut self, accounts: &Accounts, ) -> Option>)>> { - return None; - if self.counter > 10 { + if self.counter > 1 { return None; } let sim_acc = accounts.get(&Exchange::from(ExecutionId::Simulated)); @@ -268,7 +268,10 @@ async fn init_account_feed( }, request_tx: event_simulated_tx, }; - exchanges.insert(ExecutionId::Simulated, ClientId::Simulated(sim_config)); + exchanges.insert( + Exchange::from(ExecutionId::Simulated), + ClientId::Simulated(sim_config), + ); let ex_portal = ExchangePortal::init(exchanges, exchange_rx, event_tx) .await .expect("failed to init ExchangePortal"); @@ -313,11 +316,7 @@ where } fn init_account(instruments: Vec) -> Account { - let positions = instruments - .iter() - .cloned() - .map(|instrument| (instrument, Position)) - .collect(); + let positions: HashMap = HashMap::new(); let balances = instruments .into_iter() diff --git a/barter-rs/src/cerebrum/account.rs b/barter-rs/src/cerebrum/account.rs index 7d2f61a..c562bd2 100644 --- a/barter-rs/src/cerebrum/account.rs +++ b/barter-rs/src/cerebrum/account.rs @@ -3,6 +3,7 @@ use barter_data::event::{DataKind, MarketEvent}; use barter_execution::model::{ balance::{Balance, SymbolBalance}, order::{Cancelled, InFlight, Open, Order}, + position::Position, AccountEvent, AccountEventKind, ClientOrderId, }; use barter_integration::model::{ @@ -41,6 +42,12 @@ impl Cerebrum { .for_each(|order| self.accounts.update_orders_from_open(&order)); } + AccountEventKind::Positions(positions) => { + info!(kind = "Account", exchange = ?account.exchange, payload = ?positions, "received Event"); + self.accounts + .update_positions(&account.exchange, &positions); + } + // TODO: do we need to treat OrdersNew differently to OrdersOpen? // inflight vs open? AccountEventKind::OrdersNew(orders) => { @@ -121,15 +128,21 @@ impl Accounts { } pub fn update_balances(&mut self, exchange: &Exchange, balances: &Vec) { + info!(exchange = ?exchange, "Received Balance Update"); + balances .into_iter() .for_each(|balance| self.update_balance(exchange, balance)) } - pub fn update_positions(&mut self, _market: &MarketEvent) { + pub fn update_positions_from_market_event(&mut self, _market: &MarketEvent) { // Todo: Update relevant Positions } + pub fn update_positions(&mut self, exchange: &Exchange, _positions: &Vec) { + info!(exchange = ?exchange, "Received Position Update"); + } + // Todo: refactor this if we don't use in_flight pub fn update_order_from_new(&mut self, order: &Order) { // Exchange Account associated with the Order @@ -272,6 +285,3 @@ impl Accounts { }; } } - -#[derive(Debug, Clone, Copy)] -pub struct Position; diff --git a/barter-rs/src/cerebrum/exchange.rs b/barter-rs/src/cerebrum/exchange.rs index 9bbf640..d9d3d5c 100644 --- a/barter-rs/src/cerebrum/exchange.rs +++ b/barter-rs/src/cerebrum/exchange.rs @@ -4,31 +4,35 @@ use barter_execution::error::ExecutionError; use barter_execution::model::execution_event::ExecutionRequest; use barter_execution::model::{AccountEvent, AccountEventKind}; use barter_execution::ExecutionClient; -use barter_execution::ExecutionId; use barter_integration::model::Exchange; +use futures::stream::{self, BoxStream, SelectAll}; +use futures::StreamExt; use std::collections::HashMap; use std::sync::Arc; +use tokio::select; use tokio::sync::mpsc; use tracing::{error, info}; /// Responsibilities: -/// - Determines best way to action an [`ExchangeRequest`] given the constraints of the exchange. +/// - Determines best way to action an [`ExecutionRequest`] given the constraints of the exchange. /// Responsibilities: /// - Manages every [`ExchangeClient`]. -/// - Forwards an [`ExchangeRequest`] to the appropriate [`ExchangeClient`]. +/// - Forwards the request to the appropriate [`ExchangeClient`]. /// - Map InternalClientOrderId to exchange ClientOrderId. -#[derive(Debug)] +// #[derive(Debug)] +#[allow(missing_debug_implementations)] pub struct ExchangePortal { clients: HashMap>>, request_rx: mpsc::UnboundedReceiver, event_tx: mpsc::UnboundedSender, + account_stream: SelectAll>, } impl ExchangePortal { pub async fn init( - exchanges: HashMap, + exchanges: HashMap, request_rx: mpsc::UnboundedReceiver, event_tx: mpsc::UnboundedSender, ) -> Result { @@ -47,15 +51,27 @@ impl ExchangePortal { info!("initializing ExchangePortal {:?}", exchanges); - for (execution_id, client_id) in exchanges.into_iter() { + let mut streams = Vec::new(); + + for (exchange, client_id) in exchanges.into_iter() { let client = ExchangeClient::init(client_id).await; - clients.insert(Exchange::from(execution_id), Arc::new(Box::new(client))); + let stream = client.init_stream().await; + let ex = exchange.clone(); + if let Some(stream) = stream { + // map stream to (exchange, stream) tuple + let stream = stream.map(move |item| (ex.clone(), item)).boxed(); + streams.push(stream); + } + clients.insert(exchange, Arc::new(Box::new(client))); } + let account_stream = stream::select_all(streams); + Ok(Self { clients, request_rx, event_tx, + account_stream, }) } @@ -77,91 +93,105 @@ impl ExchangePortal { /// Todo: /// - Should be run on it's own OS thread. /// - This may live in Barter... ExchangeClient impls would live here. Order would be in Barter! - /// - Just use HTTP for trading for the time being... /// - May need to run enum ExchangeEvent { request, ConnectionStatus } in order to re-spawn clients! -> state machine like Cerebrum! pub async fn run(mut self) { - while let Some(request) = self.request_rx.recv().await { - // info!(payload = ?request, "received ExchangeRequest"); - - // Action ExecutionRequest - match request { - ExecutionRequest::OpenOrders(open_requests) => { - open_requests.into_iter().for_each(|open_request| { - let exchange = open_request.0; - let orders = open_request.1; - let client = self.client(&exchange); - let tx = self.event_tx.clone(); - info!("sending OpenOrders "); - tokio::spawn(async move { - let open_orders = client.open_orders(orders).await; - let open_orders = remove_error_responses(open_orders); - let account_event = AccountEventKind::OrdersNew(open_orders); - Self::send_account_tx(tx, exchange, account_event); - }); - }); - } - ExecutionRequest::FetchOrdersOpen(exchanges) => { - exchanges.into_iter().for_each(|exchange| { - let client = self.client(&exchange); - let tx = self.event_tx.clone(); - tokio::spawn(async move { - match client.fetch_orders_open().await { - Ok(orders) => Self::send_account_tx( - tx, - exchange.clone(), - AccountEventKind::OrdersOpen(orders), - ), - Err(e) => error!(error = ?e, "failed to fetch open orders"), - }; - }); - }); - } - ExecutionRequest::FetchBalances(exchanges) => { - exchanges.into_iter().for_each(|exchange| { - let client = self.client(&exchange); - let tx = self.event_tx.clone(); - tokio::spawn(async move { - match client.fetch_balances().await { - Ok(balances) => Self::send_account_tx( - tx, - exchange, - AccountEventKind::Balances(balances), - ), - Err(e) => error!(error = ?e, "failed to fetch balances"), - }; - }); - }); - } - ExecutionRequest::CancelOrders(cancel_requests) => { - cancel_requests.into_iter().for_each(|cancel_request| { - let exchange = cancel_request.0; - let orders = cancel_request.1; - let client = self.client(&exchange); - let tx = self.event_tx.clone(); - tokio::spawn(async move { - let cancelled_orders = client.cancel_orders(orders).await; - let cancelled_orders = remove_error_responses(cancelled_orders); - let account_event = AccountEventKind::OrdersCancelled(cancelled_orders); - Self::send_account_tx(tx, exchange, account_event); - }); - }); - } - ExecutionRequest::CancelOrdersAll(exchanges) => { - exchanges.into_iter().for_each(|exchange| { - let client = self.client(&exchange); - let tx = self.event_tx.clone(); - tokio::spawn(async move { - match client.cancel_orders_all().await { - Ok(cancelled_orders) => Self::send_account_tx( - tx, - exchange, - AccountEventKind::OrdersCancelled(cancelled_orders), - ), - Err(e) => error!(error = ?e, "failed to cancel all orders"), - }; - }); - }); - } + loop { + select! { + // Recieves events from websocket streams and forwards them to + // Account Engine + Some((exchange, account_event)) = self.account_stream.next() => { + let event_tx = self.event_tx.clone(); + Self::send_account_tx(event_tx, exchange, account_event); + }, + + // Processes execution requests + Some(request) = self.request_rx.recv() => { + + // while let Some(request) = self.request_rx.recv().await { + // info!(payload = ?request, "received ExchangeRequest"); + + // Action ExecutionRequest + match request { + ExecutionRequest::OpenOrders(open_requests) => { + open_requests.into_iter().for_each(|open_request| { + let exchange = open_request.0; + let orders = open_request.1; + let client = self.client(&exchange); + let tx = self.event_tx.clone(); + info!("sending OpenOrders "); + tokio::spawn(async move { + let open_orders = client.open_orders(orders).await; + let open_orders = remove_error_responses(open_orders); + let account_event = AccountEventKind::OrdersNew(open_orders); + Self::send_account_tx(tx, exchange, account_event); + }); + }); + } + ExecutionRequest::FetchOrdersOpen(exchanges) => { + exchanges.into_iter().for_each(|exchange| { + let client = self.client(&exchange); + let tx = self.event_tx.clone(); + tokio::spawn(async move { + match client.fetch_orders_open().await { + Ok(orders) => Self::send_account_tx( + tx, + exchange.clone(), + AccountEventKind::OrdersOpen(orders), + ), + Err(e) => error!(error = ?e, "failed to fetch open orders"), + }; + }); + }); + } + ExecutionRequest::FetchBalances(exchanges) => { + exchanges.into_iter().for_each(|exchange| { + let client = self.client(&exchange); + let tx = self.event_tx.clone(); + tokio::spawn(async move { + match client.fetch_balances().await { + Ok(balances) => Self::send_account_tx( + tx, + exchange, + AccountEventKind::Balances(balances), + ), + Err(e) => error!(error = ?e, "failed to fetch balances"), + }; + }); + }); + } + ExecutionRequest::CancelOrders(cancel_requests) => { + cancel_requests.into_iter().for_each(|cancel_request| { + let exchange = cancel_request.0; + let orders = cancel_request.1; + let client = self.client(&exchange); + let tx = self.event_tx.clone(); + tokio::spawn(async move { + let cancelled_orders = client.cancel_orders(orders).await; + let cancelled_orders = remove_error_responses(cancelled_orders); + let account_event = AccountEventKind::OrdersCancelled(cancelled_orders); + Self::send_account_tx(tx, exchange, account_event); + }); + }); + } + ExecutionRequest::CancelOrdersAll(exchanges) => { + exchanges.into_iter().for_each(|exchange| { + let client = self.client(&exchange); + let tx = self.event_tx.clone(); + tokio::spawn(async move { + match client.cancel_orders_all().await { + Ok(cancelled_orders) => Self::send_account_tx( + tx, + exchange, + AccountEventKind::OrdersCancelled(cancelled_orders), + ), + Err(e) => error!(error = ?e, "failed to cancel all orders"), + }; + }); + }); + } + } + }, + else => break, } } } diff --git a/barter-rs/src/cerebrum/exchange_client.rs b/barter-rs/src/cerebrum/exchange_client.rs index d8776ff..54ebf64 100644 --- a/barter-rs/src/cerebrum/exchange_client.rs +++ b/barter-rs/src/cerebrum/exchange_client.rs @@ -5,16 +5,18 @@ use barter_execution::{ model::{ balance::SymbolBalance, order::{Cancelled, Open, Order, RequestCancel, RequestOpen}, + AccountEventKind, }, simulated::execution::{SimulatedExecution, SimulationConfig}, ExecutionClient, }; use barter_integration::model::Exchange; +use futures::stream::BoxStream; // Todo: // - Better name for this? This is the equivilant to ExchangeId... // '--> renamed to ClientId for now to avoid confusion in development -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum ClientId { Simulated(SimulationConfig), Binance(BinanceConfig), @@ -50,6 +52,13 @@ impl ExecutionClient for ExchangeClient { } } + async fn init_stream(&self) -> Option> { + match self { + ExchangeClient::Simulated(client) => client.init_stream().await, + ExchangeClient::Binance(client) => client.init_stream().await, + } + } + async fn fetch_orders_open(&self) -> Result>, ExecutionError> { match self { ExchangeClient::Simulated(client) => client.fetch_orders_open().await, diff --git a/barter-rs/src/cerebrum/market.rs b/barter-rs/src/cerebrum/market.rs index b81e273..d7bc953 100644 --- a/barter-rs/src/cerebrum/market.rs +++ b/barter-rs/src/cerebrum/market.rs @@ -1,6 +1,6 @@ use super::{order::Algorithmic, strategy::IndicatorUpdater, Cerebrum, Engine, OrderGenerator}; use barter_data::event::{DataKind, MarketEvent}; - +// use tracing::info; /// MarketUpdater can transition to: /// a) OrderGenerator @@ -15,7 +15,7 @@ where // info!(kind = "Market", exchange = ?market.exchange, instrument = %market.instrument, payload = ?market, "received Event"); // Update Positions - self.accounts.update_positions(&market); + self.accounts.update_positions_from_market_event(&market); // Update Indicators self.strategy.update_indicators(&market);