Skip to content

Commit

Permalink
[AIRFLOW-685] Add test for MySqlHook.bulk_load()
Browse files Browse the repository at this point in the history
Closes apache#1929 from sekikn/AIRFLOW-685
  • Loading branch information
sekikn authored and bolkedebruin committed Dec 25, 2016
1 parent cd33276 commit 93538ca
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 1 deletion.
2 changes: 1 addition & 1 deletion airflow/utils/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def initdb():
merge_conn(
models.Connection(
conn_id='airflow_ci', conn_type='mysql',
host='localhost', login='root',
host='localhost', login='root', extra="{\"local_infile\": true}",
schema='airflow_ci'))
merge_conn(
models.Connection(
Expand Down
22 changes: 22 additions & 0 deletions tests/operators/operators.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,28 @@ def mysql_operator_test_multi(self):
sql=sql, dag=self.dag)
t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True)

def mysql_hook_test_bulk_load(self):
records = ("foo", "bar", "baz")

import tempfile
with tempfile.NamedTemporaryFile() as t:
t.write("\n".join(records).encode('utf8'))
t.flush()

from airflow.hooks.mysql_hook import MySqlHook
h = MySqlHook('airflow_ci')
with h.get_conn() as c:
c.execute("""
CREATE TABLE IF NOT EXISTS test_airflow (
dummy VARCHAR(50)
)
""")
c.execute("TRUNCATE TABLE test_airflow")
h.bulk_load("test_airflow", t.name)
c.execute("SELECT dummy FROM test_airflow")
results = tuple(result[0] for result in c.fetchall())
assert sorted(records) == sorted(results)

def test_mysql_to_mysql(self):
sql = "SELECT * FROM INFORMATION_SCHEMA.TABLES LIMIT 100;"
import airflow.operators.generic_transfer
Expand Down

0 comments on commit 93538ca

Please sign in to comment.