diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf index 4ba61ac..7831898 100644 --- a/src/main/resources/application.conf +++ b/src/main/resources/application.conf @@ -35,5 +35,12 @@ transactionsValidTime = 600000 mempoolCleaningTime = 30000 } + ethereumSettings { + userAccount = "" + userPassword = "" + receiverAccount = "" + peerRPCAddress = "" + gasPrice = 39062500000 + } } } \ No newline at end of file diff --git a/src/main/scala/mvp2/actors/Anchorer.scala b/src/main/scala/mvp2/actors/Anchorer.scala new file mode 100644 index 0000000..e92bd92 --- /dev/null +++ b/src/main/scala/mvp2/actors/Anchorer.scala @@ -0,0 +1,109 @@ +package mvp2.actors + +import java.util.UUID.randomUUID +import akka.util.ByteString +import com.google.common.io.BaseEncoding +import io.circe.Json +import mvp2.http.{EthResponse, EthereumService} +import mvp2.utils.{EthRequestType, EthereumSettings} +import scala.concurrent.duration._ +import scala.concurrent.ExecutionContext.Implicits.global + +class Anchorer(ethereumSettings: EthereumSettings) extends CommonActor { + + lazy val gasAmount = 30000 + lazy val ethToTransfer = 0.01 + private var lastEthBlockHash: String = _ + private var unconfirmedQueue: List[UnconfirmedTransaction] = _ + + override def specialBehavior: Receive = { + case blockHash: ByteString => + unconfirmedQueue = UnconfirmedTransaction(randomUUID().toString, blockHash, System.currentTimeMillis() / 1000L, + "", isUnlocked = false) :: unconfirmedQueue + sendUnlockAccount(unconfirmedQueue.head) + context.system.scheduler.scheduleOnce(30 minutes)(retryUnconfirmed()) + case response: EthResponse => response.rtype match { + case EthRequestType.UNLOCKACC => if (getUnlockResult(response.responseBody)) + unconfirmedQueue = unconfirmedQueue.filter(_.innerId == response.innerId).head.copy(isUnlocked = true) :: + unconfirmedQueue.filter(_.innerId != response.innerId) + sendEthereumTransaction(unconfirmedQueue.head) + case EthRequestType.SENDTX => + unconfirmedQueue = unconfirmedQueue.filter(_.innerId == response.innerId) + .head.copy(transactionEthID = getTransactionId(response.responseBody)) :: + unconfirmedQueue.filter(_.innerId != response.innerId) + sendTransactionReceiptRequest(unconfirmedQueue.head) + case EthRequestType.GETRESULT => if (getTransactionReceipt(response.responseBody)._1) { + lastEthBlockHash = getTransactionReceipt(response.responseBody)._2 + logger.info("transaction with block hash: " + ByteString.toString + + "written in Ethereum block: " + lastEthBlockHash) + unconfirmedQueue = unconfirmedQueue.filter(_.innerId != response.innerId) + } + } + } + + def retryUnconfirmed() : Unit = { + unconfirmedQueue = unconfirmedQueue.sortBy(_.timeStamp) + unconfirmedQueue.foreach(e => sendUnlockAccount(e)) + } + + def sendEthereumTransaction(transaction: UnconfirmedTransaction): Unit = { + val price = ethereumSettings.gasPrice + val transactionJson: Json = Json.fromFields(List( + ("jsonrpc", Json.fromDoubleOrNull(2.0)), + ("method", Json.fromString("eth_sendTransaction")), + ("params", Json.fromFields(List( + ("from", Json.fromString(ethereumSettings.userAccount)), + ("to", Json.fromString(ethereumSettings.receiverAccount)), + ("value", Json.fromDoubleOrNull(ethToTransfer)), + ("gas", Json.fromString(Integer.toHexString(gasAmount))), + ("gasPrice", Json.fromString(f"$price%#x")), + ("data", Json.fromString(encode2Base16(transaction.blockHash))) + ))), + ("id", Json.fromInt(1)) + )) + EthereumService + .sendRequestToEthereum(transaction.innerId, transactionJson, ethereumSettings.peerRPCAddress, EthRequestType.SENDTX) + } + + def sendUnlockAccount(transaction: UnconfirmedTransaction): Unit = { + val jsonToUnlock = Json.fromFields(List( + ("jsonrpc", Json.fromDoubleOrNull(2.0)), + ("method", Json.fromString("personal_unlockAccount")), + ("params", Json.fromValues(List( + Json.fromString(ethereumSettings.userAccount), + Json.fromString(ethereumSettings.userPassword), + Json.fromInt(600) + ))), + ("id", Json.fromInt(67)) + )) + EthereumService + .sendRequestToEthereum(transaction.innerId, jsonToUnlock, ethereumSettings.peerRPCAddress, EthRequestType.UNLOCKACC) + } + + def sendTransactionReceiptRequest(transaction: UnconfirmedTransaction): Unit = { + val requestBody = Json.fromFields(List( + ("jsonrpc", Json.fromDoubleOrNull(2.0)), + ("method", Json.fromString("eth_getTransactionReceipt")), + ("params", Json.fromValues(List(Json.fromString(transaction.transactionEthID)))), + ("id", Json.fromInt(1)) + )) + EthereumService + .sendRequestToEthereum(transaction.innerId, requestBody, ethereumSettings.peerRPCAddress, EthRequestType.GETRESULT) + } + + def getUnlockResult(json: Json): Boolean = json.hcursor.downField("result").as[Boolean].getOrElse(false) + + def getTransactionReceipt(json: Json): (Boolean, String) = + (json.hcursor.downField("status").as[String].getOrElse("") == "0x1", + json.hcursor.downField("blockHash").as[String].getOrElse("")) + + def getTransactionId(json: Json): String = json.hcursor.downField("result").as[String].getOrElse("") + + def getLastEthBlockHash: String = lastEthBlockHash + + def encode2Base16(bytes: ByteString): String = "0x" + BaseEncoding.base16().encode(bytes.toArray) + +} + +case class UnconfirmedTransaction(innerId: String, blockHash: ByteString, timeStamp: Long, + transactionEthID: String, isUnlocked: Boolean) \ No newline at end of file diff --git a/src/main/scala/mvp2/http/EthereumService.scala b/src/main/scala/mvp2/http/EthereumService.scala new file mode 100644 index 0000000..f7f2d4f --- /dev/null +++ b/src/main/scala/mvp2/http/EthereumService.scala @@ -0,0 +1,47 @@ +package mvp2.http + +import java.util.concurrent.TimeUnit +import akka.actor.ActorSelection +import akka.http.scaladsl.Http +import akka.http.scaladsl.model._ +import akka.http.scaladsl.unmarshalling.Unmarshal +import akka.stream.ActorMaterializer +import com.typesafe.scalalogging.StrictLogging +import io.circe.Json +import io.circe.parser.parse +import mvp2.MVP2.system +import mvp2.utils.EthRequestType.EthRequestType +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} +import scala.util.{Failure, Success} + +object EthereumService extends StrictLogging { + implicit val materializer: ActorMaterializer = ActorMaterializer() + val anchorer: ActorSelection = system.actorSelection("user/starter/anchorer") + + def sendRequestToEthereum(innerId: String, requestBody: Json, peerRPCAddress: String, requestType: EthRequestType): Unit = { + val responseFuture: Future[HttpResponse] = Http().singleRequest( + HttpRequest( + method = HttpMethods.POST, + uri = peerRPCAddress, + entity = HttpEntity(ContentTypes.`application/json`, requestBody.toString) + )) + responseFuture.onComplete{ + case Success(response) => response.status match { + case StatusCodes.OK if response.entity.contentType == ContentTypes.`application/json` => + Unmarshal(response.entity).to[String].onComplete { + case Success(s) => parse(s) match { + case Right(json) => anchorer ! EthResponse(innerId, requestType, json) + case Left(e) => logger.error("failed to parse json response from ethereum:" + e.getMessage) + } + case Failure(e) => logger.error("failed to Unmarshal response from ethereum:" + e.getMessage) + } + } + case Failure(e) => logger.error("failed to get response from ethereum:" + e.getMessage) + } + Await.result(responseFuture, Duration.apply(5, TimeUnit.SECONDS)) + } +} + +case class EthResponse(innerId: String, rtype: EthRequestType, responseBody: Json) diff --git a/src/main/scala/mvp2/utils/EthRequestType.scala b/src/main/scala/mvp2/utils/EthRequestType.scala new file mode 100644 index 0000000..14b3fe7 --- /dev/null +++ b/src/main/scala/mvp2/utils/EthRequestType.scala @@ -0,0 +1,6 @@ +package mvp2.utils + +object EthRequestType extends Enumeration { + type EthRequestType = Value + val UNLOCKACC, SENDTX, GETRESULT = Value +} diff --git a/src/main/scala/mvp2/utils/Settings.scala b/src/main/scala/mvp2/utils/Settings.scala index d8c72a9..885eda2 100644 --- a/src/main/scala/mvp2/utils/Settings.scala +++ b/src/main/scala/mvp2/utils/Settings.scala @@ -13,7 +13,8 @@ case class Settings(port: Int, ntp: NetworkTimeProviderSettings, influx: InfluxSettings, testingSettings: TestingSettings, - mempoolSetting: MempoolSetting + mempoolSetting: MempoolSetting, + ethereumSettings: EthereumSettings ) case class Node(host: String, port: Int) @@ -28,4 +29,7 @@ case class MempoolSetting(transactionsValidTime: Long, mempoolCleaningTime: Long case class TestingSettings(messagesTime: Boolean, iteratorsSyncTime: Int) -case class NetworkSettings(maxBlockQtyInBlocksMessage: Int) \ No newline at end of file +case class NetworkSettings(maxBlockQtyInBlocksMessage: Int) + +case class EthereumSettings(userAccount: String, userPassword: String, receiverAccount: String, + peerRPCAddress: String, gasPrice: Long) \ No newline at end of file