aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorBurak Yavuz <brkyvz@gmail.com>2016-12-15 14:26:54 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2016-12-15 14:26:54 -0800
commit0917c8ee07bd3de87d9754960d8e89808b5efb2f (patch)
tree43fac08844a3e1eaff94ad7f2167f655fea7ba18 /python
parent68a6dc974b25e6eddef109f6fd23ae4e9775ceca (diff)
downloadspark-0917c8ee07bd3de87d9754960d8e89808b5efb2f.tar.gz
spark-0917c8ee07bd3de87d9754960d8e89808b5efb2f.tar.bz2
spark-0917c8ee07bd3de87d9754960d8e89808b5efb2f.zip
[SPARK-18888] partitionBy in DataStreamWriter in Python throws _to_seq not defined
## What changes were proposed in this pull request?

`_to_seq` wasn't imported.

## How was this patch tested?

Added partitionBy to existing write path unit test.

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #16297 from brkyvz/SPARK-18888.
Diffstat (limited to 'python')
-rw-r--r-- python/pyspark/sql/streaming.py | 1 +
-rw-r--r-- python/pyspark/sql/tests.py     | 7 ++++---
2 files changed, 5 insertions(+), 3 deletions(-)
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index eabd5ef54c..5014299ad2 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -28,6 +28,7 @@ from abc import ABCMeta, abstractmethod
from pyspark import since, keyword_only
from pyspark.rdd import ignore_unicode_prefix
+from pyspark.sql.column import _to_seq
from pyspark.sql.readwriter import OptionUtils, to_str
from pyspark.sql.types import *
from pyspark.sql.utils import StreamingQueryException
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 6ddd804eec..18fd68ec5e 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -50,7 +50,7 @@ from pyspark.sql import SparkSession, HiveContext, Column, Row
from pyspark.sql.types import *
from pyspark.sql.types import UserDefinedType, _infer_type
from pyspark.tests import ReusedPySparkTestCase, SparkSubmitTests
-from pyspark.sql.functions import UserDefinedFunction, sha2
+from pyspark.sql.functions import UserDefinedFunction, sha2, lit
from pyspark.sql.window import Window
from pyspark.sql.utils import AnalysisException, ParseException, IllegalArgumentException
@@ -1065,7 +1065,8 @@ class SQLTests(ReusedPySparkTestCase):
self.assertEqual(df.schema.simpleString(), "struct<data:string>")
def test_stream_save_options(self):
- df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
+ df = self.spark.readStream.format('text').load('python/test_support/sql/streaming') \
+ .withColumn('id', lit(1))
for q in self.spark._wrapped.streams.active:
q.stop()
tmpPath = tempfile.mkdtemp()
@@ -1074,7 +1075,7 @@ class SQLTests(ReusedPySparkTestCase):
out = os.path.join(tmpPath, 'out')
chk = os.path.join(tmpPath, 'chk')
q = df.writeStream.option('checkpointLocation', chk).queryName('this_query') \
- .format('parquet').outputMode('append').option('path', out).start()
+ .format('parquet').partitionBy('id').outputMode('append').option('path', out).start()
try:
self.assertEqual(q.name, 'this_query')
self.assertTrue(q.isActive)