author    Burak Yavuz <brkyvz@gmail.com>  2016-04-28 15:22:28 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>  2016-04-28 15:22:28 -0700
commit    78c8aaf849aadbb065730959e7c1b70bb58d69c9 (patch)
tree      8795a2ad1c070ec852809d5f6e81c5bbd1c3afd8 /python/pyspark/sql/tests.py
parent    d584a2b8ac57eff3bf230c760e5bda205c6ea747 (diff)
[SPARK-14555] Second cut of Python API for Structured Streaming
## What changes were proposed in this pull request?

This PR adds Python APIs for:
- `ContinuousQueryManager`
- `ContinuousQueryException`

`ContinuousQueryException` is a very basic wrapper; it doesn't provide the functionality that the Scala side provides, but it follows the same pattern as `AnalysisException`. For `ContinuousQueryManager`, all APIs are provided except for registering listeners.

This PR also attempts to fix test flakiness by stopping all active streams just before each test.

## How was this patch tested?

Python doctests and unit tests.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #12673 from brkyvz/pyspark-cqm.
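For reference, a rough usage sketch of the new `ContinuousQueryManager` Python API, limited to the calls exercised by the tests in this diff; `sqlCtx` is an assumed, already-constructed `SQLContext` (Spark 2.0-era API), and this sketch is illustrative, not part of the patch:

```python
# Sketch only: `sqlCtx` is an assumed, pre-existing SQLContext.

# Enumerate the active continuous queries and stop each one.
for cq in sqlCtx.streams.active:
    cq.stop()

# Block up to 2.6 seconds for any query to terminate; per the tests below,
# this returns a falsy value when the timeout expires with no termination.
res = sqlCtx.streams.awaitAnyTermination(2.6)

# A non-numeric timeout is rejected with a ValueError.
try:
    sqlCtx.streams.awaitAnyTermination("hello")
except ValueError:
    pass
```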
Diffstat (limited to 'python/pyspark/sql/tests.py')
-rw-r--r--  python/pyspark/sql/tests.py | 109
1 file changed, 75 insertions(+), 34 deletions(-)
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 99a12d639a..1d3dc159da 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -924,26 +924,32 @@ class SQLTests(ReusedPySparkTestCase):
 
     def test_stream_save_options(self):
         df = self.sqlCtx.read.format('text').stream('python/test_support/sql/streaming')
+        for cq in self.sqlCtx.streams.active:
+            cq.stop()
         tmpPath = tempfile.mkdtemp()
         shutil.rmtree(tmpPath)
         self.assertTrue(df.isStreaming)
         out = os.path.join(tmpPath, 'out')
         chk = os.path.join(tmpPath, 'chk')
-        cq = df.write.option('checkpointLocation', chk).queryName('this_query')\
+        cq = df.write.option('checkpointLocation', chk).queryName('this_query') \
             .format('parquet').option('path', out).startStream()
-        self.assertEqual(cq.name, 'this_query')
-        self.assertTrue(cq.isActive)
-        cq.processAllAvailable()
-        output_files = []
-        for _, _, files in os.walk(out):
-            output_files.extend([f for f in files if 'parquet' in f and not f.startswith('.')])
-        self.assertTrue(len(output_files) > 0)
-        self.assertTrue(len(os.listdir(chk)) > 0)
-        cq.stop()
-        shutil.rmtree(tmpPath)
+        try:
+            self.assertEqual(cq.name, 'this_query')
+            self.assertTrue(cq.isActive)
+            cq.processAllAvailable()
+            output_files = []
+            for _, _, files in os.walk(out):
+                output_files.extend([f for f in files if 'parquet' in f and not f.startswith('.')])
+            self.assertTrue(len(output_files) > 0)
+            self.assertTrue(len(os.listdir(chk)) > 0)
+        finally:
+            cq.stop()
+            shutil.rmtree(tmpPath)
 
     def test_stream_save_options_overwrite(self):
         df = self.sqlCtx.read.format('text').stream('python/test_support/sql/streaming')
+        for cq in self.sqlCtx.streams.active:
+            cq.stop()
         tmpPath = tempfile.mkdtemp()
         shutil.rmtree(tmpPath)
         self.assertTrue(df.isStreaming)
@@ -954,21 +960,25 @@ class SQLTests(ReusedPySparkTestCase):
         cq = df.write.option('checkpointLocation', fake1).format('memory').option('path', fake2) \
             .queryName('fake_query').startStream(path=out, format='parquet', queryName='this_query',
                                                  checkpointLocation=chk)
-        self.assertEqual(cq.name, 'this_query')
-        self.assertTrue(cq.isActive)
-        cq.processAllAvailable()
-        output_files = []
-        for _, _, files in os.walk(out):
-            output_files.extend([f for f in files if 'parquet' in f and not f.startswith('.')])
-        self.assertTrue(len(output_files) > 0)
-        self.assertTrue(len(os.listdir(chk)) > 0)
-        self.assertFalse(os.path.isdir(fake1))  # should not have been created
-        self.assertFalse(os.path.isdir(fake2))  # should not have been created
-        cq.stop()
-        shutil.rmtree(tmpPath)
+        try:
+            self.assertEqual(cq.name, 'this_query')
+            self.assertTrue(cq.isActive)
+            cq.processAllAvailable()
+            output_files = []
+            for _, _, files in os.walk(out):
+                output_files.extend([f for f in files if 'parquet' in f and not f.startswith('.')])
+            self.assertTrue(len(output_files) > 0)
+            self.assertTrue(len(os.listdir(chk)) > 0)
+            self.assertFalse(os.path.isdir(fake1))  # should not have been created
+            self.assertFalse(os.path.isdir(fake2))  # should not have been created
+        finally:
+            cq.stop()
+            shutil.rmtree(tmpPath)
 
     def test_stream_await_termination(self):
         df = self.sqlCtx.read.format('text').stream('python/test_support/sql/streaming')
+        for cq in self.sqlCtx.streams.active:
+            cq.stop()
         tmpPath = tempfile.mkdtemp()
         shutil.rmtree(tmpPath)
         self.assertTrue(df.isStreaming)
@@ -976,19 +986,50 @@ class SQLTests(ReusedPySparkTestCase):
         chk = os.path.join(tmpPath, 'chk')
         cq = df.write.startStream(path=out, format='parquet', queryName='this_query',
                                   checkpointLocation=chk)
-        self.assertTrue(cq.isActive)
         try:
-            cq.awaitTermination("hello")
-            self.fail("Expected a value exception")
-        except ValueError:
-            pass
-        now = time.time()
-        res = cq.awaitTermination(2600)  # test should take at least 2 seconds
-        duration = time.time() - now
-        self.assertTrue(duration >= 2)
-        self.assertFalse(res)
-        cq.stop()
+            self.assertTrue(cq.isActive)
+            try:
+                cq.awaitTermination("hello")
+                self.fail("Expected a value exception")
+            except ValueError:
+                pass
+            now = time.time()
+            # test should take at least 2 seconds
+            res = cq.awaitTermination(2.6)
+            duration = time.time() - now
+            self.assertTrue(duration >= 2)
+            self.assertFalse(res)
+        finally:
+            cq.stop()
+            shutil.rmtree(tmpPath)
+
+    def test_query_manager_await_termination(self):
+        df = self.sqlCtx.read.format('text').stream('python/test_support/sql/streaming')
+        for cq in self.sqlCtx.streams.active:
+            cq.stop()
+        tmpPath = tempfile.mkdtemp()
         shutil.rmtree(tmpPath)
+        self.assertTrue(df.isStreaming)
+        out = os.path.join(tmpPath, 'out')
+        chk = os.path.join(tmpPath, 'chk')
+        cq = df.write.startStream(path=out, format='parquet', queryName='this_query',
+                                  checkpointLocation=chk)
+        try:
+            self.assertTrue(cq.isActive)
+            try:
+                self.sqlCtx.streams.awaitAnyTermination("hello")
+                self.fail("Expected a value exception")
+            except ValueError:
+                pass
+            now = time.time()
+            # test should take at least 2 seconds
+            res = self.sqlCtx.streams.awaitAnyTermination(2.6)
+            duration = time.time() - now
+            self.assertTrue(duration >= 2)
+            self.assertFalse(res)
+        finally:
+            cq.stop()
+            shutil.rmtree(tmpPath)
 
     def test_help_command(self):
         # Regression test for SPARK-5464
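Every test touched by this patch follows the same cleanup shape: stop leftover streams up front, then guard the assertions so the query and the temp directory are released even when an assertion fails. A minimal sketch of that pattern, assuming the same PySpark 2.0-era API; the `run_assertions` callable is hypothetical, introduced here only for illustration:

```python
import os
import shutil
import tempfile


def run_streaming_test(sqlCtx, df, run_assertions):
    # Stop queries leaked by earlier tests so state does not bleed across tests.
    for cq in sqlCtx.streams.active:
        cq.stop()
    tmpPath = tempfile.mkdtemp()
    shutil.rmtree(tmpPath)  # the stream sink recreates the directory itself
    out = os.path.join(tmpPath, 'out')
    chk = os.path.join(tmpPath, 'chk')
    cq = df.write.startStream(path=out, format='parquet',
                              queryName='this_query', checkpointLocation=chk)
    try:
        run_assertions(cq)  # hypothetical test body
    finally:
        # Always stop the query and remove temp output, even on failure.
        cq.stop()
        shutil.rmtree(tmpPath)
```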