author     Liwei Lin <lwlin7@gmail.com>       2016-07-19 10:24:48 -0700
committer  Reynold Xin <rxin@databricks.com>  2016-07-19 10:24:48 -0700
commit     0bd76e872b60cb80295fc12654e370cf22390056 (patch)
tree       d5377d139b31babe5d6c3bae91f6345201d91831 /core
parent     670891496a82538a5e2bf981a4044fb6f4cbb062 (diff)
[SPARK-16620][CORE] Add back the tokenization process in `RDD.pipe(command: String)`
## What changes were proposed in this pull request?

Currently `RDD.pipe(command: String)`:

- works only when the command is specified without any options, such as `RDD.pipe("wc")`
- does NOT work when the command is specified with some options, such as `RDD.pipe("wc -l")`

This is a regression from Spark 1.6.

This patch adds back the tokenization process in `RDD.pipe(command: String)` to fix this regression.

## How was this patch tested?

Added a test which:

- would pass in `1.6`
- _[prior to this patch]_ would fail in `master`
- _[after this patch]_ would pass in `master`

Author: Liwei Lin <lwlin7@gmail.com>

Closes #14256 from lw-lin/rdd-pipe.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala             8
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala  16
2 files changed, 22 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index b7a5b22208..0804cdeb04 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -699,14 +699,18 @@ abstract class RDD[T: ClassTag](
    * Return an RDD created by piping elements to a forked external process.
    */
   def pipe(command: String): RDD[String] = withScope {
-    pipe(command)
+    // Similar to Runtime.exec(), if we are given a single string, split it into words
+    // using a standard StringTokenizer (i.e. by spaces)
+    pipe(PipedRDD.tokenize(command))
   }
 
   /**
    * Return an RDD created by piping elements to a forked external process.
    */
   def pipe(command: String, env: Map[String, String]): RDD[String] = withScope {
-    pipe(command, env)
+    // Similar to Runtime.exec(), if we are given a single string, split it into words
+    // using a standard StringTokenizer (i.e. by spaces)
+    pipe(PipedRDD.tokenize(command), env)
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index 27cfdc7ace..5d56fc19f0 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -51,6 +51,22 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
     }
   }
 
+  test("basic pipe with tokenization") {
+    if (testCommandAvailable("wc")) {
+      val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+
+      // verify that both RDD.pipe(command: String) and RDD.pipe(command: String, env) work well
+      for (piped <- Seq(nums.pipe("wc -l"), nums.pipe("wc -l", Map[String, String]()))) {
+        val c = piped.collect()
+        assert(c.size === 2)
+        assert(c(0).trim === "2")
+        assert(c(1).trim === "2")
+      }
+    } else {
+      assert(true)
+    }
+  }
+
   test("failure in iterating over pipe input") {
     if (testCommandAvailable("cat")) {
       val nums =