Skip to content

Commit

Permalink
Land blockscout#255: Index pending transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
KronicDeth authored Jun 7, 2018
2 parents fa455e3 + 58782ea commit 5f2e845
Show file tree
Hide file tree
Showing 17 changed files with 449 additions and 78 deletions.
22 changes: 13 additions & 9 deletions apps/ethereum_jsonrpc/lib/ethereum_jsonrpc.ex
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,19 @@ defmodule EthereumJSONRPC do
hexadecimal_to_integer(quantity)
end

@doc """
A request payload for a JSONRPC.
"""
@spec request(%{id: term, method: String.t(), params: list()}) :: %{String.t() => term}
def request(%{id: id, method: method, params: params}) do
%{
"id" => id,
"jsonrpc" => "2.0",
"method" => method,
"params" => params
}
end

@doc """
Converts `t:timestamp/0` to `t:DateTime.t/0`
"""
Expand Down Expand Up @@ -254,15 +267,6 @@ defmodule EthereumJSONRPC do
get_block_by_number_request(%{id: tag, tag: tag, transactions: :hashes})
end

defp request(%{id: id, method: method, params: params}) do
%{
"id" => id,
"jsonrpc" => "2.0",
"method" => method,
"params" => params
}
end

defp get_block_by_number_params(options) do
[get_block_by_number_subject(options), get_block_transactions(options)]
end
Expand Down
35 changes: 26 additions & 9 deletions apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/parity.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ defmodule EthereumJSONRPC.Parity do
Ethereum JSONRPC methods that are only supported by [Parity](https://wiki.parity.io/).
"""

import EthereumJSONRPC, only: [config: 1, json_rpc: 2]
import EthereumJSONRPC, only: [config: 1, json_rpc: 2, request: 1]

alias EthereumJSONRPC.Parity.Traces
alias EthereumJSONRPC.{Transaction, Transactions}

@doc """
Fetches the `t:Explorer.Chain.InternalTransaction.changeset/2` params from the Parity trace URL.
Expand Down Expand Up @@ -34,7 +35,7 @@ defmodule EthereumJSONRPC.Parity do
def fetch_internal_transactions(transaction_hashes) when is_list(transaction_hashes) do
with {:ok, responses} <-
transaction_hashes
|> Enum.map(&transaction_hash_to_internal_transaction_json/1)
|> Enum.map(&transaction_hash_to_internal_transaction_request/1)
|> json_rpc(config(:trace_url)) do
internal_transactions_params =
responses
Expand All @@ -46,6 +47,27 @@ defmodule EthereumJSONRPC.Parity do
end
end

@doc """
Fetches the pending transactions from the Parity node.
*NOTE*: The pending transactions are local to the node that is contacted and may not be consistent across nodes based
on the transactions that each node has seen and how each node prioritizes collating transactions into the next block.
"""
@spec fetch_pending_transactions() :: {:ok, [Transaction.params()]} | {:error, reason :: term}
def fetch_pending_transactions do
with {:ok, transactions} <-
%{id: 1, method: "parity_pendingTransactions", params: []}
|> request()
|> json_rpc(config(:url)) do
transactions_params =
transactions
|> Transactions.to_elixir()
|> Transactions.elixir_to_params()

{:ok, transactions_params}
end
end

defp response_to_trace(%{"id" => transaction_hash, "result" => %{"trace" => traces}}) when is_list(traces) do
traces
|> Stream.with_index()
Expand All @@ -58,12 +80,7 @@ defmodule EthereumJSONRPC.Parity do
Enum.flat_map(responses, &response_to_trace/1)
end

defp transaction_hash_to_internal_transaction_json(transaction_hash) do
%{
"id" => transaction_hash,
"jsonrpc" => "2.0",
"method" => "trace_replayTransaction",
"params" => [transaction_hash, ["trace"]]
}
defp transaction_hash_to_internal_transaction_request(transaction_hash) do
request(%{id: transaction_hash, method: "trace_replayTransaction", params: [transaction_hash, ["trace"]]})
end
end
65 changes: 63 additions & 2 deletions apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/transaction.ex
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,57 @@ defmodule EthereumJSONRPC.Transaction do

@doc """
Decodes the stringly typed numerical fields to `t:non_neg_integer/0`.
Pending transactions have a `nil` `"blockHash"`, `"blockNumber"`, and `"transactionIndex"` because those fields are
related to the block the transaction is collated in.
iex> EthereumJSONRPC.Transaction.to_elixir(
...> %{
...> "blockHash" => nil,
...> "blockNumber" => nil,
...> "chainId" => "0x4d",
...> "condition" => nil,
...> "creates" => nil,
...> "from" => "0x40aa34fb35ef0804a41c2b4be7d3e3d65c7f6d5c",
...> "gas" => "0xcf08",
...> "gasPrice" => "0x0",
...> "hash" => "0x6b80a90c958fb5791a070929379ed6eb7a33ecdf9f9cafcada2f6803b3f25ec3",
...> "input" => "0x",
...> "nonce" => "0x77",
...> "publicKey" => "0xd0bf6fb4ce4ada1ddfb754b98cd89dc61c3ff143a260cf1712517af2af602b699aab554a2532051e5ba205eb41068c3423f23acde87313211750a8cbf862170e",
...> "r" => "0x3cfc2a34c2e4e09913934a5ade1055206e39b1e34fabcfcc820f6f70c740944c",
...> "raw" => "0xf868778082cf08948e854802d695269a6f1f3fcabb2111d2f5a0e6f9880de0b6b3a76400008081bea03cfc2a34c2e4e09913934a5ade1055206e39b1e34fabcfcc820f6f70c740944ca014cf6f15b5855f9b68eb58c95f76603a54b2ca612f921bb8d424de11bf085390",
...> "s" => "0x14cf6f15b5855f9b68eb58c95f76603a54b2ca612f921bb8d424de11bf085390",
...> "standardV" => "0x1",
...> "to" => "0x8e854802d695269a6f1f3fcabb2111d2f5a0e6f9",
...> "transactionIndex" => nil,
...> "v" => "0xbe",
...> "value" => "0xde0b6b3a7640000"
...> }
...> )
%{
"blockHash" => nil,
"blockNumber" => nil,
"chainId" => 77,
"condition" => nil,
"creates" => nil,
"from" => "0x40aa34fb35ef0804a41c2b4be7d3e3d65c7f6d5c",
"gas" => 53000,
"gasPrice" => 0,
"hash" => "0x6b80a90c958fb5791a070929379ed6eb7a33ecdf9f9cafcada2f6803b3f25ec3",
"input" => "0x",
"nonce" => 119,
"publicKey" => "0xd0bf6fb4ce4ada1ddfb754b98cd89dc61c3ff143a260cf1712517af2af602b699aab554a2532051e5ba205eb41068c3423f23acde87313211750a8cbf862170e",
"r" => 27584307671108667307432650922507113611469948945973084068788107666229588694092,
"raw" => "0xf868778082cf08948e854802d695269a6f1f3fcabb2111d2f5a0e6f9880de0b6b3a76400008081bea03cfc2a34c2e4e09913934a5ade1055206e39b1e34fabcfcc820f6f70c740944ca014cf6f15b5855f9b68eb58c95f76603a54b2ca612f921bb8d424de11bf085390",
"s" => 9412760993194218539611435541875082818858943210434840876051960418568625476496,
"standardV" => 1,
"to" => "0x8e854802d695269a6f1f3fcabb2111d2f5a0e6f9",
"transactionIndex" => nil,
"v" => 190,
"value" => 1000000000000000000
}
"""
def to_elixir(transaction) when is_map(transaction) do
Enum.into(transaction, %{}, &entry_to_elixir/1)
Expand All @@ -144,11 +195,21 @@ defmodule EthereumJSONRPC.Transaction do
when key in ~w(blockHash condition creates from hash input jsonrpc publicKey raw to),
do: {key, value}

defp entry_to_elixir({key, quantity})
when key in ~w(blockNumber gas gasPrice nonce r s standardV transactionIndex v value) do
defp entry_to_elixir({key, quantity}) when key in ~w(gas gasPrice nonce r s standardV v value) and quantity != nil do
{key, quantity_to_integer(quantity)}
end

# quantity or nil for pending
defp entry_to_elixir({key, quantity_or_nil}) when key in ~w(blockNumber transactionIndex) do
elixir =
case quantity_or_nil do
nil -> nil
quantity -> quantity_to_integer(quantity)
end

{key, elixir}
end

# chainId is *sometimes* nil
defp entry_to_elixir({"chainId" = key, chainId}) do
case chainId do
Expand Down
83 changes: 70 additions & 13 deletions apps/explorer/lib/explorer/chain.ex
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,19 @@ defmodule Explorer.Chain do
@typep direction_option :: {:direction, direction}
@typep inserted_after_option :: {:inserted_after, DateTime.t()}
@typep necessity_by_association_option :: {:necessity_by_association, necessity_by_association}
@typep on_conflict_option :: {:on_conflict, :nothing | :replace_all}
@typep pagination_option :: {:pagination, pagination}
@typep paging_options :: {:paging_options, PagingOptions.t()}
@typep params_option :: {:params, map()}
@typep params_option :: {:params, [map()]}
@typep timeout_option :: {:timeout, timeout}
@typep timestamps :: %{inserted_at: DateTime.t(), updated_at: DateTime.t()}
@typep timestamps_option :: {:timestamps, timestamps}
@typep addresses_option :: {:adddresses, [params_option | timeout_option]}
@typep addresses_option :: {:addresses, [params_option | timeout_option]}
@typep blocks_option :: {:blocks, [params_option | timeout_option]}
@typep internal_transactions_option :: {:internal_transactions, [params_option | timeout_option]}
@typep logs_option :: {:logs, [params_option | timeout_option]}
@typep receipts_option :: {:receipts, [params_option | timeout_option]}
@typep transactions_option :: {:transactions, [params_option | timeout_option]}
@typep transactions_option :: {:transactions, [on_conflict_option | params_option | timeout_option]}

@doc """
`t:Explorer.Chain.InternalTransaction/0`s from `address`.
Expand Down Expand Up @@ -650,6 +651,7 @@ defmodule Explorer.Chain do
...> ],
...> ],
...> transactions: [
...> on_conflict: :replace_all,
...> params: [
...> %{
...> block_hash: "0xf6b4b8c88df3ebd252ec476328334dc026cf66606a84fb769b3d3cbccc8471bd",
Expand Down Expand Up @@ -874,6 +876,13 @@ defmodule Explorer.Chain do
* `:timeout` - the timeout for the whole `c:Ecto.Repo.transaction/0` call. Defaults to `#{@transaction_timeout}`
milliseconds.
* `:transactions`
* `:on_conflict` - Whether to do `:nothing` or `:replace_all` columns when there is a pre-existing transaction
with the same hash.
*NOTE*: Because the repository transaction for a pending `Explorer.Chain.Transaction`s could `COMMIT` after the
repository transaction for that same transaction being collated into a block, writers, it is recomended to use
`:nothing` for pending transactions and `:replace_all` for collated transactions, so that collated transactions
win.
* `:params` - `list` of params for `Explorer.Chain.Transaction.changeset/2`.
* `:timeout` - the timeout for inserting all transactions found in the params lists across all
types. Defaults to `#{@insert_transactions_timeout}` milliseconds.
Expand Down Expand Up @@ -1109,7 +1118,34 @@ defmodule Explorer.Chain do
end

@doc """
Returns a stream of all transactions with unfetched internal transactions.
Returns a stream of all collated transactions with unfetched internal transactions.
Only transactions that have been collated into a block are returned; pending transactions not in a block are filtered
out.
iex> pending = insert(:transaction)
iex> unfetched_collated =
...> :transaction |>
...> insert() |>
...> with_block()
iex> fetched_collated =
...> :transaction |>
...> insert() |>
...> with_block(internal_transactions_indexed_at: DateTime.utc_now())
iex> {:ok, hash_set} = Explorer.Chain.stream_transactions_with_unfetched_internal_transactions(
...> [:hash],
...> MapSet.new(),
...> fn %Explorer.Chain.Transaction{hash: hash}, acc ->
...> MapSet.put(acc, hash)
...> end
...> )
iex> pending.hash in hash_set
false
iex> unfetched_collated.hash in hash_set
true
iex> fetched_collated.hash in hash_set
false
"""
@spec stream_transactions_with_unfetched_internal_transactions(
fields :: [
Expand Down Expand Up @@ -1137,7 +1173,13 @@ defmodule Explorer.Chain do
def stream_transactions_with_unfetched_internal_transactions(fields, initial, reducer) when is_function(reducer, 2) do
Repo.transaction(
fn ->
query = from(t in Transaction, where: is_nil(t.internal_transactions_indexed_at), select: ^fields)
query =
from(
t in Transaction,
# exclude pending transactions
where: not is_nil(t.block_hash) and is_nil(t.internal_transactions_indexed_at),
select: ^fields
)

query
|> Repo.stream(timeout: :infinity)
Expand Down Expand Up @@ -1845,12 +1887,13 @@ defmodule Explorer.Chain do
{:ok, inserted}
end

@spec insert_transactions([map()], [timeout_option | timestamps_option]) ::
@spec insert_transactions([map()], [on_conflict_option | timeout_option | timestamps_option]) ::
{:ok, [Hash.t()]} | {:error, [Changeset.t()]}
defp insert_transactions(changes_list, named_arguments)
when is_list(changes_list) and is_list(named_arguments) do
timestamps = Keyword.fetch!(named_arguments, :timestamps)
timeout = Keyword.fetch!(named_arguments, :timeout)
on_conflict = Keyword.fetch!(named_arguments, :on_conflict)

# order so that row ShareLocks are grabbed in a consistent order
ordered_changes_list = Enum.sort_by(changes_list, & &1.hash)
Expand All @@ -1859,7 +1902,7 @@ defmodule Explorer.Chain do
insert_changes_list(
ordered_changes_list,
conflict_target: :hash,
on_conflict: :replace_all,
on_conflict: on_conflict,
for: Transaction,
returning: [:hash],
timeout: timeout,
Expand Down Expand Up @@ -1914,11 +1957,13 @@ defmodule Explorer.Chain do
when is_map(ecto_schema_module_to_changes_list) and is_list(options) do
case ecto_schema_module_to_changes_list do
%{Address => addresses_changes} ->
timestamps = Keyword.fetch!(options, :timestamps)

Multi.run(multi, :addresses, fn _ ->
insert_addresses(
addresses_changes,
timeout: options[:addresses][:timeout] || @insert_addresses_timeout,
timestamps: Keyword.fetch!(options, :timestamps)
timestamps: timestamps
)
end)

Expand All @@ -1931,11 +1976,13 @@ defmodule Explorer.Chain do
when is_map(ecto_schema_module_to_changes_list) and is_list(options) do
case ecto_schema_module_to_changes_list do
%{Block => blocks_changes} ->
timestamps = Keyword.fetch!(options, :timestamps)

Multi.run(multi, :blocks, fn _ ->
insert_blocks(
blocks_changes,
timeout: options[:blocks][:timeout] || @insert_blocks_timeout,
timestamps: Keyword.fetch!(options, :timestamps)
timestamps: timestamps
)
end)

Expand All @@ -1948,11 +1995,17 @@ defmodule Explorer.Chain do
when is_map(ecto_schema_module_to_changes_list) and is_list(options) do
case ecto_schema_module_to_changes_list do
%{Transaction => transactions_changes} ->
# check required options as early as possible
transactions_options = Keyword.fetch!(options, :transactions)
on_conflict = Keyword.fetch!(transactions_options, :on_conflict)
timestamps = Keyword.fetch!(options, :timestamps)

Multi.run(multi, :transactions, fn _ ->
insert_transactions(
transactions_changes,
timeout: options[:transations][:timeout] || @insert_transactions_timeout,
timestamps: Keyword.fetch!(options, :timestamps)
on_conflict: on_conflict,
timeout: transactions_options[:timeout] || @insert_transactions_timeout,
timestamps: timestamps
)
end)

Expand All @@ -1965,11 +2018,13 @@ defmodule Explorer.Chain do
when is_map(ecto_schema_module_to_changes_list) and is_list(options) do
case ecto_schema_module_to_changes_list do
%{InternalTransaction => internal_transactions_changes} ->
timestamps = Keyword.fetch!(options, :timestamps)

Multi.run(multi, :internal_transactions, fn _ ->
insert_internal_transactions(
internal_transactions_changes,
timeout: options[:internal_transactions][:timeout] || @insert_internal_transactions_timeout,
timestamps: Keyword.fetch!(options, :timestamps)
timestamps: timestamps
)
end)

Expand All @@ -1982,11 +2037,13 @@ defmodule Explorer.Chain do
when is_map(ecto_schema_module_to_changes_list) and is_list(options) do
case ecto_schema_module_to_changes_list do
%{Log => logs_changes} ->
timestamps = Keyword.fetch!(options, :timestamps)

Multi.run(multi, :logs, fn _ ->
insert_logs(
logs_changes,
timeout: options[:logs][:timeout] || @insert_logs_timeout,
timestamps: Keyword.fetch!(options, :timestamps)
timestamps: timestamps
)
end)

Expand Down
4 changes: 2 additions & 2 deletions apps/explorer/lib/explorer/chain/transaction.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ defmodule Explorer.Chain.Transaction do
alias Explorer.Chain.{Address, Block, Data, Gas, Hash, InternalTransaction, Log, Wei}
alias Explorer.Chain.Transaction.Status

@optional_attrs ~w(block_hash block_number cumulative_gas_used from_address_hash gas_used index status
to_address_hash)a
@optional_attrs ~w(block_hash block_number cumulative_gas_used from_address_hash gas_used index
internal_transactions_indexed_at status to_address_hash)a
@required_attrs ~w(gas gas_price hash input nonce public_key r s standard_v v value)a

@typedoc """
Expand Down
Loading

0 comments on commit 5f2e845

Please sign in to comment.