Skip to content

Commit d5aca2a

Browse files
committed
Add close/1
1 parent 7552d06 commit d5aca2a

File tree

4 files changed

+172
-25
lines changed

4 files changed

+172
-25
lines changed

lib/ex_ice/ice_agent.ex

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ defmodule ExICE.ICEAgent do
3030
For exact meaning refer to the W3C WebRTC standard, sec. 5.6.4.
3131
"""
3232
@type connection_state_change() ::
33-
{:connection_state_change, :checking | :connected | :completed | :failed}
33+
{:connection_state_change, :checking | :connected | :completed | :failed | :closed}
3434

3535
@typedoc """
3636
Messages sent by the ExICE.
@@ -277,7 +277,21 @@ defmodule ExICE.ICEAgent do
277277
end
278278

279279
@doc """
280-
Stops ICE agent and all of its sockets.
280+
Irreversibly closes ICE agent and all of its sockets but does not terminate its process.
281+
282+
The only practical thing that you can do after closing an agent,
283+
is to get its stats using `get_stats/1`.
284+
Most of the other functions have no effect.
285+
286+
To terminate ICE agent process, see `stop/1`.
287+
"""
288+
@spec close(pid()) :: :ok
289+
def close(ice_agent) do
290+
GenServer.call(ice_agent, :close)
291+
end
292+
293+
@doc """
294+
Stops ICE agent process.
281295
"""
282296
@spec stop(pid()) :: :ok
283297
def stop(ice_agent) do
@@ -346,6 +360,12 @@ defmodule ExICE.ICEAgent do
346360
{:reply, stats, state}
347361
end
348362

363+
@impl true
364+
def handle_call(:close, _from, state) do
365+
ice_agent = ExICE.Priv.ICEAgent.close(state.ice_agent)
366+
{:reply, :ok, %{state | ice_agent: ice_agent}}
367+
end
368+
349369
@impl true
350370
def handle_cast({:set_role, role}, state) do
351371
ice_agent = ExICE.Priv.ICEAgent.set_role(state.ice_agent, role)

lib/ex_ice/priv/ice_agent.ex

Lines changed: 107 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,11 @@ defmodule ExICE.Priv.ICEAgent do
240240
end
241241

242242
@spec set_role(t(), ExICE.ICEAgent.role()) :: t()
243+
def set_role(%__MODULE__{state: :closed} = ice_agent, _) do
244+
Logger.debug("Tried to set role in closed state. Ignoring.")
245+
ice_agent
246+
end
247+
243248
def set_role(%__MODULE__{role: nil} = ice_agent, role) do
244249
%__MODULE__{ice_agent | role: role}
245250
end
@@ -250,8 +255,9 @@ defmodule ExICE.Priv.ICEAgent do
250255
end
251256

252257
@spec set_remote_credentials(t(), binary(), binary()) :: t()
253-
def set_remote_credentials(%__MODULE__{state: :failed} = ice_agent, _, _) do
254-
Logger.debug("Tried to set remote credentials in failed state. ICE restart needed. Ignoring.")
258+
def set_remote_credentials(%__MODULE__{state: state} = ice_agent, _, _)
259+
when state in [:failed, :closed] do
260+
Logger.debug("Tried to set remote credentials in state #{state}. Ignoring.")
255261
ice_agent
256262
end
257263

@@ -286,8 +292,8 @@ defmodule ExICE.Priv.ICEAgent do
286292
end
287293

288294
@spec gather_candidates(t()) :: t()
289-
def gather_candidates(%__MODULE__{state: :failed} = ice_agent) do
290-
Logger.warning("Can't gather candidates in state failed. ICE restart needed. Ignoring.")
295+
def gather_candidates(%__MODULE__{state: state} = ice_agent) when state in [:failed, :closed] do
296+
Logger.warning("Can't gather candidates in state #{state}. Ignoring.")
291297
ice_agent
292298
end
293299

@@ -364,9 +370,10 @@ defmodule ExICE.Priv.ICEAgent do
364370
end
365371

366372
@spec add_remote_candidate(t(), Candidate.t()) :: t()
367-
def add_remote_candidate(%__MODULE__{state: :failed} = ice_agent, _) do
373+
def add_remote_candidate(%__MODULE__{state: state} = ice_agent, _)
374+
when state in [:failed, :closed] do
368375
# Completed state will be caught by the next clause
369-
Logger.debug("Can't add remote candidate in state failed. ICE restart needed. Ignoring.")
376+
Logger.debug("Can't add remote candidate in state #{state}. Ignoring.")
370377
ice_agent
371378
end
372379

@@ -455,7 +462,7 @@ defmodule ExICE.Priv.ICEAgent do
455462
end
456463

457464
@spec end_of_candidates(t()) :: t()
458-
def end_of_candidates(%__MODULE__{state: :failed} = ice_agent) do
465+
def end_of_candidates(%__MODULE__{state: state} = ice_agent) when state in [:failed, :closed] do
459466
Logger.debug("Can't set end-of-candidates flag in state failed. Ignoring.")
460467
ice_agent
461468
end
@@ -537,12 +544,22 @@ defmodule ExICE.Priv.ICEAgent do
537544
end
538545

539546
@spec restart(t()) :: t()
547+
def restart(%__MODULE__{state: :closed} = ice_agent) do
548+
Logger.debug("Can't restart ICE in state closed. Ignoring.")
549+
ice_agent
550+
end
551+
540552
def restart(ice_agent) do
541553
Logger.debug("Restarting ICE")
542554
do_restart(ice_agent)
543555
end
544556

545557
@spec handle_ta_timeout(t()) :: t()
558+
def handle_ta_timeout(%__MODULE__{state: :closed} = ice_agent) do
559+
Logger.debug("Ta timer fired in closed state. Ignoring.")
560+
ice_agent
561+
end
562+
546563
def handle_ta_timeout(%__MODULE__{state: state} = ice_agent)
547564
when state in [:completed, :failed] do
548565
Logger.warning("""
@@ -694,6 +711,11 @@ defmodule ExICE.Priv.ICEAgent do
694711
end
695712

696713
@spec handle_tr_rtx_timeout(t(), integer()) :: t()
714+
def handle_tr_rtx_timeout(%__MODULE__{state: :closed} = ice_agent, _) do
715+
Logger.debug("Transaction rtx timer fired in state closed. Ignoring.")
716+
ice_agent
717+
end
718+
697719
def handle_tr_rtx_timeout(ice_agent, t_id) when is_map_key(ice_agent.conn_checks, t_id) do
698720
# Mark transaction id as ready to be retransmitted.
699721
# We will do this in handle_ta_timeout as it has to be paced.
@@ -725,8 +747,9 @@ defmodule ExICE.Priv.ICEAgent do
725747
end
726748

727749
@spec handle_eoc_timeout(t()) :: t()
728-
def handle_eoc_timeout(%__MODULE__{state: :failed} = ice_agent) do
729-
Logger.debug("EOC timer fired but we are in the failed state. Ignoring.")
750+
def handle_eoc_timeout(%__MODULE__{state: state} = ice_agent)
751+
when state in [:failed, :closed] do
752+
Logger.debug("EOC timer fired but we are in the #{state} state. Ignoring.")
730753
%{ice_agent | eoc_timer: nil}
731754
end
732755

@@ -742,6 +765,11 @@ defmodule ExICE.Priv.ICEAgent do
742765
end
743766

744767
@spec handle_pair_timeout(t()) :: t()
768+
def handle_pair_timeout(%__MODULE__{state: :closed} = ice_agent) do
769+
Logger.debug("Pair timer fired in closed state. Ignoring.")
770+
ice_agent
771+
end
772+
745773
def handle_pair_timeout(ice_agent) do
746774
start_pair_timer()
747775

@@ -792,6 +820,11 @@ defmodule ExICE.Priv.ICEAgent do
792820
end
793821

794822
@spec handle_keepalive_timeout(t(), integer()) :: t()
823+
def handle_keepalive_timeout(%__MODULE__{state: :closed} = ice_agent, _) do
824+
Logger.debug("Keepalive timer fired in closed state. Ignoring.")
825+
ice_agent
826+
end
827+
795828
def handle_keepalive_timeout(%__MODULE__{selected_pair_id: id} = ice_agent, id) do
796829
# if pair was selected, send keepalives only on that pair
797830
s_pair = Map.fetch!(ice_agent.checklist, id)
@@ -842,7 +875,8 @@ defmodule ExICE.Priv.ICEAgent do
842875
:inet.port_number(),
843876
binary()
844877
) :: t()
845-
def handle_udp(%{state: :failed} = ice_agent, _socket, _src_ip, _src_port, _packet) do
878+
def handle_udp(%{state: state} = ice_agent, _socket, _src_ip, _src_port, _packet)
879+
when state in [:failed, :closed] do
846880
ice_agent
847881
end
848882

@@ -868,6 +902,11 @@ defmodule ExICE.Priv.ICEAgent do
868902
end
869903

870904
@spec handle_ex_turn_msg(t(), reference(), ExTURN.Client.notification_message()) :: t()
905+
def handle_ex_turn_msg(%__MODULE__{state: :closed} = ice_agent, _, _) do
906+
Logger.debug("Received ex_turn message in closed state. Ignoring.")
907+
ice_agent
908+
end
909+
871910
def handle_ex_turn_msg(ice_agent, client_ref, msg) do
872911
tr_id_tr = find_gathering_transaction(ice_agent.gathering_transactions, client_ref)
873912

@@ -919,6 +958,18 @@ defmodule ExICE.Priv.ICEAgent do
919958
end
920959
end
921960

961+
@spec close(t()) :: t()
962+
def close(%__MODULE__{state: :closed} = ice_agent) do
963+
ice_agent
964+
end
965+
966+
def close(%__MODULE__{} = ice_agent) do
967+
ice_agent.sockets
968+
|> Enum.reduce(ice_agent, fn socket, ice_agent -> close_socket(ice_agent, socket) end)
969+
|> change_gathering_state(:complete, notify: false)
970+
|> change_connection_state(:closed, notify: false)
971+
end
972+
922973
## PRIV API
923974

924975
defp create_srflx_gathering_transactions(stun_servers, sockets) do
@@ -2290,6 +2341,17 @@ defmodule ExICE.Priv.ICEAgent do
22902341
end
22912342

22922343
defp close_socket(ice_agent, socket) do
2344+
# Use sockname/1 to determine if a socket is still open.
2345+
# Alternatively, we could create a callback for `:inet.info/1`,
2346+
# but it's return type is not standardized - sometimes it's %{states: [:closed]},
2347+
# some other time %{rstates: [:closed], wstates: [:closed]}.
2348+
case ice_agent.transport_module.sockname(socket) do
2349+
{:error, :closed} -> ice_agent
2350+
_ -> do_close_socket(ice_agent, socket)
2351+
end
2352+
end
2353+
2354+
defp do_close_socket(ice_agent, socket) do
22932355
Logger.debug("Closing socket: #{inspect(socket)}")
22942356

22952357
ice_agent =
@@ -2308,9 +2370,21 @@ defmodule ExICE.Priv.ICEAgent do
23082370

23092371
tr_rtx = ice_agent.tr_rtx -- Map.keys(removed_gathering_transactions)
23102372

2373+
:ok = ice_agent.transport_module.close(socket)
2374+
:ok = flush_socket_msg(socket)
2375+
23112376
%{ice_agent | tr_rtx: tr_rtx, gathering_transactions: gathering_transactions}
23122377
end
23132378

2379+
defp flush_socket_msg(socket) do
2380+
receive do
2381+
{:udp, ^socket, _src_ip, _src_port, _packet} ->
2382+
flush_socket_msg(socket)
2383+
after
2384+
0 -> :ok
2385+
end
2386+
end
2387+
23142388
defp maybe_nominate(ice_agent) do
23152389
if time_to_nominate?(ice_agent) do
23162390
Logger.debug("Time to nominate a pair! Looking for a best valid pair...")
@@ -2367,6 +2441,7 @@ defmodule ExICE.Priv.ICEAgent do
23672441
# clearing the whole state anyway, we can close the socket manually
23682442
Logger.debug("Closing socket: #{inspect(ip)}:#{port}.")
23692443
:ok = ice_agent.transport_module.close(socket)
2444+
:ok = flush_socket_msg(socket)
23702445

23712446
{:error, :closed} ->
23722447
# socket already closed
@@ -2497,7 +2572,6 @@ defmodule ExICE.Priv.ICEAgent do
24972572
end
24982573

24992574
defp generate_credentials() do
2500-
# TODO am I using Base.encode64 correctly?
25012575
ufrag = :crypto.strong_rand_bytes(3) |> Base.encode64()
25022576
pwd = :crypto.strong_rand_bytes(16) |> Base.encode64()
25032577
{ufrag, pwd}
@@ -2523,9 +2597,13 @@ defmodule ExICE.Priv.ICEAgent do
25232597
end
25242598
end
25252599

2526-
defp change_gathering_state(ice_agent, new_gathering_state) do
2527-
Logger.debug("Gathering state change: #{ice_agent.gathering_state} -> #{new_gathering_state}")
2528-
notify(ice_agent.on_gathering_state_change, {:gathering_state_change, new_gathering_state})
2600+
defp change_gathering_state(ice_agent, new_gathering_state, opts \\ []) do
2601+
Logger.debug("Gatering state change: #{ice_agent.gathering_state} -> #{new_gathering_state}")
2602+
2603+
if opts[:notify] != false do
2604+
notify(ice_agent.on_gathering_state_change, {:gathering_state_change, new_gathering_state})
2605+
end
2606+
25292607
%__MODULE__{ice_agent | gathering_state: new_gathering_state}
25302608
end
25312609

@@ -2550,8 +2628,10 @@ defmodule ExICE.Priv.ICEAgent do
25502628
end
25512629

25522630
@doc false
2553-
@spec change_connection_state(t(), atom()) :: t()
2554-
def change_connection_state(ice_agent, :failed) do
2631+
@spec change_connection_state(t(), atom(), Keyword.t()) :: t()
2632+
def change_connection_state(ice_agent, new_state, opts \\ [])
2633+
2634+
def change_connection_state(ice_agent, :failed, opts) do
25552635
ice_agent =
25562636
Enum.reduce(ice_agent.sockets, ice_agent, fn socket, ice_agent ->
25572637
close_socket(ice_agent, socket)
@@ -2599,10 +2679,10 @@ defmodule ExICE.Priv.ICEAgent do
25992679
nominating?: {false, nil}
26002680
}
26012681
|> disable_timer()
2602-
|> do_change_connection_state(:failed)
2682+
|> do_change_connection_state(:failed, opts)
26032683
end
26042684

2605-
def change_connection_state(ice_agent, :completed) do
2685+
def change_connection_state(ice_agent, :completed, opts) do
26062686
selected_pair = Map.fetch!(ice_agent.checklist, ice_agent.selected_pair_id)
26072687
succeeded_pair = Map.fetch!(ice_agent.checklist, selected_pair.succeeded_pair_id)
26082688

@@ -2632,16 +2712,20 @@ defmodule ExICE.Priv.ICEAgent do
26322712
end
26332713
end)
26342714

2635-
do_change_connection_state(ice_agent, :completed)
2715+
do_change_connection_state(ice_agent, :completed, opts)
26362716
end
26372717

2638-
def change_connection_state(ice_agent, new_conn_state) do
2639-
do_change_connection_state(ice_agent, new_conn_state)
2718+
def change_connection_state(ice_agent, new_conn_state, opts) do
2719+
do_change_connection_state(ice_agent, new_conn_state, opts)
26402720
end
26412721

2642-
defp do_change_connection_state(ice_agent, new_conn_state) do
2722+
defp do_change_connection_state(ice_agent, new_conn_state, opts) do
26432723
Logger.debug("Connection state change: #{ice_agent.state} -> #{new_conn_state}")
2644-
notify(ice_agent.on_connection_state_change, {:connection_state_change, new_conn_state})
2724+
2725+
if opts[:notify] != false do
2726+
notify(ice_agent.on_connection_state_change, {:connection_state_change, new_conn_state})
2727+
end
2728+
26452729
%__MODULE__{ice_agent | state: new_conn_state}
26462730
end
26472731

test/integration/p2p_test.exs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,12 @@ defmodule ExICE.Integration.P2PTest do
7979

8080
assert File.read!(Path.join([tmp_dir, "a2_restart_recv_data"])) ==
8181
File.read!("./test/fixtures/lotr.txt")
82+
83+
assert :ok = ICEAgent.close(agent1)
84+
assert :ok = ICEAgent.close(agent2)
85+
86+
assert :ok = ICEAgent.stop(agent1)
87+
assert :ok = ICEAgent.stop(agent2)
8288
end
8389

8490
@tag :tmp_dir

test/priv/ice_agent_test.exs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,43 @@ defmodule ExICE.Priv.ICEAgentTest do
345345
assert new_ice_agent == ice_agent
346346
end
347347

348+
test "close/1" do
349+
ice_agent =
350+
ICEAgent.new(
351+
controlling_process: self(),
352+
role: :controlling,
353+
if_discovery_module: IfDiscovery.Mock,
354+
transport_module: Transport.Mock
355+
)
356+
|> ICEAgent.set_remote_credentials("remoteufrag", "remotepwd")
357+
|> ICEAgent.gather_candidates()
358+
|> ICEAgent.add_remote_candidate(@remote_cand)
359+
360+
assert_receive {:ex_ice, _pid, {:gathering_state_change, :complete}}
361+
362+
ice_agent = ICEAgent.close(ice_agent)
363+
364+
assert ice_agent.state == :closed
365+
assert ice_agent.gathering_state == :complete
366+
assert [%{state: :failed}] = Map.values(ice_agent.checklist)
367+
assert [%{base: %{closed?: true}}] = Map.values(ice_agent.local_cands)
368+
assert [_remote_cand] = Map.values(ice_agent.remote_cands)
369+
assert ice_agent.sockets != []
370+
371+
# check stats
372+
stats = ICEAgent.get_stats(ice_agent)
373+
assert stats.local_candidates != %{}
374+
assert stats.remote_candidates != %{}
375+
assert stats.candidate_pairs != %{}
376+
assert stats.state == :closed
377+
378+
refute_received {:ex_ice, _pid, {:connection_state_change, :closed}}
379+
refute_received {:ex_ice, _pid, {:gathering_state_change, :complete}}
380+
381+
# try to restart ICE, this should be ignored
382+
%{state: :closed, gathering_state: :complete} = ICEAgent.restart(ice_agent)
383+
end
384+
348385
test "doesn't add pairs with srflx local candidate to the checklist" do
349386
ice_agent =
350387
ICEAgent.new(

0 commit comments

Comments
 (0)