Skip to content

Commit baf37e3

Browse files
author
Wil Boayue
committed
refactored request order ids
1 parent 6458b28 commit baf37e3

File tree

4 files changed

+34
-40
lines changed

4 files changed

+34
-40
lines changed

src/client.rs

+3-6
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,7 @@ impl Client {
8686
client.start_api()?;
8787
client.receive_account_info()?;
8888

89-
client
90-
.message_bus
91-
.lock()?
92-
.process_messages(client.server_version)?;
89+
client.message_bus.lock()?.process_messages(client.server_version)?;
9390

9491
Ok(client)
9592
}
@@ -971,8 +968,8 @@ impl Client {
971968
}
972969

973970
/// Sends request for the next valid order id.
974-
pub(crate) fn request_next_order_id(&self, message: RequestMessage) -> Result<BusSubscription, Error> {
975-
self.message_bus.lock()?.request_next_order_id(&message)
971+
pub(crate) fn send_shared_message(&self, message_id: OutgoingMessages, message: RequestMessage) -> Result<BusSubscription, Error> {
972+
self.message_bus.lock()?.send_shared_message(message_id, &message)
976973
}
977974

978975
/// Sends request for open orders.

src/orders.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1371,7 +1371,7 @@ pub(crate) fn global_cancel(client: &Client) -> Result<(), Error> {
13711371
pub(crate) fn next_valid_order_id(client: &Client) -> Result<i32, Error> {
13721372
let message = encoders::encode_next_valid_order_id()?;
13731373

1374-
let mut messages = client.request_next_order_id(message)?;
1374+
let mut messages = client.send_shared_message(OutgoingMessages::RequestIds, message)?;
13751375

13761376
if let Some(message) = messages.next() {
13771377
let order_id_index = 2;

src/stubs.rs

+5-14
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::sync::{Arc, RwLock};
22

33
use crossbeam::channel;
44

5-
use crate::messages::{RequestMessage, ResponseMessage};
5+
use crate::messages::{OutgoingMessages, RequestMessage, ResponseMessage};
66
use crate::transport::{BusSubscription, MessageBus, SubscriptionBuilder};
77
use crate::Error;
88

@@ -24,10 +24,7 @@ impl MessageBus for MessageBusStub {
2424
}
2525

2626
fn write_message(&mut self, message: &RequestMessage) -> Result<(), Error> {
27-
self.request_messages
28-
.write()
29-
.unwrap()
30-
.push(message.clone());
27+
self.request_messages.write().unwrap().push(message.clone());
3128
Ok(())
3229
}
3330

@@ -43,7 +40,7 @@ impl MessageBus for MessageBusStub {
4340
mock_request(self, request_id, message)
4441
}
4542

46-
fn request_next_order_id(&mut self, message: &RequestMessage) -> Result<BusSubscription, Error> {
43+
fn send_shared_message(&mut self, _message_id: OutgoingMessages, message: &RequestMessage) -> Result<BusSubscription, Error> {
4744
mock_global_request(self, message)
4845
}
4946

@@ -73,10 +70,7 @@ impl MessageBus for MessageBusStub {
7370
}
7471

7572
fn mock_request(stub: &mut MessageBusStub, _request_id: i32, message: &RequestMessage) -> Result<BusSubscription, Error> {
76-
stub.request_messages
77-
.write()
78-
.unwrap()
79-
.push(message.clone());
73+
stub.request_messages.write().unwrap().push(message.clone());
8074

8175
let (sender, receiver) = channel::unbounded();
8276
let (s1, _r1) = channel::unbounded();
@@ -91,10 +85,7 @@ fn mock_request(stub: &mut MessageBusStub, _request_id: i32, message: &RequestMe
9185
}
9286

9387
fn mock_global_request(stub: &mut MessageBusStub, message: &RequestMessage) -> Result<BusSubscription, Error> {
94-
stub.request_messages
95-
.write()
96-
.unwrap()
97-
.push(message.clone());
88+
stub.request_messages.write().unwrap().push(message.clone());
9889

9990
let (sender, receiver) = channel::unbounded();
10091

src/transport.rs

+25-19
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,8 @@ pub(crate) trait MessageBus: Send + Sync {
3131
fn send_durable_message(&mut self, request_id: i32, packet: &RequestMessage) -> Result<BusSubscription, Error>;
3232
fn send_order_message(&mut self, request_id: i32, packet: &RequestMessage) -> Result<BusSubscription, Error>;
3333

34-
fn send_shared_message(&mut self, message_id: OutgoingMessages, packet: &RequestMessage) -> Result<BusSubscription, Error> {
35-
Ok(BusSubscription::new(todo!(), todo!(), todo!(), todo!(), todo!()))
36-
}
34+
fn send_shared_message(&mut self, message_id: OutgoingMessages, packet: &RequestMessage) -> Result<BusSubscription, Error>;
3735

38-
fn request_next_order_id(&mut self, message: &RequestMessage) -> Result<BusSubscription, Error>;
3936
fn request_open_orders(&mut self, message: &RequestMessage) -> Result<BusSubscription, Error>;
4037
fn request_market_rule(&mut self, message: &RequestMessage) -> Result<BusSubscription, Error>;
4138
fn request_positions(&mut self, message: &RequestMessage) -> Result<BusSubscription, Error>;
@@ -70,6 +67,20 @@ impl SharedChannels {
7067

7168
// Register request/response pairs.
7269
instance.register(OutgoingMessages::RequestIds, &[IncomingMessages::NextValidId]);
70+
instance.register(OutgoingMessages::RequestFamilyCodes, &[IncomingMessages::FamilyCodes]);
71+
instance.register(OutgoingMessages::RequestMarketRule, &[IncomingMessages::MarketRule]);
72+
instance.register(
73+
OutgoingMessages::RequestPositions,
74+
&[IncomingMessages::Position, IncomingMessages::PositionEnd],
75+
);
76+
instance.register(
77+
OutgoingMessages::RequestPositionsMulti,
78+
&[IncomingMessages::PositionMulti, IncomingMessages::PositionMultiEnd],
79+
);
80+
instance.register(
81+
OutgoingMessages::RequestOpenOrders,
82+
&[IncomingMessages::OpenOrder, IncomingMessages::OpenOrderEnd],
83+
);
7384

7485
instance
7586
}
@@ -88,15 +99,20 @@ impl SharedChannels {
8899
}
89100

90101
pub fn get_receiver(&self, message_id: OutgoingMessages) -> Arc<Receiver<ResponseMessage>> {
91-
let receiver = self.receivers.get(&message_id).expect("unsupport type");
102+
let receiver = self
103+
.receivers
104+
.get(&message_id)
105+
.expect(&format!("unsupported request message {:?}", message_id));
92106
Arc::clone(receiver)
93107
}
94108

95-
pub fn get_sender(&self, message_id: OutgoingMessages) -> Arc<Receiver<ResponseMessage>> {
96-
let receiver = self.receivers.get(&message_id).expect("unsupport type");
97-
Arc::clone(receiver)
109+
pub fn get_sender(&self, message_id: IncomingMessages) -> Arc<Sender<ResponseMessage>> {
110+
let sender = self
111+
.senders
112+
.get(&message_id)
113+
.expect(&format!("unsupported response message {:?}", message_id));
114+
Arc::clone(sender)
98115
}
99-
100116
}
101117

102118
#[derive(Debug)]
@@ -254,16 +270,6 @@ impl MessageBus for TcpMessageBus {
254270
Ok(subscription)
255271
}
256272

257-
fn request_next_order_id(&mut self, message: &RequestMessage) -> Result<BusSubscription, Error> {
258-
self.write_message(message)?;
259-
260-
let subscription = SubscriptionBuilder::new()
261-
.shared_receiver(Arc::clone(&self.globals.order_ids_out))
262-
.build();
263-
264-
Ok(subscription)
265-
}
266-
267273
fn request_open_orders(&mut self, message: &RequestMessage) -> Result<BusSubscription, Error> {
268274
self.write_message(message)?;
269275

0 commit comments

Comments
 (0)