author    Tathagata Das <tathagata.das1565@gmail.com>    2016-06-15 10:46:02 -0700
committer Shixiong Zhu <shixiong@databricks.com>    2016-06-15 10:46:07 -0700
commit    9a5071996b968148f6b9aba12e0d3fe888d9acd8 (patch)
tree      1f39691de83edb31dca56d6cd460261070504248 /python/pyspark/sql/tests.py
parent    d30b7e6696e20f1014c7f26aadbc051da0fac578 (diff)
[SPARK-15953][WIP][STREAMING] Renamed ContinuousQuery to StreamingQuery
Renamed for simplicity, so that it's obvious that it's related to streaming. Covered by existing unit tests.
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #13673 from tdas/SPARK-15953.
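For reference, a minimal PySpark sketch of the renamed StreamingQuery API as exercised by the tests below; the session setup and the output/checkpoint paths are illustrative assumptions, not part of this commit:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("streaming-query-demo").getOrCreate()

# Same text source the tests below use; any streaming source works.
df = spark.readStream.format('text').load('python/test_support/sql/streaming')

# writeStream.start() now returns a StreamingQuery (formerly ContinuousQuery).
# The paths here are illustrative assumptions.
q = df.writeStream.format('parquet') \
    .option('checkpointLocation', '/tmp/chk') \
    .option('path', '/tmp/out') \
    .queryName('demo_query') \
    .outputMode('append') \
    .start()

try:
    q.processAllAvailable()      # block until all available input is processed
    print(q.name, q.isActive)    # 'demo_query', True while the query runs
finally:
    q.stop()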
Diffstat (limited to 'python/pyspark/sql/tests.py')
-rw-r--r--  python/pyspark/sql/tests.py  52
1 file changed, 26 insertions(+), 26 deletions(-)
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index fee960a1a7..1d5d691696 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -921,32 +921,32 @@ class SQLTests(ReusedPySparkTestCase):
def test_stream_save_options(self):
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
- for cq in self.spark._wrapped.streams.active:
- cq.stop()
+ for q in self.spark._wrapped.streams.active:
+ q.stop()
tmpPath = tempfile.mkdtemp()
shutil.rmtree(tmpPath)
self.assertTrue(df.isStreaming)
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
- cq = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \
+ q = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \
.format('parquet').outputMode('append').option('path', out).start()
try:
- self.assertEqual(cq.name, 'this_query')
- self.assertTrue(cq.isActive)
- cq.processAllAvailable()
+ self.assertEqual(q.name, 'this_query')
+ self.assertTrue(q.isActive)
+ q.processAllAvailable()
output_files = []
for _, _, files in os.walk(out):
output_files.extend([f for f in files if not f.startswith('.')])
self.assertTrue(len(output_files) > 0)
self.assertTrue(len(os.listdir(chk)) > 0)
finally:
- cq.stop()
+ q.stop()
shutil.rmtree(tmpPath)
def test_stream_save_options_overwrite(self):
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
- for cq in self.spark._wrapped.streams.active:
- cq.stop()
+ for q in self.spark._wrapped.streams.active:
+ q.stop()
tmpPath = tempfile.mkdtemp()
shutil.rmtree(tmpPath)
self.assertTrue(df.isStreaming)
@@ -954,15 +954,15 @@ class SQLTests(ReusedPySparkTestCase):
chk = os.path.join(tmpPath, 'chk')
fake1 = os.path.join(tmpPath, 'fake1')
fake2 = os.path.join(tmpPath, 'fake2')
- cq = df.writeStream.option('checkpointLocation', fake1)\
+ q = df.writeStream.option('checkpointLocation', fake1)\
.format('memory').option('path', fake2) \
.queryName('fake_query').outputMode('append') \
.start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
try:
- self.assertEqual(cq.name, 'this_query')
- self.assertTrue(cq.isActive)
- cq.processAllAvailable()
+ self.assertEqual(q.name, 'this_query')
+ self.assertTrue(q.isActive)
+ q.processAllAvailable()
output_files = []
for _, _, files in os.walk(out):
output_files.extend([f for f in files if not f.startswith('.')])
@@ -971,50 +971,50 @@ class SQLTests(ReusedPySparkTestCase):
self.assertFalse(os.path.isdir(fake1)) # should not have been created
self.assertFalse(os.path.isdir(fake2)) # should not have been created
finally:
- cq.stop()
+ q.stop()
shutil.rmtree(tmpPath)
def test_stream_await_termination(self):
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
- for cq in self.spark._wrapped.streams.active:
- cq.stop()
+ for q in self.spark._wrapped.streams.active:
+ q.stop()
tmpPath = tempfile.mkdtemp()
shutil.rmtree(tmpPath)
self.assertTrue(df.isStreaming)
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
- cq = df.writeStream\
+ q = df.writeStream\
.start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
try:
- self.assertTrue(cq.isActive)
+ self.assertTrue(q.isActive)
try:
- cq.awaitTermination("hello")
+ q.awaitTermination("hello")
self.fail("Expected a value exception")
except ValueError:
pass
now = time.time()
# test should take at least 2 seconds
- res = cq.awaitTermination(2.6)
+ res = q.awaitTermination(2.6)
duration = time.time() - now
self.assertTrue(duration >= 2)
self.assertFalse(res)
finally:
- cq.stop()
+ q.stop()
shutil.rmtree(tmpPath)
def test_query_manager_await_termination(self):
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
- for cq in self.spark._wrapped.streams.active:
- cq.stop()
+ for q in self.spark._wrapped.streams.active:
+ q.stop()
tmpPath = tempfile.mkdtemp()
shutil.rmtree(tmpPath)
self.assertTrue(df.isStreaming)
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
- cq = df.writeStream\
+ q = df.writeStream\
.start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
try:
- self.assertTrue(cq.isActive)
+ self.assertTrue(q.isActive)
try:
self.spark._wrapped.streams.awaitAnyTermination("hello")
self.fail("Expected a value exception")
@@ -1027,7 +1027,7 @@ class SQLTests(ReusedPySparkTestCase):
self.assertTrue(duration >= 2)
self.assertFalse(res)
finally:
- cq.stop()
+ q.stop()
shutil.rmtree(tmpPath)
def test_help_command(self):
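
For context, a hedged sketch of the StreamingQueryManager calls the last hunk exercises; it assumes the `spark` session and query from the sketch above, and reflects this Spark version's habit of reaching the manager through the wrapped SQLContext:

# StreamingQueryManager usage matching the tests above (sketch, not part of the diff).
mgr = spark._wrapped.streams

for active_q in mgr.active:            # stop any queries left over from earlier runs
    active_q.stop()

try:
    mgr.awaitAnyTermination("hello")   # a non-numeric timeout raises ValueError
except ValueError:
    pass

res = mgr.awaitAnyTermination(2.6)     # wait up to ~2.6 seconds; returns False on timeout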