Skip to content

Commit

Permalink
refactor subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
Wil Boayue committed Oct 6, 2024
1 parent 380a64c commit ef4464c
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 65 deletions.
2 changes: 2 additions & 0 deletions examples/pnl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ fn main() {
// Consume items blocking for next
while let Some(pnl) = subscription.next() {
println!("PnL: {:?}", pnl);

// After processing items subscription could be cancelled.
subscription.cancel();
}
}
2 changes: 2 additions & 0 deletions examples/pnl_single.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ fn main() {
// Consume items blocking for next
while let Some(pnl) = subscription.next() {
println!("PnL: {:?}", pnl);

// After processing items subscription could be cancelled.
subscription.cancel();
}
}
79 changes: 16 additions & 63 deletions src/accounts.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,28 @@
//! # Account Management
//!
//! This module provides functionality for managing positions and profit and loss (PnL)
//! information in a trading system. It includes structures and implementations for:
//!
//! - Position tracking
//! - Daily, unrealized, and realized PnL calculations
//! - Family code management
//! - Real-time PnL updates for individual positions
//!
use std::marker::PhantomData;

use log::error;

use crate::client::transport::{GlobalResponseIterator, ResponseIterator};
use crate::client::transport::GlobalResponseIterator;
use crate::client::{Subscribable, Subscription};
use crate::contracts::Contract;
use crate::messages::{IncomingMessages, ResponseMessage};
use crate::{server_versions, Client, Error};

mod decoders;
mod encoders;

// Realtime PnL update for account.
#[derive(Debug, Default)]
pub struct PnL {
/// DailyPnL for the position
Expand All @@ -28,6 +41,7 @@ impl Subscribable<PnL> for PnL {
}
}

// Realtime PnL update for a position in account.
#[derive(Debug, Default)]
pub struct PnLSingle {
// Current size of the position
Expand Down Expand Up @@ -129,7 +143,7 @@ pub(crate) fn pnl<'a>(client: &'a Client, account: &str, model_code: Option<&str
}

// Requests real time updates for daily PnL of individual positions.
///
//
// # Arguments
// * `client` - Client
// * `account` - Account in which position exists
Expand Down Expand Up @@ -192,66 +206,5 @@ impl<'a> Iterator for PositionIterator<'a> {
}
}

// Supports iteration over [Pnl].
pub struct Subscription<'a, T> {
client: &'a Client,
responses: ResponseIterator,
phantom: PhantomData<T>,
}

impl<'a, T: Subscribable<T>> Subscription<'a, T> {
pub fn try_next(&mut self) -> Option<T> {
if let Some(mut message) = self.responses.try_next() {
if message.message_type() == T::INCOMING_MESSAGE_ID {
match T::decode(self.client.server_version(), &mut message) {
Ok(val) => return Some(val),
Err(err) => {
error!("error decoding execution data: {err}");
return None;
}
}
}
return None;
} else {
None
}
}

pub fn cancel(&mut self) {}
}

trait Subscribable<T> {
const INCOMING_MESSAGE_ID: IncomingMessages;
fn decode(server_version: i32, message: &mut ResponseMessage) -> Result<T, Error>;
}

impl<'a, T: Subscribable<T>> Iterator for Subscription<'a, T> {
type Item = T;

// Returns the next [Position]. Waits up to x seconds for next [OrderDataResult].
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(mut message) = self.responses.next() {
if message.message_type() == T::INCOMING_MESSAGE_ID {
match T::decode(self.client.server_version(), &mut message) {
Ok(val) => return Some(val),
Err(err) => {
error!("error decoding execution data: {err}");
}
}
} else if message.message_type() == IncomingMessages::Error {
let error_message = message.peek_string(4);
error!("{error_message}");
return None;
} else {
error!("subscription iterator unexpected message: {message:?}");
}
} else {
return None;
}
}
}
}

#[cfg(test)]
mod tests;
66 changes: 64 additions & 2 deletions src/client.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::fmt::Debug;
use std::io::Write;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicI32, Ordering};
use std::sync::{Arc, Mutex};

Expand All @@ -9,14 +10,14 @@ use time::macros::format_description;
use time::OffsetDateTime;
use time_tz::{timezones, OffsetResult, PrimitiveDateTimeExt, Tz};

use crate::accounts::{FamilyCode, PnL, PnLSingle, Position, Subscription};
use crate::accounts::{FamilyCode, PnL, PnLSingle, Position};
use crate::client::transport::{GlobalResponseIterator, MessageBus, ResponseIterator, TcpMessageBus};
use crate::contracts::Contract;
use crate::errors::Error;
use crate::market_data::historical;
use crate::market_data::realtime::{self, Bar, BarSize, WhatToShow};
use crate::messages::RequestMessage;
use crate::messages::{IncomingMessages, OutgoingMessages};
use crate::messages::{RequestMessage, ResponseMessage};
use crate::orders::{Order, OrderDataResult, OrderNotification};
use crate::{accounts, contracts, orders, server_versions};

Expand Down Expand Up @@ -1031,6 +1032,67 @@ impl Debug for Client {
}
}

/// Supports the handling of responses from TWS.
pub struct Subscription<'a, T> {
pub(crate) client: &'a Client,
pub(crate) responses: ResponseIterator,
pub(crate) phantom: PhantomData<T>,
}

impl<'a, T: Subscribable<T>> Subscription<'a, T> {
pub fn try_next(&mut self) -> Option<T> {
if let Some(mut message) = self.responses.try_next() {
if message.message_type() == T::INCOMING_MESSAGE_ID {
match T::decode(self.client.server_version(), &mut message) {
Ok(val) => return Some(val),
Err(err) => {
error!("error decoding execution data: {err}");
return None;
}
}
}
return None;
} else {
None
}
}

pub fn cancel(&mut self) {}
}

pub(crate) trait Subscribable<T> {
const INCOMING_MESSAGE_ID: IncomingMessages;
fn decode(server_version: i32, message: &mut ResponseMessage) -> Result<T, Error>;
}

impl<'a, T: Subscribable<T>> Iterator for Subscription<'a, T> {
type Item = T;

// Returns the next [Position]. Waits up to x seconds for next [OrderDataResult].
fn next(&mut self) -> Option<Self::Item> {
loop {
if let Some(mut message) = self.responses.next() {
if message.message_type() == T::INCOMING_MESSAGE_ID {
match T::decode(self.client.server_version(), &mut message) {
Ok(val) => return Some(val),
Err(err) => {
error!("error decoding execution data: {err}");
}
}
} else if message.message_type() == IncomingMessages::Error {
let error_message = message.peek_string(4);
error!("{error_message}");
return None;
} else {
error!("subscription iterator unexpected message: {message:?}");
}
} else {
return None;
}
}
}
}

// Parses following format: 20230405 22:20:39 PST
fn parse_connection_time(connection_time: &str) -> (Option<OffsetDateTime>, Option<&'static Tz>) {
let parts: Vec<&str> = connection_time.split(' ').collect();
Expand Down

0 comments on commit ef4464c

Please sign in to comment.