Diffstat (limited to 'python/pyspark/sql/tests.py')
 python/pyspark/sql/tests.py | 42 +++++++++++++++++++-----------------------
 1 file changed, 23 insertions(+), 19 deletions(-)
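
This diff updates PySpark's structured streaming tests for the renamed entry points: spark.read.format(...).stream(...) becomes spark.readStream.format(...).load(...), and df.write. ... .startStream(...) becomes df.writeStream. ... .start(...). A minimal sketch of the renamed read/write pattern the updated tests exercise; the session setup and the /tmp output paths are illustrative assumptions, not part of this diff:

# Sketch only: the session setup and output paths are assumptions.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.appName('readstream-sketch').getOrCreate()

# Reading: spark.readStream replaces spark.read. ... .stream().
schema = StructType([StructField('data', StringType(), False)])
df = spark.readStream \
    .format('text') \
    .schema(schema) \
    .load('python/test_support/sql/streaming')
assert df.isStreaming

# Writing: df.writeStream. ... .start() replaces df.write. ... .startStream().
query = df.writeStream \
    .format('parquet') \
    .queryName('sketch_query') \
    .outputMode('append') \
    .option('checkpointLocation', '/tmp/sketch_chk') \
    .start(path='/tmp/sketch_out')
query.stop()
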
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index e0acde6783..fee960a1a7 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -892,9 +892,9 @@ class SQLTests(ReusedPySparkTestCase):
         shutil.rmtree(tmpPath)
 
     def test_stream_trigger_takes_keyword_args(self):
-        df = self.spark.read.format('text').stream('python/test_support/sql/streaming')
+        df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
         try:
-            df.write.trigger('5 seconds')
+            df.writeStream.trigger('5 seconds')
             self.fail("Should have thrown an exception")
         except TypeError:
             # should throw error
@@ -902,22 +902,25 @@ class SQLTests(ReusedPySparkTestCase):
 
     def test_stream_read_options(self):
         schema = StructType([StructField("data", StringType(), False)])
-        df = self.spark.read.format('text').option('path', 'python/test_support/sql/streaming')\
-            .schema(schema).stream()
+        df = self.spark.readStream\
+            .format('text')\
+            .option('path', 'python/test_support/sql/streaming')\
+            .schema(schema)\
+            .load()
         self.assertTrue(df.isStreaming)
         self.assertEqual(df.schema.simpleString(), "struct<data:string>")
 
     def test_stream_read_options_overwrite(self):
         bad_schema = StructType([StructField("test", IntegerType(), False)])
         schema = StructType([StructField("data", StringType(), False)])
-        df = self.spark.read.format('csv').option('path', 'python/test_support/sql/fake') \
-            .schema(bad_schema).stream(path='python/test_support/sql/streaming',
-                                       schema=schema, format='text')
+        df = self.spark.readStream.format('csv').option('path', 'python/test_support/sql/fake') \
+            .schema(bad_schema)\
+            .load(path='python/test_support/sql/streaming', schema=schema, format='text')
         self.assertTrue(df.isStreaming)
         self.assertEqual(df.schema.simpleString(), "struct<data:string>")
 
     def test_stream_save_options(self):
-        df = self.spark.read.format('text').stream('python/test_support/sql/streaming')
+        df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
         for cq in self.spark._wrapped.streams.active:
             cq.stop()
         tmpPath = tempfile.mkdtemp()
@@ -925,8 +928,8 @@ class SQLTests(ReusedPySparkTestCase):
         self.assertTrue(df.isStreaming)
         out = os.path.join(tmpPath, 'out')
         chk = os.path.join(tmpPath, 'chk')
-        cq = df.write.option('checkpointLocation', chk).queryName('this_query') \
-            .format('parquet').outputMode('append').option('path', out).startStream()
+        cq = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \
+            .format('parquet').outputMode('append').option('path', out).start()
         try:
             self.assertEqual(cq.name, 'this_query')
             self.assertTrue(cq.isActive)
@@ -941,7 +944,7 @@ class SQLTests(ReusedPySparkTestCase):
         shutil.rmtree(tmpPath)
 
     def test_stream_save_options_overwrite(self):
-        df = self.spark.read.format('text').stream('python/test_support/sql/streaming')
+        df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
         for cq in self.spark._wrapped.streams.active:
             cq.stop()
         tmpPath = tempfile.mkdtemp()
@@ -951,9 +954,10 @@ class SQLTests(ReusedPySparkTestCase):
         chk = os.path.join(tmpPath, 'chk')
         fake1 = os.path.join(tmpPath, 'fake1')
         fake2 = os.path.join(tmpPath, 'fake2')
-        cq = df.write.option('checkpointLocation', fake1).format('memory').option('path', fake2) \
+        cq = df.writeStream.option('checkpointLocation', fake1)\
+            .format('memory').option('path', fake2) \
             .queryName('fake_query').outputMode('append') \
-            .startStream(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
+            .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
 
         try:
             self.assertEqual(cq.name, 'this_query')
@@ -971,7 +975,7 @@ class SQLTests(ReusedPySparkTestCase):
         shutil.rmtree(tmpPath)
 
     def test_stream_await_termination(self):
-        df = self.spark.read.format('text').stream('python/test_support/sql/streaming')
+        df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
         for cq in self.spark._wrapped.streams.active:
             cq.stop()
         tmpPath = tempfile.mkdtemp()
@@ -979,8 +983,8 @@ class SQLTests(ReusedPySparkTestCase):
         self.assertTrue(df.isStreaming)
         out = os.path.join(tmpPath, 'out')
         chk = os.path.join(tmpPath, 'chk')
-        cq = df.write.startStream(path=out, format='parquet', queryName='this_query',
-                                  checkpointLocation=chk)
+        cq = df.writeStream\
+            .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
         try:
             self.assertTrue(cq.isActive)
             try:
@@ -999,7 +1003,7 @@ class SQLTests(ReusedPySparkTestCase):
         shutil.rmtree(tmpPath)
 
     def test_query_manager_await_termination(self):
-        df = self.spark.read.format('text').stream('python/test_support/sql/streaming')
+        df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
         for cq in self.spark._wrapped.streams.active:
             cq.stop()
         tmpPath = tempfile.mkdtemp()
@@ -1007,8 +1011,8 @@ class SQLTests(ReusedPySparkTestCase):
         self.assertTrue(df.isStreaming)
         out = os.path.join(tmpPath, 'out')
         chk = os.path.join(tmpPath, 'chk')
-        cq = df.write.startStream(path=out, format='parquet', queryName='this_query',
-                                  checkpointLocation=chk)
+        cq = df.writeStream\
+            .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
         try:
             self.assertTrue(cq.isActive)
             try:
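
The trigger test above expects a TypeError because trigger() accepts only keyword arguments, and the await-termination tests poll a started query with a timeout. A short sketch of both patterns, continuing from the session in the sketch above; the processingTime keyword is the one accepted by released PySpark versions and is an assumption here, since the diff itself only shows the positional form failing:

df = spark.readStream.format('text').load('python/test_support/sql/streaming')

# trigger() rejects positional arguments; the interval is passed as a
# keyword (processingTime in released PySpark versions; an assumption,
# the diff only shows the positional call raising TypeError).
query = df.writeStream \
    .trigger(processingTime='5 seconds') \
    .queryName('sketch_trigger_query') \
    .format('memory') \
    .outputMode('append') \
    .start()
try:
    # With a timeout, awaitTermination() returns False while the query is
    # still running instead of blocking indefinitely.
    still_running = not query.awaitTermination(timeout=5)
finally:
    # Stop this query; any others are reachable via spark.streams.active
    # (the tests reach the same manager through the wrapped SQLContext).
    query.stop()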