aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorTimothy Chen <tnachen@gmail.com>2015-08-19 19:43:26 -0700
committerAndrew Or <andrew@databricks.com>2015-08-19 19:43:26 -0700
commit73431d8afb41b93888d2642a1ce2d011f03fb740 (patch)
tree3d8d734dee4d9df43624e0af2fca6b6acd0f2a4f /core
parentaffc8a887ede9fdc2ca6051833954cd10918c869 (diff)
downloadspark-73431d8afb41b93888d2642a1ce2d011f03fb740.tar.gz
spark-73431d8afb41b93888d2642a1ce2d011f03fb740.tar.bz2
spark-73431d8afb41b93888d2642a1ce2d011f03fb740.zip
[SPARK-10124] [MESOS] Fix removing queued driver in mesos cluster mode.
Currently the spark applications can be queued to the Mesos cluster dispatcher, but when multiple jobs are in queue we don't handle removing jobs from the buffer correctly while iterating and causes null pointer exception. This patch copies the buffer before iterating them, so exceptions aren't thrown when the jobs are removed. Author: Timothy Chen <tnachen@gmail.com> Closes #8322 from tnachen/fix_cluster_mode.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala19
1 files changed, 11 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 64ec2b8e3d..1206f184fb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -507,14 +507,16 @@ private[spark] class MesosClusterScheduler(
val driversToRetry = pendingRetryDrivers.filter { d =>
d.retryState.get.nextRetry.before(currentTime)
}
+
scheduleTasks(
- driversToRetry,
+ copyBuffer(driversToRetry),
removeFromPendingRetryDrivers,
currentOffers,
tasks)
+
// Then we walk through the queued drivers and try to schedule them.
scheduleTasks(
- queuedDrivers,
+ copyBuffer(queuedDrivers),
removeFromQueuedDrivers,
currentOffers,
tasks)
@@ -527,13 +529,14 @@ private[spark] class MesosClusterScheduler(
.foreach(o => driver.declineOffer(o.getId))
}
+ private def copyBuffer(
+ buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = {
+ val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size)
+ buffer.copyToBuffer(newBuffer)
+ newBuffer
+ }
+
def getSchedulerState(): MesosClusterSchedulerState = {
- def copyBuffer(
- buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = {
- val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size)
- buffer.copyToBuffer(newBuffer)
- newBuffer
- }
stateLock.synchronized {
new MesosClusterSchedulerState(
frameworkId,