diff options
author | Timothy Chen <tnachen@gmail.com> | 2015-08-19 19:43:26 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-08-19 19:43:26 -0700 |
commit | 73431d8afb41b93888d2642a1ce2d011f03fb740 (patch) | |
tree | 3d8d734dee4d9df43624e0af2fca6b6acd0f2a4f | |
parent | affc8a887ede9fdc2ca6051833954cd10918c869 (diff) | |
download | spark-73431d8afb41b93888d2642a1ce2d011f03fb740.tar.gz spark-73431d8afb41b93888d2642a1ce2d011f03fb740.tar.bz2 spark-73431d8afb41b93888d2642a1ce2d011f03fb740.zip |
[SPARK-10124] [MESOS] Fix removing queued driver in mesos cluster mode.
Currently, Spark applications can be queued to the Mesos cluster dispatcher, but when multiple jobs are in the queue, we don't correctly handle removing jobs from the buffer while iterating over it, which causes a null pointer exception.
This patch copies each buffer before iterating over it, so exceptions aren't thrown when jobs are removed from the original buffer during iteration.
Author: Timothy Chen <tnachen@gmail.com>
Closes #8322 from tnachen/fix_cluster_mode.
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala | 19 |
1 files changed, 11 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 64ec2b8e3d..1206f184fb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -507,14 +507,16 @@ private[spark] class MesosClusterScheduler( val driversToRetry = pendingRetryDrivers.filter { d => d.retryState.get.nextRetry.before(currentTime) } + scheduleTasks( - driversToRetry, + copyBuffer(driversToRetry), removeFromPendingRetryDrivers, currentOffers, tasks) + // Then we walk through the queued drivers and try to schedule them. scheduleTasks( - queuedDrivers, + copyBuffer(queuedDrivers), removeFromQueuedDrivers, currentOffers, tasks) @@ -527,13 +529,14 @@ private[spark] class MesosClusterScheduler( .foreach(o => driver.declineOffer(o.getId)) } + private def copyBuffer( + buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = { + val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size) + buffer.copyToBuffer(newBuffer) + newBuffer + } + def getSchedulerState(): MesosClusterSchedulerState = { - def copyBuffer( - buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = { - val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size) - buffer.copyToBuffer(newBuffer) - newBuffer - } stateLock.synchronized { new MesosClusterSchedulerState( frameworkId, |