-
Notifications
You must be signed in to change notification settings - Fork 770
/
Copy pathclickhouse_connector.py
58 lines (46 loc) · 1.92 KB
/
clickhouse_connector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
import environs
import os
from log import log
default_database = "default"


class ClickhouseConnector(object):
    """Small wrapper around a lazily created clickhouse-sqlalchemy session.

    Call :meth:`connect` to record the connection settings, then use
    :meth:`query_with_session` or :meth:`fetch_all` to run statements. The
    SQLAlchemy engine and session are only built on the first query.
    """

    def __init__(self):
        # Initialise every attribute up front so reset_session() and the
        # query methods behave predictably even before connect() is called.
        self._uri = None
        self._additonal_headers = dict()
        self._session = None

    def connect(self, host, port, user="root", password="", database=default_database):
        """Record the connection settings used by subsequent queries.

        No network connection is opened here; the engine is created by the
        first call to :meth:`query_with_session`.

        Args:
            host: ClickHouse host name or address.
            port: ClickHouse HTTP(S) port, as a string or an integer.
            user: User name. Pass it raw; it is percent-encoded here.
            password: Password. Pass it raw; it is percent-encoded here.
            database: Database name placed in the URI path.

        Environment:
            QUERY_CLICKHOUSE_HANDLER_PROTOCAL: when set, used verbatim as
                the ``protocol`` query parameter. Otherwise the protocol is
                ``https`` for ports 443/8443 and ``http`` for anything else.
            CLICKHOUSE_ADDITIONAL_HEADERS: when set, parsed with
                ``environs.Env.dict``; each key is stored with a
                ``header__`` prefix and passed as ``connect_args``.
        """
        from urllib.parse import quote

        protocol = os.getenv("QUERY_CLICKHOUSE_HANDLER_PROTOCAL")
        if protocol is None:
            # Compare as text so that both 443 and "443" select https.
            protocol = "https" if str(port) in ("443", "8443") else "http"

        # Credentials may contain URI delimiters such as '@', ':' or '/'.
        # Escape everything (safe="") so the URI stays parseable.
        safe_user = quote(str(user), safe="")
        safe_password = quote(str(password), safe="")
        location = f"{host}:{port}/{database}?protocol={protocol}"
        self._uri = f"clickhouse+http://{safe_user}:{safe_password}@{location}"

        # Never write the password to the log; show a redacted URI instead.
        log.debug(f"clickhouse+http://{safe_user}:***@{location}")

        self._additonal_headers = dict()
        if os.getenv("CLICKHOUSE_ADDITIONAL_HEADERS") is not None:
            headers = environs.Env().dict("CLICKHOUSE_ADDITIONAL_HEADERS")
            # NOTE(review): the "header__" prefix is presumably how the
            # driver recognises extra HTTP headers - confirm against the
            # clickhouse-sqlalchemy documentation.
            self._additonal_headers = {
                "header__" + key: value for key, value in headers.items()
            }

        # Forget any cached session so the next query uses the new settings.
        self._session = None

    def query_with_session(self, statement):
        """Execute ``statement`` on the cached session and return the result.

        The engine and session are created on first use and reused until
        :meth:`reset_session` or :meth:`connect` clears them.

        Raises:
            RuntimeError: if :meth:`connect` has not been called yet.
        """
        if self._session is None:
            if self._uri is None:
                raise RuntimeError("connect() must be called before running queries")
            # Imported lazily so the module can be loaded without the
            # database drivers being installed.
            from clickhouse_sqlalchemy import make_session  # type: ignore
            from sqlalchemy import create_engine  # type: ignore

            engine = create_engine(self._uri, connect_args=self._additonal_headers)
            self._session = make_session(engine)
        log.debug(statement)
        return self._session.execute(statement)

    def reset_session(self):
        """Close the cached session, if any, and forget it.

        The reference is cleared before ``close()`` runs, so a failing
        ``close()`` cannot leave a half-closed session cached.
        """
        session, self._session = self._session, None
        if session is not None:
            session.close()

    def fetch_all(self, statement):
        """Execute ``statement`` and return every row as a list of lists."""
        cursor = self.query_with_session(statement)
        try:
            return [list(row) for row in cursor.fetchall()]
        finally:
            # Release the cursor even when fetching the rows fails.
            cursor.close()
# if __name__ == '__main__':
# from config import clickhouse_config
# connector = ClickhouseConnector()
# connector.connect(**clickhouse_config)
# print(connector.fetch_all("show databases"))
# print(connector.fetch_all("select * from t1"))