Skip to content

Commit fac0cff

Browse files
author
Alvaro Nistal
committed
First commit
1 parent 627fb96 commit fac0cff

21 files changed

+656
-0
lines changed

README.md

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
# Akka & Docker
2+
3+
```bash
4+
sbt docker:publishLocal
5+
docker run --name seed-1 akka-docker:2.3.4 --seed
6+
docker run --name seed-2 akka-docker:2.3.4 --seed <ip-of-your-seed-1>:2551
7+
docker run --name node-1 akka-docker:2.3.4 <ip-of-your-seed-1>:2551 <ip-of-your-seed-2>:2551
8+
docker run --name node-2 akka-docker:2.3.4 <ip-of-your-seed-1>:2551 <ip-of-your-seed-2>:2551
9+
```
10+

build.sbt

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
name := "anistal-akka-cluster"
2+
version := "0.1"
3+
scalaVersion := "2.11.5"
4+
5+
enablePlugins(JavaAppPackaging)
6+
7+
maintainer := "Alvaro Nistal"
8+
packageSummary := s"My akka cluster experiment"
9+
10+
libraryDependencies ++= Seq(
11+
"com.typesafe.akka" %% "akka-actor" % "2.4.3",
12+
"com.typesafe.akka" %% "akka-cluster" % "2.4.3",
13+
"com.typesafe.akka" %% "akka-cluster-tools" % "2.4.3",
14+
"com.typesafe.akka" %% "akka-http-experimental" % "2.4.3",
15+
// "com.typesafe.akka" %% "akka-http-core-experimental" % "2.4.3",
16+
"com.typesafe.akka" %% "akka-stream" % "2.4.3",
17+
"com.github.scopt" %% "scopt" % "3.4.0",
18+
"org.json4s" %% "json4s-native" % "3.3.0",
19+
"org.json4s" %% "json4s-ext" % "3.3.0",
20+
"org.twitter4j" % "twitter4j-core" % "4.0.4",
21+
"org.scalaj" %% "scalaj-http" % "2.2.1",
22+
"net.debasishg" %% "redisclient" % "3.0"
23+
)
24+

project/build.properties

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
sbt.version=0.13.8

project/plugins.sbt

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.0.3")

src/main/resources/application.conf

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
akka {
2+
log-dead-letters = 1
3+
log-dead-letters-during-shutdown = off
4+
loglevel = info
5+
6+
actor {
7+
provider = "akka.cluster.ClusterActorRefProvider"
8+
debug {
9+
receive = off
10+
lifecycle = off
11+
}
12+
}
13+
14+
remote {
15+
netty.tcp {
16+
hostname = ${clustering.ip}
17+
port = ${clustering.port}
18+
}
19+
}
20+
21+
cluster {
22+
auto-down-unreachable-after = 10s
23+
}
24+
}
25+
26+
twitter {
27+
consumer-key = ${consumer.key}
28+
consumer-secret = ${consumer.secret}
29+
access-token = ${access.token}
30+
access-secret = ${access.secret}
31+
}
32+
33+
redis {
34+
host: "localhost"
35+
port: 2343
36+
}
37+
38+
clustering {
39+
port = 2551
40+
port = ${?CLUSTER_PORT}
41+
cluster.name = application
42+
}

src/main/resources/node.cluster.conf

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
akka.cluster {
2+
roles = [node]
3+
}

src/main/resources/node.seed.conf

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
akka.cluster {
2+
seed-nodes += "akka.tcp://"${clustering.cluster.name}"@"${clustering.ip}":"${clustering.port}
3+
roles = [seed]
4+
}

src/main/resources/web/index.html

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
<!DOCTYPE html>
2+
<html lang="en">
3+
<head>
4+
<meta charset="utf-8">
5+
<title>WebSockets Demo</title>
6+
</head>
7+
<body>
8+
<div id="page-wrapper">
9+
<h1>WebSockets Demo</h1>
10+
11+
<div id="status">Connecting...</div>
12+
13+
<ul id="messages"></ul>
14+
15+
<form id="message-form" action="#" method="post">
16+
<textarea id="message" placeholder="Write your message here..." required></textarea>
17+
<button type="submit">Send Message</button>
18+
<button type="button" id="close">Close Connection</button>
19+
</form>
20+
</div>
21+
22+
<script src="js/app.js"></script>
23+
</body>
24+
</html>

src/main/resources/web/js/app.js

+12
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
window.onload = function() {
2+
var socket = new WebSocket('ws://localhost:8080/ws');
3+
4+
socket.onopen = function(event) {
5+
console.info("connected!")
6+
}
7+
8+
socket.onmessage = function(event) {
9+
var message = event.data;
10+
console.info(message);
11+
};
12+
};
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package com.anistal.cluster.actors
2+
3+
import akka.actor._
4+
import akka.cluster.ClusterEvent._
5+
import akka.cluster._
6+
import akka.cluster.pubsub.DistributedPubSub
7+
import akka.cluster.pubsub.DistributedPubSubMediator.Put
8+
import com.anistal.cluster.actors.BackendMessages.{BackendMessageGet, BackendMessagePut, BackendMessageStart}
9+
import com.anistal.cluster.actors.FrontendMessages.FrontendMessageQuery
10+
import com.anistal.cluster.constants.ClusterConstants
11+
import com.anistal.cluster.helpers.SocialHelper
12+
import com.anistal.cluster.models.{GithubItem, TwitterModel}
13+
import com.redis.RedisClient
14+
import com.typesafe.config.Config
15+
import org.json4s.DefaultFormats
16+
import org.json4s.native.Serialization._
17+
18+
import scala.collection.mutable.Queue
19+
20+
class BackendActor(config: Config) extends Actor with ActorLogging {
21+
22+
implicit val formats = DefaultFormats
23+
24+
val cluster = Cluster(context.system)
25+
var messages = Queue[Seq[GithubItem]]()
26+
27+
lazy val socialHelper = new SocialHelper(config)
28+
lazy val redisClient = new RedisClient("172.19.1.170", 6379)
29+
30+
val mediator = DistributedPubSub(context.system).mediator
31+
mediator ! Put(self)
32+
33+
override def preStart(): Unit =
34+
cluster.subscribe(
35+
subscriber = self,
36+
initialStateMode = InitialStateAsEvents,
37+
classOf[MemberEvent], classOf[UnreachableMember])
38+
39+
override def postStop(): Unit = cluster.unsubscribe(self)
40+
41+
def receive = {
42+
case BackendMessageStart =>
43+
if(messages.isEmpty)
44+
(1 to ClusterConstants.GithubMaxNumberOfQueries).foreach(x => {
45+
log.info("Received BackendMessageStart")
46+
val githubProjects = socialHelper.githubQuery(
47+
ClusterConstants.GithubEndpoint,
48+
ClusterConstants.GithubParams + ("page" -> x.toString),
49+
ClusterConstants.GithubNumberOfElements)
50+
(messages ++= githubProjects.items.grouped(10))
51+
})
52+
else log.info("The queue is not empty yet")
53+
54+
55+
case BackendMessageGet =>
56+
if(messages.nonEmpty) {
57+
log.info(s"Message requested from ${sender()}")
58+
sender() ! FrontendMessageQuery(messages.dequeue())
59+
}
60+
61+
case BackendMessagePut(twitterModel) =>
62+
val twitterModelJson = write[TwitterModel](twitterModel)
63+
64+
if(redisClient.exists(s"${twitterModel.id}:${twitterModel.keyword}") == false) {
65+
redisClient.lpush("main:timeline", twitterModelJson)
66+
redisClient.lpush(s"${twitterModel.keyword}:timeline", twitterModelJson)
67+
redisClient.publish("main:timeline", twitterModelJson)
68+
redisClient.publish(s"${twitterModel.keyword}:timeline", twitterModelJson)
69+
}
70+
}
71+
}
72+
73+
case object BackendMessages {
74+
75+
case object BackendMessageStart
76+
case object BackendMessageGet
77+
case class BackendMessagePut(twitterModel: TwitterModel)
78+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.anistal.cluster.actors
2+
3+
import akka.actor._
4+
import akka.cluster.ClusterEvent._
5+
import akka.cluster._
6+
import akka.cluster.pubsub.DistributedPubSub
7+
import akka.cluster.pubsub.DistributedPubSubMediator.Send
8+
import com.anistal.cluster.actors.BackendMessages.{BackendMessageGet, BackendMessagePut}
9+
import com.anistal.cluster.actors.FrontendMessages.{FrontendMessageQuery, FrontendMessageStart}
10+
import com.anistal.cluster.helpers.SocialHelper
11+
import com.anistal.cluster.models.GithubItem
12+
import com.typesafe.config.Config
13+
import org.json4s.DefaultFormats
14+
15+
class FrontendActor(config: Config) extends Actor with ActorLogging {
16+
17+
implicit val formats = DefaultFormats
18+
19+
val cluster = Cluster(context.system)
20+
val mediator = DistributedPubSub(context.system).mediator
21+
22+
lazy val socialHelper = new SocialHelper(config)
23+
var frontendCache = scala.collection.mutable.Map[Long, Long]()
24+
25+
override def preStart(): Unit =
26+
cluster.subscribe(
27+
subscriber = self,
28+
initialStateMode = InitialStateAsEvents,
29+
classOf[MemberEvent], classOf[UnreachableMember])
30+
31+
override def postStop(): Unit = cluster.unsubscribe(self)
32+
33+
def receive = {
34+
case FrontendMessageStart =>
35+
log.info("Received FrontendMessageStart")
36+
mediator ! Send(path = "/user/backend", msg = BackendMessageGet, localAffinity = true)
37+
38+
case FrontendMessageQuery(items) =>
39+
log.info("Received FrontendMessageQuery")
40+
41+
val query = items.mkString(" OR ")
42+
socialHelper.twitterQuery(query).foreach(tweet => {
43+
if(frontendCache.contains(tweet.id) == false) {
44+
mediator ! Send(path = "/user/backend", msg = BackendMessagePut(tweet), localAffinity = true)
45+
frontendCache.put(tweet.id, tweet.id)
46+
}
47+
})
48+
}
49+
}
50+
51+
case object FrontendMessages {
52+
53+
case object FrontendMessageStart
54+
case class FrontendMessageQuery(items: Seq[GithubItem])
55+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package com.anistal.cluster.actors
2+
3+
import akka.actor._
4+
import akka.cluster.ClusterEvent._
5+
import akka.cluster._
6+
7+
class ListenerActor extends Actor with ActorLogging {
8+
9+
val cluster = Cluster(context.system)
10+
11+
override def preStart(): Unit =
12+
cluster.subscribe(
13+
subscriber = self,
14+
initialStateMode = InitialStateAsEvents,
15+
classOf[MemberEvent], classOf[UnreachableMember])
16+
17+
override def postStop(): Unit = cluster.unsubscribe(self)
18+
19+
def receive = {
20+
case MemberUp(member) =>
21+
log.info(s"Member is [Up]: ${member.address}")
22+
case UnreachableMember(member) =>
23+
log.info(s"Member is [Unreachable]: ${member.address}")
24+
case MemberRemoved(member, previousStatus) =>
25+
log.info(s"Member is [Removed]: ${member.address}")
26+
case MemberExited(member) =>
27+
log.info(s"Member is [Exited]: ${member.address}")
28+
case _: MemberEvent =>
29+
log.debug("Nothing to do")
30+
}
31+
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.anistal.cluster.actors
2+
3+
import akka.actor.Props
4+
import akka.stream.actor.ActorPublisher
5+
6+
class PublishActor extends ActorPublisher[String] {
7+
8+
override def preStart = {
9+
context.system.eventStream.subscribe(self, classOf[String])
10+
}
11+
12+
override def receive = {
13+
case tweet: String =>
14+
if(isActive && totalDemand > 0) onNext(tweet)
15+
}
16+
}
17+
18+
object PublishActor {
19+
def props: Props = Props(new PublishActor())
20+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.anistal.cluster.actors
2+
3+
import akka.actor.{ActorLogging, Actor, Props}
4+
5+
class WebsocketActor extends Actor with ActorLogging {
6+
7+
override def preStart = {
8+
//context.system.eventStream.subscribe(self, classOf[String])
9+
}
10+
11+
override def receive = {
12+
case value: String =>
13+
log.info(value)
14+
context.system.eventStream.publish(value)
15+
}
16+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package com.anistal.cluster.constants
2+
3+
object ClusterConstants {
4+
5+
6+
val GithubMaxNumberOfQueries = 1
7+
val GithubNumberOfElements = 100
8+
val GithubEndpoint = "https://api.github.com/search/repositories"
9+
val GithubParams = Map(
10+
"q" -> "reactive",
11+
"sort" -> "stars",
12+
"order" -> "desc",
13+
"per_page" -> "100")
14+
}

0 commit comments

Comments
 (0)