Skip to content

Commit

Permalink
substantial opts work around dynamo query and scan
Browse files Browse the repository at this point in the history
  • Loading branch information
benwilson512 committed May 4, 2015
1 parent 999bec4 commit 2bd5609
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 66 deletions.
1 change: 1 addition & 0 deletions config/dev.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use Mix.Config

config :ex_aws,
debug_requests: true,
access_key_id: System.get_env("AWS_ACCESS_KEY_ID"),
secret_access_key: System.get_env("AWS_SECRET_ACCESS_KEY")
10 changes: 5 additions & 5 deletions lib/ex_aws/dynamo/client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -93,14 +93,14 @@ defmodule ExAws.Dynamo.Client do
key_definitions :: [%{}],
read_capacity :: pos_integer,
write_capacity :: pos_integer,
global_indexes :: %{},
local_indexes :: %{}) :: ExAws.Request.response_t
global_indexes :: Keyword.t,
local_indexes :: Keyword.t) :: ExAws.Request.response_t

@doc "Describe table"
defcallback describe_table(name :: binary) :: ExAws.Request.response_t

@doc "Update Table"
defcallback update_table(name :: binary, attributes :: %{}) :: ExAws.Request.response_t
defcallback update_table(name :: binary, attributes :: Keyword.t) :: ExAws.Request.response_t

@doc "Delete Table"
defcallback delete_table(table :: binary) :: ExAws.Request.response_t
Expand All @@ -121,8 +121,8 @@ defmodule ExAws.Dynamo.Client do
defcallback stream_scan(table_name :: binary, opts :: Keyword.t) :: ExAws.Request.response_t

@doc "Query Table"
defcallback query(table_name :: binary, key_conditions :: %{}) :: ExAws.Request.response_t
defcallback query(table_name :: binary, key_conditions :: %{}, opts :: Keyword.t) :: ExAws.Request.response_t
defcallback query(table_name :: binary) :: ExAws.Request.response_t
defcallback query(table_name :: binary, opts :: Keyword.t) :: ExAws.Request.response_t

@doc """
Get up to 100 items (16mb)
Expand Down
14 changes: 7 additions & 7 deletions lib/ex_aws/dynamo/decoder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ defmodule ExAws.Dynamo.Decoder do
Use these if you just want the dynamo result to look more like elixir without
coercing it into a particular struct.
"""
def decode(%{"S" => "TRUE"}), do: true
def decode(%{"S" => "FALSE"}), do: false
def decode(%{"BOOL" => true}), do: true
def decode(%{"BOOL" => false}), do: false
def decode(%{"S" => "TRUE"}), do: true
def decode(%{"S" => "FALSE"}), do: false
def decode(%{"BOOL" => true}), do: true
def decode(%{"BOOL" => false}), do: false
def decode(%{"BOOL" => "true"}), do: true
def decode(%{"BOOL" => "false"}), do: false
def decode(%{"B" => value}), do: value
def decode(%{"S" => value}), do: value
def decode(%{"M" => value}), do: value |> decode
def decode(%{"B" => value}), do: value
def decode(%{"S" => value}), do: value
def decode(%{"M" => value}), do: value |> decode
def decode(%{"N" => value}) when is_binary(value), do: binary_to_number(value)
def decode(%{"N" => value}) when value |> is_integer or value |> is_float, do: value
def decode(item = %{}) do
Expand Down
33 changes: 33 additions & 0 deletions lib/ex_aws/dynamo/encoder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,47 @@ defmodule ExAws.Dynamo.Encoder do
This is handled via the ExAws.Dynamo.Encodable protocol.
"""

# These functions exist to ensure that encoding is idempotent.
def encode(%{"B" => _} = val), do: val
def encode(%{"BOOL" => _} = val), do: val
def encode(%{"BS" => _} = val), do: val
def encode(%{"L" => _} = val), do: val
def encode(%{"M" => _} = val), do: val
def encode(%{"NS" => _} = val), do: val
def encode(%{"NULL" => _} = val), do: val
def encode(%{"N" => _} = val), do: val
def encode(%{"S" => _} = val), do: val
def encode(%{"SS" => _} = val), do: val

def encode(value) do
ExAws.Dynamo.Encodable.encode(value)
end

# Use this in case you want to encode something already in dynamo format
# For some reason I cannot fathom. If you find yourself using this, please open an issue
# so I can find out why and better support this.
def encode!(value) do
ExAws.Dynamo.Encodable.encode(value)
end

def encode_flat(value) do
case ExAws.Dynamo.Encodable.encode(value) do
%{"M" => value} -> value
%{"L" => value} -> value
end
end

def atom_to_dynamo_type(:blob), do: "B"
def atom_to_dynamo_type(:boolean), do: "BOOL"
def atom_to_dynamo_type(:blob_set), do: "BS"
def atom_to_dynamo_type(:list), do: "L"
def atom_to_dynamo_type(:map), do: "M"
def atom_to_dynamo_type(:number_set), do: "NS"
def atom_to_dynamo_type(:null), do: "NULL"
def atom_to_dynamo_type(:number), do: "N"
def atom_to_dynamo_type(:string), do: "S"
def atom_to_dynamo_type(:string_set), do: "SS"
def atom_to_dynamo_type(value) do
raise ArgumentError, "Unknown dynamo type for value: #{inspect value}"
end
end
115 changes: 74 additions & 41 deletions lib/ex_aws/dynamo/impl.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
defmodule ExAws.Dynamo.Impl do
alias ExAws.Dynamo
import ExAws.Utils, only: [camelize_opts: 1]
import ExAws.Utils, only: [camelize_keys: 1, camelize_keys: 2]
use ExAws.Actions


defdelegate stream_scan(client, name), to: ExAws.Dynamo.Lazy
defdelegate stream_scan(client, name, opts), to: ExAws.Dynamo.Lazy

Expand Down Expand Up @@ -34,42 +35,46 @@ defmodule ExAws.Dynamo.Impl do
client.request(%{}, :list_tables)
end

def create_table(client, name, primary_key, key_definitions, read_capacity, write_capacity) do
def create_table(client, name, primary_key, key_definitions, read_capacity, write_capacity) when is_atom(primary_key) do
key_schema = [%{
AttributeName: primary_key,
KeyType: "HASH"
attribute_name: primary_key,
key_type: "HASH"
}]
create_table(client, name, key_schema, key_definitions, read_capacity, write_capacity, [], [])
create_table(client, name, key_schema, key_definitions, read_capacity, write_capacity)
end

def create_table(client, name, key_schema, key_definitions, read_capacity, write_capacity) when is_list(key_schema) do
create_table(client, name, key_schema, key_definitions, read_capacity, write_capacity)
end

def create_table(client, name, key_schema, key_definitions, read_capacity, write_capacity, global_indexes, local_indexes) do
data = %{
TableName: name,
AttributeDefinitions: key_definitions |> encode_attrs,
KeySchema: key_schema,
ProvisionedThroughput: %{
ReadCapacityUnits: read_capacity,
WriteCapacityUnits: write_capacity
"TableName" => name,
"AttributeDefinitions" => key_definitions |> encode_key_definitions,
"KeySchema" => key_schema |> camelize_keys,
"ProvisionedThroughput" => %{
"ReadCapacityUnits" => read_capacity,
"WriteCapacityUnits" => write_capacity
}
}
[GlobalSecondaryIndexes: global_indexes, LocalSecondaryIndexes: local_indexes]
|> Enum.reduce(data, fn
%{
"GlobalSecondaryIndexes" => global_indexes |> camelize_keys(deep: true),
"LocalSecondaryIndexes" => local_indexes |> camelize_keys(deep: true)
} |> Enum.reduce(data, fn
{_, []}, data -> data
{name, indices}, data -> Map.put(data, name, Enum.into(indices, %{}))
end)
|> client.request(:create_table)
end

@doc "Describe table"
def describe_table(client, name) do
%{TableName: name}
%{"TableName" => name}
|> client.request(:describe_table)
end

@doc "Update Table"
def update_table(client, name, attributes) do
%{TableName: name}
|> Map.merge(camelize_opts(attributes))
%{"TableName" => name}
|> Map.merge(camelize_keys(attributes, deep: true))
|> client.request(:update_table)
end

Expand All @@ -80,27 +85,45 @@ defmodule ExAws.Dynamo.Impl do

## Records
######################

def scan(client, name, opts \\ []) do
%{TableName: name}
|> Map.merge(camelize_opts(opts))
opts = opts |> Enum.into(%{})
special_opts = [:exclusive_start_key, :expression_attribute_values]

regular_opts = opts
|> Map.drop(special_opts)
|> camelize_keys

%{"TableName" => name}
|> build_exclusive_start_key(opts)
|> build_expression_attribute_values(opts)
|> Map.merge(regular_opts)
|> client.request(:scan)
end

def query(client, name, key_conditions, opts \\ []) do
%{TableName: name, KeyConditions: key_conditions}
|> Map.merge(camelize_opts(opts))
def query(client, name, opts \\ []) do
opts = opts |> Enum.into(%{})
special_opts = [:exclusive_start_key, :expression_attribute_values]

regular_opts = opts
|> Map.drop(special_opts)
|> camelize_keys

%{"TableName" => name}
|> build_exclusive_start_key(opts)
|> build_expression_attribute_values(opts)
|> Map.merge(regular_opts)
|> client.request(:query)
end

def batch_get_item(client, data) do
data
client.request(data, :batch_get_item)
end

def put_item(client, name, record) do
%{
TableName: name,
Item: Dynamo.Encoder.encode(record)
"TableName" => name,
"Item" => Dynamo.Encoder.encode(record)
} |> client.request(:put_item)
end

Expand All @@ -110,16 +133,16 @@ defmodule ExAws.Dynamo.Impl do

def get_item(client, name, primary_key) do
%{
TableName: name,
Key: Dynamo.Encoder.encode_flat(primary_key)
"TableName" => name,
"Key" => Dynamo.Encoder.encode_flat(primary_key)
}
|> client.request(:get_item)
end

def get_item!(client, name, primary_key) do
{:ok, %{"Item" => item}} = %{
TableName: name,
Key: Dynamo.Encoder.encode_flat(primary_key)
"TableName" => name,
"Key" => Dynamo.Encoder.encode_flat(primary_key)
}
|> client.request(:get_item)

Expand All @@ -128,29 +151,39 @@ defmodule ExAws.Dynamo.Impl do

def update_item(client, table_name, primary_key, update_args) do
%{
TableName: table_name,
Key: Dynamo.Encoder.encode_flat(primary_key)
"TableName" => table_name,
"Key" => Dynamo.Encoder.encode_flat(primary_key)
}
|> Map.merge(camelize_opts(update_args))
|> Map.merge(camelize_keys(update_args))
|> client.request(:update_item)
end

def delete_item(client, name, primary_key) do
%{TableName: name, Key: primary_key}
%{"TableName" => name, Key: primary_key}
|> client.request(:delete_item)
end

defp encode_attrs(attrs) do
defp encode_key_definitions(attrs) do
attrs |> Enum.map(fn({name, type}) ->
%{AttributeName: name, AttributeType: type |> atom_to_dynamo_type}
%{"AttributeName" => name, "AttributeType" => type |> Dynamo.Encoder.atom_to_dynamo_type}
end)
end

defp atom_to_dynamo_type(:string), do: "S"
defp atom_to_dynamo_type(:integer), do: "N"
defp atom_to_dynamo_type(:float), do: "N"
defp atom_to_dynamo_type(value) do
raise ArgumentError, "Unknown dynamo type for value: #{inspect value}"
# Expects exclusive_start_key shape like
defp build_exclusive_start_key(data, %{exclusive_start_key: start_key}) do
Map.put(data, "ExclusiveStartKey", start_key |> encode_values)
end
defp build_exclusive_start_key(data, _), do: data

defp build_expression_attribute_values(data, %{expression_attribute_values: values}) do
Map.put(data, "ExpressionAttributeValues", values |> encode_values)
end
defp build_expression_attribute_values(data, _), do: data

defp encode_values(map) do
Enum.reduce(map, %{}, fn {attr, value}, attribute_values ->
Map.put(attribute_values, attr, Dynamo.Encoder.encode(value))
end)
end

end
6 changes: 3 additions & 3 deletions lib/ex_aws/dynamo/lazy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ defmodule ExAws.Dynamo.Lazy do
@moduledoc false
## Implimentation of the lazy functions surfaced by ExAws.Dynamo.Client

def stream_scan(client, table, opts) do
def stream_scan(client, table, opts \\ []) do
request_fun = fn
{:initial, initial} -> initial
fun_opts -> ExAws.Dynamo.Impl.scan(client, table, Map.merge(opts, fun_opts))
fun_opts -> ExAws.Dynamo.Impl.scan(client, table, Keyword.merge(opts, fun_opts |> Enum.to_list))
end

client
Expand All @@ -30,7 +30,7 @@ defmodule ExAws.Dynamo.Lazy do
{:error, items} -> {[{:error, items}], :quit}

{:ok, %{"Items" => items, "LastEvaluatedKey" => key}} ->
{items, {fun, %{ExclusiveStartKey: key}}}
{items, {fun, %{exclusive_start_key: key}}}

{:ok, %{"Items" => items}} ->
{items, :quit}
Expand Down
12 changes: 6 additions & 6 deletions lib/ex_aws/kinesis/impl.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule ExAws.Kinesis.Impl do
use ExAws.Actions
import ExAws.Utils, only: [camelize_opts: 1]
import ExAws.Utils, only: [camelize_keys: 1]
require Logger

defdelegate stream_shards(client, name), to: ExAws.Kinesis.Lazy
Expand Down Expand Up @@ -40,7 +40,7 @@ defmodule ExAws.Kinesis.Impl do

def describe_stream(client, name, opts \\ []) do
%{StreamName: name}
|> Map.merge(camelize_opts(opts))
|> Map.merge(camelize_keys(opts))
|> client.request(:describe_stream)
end

Expand All @@ -62,7 +62,7 @@ defmodule ExAws.Kinesis.Impl do

def get_records(client, shard_iterator, opts \\ []) do
%{ShardIterator: shard_iterator}
|> Map.merge(camelize_opts(opts))
|> Map.merge(camelize_keys(opts))
|> client.request(:get_records)
|> do_get_records
end
Expand Down Expand Up @@ -91,7 +91,7 @@ defmodule ExAws.Kinesis.Impl do
PartitionKey: partition_key,
StreamName: stream_name
}
|> Map.merge(camelize_opts(opts))
|> Map.merge(camelize_keys(opts))
|> client.request(:put_record)
end

Expand Down Expand Up @@ -119,7 +119,7 @@ defmodule ExAws.Kinesis.Impl do
StreamName: name,
ShardId: shard_id,
ShardIteratorType: shard_iterator_type
} |> Map.merge(camelize_opts(opts))
} |> Map.merge(camelize_keys(opts))
|> client.request(:get_shard_iterator)
end

Expand Down Expand Up @@ -151,7 +151,7 @@ defmodule ExAws.Kinesis.Impl do

def list_tags_for_stream(client, name, opts \\ []) do
%{StreamName: name}
|> Map.merge(camelize_opts(opts))
|> Map.merge(camelize_keys(opts))
|> client.request(:list_tags_for_stream)
end

Expand Down
4 changes: 2 additions & 2 deletions lib/ex_aws/lambda/impl.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule ExAws.Lambda.Impl do
use ExAws.Actions
import ExAws.Utils, only: [camelize_opts: 1]
import ExAws.Utils, only: [camelize_keys: 1]
require Logger

@moduledoc false
Expand Down Expand Up @@ -135,7 +135,7 @@ defmodule ExAws.Lambda.Impl do
defp normalize_opts(opts) do
opts
|> Enum.into(%{})
|> camelize_opts
|> camelize_keys
end

end
Loading

0 comments on commit 2bd5609

Please sign in to comment.