Skip to content

Commit

Permalink
[AIRFLOW-2441] Fix bugs in HiveCliHook.load_df
Browse files Browse the repository at this point in the history
This PR fixes HiveCliHook.load_df to:

1. encode delimiter with the specified encoding
   before passing it to pandas.DataFrame.to_csv
   so as not to fail

2. flush the output file written by pandas.DataFrame.to_csv
   before executing the LOAD DATA statement

3. omit the header and row index from the file written
   by pandas.DataFrame.to_csv so that Hive reads it
   as expected

Closes apache#3334 from sekikn/AIRFLOW-2441
  • Loading branch information
sekikn authored and bolkedebruin committed May 10, 2018
1 parent f7c33af commit 74027c9
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
7 changes: 6 additions & 1 deletion airflow/hooks/hive_hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,12 @@ def _infer_field_types_from_df(df):
if field_dict is None and (create or recreate):
field_dict = _infer_field_types_from_df(df)

df.to_csv(f, sep=delimiter, **pandas_kwargs)
df.to_csv(path_or_buf=f,
sep=delimiter.encode(encoding),
header=False,
index=False,
**pandas_kwargs)
f.flush()

return self.load_file(filepath=f.name,
table=table,
Expand Down
27 changes: 27 additions & 0 deletions tests/hooks/test_hive_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#

import datetime
import pandas as pd
import random

import mock
Expand Down Expand Up @@ -105,6 +106,32 @@ def test_load_file(self, mock_run_cli):
)
mock_run_cli.assert_called_with(query)

@mock.patch('airflow.hooks.hive_hooks.HiveCliHook.load_file')
@mock.patch('pandas.DataFrame.to_csv')
def test_load_df(self, mock_to_csv, mock_load_file):
    """Check the keyword arguments load_df passes to to_csv and load_file.

    Both ``pandas.DataFrame.to_csv`` and ``HiveCliHook.load_file`` are
    patched, so only the recorded call arguments are inspected.
    """
    table = "t"
    delimiter = ","
    encoding = "utf-8"
    df = pd.DataFrame({"c": ["foo", "bar", "baz"]})

    HiveCliHook().load_df(df=df,
                          table=table,
                          delimiter=delimiter,
                          encoding=encoding)

    # to_csv: called exactly once, without header or index, and with the
    # delimiter encoded using the requested encoding.
    mock_to_csv.assert_called_once()
    to_csv_kwargs = mock_to_csv.call_args[1]
    expected_to_csv = (
        ("header", False),
        ("index", False),
        ("sep", delimiter.encode(encoding)),
    )
    for key, expected in expected_to_csv:
        self.assertEqual(to_csv_kwargs[key], expected)

    # load_file: called exactly once with the original (unencoded)
    # delimiter, the inferred field types and the target table.
    mock_load_file.assert_called_once()
    load_file_kwargs = mock_load_file.call_args[1]
    expected_load_file = (
        ("delimiter", delimiter),
        ("field_dict", {"c": u"STRING"}),
        ("table", table),
    )
    for key, expected in expected_load_file:
        self.assertEqual(load_file_kwargs[key], expected)


class TestHiveMetastoreHook(HiveEnvironmentTest):
VALID_FILTER_MAP = {'key2': 'value2'}
Expand Down

0 comments on commit 74027c9

Please sign in to comment.