aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala19
1 files changed, 11 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 64ec2b8e3d..1206f184fb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -507,14 +507,16 @@ private[spark] class MesosClusterScheduler(
val driversToRetry = pendingRetryDrivers.filter { d =>
d.retryState.get.nextRetry.before(currentTime)
}
+
scheduleTasks(
- driversToRetry,
+ copyBuffer(driversToRetry),
removeFromPendingRetryDrivers,
currentOffers,
tasks)
+
// Then we walk through the queued drivers and try to schedule them.
scheduleTasks(
- queuedDrivers,
+ copyBuffer(queuedDrivers),
removeFromQueuedDrivers,
currentOffers,
tasks)
@@ -527,13 +529,14 @@ private[spark] class MesosClusterScheduler(
.foreach(o => driver.declineOffer(o.getId))
}
+ private def copyBuffer(
+ buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = {
+ val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size)
+ buffer.copyToBuffer(newBuffer)
+ newBuffer
+ }
+
def getSchedulerState(): MesosClusterSchedulerState = {
- def copyBuffer(
- buffer: ArrayBuffer[MesosDriverDescription]): ArrayBuffer[MesosDriverDescription] = {
- val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size)
- buffer.copyToBuffer(newBuffer)
- newBuffer
- }
stateLock.synchronized {
new MesosClusterSchedulerState(
frameworkId,