aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2016-07-20 09:48:52 -0700
committerReynold Xin <rxin@databricks.com>2016-07-20 09:48:52 -0700
commit4b079dc3964dbe0f4d7839d39512d0400122b520 (patch)
treec51c7f70518ac5d62b34e2b4b7c9081fa512eff7 /core
parent95abbe537751929353d18b733f6267c3287b6047 (diff)
downloadspark-4b079dc3964dbe0f4d7839d39512d0400122b520.tar.gz
spark-4b079dc3964dbe0f4d7839d39512d0400122b520.tar.bz2
spark-4b079dc3964dbe0f4d7839d39512d0400122b520.zip
[SPARK-16613][CORE] RDD.pipe returns values for empty partitions
## What changes were proposed in this pull request? Document RDD.pipe semantics; don't execute process for empty input partitions. Note this includes the fix in https://github.com/apache/spark/pull/14256 because it's necessary to even test this. One or the other will merge the fix. ## How was this patch tested? Jenkins tests including new test. Author: Sean Owen <sowen@cloudera.com> Closes #14260 from srowen/SPARK-16613.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala8
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala8
2 files changed, 15 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 0804cdeb04..a4905dd51b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -714,7 +714,13 @@ abstract class RDD[T: ClassTag](
}
/**
- * Return an RDD created by piping elements to a forked external process.
+ * Return an RDD created by piping elements to a forked external process. The resulting RDD
+ * is computed by executing the given process once per partition. All elements
+ * of each input partition are written to a process's stdin as lines of input separated
+ * by a newline. The resulting partition consists of the process's stdout output, with
+ * each line of stdout resulting in one element of the output partition. A process is invoked
+ * even for empty partitions.
+ *
* The print behavior can be customized by providing two functions.
*
* @param command command to run in forked process.
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index 5d56fc19f0..f8d523fa2c 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -138,6 +138,14 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
}
}
+ test("pipe with empty partition") {
+ val data = sc.parallelize(Seq("foo", "bing"), 8)
+ val piped = data.pipe("wc -c")
+ assert(piped.count == 8)
+ val charCounts = piped.map(_.trim.toInt).collect().toSet
+ assert(Set(0, 4, 5) == charCounts)
+ }
+
test("pipe with env variable") {
if (testCommandAvailable("printenv")) {
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)