Merge pull request #21 from whisklabs/feature/extra-connection-params
Extra ClickHouse connection params support (#16)
bryzgaloff authored May 31, 2020
2 parents 95444b7 + 47e829e commit cea9457
Showing 4 changed files with 208 additions and 24 deletions.
64 changes: 55 additions & 9 deletions README.md
@@ -11,6 +11,9 @@ Provides `ClickHouseHook` and `ClickHouseOperator` for [Apache Airflow][airflow]
4. Executed queries are logged in a pretty form.
5. Uses effective native ClickHouse TCP protocol thanks to
[clickhouse-driver][ch-driver-docs]. Does not support HTTP protocol.
6. Supports extra ClickHouse [connection parameters][ch-driver-connection] such
as various timeouts, `compression`, `secure`, etc., through the Airflow
[Connection.extra][airflow-conn-extra] property.

# Installation

@@ -28,13 +31,8 @@ To import `ClickHouseOperator` use:
Supported kwargs:
* `sql`: templated query (if argument is a single `str`) or queries (if iterable
of `str`'s).
-* `clickhouse_conn_id`: connection id. Connection schema (all properties are
-  optional, defaults correspond to the default ClickHouse configuration):
-  * `host`, default: `localhost`;
-  * `port`, default: `9000` (default native ClickHouse protocol port);
-  * `database`, default: `default`;
-  * `user`, default: `default`;
-  * `password`, default: `''` (empty).
+* `clickhouse_conn_id`: connection id. Connection schema is described
+  [below](#clickhouse-connection-schema).
* `parameters`: passed to clickhouse-driver [execute method][ch-driver-execute].
* If multiple queries are provided via `sql` then the parameters are passed to
_all_ of them.
@@ -51,7 +49,8 @@ To import `ClickHouseHook` use:
`from airflow.hooks.clickhouse_hook import ClickHouseHook`

Supported kwargs of constructor (`__init__` method):
-* `clickhouse_conn_id`: connection id. See connection schema above.
+* `clickhouse_conn_id`: connection id. Connection schema is described
+  [below](#clickhouse-connection-schema).
* `database`: if present, overrides database defined by connection.

Supports all of the methods of the Airflow [BaseHook][airflow-base-hook]
@@ -76,13 +75,55 @@ Supports all of the methods of the Airflow [BaseHook][airflow-base-hook]
* `get_conn()`: returns the underlying
[clickhouse_driver.Client][ch-driver-client] instance.

## ClickHouse Connection schema

[clickhouse_driver.Client][ch-driver-client] is initialized with the values
stored in Airflow [Connection attributes][airflow-connection-attrs]. The
attributes map to `Client` arguments as follows:

| Airflow Connection attribute | `Client.__init__` argument |
| --- | --- |
| `host` | `host` |
| `port` | `port` |
| `schema` | `database` |
| `login` | `user` |
| `password` | `password` |

If you pass the `database` argument to `ClickHouseOperator` or
`ClickHouseHook` explicitly, it is passed to the `Client` instead of the
`schema` attribute of the Airflow connection.
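
The mapping and the `database` override can be sketched in plain Python. This is only an illustration of the table above: `ATTRIBUTE_MAPPING` and `map_connection_attributes` are hypothetical names and are not part of the plugin's API.

```python
from typing import Any, Dict, Optional

# Airflow Connection attribute -> Client.__init__ argument (see the table above).
ATTRIBUTE_MAPPING = {
    'host': 'host',
    'port': 'port',
    'schema': 'database',
    'login': 'user',
    'password': 'password',
}


def map_connection_attributes(
        attributes: Dict[str, Any],
        database: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: rename Airflow attributes to Client argument names."""
    client_kwargs = {
        ATTRIBUTE_MAPPING[name]: value
        for name, value in attributes.items()
        if name in ATTRIBUTE_MAPPING  # attributes outside the table are ignored
    }
    if database:
        # An explicitly passed database replaces the connection's schema.
        client_kwargs['database'] = database
    return client_kwargs


print(map_connection_attributes({'schema': 'analytics', 'login': 'reader'}))
print(map_connection_attributes({'schema': 'analytics'}, database='staging'))
```

The second call shows the override: the `schema` stored in the connection is ignored once `database` is given explicitly.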

### Extra arguments

You may also pass [additional arguments][ch-driver-connection], such as
timeouts, `compression`, `secure`, etc., through the
[Connection.extra][airflow-conn-extra] attribute. The attribute should
contain a JSON object, which is [deserialized][airflow-conn-dejson]; all of
its properties are passed as-is to the `Client`.

For example, if the Airflow connection contains `extra={"secure":true}`, then
`Client.__init__` receives the `secure=True` keyword argument in addition to
the other non-empty connection attributes.
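
A minimal sketch of that deserialization step, using only the standard library. `client_kwargs_from_extra` is a hypothetical helper written for this example. It assumes that an empty `extra` behaves like an empty JSON object and that explicit connection attributes are applied after the extras, as the hook change in this commit does.

```python
import json
from typing import Any, Dict


def client_kwargs_from_extra(extra: str, **explicit: Any) -> Dict[str, Any]:
    """Hypothetical helper: deserialize `extra` and merge explicit attributes."""
    # Assumption: an empty `extra` is treated as an empty JSON object.
    kwargs = json.loads(extra) if extra else {}
    # Explicit attributes are applied last, so they win on a key collision.
    kwargs.update(explicit)
    return kwargs


# JSON `true` becomes Python `True`.
print(client_kwargs_from_extra('{"secure": true}', user='reader'))
# prints {'secure': True, 'user': 'reader'}
```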

### Default values

If an Airflow connection attribute is not set, it is not passed to the
`Client` at all. In that case the default value of the corresponding
[clickhouse_driver.Connection][ch-driver-connection] argument is used (e.g.
`user` defaults to `'default'`).

This means that Airflow ClickHouse Plugin does not itself define any default
values for the ClickHouse connection. You may rely fully on the default values
of the `clickhouse-driver` version you use. The only exception is `host`: if
the `host` attribute of the Airflow connection is not set, `'localhost'` is
used.
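
The rule can be sketched without Airflow or the driver installed. Here `stand_in_client` is a hypothetical stand-in for `Client`, and its default values are illustrative only: the real defaults come from whichever `clickhouse-driver` version is installed. `connect` is also a hypothetical name.

```python
from typing import Any, Dict, Optional


def stand_in_client(
        host: str,
        port: int = 9000,
        database: str = 'default',
        user: str = 'default',
        password: str = '',
) -> Dict[str, Any]:
    """Hypothetical stand-in for the driver client; defaults are illustrative."""
    return dict(
        host=host, port=port, database=database, user=user, password=password,
    )


def connect(
        host: Optional[str] = None,
        port: Optional[str] = None,
        login: Optional[str] = None,
        password: Optional[str] = None,
        schema: Optional[str] = None,
) -> Dict[str, Any]:
    """Pass only the attributes that are set; the callee supplies the rest."""
    kwargs: Dict[str, Any] = {}
    if port:
        kwargs['port'] = int(port)
    if login:
        kwargs['user'] = login
    if password:
        kwargs['password'] = password
    if schema:
        kwargs['database'] = schema
    # `host` is the only attribute with a fallback defined on this side.
    return stand_in_client(host or 'localhost', **kwargs)


print(connect())  # every value comes from the stand-in's defaults
print(connect(port='8888', login='reader'))
```

Because unset attributes are omitted rather than replaced with hard-coded values, changing the defaults of the callee changes the result of `connect()` without touching `connect` itself.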

## Examples

### ClickHouseOperator

```python
from airflow import DAG
-from airflow.operators.clickhouse_plugin import ClickHouseOperator
+from airflow.operators.clickhouse_operator import ClickHouseOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

@@ -162,6 +203,7 @@ From the root project directory: `python -m unittest discover -s tests`
* Viktor Taranenko, [@viktortnk](https://github.com/viktortnk)
* Danila Ganchar, [@d-ganchar](https://github.com/d-ganchar)
* Mikhail, [@glader](https://github.com/glader)
* Alexander Chashnikov, [@ne1r0n](https://github.com/ne1r0n)


[airflow]: https://airflow.apache.org/
@@ -173,3 +215,7 @@ From the root project directory: `python -m unittest discover -s tests`
[ch-driver-execute-iter]: https://clickhouse-driver.readthedocs.io/en/latest/quickstart.html#streaming-results
[ch-driver-insert]: https://clickhouse-driver.readthedocs.io/en/latest/quickstart.html#inserting-data
[ch-driver-client]: https://clickhouse-driver.readthedocs.io/en/latest/api.html#client
[airflow-conn-extra]: https://airflow.apache.org/docs/stable/_api/airflow/models/connection/index.html#airflow.models.connection.Connection.extra
[ch-driver-connection]: https://clickhouse-driver.readthedocs.io/en/latest/api.html#connection
[airflow-connection-attrs]: https://airflow.apache.org/docs/1.10.6/_api/airflow/models/index.html?highlight=connection#airflow.models.Connection
[airflow-conn-dejson]: https://airflow.apache.org/docs/1.10.6/_api/airflow/models/index.html?highlight=connection#airflow.models.Connection.extra_dejson
2 changes: 1 addition & 1 deletion airflow_clickhouse_plugin/__version__.py
@@ -9,7 +9,7 @@
 __description__ = \
     'airflow-clickhouse-plugin - ' \
     'Airflow plugin to execute ClickHouse commands and queries'
-__version__ = '0.5.5'
+__version__ = '0.5.6'
 __url__ = 'https://github.com/whisklabs/airflow-clickhouse-plugin'
 __license__ = 'MIT License'
 __author__ = 'Viktor Taranenko'
25 changes: 12 additions & 13 deletions airflow_clickhouse_plugin/hooks/clickhouse_hook.py
@@ -19,19 +19,18 @@ def __init__(

     def get_conn(self) -> Client:
         conn = self.get_connection(self.clickhouse_conn_id)
-        return self.create_connection(
-            host=conn.host or 'localhost',
-            port=int(conn.port) if conn.port else 9000,
-            user=conn.login or 'default',
-            password=conn.password or '',
-            database=self.database or conn.schema or 'default',
-        )
-
-    @classmethod
-    def create_connection(
-        cls, host: str, port: int, database: str, user: str, password: str
-    ) -> Client:
-        return Client(host, port, database, user, password)
+        connection_kwargs = conn.extra_dejson.copy()
+        if conn.port:
+            connection_kwargs.update(port=int(conn.port))
+        if conn.login:
+            connection_kwargs.update(user=conn.login)
+        if conn.password:
+            connection_kwargs.update(password=conn.password)
+        if self.database:
+            connection_kwargs.update(database=self.database)
+        elif conn.schema:
+            connection_kwargs.update(database=conn.schema)
+        return Client(conn.host or 'localhost', **connection_kwargs)

     def get_records(self, sql: str, parameters: dict = None) -> List[Tuple]:
         self._log_query(sql, parameters)
141 changes: 140 additions & 1 deletion tests/test_hook.py
@@ -1,11 +1,18 @@
+import json
+from typing import Any, Dict
 from unittest import TestCase, mock

+import airflow.models
+import clickhouse_driver.connection
+import clickhouse_driver.defines
+import clickhouse_driver.protocol
 from clickhouse_driver.errors import ServerException, ErrorCodes

+from airflow_clickhouse_plugin import ClickHouseHook
 from tests.util import LocalClickHouseHook


-class ClientFromUrlTestCase(TestCase):
+class ClickHouseHookTestCaseCase(TestCase):
     def test_temp_table(self):
         hook = LocalClickHouseHook()
         temp_table_name = 'test_temp_table'
@@ -25,6 +32,138 @@ def test_temp_table(self):
raise AssertionError('server did not raise an error')


+class ClickHouseConnectionParamsTestCase(TestCase):
+    def test_plain_arguments(self):
+        plain_arguments = dict(host='hst', password='pass')
+        self._connection_kwargs.update(plain_arguments)
+        connection = self._connection_from_hook()
+        for key, value in plain_arguments.items():
+            with self.subTest(key):
+                self.assertEqual(value, getattr(connection, key))
+
+    def test_user(self):
+        login = 'user'
+        self._connection_kwargs.update(login=login)
+        self.assertEqual(login, self._connection_from_hook().user)
+
+    def test_database(self):
+        database = 'test-db-from-init-args'
+        self.assertEqual(
+            database,
+            ClickHouseHook(database=database).get_conn().connection.database,
+        )
+
+    def test_schema(self):
+        schema = 'test-db-from-connection-schema'
+        self._connection_kwargs.update(schema=schema)
+        self.assertEqual(schema, self._connection_from_hook().database)
+
+    def test_port(self):
+        port = 8888
+        self._connection_kwargs.update(port=str(port))
+        self.assertEqual(port, self._connection_from_hook().port)
+
+    def test_host_missing(self):
+        self.assertEqual('localhost', self._connection_from_hook().host)
+
+    def test_plain_extra_params(self):
+        extra_params = dict(
+            connect_timeout=123,
+            send_receive_timeout=456,
+            sync_request_timeout=789,
+        )
+        self._connection_kwargs.update(extra=extra_params)
+        connection = self._connection_from_hook()
+        for key, value in extra_params.items():
+            with self.subTest(key):
+                self.assertEqual(value, getattr(connection, key))
+
+    def test_client_name(self):
+        client_name = 'test-client-name'
+        self._connection_kwargs.update(extra=dict(client_name=client_name))
+        self.assertEqual(
+            f'{clickhouse_driver.defines.DBMS_NAME} {client_name}',
+            self._connection_from_hook().client_name,
+        )
+
+    def test_compression_enabled(self):
+        # to pass successfully requires: pip install clickhouse-driver[lz4]
+        comp_block_size = 123456
+        for compression in (True, 'lz4'):
+            with self.subTest(compression):
+                self._connection_kwargs.update(extra=dict(
+                    compression=compression,
+                    compress_block_size=comp_block_size,
+                ))
+                connection = self._connection_from_hook()
+                self.assertEqual(
+                    clickhouse_driver.protocol.Compression.ENABLED,
+                    connection.compression,
+                )
+                self.assertEqual(comp_block_size, connection.compress_block_size)
+
+    def test_compression_disabled(self):
+        self._connection_kwargs.update(extra=dict(compression=False))
+        self.assertEqual(
+            clickhouse_driver.protocol.Compression.DISABLED,
+            self._connection_from_hook().compression,
+        )
+
+    def test_secure(self):
+        for secure in (False, True):
+            with self.subTest(secure):
+                self._connection_kwargs.update(extra=dict(secure=secure))
+                self.assertEqual(
+                    secure,
+                    self._connection_from_hook().secure_socket,
+                )
+
+    def test_verify(self):
+        for verify in (False, True):
+            with self.subTest(verify):
+                self._connection_kwargs.update(extra=dict(verify=verify))
+                self.assertEqual(
+                    verify,
+                    self._connection_from_hook().verify_cert,
+                )
+
+    def test_ssl_options(self):
+        ssl_options = dict(ssl_version='0.0', ca_certs='/a/b', ciphers='c')
+        self._connection_kwargs.update(extra=ssl_options)
+        for option, value in ssl_options.items():
+            with self.subTest(option):
+                self.assertEqual(
+                    value,
+                    self._connection_from_hook().ssl_options[option],
+                )
+
+    @staticmethod
+    def _connection_from_hook() -> clickhouse_driver.connection.Connection:
+        return ClickHouseHook().get_conn().connection
+
+    _connection_kwargs: Dict[str, Any]
+    _mocked_hook: mock._patch
+
+    @classmethod
+    def setUpClass(cls):
+        cls._mocked_hook = mock.patch(
+            'airflow_clickhouse_plugin.ClickHouseHook.get_connection',
+            lambda self, conn_id: airflow.models.Connection(**dict(
+                cls._connection_kwargs,
+                extra=json.dumps(cls._connection_kwargs['extra']),
+            ) if 'extra' in cls._connection_kwargs else cls._connection_kwargs),
+        )
+        cls._mocked_hook.__enter__()
+
+    @classmethod
+    def setUp(cls):
+        cls._connection_kwargs = {}
+
+    @classmethod
+    def tearDownClass(cls) -> None:
+        cls._mocked_hook.__exit__(None, None, None)
+
+
 class HookLogQueryTestCase(TestCase):
     def setUp(self) -> None:
         self.hook = LocalClickHouseHook()