diff options
author | Charles Reiss <woggle@apache.org> | 2013-07-08 12:25:46 -0700 |
---|---|---|
committer | Charles Reiss <woggle@apache.org> | 2013-07-08 12:25:46 -0700 |
commit | 8c1d1c98e0cea8d2b20ac10b84aa76d8e22a1661 (patch) | |
tree | b225860f21b4154f6c2aa16fcffc3836ad15e18a | |
parent | 744da8eefda3ae66f3471a12cc02b29cf5441dbc (diff) | |
download | spark-8c1d1c98e0cea8d2b20ac10b84aa76d8e22a1661.tar.gz spark-8c1d1c98e0cea8d2b20ac10b84aa76d8e22a1661.tar.bz2 spark-8c1d1c98e0cea8d2b20ac10b84aa76d8e22a1661.zip |
Explicitly set class loader for MesosSchedulerDriver callbacks.
-rw-r--r-- | core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala | 13 |
1 files changed, 13 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala index ca7fab4cc5..e73b780fcb 100644 --- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala @@ -43,8 +43,12 @@ private[spark] class MesosSchedulerBackend( // An ExecutorInfo for our tasks var execArgs: Array[Byte] = null + var classLoader: ClassLoader = null + override def start() { synchronized { + classLoader = Thread.currentThread.getContextClassLoader + new Thread("MesosSchedulerBackend driver") { setDaemon(true) override def run() { @@ -114,9 +118,16 @@ private[spark] class MesosSchedulerBackend( return execArgs } + private def setClassLoader() { + // Since native code starts the thread our callbacks run in, it may not correctly + // inherit and custom class loaders. Therefore, set the class loader manually. + Thread.currentThread.setContextClassLoader(classLoader) + } + override def offerRescinded(d: SchedulerDriver, o: OfferID) {} override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { + setClassLoader() logInfo("Registered as framework ID " + frameworkId.getValue) registeredLock.synchronized { isRegistered = true @@ -142,6 +153,7 @@ private[spark] class MesosSchedulerBackend( * tasks are balanced across the cluster. */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { + setClassLoader() synchronized { // Build a big list of the offerable workers, and remember their indices so that we can // figure out which Offer to reply to for each worker @@ -224,6 +236,7 @@ private[spark] class MesosSchedulerBackend( } override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { + setClassLoader() val tid = status.getTaskId.getValue.toLong val state = TaskState.fromMesos(status.getState) synchronized { |