Skip to content

Commit e5c22d9

Browse files
author
Wil Boayue
committed
refactored requests. process messages
1 parent 27841f6 commit e5c22d9

File tree

5 files changed

+80
-96
lines changed

5 files changed

+80
-96
lines changed

src/accounts.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ pub(crate) fn pnl<'a>(client: &'a Client, account: &str, model_code: Option<&str
133133
let request_id = client.next_request_id();
134134

135135
let request = encoders::encode_request_pnl(request_id, account, model_code)?;
136-
let responses = client.send_durable_request(request_id, request)?;
136+
let responses = client.send_request(request_id, request)?;
137137

138138
Ok(Subscription {
139139
client,
@@ -160,7 +160,7 @@ pub(crate) fn pnl_single<'a>(
160160
let request_id = client.next_request_id();
161161

162162
let request = encoders::encode_request_pnl_single(request_id, account, contract_id, model_code)?;
163-
let responses = client.send_durable_request(request_id, request)?;
163+
let responses = client.send_request(request_id, request)?;
164164

165165
Ok(Subscription {
166166
client,

src/client.rs

+2-9
Original file line numberDiff line numberDiff line change
@@ -950,21 +950,14 @@ impl Client {
950950
self.message_bus.lock()?.write_message(&packet)
951951
}
952952

953-
// wait timeout
954953
pub(crate) fn send_request(&self, request_id: i32, message: RequestMessage) -> Result<BusSubscription, Error> {
955954
debug!("send_message({:?}, {:?})", request_id, message);
956-
self.message_bus.lock()?.send_generic_message(request_id, &message)
957-
}
958-
959-
// wait indefinitely. until cancelled.
960-
pub(crate) fn send_durable_request(&self, request_id: i32, message: RequestMessage) -> Result<BusSubscription, Error> {
961-
debug!("send_durable_request({:?}, {:?})", request_id, message);
962-
self.message_bus.lock()?.send_durable_message(request_id, &message)
955+
self.message_bus.lock()?.send_request(request_id, &message)
963956
}
964957

965958
pub(crate) fn send_order(&self, order_id: i32, message: RequestMessage) -> Result<BusSubscription, Error> {
966959
debug!("send_order({:?}, {:?})", order_id, message);
967-
self.message_bus.lock()?.send_order_message(order_id, &message)
960+
self.message_bus.lock()?.send_order_request(order_id, &message)
968961
}
969962

970963
/// Sends request for the next valid order id.

src/market_data/realtime.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ pub(crate) fn realtime_bars<'a>(
146146
let request_id = client.next_request_id();
147147
let packet = encoders::encode_request_realtime_bars(client.server_version(), request_id, contract, bar_size, what_to_show, use_rth, options)?;
148148

149-
let responses = client.send_durable_request(request_id, packet)?;
149+
let responses = client.send_request(request_id, packet)?;
150150

151151
Ok(RealTimeBarIterator::new(client, request_id, responses))
152152
}
@@ -164,7 +164,7 @@ pub(crate) fn tick_by_tick_all_last<'a>(
164164
let request_id = client.next_request_id();
165165

166166
let message = encoders::tick_by_tick(server_version, request_id, contract, "AllLast", number_of_ticks, ignore_size)?;
167-
let responses = client.send_durable_request(request_id, message)?;
167+
let responses = client.send_request(request_id, message)?;
168168

169169
Ok(TradeIterator {
170170
client,
@@ -200,7 +200,7 @@ pub(crate) fn tick_by_tick_last<'a>(
200200
let request_id = client.next_request_id();
201201

202202
let message = encoders::tick_by_tick(server_version, request_id, contract, "Last", number_of_ticks, ignore_size)?;
203-
let responses = client.send_durable_request(request_id, message)?;
203+
let responses = client.send_request(request_id, message)?;
204204

205205
Ok(TradeIterator {
206206
client,
@@ -222,7 +222,7 @@ pub(crate) fn tick_by_tick_bid_ask<'a>(
222222
let request_id = client.next_request_id();
223223

224224
let message = encoders::tick_by_tick(server_version, request_id, contract, "BidAsk", number_of_ticks, ignore_size)?;
225-
let responses = client.send_durable_request(request_id, message)?;
225+
let responses = client.send_request(request_id, message)?;
226226

227227
Ok(BidAskIterator {
228228
client,
@@ -244,7 +244,7 @@ pub(crate) fn tick_by_tick_midpoint<'a>(
244244
let request_id = client.next_request_id();
245245

246246
let message = encoders::tick_by_tick(server_version, request_id, contract, "MidPoint", number_of_ticks, ignore_size)?;
247-
let responses = client.send_durable_request(request_id, message)?;
247+
let responses = client.send_request(request_id, message)?;
248248

249249
Ok(MidPointIterator {
250250
client,

src/stubs.rs

+2-6
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,11 @@ impl MessageBus for MessageBusStub {
2828
Ok(())
2929
}
3030

31-
fn send_generic_message(&mut self, request_id: i32, message: &RequestMessage) -> Result<BusSubscription, Error> {
31+
fn send_request(&mut self, request_id: i32, message: &RequestMessage) -> Result<BusSubscription, Error> {
3232
mock_request(self, request_id, message)
3333
}
3434

35-
fn send_durable_message(&mut self, request_id: i32, message: &RequestMessage) -> Result<BusSubscription, Error> {
36-
mock_request(self, request_id, message)
37-
}
38-
39-
fn send_order_message(&mut self, request_id: i32, message: &RequestMessage) -> Result<BusSubscription, Error> {
35+
fn send_order_request(&mut self, request_id: i32, message: &RequestMessage) -> Result<BusSubscription, Error> {
4036
mock_request(self, request_id, message)
4137
}
4238

src/transport.rs

+69-74
Original file line numberDiff line numberDiff line change
@@ -23,21 +23,28 @@ use recorder::MessageRecorder;
2323
mod recorder;
2424

2525
pub(crate) trait MessageBus: Send + Sync {
26+
// Reads the next available message from TWS
2627
fn read_message(&mut self) -> Result<ResponseMessage, Error>;
2728

29+
// Sends a formatted packet TWS
2830
fn write_message(&mut self, packet: &RequestMessage) -> Result<(), Error>;
2931

30-
fn send_generic_message(&mut self, request_id: i32, packet: &RequestMessage) -> Result<BusSubscription, Error>;
31-
fn send_durable_message(&mut self, request_id: i32, packet: &RequestMessage) -> Result<BusSubscription, Error>;
32-
fn send_order_message(&mut self, request_id: i32, packet: &RequestMessage) -> Result<BusSubscription, Error>;
32+
// Sends raw data to TWS
33+
fn write(&mut self, packet: &str) -> Result<(), Error>;
3334

34-
fn send_shared_request(&mut self, message_id: OutgoingMessages, packet: &RequestMessage) -> Result<BusSubscription, Error>;
35+
// Sends formatted message to TWS and creates a reply channel by request id.
36+
fn send_request(&mut self, request_id: i32, packet: &RequestMessage) -> Result<BusSubscription, Error>;
3537

36-
fn write(&mut self, packet: &str) -> Result<(), Error>;
38+
// Sends formatted order specific message to TWS and creates a reply channel by order id.
39+
fn send_order_request(&mut self, request_id: i32, packet: &RequestMessage) -> Result<BusSubscription, Error>;
3740

41+
// Sends formatted message to TWS and creates a reply channel by message type.
42+
fn send_shared_request(&mut self, message_id: OutgoingMessages, packet: &RequestMessage) -> Result<BusSubscription, Error>;
43+
44+
// Starts a dedicated thread to process responses from TWS.
3845
fn process_messages(&mut self, server_version: i32) -> Result<(), Error>;
3946

40-
// Exists for testing when request are stubbed
47+
// Testing interface. Tracks requests sent when Bus is stubbed.
4148
fn request_messages(&self) -> Vec<RequestMessage> {
4249
vec![]
4350
}
@@ -54,6 +61,7 @@ struct SharedChannels {
5461
}
5562

5663
impl SharedChannels {
64+
// Creates new instance and registers request/reply pairs.
5765
pub fn new() -> Self {
5866
let mut instance = Self {
5967
senders: HashMap::new(),
@@ -93,6 +101,7 @@ impl SharedChannels {
93101
}
94102
}
95103

104+
// Get receiver for specified message type. Panics if receiver not found.
96105
pub fn get_receiver(&self, message_type: OutgoingMessages) -> Arc<Receiver<ResponseMessage>> {
97106
let receiver = self
98107
.receivers
@@ -102,6 +111,7 @@ impl SharedChannels {
102111
Arc::clone(receiver)
103112
}
104113

114+
// Get sender for specified message type. Panics if sender not found.
105115
pub fn get_sender(&self, message_type: IncomingMessages) -> Arc<Sender<ResponseMessage>> {
106116
let sender = self
107117
.senders
@@ -116,6 +126,13 @@ impl SharedChannels {
116126
}
117127
}
118128

129+
// Signals are used to notify the backend when a subscriber is dropped.
130+
// This facilitates the cleanup of the SenderHashes.
131+
pub enum Signal {
132+
Request(i32),
133+
Order(i32),
134+
}
135+
119136
#[derive(Debug)]
120137
pub struct TcpMessageBus {
121138
reader: Arc<TcpStream>,
@@ -129,11 +146,6 @@ pub struct TcpMessageBus {
129146
signals_recv: Receiver<Signal>,
130147
}
131148

132-
pub enum Signal {
133-
Request(i32),
134-
Order(i32),
135-
}
136-
137149
impl TcpMessageBus {
138150
// establishes TCP connection to server
139151
pub fn connect(connection_string: &str) -> Result<TcpMessageBus, Error> {
@@ -159,14 +171,47 @@ impl TcpMessageBus {
159171
})
160172
}
161173

162-
fn add_request(&mut self, request_id: i32, sender: Sender<ResponseMessage>) -> Result<(), Error> {
163-
self.requests.insert(request_id, sender);
164-
Ok(())
174+
fn start_dispatcher_thread(&mut self, server_version: i32) -> JoinHandle<i32> {
175+
let reader = Arc::clone(&self.reader);
176+
let requests = Arc::clone(&self.requests);
177+
let recorder = self.recorder.clone();
178+
let orders = Arc::clone(&self.orders);
179+
let shared_channels = Arc::clone(&self.shared_channels);
180+
let executions = SenderHash::<String, ResponseMessage>::new();
181+
182+
thread::spawn(move || loop {
183+
match read_packet(&reader) {
184+
Ok(message) => {
185+
recorder.record_response(&message);
186+
dispatch_message(message, server_version, &requests, &orders, &shared_channels, &executions);
187+
}
188+
Err(err) => {
189+
error!("error reading packet: {:?}", err);
190+
continue;
191+
}
192+
};
193+
})
165194
}
166195

167-
fn add_order(&mut self, order_id: i32, sender: Sender<ResponseMessage>) -> Result<(), Error> {
168-
self.orders.insert(order_id, sender);
169-
Ok(())
196+
fn start_cleanup_thread(&mut self) -> JoinHandle<i32> {
197+
let requests = Arc::clone(&self.requests);
198+
let orders = Arc::clone(&self.orders);
199+
let signal_recv = self.signals_recv.clone();
200+
201+
thread::spawn(move || loop {
202+
for signal in &signal_recv {
203+
match signal {
204+
Signal::Request(request_id) => {
205+
requests.remove(&request_id);
206+
debug!("released request_id {}, requests.len()={}", request_id, requests.len());
207+
}
208+
Signal::Order(order_id) => {
209+
orders.remove(&order_id);
210+
debug!("released order_id {}, orders.len()={}", order_id, requests.len());
211+
}
212+
}
213+
}
214+
})
170215
}
171216
}
172217

@@ -177,25 +222,11 @@ impl MessageBus for TcpMessageBus {
177222
read_packet(&self.reader)
178223
}
179224

180-
fn send_generic_message(&mut self, request_id: i32, packet: &RequestMessage) -> Result<BusSubscription, Error> {
225+
fn send_request(&mut self, request_id: i32, packet: &RequestMessage) -> Result<BusSubscription, Error> {
181226
let (sender, receiver) = channel::unbounded();
182227

183-
self.add_request(request_id, sender)?;
184-
self.write_message(packet)?;
185-
186-
let subscription = SubscriptionBuilder::new()
187-
.receiver(receiver)
188-
.signaler(self.signals_send.clone())
189-
.request_id(request_id)
190-
.build();
191-
192-
Ok(subscription)
193-
}
194-
195-
fn send_durable_message(&mut self, request_id: i32, packet: &RequestMessage) -> Result<BusSubscription, Error> {
196-
let (sender, receiver) = channel::unbounded();
228+
self.requests.insert(request_id, sender);
197229

198-
self.add_request(request_id, sender)?;
199230
self.write_message(packet)?;
200231

201232
let subscription = SubscriptionBuilder::new()
@@ -207,10 +238,11 @@ impl MessageBus for TcpMessageBus {
207238
Ok(subscription)
208239
}
209240

210-
fn send_order_message(&mut self, order_id: i32, message: &RequestMessage) -> Result<BusSubscription, Error> {
241+
fn send_order_request(&mut self, order_id: i32, message: &RequestMessage) -> Result<BusSubscription, Error> {
211242
let (sender, receiver) = channel::unbounded();
212243

213-
self.add_order(order_id, sender)?;
244+
self.orders.insert(order_id, sender);
245+
214246
self.write_message(message)?;
215247

216248
let subscription = SubscriptionBuilder::new()
@@ -257,47 +289,10 @@ impl MessageBus for TcpMessageBus {
257289
}
258290

259291
fn process_messages(&mut self, server_version: i32) -> Result<(), Error> {
260-
let reader = Arc::clone(&self.reader);
261-
let requests = Arc::clone(&self.requests);
262-
let recorder = self.recorder.clone();
263-
let orders = Arc::clone(&self.orders);
264-
let shared_channels = Arc::clone(&self.shared_channels);
265-
let executions = SenderHash::<String, ResponseMessage>::new();
266-
267-
let handle = thread::spawn(move || loop {
268-
match read_packet(&reader) {
269-
Ok(message) => {
270-
recorder.record_response(&message);
271-
dispatch_message(message, server_version, &requests, &orders, &shared_channels, &executions);
272-
}
273-
Err(err) => {
274-
error!("error reading packet: {:?}", err);
275-
continue;
276-
}
277-
};
278-
});
279-
292+
let handle = self.start_dispatcher_thread(server_version);
280293
self.handles.push(handle);
281294

282-
let requests = Arc::clone(&self.requests);
283-
let orders = Arc::clone(&self.orders);
284-
let signal_recv = self.signals_recv.clone();
285-
286-
let handle = thread::spawn(move || loop {
287-
for signal in &signal_recv {
288-
match signal {
289-
Signal::Request(request_id) => {
290-
requests.remove(&request_id);
291-
debug!("released request_id {}, requests.len()={}", request_id, requests.len());
292-
}
293-
Signal::Order(order_id) => {
294-
orders.remove(&order_id);
295-
debug!("released order_id {}, orders.len()={}", order_id, requests.len());
296-
}
297-
}
298-
}
299-
});
300-
295+
let handle = self.start_cleanup_thread();
301296
self.handles.push(handle);
302297

303298
Ok(())

0 commit comments

Comments
 (0)