aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/sql/tests.py
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2016-06-14 17:58:45 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2016-06-14 17:58:45 -0700
commit214adb14b8d1f1c4dce0c97dd6dc09efedbaa643 (patch)
tree4933de7ffd5ff7f099957fceaf581b4519a0b2fa /python/pyspark/sql/tests.py
parent5d50d4f0f9db3e6cc7c51e35cdb2d12daa4fd108 (diff)
downloadspark-214adb14b8d1f1c4dce0c97dd6dc09efedbaa643.tar.gz
spark-214adb14b8d1f1c4dce0c97dd6dc09efedbaa643.tar.bz2
spark-214adb14b8d1f1c4dce0c97dd6dc09efedbaa643.zip
[SPARK-15933][SQL][STREAMING] Refactored DF reader-writer to use readStream and writeStream for streaming DFs
## What changes were proposed in this pull request? Currently, the DataFrameReader/Writer has method that are needed for streaming and non-streaming DFs. This is quite awkward because each method in them through runtime exception for one case or the other. So rather having half the methods throw runtime exceptions, its just better to have a different reader/writer API for streams. - [x] Python API!! ## How was this patch tested? Existing unit tests + two sets of unit tests for DataFrameReader/Writer and DataStreamReader/Writer. Author: Tathagata Das <tathagata.das1565@gmail.com> Closes #13653 from tdas/SPARK-15933.
Diffstat (limited to 'python/pyspark/sql/tests.py')
-rw-r--r--python/pyspark/sql/tests.py42
1 files changed, 23 insertions, 19 deletions
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index e0acde6783..fee960a1a7 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -892,9 +892,9 @@ class SQLTests(ReusedPySparkTestCase):
shutil.rmtree(tmpPath)
def test_stream_trigger_takes_keyword_args(self):
- df = self.spark.read.format('text').stream('python/test_support/sql/streaming')
+ df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
try:
- df.write.trigger('5 seconds')
+ df.writeStream.trigger('5 seconds')
self.fail("Should have thrown an exception")
except TypeError:
# should throw error
@@ -902,22 +902,25 @@ class SQLTests(ReusedPySparkTestCase):
def test_stream_read_options(self):
schema = StructType([StructField("data", StringType(), False)])
- df = self.spark.read.format('text').option('path', 'python/test_support/sql/streaming')\
- .schema(schema).stream()
+ df = self.spark.readStream\
+ .format('text')\
+ .option('path', 'python/test_support/sql/streaming')\
+ .schema(schema)\
+ .load()
self.assertTrue(df.isStreaming)
self.assertEqual(df.schema.simpleString(), "struct<data:string>")
def test_stream_read_options_overwrite(self):
bad_schema = StructType([StructField("test", IntegerType(), False)])
schema = StructType([StructField("data", StringType(), False)])
- df = self.spark.read.format('csv').option('path', 'python/test_support/sql/fake') \
- .schema(bad_schema).stream(path='python/test_support/sql/streaming',
- schema=schema, format='text')
+ df = self.spark.readStream.format('csv').option('path', 'python/test_support/sql/fake') \
+ .schema(bad_schema)\
+ .load(path='python/test_support/sql/streaming', schema=schema, format='text')
self.assertTrue(df.isStreaming)
self.assertEqual(df.schema.simpleString(), "struct<data:string>")
def test_stream_save_options(self):
- df = self.spark.read.format('text').stream('python/test_support/sql/streaming')
+ df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
for cq in self.spark._wrapped.streams.active:
cq.stop()
tmpPath = tempfile.mkdtemp()
@@ -925,8 +928,8 @@ class SQLTests(ReusedPySparkTestCase):
self.assertTrue(df.isStreaming)
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
- cq = df.write.option('checkpointLocation', chk).queryName('this_query') \
- .format('parquet').outputMode('append').option('path', out).startStream()
+ cq = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \
+ .format('parquet').outputMode('append').option('path', out).start()
try:
self.assertEqual(cq.name, 'this_query')
self.assertTrue(cq.isActive)
@@ -941,7 +944,7 @@ class SQLTests(ReusedPySparkTestCase):
shutil.rmtree(tmpPath)
def test_stream_save_options_overwrite(self):
- df = self.spark.read.format('text').stream('python/test_support/sql/streaming')
+ df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
for cq in self.spark._wrapped.streams.active:
cq.stop()
tmpPath = tempfile.mkdtemp()
@@ -951,9 +954,10 @@ class SQLTests(ReusedPySparkTestCase):
chk = os.path.join(tmpPath, 'chk')
fake1 = os.path.join(tmpPath, 'fake1')
fake2 = os.path.join(tmpPath, 'fake2')
- cq = df.write.option('checkpointLocation', fake1).format('memory').option('path', fake2) \
+ cq = df.writeStream.option('checkpointLocation', fake1)\
+ .format('memory').option('path', fake2) \
.queryName('fake_query').outputMode('append') \
- .startStream(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
+ .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
try:
self.assertEqual(cq.name, 'this_query')
@@ -971,7 +975,7 @@ class SQLTests(ReusedPySparkTestCase):
shutil.rmtree(tmpPath)
def test_stream_await_termination(self):
- df = self.spark.read.format('text').stream('python/test_support/sql/streaming')
+ df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
for cq in self.spark._wrapped.streams.active:
cq.stop()
tmpPath = tempfile.mkdtemp()
@@ -979,8 +983,8 @@ class SQLTests(ReusedPySparkTestCase):
self.assertTrue(df.isStreaming)
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
- cq = df.write.startStream(path=out, format='parquet', queryName='this_query',
- checkpointLocation=chk)
+ cq = df.writeStream\
+ .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
try:
self.assertTrue(cq.isActive)
try:
@@ -999,7 +1003,7 @@ class SQLTests(ReusedPySparkTestCase):
shutil.rmtree(tmpPath)
def test_query_manager_await_termination(self):
- df = self.spark.read.format('text').stream('python/test_support/sql/streaming')
+ df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
for cq in self.spark._wrapped.streams.active:
cq.stop()
tmpPath = tempfile.mkdtemp()
@@ -1007,8 +1011,8 @@ class SQLTests(ReusedPySparkTestCase):
self.assertTrue(df.isStreaming)
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
- cq = df.write.startStream(path=out, format='parquet', queryName='this_query',
- checkpointLocation=chk)
+ cq = df.writeStream\
+ .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
try:
self.assertTrue(cq.isActive)
try: