From 8a0676e225a7f44d995260cc2d3f0e2a1c723e9f Mon Sep 17 00:00:00 2001 From: Wil Boayue Date: Mon, 28 Oct 2024 12:00:08 -0700 Subject: [PATCH] Implements exercise options (#144) --- Cargo.toml | 3 +- examples/breakout.rs | 4 +- examples/executions.rs | 4 +- examples/options_exercise.rs | 39 ++ examples/options_purchase.rs | 54 +++ examples/orders.rs | 7 +- examples/place_order.rs | 16 +- examples/readme_place_order.rs | 8 +- examples/readme_realtime_data_2.rs | 7 +- src/client.rs | 133 ++++-- src/contracts.rs | 14 +- src/errors.rs | 6 + src/lib.rs | 4 +- src/market_data/realtime.rs | 21 +- src/messages.rs | 3 +- src/messages/shared_channel_configuration.rs | 14 +- src/orders.rs | 424 +++++++------------ src/orders/encoders.rs | 45 +- src/orders/tests.rs | 32 +- src/transport.rs | 60 ++- 20 files changed, 478 insertions(+), 420 deletions(-) create mode 100644 examples/options_exercise.rs create mode 100644 examples/options_purchase.rs diff --git a/Cargo.toml b/Cargo.toml index 1bb9951f..0c38d279 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,8 +21,9 @@ exclude = [ byteorder = "1.4.3" crossbeam = "0.8.2" log = "0.4.17" -time = {version = "0.3.17", features = ["formatting", "macros", "local-offset", "parsing"]} +time = {version = "0.3.17", features = ["formatting", "macros", "local-offset", "parsing", "serde"]} time-tz = "1.0.2" +serde = {version = "1.0.213" , features = ["derive"]} [dev-dependencies] anyhow = "1.0.66" diff --git a/examples/breakout.rs b/examples/breakout.rs index 0502b393..1f52b8ad 100644 --- a/examples/breakout.rs +++ b/examples/breakout.rs @@ -2,7 +2,7 @@ use std::collections::VecDeque; use ibapi::contracts::Contract; use ibapi::market_data::realtime::{Bar, BarSize, WhatToShow}; -use ibapi::orders::{order_builder, Action, OrderNotification}; +use ibapi::orders::{order_builder, Action, PlaceOrder}; use ibapi::Client; fn main() { @@ -38,7 +38,7 @@ fn main() { let notices = client.place_order(order_id, &contract, &order).unwrap(); for notice in notices { - if let OrderNotification::ExecutionData(data) = notice { + if let PlaceOrder::ExecutionData(data) = notice { println!("{} {} shares of {}", data.execution.side, data.execution.shares, data.contract.symbol); } else { println!("{:?}", notice); diff --git a/examples/executions.rs b/examples/executions.rs index 6020a13e..ab8ef806 100644 --- a/examples/executions.rs +++ b/examples/executions.rs @@ -16,8 +16,8 @@ fn main() -> anyhow::Result<()> { let client = Client::connect("127.0.0.1:4002", 100)?; - let executions = client.executions(filter)?; - for execution in executions { + let subscription = client.executions(filter)?; + for execution in &subscription { println!("{execution:?}") } diff --git a/examples/options_exercise.rs b/examples/options_exercise.rs new file mode 100644 index 00000000..10c34dbb --- /dev/null +++ b/examples/options_exercise.rs @@ -0,0 +1,39 @@ +use ibapi::{ + contracts::{Contract, SecurityType}, + orders::ExerciseAction, + Client, +}; + +fn main() { + env_logger::init(); + + let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed"); + + let contract = create_option_contract("AAPL", 180.0, "C", "20250221"); + + let accounts = client.managed_accounts().expect("could not get managed accounts"); + let account = &accounts[0]; + let manual_order_time = None; + + let subscription = client + .exercise_options(&contract, ExerciseAction::Exercise, 100, account, true, manual_order_time) + .expect("exercise options request failed!"); + + for status in &subscription { + println!("{status:?}") + } +} + +fn create_option_contract(symbol: &str, strike: f64, right: &str, last_trade_date_or_contract_month: &str) -> Contract { + Contract { + symbol: symbol.to_owned(), + security_type: SecurityType::Option, + exchange: "SMART".to_owned(), + currency: "USD".to_owned(), + last_trade_date_or_contract_month: last_trade_date_or_contract_month.to_owned(), + strike, + right: right.to_owned(), + multiplier: "100".to_owned(), + ..Default::default() + } +} diff --git a/examples/options_purchase.rs b/examples/options_purchase.rs new file mode 100644 index 00000000..ecebcbf8 --- /dev/null +++ b/examples/options_purchase.rs @@ -0,0 +1,54 @@ +use ibapi::{ + contracts::{Contract, SecurityType}, + orders::{self, order_builder, PlaceOrder}, + Client, +}; + +fn main() { + env_logger::init(); + + let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed"); + + let contract = create_option_contract("AAPL", 180.0, "C", "20250221"); + + let order_id = client.next_valid_order_id().expect("could not get next valid order id"); + // let order_id = client.next_order_id(); + println!("next order id: {order_id}"); + + let order = order_builder::market_order(orders::Action::Buy, 5.0); + println!("contract: {contract:?}, order: {order:?}"); + + let subscription = client.place_order(order_id, &contract, &order).expect("could not place order"); + for status in subscription { + println!("{status:?}") + } + let order_id = client.next_order_id(); + println!("next order id: {order_id}"); + + let order = order_builder::market_order(orders::Action::Buy, 5.0); + println!("contract: {contract:?}, order: {order:?}"); + + let subscription = client.place_order(order_id, &contract, &order).expect("could not place order"); + for status in subscription { + println!("{status:?}"); + if let PlaceOrder::OrderStatus(order_status) = status { + if order_status.remaining == 0.0 { + break; + } + } + } +} + +fn create_option_contract(symbol: &str, strike: f64, right: &str, last_trade_date_or_contract_month: &str) -> Contract { + Contract { + symbol: symbol.to_owned(), + security_type: SecurityType::Option, + exchange: "SMART".to_owned(), + currency: "USD".to_owned(), + last_trade_date_or_contract_month: last_trade_date_or_contract_month.to_owned(), + strike, + right: right.to_owned(), + multiplier: "100".to_owned(), + ..Default::default() + } +} diff --git a/examples/orders.rs b/examples/orders.rs index 10c328c8..1b911a1f 100644 --- a/examples/orders.rs +++ b/examples/orders.rs @@ -2,7 +2,8 @@ use anyhow::Ok; use clap::builder::PossibleValue; use clap::{arg, Command}; -use ibapi::orders::OrderDataResult; +use ibapi::client::Subscription; +use ibapi::orders::Orders; use ibapi::Client; fn main() -> anyhow::Result<()> { @@ -53,8 +54,8 @@ fn main() -> anyhow::Result<()> { Ok(()) } -fn print_orders(orders: impl Iterator) { - for order in orders { +fn print_orders(orders: Subscription) { + for order in &orders { println!("order: {order:?}") } } diff --git a/examples/place_order.rs b/examples/place_order.rs index 62d3aebe..2920d851 100644 --- a/examples/place_order.rs +++ b/examples/place_order.rs @@ -2,7 +2,7 @@ use clap::{arg, ArgMatches, Command}; use log::{debug, info}; use ibapi::contracts::Contract; -use ibapi::orders::{self, order_builder, OrderNotification}; +use ibapi::orders::{self, order_builder, PlaceOrder}; use ibapi::Client; fn main() { @@ -41,17 +41,17 @@ fn main() { println!("contract: {contract:?}, order: {order:?}"); - let results = client.place_order(order_id, &contract, &order).expect("could not place order"); + let subscription = client.place_order(order_id, &contract, &order).expect("could not place order"); - for status in results { + for status in subscription { match status { - OrderNotification::OrderStatus(order_status) => { + PlaceOrder::OrderStatus(order_status) => { println!("order status: {order_status:?}") } - OrderNotification::OpenOrder(open_order) => println!("open order: {open_order:?}"), - OrderNotification::ExecutionData(execution) => println!("execution: {execution:?}"), - OrderNotification::CommissionReport(report) => println!("commission report: {report:?}"), - OrderNotification::Message(message) => println!("notice: {message}"), + PlaceOrder::OpenOrder(open_order) => println!("open order: {open_order:?}"), + PlaceOrder::ExecutionData(execution) => println!("execution: {execution:?}"), + PlaceOrder::CommissionReport(report) => println!("commission report: {report:?}"), + PlaceOrder::Message(message) => println!("notice: {message}"), } } } diff --git a/examples/readme_place_order.rs b/examples/readme_place_order.rs index 5565c5ca..ad448013 100644 --- a/examples/readme_place_order.rs +++ b/examples/readme_place_order.rs @@ -1,5 +1,5 @@ use ibapi::contracts::Contract; -use ibapi::orders::{order_builder, Action, OrderNotification}; +use ibapi::orders::{order_builder, Action, PlaceOrder}; use ibapi::Client; pub fn main() { @@ -16,11 +16,11 @@ pub fn main() { let subscription = client.place_order(order_id, &contract, &order).expect("place order request failed!"); - for notice in subscription { - if let OrderNotification::ExecutionData(data) = notice { + for event in &subscription { + if let PlaceOrder::ExecutionData(data) = event { println!("{} {} shares of {}", data.execution.side, data.execution.shares, data.contract.symbol); } else { - println!("{:?}", notice); + println!("{:?}", event); } } } diff --git a/examples/readme_realtime_data_2.rs b/examples/readme_realtime_data_2.rs index 006bf5e6..c0eb55b2 100644 --- a/examples/readme_realtime_data_2.rs +++ b/examples/readme_realtime_data_2.rs @@ -17,11 +17,12 @@ fn main() { .realtime_bars(&contract_nvda, BarSize::Sec5, WhatToShow::Trades, false) .expect("realtime bars request failed!"); - while let (Some(bar_nvda), Some(bar_aapl)) = (subscription_nvda.next(), subscription_aapl.next()) { + for (bar_aapl, bar_nvda) in subscription_aapl.iter().zip(subscription_nvda.iter()) { // Process each bar here (e.g., print or use in calculations) - println!("NVDA {}, AAPL {}", bar_nvda.close, bar_aapl.close); + println!("AAPL {}, NVDA {}", bar_nvda.close, bar_aapl.close); - // when your algorithm is done, cancel subscription + // You can simply break the or explicitly cancel the subscription. + // Subscriptions are automatically canceled when they go out of scope. subscription_aapl.cancel(); subscription_nvda.cancel(); } diff --git a/src/client.rs b/src/client.rs index 6f3f31e1..accc812c 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,7 +4,7 @@ use std::sync::atomic::{AtomicBool, AtomicI32, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; -use log::{debug, error, info, warn}; +use log::{debug, error, warn}; use time::OffsetDateTime; use time_tz::Tz; @@ -16,7 +16,7 @@ use crate::market_data::realtime::{self, Bar, BarSize, DepthMarketDataDescriptio use crate::market_data::MarketDataType; use crate::messages::{IncomingMessages, OutgoingMessages}; use crate::messages::{RequestMessage, ResponseMessage}; -use crate::orders::{Order, OrderDataResult, OrderNotification}; +use crate::orders::{CancelOrder, Executions, ExerciseOptions, Order, Orders, PlaceOrder}; use crate::transport::{Connection, ConnectionMetadata, InternalSubscription, MessageBus, TcpMessageBus}; use crate::{accounts, contracts, market_data, orders}; @@ -77,7 +77,7 @@ impl Client { message_bus, client_id: connection_metadata.client_id, next_request_id: AtomicI32::new(9000), - order_id: AtomicI32::new(-1), + order_id: AtomicI32::new(1000), }; Ok(client) @@ -447,12 +447,12 @@ impl Client { /// /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed"); /// - /// let results = client.all_open_orders().expect("request failed"); - /// for order_data in results { + /// let subscription = client.all_open_orders().expect("request failed"); + /// for order_data in &subscription { /// println!("{order_data:?}") /// } /// ``` - pub fn all_open_orders(&self) -> Result, Error> { + pub fn all_open_orders(&self) -> Result, Error> { orders::all_open_orders(self) } @@ -468,12 +468,12 @@ impl Client { /// /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed"); /// - /// let results = client.auto_open_orders(false).expect("request failed"); - /// for order_data in results { + /// let subscription = client.auto_open_orders(false).expect("request failed"); + /// for order_data in &subscription { /// println!("{order_data:?}") /// } /// ``` - pub fn auto_open_orders(&self, auto_bind: bool) -> Result, Error> { + pub fn auto_open_orders(&self, auto_bind: bool) -> Result, Error> { orders::auto_open_orders(self, auto_bind) } @@ -491,12 +491,12 @@ impl Client { /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed"); /// /// let order_id = 15; - /// let results = client.cancel_order(order_id, "").expect("request failed"); - /// for result in results { + /// let subscription = client.cancel_order(order_id, "").expect("request failed"); + /// for result in subscription { /// println!("{result:?}"); /// } /// ``` - pub fn cancel_order(&self, order_id: i32, manual_order_cancel_time: &str) -> Result, Error> { + pub fn cancel_order(&self, order_id: i32, manual_order_cancel_time: &str) -> Result, Error> { orders::cancel_order(self, order_id, manual_order_cancel_time) } @@ -512,12 +512,12 @@ impl Client { /// /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed"); /// - /// let results = client.completed_orders(false).expect("request failed"); - /// for order_data in results { + /// let subscription = client.completed_orders(false).expect("request failed"); + /// for order_data in &subscription { /// println!("{order_data:?}") /// } /// ``` - pub fn completed_orders(&self, api_only: bool) -> Result, Error> { + pub fn completed_orders(&self, api_only: bool) -> Result, Error> { orders::completed_orders(self, api_only) } @@ -543,12 +543,12 @@ impl Client { /// ..ExecutionFilter::default() /// }; /// - /// let executions = client.executions(filter).expect("request failed"); - /// for execution_data in executions { + /// let subscription = client.executions(filter).expect("request failed"); + /// for execution_data in &subscription { /// println!("{execution_data:?}") /// } /// ``` - pub fn executions(&self, filter: orders::ExecutionFilter) -> Result, Error> { + pub fn executions(&self, filter: orders::ExecutionFilter) -> Result, Error> { orders::executions(self, filter) } @@ -593,12 +593,12 @@ impl Client { /// /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed"); /// - /// let results = client.open_orders().expect("request failed"); - /// for order_data in results { + /// let subscription = client.open_orders().expect("request failed"); + /// for order_data in &subscription { /// println!("{order_data:?}") /// } /// ``` - pub fn open_orders(&self) -> Result, Error> { + pub fn open_orders(&self) -> Result, Error> { orders::open_orders(self) } @@ -617,7 +617,7 @@ impl Client { /// ```no_run /// use ibapi::Client; /// use ibapi::contracts::Contract; - /// use ibapi::orders::{order_builder, Action, OrderNotification}; + /// use ibapi::orders::{order_builder, Action, PlaceOrder}; /// /// let client = Client::connect("127.0.0.1:4002", 100).expect("connection failed"); /// @@ -629,20 +629,44 @@ impl Client { /// /// for notification in notifications { /// match notification { - /// OrderNotification::OrderStatus(order_status) => { + /// PlaceOrder::OrderStatus(order_status) => { /// println!("order status: {order_status:?}") /// } - /// OrderNotification::OpenOrder(open_order) => println!("open order: {open_order:?}"), - /// OrderNotification::ExecutionData(execution) => println!("execution: {execution:?}"), - /// OrderNotification::CommissionReport(report) => println!("commission report: {report:?}"), - /// OrderNotification::Message(message) => println!("message: {message:?}"), + /// PlaceOrder::OpenOrder(open_order) => println!("open order: {open_order:?}"), + /// PlaceOrder::ExecutionData(execution) => println!("execution: {execution:?}"), + /// PlaceOrder::CommissionReport(report) => println!("commission report: {report:?}"), + /// PlaceOrder::Message(message) => println!("message: {message:?}"), /// } /// } /// ``` - pub fn place_order(&self, order_id: i32, contract: &Contract, order: &Order) -> Result, Error> { + pub fn place_order(&self, order_id: i32, contract: &Contract, order: &Order) -> Result, Error> { orders::place_order(self, order_id, contract, order) } + /// Exercises an options contract. + /// + /// Note: this function is affected by a TWS setting which specifies if an exercise request must be finalized. + /// + /// # Arguments + /// * `contract` - The option [Contract] to be exercised. + /// * `exercise_action` - Exercise option. ExerciseAction::Exercise or ExerciseAction::Lapse. + /// * `exercise_quantity` - Number of contracts to be exercised. + /// * `account` - Destination account. + /// * `ovrd` - Specifies whether your setting will override the system’s natural action. + /// For example, if your action is "exercise" and the option is not in-the-money, by natural action the option would not exercise. If you have override set to true the natural action would be overridden and the out-of-the money option would be exercised. + /// * `manual_order_time - Specify the time at which the options should be exercised. An empty string will assume the current time. Required TWS API 10.26 or higher. + pub fn exercise_options<'a>( + &'a self, + contract: &Contract, + exercise_action: orders::ExerciseAction, + exercise_quantity: i32, + account: &str, + ovrd: bool, + manual_order_time: Option, + ) -> Result, Error> { + orders::exercise_options(self, contract, exercise_action, exercise_quantity, account, ovrd, manual_order_time) + } + // === Historical Market Data === /// Returns the timestamp of earliest available historical data for a contract and data type. @@ -1313,24 +1337,15 @@ impl<'a, T: Subscribable> Subscription<'a, T> { } fn process_message(&self, mut message: ResponseMessage) -> Option { - if T::RESPONSE_MESSAGE_IDS.contains(&message.message_type()) { - match T::decode(self.client.server_version(), &mut message) { - Ok(val) => Some(val), - Err(err) => { - let mut error = self.error.lock().unwrap(); - *error = Some(err); - None - } + match T::decode(self.client.server_version(), &mut message) { + Ok(val) => Some(val), + Err(Error::StreamEnd) => None, + Err(err) => { + error!("error decoding message: {err}"); + let mut error = self.error.lock().unwrap(); + *error = Some(err); + None } - } else if message.message_type() == IncomingMessages::Error { - let error_message = message.peek_string(4); - error!("{error_message}"); - let mut error = self.error.lock().unwrap(); - *error = Some(Error::Simple(error_message)); - None - } else { - info!("subscription iterator unexpected message: {message:?}"); - None } } @@ -1402,6 +1417,10 @@ impl<'a, T: Subscribable> Subscription<'a, T> { SubscriptionIter { subscription: self } } + pub fn into_iter(self) -> SubscriptionOwnedIter<'a, T> { + SubscriptionOwnedIter { subscription: self } + } + pub fn try_iter(&self) -> SubscriptionTryIter { SubscriptionTryIter { subscription: self } } @@ -1423,8 +1442,7 @@ impl<'a, T: Subscribable> Drop for Subscription<'a, T> { } pub(crate) trait Subscribable { - const RESPONSE_MESSAGE_IDS: &[IncomingMessages]; - const CANCEL_MESSAGE_ID: Option = None; + const RESPONSE_MESSAGE_IDS: &[IncomingMessages] = &[]; fn decode(server_version: i32, message: &mut ResponseMessage) -> Result; fn cancel_message(_server_version: i32, _request_id: Option, _context: &ResponseContext) -> Result { @@ -1455,6 +1473,27 @@ impl<'a, T: Subscribable> IntoIterator for &'a Subscription<'a, T> { } } +pub struct SubscriptionOwnedIter<'a, T: Subscribable> { + subscription: Subscription<'a, T>, +} + +impl<'a, T: Subscribable> Iterator for SubscriptionOwnedIter<'a, T> { + type Item = T; + + fn next(&mut self) -> Option { + self.subscription.next() + } +} + +impl<'a, T: Subscribable + 'a> IntoIterator for Subscription<'a, T> { + type Item = T; + type IntoIter = SubscriptionOwnedIter<'a, T>; + + fn into_iter(self) -> Self::IntoIter { + self.into_iter() + } +} + /// Non-Blocking iterator. Returns immediately if not available. #[allow(private_bounds)] pub struct SubscriptionTryIter<'a, T: Subscribable> { diff --git a/src/contracts.rs b/src/contracts.rs index a5546cd8..76cd7fb3 100644 --- a/src/contracts.rs +++ b/src/contracts.rs @@ -3,6 +3,8 @@ use std::fmt::Debug; use std::string::ToString; use log::{error, info}; +use serde::Deserialize; +use serde::Serialize; use tick_types::TickType; use crate::client::ResponseContext; @@ -26,7 +28,7 @@ mod tests; // Models -#[derive(Clone, Debug, PartialEq, Eq, Default)] +#[derive(Clone, Debug, PartialEq, Eq, Default, Serialize, Deserialize)] /// SecurityType enumerates available security types pub enum SecurityType { /// Stock (or ETF) @@ -111,7 +113,7 @@ impl SecurityType { } } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] /// Contract describes an instrument's definition pub struct Contract { /// The unique IB contract identifier. @@ -212,7 +214,7 @@ impl Contract { } } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] // ComboLeg represents a leg within combo orders. pub struct ComboLeg { /// The Contract's IB's unique id. @@ -234,7 +236,7 @@ pub struct ComboLeg { pub exempt_code: i32, } -#[derive(Clone, Copy, Debug, Default)] +#[derive(Clone, Copy, Debug, Default, PartialEq, Serialize, Deserialize)] /// OpenClose specifies whether an order is an open or closing order. pub enum ComboLegOpenClose { /// 0 - Same as the parent security. This is the only option for retail customers. @@ -267,7 +269,7 @@ impl From for ComboLegOpenClose { } } -#[derive(Clone, Debug, Default, PartialEq)] +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] /// Delta and underlying price for Delta-Neutral combo orders. /// Underlying (STK or FUT), delta and underlying price goes into this attribute. pub struct DeltaNeutralContract { @@ -372,7 +374,7 @@ pub struct ContractDetails { } /// TagValue is a convenience struct to define key-value pairs. -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Default, Serialize, Deserialize)] pub struct TagValue { pub tag: String, pub value: String, diff --git a/src/errors.rs b/src/errors.rs index f7139b38..b3beac51 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -1,5 +1,7 @@ use std::{num::ParseIntError, string::FromUtf8Error, sync::Arc}; +use crate::messages::ResponseMessage; + #[derive(Debug, Clone)] #[non_exhaustive] pub enum Error { @@ -19,6 +21,8 @@ pub enum Error { ConnectionReset, Cancelled, Shutdown, + StreamEnd, + UnexpectedResponse(ResponseMessage), } impl std::error::Error for Error {} @@ -39,6 +43,8 @@ impl std::fmt::Display for Error { Error::ConnectionReset => write!(f, "ConnectionReset"), Error::Cancelled => write!(f, "Cancelled"), Error::Shutdown => write!(f, "Shutdown"), + Error::StreamEnd => write!(f, "StreamEnd"), + Error::UnexpectedResponse(message) => write!(f, "UnexpectedResponse: {:?}", message), Error::Simple(ref err) => write!(f, "error occurred: {err}"), } diff --git a/src/lib.rs b/src/lib.rs index 52e26430..01062674 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,7 +19,7 @@ //! //! use ibapi::contracts::Contract; //! use ibapi::market_data::realtime::{BarSize, Bar, WhatToShow}; -//! use ibapi::orders::{order_builder, Action, OrderNotification}; +//! use ibapi::orders::{order_builder, Action, PlaceOrder}; //! use ibapi::Client; //! //! let client = Client::connect("127.0.0.1:4002", 100).unwrap(); @@ -52,7 +52,7 @@ //! //! let notices = client.place_order(order_id, &contract, &order).unwrap(); //! for notice in notices { -//! if let OrderNotification::ExecutionData(data) = notice { +//! if let PlaceOrder::ExecutionData(data) = notice { //! println!("{} {} shares of {}", data.execution.side, data.execution.shares, data.contract.symbol); //! } else { //! println!("{:?}", notice); diff --git a/src/market_data/realtime.rs b/src/market_data/realtime.rs index a71c2d8a..0526f923 100644 --- a/src/market_data/realtime.rs +++ b/src/market_data/realtime.rs @@ -1,4 +1,5 @@ use log::debug; +use serde::{Deserialize, Serialize}; use time::OffsetDateTime; use crate::client::{ResponseContext, Subscribable, Subscription}; @@ -17,7 +18,7 @@ mod tests; // === Models === -#[derive(Clone, Debug, Copy)] +#[derive(Clone, Debug, Copy, Serialize, Deserialize, PartialEq)] pub enum BarSize { // Sec, Sec5, @@ -33,7 +34,7 @@ pub enum BarSize { // Day, } -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize)] pub struct BidAsk { /// The spread's date and time (either as a yyyymmss hh:mm:ss formatted string or as system time according to the request). Time zone is the TWS time zone chosen on login. pub time: OffsetDateTime, @@ -62,13 +63,13 @@ impl Subscribable for BidAsk { } } -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize, PartialEq)] pub struct BidAskAttribute { pub bid_past_low: bool, pub ask_past_high: bool, } -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize, PartialEq)] pub struct MidPoint { /// The trade's date and time (either as a yyyymmss hh:mm:ss formatted string or as system time according to the request). Time zone is the TWS time zone chosen on login. pub time: OffsetDateTime, @@ -90,7 +91,7 @@ impl Subscribable for MidPoint { } /// Represents a real-time bar with OHLCV data -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct Bar { /// The timestamp of the bar in market timezone pub date: OffsetDateTime, @@ -123,7 +124,7 @@ impl Subscribable for Bar { } } -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize, PartialEq)] pub struct Trade { /// Tick type: "Last" or "AllLast" pub tick_type: String, @@ -154,7 +155,7 @@ impl Subscribable for Trade { } } -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize, PartialEq)] pub struct TradeAttribute { pub past_limit: bool, pub unreported: bool, @@ -185,14 +186,14 @@ impl ToField for WhatToShow { } } -#[derive(Debug)] +#[derive(Debug, Serialize, Deserialize, PartialEq)] pub enum MarketDepths { MarketDepth(MarketDepth), MarketDepthL2(MarketDepthL2), Notice(Notice), } -#[derive(Debug, Default)] +#[derive(Debug, Default, Serialize, Deserialize, PartialEq)] /// Returns the order book. pub struct MarketDepth { /// The order book's row being updated @@ -208,7 +209,7 @@ pub struct MarketDepth { } /// Returns the order book. -#[derive(Debug, Default)] +#[derive(Debug, Default, Serialize, Deserialize, PartialEq)] pub struct MarketDepthL2 { /// The order book's row being updated pub position: i32, diff --git a/src/messages.rs b/src/messages.rs index 1043b9a4..cab3a040 100644 --- a/src/messages.rs +++ b/src/messages.rs @@ -3,6 +3,7 @@ use std::ops::Index; use std::str::{self, FromStr}; use log::debug; +use serde::{Deserialize, Serialize}; use time::OffsetDateTime; use crate::{Error, ToField}; @@ -586,7 +587,7 @@ impl ResponseMessage { } /// An error message from the TWS API. -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] pub struct Notice { pub code: i32, pub message: String, diff --git a/src/messages/shared_channel_configuration.rs b/src/messages/shared_channel_configuration.rs index 6c41ef14..cfb05b2f 100644 --- a/src/messages/shared_channel_configuration.rs +++ b/src/messages/shared_channel_configuration.rs @@ -29,7 +29,19 @@ pub(crate) const CHANNEL_MAPPINGS: &[ChannelMapping] = &[ }, ChannelMapping { request: OutgoingMessages::RequestOpenOrders, - responses: &[IncomingMessages::OpenOrder, IncomingMessages::OpenOrderEnd], + responses: &[IncomingMessages::OpenOrder, IncomingMessages::OrderStatus, IncomingMessages::OpenOrderEnd], + }, + ChannelMapping { + request: OutgoingMessages::RequestAllOpenOrders, + responses: &[IncomingMessages::OpenOrder, IncomingMessages::OrderStatus, IncomingMessages::OpenOrderEnd], + }, + ChannelMapping { + request: OutgoingMessages::RequestAutoOpenOrders, + responses: &[IncomingMessages::OpenOrder, IncomingMessages::OrderStatus, IncomingMessages::OpenOrderEnd], + }, + ChannelMapping { + request: OutgoingMessages::RequestCompletedOrders, + responses: &[IncomingMessages::OpenOrder, IncomingMessages::OrderStatus, IncomingMessages::OpenOrderEnd], }, ChannelMapping { request: OutgoingMessages::RequestManagedAccounts, diff --git a/src/orders.rs b/src/orders.rs index 513728f5..1d7ba370 100644 --- a/src/orders.rs +++ b/src/orders.rs @@ -1,10 +1,13 @@ use std::convert::From; -use std::fmt::{self, Debug}; +use std::fmt::Debug; use log::{error, info}; +use serde::{Deserialize, Serialize}; +use time::OffsetDateTime; +use crate::client::{ResponseContext, Subscribable, Subscription}; use crate::contracts::{ComboLeg, ComboLegOpenClose, Contract, DeltaNeutralContract, SecurityType}; -use crate::messages::{IncomingMessages, OutgoingMessages}; +use crate::messages::{IncomingMessages, Notice, OutgoingMessages}; use crate::messages::{RequestMessage, ResponseMessage}; use crate::transport::InternalSubscription; use crate::Client; @@ -13,6 +16,8 @@ use crate::{server_versions, Error}; mod decoders; mod encoders; +#[cfg(test)] +mod tests; /// Make sure to test using only your paper trading account when applicable. A good way of finding out if an order type/exchange combination /// is possible is by trying to place such order manually using the TWS. @@ -24,7 +29,7 @@ pub use crate::contracts::TagValue; const COMPETE_AGAINST_BEST_OFFSET_UP_TO_MID: Option = Some(f64::INFINITY); -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] /// Order describes the order. pub struct Order { /// The API client's order id. @@ -264,7 +269,7 @@ pub struct Order { /// IB, Away, and PTA (post trade allocation). pub clearing_intent: String, /// The algorithm strategy. - /// As of API verion 9.6, the following algorithms are supported: + /// As of API version 9.6, the following algorithms are supported: /// ArrivalPx - Arrival Price /// DarkIce - Dark Ice /// PctVol - Percentage of Volume @@ -284,7 +289,7 @@ pub struct Order { /// For IBDARK orders only. pub not_held: bool, /// Advanced parameters for Smart combo routing. - /// These features are for both guaranteed and nonguaranteed combination orders routed to Smart, and are available based on combo type and order type. SmartComboRoutingParams is similar to AlgoParams in that it makes use of tag/value pairs to add parameters to combo orders. + /// These features are for both guaranteed and non-guaranteed combination orders routed to Smart, and are available based on combo type and order type. SmartComboRoutingParams is similar to AlgoParams in that it makes use of tag/value pairs to add parameters to combo orders. /// Make sure that you fully understand how Advanced Combo Routing works in TWS itself first: /// The parameters cover the following capabilities: /// @@ -329,7 +334,7 @@ pub struct Order { pub scale_table: String, /// Is used to place an order to a model. For example, "Technology" model can be used for tech stocks first created in TWS. pub model_code: String, - /// This is a regulartory attribute that applies to all US Commodity (Futures) Exchanges, provided to allow client to comply with CFTC Tag 50 Rules. + /// This is a regulatory attribute that applies to all US Commodity (Futures) Exchanges, provided to allow client to comply with CFTC Tag 50 Rules. pub ext_operator: String, /// The native cash quantity. pub cash_qty: Option, @@ -583,7 +588,7 @@ impl Order { /// For general account types, a SELL order will be able to enter a short position automatically if the order quantity is larger than your current long position. /// SSHORT is only supported for institutional account configured with Long/Short account segments or clearing with a separate account. /// SLONG is available in specially-configured institutional accounts to indicate that long position not yet delivered is being sold. -#[derive(Clone, Debug, Default, PartialEq, Eq, Copy)] +#[derive(Clone, Debug, Default, PartialEq, Eq, Copy, Serialize, Deserialize)] pub enum Action { #[default] Buy, @@ -634,7 +639,7 @@ impl Action { } } -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum Rule80A { Individual, Agency, @@ -700,12 +705,12 @@ pub enum AuctionStrategy { Transparent, } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] pub struct OrderComboLeg { price: Option, } -#[derive(Copy, Clone, Debug)] +#[derive(Copy, Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum OrderCondition { Price = 1, Time = 3, @@ -742,14 +747,14 @@ impl From for OrderCondition { } /// Stores Soft Dollar Tier information. -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] pub struct SoftDollarTier { pub name: String, pub value: String, pub display_name: String, } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] pub struct OrderData { /// The order's unique id pub order_id: i32, @@ -762,7 +767,7 @@ pub struct OrderData { } /// Provides an active order's current state. -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, PartialEq, Serialize, Deserialize)] pub struct OrderState { /// The order's current status pub status: String, @@ -802,7 +807,7 @@ pub struct OrderState { /// Available for institutional clients to determine if this order is to open or close a position. /// When Action = "BUY" and OpenClose = "O" this will open a new position. /// When Action = "BUY" and OpenClose = "C" this will close and existing short position. -#[derive(Clone, Debug, PartialEq)] +#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub enum OrderOpenClose { Open, Close, @@ -934,40 +939,16 @@ pub struct ExecutionData { } #[derive(Clone, Debug)] -pub enum OrderNotification { +pub enum PlaceOrder { OrderStatus(OrderStatus), - OpenOrder(Box), - ExecutionData(Box), + OpenOrder(OrderData), + ExecutionData(ExecutionData), CommissionReport(CommissionReport), - Message(String), -} - -impl From for OrderNotification { - fn from(val: OrderStatus) -> Self { - OrderNotification::OrderStatus(val) - } -} - -impl From for OrderNotification { - fn from(val: OrderData) -> Self { - OrderNotification::OpenOrder(Box::new(val)) - } -} - -impl From for OrderNotification { - fn from(val: ExecutionData) -> Self { - OrderNotification::ExecutionData(Box::new(val)) - } -} - -impl From for OrderNotification { - fn from(val: CommissionReport) -> Self { - OrderNotification::CommissionReport(val) - } + Message(Notice), } /// Contains all relevant information on the current status of the order execution-wise (i.e. amount filled and pending, filling price, etc.). -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)] pub struct OrderStatus { /// The order's client id. pub order_id: i32, @@ -1002,88 +983,28 @@ pub struct OrderStatus { pub market_cap_price: f64, } -#[derive(Debug)] -pub struct Notice(String); - -impl fmt::Display for Notice { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.0) - } -} - // Submits an Order. // After the order is submitted correctly, events will be returned concerning the order's activity. // https://interactivebrokers.github.io/tws-api/order_submission.html -pub(crate) fn place_order( - client: &Client, - order_id: i32, - contract: &Contract, - order: &Order, -) -> Result, Error> { +pub(crate) fn place_order<'a>(client: &'a Client, order_id: i32, contract: &Contract, order: &Order) -> Result, Error> { verify_order(client, order, order_id)?; verify_order_contract(client, contract, order_id)?; - let message = encoders::encode_place_order(client.server_version(), order_id, contract, order)?; - - let messages = client.send_order(order_id, message)?; + let request = encoders::encode_place_order(client.server_version(), order_id, contract, order)?; + let subscription = client.send_order(order_id, request)?; - Ok(OrderNotificationIterator { - messages, - server_version: client.server_version(), - }) + Ok(Subscription::new(client, subscription, ResponseContext::default())) } -// Supports iteration over OrderNotification -pub(crate) struct OrderNotificationIterator { - server_version: i32, - messages: InternalSubscription, -} - -impl Iterator for OrderNotificationIterator { - type Item = OrderNotification; - - /// Returns the next [OrderNotification]. Waits up to x seconds for next [OrderNotification]. - fn next(&mut self) -> Option { - fn convert>(result: Result) -> Option { - match result { - Ok(val) => Some(val.into()), - Err(err) => { - info!("error: {err:?}"); - None - } - } - } - - loop { - if let Some(Ok(mut message)) = self.messages.next() { - match message.message_type() { - IncomingMessages::OpenOrder => { - let open_order = decoders::decode_open_order(self.server_version, message); - return convert(open_order); - } - IncomingMessages::OrderStatus => { - let order_status = decoders::decode_order_status(self.server_version, &mut message); - return convert(order_status); - } - IncomingMessages::ExecutionData => { - let execution_data = decoders::decode_execution_data(self.server_version, &mut message); - return convert(execution_data); - } - IncomingMessages::CommissionsReport => { - let commission_report = decoders::decode_commission_report(self.server_version, &mut message); - return convert(commission_report); - } - IncomingMessages::Error => { - let message = message.peek_string(4); - return Some(OrderNotification::Message(message)); - } - message => { - error!("unexpected message: {message:?}"); - } - } - } else { - return None; - } +impl Subscribable for PlaceOrder { + fn decode(server_version: i32, message: &mut ResponseMessage) -> Result { + match message.message_type() { + IncomingMessages::OpenOrder => Ok(PlaceOrder::OpenOrder(decoders::decode_open_order(server_version, message.clone())?)), + IncomingMessages::OrderStatus => Ok(PlaceOrder::OrderStatus(decoders::decode_order_status(server_version, message)?)), + IncomingMessages::ExecutionData => Ok(PlaceOrder::ExecutionData(decoders::decode_execution_data(server_version, message)?)), + IncomingMessages::CommissionsReport => Ok(PlaceOrder::CommissionReport(decoders::decode_commission_report(server_version, message)?)), + IncomingMessages::Error => Ok(PlaceOrder::Message(Notice::from(message))), + _ => Err(Error::UnexpectedResponse(message.clone())), } } } @@ -1295,7 +1216,7 @@ fn verify_order_contract(client: &Client, contract: &Contract, _order_id: i32) - } // Cancels an open [Order]. -pub(crate) fn cancel_order(client: &Client, order_id: i32, manual_order_cancel_time: &str) -> Result { +pub(crate) fn cancel_order<'a>(client: &'a Client, order_id: i32, manual_order_cancel_time: &str) -> Result, Error> { if !manual_order_cancel_time.is_empty() { client.check_server_version( server_versions::MANUAL_ORDER_TIME, @@ -1303,54 +1224,25 @@ pub(crate) fn cancel_order(client: &Client, order_id: i32, manual_order_cancel_t )? } - let message = encoders::encode_cancel_order(client.server_version(), order_id, manual_order_cancel_time)?; - - let messages = client.send_order(order_id, message)?; + let request = encoders::encode_cancel_order(client.server_version(), order_id, manual_order_cancel_time)?; + let subscription = client.send_order(order_id, request)?; - Ok(CancelOrderResultIterator { - messages, - server_version: client.server_version(), - }) + Ok(Subscription::new(client, subscription, ResponseContext::default())) } /// Enumerates possible results from cancelling an order. #[derive(Debug)] -pub enum CancelOrderResult { +pub enum CancelOrder { OrderStatus(OrderStatus), Notice(Notice), } -// Supports iteration over [CancelOrderResult] -pub(crate) struct CancelOrderResultIterator { - server_version: i32, - messages: InternalSubscription, -} - -impl Iterator for CancelOrderResultIterator { - type Item = CancelOrderResult; - - /// Returns the next [CancelOrderResult]. Waits up to x seconds for next [CancelOrderResult]. - fn next(&mut self) -> Option { - loop { - if let Some(Ok(mut message)) = self.messages.next() { - match message.message_type() { - IncomingMessages::OrderStatus => match decoders::decode_order_status(self.server_version, &mut message) { - Ok(val) => return Some(CancelOrderResult::OrderStatus(val)), - Err(err) => { - error!("error decoding order status: {err}"); - } - }, - IncomingMessages::Error => { - let message = message.peek_string(4); - return Some(CancelOrderResult::Notice(Notice(message))); - } - message => { - error!("unexpected messsage: {message:?}"); - } - } - } else { - return None; - } +impl Subscribable for CancelOrder { + fn decode(server_version: i32, message: &mut ResponseMessage) -> Result { + match message.message_type() { + IncomingMessages::OrderStatus => Ok(CancelOrder::OrderStatus(decoders::decode_order_status(server_version, message)?)), + IncomingMessages::Error => Ok(CancelOrder::Notice(Notice::from(message))), + _ => Err(Error::UnexpectedResponse(message.clone())), } } } @@ -1386,68 +1278,34 @@ pub(crate) fn next_valid_order_id(client: &Client) -> Result { } // Requests completed [Order]s. -pub(crate) fn completed_orders(client: &Client, api_only: bool) -> Result { +pub(crate) fn completed_orders(client: &Client, api_only: bool) -> Result, Error> { client.check_server_version(server_versions::COMPLETED_ORDERS, "It does not support completed orders requests.")?; - let message = encoders::encode_completed_orders(api_only)?; + let request = encoders::encode_completed_orders(api_only)?; + let subscription = client.send_shared_request(OutgoingMessages::RequestCompletedOrders, request)?; - let messages = client.send_shared_request(OutgoingMessages::RequestCompletedOrders, message)?; - - Ok(OrderDataIterator { - server_version: client.server_version(), - messages, - }) + Ok(Subscription::new(client, subscription, ResponseContext::default())) } /// Enumerates possible results from querying an [Order]. #[derive(Debug)] -pub enum OrderDataResult { - OrderData(Box), - OrderStatus(Box), -} - -/// Supports iteration over [OrderDataResult]. -pub(crate) struct OrderDataIterator { - server_version: i32, - messages: InternalSubscription, +#[allow(clippy::large_enum_variant)] +pub enum Orders { + OrderData(OrderData), + OrderStatus(OrderStatus), + Notice(Notice), } -impl Iterator for OrderDataIterator { - type Item = OrderDataResult; - - /// Returns the next [OrderDataResult]. Waits up to x seconds for next [OrderDataResult]. - fn next(&mut self) -> Option { - loop { - if let Some(Ok(mut message)) = self.messages.next() { - match message.message_type() { - IncomingMessages::CompletedOrder => match decoders::decode_completed_order(self.server_version, message) { - Ok(val) => return Some(OrderDataResult::OrderData(Box::new(val))), - Err(err) => { - error!("error decoding completed order: {err}"); - } - }, - IncomingMessages::OpenOrder => match decoders::decode_open_order(self.server_version, message) { - Ok(val) => return Some(OrderDataResult::OrderData(Box::new(val))), - Err(err) => { - error!("error decoding open order: {err}"); - } - }, - IncomingMessages::OrderStatus => match decoders::decode_order_status(self.server_version, &mut message) { - Ok(val) => return Some(OrderDataResult::OrderStatus(Box::new(val))), - Err(err) => { - error!("error decoding order status: {err}"); - } - }, - IncomingMessages::OpenOrderEnd | IncomingMessages::CompletedOrdersEnd => { - return None; - } - message => { - error!("order data iterator unexpected message: {message:?}"); - } - } - } else { - return None; - } +impl Subscribable for Orders { + fn decode(server_version: i32, message: &mut ResponseMessage) -> Result { + match message.message_type() { + IncomingMessages::CompletedOrder => Ok(Orders::OrderData(decoders::decode_completed_order(server_version, message.clone())?)), + IncomingMessages::CommissionsReport => Ok(Orders::OrderData(decoders::decode_open_order(server_version, message.clone())?)), + IncomingMessages::OpenOrder => Ok(Orders::OrderData(decoders::decode_open_order(server_version, message.clone())?)), + IncomingMessages::OrderStatus => Ok(Orders::OrderStatus(decoders::decode_order_status(server_version, message)?)), + IncomingMessages::OpenOrderEnd | IncomingMessages::CompletedOrdersEnd => Err(Error::StreamEnd), + IncomingMessages::Error => Ok(Orders::Notice(Notice::from(message))), + _ => Err(Error::UnexpectedResponse(message.clone())), } } } @@ -1458,41 +1316,28 @@ impl Iterator for OrderDataIterator { /// # Arguments /// * `client` - [Client] used to communicate with server. /// -pub(crate) fn open_orders(client: &Client) -> Result { - let message = encoders::encode_open_orders()?; - - let messages = client.send_shared_request(OutgoingMessages::RequestOpenOrders, message)?; +pub(crate) fn open_orders(client: &Client) -> Result, Error> { + let request = encoders::encode_open_orders()?; + let subscription = client.send_shared_request(OutgoingMessages::RequestOpenOrders, request)?; - Ok(OrderDataIterator { - server_version: client.server_version(), - messages, - }) + Ok(Subscription::new(client, subscription, ResponseContext::default())) } // Requests all *current* open orders in associated accounts at the current moment. // Open orders are returned once; this function does not initiate a subscription. -pub(crate) fn all_open_orders(client: &Client) -> Result { - let message = encoders::encode_all_open_orders()?; +pub(crate) fn all_open_orders(client: &Client) -> Result, Error> { + let request = encoders::encode_all_open_orders()?; + let subscription = client.send_shared_request(OutgoingMessages::RequestAllOpenOrders, request)?; - let messages = client.send_shared_request(OutgoingMessages::RequestAllOpenOrders, message)?; - - Ok(OrderDataIterator { - server_version: client.server_version(), - messages, - }) + Ok(Subscription::new(client, subscription, ResponseContext::default())) } // Requests status updates about future orders placed from TWS. Can only be used with client ID 0. -pub(crate) fn auto_open_orders(client: &Client, auto_bind: bool) -> Result { - let message = encoders::encode_auto_open_orders(auto_bind)?; - - // TODO this should probably not timeout. - let messages = client.send_shared_request(OutgoingMessages::RequestAutoOpenOrders, message)?; +pub(crate) fn auto_open_orders(client: &Client, auto_bind: bool) -> Result, Error> { + let request = encoders::encode_auto_open_orders(auto_bind)?; + let subscription = client.send_shared_request(OutgoingMessages::RequestAutoOpenOrders, request)?; - Ok(OrderDataIterator { - server_version: client.server_version(), - messages, - }) + Ok(Subscription::new(client, subscription, ResponseContext::default())) } #[derive(Debug, Default)] @@ -1523,64 +1368,83 @@ pub struct ExecutionFilter { // // # Arguments // * `filter` - filter criteria used to determine which execution reports are returned -pub(crate) fn executions(client: &Client, filter: ExecutionFilter) -> Result { +pub(crate) fn executions(client: &Client, filter: ExecutionFilter) -> Result, Error> { let request_id = client.next_request_id(); - let message = encoders::encode_executions(client.server_version(), request_id, &filter)?; - let messages = client.send_request(request_id, message)?; + let request = encoders::encode_executions(client.server_version(), request_id, &filter)?; + let subscription = client.send_request(request_id, request)?; - Ok(ExecutionDataIterator { - server_version: client.server_version(), - messages, - }) + Ok(Subscription::new(client, subscription, ResponseContext::default())) } /// Enumerates possible results from querying an [Execution]. #[derive(Debug)] -pub enum ExecutionDataResult { - ExecutionData(Box), - CommissionReport(Box), +#[allow(clippy::large_enum_variant)] +pub enum Executions { + ExecutionData(ExecutionData), + CommissionReport(CommissionReport), + Notice(Notice), } -/// Supports iteration over [ExecutionDataResult]. -pub(crate) struct ExecutionDataIterator { - server_version: i32, - messages: InternalSubscription, +impl Subscribable for Executions { + fn decode(server_version: i32, message: &mut ResponseMessage) -> Result { + match message.message_type() { + IncomingMessages::ExecutionData => Ok(Executions::ExecutionData(decoders::decode_execution_data(server_version, message)?)), + IncomingMessages::CommissionsReport => Ok(Executions::CommissionReport(decoders::decode_commission_report(server_version, message)?)), + IncomingMessages::ExecutionDataEnd => Err(Error::StreamEnd), + IncomingMessages::Error => Ok(Executions::Notice(Notice::from(message))), + _ => Err(Error::UnexpectedResponse(message.clone())), + } + } +} + +#[derive(Debug)] +pub enum ExerciseAction { + Exercise = 1, + Lapse = 2, +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +#[allow(clippy::large_enum_variant)] +pub enum ExerciseOptions { + OpenOrder(OrderData), + OrderStatus(OrderStatus), + Notice(Notice), } -impl Iterator for ExecutionDataIterator { - type Item = ExecutionDataResult; - - /// Returns the next [OrderDataResult]. Waits up to x seconds for next [OrderDataResult]. - fn next(&mut self) -> Option { - loop { - if let Some(Ok(mut message)) = self.messages.next() { - match message.message_type() { - IncomingMessages::ExecutionData => match decoders::decode_execution_data(self.server_version, &mut message) { - Ok(val) => return Some(ExecutionDataResult::ExecutionData(Box::new(val))), - Err(err) => { - error!("error decoding execution data: {err}"); - } - }, - IncomingMessages::CommissionsReport => match decoders::decode_commission_report(self.server_version, &mut message) { - Ok(val) => return Some(ExecutionDataResult::CommissionReport(Box::new(val))), - Err(err) => { - error!("error decoding commission report: {err}"); - } - }, - IncomingMessages::ExecutionDataEnd => { - return None; - } - message => { - error!("order data iterator unexpected messsage: {message:?}"); - } - } - } else { - return None; - } +impl Subscribable for ExerciseOptions { + fn decode(server_version: i32, message: &mut ResponseMessage) -> Result { + match message.message_type() { + IncomingMessages::OpenOrder => Ok(ExerciseOptions::OpenOrder(decoders::decode_open_order(server_version, message.clone())?)), + IncomingMessages::OrderStatus => Ok(ExerciseOptions::OrderStatus(decoders::decode_order_status(server_version, message)?)), + IncomingMessages::Error => Ok(ExerciseOptions::Notice(Notice::from(message))), + _ => Err(Error::UnexpectedResponse(message.clone())), } } } -#[cfg(test)] -mod tests; +pub(crate) fn exercise_options<'a>( + client: &'a Client, + contract: &Contract, + exercise_action: ExerciseAction, + exercise_quantity: i32, + account: &str, + ovrd: bool, + manual_order_time: Option, +) -> Result, Error> { + let request_id = client.next_request_id(); + + let request = encoders::encode_exercise_options( + client.server_version(), + request_id, + contract, + exercise_action, + exercise_quantity, + account, + ovrd, + manual_order_time, + )?; + let subscription = client.send_request(request_id, request)?; + + Ok(Subscription::new(client, subscription, ResponseContext::default())) +} diff --git a/src/orders/encoders.rs b/src/orders/encoders.rs index a347a0a9..63d41718 100644 --- a/src/orders/encoders.rs +++ b/src/orders/encoders.rs @@ -2,6 +2,9 @@ use crate::Error; use super::*; +#[cfg(test)] +mod tests; + pub(crate) fn encode_place_order(server_version: i32, order_id: i32, contract: &Contract, order: &Order) -> Result { let mut message = RequestMessage::default(); let message_version = message_version_for(server_version); @@ -506,5 +509,43 @@ fn message_version_for(server_version: i32) -> i32 { } } -#[cfg(test)] -mod tests; +#[allow(clippy::too_many_arguments)] +pub(crate) fn encode_exercise_options( + server_version: i32, + request_id: i32, + contract: &Contract, + exercise_action: ExerciseAction, + exercise_quantity: i32, + account: &str, + ovrd: bool, + manual_order_time: Option, +) -> Result { + const VERSION: i32 = 2; + + let mut message = RequestMessage::default(); + + message.push_field(&OutgoingMessages::ExerciseOptions); + message.push_field(&VERSION); + message.push_field(&request_id); + message.push_field(&contract.contract_id); + message.push_field(&contract.symbol); + message.push_field(&contract.security_type); + message.push_field(&contract.last_trade_date_or_contract_month); + message.push_field(&contract.strike); + message.push_field(&contract.right); + message.push_field(&contract.multiplier); + message.push_field(&contract.exchange); + message.push_field(&contract.currency); + message.push_field(&contract.local_symbol); + message.push_field(&contract.trading_class); + message.push_field(&(exercise_action as i32)); + message.push_field(&exercise_quantity); + message.push_field(&account); + message.push_field(&ovrd); + + if server_version >= server_versions::MANUAL_ORDER_TIME { + message.push_field(&manual_order_time); + } + + Ok(message) +} diff --git a/src/orders/tests.rs b/src/orders/tests.rs index 9b08fa10..f4399cd9 100644 --- a/src/orders/tests.rs +++ b/src/orders/tests.rs @@ -46,7 +46,7 @@ fn place_order() { let mut notifications = result.unwrap(); - if let Some(OrderNotification::OpenOrder(open_order)) = notifications.next() { + if let Some(PlaceOrder::OpenOrder(open_order)) = notifications.next() { assert_eq!(open_order.order_id, 13, "open_order.order_id"); let contract = &open_order.contract; @@ -192,7 +192,7 @@ fn place_order() { assert!(false, "message[0] expected an open order notification"); } - if let Some(OrderNotification::OrderStatus(order_status)) = notifications.next() { + if let Some(PlaceOrder::OrderStatus(order_status)) = notifications.next() { assert_eq!(order_status.order_id, 13, "order_status.order_id"); assert_eq!(order_status.status, "PreSubmitted", "order_status.status"); assert_eq!(order_status.filled, 0.0, "order_status.filled"); @@ -208,7 +208,7 @@ fn place_order() { assert!(false, "message[1] expected order status notification"); } - if let Some(OrderNotification::ExecutionData(execution_data)) = notifications.next() { + if let Some(PlaceOrder::ExecutionData(execution_data)) = notifications.next() { let contract = execution_data.contract; let execution = execution_data.execution; @@ -249,7 +249,7 @@ fn place_order() { assert!(false, "message[2] expected execution notification"); } - if let Some(OrderNotification::OpenOrder(open_order)) = notifications.next() { + if let Some(PlaceOrder::OpenOrder(open_order)) = notifications.next() { let order_state = &open_order.order_state; assert_eq!(open_order.order_id, 13, "open_order.order_id"); @@ -258,7 +258,7 @@ fn place_order() { assert!(false, "message[3] expected an open order notification"); } - if let Some(OrderNotification::OrderStatus(order_status)) = notifications.next() { + if let Some(PlaceOrder::OrderStatus(order_status)) = notifications.next() { assert_eq!(order_status.order_id, 13, "order_status.order_id"); assert_eq!(order_status.status, "Filled", "order_status.status"); assert_eq!(order_status.filled, 100.0, "order_status.filled"); @@ -269,7 +269,7 @@ fn place_order() { assert!(false, "message[4] expected order status notification"); } - if let Some(OrderNotification::OpenOrder(open_order)) = notifications.next() { + if let Some(PlaceOrder::OpenOrder(open_order)) = notifications.next() { let order_state = &open_order.order_state; assert_eq!(open_order.order_id, 13, "open_order.order_id"); @@ -282,7 +282,7 @@ fn place_order() { assert!(false, "message[5] expected an open order notification"); } - if let Some(OrderNotification::CommissionReport(report)) = notifications.next() { + if let Some(PlaceOrder::CommissionReport(report)) = notifications.next() { assert_eq!(report.execution_id, "00025b46.63f8f39c.01.01", "report.execution_id"); assert_eq!(report.commission, 1.0, "report.commission"); assert_eq!(report.currency, "USD", "report.currency"); @@ -317,7 +317,7 @@ fn cancel_order() { let mut results = results.unwrap(); - if let Some(CancelOrderResult::OrderStatus(order_status)) = results.next() { + if let Some(CancelOrder::OrderStatus(order_status)) = results.next() { assert_eq!(order_status.order_id, 41, "order_status.order_id"); assert_eq!(order_status.status, "Cancelled", "order_status.status"); assert_eq!(order_status.filled, 0.0, "order_status.filled"); @@ -331,8 +331,8 @@ fn cancel_order() { assert_eq!(order_status.market_cap_price, 0.0, "order_status.market_cap_price"); } - if let Some(CancelOrderResult::Notice(Notice(message))) = results.next() { - assert_eq!(message, "Order Canceled - reason:", "order status notice"); + if let Some(CancelOrder::Notice(notice)) = results.next() { + assert_eq!(notice.message, "Order Canceled - reason:", "order status notice"); } } @@ -382,10 +382,10 @@ fn completed_orders() { ], }); - let mut client = Client::stubbed(message_bus, server_versions::SIZE_RULES); + let client = Client::stubbed(message_bus, server_versions::SIZE_RULES); let api_only = true; - let results = super::completed_orders(&mut client, api_only); + let results = super::completed_orders(&client, api_only); let request_messages = client.message_bus.request_messages(); @@ -393,8 +393,8 @@ fn completed_orders() { assert!(results.is_ok(), "failed to request completed orders: {}", results.err().unwrap()); - let mut results = results.unwrap(); - if let Some(OrderDataResult::OrderData(order_data)) = results.next() { + let results = results.unwrap(); + if let Some(Orders::OrderData(order_data)) = results.next() { assert_eq!(order_data.order_id, -1, "open_order.order_id"); let contract = &order_data.contract; @@ -515,9 +515,9 @@ fn open_orders() { response_messages: vec!["9|1|43||".to_owned()], }); - let mut client = Client::stubbed(message_bus, server_versions::SIZE_RULES); + let client = Client::stubbed(message_bus, server_versions::SIZE_RULES); - let results = super::open_orders(&mut client); + let results = super::open_orders(&client); let request_messages = client.message_bus.request_messages(); diff --git a/src/transport.rs b/src/transport.rs index 841aec85..a461d2c7 100644 --- a/src/transport.rs +++ b/src/transport.rs @@ -59,7 +59,7 @@ pub(crate) trait MessageBus: Send + Sync { #[derive(Debug)] struct SharedChannels { // Maps an inbound reply to channel used to send responses. - senders: HashMap>>>, + senders: HashMap>>>>, // Maps an outbound request to channel used to receive responses. receivers: HashMap>>>, } @@ -89,12 +89,15 @@ impl SharedChannels { let sender = &Arc::new(sender); for inbound in inbounds { - self.senders.insert(*inbound, Arc::clone(sender)); + if !self.senders.contains_key(inbound) { + self.senders.insert(*inbound, Vec::new()); + } + self.senders.get_mut(inbound).unwrap().push(Arc::clone(sender)); } } // Get receiver for specified message type. Panics if receiver not found. - pub fn get_receiver(&self, message_type: OutgoingMessages) -> Arc>> { + fn get_receiver(&self, message_type: OutgoingMessages) -> Arc>> { let receiver = self .receivers .get(&message_type) @@ -103,24 +106,29 @@ impl SharedChannels { Arc::clone(receiver) } - // Get sender for specified message type. Panics if sender not found. - pub fn get_sender(&self, message_type: IncomingMessages) -> Arc>> { - let sender = self - .senders - .get(&message_type) - .unwrap_or_else(|| panic!("unsupported response message {message_type:?}")); - - Arc::clone(sender) - } - fn contains_sender(&self, message_type: IncomingMessages) -> bool { self.senders.contains_key(&message_type) } + // Notify all listeners of a given message type with message. + fn send_message(&self, message_type: IncomingMessages, message: &ResponseMessage) { + if let Some(senders) = self.senders.get(&message_type) { + for sender in senders { + if let Err(e) = sender.send(Ok(message.clone())) { + warn!("error sending message: {e}"); + } + } + } + } + // Notify all senders with a given message fn notify_all(&self, message: &Result) { - for sender in self.senders.values() { - let _ = sender.send(message.clone()); + for senders in self.senders.values() { + for sender in senders { + if let Err(e) = sender.send(message.clone()) { + warn!("error sending notification: {e}"); + } + } } } } @@ -279,7 +287,7 @@ impl TcpMessageBus { } else if self.orders.contains(&request_id) { self.orders.send(&request_id, Ok(message)).unwrap(); } else if self.shared_channels.contains_sender(message.message_type()) { - self.shared_channels.get_sender(message.message_type()).send(Ok(message)).unwrap() + self.shared_channels.send_message(message.message_type(), &message); } else { info!("no recipient found for: {:?}", message) } @@ -335,25 +343,13 @@ impl TcpMessageBus { if let Err(e) = self.orders.send(&order_id, Ok(message)) { error!("error routing message for order_id({order_id}): {e}"); } - } else if let Err(e) = self.shared_channels.get_sender(IncomingMessages::OpenOrder).send(Ok(message)) { - error!("error sending IncomingMessages::OpenOrder: {e}"); + } else if self.shared_channels.contains_sender(IncomingMessages::OpenOrder) { + self.shared_channels.send_message(message.message_type(), &message); } } } - IncomingMessages::CompletedOrder => { - if let Err(e) = self.shared_channels.get_sender(message.message_type()).send(Ok(message)) { - error!("error sending IncomingMessages::CompletedOrder: {e}"); - } - } - IncomingMessages::OpenOrderEnd => { - if let Err(e) = self.shared_channels.get_sender(message.message_type()).send(Ok(message)) { - error!("error sending IncomingMessages::OpenOrderEnd: {e}"); - } - } - IncomingMessages::CompletedOrdersEnd => { - if let Err(e) = self.shared_channels.get_sender(message.message_type()).send(Ok(message)) { - error!("error sending IncomingMessages::CompletedOrdersEnd: {e}"); - } + IncomingMessages::CompletedOrder | IncomingMessages::OpenOrderEnd | IncomingMessages::CompletedOrdersEnd => { + self.shared_channels.send_message(message.message_type(), &message); } IncomingMessages::CommissionsReport => { if let Some(execution_id) = message.execution_id() {