From 80bf48f437939ddc3bb82c8c7530c8ae419f8427 Mon Sep 17 00:00:00 2001
From: Burak Yavuz
Date: Wed, 20 Apr 2016 10:32:01 -0700
Subject: [SPARK-14555] First cut of Python API for Structured Streaming

## What changes were proposed in this pull request?

This patch provides a first cut of Python APIs for Structured Streaming.

It provides the new classes:
- `ContinuousQuery`
- `Trigger`
- `ProcessingTime`

in pyspark under `pyspark.sql.streaming`.

In addition, it contains the new methods added under:
- `DataFrameWriter`
    a) `startStream`
    b) `trigger`
    c) `queryName`
- `DataFrameReader`
    a) `stream`
- `DataFrame`
    a) `isStreaming`

This PR doesn't contain all methods exposed for `ContinuousQuery`, for example:
- `exception`
- `sourceStatuses`
- `sinkStatus`

These may be added in a follow-up.

This PR also contains some very minor doc fixes on the Scala side.
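As a rough sketch of how the new pieces fit together (illustrative only, not part of this patch; it assumes an existing `SQLContext` bound to `sqlCtx` and the sample input under `python/test_support/sql/streaming`, and the `processingTime` keyword for `trigger()` is an assumption inferred from the keyword-args test below):

```python
# Illustrative sketch only -- not part of this patch. Assumes an existing
# SQLContext bound to `sqlCtx` and the sample text files used by the tests.
# The `processingTime` keyword name is an assumption inferred from the
# test_stream_trigger_takes_keyword_args test below.
import tempfile

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType([StructField("data", StringType(), False)])

# DataFrameReader.stream() creates an unbounded (streaming) DataFrame.
df = sqlCtx.read.format('text').schema(schema) \
    .stream('python/test_support/sql/streaming')
assert df.isStreaming  # new DataFrame property

# DataFrameWriter gains queryName(), trigger() and startStream();
# startStream() returns a ContinuousQuery handle for the running query.
out, chk = tempfile.mkdtemp(), tempfile.mkdtemp()
cq = df.write.format('parquet') \
    .queryName('demo_query') \
    .trigger(processingTime='5 seconds') \
    .option('checkpointLocation', chk) \
    .startStream(path=out)

assert cq.isActive and cq.name == 'demo_query'
cq.stop()
```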
## How was this patch tested?

Python doc tests

TODO:
- [ ] verify Python docs look good

Author: Burak Yavuz
Author: Burak Yavuz

Closes #12320 from brkyvz/stream-python.
---
 python/pyspark/sql/tests.py | 93 +++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 93 insertions(+)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index d4c221d712..1e864b4cd1 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -879,6 +879,99 @@ class SQLTests(ReusedPySparkTestCase):
 
         shutil.rmtree(tmpPath)
 
+    def test_stream_trigger_takes_keyword_args(self):
+        df = self.sqlCtx.read.format('text').stream('python/test_support/sql/streaming')
+        try:
+            df.write.trigger('5 seconds')
+            self.fail("Should have thrown an exception")
+        except TypeError:
+            # should throw error
+            pass
+
+    def test_stream_read_options(self):
+        schema = StructType([StructField("data", StringType(), False)])
+        df = self.sqlCtx.read.format('text').option('path', 'python/test_support/sql/streaming')\
+            .schema(schema).stream()
+        self.assertTrue(df.isStreaming)
+        self.assertEqual(df.schema.simpleString(), "struct<data:string>")
+
+    def test_stream_read_options_overwrite(self):
+        bad_schema = StructType([StructField("test", IntegerType(), False)])
+        schema = StructType([StructField("data", StringType(), False)])
+        df = self.sqlCtx.read.format('csv').option('path', 'python/test_support/sql/fake') \
+            .schema(bad_schema).stream(path='python/test_support/sql/streaming',
+                                       schema=schema, format='text')
+        self.assertTrue(df.isStreaming)
+        self.assertEqual(df.schema.simpleString(), "struct<data:string>")
+
+    def test_stream_save_options(self):
+        df = self.sqlCtx.read.format('text').stream('python/test_support/sql/streaming')
+        tmpPath = tempfile.mkdtemp()
+        shutil.rmtree(tmpPath)
+        self.assertTrue(df.isStreaming)
+        out = os.path.join(tmpPath, 'out')
+        chk = os.path.join(tmpPath, 'chk')
+        cq = df.write.option('checkpointLocation', chk).queryName('this_query')\
+            .format('parquet').option('path', out).startStream()
+        self.assertEqual(cq.name, 'this_query')
+        self.assertTrue(cq.isActive)
+        cq.processAllAvailable()
+        output_files = []
+        for _, _, files in os.walk(out):
+            output_files.extend([f for f in files if 'parquet' in f and not f.startswith('.')])
+        self.assertTrue(len(output_files) > 0)
+        self.assertTrue(len(os.listdir(chk)) > 0)
+        cq.stop()
+        shutil.rmtree(tmpPath)
+
+    def test_stream_save_options_overwrite(self):
+        df = self.sqlCtx.read.format('text').stream('python/test_support/sql/streaming')
+        tmpPath = tempfile.mkdtemp()
+        shutil.rmtree(tmpPath)
+        self.assertTrue(df.isStreaming)
+        out = os.path.join(tmpPath, 'out')
+        chk = os.path.join(tmpPath, 'chk')
+        fake1 = os.path.join(tmpPath, 'fake1')
+        fake2 = os.path.join(tmpPath, 'fake2')
+        cq = df.write.option('checkpointLocation', fake1).format('memory').option('path', fake2) \
+            .queryName('fake_query').startStream(path=out, format='parquet', queryName='this_query',
+                                                 checkpointLocation=chk)
+        self.assertEqual(cq.name, 'this_query')
+        self.assertTrue(cq.isActive)
+        cq.processAllAvailable()
+        output_files = []
+        for _, _, files in os.walk(out):
+            output_files.extend([f for f in files if 'parquet' in f and not f.startswith('.')])
+        self.assertTrue(len(output_files) > 0)
+        self.assertTrue(len(os.listdir(chk)) > 0)
+        self.assertFalse(os.path.isdir(fake1))  # should not have been created
+        self.assertFalse(os.path.isdir(fake2))  # should not have been created
+        cq.stop()
+        shutil.rmtree(tmpPath)
+
+    def test_stream_await_termination(self):
+        df = self.sqlCtx.read.format('text').stream('python/test_support/sql/streaming')
+        tmpPath = tempfile.mkdtemp()
+        shutil.rmtree(tmpPath)
+        self.assertTrue(df.isStreaming)
+        out = os.path.join(tmpPath, 'out')
+        chk = os.path.join(tmpPath, 'chk')
+        cq = df.write.startStream(path=out, format='parquet', queryName='this_query',
+                                  checkpointLocation=chk)
+        self.assertTrue(cq.isActive)
+        try:
+            cq.awaitTermination("hello")
+            self.fail("Expected a value exception")
+        except ValueError:
+            pass
+        now = time.time()
+        res = cq.awaitTermination(2600)  # test should take at least 2 seconds
+        duration = time.time() - now
+        self.assertTrue(duration >= 2)
+        self.assertFalse(res)
+        cq.stop()
+        shutil.rmtree(tmpPath)
+
     def test_help_command(self):
         # Regression test for SPARK-5464
         rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
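A closing usage note (illustrative, not part of the patch): the last test pins down the `ContinuousQuery.awaitTermination` contract. A minimal sketch, assuming a running query `cq` started as in the example above; the millisecond unit is an inference from the test's 2600 timeout paired with its "at least 2 seconds" duration check:

```python
# Sketch of the awaitTermination() contract exercised by
# test_stream_await_termination; assumes a running ContinuousQuery `cq`.
try:
    cq.awaitTermination("hello")  # non-numeric timeout
except ValueError:
    pass  # rejected with a ValueError

# A numeric timeout (milliseconds, judging by the 2600 / "at least 2 seconds"
# check in the test) blocks until the query stops or the timeout elapses,
# returning whether the query terminated within that window.
terminated = cq.awaitTermination(2600)
assert terminated is False  # query is still running after ~2.6 seconds
cq.stop()
```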