diff options
author | Sean Owen <sowen@cloudera.com> | 2016-07-20 09:48:52 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-07-20 09:48:52 -0700 |
commit | 4b079dc3964dbe0f4d7839d39512d0400122b520 (patch) | |
tree | c51c7f70518ac5d62b34e2b4b7c9081fa512eff7 /core/src | |
parent | 95abbe537751929353d18b733f6267c3287b6047 (diff) | |
download | spark-4b079dc3964dbe0f4d7839d39512d0400122b520.tar.gz spark-4b079dc3964dbe0f4d7839d39512d0400122b520.tar.bz2 spark-4b079dc3964dbe0f4d7839d39512d0400122b520.zip |
[SPARK-16613][CORE] RDD.pipe returns values for empty partitions
## What changes were proposed in this pull request?
Document RDD.pipe semantics; don't execute process for empty input partitions.
Note this includes the fix in https://github.com/apache/spark/pull/14256 because it's necessary to even test this. One or the other will merge the fix.
## How was this patch tested?
Jenkins tests including new test.
Author: Sean Owen <sowen@cloudera.com>
Closes #14260 from srowen/SPARK-16613.
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/RDD.scala | 8 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala | 8 |
2 files changed, 15 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 0804cdeb04..a4905dd51b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -714,7 +714,13 @@ abstract class RDD[T: ClassTag]( } /** - * Return an RDD created by piping elements to a forked external process. + * Return an RDD created by piping elements to a forked external process. The resulting RDD + * is computed by executing the given process once per partition. All elements + * of each input partition are written to a process's stdin as lines of input separated + * by a newline. The resulting partition consists of the process's stdout output, with + * each line of stdout resulting in one element of the output partition. A process is invoked + * even for empty partitions. + * * The print behavior can be customized by providing two functions. * * @param command command to run in forked process. diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala index 5d56fc19f0..f8d523fa2c 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala @@ -138,6 +138,14 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext { } } + test("pipe with empty partition") { + val data = sc.parallelize(Seq("foo", "bing"), 8) + val piped = data.pipe("wc -c") + assert(piped.count == 8) + val charCounts = piped.map(_.trim.toInt).collect().toSet + assert(Set(0, 4, 5) == charCounts) + } + test("pipe with env variable") { if (testCommandAvailable("printenv")) { val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) |