Skip to content

Commit

Permalink
Update clickhouse.py
Browse files Browse the repository at this point in the history
  • Loading branch information
amolsr authored May 30, 2024
1 parent 1e3fb48 commit 85a60cf
Showing 1 changed file with 31 additions and 0 deletions.
31 changes: 31 additions & 0 deletions src/airflow_clickhouse_plugin/hooks/clickhouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,37 @@ def execute(
)
return last_result

def get_pandas_df(
self, sql: str, parameters: t.Union[None, dict, list, tuple, t.Generator] = None
) -> Any:
import pandas as pd

rows, columns_defs = self.run(sql, parameters, with_column_types=True)
columns = [column_name for column_name, _ in columns_defs]
return pd.DataFrame(rows, columns=columns)

def run(
self,
sql: t.Union[str, t.Iterable[str]],
parameters: t.Union[None, dict, list, tuple, t.Generator] = None,
with_column_types: bool = False,
types_check: bool = False,
) -> Any:
if isinstance(sql, str):
sql = (sql,)
with _disconnecting(self.get_conn()) as conn:
last_result = None
for s in sql:
self._log_query(s, parameters)
last_result = conn.execute(
s,
params=parameters,
with_column_types=with_column_types,
types_check=types_check,
)

return last_result


def conn_to_kwargs(conn: Connection, database: t.Optional[str]) -> t.Dict[str, t.Any]:
""" Translate Airflow Connection to clickhouse-driver Connection kwargs. """
Expand Down

0 comments on commit 85a60cf

Please sign in to comment.