Skip to content

Commit

Permalink
Implement InternalClient
Browse files Browse the repository at this point in the history
  • Loading branch information
qurben committed Apr 2, 2020
1 parent ca5c8de commit 510b562
Show file tree
Hide file tree
Showing 8 changed files with 128 additions and 104 deletions.
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor-testkit-typed" % akkaVersion % Test,
"com.typesafe.akka" %% "akka-stream-testkit" % akkaVersion % Test,
"com.typesafe.akka" %% "akka-http-testkit" % akkaHttpVersion % Test,
"org.scalaj" %% "scalaj-http" % "2.4.2" % Test,
)
2 changes: 1 addition & 1 deletion src/main/scala/DistributedHashTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ object DistributedHashTable {
// We need to convert it to a hex string in order to make it unsigned
val hexString = DistributedHashTable.convertBytesToHex(hash)
// Radix 16 because we use hex
BigInt(hexString, 16)
BigInt(hexString, 16) % 3
}

// Helper method that converts bytes to a hex string
Expand Down
14 changes: 9 additions & 5 deletions src/main/scala/ExternalRoutes.scala
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import InternalClient.{Get, Put}
import InternalClient.{Get, KO, Put, ValueRes}
import akka.actor.typed.{ActorRef, ActorSystem}
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
Expand Down Expand Up @@ -26,9 +26,9 @@ class ExternalRoutes(buildValueRepository: ActorRef[ValueRepository.Command], in
entity(as[ValueRepository.Value]) { job =>
val putResult = internalClient.ask(Put(job, _: ActorRef[InternalClient.Response]))
onSuccess(putResult) {
case InternalClient.ValueRes(_) => complete(StatusCodes.InternalServerError)
case InternalClient.ValueRes(_) => complete(StatusCodes.OK -> "Value added")
case InternalClient.OK => complete("Value added")
case InternalClient.KO => complete(StatusCodes.InternalServerError)
case InternalClient.KO(reason) => complete(StatusCodes.InternalServerError -> reason)
}
}
},
Expand All @@ -50,8 +50,12 @@ class ExternalRoutes(buildValueRepository: ActorRef[ValueRepository.Command], in
}
},
(get & path(Remaining)) { id =>
val getResult = internalClient.ask(Get(id, _: ActorRef[InternalClient.ValueRes]))
onSuccess(getResult)(v => complete(v.value))
val getResult = internalClient.ask(Get(id, _: ActorRef[InternalClient.Response]))
onSuccess(getResult) {

case ValueRes(value) => complete(value.value)
case KO(reason) => complete(StatusCodes.InternalServerError -> reason)
}
}
)
}
Expand Down
158 changes: 62 additions & 96 deletions src/main/scala/InternalClient.scala
Original file line number Diff line number Diff line change
@@ -1,53 +1,64 @@
import java.util.concurrent.TimeoutException

import DistributedHashTable.GetTopN
import InternalClient._
import akka.actor
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.{AbstractBehavior, ActorContext, Behaviors}
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.ContentTypes.`application/json`
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.Materializer
import akka.util.Timeout

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object InternalClient {
def apply(valueRepository: ActorRef[ValueRepository.Command], dht: ActorRef[DistributedHashTable.Command], host: String, port: Int, n:Int, r:Int, w:Int): Behavior[Command] =
def apply(valueRepository: ActorRef[ValueRepository.Command], dht: ActorRef[DistributedHashTable.Command], host: String, port: Int, n: Int, r: Int, w: Int): Behavior[Command] =
Behaviors.setup(context => new InternalClient(context, valueRepository, dht, host, port, n, r, w))

// Trait defining responses
sealed trait Response

final case class ValueRes(value: ValueRepository.Value) extends Response

case object OK extends Response
case object KO extends Response

case class KO(reason: String) extends Response

sealed trait Command

final case class Put(value: ValueRepository.Value, replyTo: ActorRef[Response]) extends Command
final case class Get(key: String, replyTo: ActorRef[ValueRes]) extends Command
final case class Init(host:String, port:Int, n:Int, r:Int, w:Int) extends Command

final case class Putted(replyTo: ActorRef[Response]) extends Command

final case class InternalClientFailure(replyTo: ActorRef[Response], reason: String) extends Command

final case class Get(key: String, replyTo: ActorRef[Response]) extends Command

final case class Retrieved(value: ValueRepository.Value, replyTo: ActorRef[Response]) extends Command

final case class Init(host: String, port: Int, n: Int, r: Int, w: Int) extends Command

}


class InternalClient(context: ActorContext[InternalClient.Command], valueRepository: ActorRef[ValueRepository.Command], dht: ActorRef[DistributedHashTable.Command], host: String, port: Int, n:Int, r:Int, w:Int)
class InternalClient(context: ActorContext[InternalClient.Command], valueRepository: ActorRef[ValueRepository.Command], dht: ActorRef[DistributedHashTable.Command], host: String, port: Int, n: Int, r: Int, w: Int)
extends AbstractBehavior[InternalClient.Command](context) {

import InternalClient._
import JsonSupport._
import spray.json._

implicit val actorSystem: ActorSystem[Nothing] = context.system
implicit val classicActorSystem: actor.ActorSystem = context.system.toClassic
implicit val materializer: Materializer = Materializer(classicActorSystem)
implicit val timeout: Timeout = 5.seconds

// TODO make sure to initialize self with proper host + port
var self: Uri = Uri.from(scheme = "http", host ="localhost", port = 9001, path = "/internal")
var meHost: Uri = Uri.from(scheme = "http", host = host, port = port, path = "/internal")

var N: Int = n
var R: Int = r
Expand All @@ -71,91 +82,36 @@ class InternalClient(context: ActorContext[InternalClient.Command], valueReposit
* @return the value of that key
*/
def read(key: String): Future[ValueRepository.Value] = {
// add to it's own server
val futures: List[Future[HttpResponse]] = List(getOtherNodes(key, self))

// Use DHT to get top N nodes
val topNListFuture = dht.ask(GetTopN(DistributedHashTable.getHash(key), N, _: ActorRef[Option[LazyList[RingNode]]]))(5.seconds, schedulerFromActorSystem).map {
case Some(topNList) => topNList
case None => throw new Exception("Error getting top N nodes")
}

topNListFuture.map(topNList => {
for (node <- topNList) {
futures :+ getOtherNodes(key, Uri.from(scheme = "http", host = node.host, port = node.port, path = "/internal"))
}
})

// Convert to Future[Try[T]] to catch exceptions in the Future.sequence line
val listOfFutureTrys = futures.map(futureToFutureTry)
// Convert to Future[List[Try[HttpResponse]]]
val responsesFuture = Future.sequence(listOfFutureTrys)

// Get only the successful responses
val successesFuture = responsesFuture.map(_.filter(_.isSuccess))
val failuresFuture = responsesFuture.map(_.filter(_.isFailure))

// Remove the Try from Future[List[Try[HttpResponse]]]
val successResponsesFuture = successesFuture.map(f => f.map(t => t.get))
val valueFuture = successResponsesFuture.map(l => {
if (l.size < R - 1) {
throw new TimeoutException("Did not get R - 1 successfull reads from other nodes")
} else {
val values = List.empty[ValueRepository.Value]
// For all responses
for (y <- l) {
values :+ Unmarshal(y).to[ValueRepository.Value]
}
checkVersion(values)
for {
topN <- dht.ask(GetTopN(DistributedHashTable.getHash(key), N, _: ActorRef[Option[LazyList[RingNode]]]))
responses <- topN match {
case Some(topn) => Future.sequence(topn.map(n => getOtherNodes(key, Uri.from("http", "", n.host, n.port, "/internal/"))))
case _ => throw new Exception("Error getting top N nodes")
}
})
valueFuture
successfulResponses = responses.filter(response => response.status == StatusCodes.OK)
versions <- Future.sequence(successfulResponses.map(Unmarshal(_).to[ValueRepository.Value]))
} yield checkVersion(versions.toList)
}

/**
* Write value to other nodes
*
* @param v value to write
* @return True if written to W - 1 other nodes successfully, false otherwise
*/
def write(v: ValueRepository.Value): Future[Boolean] = {
// add to it's own server
val futures: List[Future[HttpResponse]] = List(putOtherNodes(v, self))

// Use DHT to get top N nodes
val topNListFuture = dht.ask(GetTopN(DistributedHashTable.getHash(v.key), N, _: ActorRef[Option[LazyList[RingNode]]]))(5.seconds, schedulerFromActorSystem).map {
case Some(topNList) => topNList
case None => throw new Exception("Error getting top N nodes")
}

topNListFuture.map(topNList => {
for (node <-topNList) {
futures :+ putOtherNodes(v, Uri.from(scheme = "http", host = node.host, port = node.port, path = "/internal"))
for {
topN <- dht.ask(GetTopN(DistributedHashTable.getHash(v.key), N, _: ActorRef[Option[LazyList[RingNode]]]))
responses <- topN match {
case Some(topN) => Future.sequence(topN.map(node => putOtherNodes(v, Uri.from("http", "", node.host, node.port, "/internal"))))
case _ => throw new Exception("Error getting top N nodes")
}
})

// Convert to Future[Try[T]] to catch exceptions in the Future.sequence line
val listOfFutureTrys = futures.map(futureToFutureTry)
// Convert to Future[List[Try[HttpResponse]]]
val responsesFuture = Future.sequence(listOfFutureTrys)

// Get only the successful responses
val successesFuture = responsesFuture.map(_.filter(_.isSuccess))
val failuresFuture = responsesFuture.map(_.filter(_.isFailure))

// Remove the Try from Future[List[Try[HttpResponse]]]
val successResponsesFuture = successesFuture.map(f => f.map(t => t.get))
val booleanFuture = successResponsesFuture.map(l => {
if (l.size < W - 1) {
false
} else {
true
}
})
booleanFuture
} yield responses.count(r => r.status == StatusCodes.OK) >= W - 1
}

/**
* Send get request to server
*
* @param key key of the value to get
* @param address address of the server
* @return Http Response
Expand All @@ -175,9 +131,8 @@ class InternalClient(context: ActorContext[InternalClient.Command], valueReposit
Http().singleRequest(HttpRequest(
method = HttpMethods.POST,
uri = address,
entity = HttpEntity(ContentTypes.`application/json`, v.toJson.prettyPrint)
)
)
entity = HttpEntity(`application/json`, v.toJson.compactPrint)
))
}

/**
Expand All @@ -189,7 +144,7 @@ class InternalClient(context: ActorContext[InternalClient.Command], valueReposit
def checkVersion(values: List[ValueRepository.Value]): ValueRepository.Value = {
var result = values.head
for (a <- values; b <- values) {
if (a.version.>(b.version) && result.version.<(a.version)) {
if (a.version > b.version && result.version < a.version) {
result = a
}
}
Expand All @@ -198,14 +153,15 @@ class InternalClient(context: ActorContext[InternalClient.Command], valueReposit

/**
* Helper method to adjust parameters of the client
*
* @param h hostname
* @param p port
* @param n N value
* @param r R value
* @param w W value
*/
def initParams(h:String, p:Int, n:Int, r:Int, w:Int) = {
this.self = Uri.from(scheme = "http", host = h, port = p, path = "/internal")
def initParams(h: String, p: Int, n: Int, r: Int, w: Int): Unit = {
this.meHost = Uri.from(scheme = "http", host = h, port = p, path = "/internal")
this.N = n
this.R = r
this.W = w
Expand All @@ -214,16 +170,26 @@ class InternalClient(context: ActorContext[InternalClient.Command], valueReposit
override def onMessage(msg: InternalClient.Command): Behavior[InternalClient.Command] = {
msg match {
case Put(value, replyTo) =>
val putFuture = write(value)
putFuture.map(putRes => {
if (putRes) replyTo ! OK else replyTo ! KO
})
this
context.pipeToSelf(write(value)) {
case Success(true) => Putted(replyTo)
case Success(false) => InternalClientFailure(replyTo, "Not enough writes")
case Failure(exception) => InternalClientFailure(replyTo, exception.getMessage)
}
Behaviors.same
case Putted(replyTo) =>
replyTo ! OK
Behaviors.same
case InternalClientFailure(replyTo, reason) =>
replyTo ! KO(reason)
Behaviors.same
case Retrieved(value, replyTo) =>
replyTo ! ValueRes(value)
Behaviors.same
case Get(key, replyTo) =>
val f = read(key)
f.map(valRes => {
replyTo ! ValueRes(valRes)
})
context.pipeToSelf(read(key)) {
case Success(value) => Retrieved(value, replyTo)
case Failure(exception) => InternalClientFailure(replyTo, exception.getMessage)
}
Behaviors.same
case Init(h, p, n, r, w) => initParams(h, p, n, r, w)
this
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/Node.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import DistributedHashTable.{AddNode, Response}
import akka.actor.typed.{ActorRef, Behavior, Scheduler}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{Behavior, Scheduler}
import akka.http.scaladsl.Http.ServerBinding
import akka.util.Timeout
import main.NodeConfig
Expand Down
1 change: 0 additions & 1 deletion src/main/scala/ValueRepository.scala
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import akka.actor.Status.Status
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import akka.cluster.VectorClock
Expand Down
7 changes: 7 additions & 0 deletions src/resources/application.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
akka {
actor {
typed {
timeout = infinite
}
}
}
47 changes: 47 additions & 0 deletions src/test/scala/ClusterSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import Node.Stop
import akka.actor.typed.ActorSystem
import main.NodeConfig
import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import scalaj.http.Http

class ClusterSpec extends AnyWordSpec with Matchers with BeforeAndAfter {
var cluster: List[ActorSystem[Node.Message]] = List()

before {
val nodes = List(
NodeConfig(BigInt("0"), "node1", "localhost", 8001, "localhost", 9001),
NodeConfig(BigInt("1"), "node2", "localhost", 8002, "localhost", 9002),
NodeConfig(BigInt("3"), "node3", "localhost", 8003, "localhost", 9003),
NodeConfig(BigInt("4"), "node4", "localhost", 8004, "localhost", 9004),
)

cluster = nodes.map(n => ActorSystem(Node(n, nodes), n.name))
}

after {
cluster.foreach(n => n ! Stop)
}

"The cluster" should {
"start" in {
Http("http://localhost:8001/values")
.timeout(1000000, 100000)
.postData("""{"key": "myKey", "value": "myValue", "version": {}}""")
.header("content-type", "application/json")
.asString
.body should be ("Value added")

// This host should know about it
Http("http://localhost:8001/values/myKey")
.asString
.body should be ("myValue")

// It should be replicated here
Http("http://localhost:8002/values/myKey")
.asString
.body should be ("myValue")
}
}
}

0 comments on commit 510b562

Please sign in to comment.