Skip to content

Commit

Permalink
Migrate Kane.Message public API to Goth 1.3
Browse files Browse the repository at this point in the history
  • Loading branch information
ulissesalmeida committed Nov 16, 2022
1 parent 79cb3fe commit 6357bc5
Show file tree
Hide file tree
Showing 2 changed files with 212 additions and 148 deletions.
90 changes: 36 additions & 54 deletions lib/kane/message.ex
Original file line number Diff line number Diff line change
@@ -1,41 +1,49 @@
defmodule Kane.Message do
alias Kane.Topic
alias Kane.Client.Response.Error
alias Kane.Client

@type message_data :: binary() | Jason.Encoder.t()

@type t :: %__MODULE__{
id: String.t() | nil,
attributes: Map.t(),
data: any,
data: message_data(),
ack_id: String.t() | nil,
publish_time: String.t() | nil
}

defstruct id: nil, attributes: %{}, data: nil, ack_id: nil, publish_time: nil

@spec publish(binary, binary) :: {:ok, t} | Error.t()
def publish(message, topic) when is_binary(message) and is_binary(topic) do
publish(%__MODULE__{data: message}, %Topic{name: topic})
@spec publish(
Kane.t(),
messages :: message_data() | t() | [t()],
topic :: String.t() | Topic.t()
) :: {:ok, t() | [t()]} | Client.error()
def publish(kane, message, topic) when is_binary(topic) do
publish(kane, %__MODULE__{data: message}, %Topic{name: topic})
end

@spec publish(t, Topic.t()) :: {:ok, t} | Error.t()
def publish(%__MODULE__{} = message, %Topic{} = topic) do
case publish([message], topic) do
{:ok, [message | _]} -> {:ok, message}
def publish(kane, %__MODULE__{} = message, topic) do
case publish(kane, [message], topic) do
{:ok, [message]} -> {:ok, message}
err -> err
end
end

@spec publish([t], Topic.t()) :: {:ok, [t]} | Error.t()
def publish(messages, %Topic{name: topic}) when is_list(messages) do
case Kane.Client.post(path(topic), data(messages)) do
def publish(%Kane{project_id: project_id} = kane, messages, %Topic{name: topic_name})
when is_list(messages) do
publish_path = "projects/#{project_id}/topics/#{Topic.strip!(project_id, topic_name)}:publish"
data = publish_data(messages)

case Client.post(kane, publish_path, data) do
{:ok, body, _code} ->
collected =
body
|> Jason.decode!()
|> Map.get("messageIds")
|> Stream.with_index()
|> Enum.map(fn {id, i} ->
%{Enum.at(messages, i) | id: id}
|> Enum.zip(messages)
|> Enum.map(fn {id, message} ->
%{message | id: id}
end)

{:ok, collected}
Expand All @@ -45,41 +53,23 @@ defmodule Kane.Message do
end
end

@spec data(t) :: map
def data(%__MODULE__{} = message), do: data([message])

@spec data([t]) :: map
def data(messages) when is_list(messages) do
defp publish_data(messages) do
%{
"messages" =>
Enum.map(messages, fn %__MODULE__{data: d, attributes: a} ->
%{
"data" => encode_body(d),
"attributes" =>
Enum.reduce(a, %{}, fn {key, val}, map ->
Map.put(map, key, val)
end)
"attributes" => Map.new(a)
}
end)
}
end

@spec encode_body(any) :: binary
def encode_body(body) when is_binary(body), do: Base.encode64(body)
def encode_body(body), do: body |> Jason.encode!() |> encode_body

def json(%__MODULE__{} = message), do: json([message])

def json(messages) when is_list(messages) do
data(messages) |> Jason.encode!()
end

def from_subscription!(%{} = data) do
{:ok, message} = from_subscription(data)
message
end
defp encode_body(body) when is_binary(body), do: Base.encode64(body)
defp encode_body(body), do: body |> Jason.encode!() |> encode_body

def from_subscription(%{
@spec from_subscription!(payload :: map()) :: t()
def from_subscription!(%{
"ackId" => ack,
"message" => %{"publishTime" => time, "messageId" => id} = message
}) do
Expand All @@ -91,20 +81,12 @@ defmodule Kane.Message do
actual_data -> Base.decode64!(actual_data)
end

{:ok,
%__MODULE__{
id: id,
publish_time: time,
ack_id: ack,
data: data,
attributes: attr
}}
end

defp path(%Topic{name: topic}), do: path(topic)

defp path(topic) do
{:ok, project} = Goth.Config.get(:project_id)
"projects/#{project}/topics/#{Topic.strip!(topic)}:publish"
%__MODULE__{
id: id,
publish_time: time,
ack_id: ack,
data: data,
attributes: attr
}
end
end
Loading

0 comments on commit 6357bc5

Please sign in to comment.