Skip to content

Commit c1adb45

Browse files
authored
Test data frames with pyspark (#135)
* Add tags to test_dual_write
* Add the PySpark 3.5 example.
* Fix up style and imports
* Remove excess newline
* Fix new test
1 parent 6901d19 commit c1adb45

File tree

3 files changed

+44
-1
lines changed

3 files changed

+44
-1
lines changed

python/examples/test_dual_write.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import os
22
import tempfile
33

4+
# tag::test[]
45
from sparktestingbase.sqltestcase import SQLTestCase
56
from pyspark.sql.functions import current_timestamp
67
from pyspark.sql.types import Row
@@ -21,3 +22,6 @@ def test_actual_dual_write(self):
2122
df1 = self.sqlCtx.read.format("parquet").load(p1)
2223
df2 = self.sqlCtx.read.format("parquet").load(p2)
2324
self.assertDataFrameEqual(df2.select("times"), df1, 0.1)
25+
26+
27+
# end::test[]
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
import os
import tempfile

# tag::test[]
import unittest

from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp
from pyspark.sql.types import Row
from pyspark.testing.utils import assertDataFrameEqual

from .dual_write import DualWriteExample


class DualWriteTest(unittest.TestCase):
    """Tests DualWriteExample using the native pyspark.testing utilities
    (available in PySpark 3.5+), mirroring the sparktestingbase-based
    test in test_dual_write.py.
    """

    @classmethod
    def setUpClass(cls):
        # One shared SparkSession for the whole class: session startup is
        # expensive, so build it once rather than per test.
        cls.spark = SparkSession.builder.appName(
            "Testing PySpark Example"
        ).getOrCreate()

    @classmethod
    def tearDownClass(cls):
        # Release the JVM/session created in setUpClass.
        cls.spark.stop()

    def test_always_passes(self):
        # Smoke test: proves the harness itself runs.
        self.assertTrue(True)

    def test_actual_dual_write(self):
        """Run do_write to two temp paths and verify the loaded results agree."""
        tempdir = tempfile.mkdtemp()
        p1 = os.path.join(tempdir, "data1")
        p2 = os.path.join(tempdir, "data2")
        df = self.spark.createDataFrame([Row("timbit"), Row("farted")], ["names"])
        combined = df.withColumn("times", current_timestamp())
        DualWriteExample().do_write(combined, p1, p2)
        df1 = self.spark.read.format("parquet").load(p1)
        df2 = self.spark.read.format("parquet").load(p2)
        # FIX: assertDataFrameEqual's third positional parameter is
        # checkRowOrder (a bool), not a tolerance — passing 0.1 there
        # silently enabled row-order checking instead of allowing a
        # timestamp tolerance. Pass the tolerance by keyword as rtol,
        # matching the sparktestingbase call this test was ported from.
        # NOTE(review): comparing df2.select("times") against the full
        # load of p1 assumes do_write stores only the "times" column at
        # p1 — confirm against DualWriteExample.
        assertDataFrameEqual(df2.select("times"), df1, rtol=0.1)
# end::test[]

python/tox.ini

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ deps =
5757
[testenv:flake8]
5858
extras = tests
5959
skipsdist = True
60-
commands = flake8 --ignore=F403,E402,F401,F405,W503 examples
60+
commands = flake8 --ignore=F403,E402,F401,F405,W503,E265 examples
6161
allowlist_externals = flake8
6262

6363
[testenv:mypy]

0 commit comments

Comments
 (0)