diff options
author | hyukjinkwon <gurwls223@gmail.com> | 2016-12-14 19:27:29 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-12-14 19:27:29 +0000 |
commit | 169b9d73ee2136194df42c8deaaa95572b4ae56c (patch) | |
tree | d7645a92882637c61f1fceb99f7f793b59e838a8 | |
parent | c6b8eb71a9638c9a8ce02d11d5fe26f4c5be531e (diff) | |
download | spark-169b9d73ee2136194df42c8deaaa95572b4ae56c.tar.gz spark-169b9d73ee2136194df42c8deaaa95572b4ae56c.tar.bz2 spark-169b9d73ee2136194df42c8deaaa95572b4ae56c.zip |
[SPARK-18830][TESTS] Fix tests in PipedRDDSuite to pass on Windows
## What changes were proposed in this pull request?
This PR proposes to fix the tests failed on Windows as below:
```
[info] - pipe with empty partition *** FAILED *** (672 milliseconds)
[info] Set(0, 4, 5) did not equal Set(0, 5, 6) (PipedRDDSuite.scala:145)
[info] org.scalatest.exceptions.TestFailedException:
...
```
In this case, `wc -c` counts the characters on both Windows and Linux but the newlines characters on Windows are `\r\n` which are two. So, the counts ends up one more for each.
```
[info] - test pipe exports map_input_file *** FAILED *** (62 milliseconds)
[info] java.lang.IllegalStateException: Subprocess exited with status 1. Command ran: printenv map_input_file
[info] at org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:178)
...
```
```
[info] - test pipe exports mapreduce_map_input_file *** FAILED *** (172 milliseconds)
[info] java.lang.IllegalStateException: Subprocess exited with status 1. Command ran: printenv mapreduce_map_input_file
[info] at org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:178)
...
```
`printenv` command prints the environment variables; however, when environment variables are set to `ProcessBuilder` as lower-cased keys, `printenv` in Windows ignores and does not print this although it is actually set and accessible. (this was tested in [here](https://ci.appveyor.com/project/spark-test/spark/build/208-PipedRDDSuite) for upper-cases with this [diff](https://github.com/apache/spark/compare/master...spark-test:74d39da) and [here](https://ci.appveyor.com/project/spark-test/spark/build/203-PipedRDDSuite) for lower-cases with this [diff](https://github.com/apache/spark/compare/master...spark-test:fde5e37f28032c15a8d8693ba033a8a779a26317). It seems a bug in `printenv`.
(BTW, note that environment variables on Windows are case-insensitive).
This is (I believe) a thirdparty tool on Windows that resembles `printenv` on Linux (installed in AppVeyor environment or Windows Server 2012 R2). This command does not exist, at least, for Windows 7 and 10 (manually tested).
On Windows, we can use `cmd.exe /C set [varname]` officially for this purpose. We could fix the tests with this in order to test if the environment variable is set.
## How was this patch tested?
Manually tested via AppVeyor.
**Before**
https://ci.appveyor.com/project/spark-test/spark/build/194-PipedRDDSuite
**After**
https://ci.appveyor.com/project/spark-test/spark/build/226-PipedRDDSuite
Author: hyukjinkwon <gurwls223@gmail.com>
Closes #16254 from HyukjinKwon/pipe-errors.
-rw-r--r-- | core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala | 308 |
1 files changed, 151 insertions, 157 deletions
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala index 7293aa9a25..287ae6ff6e 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala @@ -32,109 +32,104 @@ import org.apache.spark._ import org.apache.spark.util.Utils class PipedRDDSuite extends SparkFunSuite with SharedSparkContext { + val envCommand = if (Utils.isWindows) { + "cmd.exe /C set" + } else { + "printenv" + } test("basic pipe") { - if (testCommandAvailable("cat")) { - val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + assume(testCommandAvailable("cat")) + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) - val piped = nums.pipe(Seq("cat")) + val piped = nums.pipe(Seq("cat")) - val c = piped.collect() - assert(c.size === 4) - assert(c(0) === "1") - assert(c(1) === "2") - assert(c(2) === "3") - assert(c(3) === "4") - } else { - assert(true) - } + val c = piped.collect() + assert(c.size === 4) + assert(c(0) === "1") + assert(c(1) === "2") + assert(c(2) === "3") + assert(c(3) === "4") } test("basic pipe with tokenization") { - if (testCommandAvailable("wc")) { - val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) - - // verify that both RDD.pipe(command: String) and RDD.pipe(command: String, env) work good - for (piped <- Seq(nums.pipe("wc -l"), nums.pipe("wc -l", Map[String, String]()))) { - val c = piped.collect() - assert(c.size === 2) - assert(c(0).trim === "2") - assert(c(1).trim === "2") - } - } else { - assert(true) + assume(testCommandAvailable("wc")) + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + + // verify that both RDD.pipe(command: String) and RDD.pipe(command: String, env) work good + for (piped <- Seq(nums.pipe("wc -l"), nums.pipe("wc -l", Map[String, String]()))) { + val c = piped.collect() + assert(c.size === 2) + assert(c(0).trim === "2") + assert(c(1).trim === "2") } } test("failure in iterating over pipe input") { - if (testCommandAvailable("cat")) { - val nums = - sc.makeRDD(Array(1, 2, 3, 4), 2) - .mapPartitionsWithIndex((index, iterator) => { - new Iterator[Int] { - def hasNext = true - def next() = { - throw new SparkException("Exception to simulate bad scenario") - } - } - }) - - val piped = nums.pipe(Seq("cat")) - - intercept[SparkException] { - piped.collect() - } + assume(testCommandAvailable("cat")) + val nums = + sc.makeRDD(Array(1, 2, 3, 4), 2) + .mapPartitionsWithIndex((index, iterator) => { + new Iterator[Int] { + def hasNext = true + def next() = { + throw new SparkException("Exception to simulate bad scenario") + } + } + }) + + val piped = nums.pipe(Seq("cat")) + + intercept[SparkException] { + piped.collect() } } test("advanced pipe") { - if (testCommandAvailable("cat")) { - val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) - val bl = sc.broadcast(List("0")) - - val piped = nums.pipe(Seq("cat"), + assume(testCommandAvailable("cat")) + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + val bl = sc.broadcast(List("0")) + + val piped = nums.pipe(Seq("cat"), + Map[String, String](), + (f: String => Unit) => { + bl.value.foreach(f); f("\u0001") + }, + (i: Int, f: String => Unit) => f(i + "_")) + + val c = piped.collect() + + assert(c.size === 8) + assert(c(0) === "0") + assert(c(1) === "\u0001") + assert(c(2) === "1_") + assert(c(3) === "2_") + assert(c(4) === "0") + assert(c(5) === "\u0001") + assert(c(6) === "3_") + assert(c(7) === "4_") + + val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2) + val d = nums1.groupBy(str => str.split("\t")(0)). + pipe(Seq("cat"), Map[String, String](), (f: String => Unit) => { bl.value.foreach(f); f("\u0001") }, - (i: Int, f: String => Unit) => f(i + "_")) - - val c = piped.collect() - - assert(c.size === 8) - assert(c(0) === "0") - assert(c(1) === "\u0001") - assert(c(2) === "1_") - assert(c(3) === "2_") - assert(c(4) === "0") - assert(c(5) === "\u0001") - assert(c(6) === "3_") - assert(c(7) === "4_") - - val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2) - val d = nums1.groupBy(str => str.split("\t")(0)). - pipe(Seq("cat"), - Map[String, String](), - (f: String => Unit) => { - bl.value.foreach(f); f("\u0001") - }, - (i: Tuple2[String, Iterable[String]], f: String => Unit) => { - for (e <- i._2) { - f(e + "_") - } - }).collect() - assert(d.size === 8) - assert(d(0) === "0") - assert(d(1) === "\u0001") - assert(d(2) === "b\t2_") - assert(d(3) === "b\t4_") - assert(d(4) === "0") - assert(d(5) === "\u0001") - assert(d(6) === "a\t1_") - assert(d(7) === "a\t3_") - } else { - assert(true) - } + (i: Tuple2[String, Iterable[String]], f: String => Unit) => { + for (e <- i._2) { + f(e + "_") + } + }).collect() + assert(d.size === 8) + assert(d(0) === "0") + assert(d(1) === "\u0001") + assert(d(2) === "b\t2_") + assert(d(3) === "b\t4_") + assert(d(4) === "0") + assert(d(5) === "\u0001") + assert(d(6) === "a\t1_") + assert(d(7) === "a\t3_") } test("pipe with empty partition") { @@ -142,67 +137,67 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext { val piped = data.pipe("wc -c") assert(piped.count == 8) val charCounts = piped.map(_.trim.toInt).collect().toSet - assert(Set(0, 4, 5) == charCounts) + val expected = if (Utils.isWindows) { + // Note that newline character on Windows is \r\n which are two. + Set(0, 5, 6) + } else { + Set(0, 4, 5) + } + assert(expected == charCounts) } test("pipe with env variable") { - if (testCommandAvailable("printenv")) { - val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) - val piped = nums.pipe(Seq("printenv", "MY_TEST_ENV"), Map("MY_TEST_ENV" -> "LALALA")) - val c = piped.collect() - assert(c.size === 2) - assert(c(0) === "LALALA") - assert(c(1) === "LALALA") - } else { - assert(true) - } + assume(testCommandAvailable(envCommand)) + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + val piped = nums.pipe(s"$envCommand MY_TEST_ENV", Map("MY_TEST_ENV" -> "LALALA")) + val c = piped.collect() + assert(c.length === 2) + // On Windows, `cmd.exe /C set` is used which prints out it as `varname=value` format + // whereas `printenv` usually prints out `value`. So, `varname=` is stripped here for both. + assert(c(0).stripPrefix("MY_TEST_ENV=") === "LALALA") + assert(c(1).stripPrefix("MY_TEST_ENV=") === "LALALA") } test("pipe with process which cannot be launched due to bad command") { - if (!testCommandAvailable("some_nonexistent_command")) { - val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) - val command = Seq("some_nonexistent_command") - val piped = nums.pipe(command) - val exception = intercept[SparkException] { - piped.collect() - } - assert(exception.getMessage.contains(command.mkString(" "))) + assume(!testCommandAvailable("some_nonexistent_command")) + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + val command = Seq("some_nonexistent_command") + val piped = nums.pipe(command) + val exception = intercept[SparkException] { + piped.collect() } + assert(exception.getMessage.contains(command.mkString(" "))) } test("pipe with process which is launched but fails with non-zero exit status") { - if (testCommandAvailable("cat")) { - val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) - val command = Seq("cat", "nonexistent_file") - val piped = nums.pipe(command) - val exception = intercept[SparkException] { - piped.collect() - } - assert(exception.getMessage.contains(command.mkString(" "))) + assume(testCommandAvailable("cat")) + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + val command = Seq("cat", "nonexistent_file") + val piped = nums.pipe(command) + val exception = intercept[SparkException] { + piped.collect() } + assert(exception.getMessage.contains(command.mkString(" "))) } test("basic pipe with separate working directory") { - if (testCommandAvailable("cat")) { - val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) - val piped = nums.pipe(Seq("cat"), separateWorkingDir = true) - val c = piped.collect() - assert(c.size === 4) - assert(c(0) === "1") - assert(c(1) === "2") - assert(c(2) === "3") - assert(c(3) === "4") - val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true) - val collectPwd = pipedPwd.collect() - assert(collectPwd(0).contains("tasks/")) - val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true, bufferSize = 16384).collect() - // make sure symlinks were created - assert(pipedLs.length > 0) - // clean up top level tasks directory - Utils.deleteRecursively(new File("tasks")) - } else { - assert(true) - } + assume(testCommandAvailable("cat")) + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + val piped = nums.pipe(Seq("cat"), separateWorkingDir = true) + val c = piped.collect() + assert(c.size === 4) + assert(c(0) === "1") + assert(c(1) === "2") + assert(c(2) === "3") + assert(c(3) === "4") + val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true) + val collectPwd = pipedPwd.collect() + assert(collectPwd(0).contains("tasks/")) + val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true, bufferSize = 16384).collect() + // make sure symlinks were created + assert(pipedLs.length > 0) + // clean up top level tasks directory + Utils.deleteRecursively(new File("tasks")) } test("test pipe exports map_input_file") { @@ -219,36 +214,35 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext { } def testExportInputFile(varName: String) { - if (testCommandAvailable("printenv")) { - val nums = new HadoopRDD(sc, new JobConf(), classOf[TextInputFormat], classOf[LongWritable], - classOf[Text], 2) { - override def getPartitions: Array[Partition] = Array(generateFakeHadoopPartition()) + assume(testCommandAvailable(envCommand)) + val nums = new HadoopRDD(sc, new JobConf(), classOf[TextInputFormat], classOf[LongWritable], + classOf[Text], 2) { + override def getPartitions: Array[Partition] = Array(generateFakeHadoopPartition()) - override val getDependencies = List[Dependency[_]]() + override val getDependencies = List[Dependency[_]]() - override def compute(theSplit: Partition, context: TaskContext) = { - new InterruptibleIterator[(LongWritable, Text)](context, Iterator((new LongWritable(1), - new Text("b")))) - } + override def compute(theSplit: Partition, context: TaskContext) = { + new InterruptibleIterator[(LongWritable, Text)](context, Iterator((new LongWritable(1), + new Text("b")))) } - val hadoopPart1 = generateFakeHadoopPartition() - val pipedRdd = - new PipedRDD( - nums, - PipedRDD.tokenize("printenv " + varName), - Map(), - null, - null, - false, - 4092, - Codec.defaultCharsetCodec.name) - val tContext = TaskContext.empty() - val rddIter = pipedRdd.compute(hadoopPart1, tContext) - val arr = rddIter.toArray - assert(arr(0) == "/some/path") - } else { - // printenv isn't available so just pass the test } + val hadoopPart1 = generateFakeHadoopPartition() + val pipedRdd = + new PipedRDD( + nums, + PipedRDD.tokenize(s"$envCommand $varName"), + Map(), + null, + null, + false, + 4092, + Codec.defaultCharsetCodec.name) + val tContext = TaskContext.empty() + val rddIter = pipedRdd.compute(hadoopPart1, tContext) + val arr = rddIter.toArray + // On Windows, `cmd.exe /C set` is used which prints out it as `varname=value` format + // whereas `printenv` usually prints out `value`. So, `varname=` is stripped here for both. + assert(arr(0).stripPrefix(s"$varName=") === "/some/path") } def generateFakeHadoopPartition(): HadoopPartition = { |