From 6357bc56a0ab9822781ef2710e30b4c48296343d Mon Sep 17 00:00:00 2001 From: Ulisses Almeida Date: Wed, 16 Nov 2022 16:48:38 +0000 Subject: [PATCH] Migrate Kane.Message public API to Goth 1.3 --- lib/kane/message.ex | 90 +++++-------- test/kane/message_test.exs | 270 ++++++++++++++++++++++++------------- 2 files changed, 212 insertions(+), 148 deletions(-) diff --git a/lib/kane/message.ex b/lib/kane/message.ex index 47087d2..8938110 100644 --- a/lib/kane/message.ex +++ b/lib/kane/message.ex @@ -1,41 +1,49 @@ defmodule Kane.Message do alias Kane.Topic - alias Kane.Client.Response.Error + alias Kane.Client + + @type message_data :: binary() | Jason.Encoder.t() @type t :: %__MODULE__{ id: String.t() | nil, attributes: Map.t(), - data: any, + data: message_data(), ack_id: String.t() | nil, publish_time: String.t() | nil } defstruct id: nil, attributes: %{}, data: nil, ack_id: nil, publish_time: nil - @spec publish(binary, binary) :: {:ok, t} | Error.t() - def publish(message, topic) when is_binary(message) and is_binary(topic) do - publish(%__MODULE__{data: message}, %Topic{name: topic}) + @spec publish( + Kane.t(), + messages :: message_data() | t() | [t()], + topic :: String.t() | Topic.t() + ) :: {:ok, t() | [t()]} | Client.error() + def publish(kane, message, topic) when is_binary(topic) do + publish(kane, %__MODULE__{data: message}, %Topic{name: topic}) end - @spec publish(t, Topic.t()) :: {:ok, t} | Error.t() - def publish(%__MODULE__{} = message, %Topic{} = topic) do - case publish([message], topic) do - {:ok, [message | _]} -> {:ok, message} + def publish(kane, %__MODULE__{} = message, topic) do + case publish(kane, [message], topic) do + {:ok, [message]} -> {:ok, message} err -> err end end - @spec publish([t], Topic.t()) :: {:ok, [t]} | Error.t() - def publish(messages, %Topic{name: topic}) when is_list(messages) do - case Kane.Client.post(path(topic), data(messages)) do + def publish(%Kane{project_id: project_id} = kane, messages, %Topic{name: topic_name}) + when is_list(messages) do + publish_path = "projects/#{project_id}/topics/#{Topic.strip!(project_id, topic_name)}:publish" + data = publish_data(messages) + + case Client.post(kane, publish_path, data) do {:ok, body, _code} -> collected = body |> Jason.decode!() |> Map.get("messageIds") - |> Stream.with_index() - |> Enum.map(fn {id, i} -> - %{Enum.at(messages, i) | id: id} + |> Enum.zip(messages) + |> Enum.map(fn {id, message} -> + %{message | id: id} end) {:ok, collected} @@ -45,41 +53,23 @@ defmodule Kane.Message do end end - @spec data(t) :: map - def data(%__MODULE__{} = message), do: data([message]) - - @spec data([t]) :: map - def data(messages) when is_list(messages) do + defp publish_data(messages) do %{ "messages" => Enum.map(messages, fn %__MODULE__{data: d, attributes: a} -> %{ "data" => encode_body(d), - "attributes" => - Enum.reduce(a, %{}, fn {key, val}, map -> - Map.put(map, key, val) - end) + "attributes" => Map.new(a) } end) } end - @spec encode_body(any) :: binary - def encode_body(body) when is_binary(body), do: Base.encode64(body) - def encode_body(body), do: body |> Jason.encode!() |> encode_body - - def json(%__MODULE__{} = message), do: json([message]) - - def json(messages) when is_list(messages) do - data(messages) |> Jason.encode!() - end - - def from_subscription!(%{} = data) do - {:ok, message} = from_subscription(data) - message - end + defp encode_body(body) when is_binary(body), do: Base.encode64(body) + defp encode_body(body), do: body |> Jason.encode!() |> encode_body - def from_subscription(%{ + @spec from_subscription!(payload :: map()) :: t() + def from_subscription!(%{ "ackId" => ack, "message" => %{"publishTime" => time, "messageId" => id} = message }) do @@ -91,20 +81,12 @@ defmodule Kane.Message do actual_data -> Base.decode64!(actual_data) end - {:ok, - %__MODULE__{ - id: id, - publish_time: time, - ack_id: ack, - data: data, - attributes: attr - }} - end - - defp path(%Topic{name: topic}), do: path(topic) - - defp path(topic) do - {:ok, project} = Goth.Config.get(:project_id) - "projects/#{project}/topics/#{Topic.strip!(topic)}:publish" + %__MODULE__{ + id: id, + publish_time: time, + ack_id: ack, + data: data, + attributes: attr + } end end diff --git a/test/kane/message_test.exs b/test/kane/message_test.exs index 75cd52b..414ed52 100644 --- a/test/kane/message_test.exs +++ b/test/kane/message_test.exs @@ -1,117 +1,199 @@ defmodule Kane.MessageTest do - use ExUnit.Case + use ExUnit.Case, async: true + alias Kane.Message alias Kane.Topic + alias Kane.GCPTestCredentials + alias Kane.TestToken setup do bypass = Bypass.open() - Application.put_env(:kane, :endpoint, "http://localhost:#{bypass.port}") - {:ok, bypass: bypass} - end + credentials = GCPTestCredentials.read!() + {:ok, token} = TestToken.for_scope(Kane.oauth_scope()) - test "encoding the message body from a data structure" do - data = %{phil?: "He's aweseom"} - encoded = data |> Jason.encode!() |> Base.encode64() - assert Message.encode_body(data) == encoded - end + kane = %Kane{ + endpoint: "http://localhost:#{bypass.port}", + token: token, + project_id: Map.fetch!(credentials, "project_id") + } - test "encoding the message body from a string" do - data = "we are just a string!" - encoded = Base.encode64(data) - assert Message.encode_body(data) == encoded + {:ok, bypass: bypass, kane: kane} end - test "building the message data" do - message = %Message{data: %{hello: "world"}, attributes: %{"random" => "attr"}} + describe "publish/3" do + test "publishes binary data", %{kane: kane, bypass: bypass} do + topic = "publish" + message = "hello world" - data = %{hello: "world"} |> Jason.encode!() |> Base.encode64() + Bypass.expect(bypass, fn conn -> + assert_binary_message(conn, message) + + Plug.Conn.resp(conn, 201, ~s({"messageIds": [ "19916711285" ]})) + end) + + assert {:ok, %Message{}} = Message.publish(kane, message, topic) + end + + test "publishes json encodable data", %{kane: kane, bypass: bypass} do + topic = "publish" + message = %{"my" => "message", "random" => "fields"} + + Bypass.expect(bypass, fn conn -> + assert_json_message(conn, message) + + Plug.Conn.resp(conn, 201, ~s({"messageIds": [ "19916711285" ]})) + end) + + assert {:ok, %Message{}} = Message.publish(kane, message, topic) + end + + test "publishes a message", %{kane: kane, bypass: bypass} do + topic = "publish" + + message = %Message{ + data: %{"my" => "message", "random" => "fields"}, + attributes: [{"random", "attr"}] + } + + Bypass.expect(bypass, fn conn -> + assert_message(conn, message) + + Plug.Conn.resp(conn, 201, ~s({"messageIds": [ "19916711285" ]})) + end) + + assert {:ok, %Message{}} = Message.publish(kane, message, %Topic{name: topic}) + end + + test "assigns the retuning id", %{kane: kane, bypass: bypass} do + topic = "publish" + message = "hello world" + + Bypass.expect(bypass, fn conn -> + Plug.Conn.resp(conn, 201, ~s({"messageIds": [ "19916711285" ]})) + end) + + assert {:ok, %Message{id: id}} = Message.publish(kane, message, topic) + + assert id == "19916711285" + end + + test "requests to the correct path", %{kane: kane, bypass: bypass} do + project_id = kane.project_id + topic = "publish" + message = "hello world" + + Bypass.expect(bypass, fn conn -> + assert Regex.match?( + ~r{/projects/#{project_id}/topics/#{topic}:publish}, + conn.request_path + ) + + Plug.Conn.resp(conn, 201, ~s({"messageIds": [ "19916711285" ]})) + end) + + assert {:ok, %Message{}} = Message.publish(kane, message, topic) + end + + test "publishes multiple messages", %{kane: kane, bypass: bypass} do + project_id = kane.project_id + topic = "publish-multi" + ids = ["hello", "hi", "howdy"] + + Bypass.expect(bypass, fn conn -> + assert Regex.match?( + ~r{/projects/#{project_id}/topics/#{topic}:publish}, + conn.request_path + ) + + Plug.Conn.resp( + conn, + 201, + ~s({"messageIds": [ "#{Enum.at(ids, 0)}", "#{Enum.at(ids, 1)}", "#{Enum.at(ids, 2)}" ]}) + ) + end) + + data = [%{"hello" => "world"}, %{"hi" => "world"}, %{"howdy" => "world"}] + + assert {:ok, messages} = + Message.publish( + kane, + [ + %Message{data: Enum.at(data, 0)}, + %Message{data: Enum.at(data, 1)}, + %Message{data: Enum.at(data, 2)} + ], + %Topic{name: topic} + ) + + ids + |> Enum.with_index() + |> Enum.each(fn {id, i} -> + m = Enum.at(messages, i) + assert id == m.id + assert Enum.at(data, i) == m.data + end) + end + end - assert %{ - "messages" => [ - %{ - "data" => ^data, - "attributes" => %{ - "random" => "attr" + describe "from_subscription!/1" do + test "creating from subscription message" do + ack = "123" + id = "321" + data = "eyJoZWxsbyI6IndvcmxkIn0=" + decoded = data |> Base.decode64!() + time = "2016-01-24T03:07:33.195Z" + attributes = %{key: "123"} + + assert %Message{ + id: ^id, + ack_id: ^ack, + publish_time: ^time, + data: ^decoded, + attributes: ^attributes + } = + Message.from_subscription!(%{ + "ackId" => ack, + "message" => %{ + "data" => data, + "attributes" => attributes, + "messageId" => id, + "publishTime" => time } - } - ] - } = Message.data(message) + }) + end end - test "publishing a message", %{bypass: bypass} do - {:ok, project} = Goth.Config.get(:project_id) - topic = "publish" + defp assert_binary_message(conn, binary_data) do + {:ok, body, _conn} = Plug.Conn.read_body(conn) - Bypass.expect(bypass, fn conn -> - assert Regex.match?(~r{/projects/#{project}/topics/#{topic}:publish}, conn.request_path) - Plug.Conn.resp(conn, 201, ~s({"messageIds": [ "19916711285" ]})) - end) + [sent_message] = + body + |> Jason.decode!() + |> Map.fetch!("messages") - data = %{"my" => "message", "random" => "fields"} - assert {:ok, %Message{id: id}} = Message.publish(%Message{data: data}, %Topic{name: topic}) - assert id != nil + assert sent_message["data"] |> Base.decode64!() == binary_data end - test "publishing multiple messages", %{bypass: bypass} do - {:ok, project} = Goth.Config.get(:project_id) - topic = "publish-multi" - ids = ["hello", "hi", "howdy"] - - Bypass.expect(bypass, fn conn -> - assert Regex.match?(~r{/projects/#{project}/topics/#{topic}:publish}, conn.request_path) - - Plug.Conn.resp( - conn, - 201, - ~s({"messageIds": [ "#{Enum.at(ids, 0)}", "#{Enum.at(ids, 1)}", "#{Enum.at(ids, 2)}" ]}) - ) - end) - - data = [%{"hello" => "world"}, %{"hi" => "world"}, %{"howdy" => "world"}] - - assert {:ok, messages} = - Message.publish( - [ - %Message{data: Enum.at(data, 0)}, - %Message{data: Enum.at(data, 1)}, - %Message{data: Enum.at(data, 2)} - ], - %Topic{name: topic} - ) - - ids - |> Enum.with_index() - |> Enum.each(fn {id, i} -> - m = Enum.at(messages, i) - assert id == m.id - assert Enum.at(data, i) == m.data - end) + defp assert_json_message(conn, json_data) do + {:ok, body, _conn} = Plug.Conn.read_body(conn) + + [sent_message] = + body + |> Jason.decode!() + |> Map.fetch!("messages") + + assert sent_message["data"] |> Base.decode64!() |> Jason.decode!() == json_data end - test "creating from subscription message" do - ack = "123" - id = "321" - data = "eyJoZWxsbyI6IndvcmxkIn0=" - decoded = data |> Base.decode64!() - time = "2016-01-24T03:07:33.195Z" - attributes = %{key: "123"} - - assert {:ok, - %Message{ - id: ^id, - ack_id: ^ack, - publish_time: ^time, - data: ^decoded, - attributes: ^attributes - }} = - Message.from_subscription(%{ - "ackId" => ack, - "message" => %{ - "data" => data, - "attributes" => attributes, - "messageId" => id, - "publishTime" => time - } - }) + defp assert_message(conn, message) do + {:ok, body, _conn} = Plug.Conn.read_body(conn) + + [sent_message] = + body + |> Jason.decode!() + |> Map.fetch!("messages") + + assert sent_message["data"] |> Base.decode64!() |> Jason.decode!() == message.data + assert sent_message["attributes"] == Map.new(message.attributes) end end