Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Script engine support for Balance #983 #992

Merged
merged 16 commits into from
Aug 29, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,11 @@
*/
package com.gruelbox.orko.job.script;

import static com.gruelbox.orko.exchange.MarketDataType.BALANCE;
import static com.gruelbox.orko.exchange.MarketDataType.OPEN_ORDERS;
import static com.gruelbox.orko.exchange.MarketDataType.ORDERBOOK;
import static com.gruelbox.orko.exchange.MarketDataType.TICKER;
import static com.gruelbox.orko.exchange.MarketDataType.USER_TRADE;
import static com.gruelbox.orko.job.LimitOrderJob.Direction.BUY;
import static com.gruelbox.orko.jobrun.spi.Status.FAILURE_PERMANENT;
import static com.gruelbox.orko.jobrun.spi.Status.RUNNING;
Expand All @@ -28,10 +32,15 @@
import com.google.inject.assistedinject.FactoryModuleBuilder;
import com.gruelbox.orko.auth.Hasher;
import com.gruelbox.orko.db.Transactionally;
import com.gruelbox.orko.exchange.BalanceEvent;
import com.gruelbox.orko.exchange.ExchangeEventRegistry;
import com.gruelbox.orko.exchange.ExchangeEventRegistry.ExchangeEventSubscription;
import com.gruelbox.orko.exchange.MarketDataSubscription;
import com.gruelbox.orko.exchange.MarketDataType;
import com.gruelbox.orko.exchange.OpenOrdersEvent;
import com.gruelbox.orko.exchange.OrderBookEvent;
import com.gruelbox.orko.exchange.TickerEvent;
import com.gruelbox.orko.exchange.UserTradeEvent;
import com.gruelbox.orko.job.LimitOrderJob;
import com.gruelbox.orko.job.LimitOrderJob.Direction;
import com.gruelbox.orko.jobrun.JobSubmitter;
Expand Down Expand Up @@ -256,6 +265,34 @@ public Disposable setTick(JSObject callback, JSObject tickerSpec) {
callback.toString());
}

public Disposable setBalance(JSObject callback, JSObject tickerSpec) {
return onBalance(
event -> processEvent(() -> callback.call(null, event)),
convertTickerSpec(tickerSpec),
callback.toString());
}

public Disposable setOpenOrders(JSObject callback, JSObject tickerSpec) {
return onOpenOrders(
event -> processEvent(() -> callback.call(null, event)),
convertTickerSpec(tickerSpec),
callback.toString());
}

public Disposable setOrderBook(JSObject callback, JSObject tickerSpec) {
return onOrderBook(
event -> processEvent(() -> callback.call(null, event)),
convertTickerSpec(tickerSpec),
callback.toString());
}

public Disposable setUserTrades(JSObject callback, JSObject tickerSpec) {
return onUserTrades(
event -> processEvent(() -> callback.call(null, event)),
convertTickerSpec(tickerSpec),
callback.toString());
}

public Disposable setInterval(JSObject callback, Integer timeout) {
return onInterval(
() -> processEvent(() -> callback.call(null)), timeout, callback.toString());
Expand Down Expand Up @@ -311,33 +348,31 @@ public final class State {
new StateManager<>() {

@Override
public void set(String key, String value) {
HashMap<String, String> newState = new HashMap<>();
newState.putAll(job.state());
public final void set(String key, String value) {
HashMap<String, String> newState = new HashMap<>(job.state());
newState.put(key, value);
jobControl.replace(job.toBuilder().state(newState).build());
}

@Override
public String get(String key) {
public final String get(String key) {
return job.state().get(key);
}

@Override
public void remove(String key) {
HashMap<String, String> newState = new HashMap<>();
newState.putAll(job.state());
public final void remove(String key) {
HashMap<String, String> newState = new HashMap<>(job.state());
newState.remove(key);
jobControl.replace(job.toBuilder().state(newState).build());
}

@Override
public String toString() {
public final String toString() {
return job.state().toString();
}

@Override
public void increment(String key) {
public final void increment(String key) {
String value = get(key);
try {
long asLong = Long.parseLong(value);
Expand Down Expand Up @@ -386,13 +421,13 @@ public String toString() {
}

public interface StateManager<T> {
public T get(String key);
T get(String key);

public void set(String key, T value);
void set(String key, T value);

public void remove(String key);
void remove(String key);

public void increment(String key);
void increment(String key);
}

Disposable onInterval(Runnable runnable, long timeout, String description) {
Expand Down Expand Up @@ -428,32 +463,98 @@ private TickerSpec convertTickerSpec(JSObject tickerSpec) {
.build();
}

public Disposable onTick(
public final class DisposableSubscription implements Disposable {
private final ExchangeEventSubscription subscription;
private final Disposable disposable;
private final String description;

public DisposableSubscription(
ExchangeEventSubscription subscription, Disposable disposable, String description) {
this.subscription = subscription;
this.disposable = disposable;
this.description = description;
}

@Override
public boolean isDisposed() {
return disposable.isDisposed();
}

@Override
public void dispose() {
SafelyDispose.of(disposable);
SafelyClose.the(subscription);
}

@Override
public String toString() {
return description;
}
}

public final DisposableSubscription onTick(
io.reactivex.functions.Consumer<TickerEvent> handler,
TickerSpec tickerSpec,
String description) {

ExchangeEventSubscription subscription =
exchangeEventRegistry.subscribe(MarketDataSubscription.create(tickerSpec, TICKER));

Disposable disposable = subscription.getTickers().subscribe(handler);

return new Disposable() {
return new DisposableSubscription(subscription, disposable, description);
}

@Override
public boolean isDisposed() {
return disposable.isDisposed();
}
public final DisposableSubscription onBalance(
io.reactivex.functions.Consumer<BalanceEvent> handler,
TickerSpec tickerSpec,
String description) {

@Override
public void dispose() {
SafelyDispose.of(disposable);
SafelyClose.the(subscription);
}
ExchangeEventSubscription subscription =
exchangeEventRegistry.subscribe(MarketDataSubscription.create(tickerSpec, BALANCE));

@Override
public String toString() {
return description;
}
};
Disposable disposable = subscription.getBalances().subscribe(handler);

return new DisposableSubscription(subscription, disposable, description);
}

public final DisposableSubscription onOpenOrders(
io.reactivex.functions.Consumer<OpenOrdersEvent> handler,
TickerSpec tickerSpec,
String description) {

ExchangeEventSubscription subscription =
exchangeEventRegistry.subscribe(MarketDataSubscription.create(tickerSpec, OPEN_ORDERS));

Disposable disposable = subscription.getOrderSnapshots().subscribe(handler);

return new DisposableSubscription(subscription, disposable, description);
}

public final DisposableSubscription onOrderBook(
io.reactivex.functions.Consumer<OrderBookEvent> handler,
TickerSpec tickerSpec,
String description) {

ExchangeEventSubscription subscription =
exchangeEventRegistry.subscribe(MarketDataSubscription.create(tickerSpec, ORDERBOOK));

Disposable disposable = subscription.getOrderBooks().subscribe(handler);

return new DisposableSubscription(subscription, disposable, description);
}

public final DisposableSubscription onUserTrades(
io.reactivex.functions.Consumer<UserTradeEvent> handler,
TickerSpec tickerSpec,
String description) {

ExchangeEventSubscription subscription =
exchangeEventRegistry.subscribe(MarketDataSubscription.create(tickerSpec, USER_TRADE));

Disposable disposable = subscription.getUserTrades().subscribe(handler);

return new DisposableSubscription(subscription, disposable, description);
}

private void notifyAndLogError(String message) {
Expand Down