Skip to content

Commit 317bd76

Browse files
author
Wil Boayue
committed
cleanup duplications
1 parent 0a3d31e commit 317bd76

File tree

2 files changed

+99
-131
lines changed

2 files changed

+99
-131
lines changed

src/market_data/historical.rs

+98-130
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use time::{Date, OffsetDateTime};
99

1010
use crate::contracts::Contract;
1111
use crate::messages::{IncomingMessages, RequestMessage, ResponseMessage};
12-
use crate::transport::InternalSubscription;
12+
use crate::transport::{InternalSubscription, Response};
1313
use crate::{server_versions, Client, Error, ToField};
1414

1515
mod decoders;
@@ -339,30 +339,32 @@ pub(crate) fn historical_data(
339339
)?;
340340
}
341341

342-
let request_id = client.next_request_id();
343-
let request = encoders::encode_request_historical_data(
344-
client.server_version(),
345-
request_id,
346-
contract,
347-
end_date,
348-
duration,
349-
bar_size,
350-
what_to_show,
351-
use_rth,
352-
false,
353-
Vec::<crate::contracts::TagValue>::default(),
354-
)?;
342+
loop {
343+
let request_id = client.next_request_id();
344+
let request = encoders::encode_request_historical_data(
345+
client.server_version(),
346+
request_id,
347+
contract,
348+
end_date,
349+
duration,
350+
bar_size,
351+
what_to_show,
352+
use_rth,
353+
false,
354+
Vec::<crate::contracts::TagValue>::default(),
355+
)?;
355356

356-
let subscription = client.send_request(request_id, request)?;
357+
let subscription = client.send_request(request_id, request)?;
357358

358-
match subscription.next() {
359-
Some(Ok(mut message)) if message.message_type() == IncomingMessages::HistoricalData => {
360-
Ok(decoders::decode_historical_data(client.server_version, time_zone(client), &mut message)?)
359+
match subscription.next() {
360+
Some(Ok(mut message)) if message.message_type() == IncomingMessages::HistoricalData => {
361+
return decoders::decode_historical_data(client.server_version, time_zone(client), &mut message)
362+
}
363+
Some(Ok(message)) => return Err(Error::UnexpectedResponse(message)),
364+
Some(Err(Error::ConnectionReset)) => continue,
365+
Some(Err(e)) => return Err(e),
366+
None => return Err(Error::UnexpectedEndOfStream),
361367
}
362-
Some(Ok(message)) => Err(Error::UnexpectedResponse(message)),
363-
Some(Err(Error::ConnectionReset)) => historical_data(client, contract, end_date, duration, bar_size, what_to_show, use_rth),
364-
Some(Err(e)) => Err(e),
365-
None => Err(Error::UnexpectedEndOfStream),
366368
}
367369
}
368370

@@ -393,30 +395,32 @@ pub(crate) fn historical_schedule(
393395
"It does not support requesting of historical schedule.",
394396
)?;
395397

396-
let request_id = client.next_request_id();
397-
let request = encoders::encode_request_historical_data(
398-
client.server_version(),
399-
request_id,
400-
contract,
401-
end_date,
402-
duration,
403-
BarSize::Day,
404-
Some(WhatToShow::Schedule),
405-
true,
406-
false,
407-
Vec::<crate::contracts::TagValue>::default(),
408-
)?;
398+
loop {
399+
let request_id = client.next_request_id();
400+
let request = encoders::encode_request_historical_data(
401+
client.server_version(),
402+
request_id,
403+
contract,
404+
end_date,
405+
duration,
406+
BarSize::Day,
407+
Some(WhatToShow::Schedule),
408+
true,
409+
false,
410+
Vec::<crate::contracts::TagValue>::default(),
411+
)?;
409412

410-
let subscription = client.send_request(request_id, request)?;
413+
let subscription = client.send_request(request_id, request)?;
411414

412-
match subscription.next() {
413-
Some(Ok(mut message)) if message.message_type() == IncomingMessages::HistoricalSchedule => {
414-
Ok(decoders::decode_historical_schedule(&mut message)?)
415+
match subscription.next() {
416+
Some(Ok(mut message)) if message.message_type() == IncomingMessages::HistoricalSchedule => {
417+
return decoders::decode_historical_schedule(&mut message)
418+
}
419+
Some(Ok(message)) => return Err(Error::UnexpectedResponse(message)),
420+
Some(Err(Error::ConnectionReset)) => continue,
421+
Some(Err(e)) => return Err(e),
422+
None => return Err(Error::UnexpectedEndOfStream),
415423
}
416-
Some(Ok(message)) => Err(Error::UnexpectedResponse(message)),
417-
Some(Err(Error::ConnectionReset)) => historical_schedule(client, contract, end_date, duration),
418-
Some(Err(e)) => Err(e),
419-
None => Err(Error::UnexpectedEndOfStream),
420424
}
421425
}
422426

@@ -432,7 +436,7 @@ pub(crate) fn historical_ticks_bid_ask(
432436
client.check_server_version(server_versions::HISTORICAL_TICKS, "It does not support historical ticks request.")?;
433437

434438
let request_id = client.next_request_id();
435-
let message = encoders::encode_request_historical_ticks(
439+
let request = encoders::encode_request_historical_ticks(
436440
request_id,
437441
contract,
438442
start,
@@ -442,10 +446,9 @@ pub(crate) fn historical_ticks_bid_ask(
442446
use_rth,
443447
ignore_size,
444448
)?;
449+
let subscription = client.send_request(request_id, request)?;
445450

446-
let messages = client.send_request(request_id, message)?;
447-
448-
Ok(TickSubscription::new(messages))
451+
Ok(TickSubscription::new(subscription))
449452
}
450453

451454
pub(crate) fn historical_ticks_mid_point(
@@ -459,11 +462,10 @@ pub(crate) fn historical_ticks_mid_point(
459462
client.check_server_version(server_versions::HISTORICAL_TICKS, "It does not support historical ticks request.")?;
460463

461464
let request_id = client.next_request_id();
462-
let message = encoders::encode_request_historical_ticks(request_id, contract, start, end, number_of_ticks, WhatToShow::MidPoint, use_rth, false)?;
463-
464-
let messages = client.send_request(request_id, message)?;
465+
let request = encoders::encode_request_historical_ticks(request_id, contract, start, end, number_of_ticks, WhatToShow::MidPoint, use_rth, false)?;
466+
let subscription = client.send_request(request_id, request)?;
465467

466-
Ok(TickSubscription::new(messages))
468+
Ok(TickSubscription::new(subscription))
467469
}
468470

469471
pub(crate) fn historical_ticks_trade(
@@ -477,25 +479,26 @@ pub(crate) fn historical_ticks_trade(
477479
client.check_server_version(server_versions::HISTORICAL_TICKS, "It does not support historical ticks request.")?;
478480

479481
let request_id = client.next_request_id();
480-
let message = encoders::encode_request_historical_ticks(request_id, contract, start, end, number_of_ticks, WhatToShow::Trades, use_rth, false)?;
481-
482-
let messages = client.send_request(request_id, message)?;
482+
let request = encoders::encode_request_historical_ticks(request_id, contract, start, end, number_of_ticks, WhatToShow::Trades, use_rth, false)?;
483+
let subscription = client.send_request(request_id, request)?;
483484

484-
Ok(TickSubscription::new(messages))
485+
Ok(TickSubscription::new(subscription))
485486
}
486487

487488
pub(crate) fn histogram_data(client: &Client, contract: &Contract, use_rth: bool, period: BarSize) -> Result<Vec<HistogramEntry>, Error> {
488489
client.check_server_version(server_versions::REQ_HISTOGRAM, "It does not support histogram data requests.")?;
489490

490-
let request_id = client.next_request_id();
491-
let message = encoders::encode_request_histogram_data(request_id, contract, use_rth, period)?;
492-
493-
let subscription = client.send_request(request_id, message)?;
491+
loop {
492+
let request_id = client.next_request_id();
493+
let request = encoders::encode_request_histogram_data(request_id, contract, use_rth, period)?;
494+
let subscription = client.send_request(request_id, request)?;
494495

495-
match subscription.next() {
496-
Some(Ok(mut message)) => decoders::decode_histogram_data(&mut message),
497-
Some(Err(e)) => Err(e),
498-
None => Ok(Vec::new()),
496+
match subscription.next() {
497+
Some(Ok(mut message)) => return decoders::decode_histogram_data(&mut message),
498+
Some(Err(Error::ConnectionReset)) => continue,
499+
Some(Err(e)) => return Err(e),
500+
None => return Ok(Vec::new()),
501+
}
499502
}
500503
}
501504

@@ -561,86 +564,60 @@ impl<T: TickDecoder<T>> TickSubscription<T> {
561564
}
562565

563566
pub fn next(&self) -> Option<T> {
564-
self.clear_error();
565-
566-
if let Some(message) = self.next_buffered() {
567-
return Some(message);
568-
}
569-
570-
if self.done.load(Ordering::Relaxed) {
571-
return None;
572-
}
573-
574-
match self.messages.next() {
575-
Some(Ok(message)) if message.message_type() == T::MESSAGE_TYPE => {
576-
self.fill_buffer(message);
577-
self.next()
578-
}
579-
Some(Ok(message)) => {
580-
debug!("unexpected message: {:?}", message);
581-
self.next()
582-
}
583-
Some(Err(e)) => {
584-
self.set_error(e);
585-
None
586-
}
587-
None => None,
588-
}
567+
self.next_helper(|| self.messages.next())
589568
}
590569

591570
pub fn try_next(&self) -> Option<T> {
592-
self.clear_error();
571+
self.next_helper(|| self.messages.try_next())
572+
}
593573

594-
if let Some(message) = self.next_buffered() {
595-
return Some(message);
596-
}
574+
pub fn next_timeout(&self, duration: std::time::Duration) -> Option<T> {
575+
self.next_helper(|| self.messages.next_timeout(duration))
576+
}
597577

598-
if self.done.load(Ordering::Relaxed) {
599-
return None;
600-
}
578+
fn next_helper<F>(&self, next_response: F) -> Option<T>
579+
where
580+
F: Fn() -> Option<Response>,
581+
{
582+
self.clear_error();
601583

602-
match self.messages.try_next() {
603-
Some(Ok(message)) if message.message_type() == T::MESSAGE_TYPE => {
604-
self.fill_buffer(message);
605-
self.try_next()
584+
loop {
585+
if let Some(message) = self.next_buffered() {
586+
return Some(message);
606587
}
607-
Some(Ok(message)) => {
608-
debug!("unexpected message: {:?}", message);
609-
self.try_next()
588+
589+
if self.done.load(Ordering::Relaxed) {
590+
return None;
610591
}
611-
Some(Err(e)) => {
612-
self.set_error(e);
613-
None
592+
593+
match self.fill_buffer(next_response()) {
594+
Ok(()) => continue,
595+
Err(()) => return None,
614596
}
615-
None => None,
616597
}
617598
}
618599

619-
pub fn next_timeout(&self, duration: std::time::Duration) -> Option<T> {
620-
self.clear_error();
600+
fn fill_buffer(&self, response: Option<Response>) -> Result<(), ()> {
601+
match response {
602+
Some(Ok(mut message)) if message.message_type() == T::MESSAGE_TYPE => {
603+
let mut buffer = self.buffer.lock().unwrap();
621604

622-
if let Some(message) = self.next_buffered() {
623-
return Some(message);
624-
}
605+
let (ticks, done) = T::decode(&mut message).unwrap();
625606

626-
if self.done.load(Ordering::Relaxed) {
627-
return None;
628-
}
607+
buffer.append(&mut ticks.into());
608+
self.done.store(done, Ordering::Relaxed);
629609

630-
match self.messages.next_timeout(duration) {
631-
Some(Ok(message)) if message.message_type() == T::MESSAGE_TYPE => {
632-
self.fill_buffer(message);
633-
self.next_timeout(duration)
610+
Ok(())
634611
}
635612
Some(Ok(message)) => {
636613
debug!("unexpected message: {:?}", message);
637-
self.next_timeout(duration)
614+
Ok(())
638615
}
639616
Some(Err(e)) => {
640617
self.set_error(e);
641-
None
618+
Err(())
642619
}
643-
None => None,
620+
None => Err(()),
644621
}
645622
}
646623

@@ -658,15 +635,6 @@ impl<T: TickDecoder<T>> TickSubscription<T> {
658635
let mut error = self.error.lock().unwrap();
659636
*error = None;
660637
}
661-
662-
fn fill_buffer(&self, mut message: ResponseMessage) {
663-
let mut buffer = self.buffer.lock().unwrap();
664-
665-
let (ticks, done) = T::decode(&mut message).unwrap();
666-
667-
buffer.append(&mut ticks.into());
668-
self.done.store(done, Ordering::Relaxed);
669-
}
670638
}
671639

672640
/// An iterator that yields items as they become available, blocking if necessary.

src/transport.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ pub(crate) trait MessageBus: Send + Sync {
5555
}
5656
}
5757

58-
type Response = Result<ResponseMessage, Error>;
58+
pub(crate) type Response = Result<ResponseMessage, Error>;
5959

6060
// For requests without an identifier, shared channels are created
6161
// to route request/response pairs based on message type.

0 commit comments

Comments
 (0)