diff options
Diffstat (limited to 'python/pyspark/sql/tests.py')
-rw-r--r-- | python/pyspark/sql/tests.py | 42 |
1 files changed, 23 insertions, 19 deletions
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index e0acde6783..fee960a1a7 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -892,9 +892,9 @@ class SQLTests(ReusedPySparkTestCase): shutil.rmtree(tmpPath) def test_stream_trigger_takes_keyword_args(self): - df = self.spark.read.format('text').stream('python/test_support/sql/streaming') + df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') try: - df.write.trigger('5 seconds') + df.writeStream.trigger('5 seconds') self.fail("Should have thrown an exception") except TypeError: # should throw error @@ -902,22 +902,25 @@ class SQLTests(ReusedPySparkTestCase): def test_stream_read_options(self): schema = StructType([StructField("data", StringType(), False)]) - df = self.spark.read.format('text').option('path', 'python/test_support/sql/streaming')\ - .schema(schema).stream() + df = self.spark.readStream\ + .format('text')\ + .option('path', 'python/test_support/sql/streaming')\ + .schema(schema)\ + .load() self.assertTrue(df.isStreaming) self.assertEqual(df.schema.simpleString(), "struct<data:string>") def test_stream_read_options_overwrite(self): bad_schema = StructType([StructField("test", IntegerType(), False)]) schema = StructType([StructField("data", StringType(), False)]) - df = self.spark.read.format('csv').option('path', 'python/test_support/sql/fake') \ - .schema(bad_schema).stream(path='python/test_support/sql/streaming', - schema=schema, format='text') + df = self.spark.readStream.format('csv').option('path', 'python/test_support/sql/fake') \ + .schema(bad_schema)\ + .load(path='python/test_support/sql/streaming', schema=schema, format='text') self.assertTrue(df.isStreaming) self.assertEqual(df.schema.simpleString(), "struct<data:string>") def test_stream_save_options(self): - df = self.spark.read.format('text').stream('python/test_support/sql/streaming') + df = 
self.spark.readStream.format('text').load('python/test_support/sql/streaming') for cq in self.spark._wrapped.streams.active: cq.stop() tmpPath = tempfile.mkdtemp() @@ -925,8 +928,8 @@ class SQLTests(ReusedPySparkTestCase): self.assertTrue(df.isStreaming) out = os.path.join(tmpPath, 'out') chk = os.path.join(tmpPath, 'chk') - cq = df.write.option('checkpointLocation', chk).queryName('this_query') \ - .format('parquet').outputMode('append').option('path', out).startStream() + cq = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \ + .format('parquet').outputMode('append').option('path', out).start() try: self.assertEqual(cq.name, 'this_query') self.assertTrue(cq.isActive) @@ -941,7 +944,7 @@ class SQLTests(ReusedPySparkTestCase): shutil.rmtree(tmpPath) def test_stream_save_options_overwrite(self): - df = self.spark.read.format('text').stream('python/test_support/sql/streaming') + df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') for cq in self.spark._wrapped.streams.active: cq.stop() tmpPath = tempfile.mkdtemp() @@ -951,9 +954,10 @@ class SQLTests(ReusedPySparkTestCase): chk = os.path.join(tmpPath, 'chk') fake1 = os.path.join(tmpPath, 'fake1') fake2 = os.path.join(tmpPath, 'fake2') - cq = df.write.option('checkpointLocation', fake1).format('memory').option('path', fake2) \ + cq = df.writeStream.option('checkpointLocation', fake1)\ + .format('memory').option('path', fake2) \ .queryName('fake_query').outputMode('append') \ - .startStream(path=out, format='parquet', queryName='this_query', checkpointLocation=chk) + .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk) try: self.assertEqual(cq.name, 'this_query') @@ -971,7 +975,7 @@ class SQLTests(ReusedPySparkTestCase): shutil.rmtree(tmpPath) def test_stream_await_termination(self): - df = self.spark.read.format('text').stream('python/test_support/sql/streaming') + df = 
self.spark.readStream.format('text').load('python/test_support/sql/streaming') for cq in self.spark._wrapped.streams.active: cq.stop() tmpPath = tempfile.mkdtemp() @@ -979,8 +983,8 @@ class SQLTests(ReusedPySparkTestCase): self.assertTrue(df.isStreaming) out = os.path.join(tmpPath, 'out') chk = os.path.join(tmpPath, 'chk') - cq = df.write.startStream(path=out, format='parquet', queryName='this_query', - checkpointLocation=chk) + cq = df.writeStream\ + .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk) try: self.assertTrue(cq.isActive) try: @@ -999,7 +1003,7 @@ class SQLTests(ReusedPySparkTestCase): shutil.rmtree(tmpPath) def test_query_manager_await_termination(self): - df = self.spark.read.format('text').stream('python/test_support/sql/streaming') + df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') for cq in self.spark._wrapped.streams.active: cq.stop() tmpPath = tempfile.mkdtemp() @@ -1007,8 +1011,8 @@ class SQLTests(ReusedPySparkTestCase): self.assertTrue(df.isStreaming) out = os.path.join(tmpPath, 'out') chk = os.path.join(tmpPath, 'chk') - cq = df.write.startStream(path=out, format='parquet', queryName='this_query', - checkpointLocation=chk) + cq = df.writeStream\ + .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk) try: self.assertTrue(cq.isActive) try: |