-SELECT
500, 500 thousand, and 500 million rows (original)
-
-
-$ MIX_ENV=bench mix run bench/stream.exs
-
-This benchmark is based on https://github.com/ClickHouse/ch-bench
-
-Operating System: macOS
-CPU Information: Apple M1
-Number of Available Cores: 8
-Available memory: 8 GB
-Elixir 1.14.4
-Erlang 25.3
-
-Benchmark suite executing with the following configuration:
-warmup: 2 s
-time: 5 s
-memory time: 0 ns
-reduction time: 0 ns
-parallel: 1
-inputs: 500 rows, 500_000 rows, 500_000_000 rows
-Estimated total run time: 1.05 min
-
-Benchmarking stream with decode with input 500 rows ...
-Benchmarking stream with decode with input 500_000 rows ...
-Benchmarking stream with decode with input 500_000_000 rows ...
-Benchmarking stream with manual decode with input 500 rows ...
-Benchmarking stream with manual decode with input 500_000 rows ...
-Benchmarking stream with manual decode with input 500_000_000 rows ...
-Benchmarking stream without decode with input 500 rows ...
-Benchmarking stream without decode with input 500_000 rows ...
-Benchmarking stream without decode with input 500_000_000 rows ...
-
-##### With input 500 rows #####
-Name ips average deviation median 99th %
-stream with decode 4.69 K 213.34 μs ±12.49% 211.38 μs 290.94 μs
-stream with manual decode 4.69 K 213.43 μs ±17.40% 210.96 μs 298.75 μs
-stream without decode 4.65 K 215.08 μs ±10.79% 213.79 μs 284.66 μs
-
-Comparison:
-stream with decode 4.69 K
-stream with manual decode 4.69 K - 1.00x slower +0.0838 μs
-stream without decode 4.65 K - 1.01x slower +1.74 μs
-
-##### With input 500_000 rows #####
-Name ips average deviation median 99th %
-stream without decode 234.58 4.26 ms ±13.99% 4.04 ms 5.95 ms
-stream with manual decode 64.26 15.56 ms ±8.36% 15.86 ms 17.97 ms
-stream with decode 41.03 24.37 ms ±6.27% 24.39 ms 26.60 ms
-
-Comparison:
-stream without decode 234.58
-stream with manual decode 64.26 - 3.65x slower +11.30 ms
-stream with decode 41.03 - 5.72x slower +20.11 ms
-
-##### With input 500_000_000 rows #####
-Name ips average deviation median 99th %
-stream without decode 0.32 3.17 s ±0.20% 3.17 s 3.17 s
-stream with manual decode 0.0891 11.23 s ±0.00% 11.23 s 11.23 s
-stream with decode 0.0462 21.66 s ±0.00% 21.66 s 21.66 s
-
-Comparison:
-stream without decode 0.32
-stream with manual decode 0.0891 - 3.55x slower +8.06 s
-stream with decode 0.0462 - 6.84x slower +18.50 s
-
-
-
-
-[CI Results](https://github.com/plausible/ch/actions/workflows/bench.yml) (click the latest workflow run and scroll down to "Artifacts")
+Please see [CI Results](https://github.com/plausible/ch/actions/workflows/bench.yml) (make sure to click the latest workflow run and scroll down to "Artifacts") for [some of our benchmarks.](./bench/)
diff --git a/bench/buffer.exs b/bench/buffer.exs
new file mode 100644
index 0000000..e69de29
diff --git a/bench/decode.exs b/bench/decode.exs
new file mode 100644
index 0000000..e69de29
diff --git a/bench/encode.exs b/bench/encode.exs
new file mode 100644
index 0000000..f6f01c4
--- /dev/null
+++ b/bench/encode.exs
@@ -0,0 +1,55 @@
+IO.puts("This benchmark is based on https://github.com/ClickHouse/clickhouse-go#benchmark\n")
+
+port = String.to_integer(System.get_env("CH_PORT") || "8123")
+hostname = System.get_env("CH_HOSTNAME") || "localhost"
+scheme = System.get_env("CH_SCHEME") || "http"
+database = System.get_env("CH_DATABASE") || "ch_bench"
+
+{:ok, conn} = Ch.start_link(scheme: scheme, hostname: hostname, port: port)
+Ch.query!(conn, "CREATE DATABASE IF NOT EXISTS {$0:Identifier}", [database])
+
+Ch.query!(conn, """
+CREATE TABLE IF NOT EXISTS #{database}.benchmark (
+ col1 UInt64,
+ col2 String,
+ col3 Array(UInt8),
+ col4 DateTime
+) Engine Null
+""")
+
+types = [Ch.Types.u64(), Ch.Types.string(), Ch.Types.array(Ch.Types.u8()), Ch.Types.datetime()]
+statement = "INSERT INTO #{database}.benchmark FORMAT RowBinary"
+
+rows = fn count ->
+ Enum.map(1..count, fn i ->
+ [i, "Golang SQL database driver", [1, 2, 3, 4, 5, 6, 7, 8, 9], NaiveDateTime.utc_now()]
+ end)
+end
+
+alias Ch.RowBinary
+
+Benchee.run(
+ %{
+ # "control" => fn rows -> Enum.each(rows, fn _row -> :ok end) end,
+ "encode" => fn rows -> RowBinary.encode_rows(rows, types) end,
+ "insert" => fn rows -> Ch.query!(conn, statement, rows, types: types) end,
+ # "control stream" => fn rows -> rows |> Stream.chunk_every(60_000) |> Stream.run() end,
+ "encode stream" => fn rows ->
+ rows
+ |> Stream.chunk_every(60_000)
+ |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end)
+ |> Stream.run()
+ end,
+ "insert stream" => fn rows ->
+ stream =
+ rows
+ |> Stream.chunk_every(60_000)
+ |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end)
+
+ Ch.query!(conn, statement, stream, encode: false)
+ end
+ },
+ inputs: %{
+ "1_000_000 rows" => rows.(1_000_000)
+ }
+)
diff --git a/bench/http.exs b/bench/http.exs
new file mode 100644
index 0000000..e69de29
diff --git a/bench/insert.exs b/bench/insert.exs
index f6f01c4..9f06704 100644
--- a/bench/insert.exs
+++ b/bench/insert.exs
@@ -31,15 +31,10 @@ alias Ch.RowBinary
Benchee.run(
%{
# "control" => fn rows -> Enum.each(rows, fn _row -> :ok end) end,
- "encode" => fn rows -> RowBinary.encode_rows(rows, types) end,
+
"insert" => fn rows -> Ch.query!(conn, statement, rows, types: types) end,
# "control stream" => fn rows -> rows |> Stream.chunk_every(60_000) |> Stream.run() end,
- "encode stream" => fn rows ->
- rows
- |> Stream.chunk_every(60_000)
- |> Stream.map(fn chunk -> RowBinary.encode_rows(chunk, types) end)
- |> Stream.run()
- end,
+
"insert stream" => fn rows ->
stream =
rows
diff --git a/bench/native.exs b/bench/native.exs
new file mode 100644
index 0000000..e69de29
diff --git a/bench/types.exs b/bench/types.exs
new file mode 100644
index 0000000..e69de29
diff --git a/guides/in_memory_insert_buffer.md b/guides/in_memory_insert_buffer.md
new file mode 100644
index 0000000..3d73602
--- /dev/null
+++ b/guides/in_memory_insert_buffer.md
@@ -0,0 +1 @@
+# In-memory INSERT buffer
diff --git a/guides/multinode.md b/guides/multinode.md
new file mode 100644
index 0000000..9d7bff9
--- /dev/null
+++ b/guides/multinode.md
@@ -0,0 +1,3 @@
+# Connecting to multiple nodes
+
+Similar to https://clickhouse.com/docs/en/integrations/go#connecting-to-multiple-nodes
diff --git a/guides/on_disk_insert_buffer.md b/guides/on_disk_insert_buffer.md
new file mode 100644
index 0000000..0301b03
--- /dev/null
+++ b/guides/on_disk_insert_buffer.md
@@ -0,0 +1,37 @@
+# On-disk INSERT buffer
+
+Here how you could do it
+
+```elixir
+defmodule WriteBuffer do
+ use GenServer
+
+ # 5 MB
+ max_buffer_size = 5_000_000
+
+ def insert(rows) do
+ row_binary = Ch.RowBinary.encode_many(rows, unquote(encoding_types))
+ GenServer.call(__MODULE__, {:buffer, row_binary})
+ end
+
+ def init(opts) do
+ {:ok, fd} = :file.open()
+ %{fd: fd, buffer_size: 0}
+ end
+
+ def handle_call({:buffer, row_binary}, _from, state) do
+ new_buffer_size = state.buffer_size + IO.iodata_length(row_binary)
+ :file.write(state.fd, row_binary)
+
+ if new_buffer_size < unquote(max_buffer_size) do
+ %{state | buffer_size: new_buffer_size}
+ else
+ flush(state)
+ end
+ end
+end
+```
+
+See [tests](../test/ch/on_disk_buffer_test.exs) for more.
+
+TODO: notes on using it in docker and "surviving" restarts
diff --git a/lib/ch.ex b/lib/ch.ex
index ad4217f..4643fb1 100644
--- a/lib/ch.ex
+++ b/lib/ch.ex
@@ -7,14 +7,13 @@ defmodule Ch do
| {:username, String.t()}
| {:password, String.t()}
| {:settings, Keyword.t()}
- | {:timeout, timeout}
@type start_option ::
common_option
| {:scheme, String.t()}
| {:hostname, String.t()}
| {:port, :inet.port_number()}
- | {:transport_opts, :gen_tcp.connect_option()}
+ | {:transport_opts, [:gen_tcp.connect_option() | :ssl.tls_client_option()]}
| DBConnection.start_option()
@doc """
@@ -29,9 +28,7 @@ defmodule Ch do
* `:database` - Database, defaults to `"default"`
* `:username` - Username
* `:password` - User password
- * `:settings` - Keyword list of ClickHouse settings
- * `:timeout` - HTTP receive timeout in milliseconds
- * `:transport_opts` - options to be given to the transport being used. See `Mint.HTTP1.connect/4` for more info
+ * `:settings` - Keyword list of ClickHouse settings to send wtih every query
* [`DBConnection.start_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:start_option/0)
"""
@@ -50,16 +47,18 @@ defmodule Ch do
DBConnection.child_spec(Connection, opts)
end
+ @type query :: iodata
+
@type query_option ::
common_option
| {:command, Ch.Query.command()}
| {:headers, [{String.t(), String.t()}]}
- | {:format, String.t()}
- # TODO remove
- | {:encode, boolean}
- | {:decode, boolean}
| DBConnection.connection_option()
+ @type query_params ::
+ %{(name :: String.t()) => value :: term}
+ | [{name :: String.t(), value :: term}]
+
@doc """
Runs a query and returns the result as `{:ok, %Ch.Result{}}` or
`{:error, Exception.t()}` if there was a database error.
@@ -69,18 +68,14 @@ defmodule Ch do
* `:database` - Database
* `:username` - Username
* `:password` - User password
- * `:settings` - Keyword list of settings
- * `:timeout` - Query request timeout
- * `:command` - Command tag for the query
+ * `:settings` - Keyword list of settings to merge with `:settings` from `start_link` and send with this query
+ * `:command` - Command tag for the query like `:insert` or `:select`, to avoid extracting it from SQL. Used in some Ecto.Repo `:telemetry` events
* `:headers` - Custom HTTP headers for the request
- * `:format` - Custom response format for the request
- * `:decode` - Whether to automatically decode the response
* [`DBConnection.connection_option()`](https://hexdocs.pm/db_connection/DBConnection.html#t:connection_option/0)
"""
- @spec query(DBConnection.conn(), iodata, params, [query_option]) ::
+ @spec query(DBConnection.conn(), query, query_params, [query_option]) ::
{:ok, Result.t()} | {:error, Exception.t()}
- when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
def query(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)
@@ -93,27 +88,20 @@ defmodule Ch do
Runs a query and returns the result or raises `Ch.Error` if
there was an error. See `query/4`.
"""
- @spec query!(DBConnection.conn(), iodata, params, [query_option]) :: Result.t()
- when params: map | [term] | [row :: [term]] | iodata | Enumerable.t()
+ @spec query!(DBConnection.conn(), query, query_params, [query_option]) :: Result.t()
def query!(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)
DBConnection.execute!(conn, query, params, opts)
end
@doc false
- @spec stream(DBConnection.t(), iodata, map | [term], [query_option]) :: Ch.Stream.t()
+ @spec stream(DBConnection.t(), query, query_params, [query_option]) :: Ch.Stream.t()
def stream(conn, statement, params \\ [], opts \\ []) do
query = Query.build(statement, opts)
%Ch.Stream{conn: conn, query: query, params: params, opts: opts}
end
- # TODO drop
- @doc false
- @spec run(DBConnection.conn(), (DBConnection.t() -> any), Keyword.t()) :: any
- def run(conn, f, opts \\ []) when is_function(f, 1) do
- DBConnection.run(conn, f, opts)
- end
-
+ # TODO need it?
if Code.ensure_loaded?(Ecto.ParameterizedType) do
@behaviour Ecto.ParameterizedType
diff --git a/lib/ch/connection.ex b/lib/ch/connection.ex
index 3742f47..e5ddef8 100644
--- a/lib/ch/connection.ex
+++ b/lib/ch/connection.ex
@@ -20,7 +20,6 @@ defmodule Ch.Connection do
with {:ok, conn} <- HTTP.connect(scheme, address, port, mint_opts) do
conn =
conn
- |> HTTP.put_private(:timeout, opts[:timeout] || :timer.seconds(15))
|> maybe_put_private(:database, opts[:database])
|> maybe_put_private(:username, opts[:username])
|> maybe_put_private(:password, opts[:password])
diff --git a/lib/ch/error.ex b/lib/ch/error.ex
index 9b427ee..5c2f313 100644
--- a/lib/ch/error.ex
+++ b/lib/ch/error.ex
@@ -1,5 +1,5 @@
defmodule Ch.Error do
@moduledoc "Error struct wrapping ClickHouse error responses."
defexception [:code, :message]
- @type t :: %__MODULE__{code: pos_integer | nil, message: String.t()}
+ @type t :: %__MODULE__{code: pos_integer | nil, message: binary}
end
diff --git a/lib/ch/query.ex b/lib/ch/query.ex
index 4aaf5bb..ff3d513 100644
--- a/lib/ch/query.ex
+++ b/lib/ch/query.ex
@@ -1,16 +1,13 @@
defmodule Ch.Query do
@moduledoc "Query struct wrapping the SQL statement."
- defstruct [:statement, :command, :encode, :decode]
-
- @type t :: %__MODULE__{statement: iodata, command: command, encode: boolean, decode: boolean}
+ defstruct [:statement, :command]
+ @type t :: %__MODULE__{statement: iodata, command: command}
@doc false
@spec build(iodata, [Ch.query_option()]) :: t
def build(statement, opts \\ []) do
command = Keyword.get(opts, :command) || extract_command(statement)
- encode = Keyword.get(opts, :encode, true)
- decode = Keyword.get(opts, :decode, true)
- %__MODULE__{statement: statement, command: command, encode: encode, decode: decode}
+ %__MODULE__{statement: statement, command: command}
end
statements = [
diff --git a/lib/ch/result.ex b/lib/ch/result.ex
index db75fc6..86dfd0d 100644
--- a/lib/ch/result.ex
+++ b/lib/ch/result.ex
@@ -2,10 +2,9 @@ defmodule Ch.Result do
@moduledoc """
Result struct returned from any successful query. Its fields are:
- * `command` - An atom of the query command, for example: `:select`, `:insert`;
+ * `command` - An atom of the query command, for example: `:select`, `:insert`
* `rows` - A list of lists, each inner list corresponding to a row, each element in the inner list corresponds to a column
- * `num_rows` - The number of fetched or affected rows;
- * `headers` - The HTTP response headers
+ * `num_rows` - The number of fetched or affected rows
* `data` - The raw iodata from the response
"""
@@ -14,8 +13,7 @@ defmodule Ch.Result do
@type t :: %__MODULE__{
command: Ch.Query.command(),
num_rows: non_neg_integer | nil,
- rows: [[term]] | iodata | nil,
- headers: Mint.Types.headers(),
+ rows: [[term]] | nil,
data: iodata
}
end
diff --git a/lib/ch/row_binary.ex b/lib/ch/row_binary.ex
index d2081b7..afa1d05 100644
--- a/lib/ch/row_binary.ex
+++ b/lib/ch/row_binary.ex
@@ -1,6 +1,8 @@
defmodule Ch.RowBinary do
@moduledoc "Helpers for working with ClickHouse [`RowBinary`](https://clickhouse.com/docs/en/sql-reference/formats#rowbinary) format."
+ # TODO cleanup
+
# @compile {:bin_opt_info, true}
@dialyzer :no_improper_lists
diff --git a/lib/ch/stream.ex b/lib/ch/stream.ex
index 9ec8b5f..ae2782f 100644
--- a/lib/ch/stream.ex
+++ b/lib/ch/stream.ex
@@ -8,7 +8,7 @@ defmodule Ch.Stream do
conn: DBConnection.conn(),
ref: Mint.Types.request_ref() | nil,
query: Ch.Query.t(),
- params: term,
+ params: Ch.query_params(),
opts: [Ch.query_option()]
}
@@ -24,6 +24,7 @@ defmodule Ch.Stream do
def slice(_), do: {:error, __MODULE__}
end
+ # TODO optimize
defimpl Collectable do
def into(stream) do
%Ch.Stream{conn: conn, query: query, params: params, opts: opts} = stream
diff --git a/lib/ch/types.ex b/lib/ch/types.ex
index c9e7731..a877435 100644
--- a/lib/ch/types.ex
+++ b/lib/ch/types.ex
@@ -3,6 +3,8 @@ defmodule Ch.Types do
Helpers to turn ClickHouse types into Elixir terms for easier processing.
"""
+ # TODO cleanup
+
types =
[
{_encoded = "String", _decoded = :string, _args = []},
diff --git a/mix.exs b/mix.exs
index 1ac899c..5357beb 100644
--- a/mix.exs
+++ b/mix.exs
@@ -2,7 +2,7 @@ defmodule Ch.MixProject do
use Mix.Project
@source_url "https://github.com/plausible/ch"
- @version "0.2.8"
+ @version "0.3.0"
def project do
[
@@ -12,7 +12,7 @@ defmodule Ch.MixProject do
elixirc_paths: elixirc_paths(Mix.env()),
deps: deps(),
name: "Ch",
- description: "HTTP ClickHouse driver for Elixir",
+ description: "ClickHouse driver for Elixir",
docs: docs(),
package: package(),
source_url: @source_url
@@ -33,9 +33,9 @@ defmodule Ch.MixProject do
# Run "mix help deps" to learn about dependencies.
defp deps do
[
- {:mint, "~> 1.0"},
+ {:mint, "~> 1.0", optional: true},
{:db_connection, "~> 2.0"},
- {:jason, "~> 1.0"},
+ {:jason, "~> 1.0", optional: true},
{:decimal, "~> 2.0"},
{:ecto, "~> 3.12", optional: true},
{:benchee, "~> 1.0", only: [:bench]},
diff --git a/test/test_helper.exs b/test/test_helper.exs
index 5685699..325ab17 100644
--- a/test/test_helper.exs
+++ b/test/test_helper.exs
@@ -1,8 +1,8 @@
Calendar.put_time_zone_database(Tz.TimeZoneDatabase)
default_test_db = System.get_env("CH_DATABASE", "ch_elixir_test")
-{:ok, _} = Ch.Test.sql_exec("DROP DATABASE IF EXISTS #{default_test_db}")
-{:ok, _} = Ch.Test.sql_exec("CREATE DATABASE #{default_test_db}")
+Ch.HTTP.query!("DROP DATABASE IF EXISTS {db:Identifier}", %{"db" => default_test_db})
+Ch.HTTP.query!("CREATE DATABASE {db:Identifier}}", %{"db" => default_test_db})
Application.put_env(:ch, :database, default_test_db)
ExUnit.start(exclude: [:slow])