author    Matei Zaharia <matei@eecs.berkeley.edu>  2013-05-12 11:03:10 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2013-05-12 11:03:10 -0700
commit    5dbc9b20061893de1a07f175f9765addf251fd78 (patch)
tree      f2de6cafd38fb71a1e101540b7847ac9cff9522d /core
parent    63e1999f6057bd397b49efe432ad74c0015a101b (diff)
parent    7f0833647b784c4ec7cd2f2e8e4fcd5ed6f673cd (diff)
Merge pull request #608 from pwendell/SPARK-738
SPARK-738: Spark should detect and wrap nonserializable exceptions
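
This patch replaces the Throwable inside ExceptionFailure with plain, always-serializable fields (class name, description, stack trace), so a task failure can still be reported back to the scheduler even when the exception itself cannot be serialized. A minimal sketch of the failure mode being guarded against, reusing the two helper classes added to DistributedSuite below (the roundTrips helper and SerializationSketch object are illustrative, not part of the patch):

    import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

    // Mirrors the classes added to DistributedSuite: an exception holding a
    // reference to a non-serializable object.
    class NotSerializableClass
    class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable

    object SerializationSketch {
      // Returns true if Java serialization accepts the object.
      def roundTrips(obj: AnyRef): Boolean =
        try {
          new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
          true
        } catch {
          case _: NotSerializableException => false
        }

      def main(args: Array[String]): Unit = {
        val exn = new NotSerializableExn(new NotSerializableClass)
        // Throwable is Serializable, but this instance is not, because of its
        // notSer field. Before this patch, the executor hit the same problem
        // when it serialized ExceptionFailure(exception: Throwable) for the
        // status update, so the failure never reached the scheduler cleanly.
        println(roundTrips(exn))  // false
        // The reworked ExceptionFailure carries only values that always
        // serialize: a String, a String, and an Array[StackTraceElement].
        println(roundTrips((exn.getClass.getName, exn.toString, exn.getStackTrace)))  // true
      }
    }
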
Diffstat (limited to 'core')
 core/src/main/scala/spark/TaskEndReason.scala                    | 14
 core/src/main/scala/spark/executor/Executor.scala                |  2
 core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala |  9
 core/src/main/scala/spark/scheduler/local/LocalScheduler.scala   |  6
 core/src/test/scala/spark/DistributedSuite.scala                 | 21
 5 files changed, 42 insertions(+), 10 deletions(-)
diff --git a/core/src/main/scala/spark/TaskEndReason.scala b/core/src/main/scala/spark/TaskEndReason.scala
index 420c54bc9a..ca793eb402 100644
--- a/core/src/main/scala/spark/TaskEndReason.scala
+++ b/core/src/main/scala/spark/TaskEndReason.scala
@@ -14,9 +14,17 @@ private[spark] case object Success extends TaskEndReason
private[spark]
case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
-private[spark]
-case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason
+private[spark] case class FetchFailed(
+ bmAddress: BlockManagerId,
+ shuffleId: Int,
+ mapId: Int,
+ reduceId: Int)
+ extends TaskEndReason
-private[spark] case class ExceptionFailure(exception: Throwable) extends TaskEndReason
+private[spark] case class ExceptionFailure(
+ className: String,
+ description: String,
+ stackTrace: Array[StackTraceElement])
+ extends TaskEndReason
private[spark] case class OtherFailure(message: String) extends TaskEndReason
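
The hunk above is the core of the change: ExceptionFailure no longer holds the Throwable itself, only its class name, a description, and the stack trace, all of which serialize reliably. A brief sketch of matching on the reworked hierarchy (the describe helper is hypothetical and would have to live inside the spark package, since these types are private[spark]):

    // Hypothetical consumer, shown only to illustrate the new field layout.
    def describe(reason: TaskEndReason): String = reason match {
      case Success     => "success"
      case Resubmitted => "resubmitted"
      case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
        "fetch failure in shuffle %d (map %d, reduce %d) at %s"
          .format(shuffleId, mapId, reduceId, bmAddress)
      case ExceptionFailure(className, description, stackTrace) =>
        "%s: %s\n%s".format(
          className, description, stackTrace.map("\tat " + _).mkString("\n"))
      case OtherFailure(message) => message
    }
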
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 344face5e6..da20b84544 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -122,7 +122,7 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
}
case t: Throwable => {
- val reason = ExceptionFailure(t)
+ val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace)
context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
// TODO: Should we exit the whole executor here? On the one hand, the failed task may
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 27e713e2c4..06de3c755e 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -493,7 +493,7 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
return
case ef: ExceptionFailure =>
- val key = ef.exception.toString
+ val key = ef.description
val now = System.currentTimeMillis
val (printFull, dupCount) = {
if (recentExceptions.contains(key)) {
@@ -511,10 +511,11 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
}
}
if (printFull) {
- val locs = ef.exception.getStackTrace.map(loc => "\tat %s".format(loc.toString))
- logInfo("Loss was due to %s\n%s".format(ef.exception.toString, locs.mkString("\n")))
+ val locs = ef.stackTrace.map(loc => "\tat %s".format(loc.toString))
+ logInfo("Loss was due to %s\n%s\n%s".format(
+ ef.className, ef.description, locs.mkString("\n")))
} else {
- logInfo("Loss was due to %s [duplicate %d]".format(ef.exception.toString, dupCount))
+ logInfo("Loss was due to %s [duplicate %d]".format(ef.description, dupCount))
}
case _ => {}
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index f060a940a9..ebe42685ad 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -101,8 +101,10 @@ private[spark] class LocalScheduler(threads: Int, maxFailures: Int, sc: SparkCon
submitTask(task, idInJob)
} else {
// TODO: Do something nicer here to return all the way to the user
- if (!Thread.currentThread().isInterrupted)
- listener.taskEnded(task, new ExceptionFailure(t), null, null, info, null)
+ if (!Thread.currentThread().isInterrupted) {
+ val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace)
+ listener.taskEnded(task, failure, null, null, info, null)
+ }
}
}
}
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 4df3bb5b67..33c99471c6 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -18,6 +18,9 @@ import scala.collection.mutable.ArrayBuffer
import SparkContext._
import storage.{GetBlock, BlockManagerWorker, StorageLevel}
+class NotSerializableClass
+class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}
+
class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter with LocalSparkContext {
val clusterUrl = "local-cluster[2,1,512]"
@@ -27,6 +30,24 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
System.clearProperty("spark.storage.memoryFraction")
}
+ test("task throws not serializable exception") {
+ // Ensures that executors do not crash when an exn is not serializable. If executors crash,
+ // this test will hang. Correct behavior is that executors don't crash but fail tasks
+ // and the scheduler throws a SparkException.
+
+ // numSlaves must be less than numPartitions
+ val numSlaves = 3
+ val numPartitions = 10
+
+ sc = new SparkContext("local-cluster[%s,1,512]".format(numSlaves), "test")
+ val data = sc.parallelize(1 to 100, numPartitions).
+ map(x => throw new NotSerializableExn(new NotSerializableClass))
+ intercept[SparkException] {
+ data.count()
+ }
+ resetSparkContext()
+ }
+
test("local-cluster format") {
sc = new SparkContext("local-cluster[2,1,512]", "test")
assert(sc.parallelize(1 to 2, 2).count() == 2)