author    Matei Zaharia <matei@eecs.berkeley.edu>    2012-04-24 16:18:39 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>    2012-04-24 16:18:39 -0700
commit    32a4f4623cb354ee20fb0e7f76214ef02560744a
tree      fd4d49dadc40aefaacd0515907bb33b20ebc829b
parent    0b70dae2daada0b8170869d6338fd9d3839914db
parent    761ea65a98f448f158e17d7c03d1566fc06fb265
Merge pull request #129 from mesos/rxin
Force serialize/deserialize task results in local execution mode.
 core/src/main/scala/spark/LocalScheduler.scala |  8 +++++++-
 core/src/test/scala/spark/FailureSuite.scala   | 16 ++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)
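
For context, what this patch adds is a plain serialize/deserialize round trip on every task result computed in local mode. A minimal standalone sketch of the idea, using java.io serialization rather than Spark's pluggable serializer (an assumption for illustration; RoundTripCheck, roundTrip, and NonSerializable are hypothetical names):

import java.io._

object RoundTripCheck {
  // Serialize a value and immediately deserialize it, so that a
  // NotSerializableException surfaces on the developer's machine
  // instead of later on the cluster.
  def roundTrip[T](value: T): T = {
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(value)  // throws NotSerializableException for unserializable values
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    println(roundTrip("task result"))  // fine: String is Serializable
    class NonSerializable              // plain class, not Serializable
    try roundTrip(new NonSerializable)
    catch { case e: NotSerializableException => println("caught early: " + e) }
  }
}
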
diff --git a/core/src/main/scala/spark/LocalScheduler.scala b/core/src/main/scala/spark/LocalScheduler.scala
index fe67eb871a..3910c7b09e 100644
--- a/core/src/main/scala/spark/LocalScheduler.scala
+++ b/core/src/main/scala/spark/LocalScheduler.scala
@@ -46,9 +46,15 @@ private class LocalScheduler(threads: Int, maxFailures: Int) extends DAGSchedule
         idInJob, bytes.size, timeTaken))
       val deserializedTask = ser.deserialize[Task[_]](bytes, currentThread.getContextClassLoader)
       val result: Any = deserializedTask.run(attemptId)
+
+      // Serialize and deserialize the result to emulate what the Mesos
+      // executor does. This is useful to catch serialization errors early
+      // on in development (so when users move their local Spark programs
+      // to the cluster, they don't get surprised by serialization errors).
+      val resultToReturn = ser.deserialize[Any](ser.serialize(result))
       val accumUpdates = Accumulators.values
       logInfo("Finished task " + idInJob)
-      taskEnded(task, Success, result, accumUpdates)
+      taskEnded(task, Success, resultToReturn, accumUpdates)
     } catch {
       case t: Throwable => {
         logError("Exception in task " + idInJob, t)
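
For reference, the ser value used above comes from Spark's pluggable serialization layer. A rough sketch of the contract the scheduler code relies on, with method names taken from this diff (the exact signatures are an assumption, not Spark's verbatim API):

// Assumed shape of the serializer instance used by LocalScheduler.
trait SerializerInstance {
  def serialize[T](t: T): Array[Byte]
  def deserialize[T](bytes: Array[Byte]): T
  def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T
}

The round trip ser.deserialize[Any](ser.serialize(result)) is deliberately redundant: its only purpose is to force the same serialization path that a real executor would exercise.
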
diff --git a/core/src/test/scala/spark/FailureSuite.scala b/core/src/test/scala/spark/FailureSuite.scala
index ab21f6a6f0..75df4bee09 100644
--- a/core/src/test/scala/spark/FailureSuite.scala
+++ b/core/src/test/scala/spark/FailureSuite.scala
@@ -65,5 +65,21 @@ class FailureSuite extends FunSuite {
     FailureSuiteState.clear()
   }
 
+  test("failure because task results are not serializable") {
+    val sc = new SparkContext("local[1,1]", "test")
+    val results = sc.makeRDD(1 to 3).map(x => new NonSerializable)
+
+    val thrown = intercept[spark.SparkException] {
+      results.collect()
+    }
+    assert(thrown.getClass === classOf[spark.SparkException])
+    assert(thrown.getMessage.contains("NotSerializableException"))
+
+    sc.stop()
+    FailureSuiteState.clear()
+  }
+
   // TODO: Need to add tests with shuffle fetch failures.
 }
+
+
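
As a usage note, the user-visible effect is that a program like the following (a hypothetical sketch; NonSerializable stands in for any result type that does not extend java.io.Serializable) now fails fast in local mode instead of only after being moved to a cluster:

import spark.SparkContext

object LocalModeDemo {
  class NonSerializable  // results of this type cannot cross process boundaries

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "demo")
    try {
      // Task results now take a serialize/deserialize round trip even in
      // local mode, so this collect() throws a SparkException whose
      // message mentions NotSerializableException.
      sc.makeRDD(1 to 3).map(x => new NonSerializable).collect()
    } finally {
      sc.stop()
    }
  }
}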