author    hyukjinkwon <gurwls223@gmail.com>  2016-12-14 19:27:29 +0000
committer Sean Owen <sowen@cloudera.com>  2016-12-14 19:27:29 +0000
commit    169b9d73ee2136194df42c8deaaa95572b4ae56c (patch)
tree      d7645a92882637c61f1fceb99f7f793b59e838a8
parent    c6b8eb71a9638c9a8ce02d11d5fe26f4c5be531e (diff)
download  spark-169b9d73ee2136194df42c8deaaa95572b4ae56c.tar.gz
spark-169b9d73ee2136194df42c8deaaa95572b4ae56c.tar.bz2
spark-169b9d73ee2136194df42c8deaaa95572b4ae56c.zip
[SPARK-18830][TESTS] Fix tests in PipedRDDSuite to pass on Windows
## What changes were proposed in this pull request?

This PR proposes to fix the tests that fail on Windows as below:

```
[info] - pipe with empty partition *** FAILED *** (672 milliseconds)
[info]   Set(0, 4, 5) did not equal Set(0, 5, 6) (PipedRDDSuite.scala:145)
[info]   org.scalatest.exceptions.TestFailedException:
...
```

In this case, `wc -c` counts characters on both Windows and Linux, but the newline sequence on Windows is `\r\n`, which is two characters, so the count ends up one higher for each line.

```
[info] - test pipe exports map_input_file *** FAILED *** (62 milliseconds)
[info]   java.lang.IllegalStateException: Subprocess exited with status 1. Command ran: printenv map_input_file
[info]   at org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:178)
...
```

```
[info] - test pipe exports mapreduce_map_input_file *** FAILED *** (172 milliseconds)
[info]   java.lang.IllegalStateException: Subprocess exited with status 1. Command ran: printenv mapreduce_map_input_file
[info]   at org.apache.spark.rdd.PipedRDD$$anon$1.hasNext(PipedRDD.scala:178)
...
```

The `printenv` command prints environment variables; however, when an environment variable is set on the `ProcessBuilder` with a lower-cased key, `printenv` on Windows ignores it and prints nothing, even though the variable is actually set and accessible. (This was tested [here](https://ci.appveyor.com/project/spark-test/spark/build/208-PipedRDDSuite) for upper-cased names with this [diff](https://github.com/apache/spark/compare/master...spark-test:74d39da) and [here](https://ci.appveyor.com/project/spark-test/spark/build/203-PipedRDDSuite) for lower-cased names with this [diff](https://github.com/apache/spark/compare/master...spark-test:fde5e37f28032c15a8d8693ba033a8a779a26317).) This seems to be a bug in `printenv`. (Note that environment variable names on Windows are case-insensitive.) The `printenv` available there is, I believe, a third-party tool that resembles `printenv` on Linux (installed in the AppVeyor environment or Windows Server 2012 R2); the command does not exist, at least, on Windows 7 and 10 (manually tested).

On Windows, `cmd.exe /C set [varname]` is the officially supported way to print an environment variable, so the tests are changed to use it when checking whether an environment variable is set.

## How was this patch tested?

Manually tested via AppVeyor.

**Before** https://ci.appveyor.com/project/spark-test/spark/build/194-PipedRDDSuite

**After** https://ci.appveyor.com/project/spark-test/spark/build/226-PipedRDDSuite

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #16254 from HyukjinKwon/pipe-errors.
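The approach, illustrated as a minimal, self-contained Scala sketch (editor's illustration, not part of the patch): pick `printenv` on Unix-like systems and `cmd.exe /C set` on Windows, then strip the `varname=` prefix that `set` emits so the same assertion works on both platforms. The `os.name` check here stands in for Spark's `Utils.isWindows`.

```scala
object EnvCommandSketch {
  // Stand-in for Utils.isWindows (assumption: detect Windows via the os.name system property).
  private val isWindows: Boolean =
    sys.props.getOrElse("os.name", "").toLowerCase.contains("windows")

  // Platform-specific command used to print an environment variable.
  val envCommand: String = if (isWindows) "cmd.exe /C set" else "printenv"

  // `cmd.exe /C set MY_VAR` prints `MY_VAR=value`; `printenv MY_VAR` prints just `value`.
  // Stripping the prefix normalizes both forms before asserting on the value.
  def normalize(varName: String, line: String): String =
    line.stripPrefix(s"$varName=")

  def main(args: Array[String]): Unit = {
    println(s"command: $envCommand MY_TEST_ENV")
    println(normalize("MY_TEST_ENV", "MY_TEST_ENV=LALALA")) // Windows-style output -> LALALA
    println(normalize("MY_TEST_ENV", "LALALA"))             // printenv-style output -> LALALA
  }
}
```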
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala | 308
1 file changed, 151 insertions(+), 157 deletions(-)
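Before the diff itself, a short note on the first failure: each record piped to the subprocess is terminated by the platform line separator, so `wc -c` sees one extra character per record on Windows (`\r\n`) than on Linux (`\n`). A hedged Scala sketch of that arithmetic (editor's illustration, using made-up partition contents rather than the suite's actual data):

```scala
object LineEndingCountSketch {
  // Characters `wc -c` would report for one partition's worth of records,
  // given a particular line separator appended to each record.
  def charCount(records: Seq[String], separator: String): Int =
    records.map(_.length + separator.length).sum

  def main(args: Array[String]): Unit = {
    val partition = Seq("1", "2") // hypothetical partition holding two single-digit records
    println(charCount(partition, "\n"))   // 4 with Unix newlines
    println(charCount(partition, "\r\n")) // 6 with Windows newlines: one extra char per record
  }
}
```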
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index 7293aa9a25..287ae6ff6e 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -32,109 +32,104 @@ import org.apache.spark._
import org.apache.spark.util.Utils
class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
+ val envCommand = if (Utils.isWindows) {
+ "cmd.exe /C set"
+ } else {
+ "printenv"
+ }
test("basic pipe") {
- if (testCommandAvailable("cat")) {
- val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+ assume(testCommandAvailable("cat"))
+ val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
- val piped = nums.pipe(Seq("cat"))
+ val piped = nums.pipe(Seq("cat"))
- val c = piped.collect()
- assert(c.size === 4)
- assert(c(0) === "1")
- assert(c(1) === "2")
- assert(c(2) === "3")
- assert(c(3) === "4")
- } else {
- assert(true)
- }
+ val c = piped.collect()
+ assert(c.size === 4)
+ assert(c(0) === "1")
+ assert(c(1) === "2")
+ assert(c(2) === "3")
+ assert(c(3) === "4")
}
test("basic pipe with tokenization") {
- if (testCommandAvailable("wc")) {
- val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
-
- // verify that both RDD.pipe(command: String) and RDD.pipe(command: String, env) work well
- for (piped <- Seq(nums.pipe("wc -l"), nums.pipe("wc -l", Map[String, String]()))) {
- val c = piped.collect()
- assert(c.size === 2)
- assert(c(0).trim === "2")
- assert(c(1).trim === "2")
- }
- } else {
- assert(true)
+ assume(testCommandAvailable("wc"))
+ val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+
+ // verify that both RDD.pipe(command: String) and RDD.pipe(command: String, env) work well
+ for (piped <- Seq(nums.pipe("wc -l"), nums.pipe("wc -l", Map[String, String]()))) {
+ val c = piped.collect()
+ assert(c.size === 2)
+ assert(c(0).trim === "2")
+ assert(c(1).trim === "2")
}
}
test("failure in iterating over pipe input") {
- if (testCommandAvailable("cat")) {
- val nums =
- sc.makeRDD(Array(1, 2, 3, 4), 2)
- .mapPartitionsWithIndex((index, iterator) => {
- new Iterator[Int] {
- def hasNext = true
- def next() = {
- throw new SparkException("Exception to simulate bad scenario")
- }
- }
- })
-
- val piped = nums.pipe(Seq("cat"))
-
- intercept[SparkException] {
- piped.collect()
- }
+ assume(testCommandAvailable("cat"))
+ val nums =
+ sc.makeRDD(Array(1, 2, 3, 4), 2)
+ .mapPartitionsWithIndex((index, iterator) => {
+ new Iterator[Int] {
+ def hasNext = true
+ def next() = {
+ throw new SparkException("Exception to simulate bad scenario")
+ }
+ }
+ })
+
+ val piped = nums.pipe(Seq("cat"))
+
+ intercept[SparkException] {
+ piped.collect()
}
}
test("advanced pipe") {
- if (testCommandAvailable("cat")) {
- val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
- val bl = sc.broadcast(List("0"))
-
- val piped = nums.pipe(Seq("cat"),
+ assume(testCommandAvailable("cat"))
+ val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+ val bl = sc.broadcast(List("0"))
+
+ val piped = nums.pipe(Seq("cat"),
+ Map[String, String](),
+ (f: String => Unit) => {
+ bl.value.foreach(f); f("\u0001")
+ },
+ (i: Int, f: String => Unit) => f(i + "_"))
+
+ val c = piped.collect()
+
+ assert(c.size === 8)
+ assert(c(0) === "0")
+ assert(c(1) === "\u0001")
+ assert(c(2) === "1_")
+ assert(c(3) === "2_")
+ assert(c(4) === "0")
+ assert(c(5) === "\u0001")
+ assert(c(6) === "3_")
+ assert(c(7) === "4_")
+
+ val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
+ val d = nums1.groupBy(str => str.split("\t")(0)).
+ pipe(Seq("cat"),
Map[String, String](),
(f: String => Unit) => {
bl.value.foreach(f); f("\u0001")
},
- (i: Int, f: String => Unit) => f(i + "_"))
-
- val c = piped.collect()
-
- assert(c.size === 8)
- assert(c(0) === "0")
- assert(c(1) === "\u0001")
- assert(c(2) === "1_")
- assert(c(3) === "2_")
- assert(c(4) === "0")
- assert(c(5) === "\u0001")
- assert(c(6) === "3_")
- assert(c(7) === "4_")
-
- val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
- val d = nums1.groupBy(str => str.split("\t")(0)).
- pipe(Seq("cat"),
- Map[String, String](),
- (f: String => Unit) => {
- bl.value.foreach(f); f("\u0001")
- },
- (i: Tuple2[String, Iterable[String]], f: String => Unit) => {
- for (e <- i._2) {
- f(e + "_")
- }
- }).collect()
- assert(d.size === 8)
- assert(d(0) === "0")
- assert(d(1) === "\u0001")
- assert(d(2) === "b\t2_")
- assert(d(3) === "b\t4_")
- assert(d(4) === "0")
- assert(d(5) === "\u0001")
- assert(d(6) === "a\t1_")
- assert(d(7) === "a\t3_")
- } else {
- assert(true)
- }
+ (i: Tuple2[String, Iterable[String]], f: String => Unit) => {
+ for (e <- i._2) {
+ f(e + "_")
+ }
+ }).collect()
+ assert(d.size === 8)
+ assert(d(0) === "0")
+ assert(d(1) === "\u0001")
+ assert(d(2) === "b\t2_")
+ assert(d(3) === "b\t4_")
+ assert(d(4) === "0")
+ assert(d(5) === "\u0001")
+ assert(d(6) === "a\t1_")
+ assert(d(7) === "a\t3_")
}
test("pipe with empty partition") {
@@ -142,67 +137,67 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
val piped = data.pipe("wc -c")
assert(piped.count == 8)
val charCounts = piped.map(_.trim.toInt).collect().toSet
- assert(Set(0, 4, 5) == charCounts)
+ val expected = if (Utils.isWindows) {
+ // Note that the newline character on Windows is \r\n, which is two characters.
+ Set(0, 5, 6)
+ } else {
+ Set(0, 4, 5)
+ }
+ assert(expected == charCounts)
}
test("pipe with env variable") {
- if (testCommandAvailable("printenv")) {
- val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
- val piped = nums.pipe(Seq("printenv", "MY_TEST_ENV"), Map("MY_TEST_ENV" -> "LALALA"))
- val c = piped.collect()
- assert(c.size === 2)
- assert(c(0) === "LALALA")
- assert(c(1) === "LALALA")
- } else {
- assert(true)
- }
+ assume(testCommandAvailable(envCommand))
+ val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+ val piped = nums.pipe(s"$envCommand MY_TEST_ENV", Map("MY_TEST_ENV" -> "LALALA"))
+ val c = piped.collect()
+ assert(c.length === 2)
+ // On Windows, `cmd.exe /C set` is used, which prints the variable as `varname=value`,
+ // whereas `printenv` usually prints only `value`. So, `varname=` is stripped here for both.
+ assert(c(0).stripPrefix("MY_TEST_ENV=") === "LALALA")
+ assert(c(1).stripPrefix("MY_TEST_ENV=") === "LALALA")
}
test("pipe with process which cannot be launched due to bad command") {
- if (!testCommandAvailable("some_nonexistent_command")) {
- val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
- val command = Seq("some_nonexistent_command")
- val piped = nums.pipe(command)
- val exception = intercept[SparkException] {
- piped.collect()
- }
- assert(exception.getMessage.contains(command.mkString(" ")))
+ assume(!testCommandAvailable("some_nonexistent_command"))
+ val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+ val command = Seq("some_nonexistent_command")
+ val piped = nums.pipe(command)
+ val exception = intercept[SparkException] {
+ piped.collect()
}
+ assert(exception.getMessage.contains(command.mkString(" ")))
}
test("pipe with process which is launched but fails with non-zero exit status") {
- if (testCommandAvailable("cat")) {
- val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
- val command = Seq("cat", "nonexistent_file")
- val piped = nums.pipe(command)
- val exception = intercept[SparkException] {
- piped.collect()
- }
- assert(exception.getMessage.contains(command.mkString(" ")))
+ assume(testCommandAvailable("cat"))
+ val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+ val command = Seq("cat", "nonexistent_file")
+ val piped = nums.pipe(command)
+ val exception = intercept[SparkException] {
+ piped.collect()
}
+ assert(exception.getMessage.contains(command.mkString(" ")))
}
test("basic pipe with separate working directory") {
- if (testCommandAvailable("cat")) {
- val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
- val piped = nums.pipe(Seq("cat"), separateWorkingDir = true)
- val c = piped.collect()
- assert(c.size === 4)
- assert(c(0) === "1")
- assert(c(1) === "2")
- assert(c(2) === "3")
- assert(c(3) === "4")
- val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
- val collectPwd = pipedPwd.collect()
- assert(collectPwd(0).contains("tasks/"))
- val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true, bufferSize = 16384).collect()
- // make sure symlinks were created
- assert(pipedLs.length > 0)
- // clean up top level tasks directory
- Utils.deleteRecursively(new File("tasks"))
- } else {
- assert(true)
- }
+ assume(testCommandAvailable("cat"))
+ val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+ val piped = nums.pipe(Seq("cat"), separateWorkingDir = true)
+ val c = piped.collect()
+ assert(c.size === 4)
+ assert(c(0) === "1")
+ assert(c(1) === "2")
+ assert(c(2) === "3")
+ assert(c(3) === "4")
+ val pipedPwd = nums.pipe(Seq("pwd"), separateWorkingDir = true)
+ val collectPwd = pipedPwd.collect()
+ assert(collectPwd(0).contains("tasks/"))
+ val pipedLs = nums.pipe(Seq("ls"), separateWorkingDir = true, bufferSize = 16384).collect()
+ // make sure symlinks were created
+ assert(pipedLs.length > 0)
+ // clean up top level tasks directory
+ Utils.deleteRecursively(new File("tasks"))
}
test("test pipe exports map_input_file") {
@@ -219,36 +214,35 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext {
}
def testExportInputFile(varName: String) {
- if (testCommandAvailable("printenv")) {
- val nums = new HadoopRDD(sc, new JobConf(), classOf[TextInputFormat], classOf[LongWritable],
- classOf[Text], 2) {
- override def getPartitions: Array[Partition] = Array(generateFakeHadoopPartition())
+ assume(testCommandAvailable(envCommand))
+ val nums = new HadoopRDD(sc, new JobConf(), classOf[TextInputFormat], classOf[LongWritable],
+ classOf[Text], 2) {
+ override def getPartitions: Array[Partition] = Array(generateFakeHadoopPartition())
- override val getDependencies = List[Dependency[_]]()
+ override val getDependencies = List[Dependency[_]]()
- override def compute(theSplit: Partition, context: TaskContext) = {
- new InterruptibleIterator[(LongWritable, Text)](context, Iterator((new LongWritable(1),
- new Text("b"))))
- }
+ override def compute(theSplit: Partition, context: TaskContext) = {
+ new InterruptibleIterator[(LongWritable, Text)](context, Iterator((new LongWritable(1),
+ new Text("b"))))
}
- val hadoopPart1 = generateFakeHadoopPartition()
- val pipedRdd =
- new PipedRDD(
- nums,
- PipedRDD.tokenize("printenv " + varName),
- Map(),
- null,
- null,
- false,
- 4092,
- Codec.defaultCharsetCodec.name)
- val tContext = TaskContext.empty()
- val rddIter = pipedRdd.compute(hadoopPart1, tContext)
- val arr = rddIter.toArray
- assert(arr(0) == "/some/path")
- } else {
- // printenv isn't available so just pass the test
}
+ val hadoopPart1 = generateFakeHadoopPartition()
+ val pipedRdd =
+ new PipedRDD(
+ nums,
+ PipedRDD.tokenize(s"$envCommand $varName"),
+ Map(),
+ null,
+ null,
+ false,
+ 4092,
+ Codec.defaultCharsetCodec.name)
+ val tContext = TaskContext.empty()
+ val rddIter = pipedRdd.compute(hadoopPart1, tContext)
+ val arr = rddIter.toArray
+ // On Windows, `cmd.exe /C set` is used, which prints the variable as `varname=value`,
+ // whereas `printenv` usually prints only `value`. So, `varname=` is stripped here for both.
+ assert(arr(0).stripPrefix(s"$varName=") === "/some/path")
}
def generateFakeHadoopPartition(): HadoopPartition = {