Skip to content

Commit

Permalink
Merge branch 'duffelhq-ua-support-goth-1-3'
Browse files Browse the repository at this point in the history
Also bumps version to 1.0.0
  • Loading branch information
peburrows committed Jan 3, 2023
2 parents 731d73b + 105c781 commit 08aca7e
Show file tree
Hide file tree
Showing 15 changed files with 903 additions and 666 deletions.
28 changes: 11 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,35 @@ def deps do
end
```

2. Configure [Goth](https://github.com/peburrows/goth) (Kane's underlying token storage and retrieval library) with your Google JSON credentials:

```elixir
config :goth,
json: "path/to/google/json/creds.json" |> File.read!
```

3. Ensure Kane is started before your application:

```elixir
def application do
[applications: [:kane]]
end
```
2. Configure [Goth](https://github.com/peburrows/goth) (Kane's underlying token storage and retrieval library) with your Google JSON credentials.

## Usage

Pull, process and acknowledge messages via a pre-existing subscription:

```elixir
{:ok, token} = Goth.fetch(MyApp.Goth)

kane = %Kane{
project_id: my_app_gcp_credentials["project_id"],
token: token
}

subscription = %Kane.Subscription{
name: "my-sub",
topic: %Kane.Topic{
name: "my-topic"
}
}

{:ok, messages} = Kane.Subscription.pull(subscription)
{:ok, messages} = Kane.Subscription.pull(kane, subscription)

Enum.each messages, fn(mess)->
process_message(mess)
end

# acknowledge message receipt in bulk
Kane.Subscription.ack(subscription, messages)
Kane.Subscription.ack(kane, subscription, messages)
```

Send message via pre-existing subscription:
Expand All @@ -59,7 +53,7 @@ Send message via pre-existing subscription:
topic = %Kane.Topic{name: "my-topic"}
message = %Kane.Message{data: %{"hello": "world"}, attributes: %{"random" => "attr"}}

result = Kane.Message.publish(message, topic)
result = Kane.Message.publish(kane, message, topic)

case result do
{:ok, _return} -> IO.puts("It worked!")
Expand Down
53 changes: 53 additions & 0 deletions UPGRADE_GUIDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
## Upgrading from Kane 0.9

Earlier versions of Kane relied on global application environment configuration which is deprecated in favour of a more direct and explicit approach in Kane vX.X+. Kane is following the same principle changes of [Goth 1.3+](https://github.com/peburrows/goth/blob/master/UPGRADE_GUIDE.md).

Below is a step-by-step upgrade path from Goth 0.x to X.X:

Upgrade Goth to [1.3+](https://github.com/peburrows/goth/blob/master/UPGRADE_GUIDE.md). Previous versions of Kane, heavily depended on the global configuration of Goth.

So, your `mix.exs` should be looking like this:

```elixir
def deps do
[
{:goth, "~> 1.3"},
{:kane, "~> X.X"}
]
end
```

You might have a code similar to this:


```elixir
subscription = %Kane.Subscription{
name: "my-sub",
topic: %Kane.Topic{
name: "my-topic"
}
}

{:ok, messages} = Kane.Subscription.pull(subscription)
```

Now you need explicity fetch the token and the project's id:

```elixir
defmodule MyApp do
def kane do
{:ok, token} = Goth.fetch(MyApp.Goth)
project_id = Application.fetch_env!(:my_app, :gcp_credentials)["project_id"]

%Kane{
project_id: project_id,
token: token
}
end
end

# then
{:ok, messages} = Kane.Subscription.pull(MyApp.kane(), subscription)
```

For more information on earlier versions of Kane, [see v0.9.0 documentation on hexdocs.pm](https://hexdocs.pm/kane/0.9.0).
3 changes: 0 additions & 3 deletions config/dev.exs
Original file line number Diff line number Diff line change
@@ -1,4 +1 @@
use Mix.Config

config :goth,
json: "config/dev-credentials.json" |> Path.expand |> File.read!
5 changes: 0 additions & 5 deletions config/test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1 @@
use Mix.Config

config :goth,
json: "config/test-credentials.json" |> Path.expand |> File.read!

config :kane, :token, Kane.TestToken
11 changes: 10 additions & 1 deletion lib/kane.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,18 @@ defmodule Kane do

@doc """
Retrieves the default Oauth scope for retrieving an access token
iex> Kane.oauth_scope
"https://www.googleapis.com/auth/pubsub"
"""
@spec oauth_scope :: String.t()
def oauth_scope, do: "https://www.googleapis.com/auth/pubsub"

@enforce_keys [:token, :project_id]
defstruct [:token, :project_id, endpoint: "https://pubsub.googleapis.com/v1"]

@type t :: %__MODULE__{
endpoint: String.t(),
token: Goth.Token.t(),
project_id: String.t()
}
end
50 changes: 26 additions & 24 deletions lib/kane/client.ex
Original file line number Diff line number Diff line change
@@ -1,40 +1,42 @@
defmodule Kane.Client do
alias Response.Success
alias Response.Error
@moduledoc false

@spec get(binary, keyword) :: Success.t() | Error.t()
def get(path, options \\ []), do: call(:get, path, options)
@type success :: {:ok, body :: String.t(), status_code :: pos_integer()}
@type error :: {:error, body :: String.t(), status_code :: pos_integer()}

@spec put(binary, any, keyword) :: Success.t() | Error.t()
def put(path, data \\ "", options \\ []), do: call(:put, path, data, options)
@spec get(Kane.t(), binary, keyword) :: success() | error()
def get(kane, path, options \\ []), do: call(kane, :get, path, options)

@spec post(binary, any, keyword) :: Success.t() | Error.t()
def post(path, data, options \\ []), do: call(:post, path, data, options)
@spec put(Kane.t(), binary, any, keyword) :: success() | error()
def put(kane, path, data \\ "", options \\ []), do: call(kane, :put, path, data, options)

@spec delete(binary, keyword) :: Success.t() | Error.t()
def delete(path, options \\ []), do: call(:delete, path, options)
@spec post(Kane.t(), binary, any, keyword) :: success() | error()
def post(kane, path, data, options \\ []), do: call(kane, :post, path, data, options)

defp call(method, path, options) do
headers = [auth_header()]
@spec delete(Kane.t(), binary, keyword) :: success() | error()
def delete(kane, path, options \\ []), do: call(kane, :delete, path, options)

apply(HTTPoison, method, [url(path), headers, options])
|> handle_response
defp call(kane, method, path, options) do
headers = [auth_header(kane)]
url = url(kane, path)

method
|> HTTPoison.request(url, "", headers, options)
|> handle_response()
end

defp call(method, path, data, options) do
headers = [auth_header(), {"content-type", "application/json"}]
defp call(kane, method, path, data, options) do
headers = [auth_header(kane), {"content-type", "application/json"}]
url = url(kane, path)

apply(HTTPoison, method, [url(path), encode!(data), headers, options])
|> handle_response
method
|> HTTPoison.request(url, encode!(data), headers, options)
|> handle_response()
end

defp url(path), do: Path.join([endpoint(), path])

defp endpoint, do: Application.get_env(:kane, :endpoint, "https://pubsub.googleapis.com/v1")
defp token_mod, do: Application.get_env(:kane, :token, Goth.Token)
defp url(%Kane{endpoint: endpoint}, path), do: Path.join([endpoint, path])

defp auth_header do
{:ok, token} = token_mod().for_scope(Kane.oauth_scope())
defp auth_header(%Kane{token: token}) do
{"Authorization", "#{token.type} #{token.token}"}
end

Expand Down
90 changes: 36 additions & 54 deletions lib/kane/message.ex
Original file line number Diff line number Diff line change
@@ -1,41 +1,49 @@
defmodule Kane.Message do
alias Kane.Topic
alias Kane.Client.Response.Error
alias Kane.Client

@type message_data :: binary() | Jason.Encoder.t()

@type t :: %__MODULE__{
id: String.t() | nil,
attributes: Map.t(),
data: any,
data: message_data(),
ack_id: String.t() | nil,
publish_time: String.t() | nil
}

defstruct id: nil, attributes: %{}, data: nil, ack_id: nil, publish_time: nil

@spec publish(binary, binary) :: {:ok, t} | Error.t()
def publish(message, topic) when is_binary(message) and is_binary(topic) do
publish(%__MODULE__{data: message}, %Topic{name: topic})
@spec publish(
Kane.t(),
messages :: message_data() | t() | [t()],
topic :: String.t() | Topic.t()
) :: {:ok, t() | [t()]} | Client.error()
def publish(kane, message, topic) when is_binary(topic) do
publish(kane, %__MODULE__{data: message}, %Topic{name: topic})
end

@spec publish(t, Topic.t()) :: {:ok, t} | Error.t()
def publish(%__MODULE__{} = message, %Topic{} = topic) do
case publish([message], topic) do
{:ok, [message | _]} -> {:ok, message}
def publish(kane, %__MODULE__{} = message, topic) do
case publish(kane, [message], topic) do
{:ok, [message]} -> {:ok, message}
err -> err
end
end

@spec publish([t], Topic.t()) :: {:ok, [t]} | Error.t()
def publish(messages, %Topic{name: topic}) when is_list(messages) do
case Kane.Client.post(path(topic), data(messages)) do
def publish(%Kane{project_id: project_id} = kane, messages, %Topic{name: topic_name})
when is_list(messages) do
publish_path = "projects/#{project_id}/topics/#{Topic.strip!(project_id, topic_name)}:publish"
data = publish_data(messages)

case Client.post(kane, publish_path, data) do
{:ok, body, _code} ->
collected =
body
|> Jason.decode!()
|> Map.get("messageIds")
|> Stream.with_index()
|> Enum.map(fn {id, i} ->
%{Enum.at(messages, i) | id: id}
|> Enum.zip(messages)
|> Enum.map(fn {id, message} ->
%{message | id: id}
end)

{:ok, collected}
Expand All @@ -45,41 +53,23 @@ defmodule Kane.Message do
end
end

@spec data(t) :: map
def data(%__MODULE__{} = message), do: data([message])

@spec data([t]) :: map
def data(messages) when is_list(messages) do
defp publish_data(messages) do
%{
"messages" =>
Enum.map(messages, fn %__MODULE__{data: d, attributes: a} ->
%{
"data" => encode_body(d),
"attributes" =>
Enum.reduce(a, %{}, fn {key, val}, map ->
Map.put(map, key, val)
end)
"attributes" => Map.new(a)
}
end)
}
end

@spec encode_body(any) :: binary
def encode_body(body) when is_binary(body), do: Base.encode64(body)
def encode_body(body), do: body |> Jason.encode!() |> encode_body

def json(%__MODULE__{} = message), do: json([message])

def json(messages) when is_list(messages) do
data(messages) |> Jason.encode!()
end

def from_subscription!(%{} = data) do
{:ok, message} = from_subscription(data)
message
end
defp encode_body(body) when is_binary(body), do: Base.encode64(body)
defp encode_body(body), do: body |> Jason.encode!() |> encode_body

def from_subscription(%{
@spec from_subscription!(payload :: map()) :: t()
def from_subscription!(%{
"ackId" => ack,
"message" => %{"publishTime" => time, "messageId" => id} = message
}) do
Expand All @@ -91,20 +81,12 @@ defmodule Kane.Message do
actual_data -> Base.decode64!(actual_data)
end

{:ok,
%__MODULE__{
id: id,
publish_time: time,
ack_id: ack,
data: data,
attributes: attr
}}
end

defp path(%Topic{name: topic}), do: path(topic)

defp path(topic) do
{:ok, project} = Goth.Config.get(:project_id)
"projects/#{project}/topics/#{Topic.strip!(topic)}:publish"
%__MODULE__{
id: id,
publish_time: time,
ack_id: ack,
data: data,
attributes: attr
}
end
end
Loading

0 comments on commit 08aca7e

Please sign in to comment.