author     Matei Zaharia <matei@eecs.berkeley.edu>  2012-04-24 16:18:39 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>  2012-04-24 16:18:39 -0700
commit     32a4f4623cb354ee20fb0e7f76214ef02560744a (patch)
tree       fd4d49dadc40aefaacd0515907bb33b20ebc829b
parent     0b70dae2daada0b8170869d6338fd9d3839914db (diff)
parent     761ea65a98f448f158e17d7c03d1566fc06fb265 (diff)
Merge pull request #129 from mesos/rxin
Force serialize/deserialize task results in local execution mode.
-rw-r--r--  core/src/main/scala/spark/LocalScheduler.scala |  8
-rw-r--r--  core/src/test/scala/spark/FailureSuite.scala   | 16
2 files changed, 23 insertions(+), 1 deletion(-)
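What the patch does, in isolation: the local scheduler now round-trips every task result through the serializer before handing it back, so a non-serializable result fails in local mode just as it would on a cluster. Below is a minimal, self-contained sketch of that round trip using plain Java serialization (an assumption; Spark's serializer interface is pluggable, and the `RoundTrip` object and local `NonSerializable` class here are illustrative, not part of the patch):

import java.io._

// Illustrative sketch: mimics the serialize/deserialize round trip that the
// patched LocalScheduler applies to every task result.
object RoundTrip {
  def roundTrip[T](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(value)  // throws java.io.NotSerializableException for bad results
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    in.readObject().asInstanceOf[T]
  }

  def main(args: Array[String]): Unit = {
    class NonSerializable  // deliberately does not extend java.io.Serializable
    println(roundTrip("a serializable result"))  // succeeds
    roundTrip(new NonSerializable)               // fails fast, as it would on a real cluster
  }
}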
diff --git a/core/src/main/scala/spark/LocalScheduler.scala b/core/src/main/scala/spark/LocalScheduler.scala
index fe67eb871a..3910c7b09e 100644
--- a/core/src/main/scala/spark/LocalScheduler.scala
+++ b/core/src/main/scala/spark/LocalScheduler.scala
@@ -46,9 +46,15 @@ private class LocalScheduler(threads: Int, maxFailures: Int) extends DAGSchedule
         idInJob, bytes.size, timeTaken))
       val deserializedTask = ser.deserialize[Task[_]](bytes, currentThread.getContextClassLoader)
       val result: Any = deserializedTask.run(attemptId)
+
+      // Serialize and deserialize the result to emulate what the mesos
+      // executor does. This is useful to catch serialization errors early
+      // on in development (so when users move their local Spark programs
+      // to the cluster, they don't get surprised by serialization errors).
+      val resultToReturn = ser.deserialize[Any](ser.serialize(result))
       val accumUpdates = Accumulators.values
       logInfo("Finished task " + idInJob)
-      taskEnded(task, Success, result, accumUpdates)
+      taskEnded(task, Success, resultToReturn, accumUpdates)
     } catch {
       case t: Throwable => {
         logError("Exception in task " + idInJob, t)
diff --git a/core/src/test/scala/spark/FailureSuite.scala b/core/src/test/scala/spark/FailureSuite.scala
index ab21f6a6f0..75df4bee09 100644
--- a/core/src/test/scala/spark/FailureSuite.scala
+++ b/core/src/test/scala/spark/FailureSuite.scala
@@ -65,5 +65,21 @@ class FailureSuite extends FunSuite {
     FailureSuiteState.clear()
   }
 
+  test("failure because task results are not serializable") {
+    val sc = new SparkContext("local[1,1]", "test")
+    val results = sc.makeRDD(1 to 3).map(x => new NonSerializable)
+
+    val thrown = intercept[spark.SparkException] {
+      results.collect()
+    }
+    assert(thrown.getClass === classOf[spark.SparkException])
+    assert(thrown.getMessage.contains("NotSerializableException"))
+
+    sc.stop()
+    FailureSuiteState.clear()
+  }
+
   // TODO: Need to add tests with shuffle fetch failures.
 }
+
+
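From the user's side, the effect is that a job whose results cannot be serialized now fails during local development instead of only after moving to a cluster. A hedged usage sketch against the same 2012-era API the new test exercises (the demo object and its NonSerializable class are illustrative; only SparkContext, makeRDD, map, collect, and spark.SparkException come from the diff above):

import spark.SparkContext

object NonSerializableResultDemo {
  class NonSerializable  // deliberately not java.io.Serializable

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[1,1]", "demo")
    try {
      // Before this patch, collect() succeeded in local mode and the
      // serialization problem only surfaced on a real cluster; after the
      // patch it throws a SparkException mentioning NotSerializableException.
      sc.makeRDD(1 to 3).map(x => new NonSerializable).collect()
    } catch {
      case e: spark.SparkException =>
        println("Caught early, in local mode: " + e.getMessage)
    } finally {
      sc.stop()
    }
  }
}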