aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-05-28 23:35:24 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-05-28 23:35:24 -0700
commit41d230ccb02ea3d731f56bf6ced3486cf13260f0 (patch)
treedd4765a5180b08a1866d8c72d3b9253202ac1f8a
parent3db1e17baa11fa37b0c7f04d7213a30df66d1611 (diff)
parent38d4b97c6d47df4e1f1a3279ff786509f60e0eaf (diff)
downloadspark-41d230ccb02ea3d731f56bf6ced3486cf13260f0.tar.gz
spark-41d230ccb02ea3d731f56bf6ced3486cf13260f0.tar.bz2
spark-41d230ccb02ea3d731f56bf6ced3486cf13260f0.zip
Merge pull request #611 from squito/classloader
Use default classloaders for akka & deserializing task results
-rw-r--r--core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala13
-rw-r--r--core/src/main/scala/spark/util/AkkaUtils.scala2
2 files changed, 11 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 18d105e0a4..f1c6266bac 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -516,9 +516,16 @@ private[spark] class TaskSetManager(
logInfo("Finished TID %s in %d ms (progress: %d/%d)".format(
tid, info.duration, tasksFinished, numTasks))
// Deserialize task result and pass it to the scheduler
- val result = ser.deserialize[TaskResult[_]](serializedData, getClass.getClassLoader)
- result.metrics.resultSize = serializedData.limit()
- sched.listener.taskEnded(tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
+ try {
+ val result = ser.deserialize[TaskResult[_]](serializedData)
+ result.metrics.resultSize = serializedData.limit()
+ sched.listener.taskEnded(tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
+ } catch {
+ case cnf: ClassNotFoundException =>
+ val loader = Thread.currentThread().getContextClassLoader
+ throw new SparkException("ClassNotFound with classloader: " + loader, cnf)
+ case ex => throw ex
+ }
// Mark finished and stop if we've finished all the tasks
finished(index) = true
if (tasksFinished == numTasks) {
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index 9fb7e001ba..cd79bd2bda 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -52,7 +52,7 @@ private[spark] object AkkaUtils {
""".format(host, port, akkaTimeout, akkaFrameSize, akkaThreads, akkaBatchSize,
lifecycleEvents, akkaWriteTimeout))
- val actorSystem = ActorSystem(name, akkaConf, getClass.getClassLoader)
+ val actorSystem = ActorSystem(name, akkaConf)
// Figure out the port number we bound to, in case port was passed as 0. This is a bit of a
// hack because Akka doesn't let you figure out the port through the public API yet.