Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala            | 154
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala        |  53
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala   |  67
3 files changed, 19 insertions(+), 255 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala
deleted file mode 100644
index 62df9657a6..0000000000
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskAssigner.scala
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler
-
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.PriorityQueue
-import scala.util.Random
-
-import org.apache.spark.SparkConf
-
-case class OfferState(workOffer: WorkerOffer, var cores: Int) {
- // Build a list of tasks to assign to each worker.
- val tasks = new ArrayBuffer[TaskDescription](cores)
-}
-
-abstract class TaskAssigner(conf: SparkConf) {
- var offer: Seq[OfferState] = _
- val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
-
- // The final per-offer task assignments returned to the TaskScheduler.
- def tasks(): Seq[ArrayBuffer[TaskDescription]] = offer.map(_.tasks)
-
- // Constructs the assigner from the given worker offers.
- def construct(workOffer: Seq[WorkerOffer]): Unit = {
- offer = workOffer.map(o => OfferState(o, o.cores))
- }
-
- // Invoked in each round of TaskSet assignment to initialize the internal structure.
- def init(): Unit
-
- // Indicates whether any offer is still available for the current round of TaskSet assignment.
- def hasNext(): Boolean
-
- // Returns the next available offer to the current round of TaskSet assignment.
- def getNext(): OfferState
-
- // Called by the TaskScheduler to indicate whether the current offer was accepted,
- // so the assigner can decide whether this offer is still valid for the next offering.
- def taskAssigned(assigned: Boolean): Unit
-
- // Releases internally maintained resources. Subclasses are responsible for
- // releasing their own private resources.
- def reset: Unit = {
- offer = null
- }
-}
-
-class RoundRobinAssigner(conf: SparkConf) extends TaskAssigner(conf) {
- var i = 0
- override def construct(workOffer: Seq[WorkerOffer]): Unit = {
- offer = Random.shuffle(workOffer.map(o => OfferState(o, o.cores)))
- }
- override def init(): Unit = {
- i = 0
- }
- override def hasNext: Boolean = {
- i < offer.size
- }
- override def getNext(): OfferState = {
- offer(i)
- }
- override def taskAssigned(assigned: Boolean): Unit = {
- i += 1
- }
- override def reset: Unit = {
- super.reset
- i = 0
- }
-}
-
-class BalancedAssigner(conf: SparkConf) extends TaskAssigner(conf) {
- var maxHeap: PriorityQueue[OfferState] = _
- var current: OfferState = _
-
- override def construct(workOffer: Seq[WorkerOffer]): Unit = {
- offer = Random.shuffle(workOffer.map(o => OfferState(o, o.cores)))
- }
- implicit val ord: Ordering[OfferState] = new Ordering[OfferState] {
- def compare(x: OfferState, y: OfferState): Int = {
- return Ordering[Int].compare(x.cores, y.cores)
- }
- }
- def init(): Unit = {
- maxHeap = new PriorityQueue[OfferState]()
- offer.filter(_.cores >= CPUS_PER_TASK).foreach(maxHeap.enqueue(_))
- }
- override def hasNext: Boolean = {
- maxHeap.size > 0
- }
- override def getNext(): OfferState = {
- current = maxHeap.dequeue()
- current
- }
-
- override def taskAssigned(assigned: Boolean): Unit = {
- if (current.cores >= CPUS_PER_TASK && assigned) {
- maxHeap.enqueue(current)
- }
- }
- override def reset: Unit = {
- super.reset
- maxHeap = null
- current = null
- }
-}
-
-class PackedAssigner(conf: SparkConf) extends TaskAssigner(conf) {
-
- var sorted: Seq[OfferState] = _
- var i = 0
- var current: OfferState = _
-
- override def init(): Unit = {
- i = 0
- sorted = offer.filter(_.cores >= CPUS_PER_TASK).sortBy(_.cores)
- }
-
- override def hasNext: Boolean = {
- i < sorted.size
- }
-
- override def getNext(): OfferState = {
- current = sorted(i)
- current
- }
-
- def taskAssigned(assigned: Boolean): Unit = {
- if (current.cores < CPUS_PER_TASK || !assigned) {
- i += 1
- }
- }
-
- override def reset: Unit = {
- super.reset
- sorted = null
- current = null
- i = 0
- }
-}
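
For context on what is being deleted here, the two non-default strategies can be illustrated with a minimal standalone sketch. This is not Spark code: SimpleOffer and AssignmentSketch are illustrative names, offers are reduced to an executor id plus a free-core count, and tasks are plain integers. The sketch only mirrors the ordering behaviour of PackedAssigner (fill the worker with the fewest free cores first) and BalancedAssigner (always pick the worker with the most free cores), ignoring locality and the TaskSetManager callbacks.

import scala.collection.mutable

// Illustrative stand-in for a worker offer: executor id plus remaining cores.
case class SimpleOffer(execId: String, var cores: Int)

object AssignmentSketch {
  val cpusPerTask = 1

  // Mirrors PackedAssigner: sort offers by free cores ascending and keep
  // filling the first offer that can still host a task.
  def packed(offers: Seq[SimpleOffer], numTasks: Int): Seq[String] = {
    val sorted = offers.map(_.copy()).sortBy(_.cores)
    val placed = mutable.ArrayBuffer.empty[String]
    var i = 0
    var remaining = numTasks
    while (remaining > 0 && i < sorted.size) {
      val o = sorted(i)
      if (o.cores >= cpusPerTask) {
        o.cores -= cpusPerTask
        placed += o.execId
        remaining -= 1
      } else {
        i += 1
      }
    }
    placed.toSeq
  }

  // Mirrors BalancedAssigner: always take the offer with the most free cores
  // from a max-heap, place one task, and re-enqueue it while it can still
  // host another task.
  def balanced(offers: Seq[SimpleOffer], numTasks: Int): Seq[String] = {
    implicit val byCores: Ordering[SimpleOffer] = Ordering.by(_.cores)
    val heap = mutable.PriorityQueue(offers.map(_.copy()).filter(_.cores >= cpusPerTask): _*)
    val placed = mutable.ArrayBuffer.empty[String]
    var remaining = numTasks
    while (remaining > 0 && heap.nonEmpty) {
      val o = heap.dequeue()
      o.cores -= cpusPerTask
      placed += o.execId
      remaining -= 1
      if (o.cores >= cpusPerTask) heap.enqueue(o)
    }
    placed.toSeq
  }

  def main(args: Array[String]): Unit = {
    val offers = Seq(SimpleOffer("executor0", 2), SimpleOffer("executor1", 4))
    println(packed(offers, 2))    // both tasks land on executor0 (fewest free cores)
    println(balanced(offers, 2))  // both tasks land on executor1 (most free cores)
  }
}

With offers of 2 and 4 free cores and two single-CPU tasks, the packed sketch places both tasks on executor0 and the balanced sketch places both on executor1, which is the behaviour exercised by the tests removed further down in this diff.
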
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index fb732ea8e5..3e3f1ad031 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -22,7 +22,9 @@ import java.util.{Timer, TimerTask}
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
+import scala.collection.Set
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.util.Random
import org.apache.spark._
import org.apache.spark.TaskState.TaskState
@@ -59,21 +61,6 @@ private[spark] class TaskSchedulerImpl(
val conf = sc.conf
- val DEFAULT_TASK_ASSIGNER = classOf[RoundRobinAssigner].getName
- lazy val taskAssigner: TaskAssigner = {
- val className = conf.get("spark.task.assigner", DEFAULT_TASK_ASSIGNER)
- try {
- logInfo(s"""constructing assigner as $className""")
- val ctor = Utils.classForName(className).getConstructor(classOf[SparkConf])
- ctor.newInstance(conf).asInstanceOf[TaskAssigner]
- } catch {
- case _: Throwable =>
- logWarning(
- s"""$className cannot be constructed fallback to default
- | $DEFAULT_TASK_ASSIGNER""".stripMargin)
- new RoundRobinAssigner(conf)
- }
- }
// How often to check for speculative tasks
val SPECULATION_INTERVAL_MS = conf.getTimeAsMs("spark.speculation.interval", "100ms")
@@ -263,26 +250,24 @@ private[spark] class TaskSchedulerImpl(
private def resourceOfferSingleTaskSet(
taskSet: TaskSetManager,
maxLocality: TaskLocality,
- taskAssigner: TaskAssigner) : Boolean = {
+ shuffledOffers: Seq[WorkerOffer],
+ availableCpus: Array[Int],
+ tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
var launchedTask = false
- taskAssigner.init()
- while(taskAssigner.hasNext()) {
- var assigned = false
- val current = taskAssigner.getNext()
- val execId = current.workOffer.executorId
- val host = current.workOffer.host
- if (current.cores >= CPUS_PER_TASK) {
+ for (i <- 0 until shuffledOffers.size) {
+ val execId = shuffledOffers(i).executorId
+ val host = shuffledOffers(i).host
+ if (availableCpus(i) >= CPUS_PER_TASK) {
try {
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
- current.tasks += task
+ tasks(i) += task
val tid = task.taskId
taskIdToTaskSetManager(tid) = taskSet
taskIdToExecutorId(tid) = execId
executorIdToTaskCount(execId) += 1
- current.cores = current.cores - CPUS_PER_TASK
- assert(current.cores >= 0)
+ availableCpus(i) -= CPUS_PER_TASK
+ assert(availableCpus(i) >= 0)
launchedTask = true
- assigned = true
}
} catch {
case e: TaskNotSerializableException =>
@@ -292,10 +277,8 @@ private[spark] class TaskSchedulerImpl(
return launchedTask
}
}
- taskAssigner.taskAssigned(assigned)
}
return launchedTask
-
}
/**
@@ -322,8 +305,12 @@ private[spark] class TaskSchedulerImpl(
hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
}
}
- taskAssigner.construct(offers)
+ // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
+ val shuffledOffers = Random.shuffle(offers)
+ // Build a list of tasks to assign to each worker.
+ val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
+ val availableCpus = shuffledOffers.map(o => o.cores).toArray
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
@@ -342,7 +329,7 @@ private[spark] class TaskSchedulerImpl(
for (currentMaxLocality <- taskSet.myLocalityLevels) {
do {
launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
- taskSet, currentMaxLocality, taskAssigner)
+ taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
} while (launchedTaskAtCurrentMaxLocality)
}
@@ -350,12 +337,10 @@ private[spark] class TaskSchedulerImpl(
taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}
}
- val tasks = taskAssigner.tasks
- taskAssigner.reset
+
if (tasks.size > 0) {
hasLaunchedTask = true
}
-
return tasks
}
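
The replacement path above restores the long-standing behaviour: shuffle the offers once per resourceOffers call, then let each task set sweep them repeatedly, taking at most one task per offer per sweep while decrementing a parallel availableCpus array. A hedged standalone sketch of that combined do/while-plus-for loop, again using simplified stand-ins (Offer, integer task ids) rather than Spark's WorkerOffer and TaskDescription:

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object ShuffledOfferSketch {
  // Illustrative stand-in for WorkerOffer.
  case class Offer(execId: String, cores: Int)

  val cpusPerTask = 1

  // Shuffle the offers, then keep sweeping them, handing out at most one task
  // per offer per sweep, until a full sweep launches nothing or all tasks are
  // placed. Returns one task-id buffer per shuffled offer.
  def assign(offers: Seq[Offer], numTasks: Int): Seq[ArrayBuffer[Int]] = {
    val shuffled = Random.shuffle(offers)
    val tasks = shuffled.map(o => new ArrayBuffer[Int](o.cores))
    val availableCpus = shuffled.map(_.cores).toArray
    var nextTask = 0
    var launchedTask = true
    while (launchedTask && nextTask < numTasks) {
      launchedTask = false
      for (i <- shuffled.indices if nextTask < numTasks) {
        if (availableCpus(i) >= cpusPerTask) {
          tasks(i) += nextTask
          availableCpus(i) -= cpusPerTask
          nextTask += 1
          launchedTask = true
        }
      }
    }
    tasks
  }

  def main(args: Array[String]): Unit = {
    val offers = Seq(Offer("executor0", 2), Offer("executor1", 4))
    // Four single-CPU tasks end up spread across both executors.
    println(assign(offers, 4).map(_.size))
  }
}

Because every offer with spare CPUs receives at most one task per sweep, the resulting placement is roughly balanced across the shuffled offers rather than packed onto a single executor.
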
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index 2584f85bc5..f5f1947661 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -109,72 +109,6 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(!failedTaskSet)
}
- test("Scheduler balance the assignment to the worker with more free cores") {
- val taskScheduler = setupScheduler(("spark.task.assigner", classOf[BalancedAssigner].getName))
- val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 2),
- new WorkerOffer("executor1", "host1", 4))
- val selectedExecutorIds = {
- val taskSet = FakeTask.createTaskSet(2)
- taskScheduler.submitTasks(taskSet)
- val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
- assert(2 === taskDescriptions.length)
- taskDescriptions.map(_.executorId)
- }
- val count = selectedExecutorIds.count(_ == workerOffers(1).executorId)
- assert(count == 2)
- assert(!failedTaskSet)
- }
-
- test("Scheduler balance the assignment across workers with same free cores") {
- val taskScheduler = setupScheduler(("spark.task.assigner", classOf[BalancedAssigner].getName))
- val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 2),
- new WorkerOffer("executor1", "host1", 2))
- val selectedExecutorIds = {
- val taskSet = FakeTask.createTaskSet(2)
- taskScheduler.submitTasks(taskSet)
- val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
- assert(2 === taskDescriptions.length)
- taskDescriptions.map(_.executorId)
- }
- val count = selectedExecutorIds.count(_ == workerOffers(1).executorId)
- assert(count == 1)
- assert(!failedTaskSet)
- }
-
- test("Scheduler packs the assignment to workers with less free cores") {
- val taskScheduler = setupScheduler(("spark.task.assigner", classOf[PackedAssigner].getName))
- val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 2),
- new WorkerOffer("executor1", "host1", 4))
- val selectedExecutorIds = {
- val taskSet = FakeTask.createTaskSet(2)
- taskScheduler.submitTasks(taskSet)
- val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
- assert(2 === taskDescriptions.length)
- taskDescriptions.map(_.executorId)
- }
- val count = selectedExecutorIds.count(_ == workerOffers(0).executorId)
- assert(count == 2)
- assert(!failedTaskSet)
- }
-
- test("Scheduler keeps packing the assignment to the same worker") {
- val taskScheduler = setupScheduler(("spark.task.assigner", classOf[PackedAssigner].getName))
- val workerOffers = IndexedSeq(new WorkerOffer("executor0", "host0", 4),
- new WorkerOffer("executor1", "host1", 4))
- val selectedExecutorIds = {
- val taskSet = FakeTask.createTaskSet(4)
- taskScheduler.submitTasks(taskSet)
- val taskDescriptions = taskScheduler.resourceOffers(workerOffers).flatten
- assert(4 === taskDescriptions.length)
- taskDescriptions.map(_.executorId)
- }
-
- val count = selectedExecutorIds.count(_ == workerOffers(0).executorId)
- assert(count == 4)
- assert(!failedTaskSet)
- }
-
-
test("Scheduler correctly accounts for multiple CPUs per task") {
val taskCpus = 2
val taskScheduler = setupScheduler("spark.task.cpus" -> taskCpus.toString)
@@ -474,5 +408,4 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
assert(thirdTaskDescs.size === 0)
assert(taskScheduler.getExecutorsAliveOnHost("host1") === Some(Set("executor1", "executor3")))
}
-
}