Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Script engine support for Balance #983 #992

Merged
merged 16 commits into from
Aug 29, 2020
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,7 @@ public final class State {

@Override
public void set(String key, String value) {
HashMap<String, String> newState = new HashMap<>();
newState.putAll(job.state());
HashMap<String, String> newState = new HashMap<>(job.state());
newState.put(key, value);
jobControl.replace(job.toBuilder().state(newState).build());
}
Expand All @@ -367,8 +366,7 @@ public String get(String key) {

@Override
public void remove(String key) {
HashMap<String, String> newState = new HashMap<>();
newState.putAll(job.state());
HashMap<String, String> newState = new HashMap<>(job.state());
newState.remove(key);
jobControl.replace(job.toBuilder().state(newState).build());
}
Expand Down Expand Up @@ -428,13 +426,13 @@ public String toString() {
}

public interface StateManager<T> {
public T get(String key);
T get(String key);

public void set(String key, T value);
void set(String key, T value);

public void remove(String key);
void remove(String key);

public void increment(String key);
void increment(String key);
}

Disposable onInterval(Runnable runnable, long timeout, String description) {
Expand Down Expand Up @@ -470,7 +468,35 @@ private TickerSpec convertTickerSpec(JSObject tickerSpec) {
.build();
}

public Disposable onTick(
public class DisposableSubscription implements Disposable {
private ExchangeEventSubscription subscription;
badgerwithagun marked this conversation as resolved.
Show resolved Hide resolved
private Disposable disposable;
private String description;

public DisposableSubscription(ExchangeEventSubscription subs, Disposable disp, String desc) {
subscription = subs;
badgerwithagun marked this conversation as resolved.
Show resolved Hide resolved
disposable = disp;
description = desc;
}

@Override
public boolean isDisposed() {
return disposable.isDisposed();
}

@Override
public void dispose() {
SafelyDispose.of(disposable);
SafelyClose.the(subscription);
}

@Override
public String toString() {
return description;
}
}

public DisposableSubscription onTick(
io.reactivex.functions.Consumer<TickerEvent> handler,
TickerSpec tickerSpec,
MarketDataType type,
Expand All @@ -481,27 +507,10 @@ public Disposable onTick(

Disposable disposable = subscription.getTickers().subscribe(handler);

return new Disposable() {

@Override
public boolean isDisposed() {
return disposable.isDisposed();
}

@Override
public void dispose() {
SafelyDispose.of(disposable);
SafelyClose.the(subscription);
}

@Override
public String toString() {
return description;
}
};
return new DisposableSubscription(subscription, disposable, description);
}

public Disposable onBalance(
public DisposableSubscription onBalance(
io.reactivex.functions.Consumer<BalanceEvent> handler,
TickerSpec tickerSpec,
MarketDataType type,
Expand All @@ -512,27 +521,10 @@ public Disposable onBalance(

Disposable disposable = subscription.getBalances().subscribe(handler);

return new Disposable() {

@Override
public boolean isDisposed() {
return disposable.isDisposed();
}

@Override
public void dispose() {
SafelyDispose.of(disposable);
SafelyClose.the(subscription);
}

@Override
public String toString() {
return description;
}
};
return new DisposableSubscription(subscription, disposable, description);
}

public Disposable onOpenOrders(
public DisposableSubscription onOpenOrders(
io.reactivex.functions.Consumer<OpenOrdersEvent> handler,
TickerSpec tickerSpec,
MarketDataType type,
Expand All @@ -543,27 +535,10 @@ public Disposable onOpenOrders(

Disposable disposable = subscription.getOrderSnapshots().subscribe(handler);

return new Disposable() {

@Override
public boolean isDisposed() {
return disposable.isDisposed();
}

@Override
public void dispose() {
SafelyDispose.of(disposable);
SafelyClose.the(subscription);
}

@Override
public String toString() {
return description;
}
};
return new DisposableSubscription(subscription, disposable, description);
}

public Disposable onOrderBook(
public DisposableSubscription onOrderBook(
io.reactivex.functions.Consumer<OrderBookEvent> handler,
TickerSpec tickerSpec,
MarketDataType type,
Expand All @@ -574,27 +549,10 @@ public Disposable onOrderBook(

Disposable disposable = subscription.getOrderBooks().subscribe(handler);

return new Disposable() {

@Override
public boolean isDisposed() {
return disposable.isDisposed();
}

@Override
public void dispose() {
SafelyDispose.of(disposable);
SafelyClose.the(subscription);
}

@Override
public String toString() {
return description;
}
};
return new DisposableSubscription(subscription, disposable, description);
}

public Disposable onUserTrades(
public DisposableSubscription onUserTrades(
io.reactivex.functions.Consumer<UserTradeEvent> handler,
TickerSpec tickerSpec,
MarketDataType type,
Expand All @@ -605,24 +563,7 @@ public Disposable onUserTrades(

Disposable disposable = subscription.getUserTrades().subscribe(handler);

return new Disposable() {

@Override
public boolean isDisposed() {
return disposable.isDisposed();
}

@Override
public void dispose() {
SafelyDispose.of(disposable);
SafelyClose.the(subscription);
}

@Override
public String toString() {
return description;
}
};
return new DisposableSubscription(subscription, disposable, description);
}

private void notifyAndLogError(String message) {
Expand Down