Skip to content

Commit 811d777

Browse files
committed
Improved Tiingo and introduced sendAsync
1 parent 9d3c21f commit 811d777

File tree

8 files changed

+72
-52
lines changed

8 files changed

+72
-52
lines changed

roboquant-ta/src/test/kotlin/org/roboquant/samples/TaSamples.kt

+1
Original file line numberDiff line numberDiff line change
@@ -202,3 +202,4 @@ internal class TaSamples {
202202

203203
}
204204

205+

roboquant-tiingo/src/main/kotlin/org/roboquant/tiingo/TiingoHistoricFeed.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import java.util.concurrent.TimeUnit
3333
/**
3434
* Tiingo historic feed.
3535
*
36-
* This feed uses CSV format for faster processing and less bandwidth usage.
36+
* This feed requests CSV format from Tiingo for faster processing and less bandwidth usage.
3737
*/
3838
class TiingoHistoricFeed(
3939
configure: TiingoConfig.() -> Unit = {}

roboquant-tiingo/src/main/kotlin/org/roboquant/tiingo/TiingoLiveFeed.kt

+25-41
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,8 @@ import okhttp3.OkHttpClient
2323
import okhttp3.Request
2424
import okhttp3.WebSocket
2525
import okhttp3.WebSocketListener
26-
import org.roboquant.common.Asset
27-
import org.roboquant.common.AssetType
28-
import org.roboquant.common.Logging
29-
import org.roboquant.feeds.Event
30-
import org.roboquant.feeds.LiveFeed
31-
import org.roboquant.feeds.PriceAction
32-
import org.roboquant.feeds.PriceQuote
26+
import org.roboquant.common.*
27+
import org.roboquant.feeds.*
3328
import java.time.Instant
3429
import java.util.concurrent.TimeUnit
3530
import kotlin.collections.set
@@ -39,24 +34,26 @@ private val logger = Logging.getLogger(TiingoLiveFeed::class)
3934

4035
private class Handler(private val feed: TiingoLiveFeed) : WebSocketListener() {
4136

42-
val assets = mutableMapOf<String, Asset>()
43-
4437
private fun handleIEX(data: JsonArray) {
4538
val type = data[0].asString
39+
val symbol = data[3].asString.uppercase()
40+
val asset = Config.getOrPutAsset(symbol) { Asset.forexPair(symbol) }
41+
val time = Instant.ofEpochMilli(0).plusNanos(data[2].asLong)
42+
4643
if (type == "Q") {
47-
val symbol = data[3].asString.uppercase()
48-
val asset = assets.getOrPut(symbol) { Asset(symbol) }
4944
val quote = PriceQuote(asset, data[7].asDouble, data[8].asDouble, data[5].asDouble, data[4].asDouble)
50-
val time = Instant.ofEpochMilli(0).plusNanos(data[2].asLong)
5145
feed.deliver(quote, time)
46+
} else if (type == "T") {
47+
val trade = TradePrice(asset, data[9].asDouble, data[10].asDouble)
48+
feed.deliver(trade, time)
5249
}
5350
}
5451

5552
private fun handleFX(data: JsonArray) {
5653
val type = data[0].asString
5754
if (type == "Q") {
5855
val symbol = data[1].asString.uppercase()
59-
val asset = assets.getOrPut(symbol) { Asset.forexPair(symbol) }
56+
val asset = Config.getOrPutAsset(symbol) { Asset.forexPair(symbol) }
6057
val quote = PriceQuote(asset, data[7].asDouble, data[6].asDouble, data[4].asDouble, data[3].asDouble)
6158
val time = Instant.parse(data[2].asString)
6259
feed.deliver(quote, time)
@@ -65,12 +62,15 @@ private class Handler(private val feed: TiingoLiveFeed) : WebSocketListener() {
6562

6663
private fun handleCrypto(data: JsonArray) {
6764
val type = data[0].asString
65+
val symbol = data[1].asString.uppercase()
66+
val asset = Config.getOrPutAsset(symbol) { Asset(symbol, AssetType.CRYPTO, exchange = Exchange.CRYPTO) }
67+
val time = Instant.parse(data[2].asString)
6868
if (type == "Q") {
69-
val symbol = data[1].asString.uppercase()
70-
val asset = assets.getOrPut(symbol) { Asset(symbol, AssetType.CRYPTO) }
7169
val quote = PriceQuote(asset, data[8].asDouble, data[7].asDouble, data[5].asDouble, data[4].asDouble)
72-
val time = Instant.parse(data[2].asString)
7370
feed.deliver(quote, time)
71+
} else if (type == "T") {
72+
val trade = TradePrice(asset, data[5].asDouble, data[4].asDouble)
73+
feed.deliver(trade, time)
7474
}
7575
}
7676

@@ -99,19 +99,21 @@ private class Handler(private val feed: TiingoLiveFeed) : WebSocketListener() {
9999
}
100100

101101
/**
102-
* Tiingo live feed
102+
* Retrieve real-time data from Tiingo. It has support for US stocks, Forex and Crypto.
103103
*
104-
* This feed uses web-sockets for low letency and has nanosecond resolution
104+
* This feed uses the Tiingo websocket API for low letency and has nanosecond time resolution
105105
*/
106106
class TiingoLiveFeed private constructor(
107107
private val type: String,
108-
private val thresholdLevel: Int,
108+
private val thresholdLevel: Int = 5,
109109
configure: TiingoConfig.() -> Unit = {}
110110
) : LiveFeed() {
111111

112112
private val config = TiingoConfig()
113113

114114
init {
115+
val types = setOf("iex", "crypto", "fx")
116+
require(type in types) { "invalid type $type, allowed types are $types"}
115117
config.configure()
116118
require(config.key.isNotBlank()) { "no valid key found"}
117119
}
@@ -178,13 +180,12 @@ class TiingoLiveFeed private constructor(
178180
}
179181

180182
/**
181-
* Subscribe to quotes for provided [symbols].
183+
* Subscribe to real-time trades/quotes for provided [symbols].
182184
*
183-
* If no symbols are provided, all available data is subscribed to. That is a lot of data and can impact
184-
* your monthly quato quickly.
185+
* If no symbols are provided, all available market data is subscribed to. Be aware that this a lot of data.
185186
*
186-
* For crypto and some forex it is challenging to derive the underlying currency from just the symbol name,
187-
* so it is better to use [subscribeAssets] instead.
187+
* For crypto and some forex it is challenging to derive the underlying [Asset] from just its symbol name.
188+
* You can use [Config.registerAsset] to register assets upfront
188189
*/
189190
fun subscribe(vararg symbols: String) {
190191

@@ -206,22 +207,5 @@ class TiingoLiveFeed private constructor(
206207

207208
}
208209

209-
/**
210-
* Subscribe to quotes for the provided [assets].
211-
*
212-
* If no symbols are provided, all available data is subscribed to. That is a lot of data and can impact
213-
* your monthly quato quickly.
214-
*/
215-
fun subscribeAssets(vararg assets: Asset) {
216-
val tickers = mutableListOf<String>()
217-
for (asset in assets) {
218-
val tiingoTicker = asset.symbol.replace("[^a-zA-Z]+".toRegex(), "")
219-
this.handler.assets[tiingoTicker] = asset
220-
tickers.add(tiingoTicker)
221-
}
222-
223-
@Suppress("SpreadOperator")
224-
subscribe(*tickers.toTypedArray())
225-
}
226210

227211
}

roboquant-tiingo/src/test/kotlin/org/roboquant/samples/TiingoSamples.kt

+22-5
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,9 @@ package org.roboquant.samples
1919
import org.roboquant.Roboquant
2020
import org.roboquant.common.*
2121
import org.roboquant.feeds.AggregatorLiveFeed
22+
import org.roboquant.feeds.PriceAction
23+
import org.roboquant.feeds.filter
2224
import org.roboquant.loggers.ConsoleLogger
23-
import org.roboquant.loggers.MemoryLogger
2425
import org.roboquant.metrics.ProgressMetric
2526
import org.roboquant.strategies.EMAStrategy
2627
import org.roboquant.tiingo.TiingoHistoricFeed
@@ -33,6 +34,7 @@ internal class TiingoSamples {
3334
@Test
3435
@Ignore
3536
internal fun testLiveFeed() {
37+
System.setProperty(org.slf4j.simple.SimpleLogger.DEFAULT_LOG_LEVEL_KEY, "TRACE")
3638
val feed = TiingoLiveFeed.iex()
3739
feed.subscribe("AAPL", "TSLA")
3840
val rq = Roboquant(EMAStrategy(), ProgressMetric(), logger = ConsoleLogger())
@@ -46,8 +48,8 @@ internal class TiingoSamples {
4648
val iex = TiingoLiveFeed.iex()
4749
iex.subscribe()
4850
val feed = AggregatorLiveFeed(iex, 5.seconds)
49-
val rq = Roboquant(EMAStrategy(), ProgressMetric(), logger = MemoryLogger())
50-
rq.run(feed, Timeframe.next(3.minutes))
51+
val rq = Roboquant(EMAStrategy(), ProgressMetric(), logger = ConsoleLogger())
52+
rq.run(feed, Timeframe.next(5.minutes))
5153
println(rq.broker.account.fullSummary())
5254
}
5355

@@ -65,13 +67,28 @@ internal class TiingoSamples {
6567
@Ignore
6668
internal fun testLiveFeedCrypto() {
6769
val feed = TiingoLiveFeed.crypto()
68-
val asset = Asset("BNBFDUSD", AssetType.CRYPTO, "FDUSD")
69-
feed.subscribeAssets(asset)
70+
val asset = Asset("BNB/FDUSD", AssetType.CRYPTO, "FDUSD")
71+
Config.registerAsset("BNBFDUSD", asset)
72+
73+
feed.subscribe("BNBFDUSD")
7074
val rq = Roboquant(EMAStrategy(), ProgressMetric(), logger = ConsoleLogger())
7175
rq.run(feed, Timeframe.next(1.minutes))
7276
println(rq.broker.account.fullSummary())
7377
}
7478

79+
80+
@Test
81+
@Ignore
82+
internal fun testLiveFeedCryptoAll() {
83+
val feed = TiingoLiveFeed.crypto()
84+
feed.subscribe()
85+
feed.filter<PriceAction>(Timeframe.next(1.minutes)) {
86+
println(it)
87+
false
88+
}
89+
feed.close()
90+
}
91+
7592
@Test
7693
@Ignore
7794
internal fun historic() {

roboquant-tiingo/src/test/kotlin/org/roboquant/tiingo/TiingoTestIT.kt

+2-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ internal class TiingoTestIT {
5454
Config.getProperty("TEST_TIINGO") ?: return
5555
val feed = TiingoLiveFeed.crypto()
5656
val asset = Asset("BNBFDUSD", AssetType.CRYPTO, "FDUSD")
57-
feed.subscribeAssets(asset)
57+
Config.registerAsset("BNBFDUSD", asset)
58+
feed.subscribe("BNBFDUSD")
5859
val rq = Roboquant(EMAStrategy(), AccountMetric(), logger = LastEntryLogger())
5960
rq.run(feed, Timeframe.next(1.minutes))
6061
val actions = rq.logger.getMetric("progress.actions").latestRun()

roboquant/src/main/kotlin/org/roboquant/common/Config.kt

+11
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ object Config {
4242
private const val ONE_MB = 1024 * 1024
4343
private const val DEFAULT_SEED = 42L
4444

45+
// Maps a symbol to an asset
46+
private val symbolMap = mutableMapOf<String, Asset>()
47+
4548
// Used to handle Double imprecision
4649
internal const val EPS = 1e-10
4750

@@ -62,6 +65,14 @@ object Config {
6265
val build: String
6366
)
6467

68+
fun registerAsset(symbol: String, asset: Asset) {
69+
symbolMap[symbol] = asset
70+
}
71+
72+
fun getOrPutAsset(symbol: String, defaultValue: () -> Asset) : Asset {
73+
return symbolMap.getOrPut(symbol, defaultValue)
74+
}
75+
6576
/**
6677
* MetadataProvider about the build en environment
6778
*/

roboquant/src/main/kotlin/org/roboquant/feeds/LiveFeed.kt

+9-3
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,17 @@ abstract class LiveFeed(var heartbeatInterval: Long = 10_000) : Feed {
4848
get() = channels.isNotEmpty()
4949

5050
/**
51-
* Subclasses should use this method to send an event. If no channel is active, any event sent will be dropped.
52-
* This is a blocking call to ensure that events are send to multiple channels are in the order they
53-
* where delivered.
51+
* Subclasses should use this method or `sendAsync` to send an event. If no channel is active, any event
52+
* sent will be dropped.
5453
*/
5554
protected fun send(event: Event) = runBlocking {
55+
sendAsync(event)
56+
}
57+
58+
/**
59+
* Subclasses should use this method to send an event. If no channel is active, any event sent will be dropped.
60+
*/
61+
protected suspend fun sendAsync(event: Event) {
5662
for (channel in channels) {
5763
try {
5864
channel.send(event)

roboquant/src/test/kotlin/org/roboquant/feeds/LiveFeedTest.kt

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ internal class LiveFeedTest {
6868

6969
while (true) {
7070
try {
71-
send(event = Event(actions, Instant.now()))
71+
sendAsync(event = Event(actions, Instant.now()))
7272
delay(delayInMillis)
7373
if (stop) break
7474
} catch (e: Exception) {

0 commit comments

Comments
 (0)