Skip to content

Commit

Permalink
added response enumeration
Browse files Browse the repository at this point in the history
  • Loading branch information
Wil Boayue committed Oct 14, 2024
1 parent 3fcb044 commit 635752f
Show file tree
Hide file tree
Showing 8 changed files with 100 additions and 71 deletions.
14 changes: 11 additions & 3 deletions src/accounts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use crate::client::{SharesChannel, Subscribable, Subscription};
use crate::contracts::Contract;
use crate::messages::{IncomingMessages, OutgoingMessages, RequestMessage, ResponseMessage};
use crate::transport::Response;
use crate::{server_versions, Client, Error};

mod decoders;
Expand Down Expand Up @@ -355,7 +356,8 @@ pub(crate) fn family_codes(client: &Client) -> Result<Vec<FamilyCode>, Error> {
let request = encoders::encode_request_family_codes()?;
let subscription = client.send_shared_request(OutgoingMessages::RequestFamilyCodes, request)?;

if let Some(mut message) = subscription.next() {
// TODO: enumerate
if let Some(Response::Message(mut message)) = subscription.next() {
decoders::decode_family_codes(&mut message)
} else {
Ok(Vec::default())
Expand Down Expand Up @@ -415,13 +417,19 @@ pub fn managed_accounts(client: &Client) -> Result<Vec<String>, Error> {
let subscription = client.send_shared_request(OutgoingMessages::RequestManagedAccounts, request)?;

match subscription.next() {
Some(mut message) => {
Some(Response::Message(mut message)) => {
message.skip(); // message type
message.skip(); // message version

let accounts = message.next_string()?;
Ok(accounts.split(",").map(String::from).collect())
}
},
Some(Response::Cancelled) => {
Err(Error::Simple(format!("request cancelled")))
},
Some(Response::Disconnected) => {
Err(Error::Simple(format!("server disconnected")))
},
None => Ok(Vec::default()),
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::market_data::realtime::{self, Bar, BarSize, MidPoint, WhatToShow};
use crate::messages::{IncomingMessages, OutgoingMessages};
use crate::messages::{RequestMessage, ResponseMessage};
use crate::orders::{Order, OrderDataResult, OrderNotification};
use crate::transport::{AccountInfo, Connection, InternalSubscription, MessageBus, TcpMessageBus};
use crate::transport::{AccountInfo, Connection, InternalSubscription, MessageBus, Response, TcpMessageBus};
use crate::{accounts, contracts, orders};

// Client
Expand Down Expand Up @@ -1017,7 +1017,7 @@ impl<'a, T: Subscribable<T>> Subscription<'a, T> {
/// Blocks until the item become available.
pub fn next(&self) -> Option<T> {
loop {
if let Some(mut message) = self.subscription.next() {
if let Some(Response::Message(mut message)) = self.subscription.next() {
if T::RESPONSE_MESSAGE_IDS.contains(&message.message_type()) {
match T::decode(self.client.server_version(), &mut message) {
Ok(val) => return Some(val),
Expand Down Expand Up @@ -1050,7 +1050,7 @@ impl<'a, T: Subscribable<T>> Subscription<'a, T> {
/// //}
/// ```
pub fn try_next(&self) -> Option<T> {
if let Some(mut message) = self.subscription.try_next() {
if let Some(Response::Message(mut message)) = self.subscription.try_next() {
if message.message_type() == IncomingMessages::Error {
error!("{}", message.peek_string(4));
return None;
Expand Down Expand Up @@ -1080,7 +1080,7 @@ impl<'a, T: Subscribable<T>> Subscription<'a, T> {
/// //}
/// ```
pub fn next_timeout(&self, timeout: Duration) -> Option<T> {
if let Some(mut message) = self.subscription.next_timeout(timeout) {
if let Some(Response::Message(mut message)) = self.subscription.next_timeout(timeout) {
if message.message_type() == IncomingMessages::Error {
error!("{}", message.peek_string(4));
return None;
Expand Down
9 changes: 6 additions & 3 deletions src/contracts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::encode_option_field;
use crate::messages::IncomingMessages;
use crate::messages::OutgoingMessages;
use crate::messages::RequestMessage;
use crate::transport::Response;
use crate::Client;
use crate::{server_versions, Error, ToField};

Expand Down Expand Up @@ -403,7 +404,7 @@ pub(crate) fn contract_details(client: &Client, contract: &Contract) -> Result<V
let mut contract_details: Vec<ContractDetails> = Vec::default();

// TODO create iterator
while let Some(mut message) = responses.next() {
while let Some(Response::Message(mut message)) = responses.next() {
match message.message_type() {
IncomingMessages::ContractData => {
let decoded = decoders::contract_details(client.server_version(), &mut message)?;
Expand Down Expand Up @@ -476,7 +477,7 @@ pub(crate) fn matching_symbols(client: &Client, pattern: &str) -> Result<Vec<Con
let request = encoders::request_matching_symbols(request_id, pattern)?;
let subscription = client.send_request(request_id, request)?;

if let Some(mut message) = subscription.next() {
if let Some(Response::Message(mut message)) = subscription.next() {
match message.message_type() {
IncomingMessages::SymbolSamples => {
return decoders::contract_descriptions(client.server_version(), &mut message);
Expand Down Expand Up @@ -519,7 +520,9 @@ pub(crate) fn market_rule(client: &Client, market_rule_id: i32) -> Result<Market
let subscription = client.send_shared_request(OutgoingMessages::RequestMarketRule, request)?;

match subscription.next() {
Some(mut message) => Ok(decoders::market_rule(&mut message)?),
Some(Response::Message(mut message)) => Ok(decoders::market_rule(&mut message)?),
Some(Response::Cancelled) => Err(Error::Simple("subscription cancelled".into())),
Some(Response::Disconnected) => Err(Error::Simple("server gone".into())),
None => Err(Error::Simple("no market rule found".into())),
}
}
13 changes: 7 additions & 6 deletions src/market_data/historical.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use time::{Date, OffsetDateTime};

use crate::contracts::Contract;
use crate::messages::{IncomingMessages, RequestMessage, ResponseMessage};
use crate::transport::InternalSubscription;
use crate::transport::{InternalSubscription, Response};
use crate::{server_versions, Client, Error, ToField};

mod decoders;
Expand Down Expand Up @@ -304,7 +304,7 @@ pub(crate) fn head_timestamp(client: &Client, contract: &Contract, what_to_show:

let subscription = client.send_request(request_id, request)?;

if let Some(mut message) = subscription.next() {
if let Some(Response::Message(mut message)) = subscription.next() {
decoders::decode_head_timestamp(&mut message)
} else {
Err(Error::Simple("did not receive head timestamp message".into()))
Expand Down Expand Up @@ -359,7 +359,7 @@ pub(crate) fn historical_data(

let subscription = client.send_request(request_id, request)?;

if let Some(mut message) = subscription.next() {
if let Some(Response::Message(mut message)) = subscription.next() {
let time_zone = if let Some(tz) = client.time_zone {
tz
} else {
Expand Down Expand Up @@ -410,7 +410,7 @@ pub(crate) fn historical_schedule(

let subscription = client.send_request(request_id, request)?;

if let Some(mut message) = subscription.next() {
if let Some(Response::Message(mut message)) = subscription.next() {
match message.message_type() {
IncomingMessages::HistoricalSchedule => decoders::decode_historical_schedule(&mut message),
IncomingMessages::Error => Err(Error::Simple(message.peek_string(4))),
Expand Down Expand Up @@ -547,7 +547,7 @@ impl<T: TickDecoder<T> + Debug> Iterator for TickIterator<T> {

loop {
match self.messages.next() {
Some(mut message) => {
Some(Response::Message(mut message)) => {
if message.message_type() == Self::Item::message_type() {
let (ticks, done) = Self::Item::decode(&mut message).unwrap();

Expand All @@ -568,7 +568,8 @@ impl<T: TickDecoder<T> + Debug> Iterator for TickIterator<T> {
error!("unexpected message: {:?}", message)
}
}
None => return None,
// TODO enumerate
_ => return None,
}
}
}
Expand Down
12 changes: 7 additions & 5 deletions src/market_data/realtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::contracts::Contract;
use crate::messages::{IncomingMessages, RequestMessage, ResponseMessage};
use crate::orders::TagValue;
use crate::server_versions;
use crate::transport::InternalSubscription;
use crate::transport::{InternalSubscription, Response};
use crate::ToField;
use crate::{Client, Error};

Expand Down Expand Up @@ -320,14 +320,15 @@ impl<'a> Iterator for TradeIterator<'a> {
fn next(&mut self) -> Option<Self::Item> {
loop {
match self.responses.next() {
Some(mut message) => match message.message_type() {
Some(Response::Message(mut message)) => match message.message_type() {
IncomingMessages::TickByTick => match decoders::decode_trade_tick(&mut message) {
Ok(tick) => return Some(tick),
Err(e) => error!("unexpected message {message:?}: {e:?}"),
},
_ => error!("unexpected message {message:?}"),
},
None => return None,
// TODO enumerate
_ => return None,
}
}
}
Expand Down Expand Up @@ -362,14 +363,15 @@ impl<'a> Iterator for BidAskIterator<'a> {
fn next(&mut self) -> Option<Self::Item> {
loop {
match self.responses.next() {
Some(mut message) => match message.message_type() {
Some(Response::Message(mut message)) => match message.message_type() {
IncomingMessages::TickByTick => match decoders::bid_ask_tick(&mut message) {
Ok(tick) => return Some(tick),
Err(e) => error!("unexpected message {message:?}: {e:?}"),
},
_ => error!("unexpected message {message:?}"),
},
None => return None,
// TODO enumerate
_ => return None,
}
}
}
Expand Down
12 changes: 6 additions & 6 deletions src/orders.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use log::{error, info};
use crate::contracts::{ComboLeg, ComboLegOpenClose, Contract, DeltaNeutralContract, SecurityType};
use crate::messages::{IncomingMessages, OutgoingMessages};
use crate::messages::{RequestMessage, ResponseMessage};
use crate::transport::InternalSubscription;
use crate::transport::{InternalSubscription, Response};
use crate::Client;
use crate::{encode_option_field, ToField};
use crate::{server_versions, Error};
Expand Down Expand Up @@ -1055,7 +1055,7 @@ impl Iterator for OrderNotificationIterator {
}

loop {
if let Some(mut message) = self.messages.next() {
if let Some(Response::Message(mut message)) = self.messages.next() {
match message.message_type() {
IncomingMessages::OpenOrder => {
let open_order = decoders::decode_open_order(self.server_version, message);
Expand Down Expand Up @@ -1332,7 +1332,7 @@ impl Iterator for CancelOrderResultIterator {
/// Returns the next [CancelOrderResult]. Waits up to x seconds for next [CancelOrderResult].
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(mut message) = self.messages.next() {
if let Some(Response::Message(mut message)) = self.messages.next() {
match message.message_type() {
IncomingMessages::OrderStatus => match decoders::decode_order_status(self.server_version, &mut message) {
Ok(val) => return Some(CancelOrderResult::OrderStatus(val)),
Expand Down Expand Up @@ -1373,7 +1373,7 @@ pub(crate) fn next_valid_order_id(client: &Client) -> Result<i32, Error> {

let subscription = client.send_shared_request(OutgoingMessages::RequestIds, message)?;

if let Some(message) = subscription.next() {
if let Some(Response::Message(message)) = subscription.next() {
let order_id_index = 2;
let next_order_id = message.peek_int(order_id_index)?;

Expand Down Expand Up @@ -1418,7 +1418,7 @@ impl Iterator for OrderDataIterator {
/// Returns the next [OrderDataResult]. Waits up to x seconds for next [OrderDataResult].
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(mut message) = self.messages.next() {
if let Some(Response::Message(mut message)) = self.messages.next() {
match message.message_type() {
IncomingMessages::CompletedOrder => match decoders::decode_completed_order(self.server_version, message) {
Ok(val) => return Some(OrderDataResult::OrderData(Box::new(val))),
Expand Down Expand Up @@ -1554,7 +1554,7 @@ impl Iterator for ExecutionDataIterator {
/// Returns the next [OrderDataResult]. Waits up to x seconds for next [OrderDataResult].
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(mut message) = self.messages.next() {
if let Some(Response::Message(mut message)) = self.messages.next() {
match message.message_type() {
IncomingMessages::ExecutionData => match decoders::decode_execution_data(self.server_version, &mut message) {
Ok(val) => return Some(ExecutionDataResult::ExecutionData(Box::new(val))),
Expand Down
8 changes: 5 additions & 3 deletions src/stubs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::{Arc, RwLock};
use crossbeam::channel;

use crate::messages::{OutgoingMessages, RequestMessage, ResponseMessage};
use crate::transport::{InternalSubscription, MessageBus, SubscriptionBuilder};
use crate::transport::{InternalSubscription, MessageBus, Response, SubscriptionBuilder};
use crate::Error;

pub(crate) struct MessageBusStub {
Expand Down Expand Up @@ -63,7 +63,8 @@ fn mock_request(
let (s1, _r1) = channel::unbounded();

for message in &stub.response_messages {
sender.send(ResponseMessage::from(&message.replace('|', "\0"))).unwrap();
let message = ResponseMessage::from(&message.replace('|', "\0"));
sender.send(Response::from(message)).unwrap();
}

let mut subscription = SubscriptionBuilder::new().shared_receiver(Arc::new(receiver)).signaler(s1);
Expand All @@ -83,7 +84,8 @@ fn mock_global_request(stub: &mut MessageBusStub, message: &RequestMessage) -> R
let (sender, receiver) = channel::unbounded();

for message in &stub.response_messages {
sender.send(ResponseMessage::from(&message.replace('|', "\0"))).unwrap();
let message = ResponseMessage::from(&message.replace('|', "\0"));
sender.send(Response::from(message)).unwrap();
}

let subscription = SubscriptionBuilder::new().shared_receiver(Arc::new(receiver)).build();
Expand Down
Loading

0 comments on commit 635752f

Please sign in to comment.