aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-10-27 16:14:33 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-10-27 16:14:33 -0700
commit9fbd75ab5d46612e52116ec5b9ced70715cf26b5 (patch)
tree7546e9b6ded400f74c0ccb54ace082bba9bea0eb /streaming/src/test
parent4f030b9e82172659d250281782ac573cbd1438fc (diff)
downloadspark-9fbd75ab5d46612e52116ec5b9ced70715cf26b5.tar.gz
spark-9fbd75ab5d46612e52116ec5b9ced70715cf26b5.tar.bz2
spark-9fbd75ab5d46612e52116ec5b9ced70715cf26b5.zip
[SPARK-11212][CORE][STREAMING] Make preferred locations support ExecutorCacheTaskLocation and update…
… ReceiverTracker and ReceiverSchedulingPolicy to use it This PR includes the following changes: 1. Add a new preferred location format, `executor_<host>_<executorID>` (e.g., "executor_localhost_2"), to support specifying the executor locations for an RDD. 2. Use the new preferred location format in `ReceiverTracker` to optimize the starting time of Receivers when there are multiple executors in a host. The goal of this PR is to enable the streaming scheduler to place receivers (which run as tasks) in specific executors. Basically, I want to have more control over the placement of the receivers such that they are evenly distributed among the executors. We tried to do this without changing the core scheduling logic. But it does not allow specifying a particular executor as a preferred location, only at the host level. So if there are two executors in the same host, and I want two receivers to run on them (one on each executor), I cannot specify that. The current code only specifies the host as a preference, which may end up launching both receivers on the same executor. We tried to work around it by restarting a receiver when it does not launch in the desired executor, hoping that next time it will be started in the right one. But that caused lots of restarts and delays in correctly launching the receiver. So this change would allow the streaming scheduler to specify the exact executor as the preferred location. Also, this is not exposed to the user; only the streaming scheduler uses this. Author: zsxwing <zsxwing@gmail.com> Closes #9181 from zsxwing/executor-location.
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala110
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala4
2 files changed, 72 insertions, 42 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
index b2a51d72ba..05b4e66c63 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
@@ -20,73 +20,96 @@ package org.apache.spark.streaming.scheduler
import scala.collection.mutable
import org.apache.spark.SparkFunSuite
+import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, HostTaskLocation, TaskLocation}
class ReceiverSchedulingPolicySuite extends SparkFunSuite {
val receiverSchedulingPolicy = new ReceiverSchedulingPolicy
test("rescheduleReceiver: empty executors") {
- val scheduledExecutors =
+ val scheduledLocations =
receiverSchedulingPolicy.rescheduleReceiver(0, None, Map.empty, executors = Seq.empty)
- assert(scheduledExecutors === Seq.empty)
+ assert(scheduledLocations === Seq.empty)
}
test("rescheduleReceiver: receiver preferredLocation") {
+ val executors = Seq(ExecutorCacheTaskLocation("host2", "2"))
val receiverTrackingInfoMap = Map(
0 -> ReceiverTrackingInfo(0, ReceiverState.INACTIVE, None, None))
- val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
- 0, Some("host1"), receiverTrackingInfoMap, executors = Seq("host2"))
- assert(scheduledExecutors.toSet === Set("host1", "host2"))
+ val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver(
+ 0, Some("host1"), receiverTrackingInfoMap, executors)
+ assert(scheduledLocations.toSet === Set(HostTaskLocation("host1"), executors(0)))
}
test("rescheduleReceiver: return all idle executors if there are any idle executors") {
- val executors = Seq("host1", "host2", "host3", "host4", "host5")
- // host3 is idle
+ val executors = (1 to 5).map(i => ExecutorCacheTaskLocation(s"host$i", s"$i"))
+ // executor 1 is busy, others are idle.
val receiverTrackingInfoMap = Map(
- 0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")))
- val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
+ 0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some(executors(0))))
+ val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver(
1, None, receiverTrackingInfoMap, executors)
- assert(scheduledExecutors.toSet === Set("host2", "host3", "host4", "host5"))
+ assert(scheduledLocations.toSet === executors.tail.toSet)
}
test("rescheduleReceiver: return all executors that have minimum weight if no idle executors") {
- val executors = Seq("host1", "host2", "host3", "host4", "host5")
+ val executors = Seq(
+ ExecutorCacheTaskLocation("host1", "1"),
+ ExecutorCacheTaskLocation("host2", "2"),
+ ExecutorCacheTaskLocation("host3", "3"),
+ ExecutorCacheTaskLocation("host4", "4"),
+ ExecutorCacheTaskLocation("host5", "5")
+ )
// Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0, host4 = 0.5, host5 = 0.5
val receiverTrackingInfoMap = Map(
- 0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")),
- 1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host2", "host3")), None),
- 2 -> ReceiverTrackingInfo(2, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None),
- 3 -> ReceiverTrackingInfo(4, ReceiverState.SCHEDULED, Some(Seq("host4", "host5")), None))
- val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
+ 0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None,
+ Some(ExecutorCacheTaskLocation("host1", "1"))),
+ 1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED,
+ Some(Seq(ExecutorCacheTaskLocation("host2", "2"), ExecutorCacheTaskLocation("host3", "3"))),
+ None),
+ 2 -> ReceiverTrackingInfo(2, ReceiverState.SCHEDULED,
+ Some(Seq(ExecutorCacheTaskLocation("host1", "1"), ExecutorCacheTaskLocation("host3", "3"))),
+ None),
+ 3 -> ReceiverTrackingInfo(4, ReceiverState.SCHEDULED,
+ Some(Seq(ExecutorCacheTaskLocation("host4", "4"),
+ ExecutorCacheTaskLocation("host5", "5"))), None))
+ val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver(
4, None, receiverTrackingInfoMap, executors)
- assert(scheduledExecutors.toSet === Set("host2", "host4", "host5"))
+ val expectedScheduledLocations = Set(
+ ExecutorCacheTaskLocation("host2", "2"),
+ ExecutorCacheTaskLocation("host4", "4"),
+ ExecutorCacheTaskLocation("host5", "5")
+ )
+ assert(scheduledLocations.toSet === expectedScheduledLocations)
}
test("scheduleReceivers: " +
"schedule receivers evenly when there are more receivers than executors") {
val receivers = (0 until 6).map(new RateTestReceiver(_))
- val executors = (10000 until 10003).map(port => s"localhost:${port}")
- val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
- val numReceiversOnExecutor = mutable.HashMap[String, Int]()
+ val executors = (0 until 3).map(executorId =>
+ ExecutorCacheTaskLocation("localhost", executorId.toString))
+ val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
+ val numReceiversOnExecutor = mutable.HashMap[TaskLocation, Int]()
// There should be 2 receivers running on each executor and each receiver has one executor
- scheduledExecutors.foreach { case (receiverId, executors) =>
- assert(executors.size == 1)
- numReceiversOnExecutor(executors(0)) = numReceiversOnExecutor.getOrElse(executors(0), 0) + 1
+ scheduledLocations.foreach { case (receiverId, locations) =>
+ assert(locations.size == 1)
+ assert(locations(0).isInstanceOf[ExecutorCacheTaskLocation])
+ numReceiversOnExecutor(locations(0)) = numReceiversOnExecutor.getOrElse(locations(0), 0) + 1
}
assert(numReceiversOnExecutor === executors.map(_ -> 2).toMap)
}
-
test("scheduleReceivers: " +
"schedule receivers evenly when there are more executors than receivers") {
val receivers = (0 until 3).map(new RateTestReceiver(_))
- val executors = (10000 until 10006).map(port => s"localhost:${port}")
- val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
- val numReceiversOnExecutor = mutable.HashMap[String, Int]()
+ val executors = (0 until 6).map(executorId =>
+ ExecutorCacheTaskLocation("localhost", executorId.toString))
+ val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
+ val numReceiversOnExecutor = mutable.HashMap[TaskLocation, Int]()
// There should be 1 receiver running on each executor and each receiver has two executors
- scheduledExecutors.foreach { case (receiverId, executors) =>
- assert(executors.size == 2)
- executors.foreach { l =>
+ scheduledLocations.foreach { case (receiverId, locations) =>
+ assert(locations.size == 2)
+ locations.foreach { l =>
+ assert(l.isInstanceOf[ExecutorCacheTaskLocation])
numReceiversOnExecutor(l) = numReceiversOnExecutor.getOrElse(l, 0) + 1
}
}
@@ -96,34 +119,41 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
test("scheduleReceivers: schedule receivers evenly when the preferredLocations are even") {
val receivers = (0 until 3).map(new RateTestReceiver(_)) ++
(3 until 6).map(new RateTestReceiver(_, Some("localhost")))
- val executors = (10000 until 10003).map(port => s"localhost:${port}") ++
- (10003 until 10006).map(port => s"localhost2:${port}")
- val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
- val numReceiversOnExecutor = mutable.HashMap[String, Int]()
+ val executors = (0 until 3).map(executorId =>
+ ExecutorCacheTaskLocation("localhost", executorId.toString)) ++
+ (3 until 6).map(executorId =>
+ ExecutorCacheTaskLocation("localhost2", executorId.toString))
+ val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
+ val numReceiversOnExecutor = mutable.HashMap[TaskLocation, Int]()
// There should be 1 receiver running on each executor and each receiver has 1 executor
- scheduledExecutors.foreach { case (receiverId, executors) =>
+ scheduledLocations.foreach { case (receiverId, executors) =>
assert(executors.size == 1)
executors.foreach { l =>
+ assert(l.isInstanceOf[ExecutorCacheTaskLocation])
numReceiversOnExecutor(l) = numReceiversOnExecutor.getOrElse(l, 0) + 1
}
}
assert(numReceiversOnExecutor === executors.map(_ -> 1).toMap)
// Make sure we schedule the receivers to their preferredLocations
val executorsForReceiversWithPreferredLocation =
- scheduledExecutors.filter { case (receiverId, executors) => receiverId >= 3 }.flatMap(_._2)
+ scheduledLocations.filter { case (receiverId, executors) => receiverId >= 3 }.flatMap(_._2)
// We can simply check the executor set because we only know each receiver only has 1 executor
assert(executorsForReceiversWithPreferredLocation.toSet ===
- (10000 until 10003).map(port => s"localhost:${port}").toSet)
+ (0 until 3).map(executorId =>
+ ExecutorCacheTaskLocation("localhost", executorId.toString)
+ ).toSet)
}
test("scheduleReceivers: return empty if no receiver") {
- assert(receiverSchedulingPolicy.scheduleReceivers(Seq.empty, Seq("localhost:10000")).isEmpty)
+ val scheduledLocations = receiverSchedulingPolicy.
+ scheduleReceivers(Seq.empty, Seq(ExecutorCacheTaskLocation("localhost", "1")))
+ assert(scheduledLocations.isEmpty)
}
test("scheduleReceivers: return empty scheduled executors if no executors") {
val receivers = (0 until 3).map(new RateTestReceiver(_))
- val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, Seq.empty)
- scheduledExecutors.foreach { case (receiverId, executors) =>
+ val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, Seq.empty)
+ scheduledLocations.foreach { case (receiverId, executors) =>
assert(executors.isEmpty)
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index fda86aef45..3bd8d086ab 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -99,8 +99,8 @@ class ReceiverTrackerSuite extends TestSuiteBase {
output.register()
ssc.start()
eventually(timeout(10 seconds), interval(10 millis)) {
- // If preferredLocations is set correctly, receiverTaskLocality should be NODE_LOCAL
- assert(receiverTaskLocality === TaskLocality.NODE_LOCAL)
+ // If preferredLocations is set correctly, receiverTaskLocality should be PROCESS_LOCAL
+ assert(receiverTaskLocality === TaskLocality.PROCESS_LOCAL)
}
}
}