Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,12 @@
transactionsValidTime = 600000
mempoolCleaningTime = 30000
}
ethereumSettings {
userAccount = ""
userPassword = ""
receiverAccount = ""
peerRPCAddress = ""
gasPrice = 39062500000
}
}
}
109 changes: 109 additions & 0 deletions src/main/scala/mvp2/actors/Anchorer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package mvp2.actors

import java.util.UUID.randomUUID
import akka.util.ByteString
import com.google.common.io.BaseEncoding
import io.circe.Json
import mvp2.http.{EthResponse, EthereumService}
import mvp2.utils.{EthRequestType, EthereumSettings}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

class Anchorer(ethereumSettings: EthereumSettings) extends CommonActor {

lazy val gasAmount = 30000
lazy val ethToTransfer = 0.01
private var lastEthBlockHash: String = _
private var unconfirmedQueue: List[UnconfirmedTransaction] = _

override def specialBehavior: Receive = {
case blockHash: ByteString =>
unconfirmedQueue = UnconfirmedTransaction(randomUUID().toString, blockHash, System.currentTimeMillis() / 1000L,
"", isUnlocked = false) :: unconfirmedQueue
sendUnlockAccount(unconfirmedQueue.head)
context.system.scheduler.scheduleOnce(30 minutes)(retryUnconfirmed())
case response: EthResponse => response.rtype match {
case EthRequestType.UNLOCKACC => if (getUnlockResult(response.responseBody))
unconfirmedQueue = unconfirmedQueue.filter(_.innerId == response.innerId).head.copy(isUnlocked = true) ::
unconfirmedQueue.filter(_.innerId != response.innerId)
sendEthereumTransaction(unconfirmedQueue.head)
case EthRequestType.SENDTX =>
unconfirmedQueue = unconfirmedQueue.filter(_.innerId == response.innerId)
.head.copy(transactionEthID = getTransactionId(response.responseBody)) ::
unconfirmedQueue.filter(_.innerId != response.innerId)
sendTransactionReceiptRequest(unconfirmedQueue.head)
case EthRequestType.GETRESULT => if (getTransactionReceipt(response.responseBody)._1) {
lastEthBlockHash = getTransactionReceipt(response.responseBody)._2
logger.info("transaction with block hash: " + ByteString.toString +
"written in Ethereum block: " + lastEthBlockHash)
unconfirmedQueue = unconfirmedQueue.filter(_.innerId != response.innerId)
}
}
}

def retryUnconfirmed() : Unit = {
unconfirmedQueue = unconfirmedQueue.sortBy(_.timeStamp)
unconfirmedQueue.foreach(e => sendUnlockAccount(e))
}

def sendEthereumTransaction(transaction: UnconfirmedTransaction): Unit = {
val price = ethereumSettings.gasPrice
val transactionJson: Json = Json.fromFields(List(
("jsonrpc", Json.fromDoubleOrNull(2.0)),
("method", Json.fromString("eth_sendTransaction")),
("params", Json.fromFields(List(
("from", Json.fromString(ethereumSettings.userAccount)),
("to", Json.fromString(ethereumSettings.receiverAccount)),
("value", Json.fromDoubleOrNull(ethToTransfer)),
("gas", Json.fromString(Integer.toHexString(gasAmount))),
("gasPrice", Json.fromString(f"$price%#x")),
("data", Json.fromString(encode2Base16(transaction.blockHash)))
))),
("id", Json.fromInt(1))
))
EthereumService
.sendRequestToEthereum(transaction.innerId, transactionJson, ethereumSettings.peerRPCAddress, EthRequestType.SENDTX)
}

def sendUnlockAccount(transaction: UnconfirmedTransaction): Unit = {
val jsonToUnlock = Json.fromFields(List(
("jsonrpc", Json.fromDoubleOrNull(2.0)),
("method", Json.fromString("personal_unlockAccount")),
("params", Json.fromValues(List(
Json.fromString(ethereumSettings.userAccount),
Json.fromString(ethereumSettings.userPassword),
Json.fromInt(600)
))),
("id", Json.fromInt(67))
))
EthereumService
.sendRequestToEthereum(transaction.innerId, jsonToUnlock, ethereumSettings.peerRPCAddress, EthRequestType.UNLOCKACC)
}

def sendTransactionReceiptRequest(transaction: UnconfirmedTransaction): Unit = {
val requestBody = Json.fromFields(List(
("jsonrpc", Json.fromDoubleOrNull(2.0)),
("method", Json.fromString("eth_getTransactionReceipt")),
("params", Json.fromValues(List(Json.fromString(transaction.transactionEthID)))),
("id", Json.fromInt(1))
))
EthereumService
.sendRequestToEthereum(transaction.innerId, requestBody, ethereumSettings.peerRPCAddress, EthRequestType.GETRESULT)
}

def getUnlockResult(json: Json): Boolean = json.hcursor.downField("result").as[Boolean].getOrElse(false)

def getTransactionReceipt(json: Json): (Boolean, String) =
(json.hcursor.downField("status").as[String].getOrElse("") == "0x1",
json.hcursor.downField("blockHash").as[String].getOrElse(""))

def getTransactionId(json: Json): String = json.hcursor.downField("result").as[String].getOrElse("")

def getLastEthBlockHash: String = lastEthBlockHash

def encode2Base16(bytes: ByteString): String = "0x" + BaseEncoding.base16().encode(bytes.toArray)

}

case class UnconfirmedTransaction(innerId: String, blockHash: ByteString, timeStamp: Long,
transactionEthID: String, isUnlocked: Boolean)
47 changes: 47 additions & 0 deletions src/main/scala/mvp2/http/EthereumService.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package mvp2.http

import java.util.concurrent.TimeUnit
import akka.actor.ActorSelection
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import com.typesafe.scalalogging.StrictLogging
import io.circe.Json
import io.circe.parser.parse
import mvp2.MVP2.system
import mvp2.utils.EthRequestType.EthRequestType
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}

object EthereumService extends StrictLogging {
implicit val materializer: ActorMaterializer = ActorMaterializer()
val anchorer: ActorSelection = system.actorSelection("user/starter/anchorer")

def sendRequestToEthereum(innerId: String, requestBody: Json, peerRPCAddress: String, requestType: EthRequestType): Unit = {
val responseFuture: Future[HttpResponse] = Http().singleRequest(
HttpRequest(
method = HttpMethods.POST,
uri = peerRPCAddress,
entity = HttpEntity(ContentTypes.`application/json`, requestBody.toString)
))
responseFuture.onComplete{
case Success(response) => response.status match {
case StatusCodes.OK if response.entity.contentType == ContentTypes.`application/json` =>
Unmarshal(response.entity).to[String].onComplete {
case Success(s) => parse(s) match {
case Right(json) => anchorer ! EthResponse(innerId, requestType, json)
case Left(e) => logger.error("failed to parse json response from ethereum:" + e.getMessage)
}
case Failure(e) => logger.error("failed to Unmarshal response from ethereum:" + e.getMessage)
}
}
case Failure(e) => logger.error("failed to get response from ethereum:" + e.getMessage)
}
Await.result(responseFuture, Duration.apply(5, TimeUnit.SECONDS))
}
}

case class EthResponse(innerId: String, rtype: EthRequestType, responseBody: Json)
6 changes: 6 additions & 0 deletions src/main/scala/mvp2/utils/EthRequestType.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package mvp2.utils

object EthRequestType extends Enumeration {
type EthRequestType = Value
val UNLOCKACC, SENDTX, GETRESULT = Value
}
8 changes: 6 additions & 2 deletions src/main/scala/mvp2/utils/Settings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ case class Settings(port: Int,
ntp: NetworkTimeProviderSettings,
influx: InfluxSettings,
testingSettings: TestingSettings,
mempoolSetting: MempoolSetting
mempoolSetting: MempoolSetting,
ethereumSettings: EthereumSettings
)

case class Node(host: String, port: Int)
Expand All @@ -28,4 +29,7 @@ case class MempoolSetting(transactionsValidTime: Long, mempoolCleaningTime: Long

case class TestingSettings(messagesTime: Boolean, iteratorsSyncTime: Int)

case class NetworkSettings(maxBlockQtyInBlocksMessage: Int)
case class NetworkSettings(maxBlockQtyInBlocksMessage: Int)

case class EthereumSettings(userAccount: String, userPassword: String, receiverAccount: String,
peerRPCAddress: String, gasPrice: Long)