diff --git a/README.md b/README.md index da0aebc..71dac3d 100644 --- a/README.md +++ b/README.md @@ -111,7 +111,38 @@ Authentication becomes invalid when a Salesforce session is invalidated or an ac // Set the bearer token function connector.setBearerTokenProvider(bearerTokenProvider); -For a full example, see [LoginExample.java](src/main/java/com/salesforce/emp/connector/example/LoginExample.java). +## Proxy +To connect to Salesforce through a proxy, you can use the ProxyBayeuxParameters class, adding it the proxy configuration, and eventually, also the authentication + + BayeuxParameters params = LoginHelper.login(new URL(loginUrl), argv[0], argv[1]); + + + Consumer> consumer = event -> System.out + .println(String.format("Received:\n%s", JSON.toString(event))); + + CustomBayeuxParameter dbp = new CustomBayeuxParameter(params); + Address a = new Address(proxyHost, proxyPort); + HttpProxy p = new HttpProxy(a, proxyProtocol.equals("https")); + dbp.addProxy(p); + if (!proxyUsername.isEmpty()) { + BasicAuthentication auth = new BasicAuthentication( + new URI(String.format("%s://%s:%s", proxyProtocol, proxyHost, proxyPort)), "*", proxyUsername, + proxyPassword) { + @Override + public boolean matches(String type, URI uri, String realm) { + realm = "*"; + return super.matches(type, uri, realm); + } + + }; + dbp.addAuthentication(auth); + } + + EmpConnector connector = new EmpConnector(dbp); + + connector.start().get(5, TimeUnit.SECONDS); + +For a full example, see [LoginProxyExample.java](src/main/java/com/salesforce/emp/connector/example/LoginProxyExample.java). ## Documentation For more information about the components of the EMP Connector and a walkthrough, see the [Java Client Example](https://developer.salesforce.com/docs/atlas.en-us.api_streaming.meta/api_streaming/code_sample_java_client_intro.htm) diff --git a/pom.xml b/pom.xml index 3dff04c..5a5cf04 100644 --- a/pom.xml +++ b/pom.xml @@ -26,6 +26,12 @@ Salesforce.com, Inc. http://www.salesforce.com + + Alessandro Casolla + casolla.alessandro@gmail.com + Softphone srl + http://www.softphone.it + @@ -68,13 +74,13 @@ org.cometd.java cometd-java-client - 3.1.4 + 4.0.4 ch.qos.logback logback-classic 1.1.6 - test + test junit @@ -82,6 +88,13 @@ 4.10 test + + + org.slf4j + slf4j-simple + 1.7.21 + test + diff --git a/src/main/java/com/salesforce/emp/connector/BayeuxParameters.java b/src/main/java/com/salesforce/emp/connector/BayeuxParameters.java index 383ce2f..163c49a 100644 --- a/src/main/java/com/salesforce/emp/connector/BayeuxParameters.java +++ b/src/main/java/com/salesforce/emp/connector/BayeuxParameters.java @@ -83,7 +83,14 @@ default int maxNetworkDelay() { /** * @return a list of proxies to use for outbound connections */ - default Collection proxies() { + default Collection proxies() { + return Collections.emptyList(); + } + + /** + * @return a list of authentications to use for proxies + */ + default Collection authentications() { return Collections.emptyList(); } diff --git a/src/main/java/com/salesforce/emp/connector/DelegatingBayeuxParameters.java b/src/main/java/com/salesforce/emp/connector/DelegatingBayeuxParameters.java index 22bfc85..bdea2e3 100644 --- a/src/main/java/com/salesforce/emp/connector/DelegatingBayeuxParameters.java +++ b/src/main/java/com/salesforce/emp/connector/DelegatingBayeuxParameters.java @@ -61,7 +61,7 @@ public int maxNetworkDelay() { } @Override - public Collection proxies() { + public Collection proxies() { return parameters.proxies(); } diff --git a/src/main/java/com/salesforce/emp/connector/EmpConnector.java b/src/main/java/com/salesforce/emp/connector/EmpConnector.java index 75b1c0c..0ae2a90 100644 --- a/src/main/java/com/salesforce/emp/connector/EmpConnector.java +++ b/src/main/java/com/salesforce/emp/connector/EmpConnector.java @@ -7,21 +7,27 @@ package com.salesforce.emp.connector; import java.net.ConnectException; -import java.util.Arrays; -import java.util.List; +import java.net.URISyntaxException; import java.util.Map; import java.util.Set; -import java.util.concurrent.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Consumer; import java.util.function.Function; import org.cometd.bayeux.Channel; import org.cometd.bayeux.Message; +import org.cometd.bayeux.client.ClientSession.MessageListener; import org.cometd.bayeux.client.ClientSessionChannel; import org.cometd.client.BayeuxClient; import org.cometd.client.transport.LongPollingTransport; import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.Authentication; +import org.eclipse.jetty.client.api.AuthenticationStore; import org.eclipse.jetty.client.api.Request; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,357 +37,375 @@ * @since API v37.0 */ public class EmpConnector { - private static final String ERROR = "error"; - private static final String FAILURE = "failure"; - - private class SubscriptionImpl implements TopicSubscription { - private final String topic; - private final Consumer> consumer; - - private SubscriptionImpl(String topic, Consumer> consumer) { - this.topic = topic; - this.consumer = consumer; - subscriptions.add(this); - } - - /* - * (non-Javadoc) - * @see com.salesforce.emp.connector.Subscription#cancel() - */ - @Override - public void cancel() { - replay.remove(topicWithoutQueryString(topic)); - if (running.get() && client != null) { - client.getChannel(topic).unsubscribe(); - subscriptions.remove(this); - } - } - - /* - * (non-Javadoc) - * @see com.salesforce.emp.connector.Subscription#getReplay() - */ - @Override - public long getReplayFrom() { - return replay.getOrDefault(topicWithoutQueryString(topic), REPLAY_FROM_EARLIEST); - } - - /* - * (non-Javadoc) - * @see com.salesforce.emp.connector.Subscription#getTopic() - */ - @Override - public String getTopic() { - return topic; - } - - @Override - public String toString() { - return String.format("Subscription [%s:%s]", getTopic(), getReplayFrom()); - } - - Future subscribe() { - Long replayFrom = getReplayFrom(); - ClientSessionChannel channel = client.getChannel(topic); - CompletableFuture future = new CompletableFuture<>(); - channel.subscribe((c, message) -> consumer.accept(message.getDataAsMap()), (c, message) -> { - if (message.isSuccessful()) { - future.complete(this); - } else { - Object error = message.get(ERROR); - if (error == null) { - error = message.get(FAILURE); - } - future.completeExceptionally( - new CannotSubscribe(parameters.endpoint(), topic, replayFrom, error != null ? error : message)); - } - }); - return future; - } - } - - public static long REPLAY_FROM_EARLIEST = -2L; - public static long REPLAY_FROM_TIP = -1L; - - private static String AUTHORIZATION = "Authorization"; - private static final Logger log = LoggerFactory.getLogger(EmpConnector.class); - - private volatile BayeuxClient client; - private final HttpClient httpClient; - private final BayeuxParameters parameters; - private final ConcurrentMap replay = new ConcurrentHashMap<>(); - private final AtomicBoolean running = new AtomicBoolean(); - - private final Set subscriptions = new CopyOnWriteArraySet<>(); - private final Set listenerInfos = new CopyOnWriteArraySet<>(); - - private Function bearerTokenProvider; - private AtomicBoolean reauthenticate = new AtomicBoolean(false); - - public EmpConnector(BayeuxParameters parameters) { - this.parameters = parameters; - httpClient = new HttpClient(parameters.sslContextFactory()); - httpClient.getProxyConfiguration().getProxies().addAll(parameters.proxies()); - } - - /** - * Start the connector. - * @return true if connection was established, false otherwise - */ - public Future start() { - if (running.compareAndSet(false, true)) { - addListener(Channel.META_CONNECT, new AuthFailureListener()); - addListener(Channel.META_HANDSHAKE, new AuthFailureListener()); - replay.clear(); - return connect(); - } - CompletableFuture future = new CompletableFuture(); - future.complete(true); - return future; - } - - /** - * Stop the connector - */ - public void stop() { - if (!running.compareAndSet(true, false)) { - return; - } - if (client != null) { - log.info("Disconnecting Bayeux Client in EmpConnector"); - client.disconnect(); - client = null; - } - if (httpClient != null) { - try { - httpClient.stop(); - } catch (Exception e) { - log.error("Unable to stop HTTP transport[{}]", parameters.endpoint(), e); - } - } - } - - /** - * Set a bearer token / session id provider function that takes a boolean as input and returns a valid token. - * If the input is true, the provider function is supposed to re-authenticate with the Salesforce server - * and get a fresh session id or token. - * - * @param bearerTokenProvider a bearer token provider function. - */ - public void setBearerTokenProvider(Function bearerTokenProvider) { - this.bearerTokenProvider = bearerTokenProvider; - } - - /** - * Subscribe to a topic, receiving events after the replayFrom position - * - * @param topic - * - the topic to subscribe to - * @param replayFrom - * - the replayFrom position in the event stream - * @param consumer - * - the consumer of the events - * @return a Future returning the Subscription - on completion returns a Subscription or throws a CannotSubscribe - * exception - */ - public Future subscribe(String topic, long replayFrom, Consumer> consumer) { - if (!running.get()) { - throw new IllegalStateException(String.format("Connector[%s} has not been started", - parameters.endpoint())); - } - topic = topic.replaceAll("/$", ""); - - final String topicWithoutQueryString = topicWithoutQueryString(topic); - if (replay.putIfAbsent(topicWithoutQueryString, replayFrom) != null) { - throw new IllegalStateException(String.format("Already subscribed to %s [%s]", - topic, parameters.endpoint())); - } - - SubscriptionImpl subscription = new SubscriptionImpl(topic, consumer); - - return subscription.subscribe(); - } - - /** - * Subscribe to a topic, receiving events from the earliest event position in the stream - * - * @param topic - * - the topic to subscribe to - * @param consumer - * - the consumer of the events - * @return a Future returning the Subscription - on completion returns a Subscription or throws a CannotSubscribe - * exception - */ - public Future subscribeEarliest(String topic, Consumer> consumer) { - return subscribe(topic, REPLAY_FROM_EARLIEST, consumer); - } - - /** - * Subscribe to a topic, receiving events from the latest event position in the stream - * - * @param topic - * - the topic to subscribe to - * @param consumer - * - the consumer of the events - * @return a Future returning the Subscription - on completion returns a Subscription or throws a CannotSubscribe - * exception - */ - public Future subscribeTip(String topic, Consumer> consumer) { - return subscribe(topic, REPLAY_FROM_TIP, consumer); - } - - public EmpConnector addListener(String channel, ClientSessionChannel.MessageListener messageListener) { - listenerInfos.add(new MessageListenerInfo(channel, messageListener)); - return this; - } - - public boolean isConnected() { - return client != null && client.isConnected(); - } - - public boolean isDisconnected() { - return client == null || client.isDisconnected(); - } - - public boolean isHandshook() { - return client != null && client.isHandshook(); - } - - public long getLastReplayId(String topic) { - return replay.get(topic); - } - - private static String topicWithoutQueryString(String fullTopic) { - return fullTopic.split("\\?")[0]; - } - - private Future connect() { - log.info("EmpConnector connecting"); - CompletableFuture future = new CompletableFuture<>(); - - try { - httpClient.start(); - } catch (Exception e) { - log.error("Unable to start HTTP transport[{}]", parameters.endpoint(), e); - running.set(false); - future.complete(false); - return future; - } - - String bearerToken = bearerToken(); - - LongPollingTransport httpTransport = new LongPollingTransport(parameters.longPollingOptions(), httpClient) { - @Override - protected void customize(Request request) { - request.header(AUTHORIZATION, bearerToken); - } - }; - - client = new BayeuxClient(parameters.endpoint().toExternalForm(), httpTransport); - - client.addExtension(new ReplayExtension(replay)); - - addListeners(client); - - client.handshake((c, m) -> { - if (!m.isSuccessful()) { - Object error = m.get(ERROR); - if (error == null) { - error = m.get(FAILURE); - } - future.completeExceptionally(new ConnectException( - String.format("Cannot connect [%s] : %s", parameters.endpoint(), error))); - running.set(false); - } else { - subscriptions.forEach(SubscriptionImpl::subscribe); - future.complete(true); - } - }); - - return future; - } - - private void addListeners(BayeuxClient client) { - for (MessageListenerInfo info : listenerInfos) { - client.getChannel(info.getChannelName()).addListener(info.getMessageListener()); - } - } - - private String bearerToken() { - String bearerToken; - if (bearerTokenProvider != null) { - bearerToken = bearerTokenProvider.apply(reauthenticate.get()); - reauthenticate.compareAndSet(true, false); - } else { - bearerToken = parameters.bearerToken(); - } - - return bearerToken; - } - - private void reconnect() { - if (running.compareAndSet(false, true)) { - connect(); - } else { - log.error("The current value of running is not as we expect, this means our reconnection may not happen"); - } - } - - /** - * Listens to /meta/connect channel messages and handles 401 errors, where client needs - * to reauthenticate. - */ - private class AuthFailureListener implements ClientSessionChannel.MessageListener { - private static final String ERROR_401 = "401"; - private static final String ERROR_403 = "403"; - - @Override - public void onMessage(ClientSessionChannel channel, Message message) { - if (!message.isSuccessful()) { - if (isError(message, ERROR_401) || isError(message, ERROR_403)) { - reauthenticate.set(true); - stop(); - reconnect(); - } - } - } - - private boolean isError(Message message, String errorCode) { - String error = (String)message.get(Message.ERROR_FIELD); - String failureReason = getFailureReason(message); - - return (error != null && error.startsWith(errorCode)) || - (failureReason != null && failureReason.startsWith(errorCode)); - } - - private String getFailureReason(Message message) { - String failureReason = null; - Map ext = message.getExt(); - if (ext != null) { - Map sfdc = (Map)ext.get("sfdc"); - if (sfdc != null) { - failureReason = (String)sfdc.get("failureReason"); - } - } - return failureReason; - } - } - - private static class MessageListenerInfo { - private String channelName; - private ClientSessionChannel.MessageListener messageListener; - - MessageListenerInfo(String channelName, ClientSessionChannel.MessageListener messageListener) { - this.channelName = channelName; - this.messageListener = messageListener; - } - - String getChannelName() { - return channelName; - } - - ClientSessionChannel.MessageListener getMessageListener() { - return messageListener; - } - } + private static final String ERROR = "error"; + private static final String FAILURE = "failure"; + + private class SubscriptionImpl implements TopicSubscription { + private final String topic; + private final Consumer> consumer; + + private SubscriptionImpl(String topic, Consumer> consumer) { + this.topic = topic; + this.consumer = consumer; + subscriptions.add(this); + } + + /* + * (non-Javadoc) + * + * @see com.salesforce.emp.connector.Subscription#cancel() + */ + @Override + public void cancel() { + replay.remove(topicWithoutQueryString(topic)); + if (running.get() && client != null) { + client.getChannel(topic).unsubscribe(); + subscriptions.remove(this); + } + } + + /* + * (non-Javadoc) + * + * @see com.salesforce.emp.connector.Subscription#getReplay() + */ + @Override + public long getReplayFrom() { + return replay.getOrDefault(topicWithoutQueryString(topic), REPLAY_FROM_EARLIEST); + } + + /* + * (non-Javadoc) + * + * @see com.salesforce.emp.connector.Subscription#getTopic() + */ + @Override + public String getTopic() { + return topic; + } + + @Override + public String toString() { + return String.format("Subscription [%s:%s]", getTopic(), getReplayFrom()); + } + + Future subscribe() { + Long replayFrom = getReplayFrom(); + ClientSessionChannel channel = client.getChannel(topic); + CompletableFuture future = new CompletableFuture<>(); + + channel.subscribe((c, message) -> consumer.accept(message.getDataAsMap()), (c, message) -> { + if (message.isSuccessful()) { + future.complete(this); + } else { + Object error = message.get(ERROR); + if (error == null) { + error = message.get(FAILURE); + } + future.completeExceptionally(new CannotSubscribe(parameters.endpoint(), topic, replayFrom, + error != null ? error : message)); + } + }); + return future; + } + } + + public static long REPLAY_FROM_EARLIEST = -2L; + public static long REPLAY_FROM_TIP = -1L; + + private static String AUTHORIZATION = "Authorization"; + private static final Logger logger = LoggerFactory.getLogger(EmpConnector.class); + + private volatile BayeuxClient client; + private final HttpClient httpClient; + private final BayeuxParameters parameters; + private final ConcurrentMap replay = new ConcurrentHashMap<>(); + private final AtomicBoolean running = new AtomicBoolean(); + + private final Set subscriptions = new CopyOnWriteArraySet<>(); + private final Set listenerInfos = new CopyOnWriteArraySet<>(); + + private Function bearerTokenProvider; + private AtomicBoolean reauthenticate = new AtomicBoolean(false); + + public EmpConnector(BayeuxParameters parameters) throws URISyntaxException { + this.parameters = parameters; + httpClient = new HttpClient(parameters.sslContextFactory()); + httpClient.getProxyConfiguration().getProxies().addAll(parameters.proxies()); + logger.info("proxyes : {}", parameters.proxies()); + AuthenticationStore authenticationStore = httpClient.getAuthenticationStore(); + for (Authentication auth : parameters.authentications()) { + logger.info("proxy auth: {}", parameters.proxies()); + authenticationStore.addAuthentication(auth); + } + + } + + /** + * Start the connector. + * + * @return true if connection was established, false otherwise + */ + public Future start() { + if (running.compareAndSet(false, true)) { + addListener(Channel.META_CONNECT, new AuthFailureListener()); + addListener(Channel.META_HANDSHAKE, new AuthFailureListener()); + replay.clear(); + return connect(); + } + CompletableFuture future = new CompletableFuture(); + future.complete(true); + return future; + } + + /** + * Stop the connector + */ + public void stop() { + if (!running.compareAndSet(true, false)) { + return; + } + if (client != null) { + logger.info("Disconnecting Bayeux Client in EmpConnector"); + client.disconnect(); + client = null; + } + if (httpClient != null) { + try { + httpClient.stop(); + } catch (Exception e) { + logger.error("Unable to stop HTTP transport[{}]", parameters.endpoint(), e); + } + } + } + + /** + * Set a bearer token / session id provider function that takes a boolean as + * input and returns a valid token. If the input is true, the provider function + * is supposed to re-authenticate with the Salesforce server and get a fresh + * session id or token. + * + * @param bearerTokenProvider a bearer token provider function. + */ + public void setBearerTokenProvider(Function bearerTokenProvider) { + this.bearerTokenProvider = bearerTokenProvider; + } + + /** + * Subscribe to a topic, receiving events after the replayFrom position + * + * @param topic - the topic to subscribe to + * @param replayFrom - the replayFrom position in the event stream + * @param consumer - the consumer of the events + * @return a Future returning the Subscription - on completion returns a + * Subscription or throws a CannotSubscribe exception + */ + public Future subscribe(String topic, long replayFrom, Consumer> consumer) { + if (!running.get()) { + throw new IllegalStateException(String.format("Connector[%s} has not been started", parameters.endpoint())); + } + topic = topic.replaceAll("/$", ""); + + final String topicWithoutQueryString = topicWithoutQueryString(topic); + if (replay.putIfAbsent(topicWithoutQueryString, replayFrom) != null) { + throw new IllegalStateException( + String.format("Already subscribed to %s [%s]", topic, parameters.endpoint())); + } + + SubscriptionImpl subscription = new SubscriptionImpl(topic, consumer); + + return subscription.subscribe(); + } + + /** + * Subscribe to a topic, receiving events from the earliest event position in + * the stream + * + * @param topic - the topic to subscribe to + * @param consumer - the consumer of the events + * @return a Future returning the Subscription - on completion returns a + * Subscription or throws a CannotSubscribe exception + */ + public Future subscribeEarliest(String topic, Consumer> consumer) { + return subscribe(topic, REPLAY_FROM_EARLIEST, consumer); + } + + /** + * Subscribe to a topic, receiving events from the latest event position in the + * stream + * + * @param topic - the topic to subscribe to + * @param consumer - the consumer of the events + * @return a Future returning the Subscription - on completion returns a + * Subscription or throws a CannotSubscribe exception + */ + public Future subscribeTip(String topic, Consumer> consumer) { + return subscribe(topic, REPLAY_FROM_TIP, consumer); + } + + public EmpConnector addListener(String channel, ClientSessionChannel.MessageListener messageListener) { + listenerInfos.add(new MessageListenerInfo(channel, messageListener)); + return this; + } + + public boolean isConnected() { + return client != null && client.isConnected(); + } + + public boolean isDisconnected() { + return client == null || client.isDisconnected(); + } + + public boolean isHandshook() { + return client != null && client.isHandshook(); + } + + public long getLastReplayId(String topic) { + return replay.get(topic); + } + + private static String topicWithoutQueryString(String fullTopic) { + return fullTopic.split("\\?")[0]; + } + + private Future connect() { + logger.info("EmpConnector connecting"); + CompletableFuture future = new CompletableFuture<>(); + + try { + httpClient.start(); + } catch (Exception e) { + logger.error("Unable to start HTTP transport[{}]", parameters.endpoint(), e); + running.set(false); + future.complete(false); + return future; + } + + String bearerToken = bearerToken(); + + LongPollingTransport httpTransport = new LongPollingTransport(parameters.longPollingOptions(), httpClient) { + @Override + protected void customize(Request request) { + logger.info("httpTransport customize, request : {}",request); + request.header(AUTHORIZATION, bearerToken); + + } + }; + + + + client = new BayeuxClient(parameters.endpoint().toExternalForm(), httpTransport); + + client.addExtension(new ReplayExtension(replay)); + + addListeners(client); + + client.handshake(new MessageListener() { + + @Override + public void onMessage(Message m) { + logger.info("client.handshake : {}", m); + if (!m.isSuccessful()) { + Object error = m.get(ERROR); + if (error == null) { + error = m.get(FAILURE); + } + future.completeExceptionally(new ConnectException( + String.format("Cannot connect [%s] : %s", parameters.endpoint(), error))); + running.set(false); + } else { + subscriptions.forEach(SubscriptionImpl::subscribe); + future.complete(true); + } + + } + }); + + + return future; + } + + private void addListeners(BayeuxClient client) { + for (MessageListenerInfo info : listenerInfos) { + client.getChannel(info.getChannelName()).addListener(info.getMessageListener()); + } + } + + private String bearerToken() { + String bearerToken; + if (bearerTokenProvider != null) { + bearerToken = bearerTokenProvider.apply(reauthenticate.get()); + reauthenticate.compareAndSet(true, false); + } else { + bearerToken = parameters.bearerToken(); + } + + return bearerToken; + } + + private void reconnect() { + if (running.compareAndSet(false, true)) { + connect(); + } else { + logger.error("The current value of running is not as we expect, this means our reconnection may not happen"); + } + } + + /** + * Listens to /meta/connect channel messages and handles 401 errors, where + * client needs to reauthenticate. + */ + private class AuthFailureListener implements ClientSessionChannel.MessageListener { + private static final String ERROR_401 = "401"; + private static final String ERROR_403 = "403"; + + @Override + public void onMessage(ClientSessionChannel channel, Message message) { + if (!message.isSuccessful()) { + if (isError(message, ERROR_401) || isError(message, ERROR_403)) { + reauthenticate.set(true); + stop(); + reconnect(); + } + } + } + + private boolean isError(Message message, String errorCode) { + String error = (String) message.get(Message.ERROR_FIELD); + String failureReason = getFailureReason(message); + + return (error != null && error.startsWith(errorCode)) + || (failureReason != null && failureReason.startsWith(errorCode)); + } + + private String getFailureReason(Message message) { + String failureReason = null; + Map ext = message.getExt(); + if (ext != null) { + Map sfdc = (Map) ext.get("sfdc"); + if (sfdc != null) { + failureReason = (String) sfdc.get("failureReason"); + } + } + return failureReason; + } + } + + private static class MessageListenerInfo { + private String channelName; + private ClientSessionChannel.MessageListener messageListener; + + MessageListenerInfo(String channelName, ClientSessionChannel.MessageListener messageListener) { + this.channelName = channelName; + this.messageListener = messageListener; + } + + String getChannelName() { + return channelName; + } + + ClientSessionChannel.MessageListener getMessageListener() { + return messageListener; + } + } } diff --git a/src/main/java/com/salesforce/emp/connector/LoginHelper.java b/src/main/java/com/salesforce/emp/connector/LoginHelper.java index c7b64d7..24854d4 100644 --- a/src/main/java/com/salesforce/emp/connector/LoginHelper.java +++ b/src/main/java/com/salesforce/emp/connector/LoginHelper.java @@ -14,9 +14,13 @@ import javax.xml.parsers.SAXParserFactory; import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.Authentication; +import org.eclipse.jetty.client.api.AuthenticationStore; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.util.ByteBufferContentProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.xml.sax.Attributes; import org.xml.sax.helpers.DefaultHandler; @@ -27,6 +31,8 @@ * @since API v37.0 */ public class LoginHelper { + private static final Logger logger = LoggerFactory.getLogger(LoginHelper.class); + private static class LoginResponseParser extends DefaultHandler { @@ -115,6 +121,12 @@ public static BayeuxParameters login(URL loginEndpoint, String username, String HttpClient client = new HttpClient(parameters.sslContextFactory()); try { client.getProxyConfiguration().getProxies().addAll(parameters.proxies()); + logger.info("proxyes : {}", parameters.proxies()); + AuthenticationStore authenticationStore = client.getAuthenticationStore(); + for (Authentication auth : parameters.authentications()) { + logger.info("proxy auth: {}", parameters.proxies()); + authenticationStore.addAuthentication(auth); + } client.start(); URL endpoint = new URL(loginEndpoint, getSoapUri()); Request post = client.POST(endpoint.toURI()); diff --git a/src/main/java/com/salesforce/emp/connector/ProxyBayeuxParameter.java b/src/main/java/com/salesforce/emp/connector/ProxyBayeuxParameter.java new file mode 100644 index 0000000..48cc788 --- /dev/null +++ b/src/main/java/com/salesforce/emp/connector/ProxyBayeuxParameter.java @@ -0,0 +1,107 @@ +package com.salesforce.emp.connector; + +import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.client.ProxyConfiguration.Proxy; +import org.eclipse.jetty.client.api.Authentication; +import org.eclipse.jetty.util.ssl.SslContextFactory; + +import com.salesforce.emp.connector.BayeuxParameters; + +public class ProxyBayeuxParameter implements BayeuxParameters{ + private final BayeuxParameters parameters; + + private List proxies = new ArrayList<>(); + private List auths = new ArrayList<>(); + + public void addProxy( Proxy proxy ){ + proxies.add(proxy); + } + + public void addAuthentication( Authentication auth ){ + auths.add(auth); + } + + public ProxyBayeuxParameter() { + this(new BayeuxParameters() { + + @Override + public String bearerToken() { + throw new IllegalStateException("Have not authenticated"); + } + + @Override + public URL endpoint() { + throw new IllegalStateException("Have not established replay endpoint"); + } + }); + } + + public ProxyBayeuxParameter(BayeuxParameters parameters) { + this.parameters = parameters; + } + + + @Override + public String bearerToken() { + return parameters.bearerToken(); + } + + @Override + public URL endpoint() { + return parameters.endpoint(); + } + + @Override + public long keepAlive() { + return parameters.keepAlive(); + } + + @Override + public TimeUnit keepAliveUnit() { + return parameters.keepAliveUnit(); + } + + @Override + public Map longPollingOptions() { + return parameters.longPollingOptions(); + } + + @Override + public int maxBufferSize() { + return parameters.maxBufferSize(); + } + + @Override + public int maxNetworkDelay() { + return parameters.maxNetworkDelay(); + } + + @Override + public List proxies() { + return proxies; + } + + + @Override + public Collection authentications() { + + return auths; + } + + @Override + public SslContextFactory sslContextFactory() { + return parameters.sslContextFactory(); + } + + @Override + public String version() { + return parameters.version(); + } + +} diff --git a/src/main/java/com/salesforce/emp/connector/example/LoginProxyExample.java b/src/main/java/com/salesforce/emp/connector/example/LoginProxyExample.java new file mode 100644 index 0000000..788f466 --- /dev/null +++ b/src/main/java/com/salesforce/emp/connector/example/LoginProxyExample.java @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2016, salesforce.com, inc. + * All rights reserved. + * Licensed under the BSD 3-Clause license. + * For full license text, see LICENSE.TXT file in the repo root or https://opensource.org/licenses/BSD-3-Clause + */ +package com.salesforce.emp.connector.example; + +import static com.salesforce.emp.connector.LoginHelper.login; + +import java.net.URI; +import java.net.URL; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import org.eclipse.jetty.client.HttpProxy; +import org.eclipse.jetty.client.Origin.Address; +import org.eclipse.jetty.client.util.BasicAuthentication; +import org.eclipse.jetty.util.ajax.JSON; + +import com.salesforce.emp.connector.BayeuxParameters; +import com.salesforce.emp.connector.EmpConnector; +import com.salesforce.emp.connector.LoginHelper; +import com.salesforce.emp.connector.ProxyBayeuxParameter; +import com.salesforce.emp.connector.TopicSubscription; + +/** + * An example of using the EMP connector using login credentials + * + * @author hal.hildebrand + * @since API v37.0 + */ +public class LoginProxyExample { + public static void main(String[] argv) throws Exception { + System.out.print("argv size : " + argv.length); + + if (argv.length < 5) { + System.err.println( + "Usage: LoginExample username password topic loginUrl proxyProtocol proxyHost proxyPort [proxyUsername] [proxyPassword]"); + System.exit(1); + } + long replayFrom = EmpConnector.REPLAY_FROM_EARLIEST; + int count = 3; + String loginUrl = argv[count++]; + String proxyProtocol = argv[count++]; + String proxyHost = argv[count++]; + int proxyPort = Integer.valueOf(argv[count++]); + + String proxyUsername = ""; + String proxyPassword = ""; + if (argv.length > 6) { + proxyUsername = argv[count++]; + proxyPassword = argv[count++]; + } + + System.out.println(String.format( + "username : %s, password : %s, topic : %s, loginUrl : %s, proxyProtocol : %s, proxyHost : %s,proxyUsername : %s,proxyPassword : %s ", + argv[0], argv[1], argv[2],loginUrl, proxyProtocol, proxyHost, proxyUsername, proxyPassword)); + + + + Consumer> consumer = event -> System.out + .println(String.format("Received:\n%s", JSON.toString(event))); + + ProxyBayeuxParameter dbp = new ProxyBayeuxParameter(); + Address a = new Address(proxyHost, proxyPort); + HttpProxy p = new HttpProxy(a, proxyProtocol.equals("https")); + dbp.addProxy(p); + if (!proxyUsername.isEmpty()) { + BasicAuthentication auth = new BasicAuthentication( + new URI(String.format("%s://%s:%s", proxyProtocol, proxyHost, proxyPort)), "*", proxyUsername, + proxyPassword) { + @Override + public boolean matches(String type, URI uri, String realm) { + realm = "*"; + return super.matches(type, uri, realm); + } + + }; + dbp.addAuthentication(auth); + } + + BayeuxParameters params = LoginHelper.login(new URL(loginUrl), argv[0], argv[1],dbp); + + + EmpConnector connector = new EmpConnector(params); + + connector.start().get(5, TimeUnit.SECONDS); + + TopicSubscription subscription = connector.subscribe(argv[2], replayFrom, consumer).get(5, TimeUnit.SECONDS); + + System.out.println(String.format("Subscribed: %s", subscription)); + } +}