aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNilanjan Raychaudhuri <nraychaudhuri@gmail.com>2016-02-01 13:33:24 -0800
committerAndrew Or <andrew@databricks.com>2016-02-01 13:33:24 -0800
commita41b68b954ba47284a1df312f0aaea29b0721b0a (patch)
tree64e21e5bc16dcf9e949a54546de42c700996cedb
parent51b03b71ffc390e67b32936efba61e614a8b0d86 (diff)
downloadspark-a41b68b954ba47284a1df312f0aaea29b0721b0a.tar.gz
spark-a41b68b954ba47284a1df312f0aaea29b0721b0a.tar.bz2
spark-a41b68b954ba47284a1df312f0aaea29b0721b0a.zip
[SPARK-12265][MESOS] Spark calls System.exit inside driver instead of throwing exception
This takes over #10729 and makes sure that `spark-shell` fails with a proper error message. There is a slight behavioral change: before this change `spark-shell` would exit, while now the REPL is still there, but `sc` and `sqlContext` are not defined and the error is visible to the user. Author: Nilanjan Raychaudhuri <nraychaudhuri@gmail.com> Author: Iulian Dragos <jaguarul@gmail.com> Closes #10921 from dragos/pr/10729.
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala21
3 files changed, 19 insertions, 4 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 05fda0fded..e77d77208c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -573,6 +573,7 @@ private[spark] class MesosClusterScheduler(
override def slaveLost(driver: SchedulerDriver, slaveId: SlaveID): Unit = {}
override def error(driver: SchedulerDriver, error: String): Unit = {
logError("Error received: " + error)
+ markErr()
}
/**
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index eaf0cb06d6..a8bf79a78c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -375,6 +375,7 @@ private[spark] class MesosSchedulerBackend(
override def error(d: SchedulerDriver, message: String) {
inClassLoader() {
logError("Mesos error: " + message)
+ markErr()
scheduler.error(message)
}
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 010caff3e3..f9f5da9bc8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -106,28 +106,37 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
registerLatch.await()
return
}
+ @volatile
+ var error: Option[Exception] = None
+ // We create a new thread that will block inside `mesosDriver.run`
+ // until the scheduler exists
new Thread(Utils.getFormattedClassName(this) + "-mesos-driver") {
setDaemon(true)
-
override def run() {
- mesosDriver = newDriver
try {
+ mesosDriver = newDriver
val ret = mesosDriver.run()
logInfo("driver.run() returned with code " + ret)
if (ret != null && ret.equals(Status.DRIVER_ABORTED)) {
- System.exit(1)
+ error = Some(new SparkException("Error starting driver, DRIVER_ABORTED"))
+ markErr()
}
} catch {
case e: Exception => {
logError("driver.run() failed", e)
- System.exit(1)
+ error = Some(e)
+ markErr()
}
}
}
}.start()
registerLatch.await()
+
+ // propagate any error to the calling thread. This ensures that SparkContext creation fails
+ // without leaving a broken context that won't be able to schedule any tasks
+ error.foreach(throw _)
}
}
@@ -144,6 +153,10 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
registerLatch.countDown()
}
+ protected def markErr(): Unit = {
+ registerLatch.countDown()
+ }
+
def createResource(name: String, amount: Double, role: Option[String] = None): Resource = {
val builder = Resource.newBuilder()
.setName(name)