aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-10-27 16:14:33 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-10-27 16:14:33 -0700
commit9fbd75ab5d46612e52116ec5b9ced70715cf26b5 (patch)
tree7546e9b6ded400f74c0ccb54ace082bba9bea0eb /core
parent4f030b9e82172659d250281782ac573cbd1438fc (diff)
downloadspark-9fbd75ab5d46612e52116ec5b9ced70715cf26b5.tar.gz
spark-9fbd75ab5d46612e52116ec5b9ced70715cf26b5.tar.bz2
spark-9fbd75ab5d46612e52116ec5b9ced70715cf26b5.zip
[SPARK-11212][CORE][STREAMING] Make preferred locations support ExecutorCacheTaskLocation and update…
… ReceiverTracker and ReceiverSchedulingPolicy to use it This PR includes the following changes: 1. Add a new preferred location format, `executor_<host>_<executorID>` (e.g., "executor_localhost_2"), to support specifying the executor locations for RDD. 2. Use the new preferred location format in `ReceiverTracker` to optimize the starting time of Receivers when there are multiple executors in a host. The goal of this PR is to enable the streaming scheduler to place receivers (which run as tasks) in specific executors. Basically, I want to have more control over the placement of the receivers such that they are evenly distributed among the executors. We tried to do this without changing the core scheduling logic. But it does not allow specifying a particular executor as the preferred location, only at the host level. So if there are two executors in the same host, and I want two receivers to run on them (one on each executor), I cannot specify that. Current code only specifies the host as preference, which may end up launching both receivers on the same executor. We tried to work around it by restarting a receiver when it does not launch in the desired executor and hoping that next time it will be started in the right one. But that causes lots of restarts, and delays in correctly launching the receiver. So this change would allow the streaming scheduler to specify the exact executor as the preferred location. Also this is not exposed to the user, only the streaming scheduler uses this. Author: zsxwing <zsxwing@gmail.com> Closes #9181 from zsxwing/executor-location.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala17
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala1
2 files changed, 16 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
index 1b65926f5c..1eb6c1614f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
@@ -31,7 +31,9 @@ private[spark] sealed trait TaskLocation {
*/
private [spark]
case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
- extends TaskLocation
+ extends TaskLocation {
+ override def toString: String = s"${TaskLocation.executorLocationTag}${host}_$executorId"
+}
/**
* A location on a host.
@@ -53,6 +55,9 @@ private[spark] object TaskLocation {
// confusion. See RFC 952 and RFC 1123 for information about the format of hostnames.
val inMemoryLocationTag = "hdfs_cache_"
+ // Identify locations of executors with this prefix.
+ val executorLocationTag = "executor_"
+
def apply(host: String, executorId: String): TaskLocation = {
new ExecutorCacheTaskLocation(host, executorId)
}
@@ -65,7 +70,15 @@ private[spark] object TaskLocation {
def apply(str: String): TaskLocation = {
val hstr = str.stripPrefix(inMemoryLocationTag)
if (hstr.equals(str)) {
- new HostTaskLocation(str)
+ if (str.startsWith(executorLocationTag)) {
+ val splits = str.split("_")
+ if (splits.length != 3) {
+ throw new IllegalArgumentException("Illegal executor location format: " + str)
+ }
+ new ExecutorCacheTaskLocation(splits(1), splits(2))
+ } else {
+ new HostTaskLocation(str)
+ }
} else {
new HDFSCacheTaskLocation(hstr)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 695523cc8a..cd6bf723e7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -779,6 +779,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("Test TaskLocation for different host type.") {
assert(TaskLocation("host1") === HostTaskLocation("host1"))
assert(TaskLocation("hdfs_cache_host1") === HDFSCacheTaskLocation("host1"))
+ assert(TaskLocation("executor_host1_3") === ExecutorCacheTaskLocation("host1", "3"))
}
def createTaskResult(id: Int): DirectTaskResult[Int] = {