aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-08-24 23:34:50 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-08-24 23:35:02 -0700
commitbb1357f362cdd96b854c2a0a193496ce709cdbdd (patch)
tree54bfe8adb585a54b613b016d4d4c710cd3bb714e
parent88991dc4f04b0c88466c6eab5ada43506c981341 (diff)
downloadspark-bb1357f362cdd96b854c2a0a193496ce709cdbdd.tar.gz
spark-bb1357f362cdd96b854c2a0a193496ce709cdbdd.tar.bz2
spark-bb1357f362cdd96b854c2a0a193496ce709cdbdd.zip
[SPARK-10137] [STREAMING] Avoid to restart receivers if scheduleReceivers returns balanced results
This PR fixes the following cases for `ReceiverSchedulingPolicy`. 1) Assume there are 4 executors: host1, host2, host3, host4, and 5 receivers: r1, r2, r3, r4, r5. Then `ReceiverSchedulingPolicy.scheduleReceivers` will return (r1 -> host1, r2 -> host2, r3 -> host3, r4 -> host4, r5 -> host1). Let's assume r1 starts at first on `host1` as `scheduleReceivers` suggested, and try to register with ReceiverTracker. But the previous `ReceiverSchedulingPolicy.rescheduleReceiver` will return (host2, host3, host4) according to the current executor weights (host1 -> 1.0, host2 -> 0.5, host3 -> 0.5, host4 -> 0.5), so ReceiverTracker will reject `r1`. This is unexpected since r1 is starting exactly where `scheduleReceivers` suggested. This case can be fixed by ignoring the information of the receiver that is rescheduling in `receiverTrackingInfoMap`. 2) Assume there are 3 executors (host1, host2, host3) and each executors has 3 cores, and 3 receivers: r1, r2, r3. Assume r1 is running on host1. Now r2 is restarting, the previous `ReceiverSchedulingPolicy.rescheduleReceiver` will always return (host1, host2, host3). So it's possible that r2 will be scheduled to host1 by TaskScheduler. r3 is similar. Then at last, it's possible that there are 3 receivers running on host1, while host2 and host3 are idle. This issue can be fixed by returning only executors that have the minimum wight rather than returning at least 3 executors. Author: zsxwing <zsxwing@gmail.com> Closes #8340 from zsxwing/fix-receiver-scheduling. (cherry picked from commit f023aa2fcc1d1dbb82aee568be0a8f2457c309ae) Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala58
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala106
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala13
3 files changed, 120 insertions, 57 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
index ef5b687b58..10b5a7f57a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
@@ -22,6 +22,36 @@ import scala.collection.mutable
import org.apache.spark.streaming.receiver.Receiver
+/**
+ * A class that tries to schedule receivers with evenly distributed. There are two phases for
+ * scheduling receivers.
+ *
+ * - The first phase is global scheduling when ReceiverTracker is starting and we need to schedule
+ * all receivers at the same time. ReceiverTracker will call `scheduleReceivers` at this phase.
+ * It will try to schedule receivers with evenly distributed. ReceiverTracker should update its
+ * receiverTrackingInfoMap according to the results of `scheduleReceivers`.
+ * `ReceiverTrackingInfo.scheduledExecutors` for each receiver will set to an executor list that
+ * contains the scheduled locations. Then when a receiver is starting, it will send a register
+ * request and `ReceiverTracker.registerReceiver` will be called. In
+ * `ReceiverTracker.registerReceiver`, if a receiver's scheduled executors is set, it should check
+ * if the location of this receiver is one of the scheduled executors, if not, the register will
+ * be rejected.
+ * - The second phase is local scheduling when a receiver is restarting. There are two cases of
+ * receiver restarting:
+ * - If a receiver is restarting because it's rejected due to the real location and the scheduled
+ * executors mismatching, in other words, it fails to start in one of the locations that
+ * `scheduleReceivers` suggested, `ReceiverTracker` should firstly choose the executors that are
+ * still alive in the list of scheduled executors, then use them to launch the receiver job.
+ * - If a receiver is restarting without a scheduled executors list, or the executors in the list
+ * are dead, `ReceiverTracker` should call `rescheduleReceiver`. If so, `ReceiverTracker` should
+ * not set `ReceiverTrackingInfo.scheduledExecutors` for this executor, instead, it should clear
+ * it. Then when this receiver is registering, we can know this is a local scheduling, and
+ * `ReceiverTrackingInfo` should call `rescheduleReceiver` again to check if the launching
+ * location is matching.
+ *
+ * In conclusion, we should make a global schedule, try to achieve that exactly as long as possible,
+ * otherwise do local scheduling.
+ */
private[streaming] class ReceiverSchedulingPolicy {
/**
@@ -102,8 +132,7 @@ private[streaming] class ReceiverSchedulingPolicy {
/**
* Return a list of candidate executors to run the receiver. If the list is empty, the caller can
- * run this receiver in arbitrary executor. The caller can use `preferredNumExecutors` to require
- * returning `preferredNumExecutors` executors if possible.
+ * run this receiver in arbitrary executor.
*
* This method tries to balance executors' load. Here is the approach to schedule executors
* for a receiver.
@@ -122,9 +151,8 @@ private[streaming] class ReceiverSchedulingPolicy {
* If a receiver is scheduled to an executor but has not yet run, it contributes
* `1.0 / #candidate_executors_of_this_receiver` to the executor's weight.</li>
* </ul>
- * At last, if there are more than `preferredNumExecutors` idle executors (weight = 0),
- * returns all idle executors. Otherwise, we only return `preferredNumExecutors` best options
- * according to the weights.
+ * At last, if there are any idle executors (weight = 0), returns all idle executors.
+ * Otherwise, returns the executors that have the minimum weight.
* </li>
* </ol>
*
@@ -134,8 +162,7 @@ private[streaming] class ReceiverSchedulingPolicy {
receiverId: Int,
preferredLocation: Option[String],
receiverTrackingInfoMap: Map[Int, ReceiverTrackingInfo],
- executors: Seq[String],
- preferredNumExecutors: Int = 3): Seq[String] = {
+ executors: Seq[String]): Seq[String] = {
if (executors.isEmpty) {
return Seq.empty
}
@@ -156,15 +183,18 @@ private[streaming] class ReceiverSchedulingPolicy {
}
}.groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor
- val idleExecutors = (executors.toSet -- executorWeights.keys).toSeq
- if (idleExecutors.size >= preferredNumExecutors) {
- // If there are more than `preferredNumExecutors` idle executors, return all of them
+ val idleExecutors = executors.toSet -- executorWeights.keys
+ if (idleExecutors.nonEmpty) {
scheduledExecutors ++= idleExecutors
} else {
- // If there are less than `preferredNumExecutors` idle executors, return 3 best options
- scheduledExecutors ++= idleExecutors
- val sortedExecutors = executorWeights.toSeq.sortBy(_._2).map(_._1)
- scheduledExecutors ++= (idleExecutors ++ sortedExecutors).take(preferredNumExecutors)
+ // There is no idle executor. So select all executors that have the minimum weight.
+ val sortedExecutors = executorWeights.toSeq.sortBy(_._2)
+ if (sortedExecutors.nonEmpty) {
+ val minWeight = sortedExecutors(0)._2
+ scheduledExecutors ++= sortedExecutors.takeWhile(_._2 == minWeight).map(_._1)
+ } else {
+ // This should not happen since "executors" is not empty
+ }
}
scheduledExecutors.toSeq
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 30d25a64e3..3d532a675d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -244,8 +244,21 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
if (isTrackerStopping || isTrackerStopped) {
- false
- } else if (!scheduleReceiver(streamId).contains(hostPort)) {
+ return false
+ }
+
+ val scheduledExecutors = receiverTrackingInfos(streamId).scheduledExecutors
+ val accetableExecutors = if (scheduledExecutors.nonEmpty) {
+ // This receiver is registering and it's scheduled by
+ // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledExecutors" to check it.
+ scheduledExecutors.get
+ } else {
+ // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
+ // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
+ scheduleReceiver(streamId)
+ }
+
+ if (!accetableExecutors.contains(hostPort)) {
// Refuse it since it's scheduled to a wrong executor
false
} else {
@@ -426,12 +439,25 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
startReceiver(receiver, executors)
}
case RestartReceiver(receiver) =>
- val scheduledExecutors = schedulingPolicy.rescheduleReceiver(
- receiver.streamId,
- receiver.preferredLocation,
- receiverTrackingInfos,
- getExecutors)
- updateReceiverScheduledExecutors(receiver.streamId, scheduledExecutors)
+ // Old scheduled executors minus the ones that are not active any more
+ val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
+ val scheduledExecutors = if (oldScheduledExecutors.nonEmpty) {
+ // Try global scheduling again
+ oldScheduledExecutors
+ } else {
+ val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
+ // Clear "scheduledExecutors" to indicate we are going to do local scheduling
+ val newReceiverInfo = oldReceiverInfo.copy(
+ state = ReceiverState.INACTIVE, scheduledExecutors = None)
+ receiverTrackingInfos(receiver.streamId) = newReceiverInfo
+ schedulingPolicy.rescheduleReceiver(
+ receiver.streamId,
+ receiver.preferredLocation,
+ receiverTrackingInfos,
+ getExecutors)
+ }
+ // Assume there is one receiver restarting at one time, so we don't need to update
+ // receiverTrackingInfos
startReceiver(receiver, scheduledExecutors)
case c: CleanupOldBlocks =>
receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
@@ -465,6 +491,24 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
/**
+ * Return the stored scheduled executors that are still alive.
+ */
+ private def getStoredScheduledExecutors(receiverId: Int): Seq[String] = {
+ if (receiverTrackingInfos.contains(receiverId)) {
+ val scheduledExecutors = receiverTrackingInfos(receiverId).scheduledExecutors
+ if (scheduledExecutors.nonEmpty) {
+ val executors = getExecutors.toSet
+ // Only return the alive executors
+ scheduledExecutors.get.filter(executors)
+ } else {
+ Nil
+ }
+ } else {
+ Nil
+ }
+ }
+
+ /**
* Start a receiver along with its scheduled executors
*/
private def startReceiver(receiver: Receiver[_], scheduledExecutors: Seq[String]): Unit = {
@@ -484,7 +528,23 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
// Function to start the receiver on the worker node
- val startReceiverFunc = new StartReceiverFunc(checkpointDirOption, serializableHadoopConf)
+ val startReceiverFunc: Iterator[Receiver[_]] => Unit =
+ (iterator: Iterator[Receiver[_]]) => {
+ if (!iterator.hasNext) {
+ throw new SparkException(
+ "Could not start receiver as object not found.")
+ }
+ if (TaskContext.get().attemptNumber() == 0) {
+ val receiver = iterator.next()
+ assert(iterator.hasNext == false)
+ val supervisor = new ReceiverSupervisorImpl(
+ receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
+ supervisor.start()
+ supervisor.awaitTermination()
+ } else {
+ // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
+ }
+ }
// Create the RDD using the scheduledExecutors to run the receiver in a Spark job
val receiverRDD: RDD[Receiver[_]] =
@@ -541,31 +601,3 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
}
-
-/**
- * Function to start the receiver on the worker node. Use a class instead of closure to avoid
- * the serialization issue.
- */
-private[streaming] class StartReceiverFunc(
- checkpointDirOption: Option[String],
- serializableHadoopConf: SerializableConfiguration)
- extends (Iterator[Receiver[_]] => Unit) with Serializable {
-
- override def apply(iterator: Iterator[Receiver[_]]): Unit = {
- if (!iterator.hasNext) {
- throw new SparkException(
- "Could not start receiver as object not found.")
- }
- if (TaskContext.get().attemptNumber() == 0) {
- val receiver = iterator.next()
- assert(iterator.hasNext == false)
- val supervisor = new ReceiverSupervisorImpl(
- receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
- supervisor.start()
- supervisor.awaitTermination()
- } else {
- // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
- }
- }
-
-}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
index 0418d776ec..b2a51d72ba 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
@@ -39,7 +39,7 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
assert(scheduledExecutors.toSet === Set("host1", "host2"))
}
- test("rescheduleReceiver: return all idle executors if more than 3 idle executors") {
+ test("rescheduleReceiver: return all idle executors if there are any idle executors") {
val executors = Seq("host1", "host2", "host3", "host4", "host5")
// host3 is idle
val receiverTrackingInfoMap = Map(
@@ -49,16 +49,16 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
assert(scheduledExecutors.toSet === Set("host2", "host3", "host4", "host5"))
}
- test("rescheduleReceiver: return 3 best options if less than 3 idle executors") {
+ test("rescheduleReceiver: return all executors that have minimum weight if no idle executors") {
val executors = Seq("host1", "host2", "host3", "host4", "host5")
- // Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0
- // host4 and host5 are idle
+ // Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0, host4 = 0.5, host5 = 0.5
val receiverTrackingInfoMap = Map(
0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")),
1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host2", "host3")), None),
- 2 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None))
+ 2 -> ReceiverTrackingInfo(2, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None),
+ 3 -> ReceiverTrackingInfo(4, ReceiverState.SCHEDULED, Some(Seq("host4", "host5")), None))
val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
- 3, None, receiverTrackingInfoMap, executors)
+ 4, None, receiverTrackingInfoMap, executors)
assert(scheduledExecutors.toSet === Set("host2", "host4", "host5"))
}
@@ -127,4 +127,5 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
assert(executors.isEmpty)
}
}
+
}