aboutsummaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2013-12-11 10:21:53 +0530
committerPrashant Sharma <prashant.s@imaginea.com>2013-12-11 10:21:53 +0530
commit603af51bb5257744ce0db28e7f10db6a2ba899ec (patch)
tree31ad611d8d56147ec90108d2b3a92d38eae4d150 /python
parent17db6a9041d5e83d7b6fe47f9c36758d0613fcd6 (diff)
parentd2efe13574090e93c600adeacc7f6356bc196e6c (diff)
downloadspark-603af51bb5257744ce0db28e7f10db6a2ba899ec.tar.gz
spark-603af51bb5257744ce0db28e7f10db6a2ba899ec.tar.bz2
spark-603af51bb5257744ce0db28e7f10db6a2ba899ec.zip
Merge branch 'master' into akka-bug-fix
Conflicts: core/pom.xml core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala pom.xml project/SparkBuild.scala streaming/pom.xml yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
Diffstat (limited to 'python')
-rw-r--r--python/pyspark/rdd.py5
-rw-r--r--python/pyspark/tests.py15
-rwxr-xr-xpython/test_support/userlibrary.py17
3 files changed, 36 insertions, 1 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index d2cb5f191a..61720dcf1a 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -605,7 +605,10 @@ class RDD(object):
'0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
"""
def func(split, iterator):
- return (str(x).encode("utf-8") for x in iterator)
+ for x in iterator:
+ if not isinstance(x, basestring):
+ x = unicode(x)
+ yield x.encode("utf-8")
keyed = PipelinedRDD(self, func)
keyed._bypass_serializer = True
keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 621e1cb58c..3987642bf4 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -19,6 +19,8 @@
Unit tests for PySpark; additional tests are implemented as doctests in
individual modules.
"""
+from fileinput import input
+from glob import glob
import os
import shutil
import sys
@@ -138,6 +140,19 @@ class TestAddFile(PySparkTestCase):
self.assertEqual("Hello World from inside a package!", UserClass().hello())
+class TestRDDFunctions(PySparkTestCase):
+
+ def test_save_as_textfile_with_unicode(self):
+ # Regression test for SPARK-970
+ x = u"\u00A1Hola, mundo!"
+ data = self.sc.parallelize([x])
+ tempFile = NamedTemporaryFile(delete=True)
+ tempFile.close()
+ data.saveAsTextFile(tempFile.name)
+ raw_contents = ''.join(input(glob(tempFile.name + "/part-0000*")))
+ self.assertEqual(x, unicode(raw_contents.strip(), "utf-8"))
+
+
class TestIO(PySparkTestCase):
def test_stdout_redirection(self):
diff --git a/python/test_support/userlibrary.py b/python/test_support/userlibrary.py
index 5bb6f5009f..8e4a6292bc 100755
--- a/python/test_support/userlibrary.py
+++ b/python/test_support/userlibrary.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
"""
Used to test shipping of code depenencies with SparkContext.addPyFile().
"""