@@ -36,6 +36,7 @@ import mu.KotlinLogging
36
36
import org.apache.commons.compress.compressors.deflate64.Deflate64CompressorInputStream
37
37
import org.springframework.util.StreamUtils
38
38
import reactor.core.publisher.Flux
39
+ import reactor.kotlin.core.publisher.toFlux
39
40
import reactor.netty.http.client.HttpClient
40
41
import java.math.BigDecimal
41
42
import java.nio.charset.Charset
@@ -44,7 +45,7 @@ import java.util.concurrent.ConcurrentHashMap
44
45
import kotlin.streams.toList
45
46
46
47
open class OkexWebsocketClient (
47
- private val baseUri : String = " wss://real.okex.com:10442 /ws/v3"
48
+ private val baseUri : String = " wss://real.okex.com:8443 /ws/v3"
48
49
) : AbstractExchangeWebsocketClient() {
49
50
private val log = KotlinLogging .logger {}
50
51
@@ -148,7 +149,7 @@ open class OkexWebsocketClient(
148
149
val subscribeMessages = subscribeTargets.stream()
149
150
.map { " ${it.baseCurrency.symbol} -${it.quoteCurrency.symbol} " }
150
151
.map { " {\" op\" : \" subscribe\" , \" args\" : [\" spot/trade:$it \" ]}" }
151
- .toList ()
152
+ .toFlux ()
152
153
153
154
return HttpClient .create()
154
155
.wiretap(log.isDebugEnabled)
@@ -160,7 +161,7 @@ open class OkexWebsocketClient(
160
161
.websocket()
161
162
.uri(baseUri)
162
163
.handle { inbound, outbound ->
163
- outbound.sendString(Flux .fromIterable< String >( subscribeMessages) )
164
+ outbound.sendString(subscribeMessages)
164
165
.then()
165
166
.thenMany(inbound.receive().asString())
166
167
}
0 commit comments