@@ -27,21 +27,20 @@ import com.njkim.reactivecrypto.core.common.model.order.TickData
27
27
import com.njkim.reactivecrypto.core.common.model.order.TradeSideType
28
28
import com.njkim.reactivecrypto.core.common.util.toEpochMilli
29
29
import com.njkim.reactivecrypto.core.websocket.AbstractExchangeWebsocketClient
30
- import com.njkim.reactivecrypto.hubi.model.HubiMessageFrame
31
- import com.njkim.reactivecrypto.hubi.model.HubiOrderBook
32
- import com.njkim.reactivecrypto.hubi.model.HubiTickDataWrapper
30
+ import com.njkim.reactivecrypto.hubi.model.HubiDepthResponse
33
31
import mu.KotlinLogging
34
32
import reactor.core.publisher.Flux
35
33
import reactor.kotlin.core.publisher.toFlux
36
34
import reactor.netty.http.client.HttpClient
35
+ import java.math.BigDecimal
37
36
import java.time.ZonedDateTime
38
37
import java.util.concurrent.ConcurrentHashMap
39
38
import java.util.concurrent.atomic.AtomicLong
40
39
41
40
class HubiWebsocketClient : AbstractExchangeWebsocketClient () {
42
41
private val log = KotlinLogging .logger {}
43
42
44
- private val baseUri = " wss://api.hubi.com/ws/connect/v1 "
43
+ private val baseUri = " wss://api.hubi.com/ws/futures/public/market "
45
44
46
45
private val objectMapper: ObjectMapper = createJsonObjectMapper().objectMapper()
47
46
@@ -50,12 +49,12 @@ class HubiWebsocketClient : AbstractExchangeWebsocketClient() {
50
49
}
51
50
52
51
override fun createDepthSnapshot (subscribeTargets : List <CurrencyPair >): Flux <OrderBook > {
53
- val subscribeRequests = subscribeTargets
54
- .map { " ${it.baseCurrency.symbol}${it.quoteCurrency.symbol} " .toLowerCase() }
52
+ val currentOrderBookMap: MutableMap <CurrencyPair , OrderBook > = ConcurrentHashMap ()
53
+
54
+ val subscribeRequests = subscribeTargets.asSequence()
55
+ .map { " ${it.baseCurrency.symbol}${it.quoteCurrency.symbol} " .toUpperCase() }
55
56
.map { symbol ->
56
- """
57
- {"channel":"depth_all","symbol":"$symbol "}
58
- """ .trimIndent()
57
+ """ {"op":"subscribe", "channel":"/api/depth/depth", "key":"$symbol "}"""
59
58
}
60
59
.toFlux()
61
60
@@ -68,66 +67,119 @@ class HubiWebsocketClient : AbstractExchangeWebsocketClient() {
68
67
.then()
69
68
.thenMany(inbound.aggregateFrames().receive().asString())
70
69
}
71
- .filter { it.contains(" \" dataType \" : \" depth_all \ "" ) }
72
- .map { objectMapper.readValue<HubiMessageFrame < HubiOrderBook > >(it) }
70
+ .filter { it.contains(""" "event":"/api/depth/depth" " "" ) }
71
+ .map { objectMapper.readValue<HubiDepthResponse >(it) }
73
72
.map { messageFrame ->
74
73
val eventTime = ZonedDateTime .now()
75
74
OrderBook (
76
- " ${messageFrame.symbol }${eventTime.toEpochMilli()} " ,
77
- messageFrame.symbol ,
75
+ " ${messageFrame.key }${eventTime.toEpochMilli()} " ,
76
+ messageFrame.key ,
78
77
eventTime,
79
78
ExchangeVendor .HUBI ,
80
- messageFrame.data.bids.map { OrderBookUnit (it.price, it.amount, TradeSideType .BUY , null ) },
81
- messageFrame.data.asks.map { OrderBookUnit (it.price, it.amount, TradeSideType .SELL , null ) }.sortedBy { it.price }
79
+ messageFrame.buyDepth.map { OrderBookUnit (it.price, it.qty, TradeSideType .BUY , it.count) },
80
+ messageFrame.sellDepth.map { OrderBookUnit (it.price, it.qty, TradeSideType .SELL , it.count) }
81
+ .sortedBy { it.price }
82
82
)
83
83
}
84
+ .map { orderBook ->
85
+ if (! currentOrderBookMap.containsKey(orderBook.currencyPair)) {
86
+ val filteredOrderBook = orderBook.copy(
87
+ bids = orderBook.bids.filter { it.quantity > BigDecimal .ZERO },
88
+ asks = orderBook.asks.filter { it.quantity > BigDecimal .ZERO }
89
+ )
90
+ currentOrderBookMap[orderBook.currencyPair] = filteredOrderBook
91
+ return @map filteredOrderBook
92
+ }
93
+
94
+ val prevOrderBook = currentOrderBookMap[orderBook.currencyPair]!!
95
+
96
+ val askMap: MutableMap <BigDecimal , OrderBookUnit > = prevOrderBook.asks
97
+ .associateBy { it.price.stripTrailingZeros() }
98
+ .toMutableMap()
99
+
100
+ orderBook.asks.forEach { updatedAsk ->
101
+ askMap.compute(updatedAsk.price.stripTrailingZeros()) { _, oldValue ->
102
+ when {
103
+ updatedAsk.quantity <= BigDecimal .ZERO -> null
104
+ oldValue == null -> updatedAsk
105
+ else -> oldValue.copy(
106
+ quantity = updatedAsk.quantity,
107
+ orderNumbers = updatedAsk.orderNumbers
108
+ )
109
+ }
110
+ }
111
+ }
112
+
113
+ val bidMap: MutableMap <BigDecimal , OrderBookUnit > = prevOrderBook.bids
114
+ .associateBy { it.price.stripTrailingZeros() }
115
+ .toMutableMap()
116
+
117
+ orderBook.bids.forEach { updatedBid ->
118
+ bidMap.compute(updatedBid.price.stripTrailingZeros()) { _, oldValue ->
119
+ when {
120
+ updatedBid.quantity <= BigDecimal .ZERO -> null
121
+ oldValue == null -> updatedBid
122
+ else -> oldValue.copy(
123
+ quantity = updatedBid.quantity,
124
+ orderNumbers = updatedBid.orderNumbers
125
+ )
126
+ }
127
+ }
128
+ }
129
+
130
+ val currentOrderBook = prevOrderBook.copy(
131
+ eventTime = orderBook.eventTime,
132
+ asks = askMap.values.sortedBy { orderBookUnit -> orderBookUnit.price },
133
+ bids = bidMap.values.sortedByDescending { orderBookUnit -> orderBookUnit.price }
134
+ )
135
+ currentOrderBookMap[currentOrderBook.currencyPair] = currentOrderBook
136
+ currentOrderBook
137
+ }
138
+ .doFinally { currentOrderBookMap.clear() } // cleanup memory limit orderBook when disconnected
84
139
}
85
140
86
141
override fun createTradeWebsocket (subscribeTargets : List <CurrencyPair >): Flux <TickData > {
87
142
val lastPublishedTimestamp: MutableMap <CurrencyPair , AtomicLong > = ConcurrentHashMap ()
88
143
89
- val subscribeRequests = subscribeTargets
90
- .map { " ${it.baseCurrency.symbol}${it.quoteCurrency.symbol} " .toLowerCase () }
144
+ val subscribeRequests = subscribeTargets.asSequence()
145
+ .map { " ${it.baseCurrency.symbol}${it.quoteCurrency.symbol} " .toUpperCase () }
91
146
.map { symbol ->
92
- """
93
- {"channel":"trade_history","symbol":"$symbol "}
94
- """ .trimIndent()
147
+ """ {"op":"subscribe", "channel":"/api/depth/depth", "key":"$symbol "}"""
95
148
}
96
149
.toFlux()
97
150
98
151
return HttpClient .create()
99
152
.wiretap(log.isDebugEnabled)
100
- .websocket(65536 )
153
+ .websocket(262144 )
101
154
.uri(baseUri)
102
155
.handle { inbound, outbound ->
103
156
outbound.sendString(subscribeRequests)
104
157
.then()
105
- .thenMany(inbound.aggregateFrames(65536 ).receive().asString())
158
+ .thenMany(inbound.aggregateFrames().receive().asString())
106
159
}
107
- .filter { it.contains(" \" dataType\" :\" trade_history\" " ) }
108
- .map { objectMapper.readValue<HubiMessageFrame <HubiTickDataWrapper >>(it) }
109
- .map { it.data }
160
+ .filter { it.contains(""" "event":"/api/depth/depth"""" ) }
161
+ .map { objectMapper.readValue<HubiDepthResponse >(it) }
110
162
.flatMapIterable {
111
163
it.trades
112
164
.takeWhile { hubiTickData ->
113
165
// hubi trade history response contain history data
114
166
val lastTradeEpochMilli =
115
167
lastPublishedTimestamp.computeIfAbsent(hubiTickData.symbol) { AtomicLong () }
116
- val isNew = hubiTickData.time .toEpochMilli() > lastTradeEpochMilli.toLong()
168
+ val isNew = hubiTickData.timestamp .toEpochMilli() > lastTradeEpochMilli.toLong()
117
169
if (isNew) {
118
- lastTradeEpochMilli.set(hubiTickData.time .toEpochMilli())
170
+ lastTradeEpochMilli.set(hubiTickData.timestamp .toEpochMilli())
119
171
}
120
172
isNew
121
173
}
122
174
.map { hubiTickData ->
123
175
TickData (
124
- " ${hubiTickData.symbol}${hubiTickData.time } " ,
125
- hubiTickData.time ,
176
+ " ${hubiTickData.symbol}${hubiTickData.timestamp } " ,
177
+ hubiTickData.timestamp ,
126
178
hubiTickData.price,
127
- hubiTickData.amount ,
179
+ hubiTickData.qty ,
128
180
hubiTickData.symbol,
129
181
ExchangeVendor .HUBI ,
130
- hubiTickData.type
182
+ if ( hubiTickData.buyActive) TradeSideType . BUY else TradeSideType . SELL
131
183
)
132
184
}
133
185
.reversed()
0 commit comments