diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-08-10 00:56:42 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-08-10 00:56:42 -0700 |
commit | e463e7a333577b4e4b693268fba7f4df9f362426 (patch) | |
tree | e49dd8aa47220a1eb43856d7ff8d82292eebdd16 /core | |
parent | 8069bd5b4142a5758818087d501a7498b554dac6 (diff) | |
parent | 59c22fb444e7a03e2272be1d76e16960083d161d (diff) | |
download | spark-e463e7a333577b4e4b693268fba7f4df9f362426.tar.gz spark-e463e7a333577b4e4b693268fba7f4df9f362426.tar.bz2 spark-e463e7a333577b4e4b693268fba7f4df9f362426.zip |
Merge pull request #167 from JoshRosen/piped-rdd-fixes
Detect non-zero exit status from PipedRDD process
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/PipedRDD.scala | 16 | ||||
-rw-r--r-- | core/src/test/scala/spark/PipedRDDSuite.scala | 9 |
2 files changed, 24 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/PipedRDD.scala b/core/src/main/scala/spark/PipedRDD.scala index 4fcfd869cf..3103d7889b 100644 --- a/core/src/main/scala/spark/PipedRDD.scala +++ b/core/src/main/scala/spark/PipedRDD.scala @@ -57,7 +57,21 @@ class PipedRDD[T: ClassManifest]( }.start() // Return an iterator that read lines from the process's stdout - Source.fromInputStream(proc.getInputStream).getLines + val lines = Source.fromInputStream(proc.getInputStream).getLines + return new Iterator[String] { + def next() = lines.next() + def hasNext = { + if (lines.hasNext) { + true + } else { + val exitStatus = proc.waitFor() + if (exitStatus != 0) { + throw new Exception("Subprocess exited with status " + exitStatus) + } + false + } + } + } } } diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala index db1b9835a0..d010a9be7a 100644 --- a/core/src/test/scala/spark/PipedRDDSuite.scala +++ b/core/src/test/scala/spark/PipedRDDSuite.scala @@ -39,6 +39,15 @@ class PipedRDDSuite extends FunSuite with BeforeAndAfter { assert(c(1) === "LALALA") } + test("pipe with non-zero exit status") { + sc = new SparkContext("local", "test") + val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) + val piped = nums.pipe("cat nonexistent_file") + intercept[SparkException] { + piped.collect() + } + } + } |