diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-05-28 23:35:24 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-05-28 23:35:24 -0700 |
commit | 41d230ccb02ea3d731f56bf6ced3486cf13260f0 (patch) | |
tree | dd4765a5180b08a1866d8c72d3b9253202ac1f8a | |
parent | 3db1e17baa11fa37b0c7f04d7213a30df66d1611 (diff) | |
parent | 38d4b97c6d47df4e1f1a3279ff786509f60e0eaf (diff) | |
download | spark-41d230ccb02ea3d731f56bf6ced3486cf13260f0.tar.gz spark-41d230ccb02ea3d731f56bf6ced3486cf13260f0.tar.bz2 spark-41d230ccb02ea3d731f56bf6ced3486cf13260f0.zip |
Merge pull request #611 from squito/classloader
Use default classloaders for akka & deserializing task results
-rw-r--r-- | core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala | 13 | ||||
-rw-r--r-- | core/src/main/scala/spark/util/AkkaUtils.scala | 2 |
2 files changed, 11 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala index 18d105e0a4..f1c6266bac 100644 --- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala @@ -516,9 +516,16 @@ private[spark] class TaskSetManager( logInfo("Finished TID %s in %d ms (progress: %d/%d)".format( tid, info.duration, tasksFinished, numTasks)) // Deserialize task result and pass it to the scheduler - val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader) - result.metrics.resultSize = serializedData.limit() - sched.listener.taskEnded(tasks(index), Success, result.value, result.accumUpdates, info, result.metrics) + try { + val result = ser.deserialize[TaskResult[_]](serializedData) + result.metrics.resultSize = serializedData.limit() + sched.listener.taskEnded(tasks(index), Success, result.value, result.accumUpdates, info, result.metrics) + } catch { + case cnf: ClassNotFoundException => + val loader = Thread.currentThread().getContextClassLoader + throw new SparkException("ClassNotFound with classloader: " + loader, cnf) + case ex => throw ex + } // Mark finished and stop if we've finished all the tasks finished(index) = true if (tasksFinished == numTasks) { diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala index 9fb7e001ba..cd79bd2bda 100644 --- a/core/src/main/scala/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/spark/util/AkkaUtils.scala @@ -52,7 +52,7 @@ private[spark] object AkkaUtils { """.format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize, lifecycleEvents, akkaWriteTimeout)) - val actorSystem = ActorSystem(name, akkaConf, getClass.getClassLoader) + val actorSystem = ActorSystem(name, akkaConf) // Figure out the port number we bound to, in case port was passed as 0. This is a bit of a // hack because Akka doesn't let you figure out the port through the public API yet. |