aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-05-11 14:17:09 -0700
committerPatrick Wendell <pwendell@gmail.com>2013-05-11 14:17:09 -0700
commit0345954530a445b275595962c9f949cad55a01f6 (patch)
tree675b3e94c0cb93eda48eb440ca82ebdf6eee711e
parent63e1999f6057bd397b49efe432ad74c0015a101b (diff)
downloadspark-0345954530a445b275595962c9f949cad55a01f6.tar.gz
spark-0345954530a445b275595962c9f949cad55a01f6.tar.bz2
spark-0345954530a445b275595962c9f949cad55a01f6.zip
SPARK-738: Spark should detect and squash nonserializable exceptions
-rw-r--r--core/src/main/scala/spark/executor/Executor.scala16
-rw-r--r--core/src/test/scala/spark/DistributedSuite.scala21
2 files changed, 35 insertions, 2 deletions
diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala
index 344face5e6..f9061b1c71 100644
--- a/core/src/main/scala/spark/executor/Executor.scala
+++ b/core/src/main/scala/spark/executor/Executor.scala
@@ -1,6 +1,6 @@
package spark.executor
-import java.io.{File, FileOutputStream}
+import java.io.{NotSerializableException, File, FileOutputStream}
import java.net.{URI, URL, URLClassLoader}
import java.util.concurrent._
@@ -123,7 +123,19 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
case t: Throwable => {
val reason = ExceptionFailure(t)
- context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
+ val serReason =
+ try {
+ ser.serialize(reason)
+ }
+ catch {
+ case e: NotSerializableException => {
+ val message = "Spark caught unserializable exn: " + t.toString
+ val throwable = new Exception(message)
+ throwable.setStackTrace(t.getStackTrace)
+ ser.serialize(new ExceptionFailure(throwable))
+ }
+ }
+ context.statusUpdate(taskId, TaskState.FAILED, serReason)
// TODO: Should we exit the whole executor here? On the one hand, the failed task may
// have left some weird state around depending on when the exception was thrown, but on
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 4df3bb5b67..8ab0f2cfa2 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -18,6 +18,9 @@ import scala.collection.mutable.ArrayBuffer
import SparkContext._
import storage.{GetBlock, BlockManagerWorker, StorageLevel}
+class NotSerializableClass
+class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}
+
class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter with LocalSparkContext {
val clusterUrl = "local-cluster[2,1,512]"
@@ -27,6 +30,24 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
System.clearProperty("spark.storage.memoryFraction")
}
+ test("task throws not serializable exception") {
+ // Ensures that executors do not crash when an exn is not serializable. If executors crash,
+ // this test will hang. Correct behavior is that executors don't crash but fail tasks
+ // and the scheduler throws a SparkException.
+
+ // numSlaves must be less than numPartitions
+ val numSlaves = 3
+ val numPartitions = 10
+
+ sc = new SparkContext("local-cluster[%s,1,512]".format(numSlaves), "test")
+ val data = sc.parallelize(1 to 100, numPartitions).map(x => (x, x)).
+ map(x => throw new NotSerializableExn(new NotSerializableClass))
+ intercept[SparkException] {
+ data.count()
+ }
+ resetSparkContext()
+ }
+
test("local-cluster format") {
sc = new SparkContext("local-cluster[2,1,512]", "test")
assert(sc.parallelize(1 to 2, 2).count() == 2)