aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorImran Rashid <imran@quantifind.com>2013-05-13 16:50:43 -0700
committerImran Rashid <imran@quantifind.com>2013-05-14 22:32:14 -0700
commit38d4b97c6d47df4e1f1a3279ff786509f60e0eaf (patch)
tree05c1b6905d5072fa75989d934ae88c444a2431ca /core
parentd7d1da79d30961e461115a73bbfc9e4c4448e533 (diff)
downloadspark-38d4b97c6d47df4e1f1a3279ff786509f60e0eaf.tar.gz
spark-38d4b97c6d47df4e1f1a3279ff786509f60e0eaf.tar.bz2
spark-38d4b97c6d47df4e1f1a3279ff786509f60e0eaf.zip
use threads classloader when deserializing task results; classnotfoundexception includes classloader
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala13
1 files changed, 10 insertions, 3 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index c69f3bdb7f..b348092d89 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -503,9 +503,16 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
logInfo("Finished TID %s in %d ms (progress: %d/%d)".format(
tid, info.duration, tasksFinished, numTasks))
// Deserialize task result and pass it to the scheduler
- val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader)
- result.metrics.resultSize = serializedData.limit()
- sched.listener.taskEnded(tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
+ try {
+ val result = ser.deserialize[TaskResult[_]](serializedData)
+ result.metrics.resultSize = serializedData.limit()
+ sched.listener.taskEnded(tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
+ } catch {
+ case cnf: ClassNotFoundException =>
+ val loader = Thread.currentThread().getContextClassLoader
+ throw new SparkException("ClassNotFound with classloader: " + loader, cnf)
+ case ex => throw ex
+ }
// Mark finished and stop if we've finished all the tasks
finished(index) = true
if (tasksFinished == numTasks) {