diff options
author | Liwei Lin <lwlin7@gmail.com> | 2016-07-19 10:24:48 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-07-19 10:24:48 -0700 |
commit | 0bd76e872b60cb80295fc12654e370cf22390056 (patch) | |
tree | d5377d139b31babe5d6c3bae91f6345201d91831 | |
parent | 670891496a82538a5e2bf981a4044fb6f4cbb062 (diff) | |
download | spark-0bd76e872b60cb80295fc12654e370cf22390056.tar.gz spark-0bd76e872b60cb80295fc12654e370cf22390056.tar.bz2 spark-0bd76e872b60cb80295fc12654e370cf22390056.zip |
[SPARK-16620][CORE] Add back the tokenization process in `RDD.pipe(command: String)`
## What changes were proposed in this pull request?
Currently `RDD.pipe(command: String)`:
- works only when the command is specified without any options, such as `RDD.pipe("wc")`
- does NOT work when the command is specified with some options, such as `RDD.pipe("wc -l")`
This is a regression from Spark 1.6.
This patch adds back the tokenization process in `RDD.pipe(command: String)` to fix this regression.
## How was this patch tested?
Added a test which:
- would pass in `1.6`
- _[prior to this patch]_ would fail in `master`
- _[after this patch]_ would pass in `master`
Author: Liwei Lin <lwlin7@gmail.com>
Closes #14256 from lw-lin/rdd-pipe.
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/RDD.scala | 8 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala | 16 |
2 files changed, 22 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index b7a5b22208..0804cdeb04 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -699,14 +699,18 @@ abstract class RDD[T: ClassTag]( * Return an RDD created by piping elements to a forked external process. */ def pipe(command: String): RDD[String] = withScope { - pipe(command) + // Similar to Runtime.exec(), if we are given a single string, split it into words + // using a standard StringTokenizer (i.e. by spaces) + pipe(PipedRDD.tokenize(command)) } /** * Return an RDD created by piping elements to a forked external process. */ def pipe(command: String, env: Map[String, String]): RDD[String] = withScope { - pipe(command, env) + // Similar to Runtime.exec(), if we are given a single string, split it into words + // using a standard StringTokenizer (i.e. by spaces) + pipe(PipedRDD.tokenize(command), env) } /** diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala index 27cfdc7ace..5d56fc19f0 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala @@ -51,6 +51,22 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext { } } + test("basic pipe with tokenization") { + if (testCommandAvailable("wc")) { + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + + // verify that both RDD.pipe(command: String) and RDD.pipe(command: String, env) work good + for (piped <- Seq(nums.pipe("wc -l"), nums.pipe("wc -l", Map[String, String]()))) { + val c = piped.collect() + assert(c.size === 2) + assert(c(0).trim === "2") + assert(c(1).trim === "2") + } + } else { + assert(true) + } + } + test("failure in iterating over pipe input") { if (testCommandAvailable("cat")) { val nums = |