diff --git a/src/main/java/com/salesforce/emp/connector/EmpConnector.java b/src/main/java/com/salesforce/emp/connector/EmpConnector.java index bd7412f..e40425b 100644 --- a/src/main/java/com/salesforce/emp/connector/EmpConnector.java +++ b/src/main/java/com/salesforce/emp/connector/EmpConnector.java @@ -130,6 +130,7 @@ public Future start() { if (running.compareAndSet(false, true)) { addListener(Channel.META_CONNECT, new AuthFailureListener()); addListener(Channel.META_HANDSHAKE, new AuthFailureListener()); + addListener(Channel.META_CONNECT, new Client403FailureListener()); return connect(); } CompletableFuture future = new CompletableFuture(); @@ -183,11 +184,31 @@ public Future subscribe(String topic, long replayFrom, Consum throw new IllegalStateException(String.format("Connector[%s} has not been started", parameters.endpoint())); } + // -- START BJA + // Windows eats the prefix '/', so let's ensure this is present + if (!topic.startsWith("/")) { + topic = "/" + topic; + } + Object result = replay.putIfAbsent(topic, replayFrom); + // if there is already a mapped value + if (result != null) { + // check the replay Id may not be valid; thus remove, as it will be added below + if ((long) result != replayFrom) { + replay.remove(topic, (long)result); + } else { + // these are the same key/value pair + throw new IllegalStateException( + String.format("Already subscribed to %s [%s]", topic, parameters.endpoint())); + } + } + // -- END BJA + /* if (replay.putIfAbsent(topic, replayFrom) != null) { throw new IllegalStateException(String.format("Already subscribed to %s [%s]", topic, parameters.endpoint())); } + */ SubscriptionImpl subscription = new SubscriptionImpl(topic, consumer); @@ -343,6 +364,44 @@ private String getFailureReason(Message message) { return failureReason; } } + + /** + * BJA -- you can cannibalize ( creating a base class (for 403 and auth) if needed; however, this is explicit) + * Listens to /meta/connect channel messages and handles 403 errors, where client needs + * to check and restart due to Org daily limits exceeded. + */ + private class Client403FailureListener implements ClientSessionChannel.MessageListener { + + @Override + public void onMessage(ClientSessionChannel channel, Message message) { + if (!message.isSuccessful()) { + if (is403Error(message)) { + stop(); + connect(); + } + } + } + + private boolean is403Error(Message message) { + String error = (String)message.get(Message.ERROR_FIELD); + String failureReason = getFailureReason(message); + + return (error != null && error.startsWith("403")) || + (failureReason != null && failureReason.startsWith("403")); + } + + private String getFailureReason(Message message) { + String failureReason = null; + Map ext = message.getExt(); + if (ext != null) { + Map sfdc = (Map)ext.get("sfdc"); + if (sfdc != null) { + failureReason = (String)sfdc.get("failureReason"); + } + } + return failureReason; + } + } private static class MessageListenerInfo { private String channelName;