aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2013-07-12 15:12:46 +0530
committerPrashant Sharma <prashant.s@imaginea.com>2013-07-12 15:12:46 +0530
commita220e11a07eb56fade8d0573503b58ce659fda54 (patch)
tree3fcfddb1af499a29d778013abc1adea7c14d8f97
parente86d5dbaad9de3c04fe080b8fb96a7ebbd20c7cd (diff)
parent018d04c64e68876f4491fbccb3752e9da0a0c5d3 (diff)
downloadspark-a220e11a07eb56fade8d0573503b58ce659fda54.tar.gz
spark-a220e11a07eb56fade8d0573503b58ce659fda54.tar.bz2
spark-a220e11a07eb56fade8d0573503b58ce659fda54.zip
Merge branch 'master' of github.com:mesos/spark into scala-2.10
-rw-r--r--core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala153
1 files changed, 96 insertions, 57 deletions
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index ca7fab4cc5..e83368b98d 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -43,8 +43,12 @@ private[spark] class MesosSchedulerBackend(
// An ExecutorInfo for our tasks
var execArgs: Array[Byte] = null
+ var classLoader: ClassLoader = null
+
override def start() {
synchronized {
+ classLoader = Thread.currentThread.getContextClassLoader
+
new Thread("MesosSchedulerBackend driver") {
setDaemon(true)
override def run() {
@@ -114,13 +118,28 @@ private[spark] class MesosSchedulerBackend(
return execArgs
}
+ private def setClassLoader(): ClassLoader = {
+ val oldClassLoader = Thread.currentThread.getContextClassLoader
+ Thread.currentThread.setContextClassLoader(classLoader)
+ return oldClassLoader
+ }
+
+ private def restoreClassLoader(oldClassLoader: ClassLoader) {
+ Thread.currentThread.setContextClassLoader(oldClassLoader)
+ }
+
override def offerRescinded(d: SchedulerDriver, o: OfferID) {}
override def registered(d: SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
- logInfo("Registered as framework ID " + frameworkId.getValue)
- registeredLock.synchronized {
- isRegistered = true
- registeredLock.notifyAll()
+ val oldClassLoader = setClassLoader()
+ try {
+ logInfo("Registered as framework ID " + frameworkId.getValue)
+ registeredLock.synchronized {
+ isRegistered = true
+ registeredLock.notifyAll()
+ }
+ } finally {
+ restoreClassLoader(oldClassLoader)
}
}
@@ -142,49 +161,54 @@ private[spark] class MesosSchedulerBackend(
* tasks are balanced across the cluster.
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
- synchronized {
- // Build a big list of the offerable workers, and remember their indices so that we can
- // figure out which Offer to reply to for each worker
- val offerableIndices = new ArrayBuffer[Int]
- val offerableWorkers = new ArrayBuffer[WorkerOffer]
-
- def enoughMemory(o: Offer) = {
- val mem = getResource(o.getResourcesList, "mem")
- val slaveId = o.getSlaveId.getValue
- mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
- }
+ val oldClassLoader = setClassLoader()
+ try {
+ synchronized {
+ // Build a big list of the offerable workers, and remember their indices so that we can
+ // figure out which Offer to reply to for each worker
+ val offerableIndices = new ArrayBuffer[Int]
+ val offerableWorkers = new ArrayBuffer[WorkerOffer]
+
+ def enoughMemory(o: Offer) = {
+ val mem = getResource(o.getResourcesList, "mem")
+ val slaveId = o.getSlaveId.getValue
+ mem >= executorMemory || slaveIdsWithExecutors.contains(slaveId)
+ }
- for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
- offerableIndices += index
- offerableWorkers += new WorkerOffer(
- offer.getSlaveId.getValue,
- offer.getHostname,
- getResource(offer.getResourcesList, "cpus").toInt)
- }
+ for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
+ offerableIndices += index
+ offerableWorkers += new WorkerOffer(
+ offer.getSlaveId.getValue,
+ offer.getHostname,
+ getResource(offer.getResourcesList, "cpus").toInt)
+ }
- // Call into the ClusterScheduler
- val taskLists = scheduler.resourceOffers(offerableWorkers)
-
- // Build a list of Mesos tasks for each slave
- val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
- for ((taskList, index) <- taskLists.zipWithIndex) {
- if (!taskList.isEmpty) {
- val offerNum = offerableIndices(index)
- val slaveId = offers(offerNum).getSlaveId.getValue
- slaveIdsWithExecutors += slaveId
- mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
- for (taskDesc <- taskList) {
- taskIdToSlaveId(taskDesc.taskId) = slaveId
- mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
+ // Call into the ClusterScheduler
+ val taskLists = scheduler.resourceOffers(offerableWorkers)
+
+ // Build a list of Mesos tasks for each slave
+ val mesosTasks = offers.map(o => Collections.emptyList[MesosTaskInfo]())
+ for ((taskList, index) <- taskLists.zipWithIndex) {
+ if (!taskList.isEmpty) {
+ val offerNum = offerableIndices(index)
+ val slaveId = offers(offerNum).getSlaveId.getValue
+ slaveIdsWithExecutors += slaveId
+ mesosTasks(offerNum) = new JArrayList[MesosTaskInfo](taskList.size)
+ for (taskDesc <- taskList) {
+ taskIdToSlaveId(taskDesc.taskId) = slaveId
+ mesosTasks(offerNum).add(createMesosTask(taskDesc, slaveId))
+ }
}
}
- }
- // Reply to the offers
- val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
- for (i <- 0 until offers.size) {
- d.launchTasks(offers(i).getId, mesosTasks(i), filters)
+ // Reply to the offers
+ val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
+ for (i <- 0 until offers.size) {
+ d.launchTasks(offers(i).getId, mesosTasks(i), filters)
+ }
}
+ } finally {
+ restoreClassLoader(oldClassLoader)
}
}
@@ -224,23 +248,33 @@ private[spark] class MesosSchedulerBackend(
}
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
- val tid = status.getTaskId.getValue.toLong
- val state = TaskState.fromMesos(status.getState)
- synchronized {
- if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
- // We lost the executor on this slave, so remember that it's gone
- slaveIdsWithExecutors -= taskIdToSlaveId(tid)
- }
- if (isFinished(status.getState)) {
- taskIdToSlaveId.remove(tid)
+ val oldClassLoader = setClassLoader()
+ try {
+ val tid = status.getTaskId.getValue.toLong
+ val state = TaskState.fromMesos(status.getState)
+ synchronized {
+ if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
+ // We lost the executor on this slave, so remember that it's gone
+ slaveIdsWithExecutors -= taskIdToSlaveId(tid)
+ }
+ if (isFinished(status.getState)) {
+ taskIdToSlaveId.remove(tid)
+ }
}
+ scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
+ } finally {
+ restoreClassLoader(oldClassLoader)
}
- scheduler.statusUpdate(tid, state, status.getData.asReadOnlyByteBuffer)
}
override def error(d: SchedulerDriver, message: String) {
- logError("Mesos error: " + message)
- scheduler.error(message)
+ val oldClassLoader = setClassLoader()
+ try {
+ logError("Mesos error: " + message)
+ scheduler.error(message)
+ } finally {
+ restoreClassLoader(oldClassLoader)
+ }
}
override def stop() {
@@ -256,11 +290,16 @@ private[spark] class MesosSchedulerBackend(
override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {}
private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
- logInfo("Mesos slave lost: " + slaveId.getValue)
- synchronized {
- slaveIdsWithExecutors -= slaveId.getValue
+ val oldClassLoader = setClassLoader()
+ try {
+ logInfo("Mesos slave lost: " + slaveId.getValue)
+ synchronized {
+ slaveIdsWithExecutors -= slaveId.getValue
+ }
+ scheduler.executorLost(slaveId.getValue, reason)
+ } finally {
+ restoreClassLoader(oldClassLoader)
}
- scheduler.executorLost(slaveId.getValue, reason)
}
override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) {