From 38d4b97c6d47df4e1f1a3279ff786509f60e0eaf Mon Sep 17 00:00:00 2001
From: Imran Rashid
Date: Mon, 13 May 2013 16:50:43 -0700
Subject: use threads classloader when deserializing task results;
 classnotfoundexception includes classloader

---
 .../main/scala/spark/scheduler/cluster/TaskSetManager.scala | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index c69f3bdb7f..b348092d89 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -503,9 +503,16 @@ private[spark] class TaskSetManager(sched: ClusterScheduler, val taskSet: TaskSe
         logInfo("Finished TID %s in %d ms (progress: %d/%d)".format(
           tid, info.duration, tasksFinished, numTasks))
         // Deserialize task result and pass it to the scheduler
-        val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader)
-        result.metrics.resultSize = serializedData.limit()
-        sched.listener.taskEnded(tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
+        try {
+          val result = ser.deserialize[TaskResult[_]](serializedData)
+          result.metrics.resultSize = serializedData.limit()
+          sched.listener.taskEnded(tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
+        } catch {
+          case cnf: ClassNotFoundException =>
+            val loader = Thread.currentThread().getContextClassLoader
+            throw new SparkException("ClassNotFound with classloader: " + loader, cnf)
+          case ex => throw ex
+        }
         // Mark finished and stop if we've finished all the tasks
         finished(index) = true
         if (tasksFinished == numTasks) {
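
For readers outside the Spark codebase, the following is a minimal, self-contained sketch of the two ideas in this patch: resolving classes through the current thread's context classloader while deserializing, and wrapping ClassNotFoundException so the error message names the classloader that failed. It is not Spark's actual serializer; DeserializationException and every other name here is a hypothetical stand-in based on plain Java serialization, and the behavior of Spark's ser.deserialize is only assumed to be analogous.

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, ObjectStreamClass}

// Hypothetical stand-in for spark.SparkException, used only in this sketch.
class DeserializationException(message: String, cause: Throwable)
  extends Exception(message, cause)

object ContextClassLoaderDeserialization {

  // Serialize an object to bytes with plain Java serialization.
  def serialize(obj: AnyRef): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bos)
    out.writeObject(obj)
    out.close()
    bos.toByteArray
  }

  // Deserialize with an explicit classloader: the ObjectInputStream resolves
  // classes through `loader` rather than through whichever classloader happened
  // to load this class -- the mismatch the patch is working around.
  def deserialize[T](bytes: Array[Byte], loader: ClassLoader): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, loader)
    }
    try {
      in.readObject().asInstanceOf[T]
    } catch {
      // Same idea as the patch: report which classloader could not find the
      // class, so ClassNotFoundException failures are easier to diagnose.
      case cnf: ClassNotFoundException =>
        throw new DeserializationException("ClassNotFound with classloader: " + loader, cnf)
    } finally {
      in.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val payload: AnyRef = Vector("task-result", 42)
    val bytes = serialize(payload)
    // Use the current thread's context classloader, as the patched code does.
    val loader = Thread.currentThread().getContextClassLoader
    val restored = deserialize[Vector[Any]](bytes, loader)
    println(restored) // Vector(task-result, 42)
  }
}

The sketch deliberately catches only ClassNotFoundException and rethrows it wrapped with the classloader's identity; any other exception propagates unchanged, mirroring the `case ex => throw ex` branch in the patched TaskSetManager code.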