about summary refs log tree commit diff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-08-10 00:56:42 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-08-10 00:56:42 -0700
commite463e7a333577b4e4b693268fba7f4df9f362426 (patch)
treee49dd8aa47220a1eb43856d7ff8d82292eebdd16 /core
parent8069bd5b4142a5758818087d501a7498b554dac6 (diff)
parent59c22fb444e7a03e2272be1d76e16960083d161d (diff)
downloadspark-e463e7a333577b4e4b693268fba7f4df9f362426.tar.gz
spark-e463e7a333577b4e4b693268fba7f4df9f362426.tar.bz2
spark-e463e7a333577b4e4b693268fba7f4df9f362426.zip
Merge pull request #167 from JoshRosen/piped-rdd-fixes
Detect non-zero exit status from PipedRDD process
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/PipedRDD.scala | 16
-rw-r--r--core/src/test/scala/spark/PipedRDDSuite.scala | 9
2 files changed, 24 insertions, 1 deletion
diff --git a/core/src/main/scala/spark/PipedRDD.scala b/core/src/main/scala/spark/PipedRDD.scala
index 4fcfd869cf..3103d7889b 100644
--- a/core/src/main/scala/spark/PipedRDD.scala
+++ b/core/src/main/scala/spark/PipedRDD.scala
@@ -57,7 +57,21 @@ class PipedRDD[T: ClassManifest](
}.start()
// Return an iterator that read lines from the process's stdout
- Source.fromInputStream(proc.getInputStream).getLines
+ val lines = Source.fromInputStream(proc.getInputStream).getLines
+ return new Iterator[String] {
+ def next() = lines.next()
+ def hasNext = {
+ if (lines.hasNext) {
+ true
+ } else {
+ val exitStatus = proc.waitFor()
+ if (exitStatus != 0) {
+ throw new Exception("Subprocess exited with status " + exitStatus)
+ }
+ false
+ }
+ }
+ }
}
}
diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala
index db1b9835a0..d010a9be7a 100644
--- a/core/src/test/scala/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/spark/PipedRDDSuite.scala
@@ -39,6 +39,15 @@ class PipedRDDSuite extends FunSuite with BeforeAndAfter {
assert(c(1) === "LALALA")
}
+ test("pipe with non-zero exit status") {
+ sc = new SparkContext("local", "test")
+ val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+ val piped = nums.pipe("cat nonexistent_file")
+ intercept[SparkException] {
+ piped.collect()
+ }
+ }
+
}