diff options
author | Kan Zhang <kzhang@apache.org> | 2014-05-10 14:01:08 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-05-10 14:01:08 -0700 |
commit | 6c2691d0a0ed46a8b8093e05a4708706cf187168 (patch) | |
tree | 6292433802cb3cd9afb5ddb0b6c6479cc8d9bca8 | |
parent | 3776f2f283842543ff766398292532c6e94221cc (diff) | |
download | spark-6c2691d0a0ed46a8b8093e05a4708706cf187168.tar.gz spark-6c2691d0a0ed46a8b8093e05a4708706cf187168.tar.bz2 spark-6c2691d0a0ed46a8b8093e05a4708706cf187168.zip |
[SPARK-1690] Tolerating empty elements when saving Python RDD to text files
Tolerate empty strings in PythonRDD
Author: Kan Zhang <kzhang@apache.org>
Closes #644 from kanzhang/SPARK-1690 and squashes the following commits:
c62ad33 [Kan Zhang] Adding Python doctest
473ec4b [Kan Zhang] [SPARK-1690] Tolerating empty elements when saving Python RDD to text files
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 5 | ||||
-rw-r--r-- | python/pyspark/rdd.py | 8 |
2 files changed, 11 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 388b838d78..2971c277aa 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -94,6 +94,7 @@ private[spark] class PythonRDD[T: ClassTag]( val obj = new Array[Byte](length) stream.readFully(obj) obj + case 0 => Array.empty[Byte] case SpecialLengths.TIMING_DATA => // Timing data from worker val bootTime = stream.readLong() @@ -123,7 +124,7 @@ private[spark] class PythonRDD[T: ClassTag]( stream.readFully(update) accumulator += Collections.singletonList(update) } - Array.empty[Byte] + null } } catch { @@ -143,7 +144,7 @@ private[spark] class PythonRDD[T: ClassTag]( var _nextObj = read() - def hasNext = _nextObj.length != 0 + def hasNext = _nextObj != null } new InterruptibleIterator(context, stdoutIterator) } diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 3a1c56af5b..4f74824ba4 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -891,6 +891,14 @@ class RDD(object): >>> from glob import glob >>> ''.join(sorted(input(glob(tempFile.name + "/part-0000*")))) '0\\n1\\n2\\n3\\n4\\n5\\n6\\n7\\n8\\n9\\n' + + Empty lines are tolerated when saving to text files. + + >>> tempFile2 = NamedTemporaryFile(delete=True) + >>> tempFile2.close() + >>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name) + >>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*")))) + '\\n\\n\\nbar\\nfoo\\n' """ def func(split, iterator): for x in iterator: |