diff options
Diffstat (limited to 'python/pyspark/sql/tests.py')
-rw-r--r-- | python/pyspark/sql/tests.py | 52 |
1 files changed, 26 insertions, 26 deletions
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index fee960a1a7..1d5d691696 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -921,32 +921,32 @@ class SQLTests(ReusedPySparkTestCase): def test_stream_save_options(self): df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') - for cq in self.spark._wrapped.streams.active: - cq.stop() + for q in self.spark._wrapped.streams.active: + q.stop() tmpPath = tempfile.mkdtemp() shutil.rmtree(tmpPath) self.assertTrue(df.isStreaming) out = os.path.join(tmpPath, 'out') chk = os.path.join(tmpPath, 'chk') - cq = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \ + q = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \ .format('parquet').outputMode('append').option('path', out).start() try: - self.assertEqual(cq.name, 'this_query') - self.assertTrue(cq.isActive) - cq.processAllAvailable() + self.assertEqual(q.name, 'this_query') + self.assertTrue(q.isActive) + q.processAllAvailable() output_files = [] for _, _, files in os.walk(out): output_files.extend([f for f in files if not f.startswith('.')]) self.assertTrue(len(output_files) > 0) self.assertTrue(len(os.listdir(chk)) > 0) finally: - cq.stop() + q.stop() shutil.rmtree(tmpPath) def test_stream_save_options_overwrite(self): df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') - for cq in self.spark._wrapped.streams.active: - cq.stop() + for q in self.spark._wrapped.streams.active: + q.stop() tmpPath = tempfile.mkdtemp() shutil.rmtree(tmpPath) self.assertTrue(df.isStreaming) @@ -954,15 +954,15 @@ class SQLTests(ReusedPySparkTestCase): chk = os.path.join(tmpPath, 'chk') fake1 = os.path.join(tmpPath, 'fake1') fake2 = os.path.join(tmpPath, 'fake2') - cq = df.writeStream.option('checkpointLocation', fake1)\ + q = df.writeStream.option('checkpointLocation', fake1)\ .format('memory').option('path', fake2) \ .queryName('fake_query').outputMode('append') \ .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk) try: - self.assertEqual(cq.name, 'this_query') - self.assertTrue(cq.isActive) - cq.processAllAvailable() + self.assertEqual(q.name, 'this_query') + self.assertTrue(q.isActive) + q.processAllAvailable() output_files = [] for _, _, files in os.walk(out): output_files.extend([f for f in files if not f.startswith('.')]) @@ -971,50 +971,50 @@ class SQLTests(ReusedPySparkTestCase): self.assertFalse(os.path.isdir(fake1)) # should not have been created self.assertFalse(os.path.isdir(fake2)) # should not have been created finally: - cq.stop() + q.stop() shutil.rmtree(tmpPath) def test_stream_await_termination(self): df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') - for cq in self.spark._wrapped.streams.active: - cq.stop() + for q in self.spark._wrapped.streams.active: + q.stop() tmpPath = tempfile.mkdtemp() shutil.rmtree(tmpPath) self.assertTrue(df.isStreaming) out = os.path.join(tmpPath, 'out') chk = os.path.join(tmpPath, 'chk') - cq = df.writeStream\ + q = df.writeStream\ .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk) try: - self.assertTrue(cq.isActive) + self.assertTrue(q.isActive) try: - cq.awaitTermination("hello") + q.awaitTermination("hello") self.fail("Expected a value exception") except ValueError: pass now = time.time() # test should take at least 2 seconds - res = cq.awaitTermination(2.6) + res = q.awaitTermination(2.6) duration = time.time() - now self.assertTrue(duration >= 2) self.assertFalse(res) finally: - cq.stop() + q.stop() shutil.rmtree(tmpPath) def test_query_manager_await_termination(self): df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') - for cq in self.spark._wrapped.streams.active: - cq.stop() + for q in self.spark._wrapped.streams.active: + q.stop() tmpPath = tempfile.mkdtemp() shutil.rmtree(tmpPath) self.assertTrue(df.isStreaming) out = os.path.join(tmpPath, 'out') chk = os.path.join(tmpPath, 'chk') - cq = df.writeStream\ + q = df.writeStream\ .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk) try: - self.assertTrue(cq.isActive) + self.assertTrue(q.isActive) try: self.spark._wrapped.streams.awaitAnyTermination("hello") self.fail("Expected a value exception") @@ -1027,7 +1027,7 @@ class SQLTests(ReusedPySparkTestCase): self.assertTrue(duration >= 2) self.assertFalse(res) finally: - cq.stop() + q.stop() shutil.rmtree(tmpPath) def test_help_command(self): |