about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
authorKan Zhang <kzhang@apache.org>2014-05-10 14:01:08 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-05-10 14:01:08 -0700
commit6c2691d0a0ed46a8b8093e05a4708706cf187168 (patch)
tree6292433802cb3cd9afb5ddb0b6c6479cc8d9bca8
parent3776f2f283842543ff766398292532c6e94221cc (diff)
downloadspark-6c2691d0a0ed46a8b8093e05a4708706cf187168.tar.gz
spark-6c2691d0a0ed46a8b8093e05a4708706cf187168.tar.bz2
spark-6c2691d0a0ed46a8b8093e05a4708706cf187168.zip
[SPARK-1690] Tolerating empty elements when saving Python RDD to text files
Tolerate empty strings in PythonRDD. Author: Kan Zhang <kzhang@apache.org>. Closes #644 from kanzhang/SPARK-1690 and squashes the following commits: c62ad33 [Kan Zhang] Adding Python doctest; 473ec4b [Kan Zhang] [SPARK-1690] Tolerating empty elements when saving Python RDD to text files
-rw-r--r-- core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 5
-rw-r--r-- python/pyspark/rdd.py | 8
2 files changed, 11 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 388b838d78..2971c277aa 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -94,6 +94,7 @@ private[spark] class PythonRDD[T: ClassTag](
val obj = new Array[Byte](length)
stream.readFully(obj)
obj
+ case 0 => Array.empty[Byte]
case SpecialLengths.TIMING_DATA =>
// Timing data from worker
val bootTime = stream.readLong()
@@ -123,7 +124,7 @@ private[spark] class PythonRDD[T: ClassTag](
stream.readFully(update)
accumulator += Collections.singletonList(update)
}
- Array.empty[Byte]
+ null
}
} catch {
@@ -143,7 +144,7 @@ private[spark] class PythonRDD[T: ClassTag](
var _nextObj = read()
- def hasNext = _nextObj.length != 0
+ def hasNext = _nextObj != null
}
new InterruptibleIterator(context, stdoutIterator)
}
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 3a1c56af5b..4f74824ba4 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -891,6 +891,14 @@ class RDD(object):
>>> from glob import glob
>>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*"))))
'0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n'
+
+ Empty lines are tolerated when saving to text files.
+
+ >>> tempFile2 = NamedTemporaryFile(delete=True)
+ >>> tempFile2.close()
+ >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)
+ >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*"))))
+ '\\n\\n\\nbar\\nfoo\\n'
"""
def func(split, iterator):
for x in iterator: