diff --git a/README.md b/README.md index 50122d4..f47bc50 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,9 @@ Provides `ClickHouseHook` and `ClickHouseOperator` for [Apache Airflow][airflow] 4. Executed queries are logged in a pretty form. 5. Uses effective native ClickHouse TCP protocol thanks to [clickhouse-driver][ch-driver-docs]. Does not support HTTP protocol. +6. Supports extra ClickHouse [connection parameters][ch-driver-connection] such + as various timeouts, `compression`, `secure`, etc through Airflow + [Connection.extra][airflow-conn-extra] property. # Installation @@ -28,13 +31,8 @@ To import `ClickHouseOperator` use: Supported kwargs: * `sql`: templated query (if argument is a single `str`) or queries (if iterable of `str`'s). -* `clickhouse_conn_id`: connection id. Connection schema (all properties are - optional, defaults correspond to the default ClickHouse configuration): - * `host`, default: `localhost`; - * `port`, default: `9000` (default native ClickHouse protocol port); - * `database`, default: `default`; - * `user`, default: `default`; - * `password`, default: `''` (empty). +* `clickhouse_conn_id`: connection id. Connection schema is described + [below](#clickhouse-connection-schema). * `parameters`: passed to clickhouse-driver [execute method][ch-driver-execute]. * If multiple queries are provided via `sql` then the parameters are passed to _all_ of them. @@ -51,7 +49,8 @@ To import `ClickHouseHook` use: `from airflow.hooks.clickhouse_hook import ClickHouseHook` Supported kwargs of constructor (`__init__` method): -* `clickhouse_conn_id`: connection id. See connection schema above. +* `clickhouse_conn_id`: connection id. Connection schema is described + [below](#clickhouse-connection-schema). * `database`: if present, overrides database defined by connection. Supports all of the methods of the Airflow [BaseHook][airflow-base-hook] @@ -76,13 +75,55 @@ Supports all of the methods of the Airflow [BaseHook][airflow-base-hook] * `get_conn()`: returns the underlying [clickhouse_driver.Client][ch-driver-client] instance. +## ClickHouse Connection schema + +[clickhouse_driver.Client][ch-driver-client] is initiated with attributes stored + in Airflow [Connection attributes][airflow-connection-attrs]. The mapping of + the attributes is listed below: + +| Airflow Connection attribute | `Client.__init__` argument | +| --- | --- | +| `host` | `host` | +| `port` | `port` | +| `schema` | `database` | +| `login` | `user` | +| `password` | `password` | + +If you pass `database` argument to `ClickHouseOperator` or `ClickHouseHook` + explicitly then it is passed to the `Client` instead of the `schema` + attribute of the Airflow connection. + +### Extra arguments + +You may also pass [additional arguments][ch-driver-connection], such as + timeouts, `compression`, `secure`, etc through + [Connection.extra][airflow-conn-extra] attribute. The attribute should + contain a JSON object which will be [deserialized][airflow-conn-dejson] and + all of its properties will be passed as-is to the `Client`. + +For example, if Airflow connection contains `extra={"secure":true}` then + the `Client.__init__` will receive `secure=True` keyword argument in + addition to other non-empty connection attributes. + +### Default values + +If the Airflow connection attribute is not set then it is not passed to the + `Client` at all. In that case the default value of the corresponding + [clickhouse_driver.Connection][ch-driver-connection] argument is used (e.g. + `user` defaults to `'default'`). + +This means that Airflow ClickHouse Plugin does not itself define any default + values for the ClickHouse connection. You may fully rely on default values + of the `clickhouse-driver` version you use. The only exception is `host`: if + the attribute of Airflow connection is not set then `'localhost'` is used. + ## Examples ### ClickHouseOperator ```python from airflow import DAG -from airflow.operators.clickhouse_plugin import ClickHouseOperator +from airflow.operators.clickhouse_operator import ClickHouseOperator from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago @@ -162,6 +203,7 @@ From the root project directory: `python -m unittest discover -s tests` * Viktor Taranenko, [@viktortnk](https://github.com/viktortnk) * Danila Ganchar, [@d-ganchar](https://github.com/d-ganchar) * Mikhail, [@glader](https://github.com/glader) +* Alexander Chashnikov, [@ne1r0n](https://github.com/ne1r0n) [airflow]: https://airflow.apache.org/ @@ -173,3 +215,7 @@ From the root project directory: `python -m unittest discover -s tests` [ch-driver-execute-iter]: https://clickhouse-driver.readthedocs.io/en/latest/quickstart.html#streaming-results [ch-driver-insert]: https://clickhouse-driver.readthedocs.io/en/latest/quickstart.html#inserting-data [ch-driver-client]: https://clickhouse-driver.readthedocs.io/en/latest/api.html#client +[airflow-conn-extra]: https://airflow.apache.org/docs/stable/_api/airflow/models/connection/index.html#airflow.models.connection.Connection.extra +[ch-driver-connection]: https://clickhouse-driver.readthedocs.io/en/latest/api.html#connection +[airflow-connection-attrs]: https://airflow.apache.org/docs/1.10.6/_api/airflow/models/index.html?highlight=connection#airflow.models.Connection +[airflow-conn-dejson]: https://airflow.apache.org/docs/1.10.6/_api/airflow/models/index.html?highlight=connection#airflow.models.Connection.extra_dejson diff --git a/airflow_clickhouse_plugin/__version__.py b/airflow_clickhouse_plugin/__version__.py index e7fb4af..073e645 100644 --- a/airflow_clickhouse_plugin/__version__.py +++ b/airflow_clickhouse_plugin/__version__.py @@ -9,7 +9,7 @@ __description__ = \ 'airflow-clickhouse-plugin - ' \ 'Airflow plugin to execute ClickHouse commands and queries' -__version__ = '0.5.5' +__version__ = '0.5.6' __url__ = 'https://github.com/whisklabs/airflow-clickhouse-plugin' __license__ = 'MIT License' __author__ = 'Viktor Taranenko' diff --git a/airflow_clickhouse_plugin/hooks/clickhouse_hook.py b/airflow_clickhouse_plugin/hooks/clickhouse_hook.py index bb871ab..1382c6e 100644 --- a/airflow_clickhouse_plugin/hooks/clickhouse_hook.py +++ b/airflow_clickhouse_plugin/hooks/clickhouse_hook.py @@ -19,19 +19,18 @@ def __init__( def get_conn(self) -> Client: conn = self.get_connection(self.clickhouse_conn_id) - return self.create_connection( - host=conn.host or 'localhost', - port=int(conn.port) if conn.port else 9000, - user=conn.login or 'default', - password=conn.password or '', - database=self.database or conn.schema or 'default', - ) - - @classmethod - def create_connection( - cls, host: str, port: int, database: str, user: str, password: str - ) -> Client: - return Client(host, port, database, user, password) + connection_kwargs = conn.extra_dejson.copy() + if conn.port: + connection_kwargs.update(port=int(conn.port)) + if conn.login: + connection_kwargs.update(user=conn.login) + if conn.password: + connection_kwargs.update(password=conn.password) + if self.database: + connection_kwargs.update(database=self.database) + elif conn.schema: + connection_kwargs.update(database=conn.schema) + return Client(conn.host or 'localhost', **connection_kwargs) def get_records(self, sql: str, parameters: dict = None) -> List[Tuple]: self._log_query(sql, parameters) diff --git a/tests/test_hook.py b/tests/test_hook.py index 8b3669f..fde38a6 100644 --- a/tests/test_hook.py +++ b/tests/test_hook.py @@ -1,11 +1,18 @@ +import json +from typing import Any, Dict from unittest import TestCase, mock +import airflow.models +import clickhouse_driver.connection +import clickhouse_driver.defines +import clickhouse_driver.protocol from clickhouse_driver.errors import ServerException, ErrorCodes +from airflow_clickhouse_plugin import ClickHouseHook from tests.util import LocalClickHouseHook -class ClientFromUrlTestCase(TestCase): +class ClickHouseHookTestCaseCase(TestCase): def test_temp_table(self): hook = LocalClickHouseHook() temp_table_name = 'test_temp_table' @@ -25,6 +32,138 @@ def test_temp_table(self): raise AssertionError('server did not raise an error') +class ClickHouseConnectionParamsTestCase(TestCase): + def test_plain_arguments(self): + plain_arguments = dict(host='hst', password='pass') + self._connection_kwargs.update(plain_arguments) + connection = self._connection_from_hook() + for key, value in plain_arguments.items(): + with self.subTest(key): + self.assertEqual(value, getattr(connection, key)) + + def test_user(self): + login = 'user' + self._connection_kwargs.update(login=login) + self.assertEqual(login, self._connection_from_hook().user) + + def test_database(self): + database = 'test-db-from-init-args' + self.assertEqual( + database, + ClickHouseHook(database=database).get_conn().connection.database, + ) + + def test_schema(self): + schema = 'test-db-from-connection-schema' + self._connection_kwargs.update(schema=schema) + self.assertEqual(schema, self._connection_from_hook().database) + + def test_port(self): + port = 8888 + self._connection_kwargs.update(port=str(port)) + self.assertEqual(port, self._connection_from_hook().port) + + def test_host_missing(self): + self.assertEqual('localhost', self._connection_from_hook().host) + + def test_plain_extra_params(self): + extra_params = dict( + connect_timeout=123, + send_receive_timeout=456, + sync_request_timeout=789, + ) + self._connection_kwargs.update(extra=extra_params) + connection = self._connection_from_hook() + for key, value in extra_params.items(): + with self.subTest(key): + self.assertEqual(value, getattr(connection, key)) + + def test_client_name(self): + client_name = 'test-client-name' + self._connection_kwargs.update(extra=dict(client_name=client_name)) + self.assertEqual( + f'{clickhouse_driver.defines.DBMS_NAME} {client_name}', + self._connection_from_hook().client_name, + ) + + def test_compression_enabled(self): + # to pass successfully requires: pip install clickhouse-driver[lz4] + comp_block_size = 123456 + for compression in (True, 'lz4'): + with self.subTest(compression): + self._connection_kwargs.update(extra=dict( + compression=compression, + compress_block_size=comp_block_size, + )) + connection = self._connection_from_hook() + self.assertEqual( + clickhouse_driver.protocol.Compression.ENABLED, + connection.compression, + ) + self.assertEqual(comp_block_size, connection.compress_block_size) + + def test_compression_disabled(self): + self._connection_kwargs.update(extra=dict(compression=False)) + self.assertEqual( + clickhouse_driver.protocol.Compression.DISABLED, + self._connection_from_hook().compression, + ) + + def test_secure(self): + for secure in (False, True): + with self.subTest(secure): + self._connection_kwargs.update(extra=dict(secure=secure)) + self.assertEqual( + secure, + self._connection_from_hook().secure_socket, + ) + + def test_verify(self): + for verify in (False, True): + with self.subTest(verify): + self._connection_kwargs.update(extra=dict(verify=verify)) + self.assertEqual( + verify, + self._connection_from_hook().verify_cert, + ) + + def test_ssl_options(self): + ssl_options = dict(ssl_version='0.0', ca_certs='/a/b', ciphers='c') + self._connection_kwargs.update(extra=ssl_options) + for option, value in ssl_options.items(): + with self.subTest(option): + self.assertEqual( + value, + self._connection_from_hook().ssl_options[option], + ) + + @staticmethod + def _connection_from_hook() -> clickhouse_driver.connection.Connection: + return ClickHouseHook().get_conn().connection + + _connection_kwargs: Dict[str, Any] + _mocked_hook: mock._patch + + @classmethod + def setUpClass(cls): + cls._mocked_hook = mock.patch( + 'airflow_clickhouse_plugin.ClickHouseHook.get_connection', + lambda self, conn_id: airflow.models.Connection(**dict( + cls._connection_kwargs, + extra=json.dumps(cls._connection_kwargs['extra']), + ) if 'extra' in cls._connection_kwargs else cls._connection_kwargs), + ) + cls._mocked_hook.__enter__() + + @classmethod + def setUp(cls): + cls._connection_kwargs = {} + + @classmethod + def tearDownClass(cls) -> None: + cls._mocked_hook.__exit__(None, None, None) + + class HookLogQueryTestCase(TestCase): def setUp(self) -> None: self.hook = LocalClickHouseHook()