aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorCharles Reiss <woggle@apache.org>2013-07-08 12:25:46 -0700
committerCharles Reiss <woggle@apache.org>2013-07-08 12:25:46 -0700
commit8c1d1c98e0cea8d2b20ac10b84aa76d8e22a1661 (patch)
treeb225860f21b4154f6c2aa16fcffc3836ad15e18a /core
parent744da8eefda3ae66f3471a12cc02b29cf5441dbc (diff)
downloadspark-8c1d1c98e0cea8d2b20ac10b84aa76d8e22a1661.tar.gz
spark-8c1d1c98e0cea8d2b20ac10b84aa76d8e22a1661.tar.bz2
spark-8c1d1c98e0cea8d2b20ac10b84aa76d8e22a1661.zip
Explicitly set class loader for MesosSchedulerDriver callbacks.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala13
1 files changed, 13 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index ca7fab4cc5..e73b780fcb 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -43,8 +43,12 @@ private[spark] class MesosSchedulerBackend(
// An ExecutorInfo for our tasks
var execArgs: Array[Byte] = null
+ var classLoader: ClassLoader = null
+
override def start() {
synchronized {
+ classLoader = Thread.currentThread.getContextClassLoader
+
new Thread("MesosSchedulerBackend driver") {
setDaemon(true)
override def run() {
@@ -114,9 +118,16 @@ private[spark] class MesosSchedulerBackend(
return execArgs
}
+ private def setClassLoader() {
+ // Since native code starts the thread our callbacks run in, it may not correctly
+ // inherit and custom class loaders. Therefore, set the class loader manually.
+ Thread.currentThread.setContextClassLoader(classLoader)
+ }
+
override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
+ setClassLoader()
logInfo("Registered as framework ID " + frameworkId.getValue)
registeredLock.synchronized {
isRegistered = true
@@ -142,6 +153,7 @@ private[spark] class MesosSchedulerBackend(
* tasks are balanced across the cluster.
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
+ setClassLoader()
synchronized {
// Build a big list of the offerable workers, and remember their indices so that we can
// figure out which Offer to reply to for each worker
@@ -224,6 +236,7 @@ private[spark] class MesosSchedulerBackend(
}
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
+ setClassLoader()
val tid = status.getTaskId.getValue.toLong
val state = TaskState.fromMesos(status.getState)
synchronized {