Skip to content

Commit

Permalink
chore: Upgrade WS client (blockscout#10407)
Browse files Browse the repository at this point in the history
* chore: Upgrade WS client

* Websocket client refactoring
  • Loading branch information
Qwerty5Uiop authored Aug 15, 2024
1 parent ec2e25b commit 8101bfc
Show file tree
Hide file tree
Showing 41 changed files with 379 additions and 182 deletions.
1 change: 0 additions & 1 deletion apps/block_scout_web/mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ defmodule BlockScoutWeb.Mixfile do
{:timex, "~> 3.7.1"},
{:wallaby, "~> 0.30", only: :test, runtime: false},
# `:cowboy` `~> 2.0` and Phoenix 1.4 compatibility
{:websocket_client, git: "https://github.com/blockscout/websocket_client.git", branch: "master", override: true},
{:ex_json_schema, "~> 0.10.1"},
{:ueberauth, "~> 0.7"},
{:ueberauth_auth0, "~> 2.0"},
Expand Down
4 changes: 2 additions & 2 deletions apps/ethereum_jsonrpc/lib/ethereum_jsonrpc/web_socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ defmodule EthereumJSONRPC.WebSocket do
@behaviour Transport

@enforce_keys ~w(url web_socket)a
defstruct ~w(url web_socket web_socket_options)a
defstruct ~w(url fallback_url web_socket web_socket_options)a

@typedoc """
WebSocket name
Expand Down Expand Up @@ -43,7 +43,7 @@ defmodule EthereumJSONRPC.WebSocket do
Starts web socket attached to `url` with `options`.
"""
# Return is same as `t:GenServer.on_start/0`
@callback start_link([(url :: String.t()) | (options :: term())]) ::
@callback start_link(url :: String.t(), options :: term()) ::
{:ok, pid()} | :ignore | {:error, {:already_started, pid()} | (reason :: term())}

@doc """
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
defmodule EthereumJSONRPC.WebSocket.RetryWorker do
@moduledoc """
Stores the unavailable websocket endpoint state and periodically checks if it is already available.
"""

use GenServer

require Logger

alias EthereumJSONRPC.WebSocket.Supervisor, as: WebSocketSupervisor

def start_link(_) do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end

def activate(ws_state) do
GenServer.cast(__MODULE__, {:activate, ws_state})
end

def deactivate do
GenServer.cast(__MODULE__, :deactivate)
end

def init(_) do
schedule_next_retry()

{:ok, %{active?: false, ws_state: nil}}
end

def handle_cast({:activate, ws_state}, state) do
{:noreply, %{state | active?: true, ws_state: %{ws_state | retry: true}}}
end

def handle_cast(:deactivate, state) do
{:noreply, %{state | active?: false}}
end

def handle_info(:retry, %{active?: false} = state) do
schedule_next_retry()

{:noreply, state}
end

def handle_info(:retry, %{active?: true, ws_state: ws_state} = state) do
WebSocketSupervisor.start_client(ws_state)

schedule_next_retry()

{:noreply, %{state | active?: false}}
end

defp schedule_next_retry do
Process.send_after(self(), :retry, retry_interval())
end

defp retry_interval do
Application.get_env(:ethereum_jsonrpc, __MODULE__)[:retry_interval]
end
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
defmodule EthereumJSONRPC.WebSocket.Supervisor do
@moduledoc """
Supervises the processes related to `EthereumJSONRPC.WebSocket`.
"""

use Supervisor

alias EthereumJSONRPC.WebSocket.RetryWorker

def start_link(transport_options) do
Supervisor.start_link(__MODULE__, transport_options, name: __MODULE__)
end

def start_client(ws_state) do
subscribe_named_arguments =
Application.get_env(:indexer, :realtime_overrides)[:subscribe_named_arguments] ||
Application.get_env(:indexer, :subscribe_named_arguments)

web_socket_module =
subscribe_named_arguments
|> Keyword.fetch!(:transport_options)
|> Keyword.fetch!(:web_socket)

client_spec = client_spec(web_socket_module, Indexer.Block.Realtime.WebSocketCopy, ws_state.url, nil, ws_state)

Supervisor.start_child(__MODULE__, client_spec)
end

def stop_other_client(pid) do
__MODULE__
|> Supervisor.which_children()
|> Enum.reject(fn {child_id, child_pid, _type, _modules} -> child_pid == pid or child_id == RetryWorker end)
|> Enum.each(fn {child_id, _child_pid, _type, _modules} ->
Supervisor.terminate_child(__MODULE__, child_id)
Supervisor.delete_child(__MODULE__, child_id)
Process.unregister(Indexer.Block.Realtime.WebSocketCopy)
Process.register(pid, Indexer.Block.Realtime.WebSocket)
end)
end

def init(%{
url: url,
fallback_url: fallback_url,
web_socket: web_socket_module,
web_socket_options: %{web_socket: web_socket}
}) do
children = [
{RetryWorker, []},
client_spec(web_socket_module, web_socket, url, fallback_url)
]

Supervisor.init(children, strategy: :one_for_one)
end

defp client_spec(web_socket_module, name, url, fallback_url, init_state \\ nil) do
%{
id: name,
start: {
web_socket_module,
:start_link,
[url, [name: name, fallback_url: fallback_url, init_state: init_state]]
},
restart: :temporary
}
end
end
Loading

0 comments on commit 8101bfc

Please sign in to comment.