 core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala      | 21
 core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala | 17
 core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala        |  6
 core/src/main/scala/spark/scheduler/local/LocalScheduler.scala          |  3
 core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala     |  6
 core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala |  8
 docs/configuration.md                                                   | 30
 7 files changed, 68 insertions(+), 23 deletions(-)
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
index 028f4d3283..e88edc5b2a 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterScheduler.scala
@@ -184,27 +184,29 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
}
- // Build a list of tasks to assign to each slave
+ // Build a list of tasks to assign to each worker
val tasks = offers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = offers.map(o => o.cores).toArray
- val sortedTaskSetQueue = rootPool.getSortedTaskSetQueue()
- for (manager <- sortedTaskSetQueue) {
+ val sortedTaskSets = rootPool.getSortedTaskSetQueue()
+ for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
- manager.parent.name, manager.name, manager.runningTasks))
+ taskSet.parent.name, taskSet.name, taskSet.runningTasks))
}
+ // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
+ // of locality levels so that it gets a chance to launch local tasks on all of them.
var launchedTask = false
- for (manager <- sortedTaskSetQueue; offer <- offers) {
+ for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
do {
launchedTask = false
for (i <- 0 until offers.size) {
val execId = offers(i).executorId
val host = offers(i).host
- for (task <- manager.resourceOffer(execId, host, availableCpus(i))) {
+ for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
tasks(i) += task
val tid = task.taskId
- taskIdToTaskSetId(tid) = manager.taskSet.id
- taskSetTaskIds(manager.taskSet.id) += tid
+ taskIdToTaskSetId(tid) = taskSet.taskSet.id
+ taskSetTaskIds(taskSet.taskSet.id) += tid
taskIdToExecutorId(tid) = execId
activeExecutorIds += execId
executorsByHost(host) += execId
@@ -402,8 +404,7 @@ object ClusterScheduler {
// order keyList based on population of value in map
val keyList = _keyList.sortWith(
- // TODO(matei): not sure why we're using getOrElse if keyList = map.keys... see if it matters
- (left, right) => map.get(left).getOrElse(Set()).size > map.get(right).getOrElse(Set()).size
+ (left, right) => map(left).size > map(right).size
)
val retval = new ArrayBuffer[T](keyList.size * 2)
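The hunk above changes the offer loop so that each TaskSet is swept through the locality levels from most local to ANY before the scheduler moves on, rather than simply pairing every manager with every offer. The following is a minimal, self-contained sketch of that iteration order, not the real ClusterScheduler code; it assumes a simplified TaskLocality enumeration declared from PROCESS_LOCAL through ANY (so that Enumeration ordering matches locality distance) and uses stand-in types for offers and task set managers.

```scala
// Standalone sketch of the two-level offer loop above; all names here are
// simplified stand-ins, not the real spark.scheduler.cluster types.
object DelaySchedulingSketch {
  object TaskLocality extends Enumeration {
    // Declaration order matters: smaller values are "more local",
    // and TaskLocality.values iterates them in that order.
    val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
  }

  case class Offer(execId: String, host: String, var cores: Int)

  // Stand-in for TaskSetManager.resourceOffer: returns a task id if this
  // task set can launch something at or below the given locality level.
  type OfferFn = (String, String, Int, TaskLocality.Value) => Option[Long]

  def makeOffers(taskSets: Seq[OfferFn], offers: Seq[Offer]): Seq[Long] = {
    val launched = collection.mutable.ArrayBuffer[Long]()
    // For each task set, sweep the locality levels from most local to ANY,
    // and keep re-offering all workers until nothing more can be launched
    // at the current level.
    for (taskSet <- taskSets; maxLocality <- TaskLocality.values) {
      var launchedTask = true
      while (launchedTask) {
        launchedTask = false
        for (o <- offers if o.cores > 0) {
          taskSet(o.execId, o.host, o.cores, maxLocality).foreach { tid =>
            launched += tid
            o.cores -= 1
            launchedTask = true
          }
        }
      }
    }
    launched
  }
}
```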
diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
index 5316a7aed1..91de25254c 100644
--- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala
@@ -43,7 +43,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
extends TaskSetManager with Logging {
// CPUs to request per task
- val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toDouble
+ val CPUS_PER_TASK = System.getProperty("spark.task.cpus", "1").toInt
// Maximum times a task is allowed to fail before failing the job
val MAX_TASK_FAILURES = System.getProperty("spark.task.maxFailures", "4").toInt
@@ -325,15 +325,22 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
/**
* Respond to an offer of a single slave from the scheduler by finding a task
*/
- override def resourceOffer(execId: String, host: String, availableCpus: Double)
+ override def resourceOffer(
+ execId: String,
+ host: String,
+ availableCpus: Int,
+ maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
if (tasksFinished < numTasks && availableCpus >= CPUS_PER_TASK) {
val curTime = System.currentTimeMillis()
- val locality = getAllowedLocalityLevel(curTime)
+ var allowedLocality = getAllowedLocalityLevel(curTime)
+ if (allowedLocality > maxLocality) {
+ allowedLocality = maxLocality // We're not allowed to search for farther-away tasks
+ }
- findTask(execId, host, locality) match {
+ findTask(execId, host, allowedLocality) match {
case Some((index, taskLocality)) => {
// Found a task; do some bookkeeping and return a task description
val task = tasks(index)
@@ -347,7 +354,7 @@ private[spark] class ClusterTaskSetManager(sched: ClusterScheduler, val taskSet:
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
// Update our locality level for delay scheduling
- currentLocalityIndex = getLocalityIndex(locality)
+ currentLocalityIndex = getLocalityIndex(allowedLocality)
lastLaunchTime = curTime
// Serialize and return the task
val startTime = System.currentTimeMillis()
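Here the delay-scheduling level is capped by the caller-supplied maxLocality: because the locality levels form an ordered enumeration, `allowedLocality > maxLocality` means the wait timers would let this task set search farther away than the scheduler is currently offering, so the level is clamped down. The sketch below illustrates only the clamp; `allowedLevel` is a deliberately simplified stand-in for the real getAllowedLocalityLevel, which tracks per-level waits and resets on each local launch.

```scala
// Sketch of the clamp above, using a simplified TaskLocality enumeration.
object LocalityClampSketch {
  object TaskLocality extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
  }
  import TaskLocality._

  // Simplified stand-in: each elapsed wait period unlocks one less-local level.
  def allowedLevel(millisSinceLastLaunch: Long, localityWaitMs: Long): TaskLocality.Value = {
    val unlocked = (millisSinceLastLaunch / localityWaitMs).toInt
    TaskLocality(math.min(unlocked, ANY.id))
  }

  def clamp(allowed: TaskLocality.Value, maxLocality: TaskLocality.Value): TaskLocality.Value =
    if (allowed > maxLocality) maxLocality else allowed

  def main(args: Array[String]): Unit = {
    // 7 s since the last local launch with a 3 s wait unlocks RACK_LOCAL...
    val allowed = allowedLevel(7000, 3000)
    // ...but if the scheduler is only offering NODE_LOCAL slots right now,
    // the clamp keeps this offer node-local.
    println(clamp(allowed, NODE_LOCAL)) // prints NODE_LOCAL
  }
}
```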
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
index 277654edc0..5ab6ab9aad 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskSetManager.scala
@@ -29,7 +29,11 @@ private[spark] trait TaskSetManager extends Schedulable {
def taskSet: TaskSet
- def resourceOffer(execId: String, hostPort: String, availableCpus: Double)
+ def resourceOffer(
+ execId: String,
+ host: String,
+ availableCpus: Int,
+ maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription]
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer)
diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
index a4f5f46777..5be4dbd9f0 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala
@@ -141,8 +141,7 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc:
for (manager <- sortedTaskSetQueue) {
do {
launchTask = false
- // TODO(matei): don't pass null here?
- manager.resourceOffer(null, null, freeCpuCores) match {
+ manager.resourceOffer(null, null, freeCpuCores, null) match {
case Some(task) =>
tasks += task
taskIdToTaskSetId(task.taskId) = manager.taskSet.id
diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
index 698c777bec..3ef636ff07 100644
--- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
+++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala
@@ -98,7 +98,11 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas
return None
}
- override def resourceOffer(execId: String, host: String, availableCpus: Double)
+ override def resourceOffer(
+ execId: String,
+ host: String,
+ availableCpus: Int,
+ maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
SparkEnv.set(sched.env)
diff --git a/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala
index 8618009ea6..aeeed14786 100644
--- a/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/cluster/ClusterSchedulerSuite.scala
@@ -72,7 +72,11 @@ class DummyTaskSetManager(
override def executorLost(executorId: String, host: String): Unit = {
}
- override def resourceOffer(execId: String, host: String, availableCpus: Double)
+ override def resourceOffer(
+ execId: String,
+ host: String,
+ availableCpus: Int,
+ maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
if (tasksFinished + runningTasks < numTasks) {
@@ -120,7 +124,7 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging
logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
}
for (taskSet <- taskSetQueue) {
- taskSet.resourceOffer("execId_1", "hostname_1", 1) match {
+ taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match {
case Some(task) =>
return taskSet.stageId
case None => {}
diff --git a/docs/configuration.md b/docs/configuration.md
index 99624a44aa..dff08a06f5 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -243,8 +243,34 @@ Apart from these, the following properties are also available, and may be useful
<td>3000</td>
<td>
Number of milliseconds to wait to launch a data-local task before giving up and launching it
- in a non-data-local location. You should increase this if your tasks are long and you are seeing
- poor data locality, but the default generally works well.
+ on a less-local node. The same wait will be used to step through multiple locality levels
+ (process-local, node-local, rack-local and then any). It is also possible to customize the
+ waiting time for each level by setting <code>spark.locality.wait.node</code>, etc.
+ You should increase this setting if your tasks are long and you see poor locality, but the
+ default usually works well.
+ </td>
+</tr>
+<tr>
+ <td>spark.locality.wait.process</td>
+ <td>spark.locality.wait</td>
+ <td>
+ Customize the locality wait for process locality. This affects tasks that attempt to access
+ cached data in a particular executor process.
+ </td>
+</tr>
+<tr>
+ <td>spark.locality.wait.node</td>
+ <td>spark.locality.wait</td>
+ <td>
+ Customize the locality wait for node locality. For example, you can set this to 0 to skip
+ node locality and search immediately for rack locality (if your cluster has rack information).
+ </td>
+</tr>
+<tr>
+ <td>spark.locality.wait.rack</td>
+ <td>spark.locality.wait</td>
+ <td>
+ Customize the locality wait for rack locality.
</td>
</tr>
<tr>
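The per-level properties added above default to spark.locality.wait. In this version of Spark these are plain Java system properties read when the scheduler starts, so a driver program can tune them before creating its SparkContext. The snippet below is an illustrative sketch, not part of this change: the property names come from the table above, the values are arbitrary, and the local[2] master is used only to keep the example runnable (locality waits only take effect on a cluster scheduler).

```scala
import spark.SparkContext

object LocalityWaitExample {
  def main(args: Array[String]): Unit = {
    // Global fallback wait used for any level not set explicitly.
    System.setProperty("spark.locality.wait", "3000")
    // Skip the node-local wait entirely and fall through to rack locality.
    System.setProperty("spark.locality.wait.node", "0")
    // Wait a little longer before giving up on rack locality.
    System.setProperty("spark.locality.wait.rack", "5000")

    // local[2] just keeps the sketch self-contained; on a real cluster you
    // would pass your cluster master URL here instead.
    val sc = new SparkContext("local[2]", "locality-wait-example")
    try {
      println(sc.parallelize(1 to 1000).map(_ * 2).reduce(_ + _))
    } finally {
      sc.stop()
    }
  }
}
```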