Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP - AWS 2.0 client #93

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
name := "akka-persistence-dynamodb"

scalaVersion := "2.11.12"
crossScalaVersions := Seq("2.11.12", "2.12.8", "2.13.0")
crossScalaVersions := Seq("2.11.12", "2.12.8", "2.13.1")
crossVersion := CrossVersion.binary

val akkaVersion = "2.5.29"
val amzVersion = "1.11.602"
val amzVersion = "2.13.41"

scalacOptions += "-target:jvm-1.8"

libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk-core" % amzVersion,
"com.amazonaws" % "aws-java-sdk-dynamodb" % amzVersion,
"software.amazon.awssdk" % "dynamodb" % amzVersion,
"com.typesafe.akka" %% "akka-persistence" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.typesafe.akka" %% "akka-persistence-tck" % akkaVersion % "test",
Expand Down
17 changes: 9 additions & 8 deletions src/main/scala/akka/persistence/dynamodb/DynamoDBConfig.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@ import java.net.InetAddress

import akka.persistence.dynamodb.journal.DynamoDBHelper
import akka.serialization.Serialization
import com.amazonaws.{ ClientConfiguration, Protocol }
// import com.amazonaws.{ ClientConfiguration, Protocol }
import com.typesafe.config.Config

trait ClientConfig {
/* trait ClientConfig {
val config: ClientConfiguration
}
} */

trait DynamoDBConfig {
val AwsKey: String
val AwsSecret: String
val Endpoint: String
val ClientDispatcher: String
val client: ClientConfig
// val client: ClientConfig
val Tracing: Boolean
val MaxBatchGet: Int
val MaxBatchWrite: Int
Expand All @@ -28,7 +29,7 @@ trait DynamoDBConfig {

}

class DynamoDBClientConfig(c: Config) extends ClientConfig {
class DynamoDBClientConfig(c: Config) { //extends ClientConfig {
private val cc = c getConfig "aws-client-config"
private def get[T](path: String, extract: (Config, String) => T, set: T => Unit): Unit =
if (cc.getString(path) == "default") ()
Expand All @@ -40,9 +41,9 @@ class DynamoDBClientConfig(c: Config) extends ClientConfig {

private var foundSettings = List.empty[String]
override lazy val toString: String = foundSettings.reverse.mkString("{", ",", "}")
val config = new ClientConfiguration
// val config = new ClientConfiguration

get("client-execution-timeout", _.getInt(_), config.setClientExecutionTimeout)
/*get("client-execution-timeout", _.getInt(_), config.setClientExecutionTimeout)
get("connection-max-idle-millis", _.getLong(_), config.setConnectionMaxIdleMillis)
get("connection-timeout", _.getInt(_), config.setConnectionTimeout)
get("connection-ttl", _.getLong(_), config.setConnectionTTL)
Expand Down Expand Up @@ -70,5 +71,5 @@ class DynamoDBClientConfig(c: Config) extends ClientConfig {
get("use-gzip", _.getBoolean(_), config.setUseExpectContinue)
get("use-reaper", _.getBoolean(_), config.setUseReaper)
get("use-tcp-keepalive", _.getBoolean(_), config.setUseTcpKeepAlive)
get("user-agent", _.getString(_), config.setUserAgent)
get("user-agent", _.getString(_), config.setUserAgent) */
}
26 changes: 14 additions & 12 deletions src/main/scala/akka/persistence/dynamodb/DynamoDBRequests.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@ import java.util.{ Collections, HashMap => JHMap, List => JList, Map => JMap }

import akka.Done
import akka.actor.{ Actor, ActorLogging }
import akka.persistence.dynamodb.journal.{ DynamoDBHelper }
import com.amazonaws.services.dynamodbv2.model._
import akka.persistence.dynamodb.journal.DynamoDBHelper

import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal
import scala.concurrent.duration._
import akka.pattern.after
import software.amazon.awssdk.services.dynamodb.model._

private[dynamodb] trait DynamoDBRequests {
this: ActorLogging with Actor =>
Expand All @@ -26,15 +27,16 @@ private[dynamodb] trait DynamoDBRequests {
import context.dispatcher
import settings._

def putItem(item: Item): PutItemRequest = new PutItemRequest().withTableName(Table).withItem(item)
def putItem(item: Item): PutItemRequest = PutItemRequest.builder().tableName(Table).item(item).build()

def batchWriteReq(writes: Seq[WriteRequest]): BatchWriteItemRequest =
batchWriteReq(Collections.singletonMap(Table, writes.asJava))

def batchWriteReq(items: JMap[String, JList[WriteRequest]]): BatchWriteItemRequest =
new BatchWriteItemRequest()
.withRequestItems(items)
.withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
BatchWriteItemRequest.builder()
.requestItems(items)
.returnConsumedCapacity(ReturnConsumedCapacity.TOTAL)
.build()

/*
* Request execution helpers.
Expand Down Expand Up @@ -66,18 +68,18 @@ private[dynamodb] trait DynamoDBRequests {
* the retries from the client, then we are hosed and cannot continue; that is why we have a RuntimeException here
*/
private def sendUnprocessedItems(
result: BatchWriteItemResult,
retriesRemaining: Int = 10,
backoff: FiniteDuration = 1.millis): Future[BatchWriteItemResult] = {
val unprocessed: Int = result.getUnprocessedItems.get(Table) match {
result: BatchWriteItemResponse,
retriesRemaining: Int = 10,
backoff: FiniteDuration = 1.millis): Future[BatchWriteItemResponse] = {
val unprocessed: Int = result.unprocessedItems.get(Table) match {
case null => 0
case items => items.size
}
if (unprocessed == 0) Future.successful(result)
else if (retriesRemaining == 0) {
throw new RuntimeException(s"unable to batch write ${result.getUnprocessedItems.get(Table)} after 10 tries")
throw new RuntimeException(s"unable to batch write ${result.unprocessedItems.get(Table)} after 10 tries")
} else {
val rest = batchWriteReq(result.getUnprocessedItems)
val rest = batchWriteReq(result.unprocessedItems)
after(backoff, context.system.scheduler)(dynamo.batchWriteItem(rest).flatMap(r => sendUnprocessedItems(r, retriesRemaining - 1, backoff * 2)))
}
}
Expand Down
140 changes: 72 additions & 68 deletions src/main/scala/akka/persistence/dynamodb/journal/DynamoDBHelper.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright (C) 2016 Typesafe Inc. <http://www.typesafe.com>
* Copyright (C) 2020 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.persistence.dynamodb.journal

import com.amazonaws.{ AmazonServiceException, AmazonWebServiceRequest }
import com.amazonaws.handlers.AsyncHandler
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClient
import com.amazonaws.services.dynamodbv2.model._
import java.util.function.BiConsumer

import akka.actor.ActorRef
import akka.persistence.dynamodb.{ DynamoDBConfig, Item }
import akka.actor.Scheduler
import akka.event.LoggingAdapter
import akka.pattern.after
Expand All @@ -15,10 +15,8 @@ import java.util.{ concurrent => juc }
import scala.collection.JavaConverters._
import scala.concurrent.{ ExecutionContext, Future, Promise }
import scala.concurrent.duration._
import scala.reflect.ClassTag
import scala.util.control.NoStackTrace
import akka.actor.ActorRef
import akka.persistence.dynamodb.{ DynamoDBConfig, Item }
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model._

case class LatencyReport(nanos: Long, retries: Int)
private class RetryStateHolder(var retries: Int = 10, var backoff: FiniteDuration = 1.millis)
Expand All @@ -27,43 +25,49 @@ trait DynamoDBHelper {

implicit val ec: ExecutionContext
val scheduler: Scheduler
val dynamoDB: AmazonDynamoDBAsyncClient
val dynamoDB: DynamoDbAsyncClient
val log: LoggingAdapter
val settings: DynamoDBConfig
import settings._

def shutdown(): Unit = dynamoDB.shutdown()
def shutdown(): Unit = dynamoDB.close()

private var reporter: ActorRef = _
def setReporter(ref: ActorRef): Unit = reporter = ref

private def send[In <: AmazonWebServiceRequest, Out](aws: In, func: AsyncHandler[In, Out] => juc.Future[Out])(implicit d: Describe[_ >: In]): Future[Out] = {

private def send[In <: DynamoDbRequest, Out](aws: In, func: In => juc.CompletableFuture[Out])(implicit d: Describe[_ >: In]): Future[Out] = {
def name = d.desc(aws)

def sendSingle(): Future[Out] = {
val p = Promise[Out]

val handler = new AsyncHandler[In, Out] {
override def onError(ex: Exception) = ex match {
case e: ProvisionedThroughputExceededException =>
p.tryFailure(ex)
case _ =>
val n = name
log.error(ex, "failure while executing {}", n)
p.tryFailure(new DynamoDBJournalFailure("failure while executing " + n, ex))
}
override def onSuccess(req: In, resp: Out) = p.trySuccess(resp)
}

try {
func(handler)
func(aws).whenCompleteAsync(
new BiConsumer[Out, Throwable] {
override def accept(resp: Out, ex: Throwable): Unit = {
if (resp != null) {
p.trySuccess(resp)
} else {
// Handle the error
ex match {
case c: java.util.concurrent.CompletionException =>
val cause = c.getCause
cause match {
case e: ProvisionedThroughputExceededException =>
p.tryFailure(e)
case _ =>
val n = name
log.error(cause, "failure while executing {}", n)
p.tryFailure(new DynamoDBJournalFailure("failure while executing " + n, cause))
}
}
}
}
})
} catch {
case ex: Throwable =>
log.error(ex, "failure while preparing {}", name)
p.tryFailure(ex)
}

p.future
}

Expand Down Expand Up @@ -92,94 +96,94 @@ trait DynamoDBHelper {
trait Describe[T] {
def desc(t: T): String
protected def formatKey(i: Item): String = {
val key = i.get(Key) match { case null => "<none>" case x => x.getS }
val sort = i.get(Sort) match { case null => "<none>" case x => x.getN }
val key = i.get(Key) match { case null => "<none>" case x => x.s }
val sort = i.get(Sort) match { case null => "<none>" case x => x.n }
s"[$Key=$key,$Sort=$sort]"
}
}

object Describe {
implicit object GenericDescribe extends Describe[AmazonWebServiceRequest] {
def desc(aws: AmazonWebServiceRequest): String = aws.getClass.getSimpleName
implicit object GenericDescribe extends Describe[DynamoDbRequest] {
def desc(aws: DynamoDbRequest): String = aws.getClass.getSimpleName
}
}

implicit object DescribeDescribe extends Describe[DescribeTableRequest] {
def desc(aws: DescribeTableRequest): String = s"DescribeTableRequest(${aws.getTableName})"
def desc(aws: DescribeTableRequest): String = s"DescribeTableRequest(${aws.tableName()})"
}

implicit object QueryDescribe extends Describe[QueryRequest] {
def desc(aws: QueryRequest): String = s"QueryRequest(${aws.getTableName},${aws.getExpressionAttributeValues})"
def desc(aws: QueryRequest): String = s"QueryRequest(${aws.tableName},${aws.expressionAttributeValues})"
}

implicit object PutItemDescribe extends Describe[PutItemRequest] {
def desc(aws: PutItemRequest): String = s"PutItemRequest(${aws.getTableName},${formatKey(aws.getItem)})"
def desc(aws: PutItemRequest): String = s"PutItemRequest(${aws.tableName},${formatKey(aws.item)})"
}

implicit object DeleteDescribe extends Describe[DeleteItemRequest] {
def desc(aws: DeleteItemRequest): String = s"DeleteItemRequest(${aws.getTableName},${formatKey(aws.getKey)})"
def desc(aws: DeleteItemRequest): String = s"DeleteItemRequest(${aws.tableName},${formatKey(aws.key)})"
}

implicit object BatchGetItemDescribe extends Describe[BatchGetItemRequest] {
def desc(aws: BatchGetItemRequest): String = {
val entry = aws.getRequestItems.entrySet.iterator.next()
val entry = aws.requestItems.entrySet.iterator.next()
val table = entry.getKey
val keys = entry.getValue.getKeys.asScala.map(formatKey)
val keys = entry.getValue.keys().asScala.map(formatKey)
s"BatchGetItemRequest($table, ${keys.mkString("(", ",", ")")})"
}
}

implicit object BatchWriteItemDescribe extends Describe[BatchWriteItemRequest] {
def desc(aws: BatchWriteItemRequest): String = {
val entry = aws.getRequestItems.entrySet.iterator.next()
val entry = aws.requestItems.entrySet.iterator.next()
val table = entry.getKey
val keys = entry.getValue.asScala.map { write =>
write.getDeleteRequest match {
case null => "put" + formatKey(write.getPutRequest.getItem)
case del => "del" + formatKey(del.getKey)
write.deleteRequest() match {
case null => "put" + formatKey(write.putRequest.item)
case del => "del" + formatKey(del.key)
}
}
s"BatchWriteItemRequest($table, ${keys.mkString("(", ",", ")")})"
}
}

def listTables(aws: ListTablesRequest): Future[ListTablesResult] =
send[ListTablesRequest, ListTablesResult](aws, dynamoDB.listTablesAsync(aws, _))
def listTables(aws: ListTablesRequest): Future[ListTablesResponse] =
send[ListTablesRequest, ListTablesResponse](aws, dynamoDB.listTables)

def describeTable(aws: DescribeTableRequest): Future[DescribeTableResult] =
send[DescribeTableRequest, DescribeTableResult](aws, dynamoDB.describeTableAsync(aws, _))
def describeTable(aws: DescribeTableRequest): Future[DescribeTableResponse] =
send[DescribeTableRequest, DescribeTableResponse](aws, dynamoDB.describeTable)

def createTable(aws: CreateTableRequest): Future[CreateTableResult] =
send[CreateTableRequest, CreateTableResult](aws, dynamoDB.createTableAsync(aws, _))
def createTable(aws: CreateTableRequest): Future[CreateTableResponse] =
send[CreateTableRequest, CreateTableResponse](aws, dynamoDB.createTable)

def updateTable(aws: UpdateTableRequest): Future[UpdateTableResult] =
send[UpdateTableRequest, UpdateTableResult](aws, dynamoDB.updateTableAsync(aws, _))
def updateTable(aws: UpdateTableRequest): Future[UpdateTableResponse] =
send[UpdateTableRequest, UpdateTableResponse](aws, dynamoDB.updateTable)

def deleteTable(aws: DeleteTableRequest): Future[DeleteTableResult] =
send[DeleteTableRequest, DeleteTableResult](aws, dynamoDB.deleteTableAsync(aws, _))
def deleteTable(aws: DeleteTableRequest): Future[DeleteTableResponse] =
send[DeleteTableRequest, DeleteTableResponse](aws, dynamoDB.deleteTable)

def query(aws: QueryRequest): Future[QueryResult] =
send[QueryRequest, QueryResult](aws, dynamoDB.queryAsync(aws, _))
def query(aws: QueryRequest): Future[QueryResponse] =
send[QueryRequest, QueryResponse](aws, dynamoDB.query)

def scan(aws: ScanRequest): Future[ScanResult] =
send[ScanRequest, ScanResult](aws, dynamoDB.scanAsync(aws, _))
def scan(aws: ScanRequest): Future[ScanResponse] =
send[ScanRequest, ScanResponse](aws, dynamoDB.scan)

def putItem(aws: PutItemRequest): Future[PutItemResult] =
send[PutItemRequest, PutItemResult](aws, dynamoDB.putItemAsync(aws, _))
def putItem(aws: PutItemRequest): Future[PutItemResponse] =
send[PutItemRequest, PutItemResponse](aws, dynamoDB.putItem)

def getItem(aws: GetItemRequest): Future[GetItemResult] =
send[GetItemRequest, GetItemResult](aws, dynamoDB.getItemAsync(aws, _))
def getItem(aws: GetItemRequest): Future[GetItemResponse] =
send[GetItemRequest, GetItemResponse](aws, dynamoDB.getItem)

def updateItem(aws: UpdateItemRequest): Future[UpdateItemResult] =
send[UpdateItemRequest, UpdateItemResult](aws, dynamoDB.updateItemAsync(aws, _))
def updateItem(aws: UpdateItemRequest): Future[UpdateItemResponse] =
send[UpdateItemRequest, UpdateItemResponse](aws, dynamoDB.updateItem)

def deleteItem(aws: DeleteItemRequest): Future[DeleteItemResult] =
send[DeleteItemRequest, DeleteItemResult](aws, dynamoDB.deleteItemAsync(aws, _))
def deleteItem(aws: DeleteItemRequest): Future[DeleteItemResponse] =
send[DeleteItemRequest, DeleteItemResponse](aws, dynamoDB.deleteItem)

def batchWriteItem(aws: BatchWriteItemRequest): Future[BatchWriteItemResult] =
send[BatchWriteItemRequest, BatchWriteItemResult](aws, dynamoDB.batchWriteItemAsync(aws, _))
def batchWriteItem(aws: BatchWriteItemRequest): Future[BatchWriteItemResponse] =
send[BatchWriteItemRequest, BatchWriteItemResponse](aws, dynamoDB.batchWriteItem)

def batchGetItem(aws: BatchGetItemRequest): Future[BatchGetItemResult] =
send[BatchGetItemRequest, BatchGetItemResult](aws, dynamoDB.batchGetItemAsync(aws, _))
def batchGetItem(aws: BatchGetItemRequest): Future[BatchGetItemResponse] =
send[BatchGetItemRequest, BatchGetItemResponse](aws, dynamoDB.batchGetItem)

}
Loading