From 744d2fe9ef696ff2038de0752b4d4bd82d9ceb34 Mon Sep 17 00:00:00 2001 From: benbot Date: Tue, 19 Dec 2023 11:19:50 -0500 Subject: [PATCH 1/5] Adds mounts to fly backend --- lib/flame/fly_backend.ex | 7 +++++-- lib/flame/fly_backend/mounts.ex | 8 ++++++++ 2 files changed, 13 insertions(+), 2 deletions(-) create mode 100644 lib/flame/fly_backend/mounts.ex diff --git a/lib/flame/fly_backend.ex b/lib/flame/fly_backend.ex index b99701f..7ed3ba8 100644 --- a/lib/flame/fly_backend.ex +++ b/lib/flame/fly_backend.ex @@ -54,6 +54,7 @@ defmodule FLAME.FlyBackend do :cpu_kind, :cpus, :gpu_kind, + :mounts, :memory_mb, :image, :app, @@ -73,6 +74,7 @@ defmodule FLAME.FlyBackend do cpus: nil, memory_mb: nil, gpu_kind: nil, + mounts: [], image: nil, services: [], app: nil, @@ -86,7 +88,7 @@ defmodule FLAME.FlyBackend do runner_private_ip: nil, runner_node_name: nil - @valid_opts ~w(app image token host cpu_kind cpus memory_mb gpu_kind boot_timeout env terminator_sup log services)a + @valid_opts ~w(app image token host cpu_kind cpus mounts memory_mb gpu_kind boot_timeout env terminator_sup log services)a @impl true def init(opts) do @@ -179,11 +181,12 @@ defmodule FLAME.FlyBackend do name: "#{state.app}-flame-#{rand_id(20)}", config: %{ image: state.image, + mounts: state.mounts, guest: %{ cpu_kind: state.cpu_kind, cpus: state.cpus, memory_mb: state.memory_mb, - gpu_kind: state.gpu_kind + gpu_kind: state.gpu_kind, }, auto_destroy: true, restart: %{policy: "no"}, diff --git a/lib/flame/fly_backend/mounts.ex b/lib/flame/fly_backend/mounts.ex new file mode 100644 index 0000000..842df40 --- /dev/null +++ b/lib/flame/fly_backend/mounts.ex @@ -0,0 +1,8 @@ +defmodule FLAME.FlyBackend.Mounts do + @derive Jason.Encoder + defstruct name: nil, + path: nil + # extend_threshold_percent: 0, + # add_size_gb: 0, + # size_gb_limit: 0 +end From bb98ffdd9830e6a4ce86d5be20e61fd64268ed82 Mon Sep 17 00:00:00 2001 From: benbot Date: Thu, 21 Dec 2023 12:05:01 -0500 Subject: [PATCH 2/5] Adds automatic volume id discovery and matching --- lib/flame/fly_backend.ex | 64 +++++++++++++++++++++++++++++++-- lib/flame/fly_backend/mounts.ex | 9 ++--- 2 files changed, 66 insertions(+), 7 deletions(-) diff --git a/lib/flame/fly_backend.ex b/lib/flame/fly_backend.ex index 7ed3ba8..77f803c 100644 --- a/lib/flame/fly_backend.ex +++ b/lib/flame/fly_backend.ex @@ -105,7 +105,8 @@ defmodule FLAME.FlyBackend do memory_mb: 4096, boot_timeout: 30_000, runner_node_basename: node_base, - services: [] + services: [], + mounts: [] } provided_opts = @@ -168,8 +169,65 @@ defmodule FLAME.FlyBackend do {result, div(micro, 1000)} end + defp get_volume_id(%FlyBackend{ mounts: mounts } = state) when is_list(mounts) do + case mounts do + [] -> + {nil, 0} + mounts -> + {vols, time} = get_volumes(state) + + case vols do + [] -> + {:error, "no volumes to mount"} + all_vols -> + vols = + all_vols + |> Enum.filter(fn vol -> + vol["attached_machine_id"] == nil + end) + + volume_ids_by_name = + vols + |> Enum.group_by(fn vol -> + vol["name"] + end) + |> Enum.map(fn {name, vols} -> + {name, Enum.map(vols, fn vol -> vol["id"] end)} + end) + + {new_mounts, leftover_vols} = Enum.reduce(mounts, {[], volume_ids_by_name}, fn mount, {new_mounts, leftover_vols} -> + case Enum.find(leftover_vols, fn {name, ids} -> name == mount.name end) do + nil -> + raise ArgumentError, "not enough fly volumes with the name \"#{mount.name}\" to a FLAME child" + {_, [id | rest]} -> + {new_mount, leftover_vols} = Enum.split(rest, 1) + {new_mounts ++ [%{mount | volume: id}], leftover_vols} + end + end) + + {new_mounts, time} + end + end + end + defp get_volume_id(_) do + raise ArgumentError, "expected a list of mounts" + end + + defp get_volumes(%FlyBackend{} = state) do + {vols, get_vols_time} = with_elapsed_ms(fn -> + Req.get!("#{state.host}/v1/apps/#{state.app}/volumes", + connect_options: [timeout: state.boot_timeout], + retry: false, + auth: {:bearer, state.token}, + ) + end) + + {vols.body, get_vols_time} + end + @impl true def remote_boot(%FlyBackend{parent_ref: parent_ref} = state) do + {mounts, volume_validate_time} = get_volume_id(state) {req, req_connect_time} = with_elapsed_ms(fn -> Req.post!("#{state.host}/v1/apps/#{state.app}/machines", @@ -181,7 +239,7 @@ defmodule FLAME.FlyBackend do name: "#{state.app}-flame-#{rand_id(20)}", config: %{ image: state.image, - mounts: state.mounts, + mounts: mounts, guest: %{ cpu_kind: state.cpu_kind, cpus: state.cpus, @@ -197,7 +255,7 @@ defmodule FLAME.FlyBackend do ) end) - remaining_connect_window = state.boot_timeout - req_connect_time + remaining_connect_window = state.boot_timeout - req_connect_time - volume_validate_time case req.body do %{"id" => id, "instance_id" => instance_id, "private_ip" => ip} -> diff --git a/lib/flame/fly_backend/mounts.ex b/lib/flame/fly_backend/mounts.ex index 842df40..4ccd0af 100644 --- a/lib/flame/fly_backend/mounts.ex +++ b/lib/flame/fly_backend/mounts.ex @@ -1,8 +1,9 @@ defmodule FLAME.FlyBackend.Mounts do @derive Jason.Encoder defstruct name: nil, - path: nil - # extend_threshold_percent: 0, - # add_size_gb: 0, - # size_gb_limit: 0 + path: nil, + volume: nil, + extend_threshold_percent: 0, + add_size_gb: 0, + size_gb_limit: 0 end From 8a10c24c8e1b3c915e2af39b9c97184d07f2928d Mon Sep 17 00:00:00 2001 From: benbot Date: Sat, 23 Dec 2023 01:29:17 -0500 Subject: [PATCH 3/5] checks for created state --- lib/flame/fly_backend.ex | 1 + 1 file changed, 1 insertion(+) diff --git a/lib/flame/fly_backend.ex b/lib/flame/fly_backend.ex index 77f803c..8ad3895 100644 --- a/lib/flame/fly_backend.ex +++ b/lib/flame/fly_backend.ex @@ -184,6 +184,7 @@ defmodule FLAME.FlyBackend do all_vols |> Enum.filter(fn vol -> vol["attached_machine_id"] == nil + and vol["state"] == "created" end) volume_ids_by_name = From fdf71645d0b744ccf4eca863f9928cf7c215b2f7 Mon Sep 17 00:00:00 2001 From: "Ben Botwin (benbot)" Date: Sun, 31 Dec 2023 14:10:14 -0500 Subject: [PATCH 4/5] cleaned up volume code based on PR comments --- lib/flame/fly_backend.ex | 49 ++++++++++++++++------------------------ 1 file changed, 20 insertions(+), 29 deletions(-) diff --git a/lib/flame/fly_backend.ex b/lib/flame/fly_backend.ex index 8ad3895..a331c92 100644 --- a/lib/flame/fly_backend.ex +++ b/lib/flame/fly_backend.ex @@ -169,46 +169,37 @@ defmodule FLAME.FlyBackend do {result, div(micro, 1000)} end + defp get_volume_id(%FlyBackend{ mounts: [] }), do: {nil, 0} defp get_volume_id(%FlyBackend{ mounts: mounts } = state) when is_list(mounts) do - case mounts do - [] -> - {nil, 0} - mounts -> - {vols, time} = get_volumes(state) + {volumes, time} = get_volumes(state) - case vols do + case volumes do [] -> {:error, "no volumes to mount"} - all_vols -> - vols = - all_vols + all_volumes -> + volume_ids_by_name = + all_volumes |> Enum.filter(fn vol -> vol["attached_machine_id"] == nil and vol["state"] == "created" end) - - volume_ids_by_name = - vols - |> Enum.group_by(fn vol -> - vol["name"] - end) - |> Enum.map(fn {name, vols} -> - {name, Enum.map(vols, fn vol -> vol["id"] end)} - end) - - {new_mounts, leftover_vols} = Enum.reduce(mounts, {[], volume_ids_by_name}, fn mount, {new_mounts, leftover_vols} -> - case Enum.find(leftover_vols, fn {name, ids} -> name == mount.name end) do - nil -> - raise ArgumentError, "not enough fly volumes with the name \"#{mount.name}\" to a FLAME child" - {_, [id | rest]} -> - {new_mount, leftover_vols} = Enum.split(rest, 1) - {new_mounts ++ [%{mount | volume: id}], leftover_vols} + |> Enum.group_by(&(&1["name"]), &(&1["id"])) + + new_mounts = Enum.map_reduce( + mounts, + volume_ids_by_name, + fn mount, leftover_vols -> + case List.wrap(leftover_vols[mount.name]) do + [] -> + raise ArgumentError, "not enough fly volumes with the name \"#{mount.name}\" to a FLAME child" + [volume_id | rest] -> + {%{mount | volume: volume_id}, %{leftover_vols | mount.name => rest}} + end end - end) + ) {new_mounts, time} end - end end defp get_volume_id(_) do raise ArgumentError, "expected a list of mounts" @@ -219,7 +210,7 @@ defmodule FLAME.FlyBackend do Req.get!("#{state.host}/v1/apps/#{state.app}/volumes", connect_options: [timeout: state.boot_timeout], retry: false, - auth: {:bearer, state.token}, + auth: {:bearer, state.token} ) end) From 69856e1b334d5f69976fba805f58239c9c68296a Mon Sep 17 00:00:00 2001 From: terrcin Date: Fri, 4 Jul 2025 17:13:10 +1200 Subject: [PATCH 5/5] updated FlyBackend to allow assigning existing vols to the machine on boot --- lib/flame/fly_backend.ex | 178 +++++++++++++++++++------------- lib/flame/fly_backend/mount.ex | 42 ++++++++ lib/flame/fly_backend/mounts.ex | 9 -- 3 files changed, 151 insertions(+), 78 deletions(-) create mode 100644 lib/flame/fly_backend/mount.ex delete mode 100644 lib/flame/fly_backend/mounts.ex diff --git a/lib/flame/fly_backend.ex b/lib/flame/fly_backend.ex index bbed8bb..63e008b 100644 --- a/lib/flame/fly_backend.ex +++ b/lib/flame/fly_backend.ex @@ -33,6 +33,8 @@ defmodule FLAME.FlyBackend do * `:gpus` - The number of runner GPUs. Defaults to `1` if `:gpu_kind` is set. + * `:mounts` - List volumes to mount. Refer to FlyBackend.Mount for opt details + * `:boot_timeout` - The boot timeout. Defaults to `30_000`. * `:app` – The name of the otp app. Defaults to `System.get_env("FLY_APP_NAME")`, @@ -88,6 +90,8 @@ defmodule FLAME.FlyBackend do alias FLAME.FlyBackend alias FLAME.Parser.JSON + alias FLAME.FlyBackend.Mount + require Logger @derive {Inspect, @@ -195,7 +199,11 @@ defmodule FLAME.FlyBackend do end end - state = %{state | runner_node_base: "#{state.app}-flame-#{rand_id(20)}"} + mounts = state.mounts |> List.wrap() |> Enum.map(&Mount.parse_opts/1) + + state = + Map.merge(state, %{mounts: mounts, runner_node_base: "#{state.app}-flame-#{rand_id(20)}"}) + parent_ref = make_ref() encoded_parent = @@ -254,74 +262,80 @@ defmodule FLAME.FlyBackend do {result, div(micro, 1000)} end - defp get_volume_id(%FlyBackend{ mounts: [] }), do: {nil, 0} - defp get_volume_id(%FlyBackend{ mounts: mounts } = state) when is_list(mounts) do - {volumes, time} = get_volumes(state) - - case volumes do - [] -> - {:error, "no volumes to mount"} - all_volumes -> - volume_ids_by_name = - all_volumes - |> Enum.filter(fn vol -> - vol["attached_machine_id"] == nil - and vol["state"] == "created" - end) - |> Enum.group_by(&(&1["name"]), &(&1["id"])) - - new_mounts = Enum.map_reduce( - mounts, - volume_ids_by_name, - fn mount, leftover_vols -> - case List.wrap(leftover_vols[mount.name]) do - [] -> - raise ArgumentError, "not enough fly volumes with the name \"#{mount.name}\" to a FLAME child" - [volume_id | rest] -> - {%{mount | volume: volume_id}, %{leftover_vols | mount.name => rest}} - end + defp allocate_volume_ids(%FlyBackend{mounts: []}), do: [] + + defp allocate_volume_ids(%FlyBackend{mounts: mounts} = state) when is_list(mounts) do + case get_volumes(state) do + [] -> + {:error, "no Fly volumes found"} + + all_volumes -> + volume_ids_by_name = + all_volumes + |> Enum.filter(fn vol -> + vol["attached_machine_id"] == nil && vol["state"] == "created" && + vol["host_status"] == "ok" && + Map.get(state, :region, vol["region"]) == vol["region"] + end) + |> Enum.shuffle() + |> Enum.group_by(& &1["name"], & &1["id"]) + + {new_mounts, _unused_vols} = + Enum.map_reduce( + mounts, + volume_ids_by_name, + fn + %{volume: nil} = mount, leftover_vols -> + case leftover_vols[mount.name] do + [volume_id | rest] -> + {%{mount | volume: volume_id}, %{leftover_vols | mount.name => rest}} + + _ -> + raise ArgumentError, + "no available Fly volumes with the name \"#{mount.name}\" in region \"#{Map.get(state, :region)}\" found" end - ) - {new_mounts, time} - end + mount, leftover_vols -> + {mount, leftover_vols} + end + ) + + Enum.map(new_mounts, &Map.from_struct/1) + end end - defp get_volume_id(_) do + + defp allocate_volume_ids(_) do raise ArgumentError, "expected a list of mounts" end defp get_volumes(%FlyBackend{} = state) do - {vols, get_vols_time} = with_elapsed_ms(fn -> - Req.get!("#{state.host}/v1/apps/#{state.app}/volumes", - connect_options: [timeout: state.boot_timeout], - retry: false, - auth: {:bearer, state.token} - ) - end) - - {vols.body, get_vols_time} + http_request!(:get, "#{state.host}/v1/apps/#{state.app}/volumes", @retry, + headers: [ + {"Accept", "application/json"}, + {"Authorization", "Bearer #{state.token}"} + ], + connect_timeout: state.boot_timeout + ) end @impl true def remote_boot(%FlyBackend{parent_ref: parent_ref} = state) do - {mounts, volume_validate_time} = get_volume_id(state) - {resp, req_connect_time} = with_elapsed_ms(fn -> - http_post!("#{state.host}/v1/apps/#{state.app}/machines", @retry, + http_request!(:post, "#{state.host}/v1/apps/#{state.app}/machines", @retry, content_type: "application/json", headers: [ {"Content-Type", "application/json"}, {"Authorization", "Bearer #{state.token}"} ], connect_timeout: state.boot_timeout, - body: + body: fn -> JSON.encode!(%{ name: state.runner_node_base, region: state.region, config: %{ image: state.image, - mounts: mounts, + mounts: allocate_volume_ids(state), init: state.init, guest: %{ cpu_kind: state.cpu_kind, @@ -337,6 +351,7 @@ defmodule FLAME.FlyBackend do metadata: Map.put(state.metadata, :flame_parent_ip, state.local_ip) } }) + end ) end) @@ -347,7 +362,7 @@ defmodule FLAME.FlyBackend do ) end - remaining_connect_window = state.boot_timeout - req_connect_time - volume_validate_time + remaining_connect_window = state.boot_timeout - req_connect_time case resp do %{"id" => id, "instance_id" => instance_id, "private_ip" => ip} -> @@ -389,32 +404,17 @@ defmodule FLAME.FlyBackend do |> binary_part(0, len) end - defp http_post!(url, remaining_tries, opts) do - Keyword.validate!(opts, [:headers, :body, :connect_timeout, :content_type]) + defp http_request!(method, url, remaining_tries, opts) when method in [:get, :post] do + validation_request_opts!(method, opts) headers = for {field, val} <- Keyword.fetch!(opts, :headers), do: {String.to_charlist(field), val} - body = Keyword.fetch!(opts, :body) - connect_timeout = Keyword.fetch!(opts, :connect_timeout) - content_type = Keyword.fetch!(opts, :content_type) - - http_opts = [ - ssl: - [ - verify: :verify_peer, - depth: 2, - customize_hostname_check: [ - match_fun: :public_key.pkix_verify_hostname_match_fun(:https) - ] - ] ++ cacerts_options(), - connect_timeout: connect_timeout - ] + request = make_request(method, url, headers, opts) + http_opts = make_http_opts(opts) - case :httpc.request(:post, {url, headers, ~c"#{content_type}", body}, http_opts, - body_format: :binary - ) do + case :httpc.request(method, request, http_opts, body_format: :binary) do {:ok, {{_, 200, _}, _, response_body}} -> JSON.decode!(response_body) @@ -425,16 +425,56 @@ defmodule FLAME.FlyBackend do {:ok, {{_, status, _}, _, _response_body}} when status in [429, 412, 409, 422] and remaining_tries > 0 -> Process.sleep(1000) - http_post!(url, remaining_tries - 1, opts) + http_request!(method, url, remaining_tries - 1, opts) {:ok, {{_, status, reason}, _, resp_body}} -> - raise "failed POST #{url} with #{inspect(status)} (#{inspect(reason)}): #{inspect(resp_body)} #{inspect(headers)}" + raise "failed #{method} #{url} with #{inspect(status)} (#{inspect(reason)}): #{inspect(resp_body)} #{inspect(headers)}" {:error, reason} -> - raise "failed POST #{url} with #{inspect(reason)} #{inspect(headers)}" + raise "failed #{method} #{url} with #{inspect(reason)} #{inspect(headers)}" end end + defp validation_request_opts!(:get, opts) do + Keyword.validate!(opts, [:headers, :connect_timeout]) + end + + defp validation_request_opts!(:post, opts) do + Keyword.validate!(opts, [:headers, :body, :connect_timeout, :content_type]) + end + + defp make_request(:get, url, headers, _opts) do + {url, headers} + end + + defp make_request(:post, url, headers, opts) do + content_type = Keyword.fetch!(opts, :content_type) + + body = + case Keyword.fetch!(opts, :body) do + body_func when is_function(body_func) -> body_func.() + body -> body + end + + {url, headers, ~c"#{content_type}", body} + end + + defp make_http_opts(opts) do + connect_timeout = Keyword.fetch!(opts, :connect_timeout) + + [ + ssl: + [ + verify: :verify_peer, + depth: 2, + customize_hostname_check: [ + match_fun: :public_key.pkix_verify_hostname_match_fun(:https) + ] + ] ++ cacerts_options(), + connect_timeout: connect_timeout + ] + end + defp cacerts_options do cond do certs = otp_cacerts() -> diff --git a/lib/flame/fly_backend/mount.ex b/lib/flame/fly_backend/mount.ex new file mode 100644 index 0000000..6bd72ac --- /dev/null +++ b/lib/flame/fly_backend/mount.ex @@ -0,0 +1,42 @@ +defmodule FLAME.FlyBackend.Mount do + # Refer to the "mount:" section most of the may down this page for how to use these keys + # https://fly.io/docs/machines/api/machines-resource/ + + alias FLAME.FlyBackend.Mount + + @derive {Inspect, + only: [ + :volume, + :path, + :name, + :extend_threshold_percent, + :add_size_gb, + :size_gb_limit + ]} + defstruct volume: nil, + path: nil, + name: nil, + extend_threshold_percent: nil, + add_size_gb: nil, + size_gb_limit: nil + + @valid_opts [:volume, :path, :name, :extend_threshold_percent, :add_size_gb, :size_gb_limit] + + @required_opts [:path, :name] + + def parse_opts(opts) do + default = %Mount{extend_threshold_percent: 0, add_size_gb: 0, size_gb_limit: 0} + + provided_opts = Keyword.validate!(opts, @valid_opts) + + %Mount{} = state = Map.merge(default, Map.new(provided_opts)) + + for key <- @required_opts do + unless Map.get(state, key) do + raise ArgumentError, "missing :#{key} config for #{inspect(__MODULE__)}" + end + end + + state + end +end diff --git a/lib/flame/fly_backend/mounts.ex b/lib/flame/fly_backend/mounts.ex deleted file mode 100644 index 4ccd0af..0000000 --- a/lib/flame/fly_backend/mounts.ex +++ /dev/null @@ -1,9 +0,0 @@ -defmodule FLAME.FlyBackend.Mounts do - @derive Jason.Encoder - defstruct name: nil, - path: nil, - volume: nil, - extend_threshold_percent: 0, - add_size_gb: 0, - size_gb_limit: 0 -end