diff --git a/play-java-websocket-example/.gitignore b/play-java-websocket-example/.gitignore new file mode 100644 index 000000000..07f0e59c0 --- /dev/null +++ b/play-java-websocket-example/.gitignore @@ -0,0 +1,10 @@ +logs +target +build +/.idea +/.idea_modules +/.classpath +/.project +/.gradle +/.settings +/RUNNING_PID diff --git a/play-java-websocket-example/.mergify.yml b/play-java-websocket-example/.mergify.yml new file mode 100644 index 000000000..32f8689ae --- /dev/null +++ b/play-java-websocket-example/.mergify.yml @@ -0,0 +1,27 @@ +pull_request_rules: + - name: automatic merge on CI success require review + conditions: + - status-success=Travis CI - Pull Request + - "#approved-reviews-by>=1" + - "#changes-requested-reviews-by=0" + - label!=block-merge + actions: + merge: + method: squash + strict: smart + + - name: automatic merge on CI success for TemplateControl + conditions: + - status-success=Travis CI - Pull Request + - label=merge-when-green + - label!=block-merge + actions: + merge: + method: squash + strict: smart + + - name: delete branch after merge + conditions: + - merged + actions: + delete_head_branch: {} diff --git a/play-java-websocket-example/.travis.yml b/play-java-websocket-example/.travis.yml new file mode 100644 index 000000000..1e8c0e7c4 --- /dev/null +++ b/play-java-websocket-example/.travis.yml @@ -0,0 +1,50 @@ +language: scala +scala: + - 2.12.8 + +before_install: + - curl -sL https://github.com/shyiko/jabba/raw/master/install.sh | bash && . ~/.jabba/jabba.sh + +env: + global: + - JABBA_HOME=$HOME/.jabba + matrix: + # There is no concise way to specify multi-dimensional build matrix: + # https://github.com/travis-ci/travis-ci/issues/1519 + - SCRIPT=scripts/test-sbt TRAVIS_JDK=adopt@1.8.192-12 + - SCRIPT=scripts/test-sbt TRAVIS_JDK=adopt@1.11.0-1 + - SCRIPT=scripts/test-gradle TRAVIS_JDK=adopt@1.8.192-12 + - SCRIPT=scripts/test-gradle TRAVIS_JDK=adopt@1.11.0-1 + +# Exclude some combinations from build matrix. See: +# https://docs.travis-ci.com/user/customizing-the-build/#Build-Matrix +matrix: + fast_finish: true + allow_failures: + # Current release of Gradle still does not supports Play 2.7.x releases + # As soon as there is a release of Gradle that fixes that, we can then + # remove this allowed failure. + - env: SCRIPT=scripts/test-gradle TRAVIS_JDK=adopt@1.8.192-12 + - env: SCRIPT=scripts/test-gradle TRAVIS_JDK=adopt@1.11.0-1 + # Java 11 is still not fully supported. It is good that we are already + # testing our sample applications to better discover possible problems + # but we can allow failures here too. + - env: SCRIPT=scripts/test-sbt TRAVIS_JDK=adopt@1.11.0-1 + +install: + - $JABBA_HOME/bin/jabba install $TRAVIS_JDK + - unset _JAVA_OPTIONS + - export JAVA_HOME="$JABBA_HOME/jdk/$TRAVIS_JDK" && export PATH="$JAVA_HOME/bin:$PATH" && java -Xmx32m -version + +script: + - $SCRIPT + +before_cache: + - find $HOME/.ivy2 -name "ivydata-*.properties" -delete + - find $HOME/.sbt -name "*.lock" -delete + +cache: + directories: + - "$HOME/.ivy2/cache" + - "$HOME/.gradle/caches" + - "$HOME/.jabba/jdk" diff --git a/play-java-websocket-example/LICENSE b/play-java-websocket-example/LICENSE new file mode 100644 index 000000000..670154e35 --- /dev/null +++ b/play-java-websocket-example/LICENSE @@ -0,0 +1,116 @@ +CC0 1.0 Universal + +Statement of Purpose + +The laws of most jurisdictions throughout the world automatically confer +exclusive Copyright and Related Rights (defined below) upon the creator and +subsequent owner(s) (each and all, an "owner") of an original work of +authorship and/or a database (each, a "Work"). + +Certain owners wish to permanently relinquish those rights to a Work for the +purpose of contributing to a commons of creative, cultural and scientific +works ("Commons") that the public can reliably and without fear of later +claims of infringement build upon, modify, incorporate in other works, reuse +and redistribute as freely as possible in any form whatsoever and for any +purposes, including without limitation commercial purposes. These owners may +contribute to the Commons to promote the ideal of a free culture and the +further production of creative, cultural and scientific works, or to gain +reputation or greater distribution for their Work in part through the use and +efforts of others. + +For these and/or other purposes and motivations, and without any expectation +of additional consideration or compensation, the person associating CC0 with a +Work (the "Affirmer"), to the extent that he or she is an owner of Copyright +and Related Rights in the Work, voluntarily elects to apply CC0 to the Work +and publicly distribute the Work under its terms, with knowledge of his or her +Copyright and Related Rights in the Work and the meaning and intended legal +effect of CC0 on those rights. + +1. Copyright and Related Rights. A Work made available under CC0 may be +protected by copyright and related or neighboring rights ("Copyright and +Related Rights"). Copyright and Related Rights include, but are not limited +to, the following: + + i. the right to reproduce, adapt, distribute, perform, display, communicate, + and translate a Work; + + ii. moral rights retained by the original author(s) and/or performer(s); + + iii. publicity and privacy rights pertaining to a person's image or likeness + depicted in a Work; + + iv. rights protecting against unfair competition in regards to a Work, + subject to the limitations in paragraph 4(a), below; + + v. rights protecting the extraction, dissemination, use and reuse of data in + a Work; + + vi. database rights (such as those arising under Directive 96/9/EC of the + European Parliament and of the Council of 11 March 1996 on the legal + protection of databases, and under any national implementation thereof, + including any amended or successor version of such directive); and + + vii. other similar, equivalent or corresponding rights throughout the world + based on applicable law or treaty, and any national implementations thereof. + +2. Waiver. To the greatest extent permitted by, but not in contravention of, +applicable law, Affirmer hereby overtly, fully, permanently, irrevocably and +unconditionally waives, abandons, and surrenders all of Affirmer's Copyright +and Related Rights and associated claims and causes of action, whether now +known or unknown (including existing as well as future claims and causes of +action), in the Work (i) in all territories worldwide, (ii) for the maximum +duration provided by applicable law or treaty (including future time +extensions), (iii) in any current or future medium and for any number of +copies, and (iv) for any purpose whatsoever, including without limitation +commercial, advertising or promotional purposes (the "Waiver"). Affirmer makes +the Waiver for the benefit of each member of the public at large and to the +detriment of Affirmer's heirs and successors, fully intending that such Waiver +shall not be subject to revocation, rescission, cancellation, termination, or +any other legal or equitable action to disrupt the quiet enjoyment of the Work +by the public as contemplated by Affirmer's express Statement of Purpose. + +3. Public License Fallback. Should any part of the Waiver for any reason be +judged legally invalid or ineffective under applicable law, then the Waiver +shall be preserved to the maximum extent permitted taking into account +Affirmer's express Statement of Purpose. In addition, to the extent the Waiver +is so judged Affirmer hereby grants to each affected person a royalty-free, +non transferable, non sublicensable, non exclusive, irrevocable and +unconditional license to exercise Affirmer's Copyright and Related Rights in +the Work (i) in all territories worldwide, (ii) for the maximum duration +provided by applicable law or treaty (including future time extensions), (iii) +in any current or future medium and for any number of copies, and (iv) for any +purpose whatsoever, including without limitation commercial, advertising or +promotional purposes (the "License"). The License shall be deemed effective as +of the date CC0 was applied by Affirmer to the Work. Should any part of the +License for any reason be judged legally invalid or ineffective under +applicable law, such partial invalidity or ineffectiveness shall not +invalidate the remainder of the License, and in such case Affirmer hereby +affirms that he or she will not (i) exercise any of his or her remaining +Copyright and Related Rights in the Work or (ii) assert any associated claims +and causes of action with respect to the Work, in either case contrary to +Affirmer's express Statement of Purpose. + +4. Limitations and Disclaimers. + + a. No trademark or patent rights held by Affirmer are waived, abandoned, + surrendered, licensed or otherwise affected by this document. + + b. Affirmer offers the Work as-is and makes no representations or warranties + of any kind concerning the Work, express, implied, statutory or otherwise, + including without limitation warranties of title, merchantability, fitness + for a particular purpose, non infringement, or the absence of latent or + other defects, accuracy, or the present or absence of errors, whether or not + discoverable, all to the greatest extent permissible under applicable law. + + c. Affirmer disclaims responsibility for clearing rights of other persons + that may apply to the Work or any use thereof, including without limitation + any person's Copyright and Related Rights in the Work. Further, Affirmer + disclaims responsibility for obtaining any necessary consents, permissions + or other rights required for any use of the Work. + + d. Affirmer understands and acknowledges that Creative Commons is not a + party to this document and has no duty or obligation with respect to this + CC0 or use of the Work. + +For more information, please see + diff --git a/play-java-websocket-example/NOTICE b/play-java-websocket-example/NOTICE new file mode 100644 index 000000000..6d6c034d3 --- /dev/null +++ b/play-java-websocket-example/NOTICE @@ -0,0 +1,8 @@ +Written by Lightbend + +To the extent possible under law, the author(s) have dedicated all copyright and +related and neighboring rights to this software to the public domain worldwide. +This software is distributed without any warranty. + +You should have received a copy of the CC0 Public Domain Dedication along with +this software. If not, see . diff --git a/play-java-websocket-example/README.md b/play-java-websocket-example/README.md new file mode 100644 index 000000000..614de6b1d --- /dev/null +++ b/play-java-websocket-example/README.md @@ -0,0 +1,85 @@ +# play-websocket-java-example + +[![Build Status](https://travis-ci.org/playframework/play-java-websocket-example.svg?branch=2.6.x)](https://travis-ci.org/playframework/play-java-websocket-example) [![GitHub issues](https://img.shields.io/github/issues/playframework/play-websocket-java.svg?style=flat)](https://github.com/playframework/play-websocket-java/issues) [![GitHub forks](https://img.shields.io/github/forks/playframework/play-websocket-java.svg?style=flat)](https://github.com/playframework/play-websocket-java/network) [![GitHub stars](https://img.shields.io/github/stars/playframework/play-websocket-java.svg?style=flat)](https://github.com/playframework/play-websocket-java/stargazers) + +This is an example Play application that shows how to use Play's Websocket API in Java, by showing a series of stock tickers updated using WebSocket. + +The Websocket API is built on Akka Streams, and so is async, non-blocking, and backpressure aware. Using Akka Streams also means that interacting with Akka Actors is simple. + +There are also tests showing how Junit and Akka Testkit are used to test actors and flows. + +## Reactive Push + +This application uses a WebSocket to push data to the browser in real-time. To create a WebSocket connection in Play, first a route must be defined in the `routes` file. Here is the route which will be used to setup the WebSocket connection: + +```routes +GET /ws controllers.Application.ws +``` + +The `ws` method in the HomeController.java controller handles the request and does the protocol upgrade to the WebSocket connection. The `UserActor` stores the handle to the WebSocket connection. + +Once the `UserActor` is created, the default stocks (defined in `application.conf`) are added to the user's list of watched stocks. + +Each stock symbol has its own `StockActor` defined in StockActor.java. This actor holds the last 50 prices for the stock. Using a `FetchHistory` message the whole history can be retrieved. A `FetchLatest` message will generate a new price. Every `StockActor` sends itself a `FetchLatest` message every 75 milliseconds. Once a new price is generated it is added to the history and then a message is sent to each `UserActor` that is watching the stock. The `UserActor` then serializes the data as JSON and pushes it to the client using the WebSocket. + +Underneath the covers, resources (threads) are only allocated to the Actors and WebSockets when they are needed. This is why Reactive Push is scalable with Play and Akka. + +## Reactive UI - Real-time Chart + +On the client-side, a Reactive UI updates the stock charts every time a message is received. The `index.scala.html` file produces the web page at and loads the JavaScript and CSS needed render the page and setup the UI. + +The JavaScript for the page is compiled from the index.coffee file which is written in CoffeeScript (an elegant way to write JavaScript). Using jQuery, a page ready handler sets up the WebSocket connection and sets up functions which will be called when the server sends a message to the client through the WebSocket: + +```coffee +$ -> + ws = new WebSocket $("body").data("ws-url") + ws.onmessage = (event) -> + message = JSON.parse event.data +``` + +The message is parsed and depending on whether the message contains the stock history or a stock update, a stock chart is either created or updated. The charts are created using the **Flot** JavaScript charting library. Using CoffeeScript, jQuery, and Flot makes it easy to build Reactive UI in the browser that can receive WebSocket push events and update the UI in real-time. + +## Reactive Requests + +When a web server gets a request, it allocates a thread to handle the request and produce a response. In a typical model the thread is allocated for the entire duration of the request and response, even if the web request is waiting for some other resource. A Reactive Request is a typical web request and response, but handled in an asynchronous and non-blocking way on the server. This means that when the thread for a web request is not actively being used, it can be released and reused for something else. + +In the Reactive Stocks application the service which determines the stock sentiments is a Reactive Request. The route is defined in the `routes` file: + +```routes +GET /sentiment/:symbol controllers.StockSentiment.get(symbol) +``` + +A `GET` request to `/sentiment/GOOG` will call `get("GOOG")` on the StockSentiment.java controller. That method begins with: + +```scala +def get(symbol: String): Action[AnyContent] = Action.async { +``` + +The `async` block indicates that the controller will return a `Future[Result]` which is a handle to something that will produce a `Result` in the future. The `Future` provides a way to do asynchronous handling but doesn't necessarily have to be non-blocking. Often times web requests need to talk to other systems (databases, web services, etc). If a thread can't be deallocated while waiting for those other systems to respond, then it is blocking. + +In this case a request is made to Twitter and then for each tweet, another request is made to a sentiment service. All of these requests, including the request from the browser, are all handled as Reactive Requests so that the entire pipeline is Reactive (asynchronous and non-blocking). This is called Reactive Composition. + +## Reactive Composition + +Combining multiple Reactive Requests together is Reactive Composition. The StockSentiment controller does Reactive Composition since it receives a request, makes a request to Twitter for tweets about a stock, and then for each tweet it makes a request to a sentiment service. All of these requests are Reactive Requests. None use threads when they are waiting for a response. Scala's **for comprehensions** make it very easy and elegant to do Reactive Composition. The basic structure is: + +```scala +for { + tweets <- tweetsFuture + sentiments <- Future.sequence(futuresForTweetSentiment(tweets)) +} yield Ok(sentiments) +``` + +Because the web client library in Play, `WS`, is asynchronous and non-blocking, all of the requests needed to get a stock's sentiments are Reactive Requests. Combined together these Reactive Requests are Reactive Composition. + +## Reactive UI - Sentiments + +The client-side of Reactive Requests and Reactive Composition is no different than the non-Reactive model. The browser makes an Ajax request to the server and then calls a JavaScript function when it receives a response. In the Reactive Stocks application, when a stock chart is flipped over it makes the request for the stock's sentiments. That is done using jQuery's `ajax` method in the index.coffee file. When the request returns data the `success` handler updates the UI. + +## Further Learning + +For more information, please see the documentation for Websockets and Akka Streams: + +* +* +* diff --git a/play-java-websocket-example/app/Module.java b/play-java-websocket-example/app/Module.java new file mode 100644 index 000000000..348fef887 --- /dev/null +++ b/play-java-websocket-example/app/Module.java @@ -0,0 +1,13 @@ +import actors.*; +import com.google.inject.AbstractModule; +import play.libs.akka.AkkaGuiceSupport; + +@SuppressWarnings("unused") +public class Module extends AbstractModule implements AkkaGuiceSupport { + @Override + protected void configure() { + bindActor(StocksActor.class, "stocksActor"); + bindActor(UserParentActor.class, "userParentActor"); + bindActorFactory(UserActor.class, UserActor.Factory.class); + } +} diff --git a/play-java-websocket-example/app/actors/Messages.java b/play-java-websocket-example/app/actors/Messages.java new file mode 100644 index 000000000..fed81f212 --- /dev/null +++ b/play-java-websocket-example/app/actors/Messages.java @@ -0,0 +1,45 @@ +package actors; + +import stocks.Stock; + +import java.util.Set; + +import static java.util.Objects.requireNonNull; + +public final class Messages { + + public static final class WatchStocks { + final Set symbols; + + public WatchStocks(Set symbols) { + this.symbols = requireNonNull(symbols); + } + + @Override + public String toString() { + return "WatchStocks(" + symbols.toString() + ")"; + } + } + + public static final class UnwatchStocks { + final Set symbols; + + public UnwatchStocks(Set symbols) { + this.symbols = requireNonNull(symbols); + } + + @Override + public String toString() { + return "UnwatchStocks(" + symbols.toString() + ")"; + } + } + + public static class Stocks { + final Set stocks; + + public Stocks(Set stocks) { + this.stocks = requireNonNull(stocks); + } + } +} + diff --git a/play-java-websocket-example/app/actors/StocksActor.java b/play-java-websocket-example/app/actors/StocksActor.java new file mode 100644 index 000000000..d936bc40a --- /dev/null +++ b/play-java-websocket-example/app/actors/StocksActor.java @@ -0,0 +1,34 @@ +package actors; + +import akka.actor.AbstractActor; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import stocks.Stock; + +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collector; +import java.util.stream.Collectors; + +/** + * This actor contains a set of stocks internally that may be used by + * all websocket clients. + */ +public class StocksActor extends AbstractActor { + + private final Map stocksMap = new HashMap<>(); + + private final LoggingAdapter log = Logging.getLogger(getContext().system(), this); + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(Messages.WatchStocks.class, watchStocks -> { + Set stocks = watchStocks.symbols.stream() + .map(symbol -> stocksMap.compute(symbol, (k, v) -> new Stock(k))) + .collect(Collectors.toSet()); + sender().tell(new Messages.Stocks(stocks), self()); + }).build(); + } +} diff --git a/play-java-websocket-example/app/actors/UserActor.java b/play-java-websocket-example/app/actors/UserActor.java new file mode 100644 index 000000000..6e24d8461 --- /dev/null +++ b/play-java-websocket-example/app/actors/UserActor.java @@ -0,0 +1,166 @@ +package actors; + +import actors.Messages.Stocks; +import actors.Messages.UnwatchStocks; +import actors.Messages.WatchStocks; +import akka.Done; +import akka.NotUsed; +import akka.actor.AbstractActor; +import akka.actor.Actor; +import akka.actor.ActorRef; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import akka.japi.Pair; +import akka.stream.KillSwitches; +import akka.stream.Materializer; +import akka.stream.UniqueKillSwitch; +import akka.stream.javadsl.*; +import akka.util.Timeout; +import com.fasterxml.jackson.databind.JsonNode; +import com.google.inject.assistedinject.Assisted; +import play.libs.Json; +import scala.concurrent.duration.Duration; +import stocks.Stock; + +import javax.inject.Inject; +import javax.inject.Named; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import static akka.pattern.PatternsCS.ask; + +/** + * The broker between the WebSocket and the StockActor(s). The UserActor holds the connection and sends serialized + * JSON data to the client. + */ +public class UserActor extends AbstractActor { + + private final Timeout timeout = new Timeout(Duration.create(5, TimeUnit.SECONDS)); + + private final LoggingAdapter logger = Logging.getLogger(getContext().system(), this); + + private final Map stocksMap = new HashMap<>(); + + private final String id; + private final ActorRef stocksActor; + private final Materializer mat; + + private final Sink hubSink; + private final Flow websocketFlow; + + @Inject + public UserActor(@Assisted String id, + @Named("stocksActor") ActorRef stocksActor, + Materializer mat) { + this.id = id; + this.stocksActor = stocksActor; + this.mat = mat; + + Pair, Source> sinkSourcePair = + MergeHub.of(JsonNode.class, 16) + .toMat(BroadcastHub.of(JsonNode.class, 256), Keep.both()) + .run(mat); + + this.hubSink = sinkSourcePair.first(); + Source hubSource = sinkSourcePair.second(); + + Sink> jsonSink = Sink.foreach((JsonNode json) -> { + // When the user types in a stock in the upper right corner, this is triggered, + String symbol = json.findPath("symbol").asText(); + addStocks(Collections.singleton(symbol)); + }); + + // Put the source and sink together to make a flow of hub source as output (aggregating all + // stocks as JSON to the browser) and the actor as the sink (receiving any JSON messages + // from the browse), using a coupled sink and source. + this.websocketFlow = Flow.fromSinkAndSourceCoupled(jsonSink, hubSource) + //.log("actorWebsocketFlow", logger) + .watchTermination((n, stage) -> { + // When the flow shuts down, make sure this actor also stops. + stage.thenAccept(f -> context().stop(self())); + return NotUsed.getInstance(); + }); + } + + /** + * The receive block, useful if other actors want to manipulate the flow. + */ + @Override + public Receive createReceive() { + return receiveBuilder() + .match(WatchStocks.class, watchStocks -> { + logger.info("Received message {}", watchStocks); + addStocks(watchStocks.symbols); + sender().tell(websocketFlow, self()); + }) + .match(UnwatchStocks.class, unwatchStocks -> { + logger.info("Received message {}", unwatchStocks); + unwatchStocks(unwatchStocks.symbols); + }).build(); + } + + /** + * Adds several stocks to the hub, by asking the stocks actor for stocks. + */ + private void addStocks(Set symbols) { + // Ask the stocksActor for a stream containing these stocks. + CompletionStage future = ask(stocksActor, new WatchStocks(symbols), timeout).thenApply(Stocks.class::cast); + + // when we get the response back, we want to turn that into a flow by creating a single + // source and a single sink, so we merge all of the stock sources together into one by + // pointing them to the hubSink, so we can add them dynamically even after the flow + // has started. + future.thenAccept((Stocks newStocks) -> { + newStocks.stocks.forEach(stock -> { + if (!stocksMap.containsKey(stock.symbol)) { + addStock(stock); + } + }); + }); + } + + /** + * Adds a single stock to the hub. + */ + private void addStock(Stock stock) { + logger.info("Adding stock {}", stock); + + // We convert everything to JsValue so we get a single stream for the websocket. + // Make sure the history gets written out before the updates for this stock... + final Source historySource = stock.history(50).map(Json::toJson); + final Source updateSource = stock.update().map(Json::toJson); + final Source stockSource = historySource.concat(updateSource); + + // Set up a flow that will let us pull out a killswitch for this specific stock, + // and automatic cleanup for very slow subscribers (where the browser has crashed, etc). + final Flow killswitchFlow = Flow.of(JsonNode.class) + .joinMat(KillSwitches.singleBidi(), Keep.right()); + // Set up a complete runnable graph from the stock source to the hub's sink + String name = "stock-" + stock.symbol + "-" + id; + final RunnableGraph graph = stockSource + .viaMat(killswitchFlow, Keep.right()) + .to(hubSink) + .named(name); + + // Start it up! + UniqueKillSwitch killSwitch = graph.run(mat); + + // Pull out the kill switch so we can stop it when we want to unwatch a stock. + stocksMap.put(stock.symbol, killSwitch); + } + + private void unwatchStocks(Set symbols) { + symbols.forEach(symbol -> { + stocksMap.get(symbol).shutdown(); + stocksMap.remove(symbol); + }); + } + + public interface Factory { + Actor create(String id); + } +} diff --git a/play-java-websocket-example/app/actors/UserParentActor.java b/play-java-websocket-example/app/actors/UserParentActor.java new file mode 100644 index 000000000..1ab85a920 --- /dev/null +++ b/play-java-websocket-example/app/actors/UserParentActor.java @@ -0,0 +1,49 @@ +package actors; + +import akka.actor.AbstractActor; +import akka.actor.ActorRef; +import akka.util.Timeout; +import com.typesafe.config.Config; +import play.libs.akka.InjectedActorSupport; + +import javax.inject.Inject; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import static akka.pattern.PatternsCS.ask; +import static akka.pattern.PatternsCS.pipe; + +public class UserParentActor extends AbstractActor implements InjectedActorSupport { + + private final Timeout timeout = new Timeout(2, TimeUnit.SECONDS); + private final Set defaultStocks; + + public static class Create { + final String id; + + public Create(String id) { + this.id = id; + } + } + + private final UserActor.Factory childFactory; + + @Inject + public UserParentActor(UserActor.Factory childFactory, Config config) { + this.childFactory = childFactory; + this.defaultStocks = new HashSet<>(config.getStringList("default.stocks")); + } + + @Override + public Receive createReceive() { + return receiveBuilder() + .match(UserParentActor.Create.class, create -> { + ActorRef child = injectedChild(() -> childFactory.create(create.id), "userActor-" + create.id); + CompletionStage future = ask(child, new Messages.WatchStocks(defaultStocks), timeout); + pipe(future, context().dispatcher()).to(sender()); + }).build(); + } + +} diff --git a/play-java-websocket-example/app/assets/javascripts/index.coffee b/play-java-websocket-example/app/assets/javascripts/index.coffee new file mode 100644 index 000000000..3a84024d7 --- /dev/null +++ b/play-java-websocket-example/app/assets/javascripts/index.coffee @@ -0,0 +1,100 @@ +$ -> + ws = new WebSocket $("body").data("ws-url") + ws.onmessage = (event) -> + message = JSON.parse event.data + switch message.type + when "stockhistory" + populateStockHistory(message) + when "stockupdate" + updateStockChart(message) + else + console.log(message) + + $("#addsymbolform").submit (event) -> + event.preventDefault() + # send the message to watch the stock + ws.send(JSON.stringify({symbol: $("#addsymboltext").val()})) + # reset the form + $("#addsymboltext").val("") + +getPricesFromArray = (data) -> + (v[1] for v in data) + +getChartArray = (data) -> + ([i, v] for v, i in data) + +getChartOptions = (data) -> + series: + shadowSize: 0 + yaxis: + min: getAxisMin(data) + max: getAxisMax(data) + xaxis: + show: false + +getAxisMin = (data) -> + Math.min.apply(Math, data) * 0.9 + +getAxisMax = (data) -> + Math.max.apply(Math, data) * 1.1 + +populateStockHistory = (message) -> + chart = $("
").addClass("chart").prop("id", message.symbol) + chartHolder = $("
").addClass("chart-holder").append(chart) + chartHolder.append($("

").text("values are simulated")) + detailsHolder = $("

").addClass("details-holder") + flipper = $("
").addClass("flipper").append(chartHolder).append(detailsHolder).attr("data-content", message.symbol) + flipContainer = $("
").addClass("flip-container").append(flipper).click (event) -> + handleFlip($(this)) + $("#stocks").prepend(flipContainer) + plot = chart.plot([getChartArray(message.history)], getChartOptions(message.history)).data("plot") + +updateStockChart = (message) -> + if ($("#" + message.symbol).size() > 0) + plot = $("#" + message.symbol).data("plot") + data = getPricesFromArray(plot.getData()[0].data) + data.shift() + data.push(message.price) + plot.setData([getChartArray(data)]) + # update the yaxes if either the min or max is now out of the acceptable range + yaxes = plot.getOptions().yaxes[0] + if ((getAxisMin(data) < yaxes.min) || (getAxisMax(data) > yaxes.max)) + # reseting yaxes + yaxes.min = getAxisMin(data) + yaxes.max = getAxisMax(data) + plot.setupGrid() + # redraw the chart + plot.draw() + +handleFlip = (container) -> + if (container.hasClass("flipped")) + container.removeClass("flipped") + container.find(".details-holder").empty() + else + container.addClass("flipped") + # fetch stock details and tweet + $.ajax + url: "/sentiment/" + container.children(".flipper").attr("data-content") + dataType: "json" + context: container + success: (data) -> + detailsHolder = $(this).find(".details-holder") + detailsHolder.empty() + switch data.label + when "pos" + detailsHolder.append($("

").text("The tweets say BUY!")) + detailsHolder.append($("").attr("src", "/assets/images/buy.png")) + when "neg" + detailsHolder.append($("

").text("The tweets say SELL!")) + detailsHolder.append($("").attr("src", "/assets/images/sell.png")) + else + detailsHolder.append($("

").text("The tweets say HOLD!")) + detailsHolder.append($("").attr("src", "/assets/images/hold.png")) + error: (jqXHR, textStatus, error) -> + detailsHolder = $(this).find(".details-holder") + detailsHolder.empty() + detailsHolder.append($("

").text("Error: " + JSON.parse(jqXHR.responseText).error)) + # display loading info + detailsHolder = container.find(".details-holder") + detailsHolder.append($("

").text("Determining whether you should buy or sell based on the sentiment of recent tweets...")) + detailsHolder.append($("
").addClass("progress progress-striped active").append($("
").addClass("bar").css("width", "100%"))) \ No newline at end of file diff --git a/play-java-websocket-example/app/assets/stylesheets/main.less b/play-java-websocket-example/app/assets/stylesheets/main.less new file mode 100644 index 000000000..7bfb923e6 --- /dev/null +++ b/play-java-websocket-example/app/assets/stylesheets/main.less @@ -0,0 +1,128 @@ +.perspective (@value) { + -webkit-perspective: @value; + -moz-perspective: @value; + perspective: @value; +} + +.transform (@value) { + -webkit-transform: rotateY(@value); + -moz-transform: rotateY(@value); + transform: rotateY(@value); +} + +.border-radius (@value) { + -webkit-border-radius: @value; + -moz-border-radius: @value; + border-radius: @value; +} + + +body { + margin-top: 50px; +} + +.flip-container { + .perspective(1000); + margin-bottom: 20px; + &:hover .flipper { + .transform(10deg); + } + &.flipped .flipper { + .transform(180deg); + } +} + +.flipper { + height: 250px; + + background-color: #fafafa; + border: 1px solid #ddd; + + .border-radius(4px); + + cursor: hand; + cursor: pointer; + + -webkit-transition: 0.6s; + -moz-transition: 0.6s; + transition: 0.6s; + + -webkit-transform-style: preserve-3d; + -moz-transform-style: preserve-3d; + transform-style: preserve-3d; + + &:after { + content: attr(data-content); + position: absolute; + top: -1px; + left: -1px; + padding: 3px 7px; + font-size: 12px; + font-weight: bold; + background-color: #ffffff; + border: 1px solid #ddd; + color: #9da0a4; + + .border-radius(4px 0 4px 0); + } +} + +.chart-holder, .details-holder { + position: absolute; + width: 100%; + height: 250px; + top: 0px; + left: 0px; + + -webkit-backface-visibility: hidden; + -moz-backface-visibility: hidden; + backface-visibility: hidden; +} + +.details-holder { + z-index: 1; + transform-style: preserve-3d; +} + +.chart-holder { + z-index: 2; + & p { + position: absolute; + bottom: 7px; + right: 20px; + font-size: 10px; + color: #aaaaaa; + font-style: italic; + } +} + +.details-holder { + .transform(180deg); + text-align: center; + & h4 { + padding: 20px; + } + & .progress { + padding: 20px; + background: none; + border: none; + + -webkit-box-shadow: none; + -moz-box-shadow: none; + box-shadow: none; + } + & img { + height: 128px; + width: 128px; + } +} + +.chart { + position: relative; + width: 920px; + height: 210px; + margin-top: 30px; + margin-bottom: 10px; + margin-left: 10px; + margin-right: 10px; +} diff --git a/play-java-websocket-example/app/controllers/HomeController.java b/play-java-websocket-example/app/controllers/HomeController.java new file mode 100644 index 000000000..a43bade47 --- /dev/null +++ b/play-java-websocket-example/app/controllers/HomeController.java @@ -0,0 +1,112 @@ +package controllers; + +import actors.UserParentActor; +import akka.NotUsed; +import akka.actor.ActorRef; +import akka.event.Logging; +import akka.event.LoggingAdapter; +import akka.stream.javadsl.Flow; +import akka.util.Timeout; +import com.fasterxml.jackson.databind.JsonNode; +import org.slf4j.Logger; +import play.libs.F.Either; +import play.mvc.*; +import scala.concurrent.duration.Duration; + +import javax.inject.Inject; +import javax.inject.Named; +import javax.inject.Singleton; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.TimeUnit; + +import static akka.pattern.PatternsCS.ask; + +import org.webjars.play.WebJarsUtil; + +/** + * The main web controller that handles returning the index page, setting up a WebSocket, and watching a stock. + */ +@Singleton +public class HomeController extends Controller { + + private final Timeout t = new Timeout(Duration.create(1, TimeUnit.SECONDS)); + private final Logger logger = org.slf4j.LoggerFactory.getLogger("controllers.HomeController"); + private final ActorRef userParentActor; + + private WebJarsUtil webJarsUtil; + + @Inject + public HomeController(@Named("userParentActor") ActorRef userParentActor, WebJarsUtil webJarsUtil) { + this.userParentActor = userParentActor; + this.webJarsUtil = webJarsUtil; + } + + public Result index() { + return ok(views.html.index.render(request(), webJarsUtil)); + } + + public WebSocket ws() { + return WebSocket.Json.acceptOrResult(request -> { + if (sameOriginCheck(request)) { + final CompletionStage> future = wsFutureFlow(request); + final CompletionStage>> stage = future.thenApply(Either::Right); + return stage.exceptionally(this::logException); + } else { + return forbiddenResult(); + } + }); + } + + @SuppressWarnings("unchecked") + private CompletionStage> wsFutureFlow(Http.RequestHeader request) { + long id = request.asScala().id(); + UserParentActor.Create create = new UserParentActor.Create(Long.toString(id)); + + return ask(userParentActor, create, t).thenApply((Object flow) -> { + final Flow f = (Flow) flow; + return f.named("websocket"); + }); + } + + private CompletionStage>> forbiddenResult() { + final Result forbidden = Results.forbidden("forbidden"); + final Either> left = Either.Left(forbidden); + + return CompletableFuture.completedFuture(left); + } + + private Either> logException(Throwable throwable) { + logger.error("Cannot create websocket", throwable); + Result result = Results.internalServerError("error"); + return Either.Left(result); + } + + /** + * Checks that the WebSocket comes from the same origin. This is necessary to protect + * against Cross-Site WebSocket Hijacking as WebSocket does not implement Same Origin Policy. + *

+ * See https://tools.ietf.org/html/rfc6455#section-1.3 and + * http://blog.dewhurstsecurity.com/2013/08/30/security-testing-html5-websockets.html + */ + private boolean sameOriginCheck(Http.RequestHeader rh) { + final Optional origin = rh.header("Origin"); + + if (! origin.isPresent()) { + logger.error("originCheck: rejecting request because no Origin header found"); + return false; + } else if (originMatches(origin.get())) { + logger.debug("originCheck: originValue = " + origin); + return true; + } else { + logger.error("originCheck: rejecting request because Origin header value " + origin + " is not in the same origin"); + return false; + } + } + + private boolean originMatches(String origin) { + return origin.contains("localhost:9000") || origin.contains("localhost:19001"); + } + +} diff --git a/play-java-websocket-example/app/controllers/StockSentiment.java b/play-java-websocket-example/app/controllers/StockSentiment.java new file mode 100644 index 000000000..65ad9e7e0 --- /dev/null +++ b/play-java-websocket-example/app/controllers/StockSentiment.java @@ -0,0 +1,104 @@ +package controllers; + +import com.fasterxml.jackson.databind.JsonNode; +import com.typesafe.config.Config; +import play.libs.Json; +import play.libs.concurrent.Futures; +import play.libs.concurrent.HttpExecutionContext; +import play.libs.ws.WSClient; +import play.libs.ws.WSResponse; +import play.mvc.Controller; +import play.mvc.Http; +import play.mvc.Result; +import play.mvc.Results; + +import javax.inject.Inject; +import javax.inject.Singleton; +import java.util.List; +import java.util.concurrent.CompletionStage; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.averagingDouble; +import static java.util.stream.Collectors.toList; +import static java.util.stream.StreamSupport.stream; + +@Singleton +public class StockSentiment extends Controller { + + private final String sentimentUrl; + private final String tweetUrl; + private final WSClient wsClient; + private final HttpExecutionContext ec; + + @Inject + public StockSentiment(WSClient wsClient, Config configuration, HttpExecutionContext ec) { + this.wsClient = wsClient; + this.ec = ec; + this.sentimentUrl = configuration.getString("sentiment.url"); + this.tweetUrl = configuration.getString("tweet.url"); + } + + public CompletionStage get(String symbol) { + return fetchTweets(symbol) + .thenComposeAsync(this::fetchSentiments) + .thenApplyAsync(this::averageSentiment) + .thenApplyAsync(Results::ok) + .exceptionally(this::errorResponse); + } + + private CompletionStage> fetchTweets(String symbol) { + final CompletionStage futureResponse = wsClient.url(tweetUrl) + .addQueryParameter("q", "$" + symbol) + .get(); + + final CompletionStage filter = futureResponse.thenApplyAsync(response -> { + if (response.getStatus() == Http.Status.OK) { + return response; + } else { + return null; + } + }, ec.current()); + + return filter.thenApplyAsync(response -> { + final List statuses = stream(response.asJson().findPath("statuses").spliterator(), false) + .map(s -> s.findValue("text").asText()) + .collect(Collectors.toList()); + return statuses; + }); + } + + private CompletionStage> fetchSentiments(List tweets) { + Stream> sentiments = tweets.stream().map(text -> { + return wsClient.url(sentimentUrl).post("text=" + text); + }); + return Futures.sequence(sentiments::iterator).thenApplyAsync(this::responsesAsJson); + } + + private List responsesAsJson(List responses) { + return responses.stream().map(WSResponse::asJson).collect(toList()); + } + + private JsonNode averageSentiment(List sentiments) { + double neg = collectAverage(sentiments, "neg"); + double neutral = collectAverage(sentiments, "neutral"); + double pos = collectAverage(sentiments, "pos"); + + String label = (neutral > 0.5) ? "neutral" : (neg > pos) ? "neg" : "pos"; + + return Json.newObject() + .put("label", label) + .set("probability", Json.newObject() + .put("neg", neg) + .put("neutral", neutral) + .put("pos", pos)); + } + + private double collectAverage(List jsons, String label) { + return jsons.stream().collect(averagingDouble(json -> json.findValue(label).asDouble())); + } + + private Result errorResponse(Throwable ignored) { + return internalServerError(Json.newObject().put("error", "Could not fetch the tweets")); + } +} diff --git a/play-java-websocket-example/app/stocks/FakeStockQuoteGenerator.java b/play-java-websocket-example/app/stocks/FakeStockQuoteGenerator.java new file mode 100644 index 000000000..664c3d920 --- /dev/null +++ b/play-java-websocket-example/app/stocks/FakeStockQuoteGenerator.java @@ -0,0 +1,26 @@ +package stocks; + +import java.util.concurrent.ThreadLocalRandom; + +public class FakeStockQuoteGenerator implements StockQuoteGenerator { + + private final String symbol; + + public FakeStockQuoteGenerator(String symbol) { + this.symbol = symbol; + } + + private Double random() { + return ThreadLocalRandom.current().nextDouble(); + } + + @Override + public StockQuote newQuote(StockQuote last) { + return new StockQuote(last.symbol, last.price * (0.95 + (0.1 * random()))); + } + + @Override + public StockQuote seed() { + return new StockQuote(symbol, random() * 800); + } +} diff --git a/play-java-websocket-example/app/stocks/Stock.java b/play-java-websocket-example/app/stocks/Stock.java new file mode 100644 index 000000000..87e57f30d --- /dev/null +++ b/play-java-websocket-example/app/stocks/Stock.java @@ -0,0 +1,59 @@ +package stocks; + +import akka.NotUsed; +import akka.japi.Pair; +import akka.japi.function.Function; +import akka.stream.ThrottleMode; +import akka.stream.javadsl.Source; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; + +/** + * A stock is a source of stock quotes and a symbol. + */ +public class Stock { + public final String symbol; + + private final StockQuoteGenerator stockQuoteGenerator; + + private final Source source; + + private static final FiniteDuration duration = Duration.create(75, TimeUnit.MILLISECONDS); + + public Stock(String symbol) { + this.symbol = requireNonNull(symbol); + stockQuoteGenerator = new FakeStockQuoteGenerator(symbol); + source = Source.unfold(stockQuoteGenerator.seed(), (Function>>) last -> { + StockQuote next = stockQuoteGenerator.newQuote(last); + return Optional.of(Pair.apply(next, next)); + }); + } + + /** + * Returns a source of stock history, containing a single element. + */ + public Source history(int n) { + return source.grouped(n) + .map(quotes -> new StockHistory(symbol, quotes.stream().map(sq -> sq.price).collect(Collectors.toList()))) + .take(1); + } + + /** + * Provides a source that returns a stock quote every 75 milliseconds. + */ + public Source update() { + return source.throttle(1, duration, 1, ThrottleMode.shaping()) + .map(sq -> new StockUpdate(sq.symbol, sq.price)); + } + + @Override + public String toString() { + return "Stock(" + symbol + ")"; + } +} diff --git a/play-java-websocket-example/app/stocks/StockHistory.java b/play-java-websocket-example/app/stocks/StockHistory.java new file mode 100644 index 000000000..da6b488ae --- /dev/null +++ b/play-java-websocket-example/app/stocks/StockHistory.java @@ -0,0 +1,28 @@ +package stocks; + +import java.util.List; + +import static java.util.Objects.requireNonNull; + +/** A JSON presentation class for stock history. */ +public class StockHistory { + private final String symbol; + private final List prices; + + public StockHistory(String symbol, List prices) { + this.symbol = requireNonNull(symbol); + this.prices = requireNonNull(prices); + } + + public String getType() { + return "stockhistory"; + } + + public String getSymbol() { + return symbol; + } + + public List getHistory() { + return prices; + } +} diff --git a/play-java-websocket-example/app/stocks/StockQuote.java b/play-java-websocket-example/app/stocks/StockQuote.java new file mode 100644 index 000000000..6cd2eff9b --- /dev/null +++ b/play-java-websocket-example/app/stocks/StockQuote.java @@ -0,0 +1,15 @@ +package stocks; + +import java.util.Objects; + +import static java.util.Objects.requireNonNull; + +public class StockQuote { + public final String symbol; + public final Double price; + + public StockQuote(String symbol, Double price) { + this.symbol = requireNonNull(symbol); + this.price = requireNonNull(price); + } +} diff --git a/play-java-websocket-example/app/stocks/StockQuoteGenerator.java b/play-java-websocket-example/app/stocks/StockQuoteGenerator.java new file mode 100644 index 000000000..cb5b7092e --- /dev/null +++ b/play-java-websocket-example/app/stocks/StockQuoteGenerator.java @@ -0,0 +1,7 @@ +package stocks; + +public interface StockQuoteGenerator { + StockQuote newQuote(StockQuote last); + + StockQuote seed(); +} diff --git a/play-java-websocket-example/app/stocks/StockUpdate.java b/play-java-websocket-example/app/stocks/StockUpdate.java new file mode 100644 index 000000000..db5ab69f7 --- /dev/null +++ b/play-java-websocket-example/app/stocks/StockUpdate.java @@ -0,0 +1,28 @@ +package stocks; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; + +import static java.util.Objects.requireNonNull; + +/** A JSON presentation class for stock updates. */ +public class StockUpdate { + private final String symbol; + private final Double price; + + public StockUpdate(String symbol, Double price) { + this.symbol = requireNonNull(symbol); + this.price = requireNonNull(price); + } + + public String getType() { + return "stockupdate"; + } + + public Double getPrice() { + return price; + } + + public String getSymbol() { + return symbol; + } +} diff --git a/play-java-websocket-example/app/views/index.scala.html b/play-java-websocket-example/app/views/index.scala.html new file mode 100644 index 000000000..61304d14f --- /dev/null +++ b/play-java-websocket-example/app/views/index.scala.html @@ -0,0 +1,34 @@ +@(request: play.mvc.Http.Request, webJarsUtil: org.webjars.play.WebJarsUtil) + + + + + Reactive Stock News Dashboard + + + + @Html(webJarsUtil.css("bootstrap.min.css")) + + + @Html(webJarsUtil.script("jquery.min.js")) + @Html(webJarsUtil.script("jquery.flot.js")) + + + +

+ +
+ +
+ + diff --git a/play-java-websocket-example/build.gradle b/play-java-websocket-example/build.gradle new file mode 100644 index 000000000..7222c4108 --- /dev/null +++ b/play-java-websocket-example/build.gradle @@ -0,0 +1,49 @@ +plugins { + id 'play' + id 'idea' +} + +def playVersion = "2.6.21" +def scalaVersion = System.getProperty("scala.binary.version", /* default = */ "2.12") + +model { + components { + play { + platform play: playVersion, scala: scalaVersion, java: '1.8' + injectedRoutesGenerator = true + + sources { + twirlTemplates { + defaultImports = TwirlImports.JAVA + } + } + } + } +} + +dependencies { + play "com.typesafe.play:play-guice_$scalaVersion:$playVersion" + play "com.typesafe.play:play-logback_$scalaVersion:$playVersion" + play "com.typesafe.play:play-ahc-ws_$scalaVersion:$playVersion" + + play "org.webjars:webjars-play_$scalaVersion:2.6.2 + + play "org.webjars:bootstrap:2.3.2" + play "org.webjars:flot:0.8.3" + + playTest "org.assertj:assertj-core:3.8.0" + playTest "org.awaitility:awaitility:3.0.0" +} + +repositories { + jcenter() + maven { + name "lightbend-maven-releases" + url "https://repo.lightbend.com/lightbend/maven-release" + } + ivy { + name "lightbend-ivy-release" + url "https://repo.lightbend.com/lightbend/ivy-releases" + layout "ivy" + } +} diff --git a/play-java-websocket-example/build.sbt b/play-java-websocket-example/build.sbt new file mode 100644 index 000000000..725c313e1 --- /dev/null +++ b/play-java-websocket-example/build.sbt @@ -0,0 +1,26 @@ +name := "play-java-websocket-example" + +version := "1.0" + +scalaVersion := "2.12.8" + +crossScalaVersions := Seq("2.11.12", "2.12.4") + +// https://github.com/sbt/junit-interface +testOptions += Tests.Argument(TestFrameworks.JUnit, "-a", "-v") + +javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-deprecation", "-Xlint") + +lazy val root = (project in file(".")).enablePlugins(PlayJava) + +libraryDependencies += guice +libraryDependencies += ws +libraryDependencies += "org.webjars" %% "webjars-play" % "2.6.2" +libraryDependencies += "org.webjars" % "bootstrap" % "2.3.2" +libraryDependencies += "org.webjars" % "flot" % "0.8.3" + +// Testing libraries for dealing with CompletionStage... +libraryDependencies += "org.assertj" % "assertj-core" % "3.8.0" % Test +libraryDependencies += "org.awaitility" % "awaitility" % "3.0.0" % Test + +LessKeys.compress := true diff --git a/play-java-websocket-example/conf/application.conf b/play-java-websocket-example/conf/application.conf new file mode 100644 index 000000000..fdb255ca4 --- /dev/null +++ b/play-java-websocket-example/conf/application.conf @@ -0,0 +1,33 @@ +# This is the main configuration file for the application. +# ~~~~~ + +# Uncomment this for the most verbose Akka debugging: +akka { + loggers = ["akka.event.slf4j.Slf4jLogger"] + loglevel = "INFO" + logging-filter = "akka.event.slf4j.Slf4jLoggingFilter" + #actor { + # debug { + # receive = on + # autoreceive = on + # lifecycle = on + # } + #} +} + +# https://www.playframework.com/documentation/latest/SecurityHeaders +# Connect to localhost:9000 for content security policy on websockets +play.filters.headers { + contentSecurityPolicy = "connect-src 'self' ws://localhost:9000" +} + +# https://www.playframework.com/documentation/latest/AllowedHostsFilter +# Allow requests to localhost:9000. +play.filters.hosts { + allowed = ["localhost:9000"] +} + +default.stocks=["GOOG", "AAPL", "ORCL"] + +sentiment.url="http://text-processing.com/api/sentiment/" +tweet.url="http://twitter-search-proxy.herokuapp.com/search/tweets" diff --git a/play-java-websocket-example/conf/logback.xml b/play-java-websocket-example/conf/logback.xml new file mode 100644 index 000000000..a0f93c444 --- /dev/null +++ b/play-java-websocket-example/conf/logback.xml @@ -0,0 +1,31 @@ + + + + + + logs/application.log + + %date [%level] from %logger in %thread - %message%n%xException + + + + + + %coloredLevel %logger{15} - %message%n%xException{10} + + + + + + + + + + + + + + + + + diff --git a/play-java-websocket-example/conf/routes b/play-java-websocket-example/conf/routes new file mode 100644 index 000000000..13e0174c0 --- /dev/null +++ b/play-java-websocket-example/conf/routes @@ -0,0 +1,12 @@ +# Routes +# This file defines all application routes (Higher priority routes first) +# ~~~~ + +GET / controllers.HomeController.index +GET /ws controllers.HomeController.ws +GET /sentiment/:symbol controllers.StockSentiment.get(symbol) + +# Map static resources from the /public folder to the /assets URL path +GET /assets/*file controllers.Assets.at(path="/public", file) + +-> /webjars webjars.Routes diff --git a/play-java-websocket-example/gradle/wrapper/gradle-wrapper.jar b/play-java-websocket-example/gradle/wrapper/gradle-wrapper.jar new file mode 100644 index 000000000..01b8bf6b1 Binary files /dev/null and b/play-java-websocket-example/gradle/wrapper/gradle-wrapper.jar differ diff --git a/play-java-websocket-example/gradle/wrapper/gradle-wrapper.properties b/play-java-websocket-example/gradle/wrapper/gradle-wrapper.properties new file mode 100644 index 000000000..89dba2d9d --- /dev/null +++ b/play-java-websocket-example/gradle/wrapper/gradle-wrapper.properties @@ -0,0 +1,5 @@ +distributionUrl=https\://services.gradle.org/distributions/gradle-4.9-bin.zip +distributionBase=GRADLE_USER_HOME +distributionPath=wrapper/dists +zipStorePath=wrapper/dists +zipStoreBase=GRADLE_USER_HOME diff --git a/play-java-websocket-example/gradlew b/play-java-websocket-example/gradlew new file mode 100755 index 000000000..cccdd3d51 --- /dev/null +++ b/play-java-websocket-example/gradlew @@ -0,0 +1,172 @@ +#!/usr/bin/env sh + +############################################################################## +## +## Gradle start up script for UN*X +## +############################################################################## + +# Attempt to set APP_HOME +# Resolve links: $0 may be a link +PRG="$0" +# Need this for relative symlinks. +while [ -h "$PRG" ] ; do + ls=`ls -ld "$PRG"` + link=`expr "$ls" : '.*-> \(.*\)$'` + if expr "$link" : '/.*' > /dev/null; then + PRG="$link" + else + PRG=`dirname "$PRG"`"/$link" + fi +done +SAVED="`pwd`" +cd "`dirname \"$PRG\"`/" >/dev/null +APP_HOME="`pwd -P`" +cd "$SAVED" >/dev/null + +APP_NAME="Gradle" +APP_BASE_NAME=`basename "$0"` + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS="" + +# Use the maximum available, or set MAX_FD != -1 to use that value. +MAX_FD="maximum" + +warn () { + echo "$*" +} + +die () { + echo + echo "$*" + echo + exit 1 +} + +# OS specific support (must be 'true' or 'false'). +cygwin=false +msys=false +darwin=false +nonstop=false +case "`uname`" in + CYGWIN* ) + cygwin=true + ;; + Darwin* ) + darwin=true + ;; + MINGW* ) + msys=true + ;; + NONSTOP* ) + nonstop=true + ;; +esac + +CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar + +# Determine the Java command to use to start the JVM. +if [ -n "$JAVA_HOME" ] ; then + if [ -x "$JAVA_HOME/jre/sh/java" ] ; then + # IBM's JDK on AIX uses strange locations for the executables + JAVACMD="$JAVA_HOME/jre/sh/java" + else + JAVACMD="$JAVA_HOME/bin/java" + fi + if [ ! -x "$JAVACMD" ] ; then + die "ERROR: JAVA_HOME is set to an invalid directory: $JAVA_HOME + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." + fi +else + JAVACMD="java" + which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + +Please set the JAVA_HOME variable in your environment to match the +location of your Java installation." +fi + +# Increase the maximum file descriptors if we can. +if [ "$cygwin" = "false" -a "$darwin" = "false" -a "$nonstop" = "false" ] ; then + MAX_FD_LIMIT=`ulimit -H -n` + if [ $? -eq 0 ] ; then + if [ "$MAX_FD" = "maximum" -o "$MAX_FD" = "max" ] ; then + MAX_FD="$MAX_FD_LIMIT" + fi + ulimit -n $MAX_FD + if [ $? -ne 0 ] ; then + warn "Could not set maximum file descriptor limit: $MAX_FD" + fi + else + warn "Could not query maximum file descriptor limit: $MAX_FD_LIMIT" + fi +fi + +# For Darwin, add options to specify how the application appears in the dock +if $darwin; then + GRADLE_OPTS="$GRADLE_OPTS \"-Xdock:name=$APP_NAME\" \"-Xdock:icon=$APP_HOME/media/gradle.icns\"" +fi + +# For Cygwin, switch paths to Windows format before running java +if $cygwin ; then + APP_HOME=`cygpath --path --mixed "$APP_HOME"` + CLASSPATH=`cygpath --path --mixed "$CLASSPATH"` + JAVACMD=`cygpath --unix "$JAVACMD"` + + # We build the pattern for arguments to be converted via cygpath + ROOTDIRSRAW=`find -L / -maxdepth 1 -mindepth 1 -type d 2>/dev/null` + SEP="" + for dir in $ROOTDIRSRAW ; do + ROOTDIRS="$ROOTDIRS$SEP$dir" + SEP="|" + done + OURCYGPATTERN="(^($ROOTDIRS))" + # Add a user-defined pattern to the cygpath arguments + if [ "$GRADLE_CYGPATTERN" != "" ] ; then + OURCYGPATTERN="$OURCYGPATTERN|($GRADLE_CYGPATTERN)" + fi + # Now convert the arguments - kludge to limit ourselves to /bin/sh + i=0 + for arg in "$@" ; do + CHECK=`echo "$arg"|egrep -c "$OURCYGPATTERN" -` + CHECK2=`echo "$arg"|egrep -c "^-"` ### Determine if an option + + if [ $CHECK -ne 0 ] && [ $CHECK2 -eq 0 ] ; then ### Added a condition + eval `echo args$i`=`cygpath --path --ignore --mixed "$arg"` + else + eval `echo args$i`="\"$arg\"" + fi + i=$((i+1)) + done + case $i in + (0) set -- ;; + (1) set -- "$args0" ;; + (2) set -- "$args0" "$args1" ;; + (3) set -- "$args0" "$args1" "$args2" ;; + (4) set -- "$args0" "$args1" "$args2" "$args3" ;; + (5) set -- "$args0" "$args1" "$args2" "$args3" "$args4" ;; + (6) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" ;; + (7) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" ;; + (8) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" ;; + (9) set -- "$args0" "$args1" "$args2" "$args3" "$args4" "$args5" "$args6" "$args7" "$args8" ;; + esac +fi + +# Escape application args +save () { + for i do printf %s\\n "$i" | sed "s/'/'\\\\''/g;1s/^/'/;\$s/\$/' \\\\/" ; done + echo " " +} +APP_ARGS=$(save "$@") + +# Collect all arguments for the java command, following the shell quoting and substitution rules +eval set -- $DEFAULT_JVM_OPTS $JAVA_OPTS $GRADLE_OPTS "\"-Dorg.gradle.appname=$APP_BASE_NAME\"" -classpath "\"$CLASSPATH\"" org.gradle.wrapper.GradleWrapperMain "$APP_ARGS" + +# by default we should be in the correct project dir, but when run from Finder on Mac, the cwd is wrong +if [ "$(uname)" = "Darwin" ] && [ "$HOME" = "$PWD" ]; then + cd "$(dirname "$0")" +fi + +exec "$JAVACMD" "$@" diff --git a/play-java-websocket-example/gradlew.bat b/play-java-websocket-example/gradlew.bat new file mode 100644 index 000000000..e95643d6a --- /dev/null +++ b/play-java-websocket-example/gradlew.bat @@ -0,0 +1,84 @@ +@if "%DEBUG%" == "" @echo off +@rem ########################################################################## +@rem +@rem Gradle startup script for Windows +@rem +@rem ########################################################################## + +@rem Set local scope for the variables with windows NT shell +if "%OS%"=="Windows_NT" setlocal + +set DIRNAME=%~dp0 +if "%DIRNAME%" == "" set DIRNAME=. +set APP_BASE_NAME=%~n0 +set APP_HOME=%DIRNAME% + +@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +set DEFAULT_JVM_OPTS= + +@rem Find java.exe +if defined JAVA_HOME goto findJavaFromJavaHome + +set JAVA_EXE=java.exe +%JAVA_EXE% -version >NUL 2>&1 +if "%ERRORLEVEL%" == "0" goto init + +echo. +echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:findJavaFromJavaHome +set JAVA_HOME=%JAVA_HOME:"=% +set JAVA_EXE=%JAVA_HOME%/bin/java.exe + +if exist "%JAVA_EXE%" goto init + +echo. +echo ERROR: JAVA_HOME is set to an invalid directory: %JAVA_HOME% +echo. +echo Please set the JAVA_HOME variable in your environment to match the +echo location of your Java installation. + +goto fail + +:init +@rem Get command-line arguments, handling Windows variants + +if not "%OS%" == "Windows_NT" goto win9xME_args + +:win9xME_args +@rem Slurp the command line arguments. +set CMD_LINE_ARGS= +set _SKIP=2 + +:win9xME_args_slurp +if "x%~1" == "x" goto execute + +set CMD_LINE_ARGS=%* + +:execute +@rem Setup the command line + +set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar + +@rem Execute Gradle +"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS% + +:end +@rem End local scope for the variables with windows NT shell +if "%ERRORLEVEL%"=="0" goto mainEnd + +:fail +rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of +rem the _cmd.exe /c_ return code! +if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 +exit /b 1 + +:mainEnd +if "%OS%"=="Windows_NT" endlocal + +:omega diff --git a/play-java-websocket-example/project/build.properties b/play-java-websocket-example/project/build.properties new file mode 100644 index 000000000..c0bab0494 --- /dev/null +++ b/play-java-websocket-example/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.2.8 diff --git a/play-java-websocket-example/project/plugins.sbt b/play-java-websocket-example/project/plugins.sbt new file mode 100644 index 000000000..6fb1c18af --- /dev/null +++ b/play-java-websocket-example/project/plugins.sbt @@ -0,0 +1,6 @@ +addSbtPlugin("com.typesafe.play" % "sbt-plugin" % "2.6.21") + +addSbtPlugin("com.typesafe.sbt" % "sbt-less" % "1.1.2") + +addSbtPlugin("com.typesafe.sbt" % "sbt-coffeescript" % "1.0.2") + diff --git a/play-java-websocket-example/public/images/buy.png b/play-java-websocket-example/public/images/buy.png new file mode 100644 index 000000000..ccb20e581 Binary files /dev/null and b/play-java-websocket-example/public/images/buy.png differ diff --git a/play-java-websocket-example/public/images/favicon.png b/play-java-websocket-example/public/images/favicon.png new file mode 100644 index 000000000..c7d92d2ae Binary files /dev/null and b/play-java-websocket-example/public/images/favicon.png differ diff --git a/play-java-websocket-example/public/images/hold.png b/play-java-websocket-example/public/images/hold.png new file mode 100644 index 000000000..2645b27ab Binary files /dev/null and b/play-java-websocket-example/public/images/hold.png differ diff --git a/play-java-websocket-example/public/images/sell.png b/play-java-websocket-example/public/images/sell.png new file mode 100644 index 000000000..294cc20fd Binary files /dev/null and b/play-java-websocket-example/public/images/sell.png differ diff --git a/play-java-websocket-example/scripts/script-helper b/play-java-websocket-example/scripts/script-helper new file mode 100644 index 000000000..9a2faa643 --- /dev/null +++ b/play-java-websocket-example/scripts/script-helper @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +set -e +set -o pipefail + +java_version=$(java -version 2>&1 | java -version 2>&1 | awk -F '"' '/version/ {print $2}') + +if [[ $java_version = 1.8* ]] ; then + echo "The build is using Java 8 ($java_version). No addional JVM params needed." +else + echo "The build is using Java 9+ ($java_version). We need additional JVM parameters" + export _JAVA_OPTIONS="$_JAVA_OPTIONS --add-modules=java.xml.bind" +fi diff --git a/play-java-websocket-example/scripts/test-gradle b/play-java-websocket-example/scripts/test-gradle new file mode 100755 index 000000000..84a051a20 --- /dev/null +++ b/play-java-websocket-example/scripts/test-gradle @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +. "$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )/script-helper" + +# Using cut because TRAVIS_SCALA_VERSION is the full Scala +# version (for example 2.12.4), but Gradle expects just the +# binary version (for example 2.12) +scala_binary_version=$(echo $TRAVIS_SCALA_VERSION | cut -c1-4) + +echo "+------------------------------+" +echo "| Executing tests using Gradle |" +echo "+------------------------------+" +./gradlew -Dscala.binary.version=$scala_binary_version check -i --stacktrace diff --git a/play-java-websocket-example/scripts/test-sbt b/play-java-websocket-example/scripts/test-sbt new file mode 100755 index 000000000..0425367b1 --- /dev/null +++ b/play-java-websocket-example/scripts/test-sbt @@ -0,0 +1,8 @@ +#!/usr/bin/env bash + +. "$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )/script-helper" + +echo "+----------------------------+" +echo "| Executing tests using sbt |" +echo "+----------------------------+" +sbt ++$TRAVIS_SCALA_VERSION test diff --git a/play-java-websocket-example/test/controllers/FunctionalTest.java b/play-java-websocket-example/test/controllers/FunctionalTest.java new file mode 100644 index 000000000..3926f4df7 --- /dev/null +++ b/play-java-websocket-example/test/controllers/FunctionalTest.java @@ -0,0 +1,88 @@ +package controllers; + +import com.fasterxml.jackson.databind.JsonNode; +import play.libs.Json; +import play.shaded.ahc.org.asynchttpclient.AsyncHttpClient; +import play.shaded.ahc.org.asynchttpclient.AsyncHttpClientConfig; +import play.shaded.ahc.org.asynchttpclient.DefaultAsyncHttpClient; +import play.shaded.ahc.org.asynchttpclient.DefaultAsyncHttpClientConfig; +import play.shaded.ahc.org.asynchttpclient.ws.WebSocket; +import org.junit.Test; +import play.test.TestServer; + +import java.util.Collections; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; +import static play.test.Helpers.running; +import static play.test.Helpers.testServer; +import static org.awaitility.Awaitility.*; + +public class FunctionalTest { + + @Test + public void testRejectWebSocket() { + TestServer server = testServer(37117); + running(server, () -> { + try { + AsyncHttpClientConfig config = new DefaultAsyncHttpClientConfig.Builder().setMaxRequestRetry(0).build(); + AsyncHttpClient client = new DefaultAsyncHttpClient(config); + WebSocketClient webSocketClient = new WebSocketClient(client); + + try { + String serverURL = "ws://localhost:37117/ws"; + WebSocketClient.LoggingListener listener = new WebSocketClient.LoggingListener(message -> {}); + CompletableFuture completionStage = webSocketClient.call(serverURL, listener); + await().until(completionStage::isDone); + assertThat(completionStage) + .hasFailedWithThrowableThat() + .hasMessageContaining("Invalid Status Code 403"); + } finally { + client.close(); + } + } catch (Exception e) { + fail("Unexpected exception", e); + } + }); + } + + @Test + public void testAcceptWebSocket() { + TestServer server = testServer(19001); + running(server, () -> { + try { + AsyncHttpClientConfig config = new DefaultAsyncHttpClientConfig.Builder().setMaxRequestRetry(0).build(); + AsyncHttpClient client = new DefaultAsyncHttpClient(config); + WebSocketClient webSocketClient = new WebSocketClient(client); + + try { + String serverURL = "ws://localhost:19001/ws"; + ArrayBlockingQueue queue = new ArrayBlockingQueue(10); + WebSocketClient.LoggingListener listener = new WebSocketClient.LoggingListener((message) -> { + try { + queue.put(message); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + CompletableFuture completionStage = webSocketClient.call(serverURL, listener); + + await().until(completionStage::isDone); + WebSocket websocket = completionStage.get(); + await().until(() -> websocket.isOpen() && queue.peek() != null); + String input = queue.take(); + + JsonNode json = Json.parse(input); + String symbol = json.get("symbol").asText(); + assertThat(Collections.singletonList(symbol)).isSubsetOf("AAPL", "GOOG", "ORCL"); + } finally { + client.close(); + } + } catch (Exception e) { + fail("Unexpected exception", e); + } + }); + } +} diff --git a/play-java-websocket-example/test/controllers/WebSocketClient.java b/play-java-websocket-example/test/controllers/WebSocketClient.java new file mode 100644 index 000000000..97af202fb --- /dev/null +++ b/play-java-websocket-example/test/controllers/WebSocketClient.java @@ -0,0 +1,73 @@ +package controllers; + +import play.shaded.ahc.org.asynchttpclient.AsyncHttpClient; +import play.shaded.ahc.org.asynchttpclient.BoundRequestBuilder; +import play.shaded.ahc.org.asynchttpclient.ListenableFuture; +import play.shaded.ahc.org.asynchttpclient.ws.WebSocket; +import play.shaded.ahc.org.asynchttpclient.ws.WebSocketListener; +import play.shaded.ahc.org.asynchttpclient.ws.WebSocketTextListener; +import play.shaded.ahc.org.asynchttpclient.ws.WebSocketUpgradeHandler; +import org.slf4j.Logger; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.function.Consumer; + +/** + * A quick wrapper around AHC WebSocket + * + * https://github.com/AsyncHttpClient/async-http-client/blob/2.0/client/src/main/java/org/asynchttpclient/ws/WebSocket.java + */ +public class WebSocketClient { + + private AsyncHttpClient client; + + public WebSocketClient(AsyncHttpClient c) { + this.client = c; + } + + public CompletableFuture call(String url, WebSocketTextListener listener) throws ExecutionException, InterruptedException { + final BoundRequestBuilder requestBuilder = client.prepareGet(url); + + final WebSocketUpgradeHandler handler = new WebSocketUpgradeHandler.Builder().addWebSocketListener(listener).build(); + final ListenableFuture future = requestBuilder.execute(handler); + return future.toCompletableFuture(); + } + + static class LoggingListener implements WebSocketTextListener { + private final Consumer onMessageCallback; + + public LoggingListener(Consumer onMessageCallback) { + this.onMessageCallback = onMessageCallback; + } + + private Logger logger = org.slf4j.LoggerFactory.getLogger(LoggingListener.class); + + private Throwable throwableFound = null; + + public Throwable getThrowable() { + return throwableFound; + } + + public void onOpen(WebSocket websocket) { + //logger.info("onClose: "); + //websocket.sendMessage("hello"); + } + + public void onClose(WebSocket websocket) { + //logger.info("onClose: "); + } + + public void onError(Throwable t) { + //logger.error("onError: ", t); + throwableFound = t; + } + + @Override + public void onMessage(String s) { + //logger.info("onMessage: s = " + s); + onMessageCallback.accept(s); + } + } + +}