aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorzsxwing <zsxwing@gmail.com>2015-10-27 16:14:33 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-10-27 16:14:33 -0700
commit9fbd75ab5d46612e52116ec5b9ced70715cf26b5 (patch)
tree7546e9b6ded400f74c0ccb54ace082bba9bea0eb /streaming
parent4f030b9e82172659d250281782ac573cbd1438fc (diff)
downloadspark-9fbd75ab5d46612e52116ec5b9ced70715cf26b5.tar.gz
spark-9fbd75ab5d46612e52116ec5b9ced70715cf26b5.tar.bz2
spark-9fbd75ab5d46612e52116ec5b9ced70715cf26b5.zip
[SPARK-11212][CORE][STREAMING] Make preferred locations support ExecutorCacheTaskLocation and update…
… ReceiverTracker and ReceiverSchedulingPolicy to use it This PR includes the following changes: 1. Add a new preferred location format, `executor_<host>_<executorID>` (e.g., "executor_localhost_2"), to support specifying the executor locations for RDD. 2. Use the new preferred location format in `ReceiverTracker` to optimize the starting time of Receivers when there are multiple executors in a host. The goal of this PR is to enable the streaming scheduler to place receivers (which run as tasks) in specific executors. Basically, I want to have more control on the placement of the receivers such that they are evenly distributed among the executors. We tried to do this without changing the core scheduling logic. But it does not allow specifying particular executor as preferred location, only at the host level. So if there are two executors in the same host, and I want two receivers to run on them (one on each executor), I cannot specify that. Current code only specifies the host as preference, which may end up launching both receivers on the same executor. We try to work around it but restarting a receiver when it does not launch in the desired executor and hope that next time it will be started in the right one. But that cause lots of restarts, and delays in correctly launching the receiver. So this change, would allow the streaming scheduler to specify the exact executor as the preferred location. Also this is not exposed to the user, only the streaming scheduler uses this. Author: zsxwing <zsxwing@gmail.com> Closes #9181 from zsxwing/executor-location.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala5
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala110
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala93
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala9
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala110
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala4
6 files changed, 201 insertions, 130 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 59ef58d232..167f56aa42 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -47,7 +47,8 @@ private[streaming] class ReceiverSupervisorImpl(
checkpointDirOption: Option[String]
) extends ReceiverSupervisor(receiver, env.conf) with Logging {
- private val hostPort = SparkEnv.get.blockManager.blockManagerId.hostPort
+ private val host = SparkEnv.get.blockManager.blockManagerId.host
+ private val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
private val receivedBlockHandler: ReceivedBlockHandler = {
if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
@@ -179,7 +180,7 @@ private[streaming] class ReceiverSupervisorImpl(
override protected def onReceiverStart(): Boolean = {
val msg = RegisterReceiver(
- streamId, receiver.getClass.getSimpleName, hostPort, endpoint)
+ streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
trackerEndpoint.askWithRetry[Boolean](msg)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
index d2b0be7f4a..234bc8660d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
@@ -20,8 +20,8 @@ package org.apache.spark.streaming.scheduler
import scala.collection.Map
import scala.collection.mutable
+import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, TaskLocation}
import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.util.Utils
/**
* A class that tries to schedule receivers with evenly distributed. There are two phases for
@@ -29,23 +29,23 @@ import org.apache.spark.util.Utils
*
* - The first phase is global scheduling when ReceiverTracker is starting and we need to schedule
* all receivers at the same time. ReceiverTracker will call `scheduleReceivers` at this phase.
- * It will try to schedule receivers with evenly distributed. ReceiverTracker should update its
- * receiverTrackingInfoMap according to the results of `scheduleReceivers`.
- * `ReceiverTrackingInfo.scheduledExecutors` for each receiver will set to an executor list that
- * contains the scheduled locations. Then when a receiver is starting, it will send a register
- * request and `ReceiverTracker.registerReceiver` will be called. In
- * `ReceiverTracker.registerReceiver`, if a receiver's scheduled executors is set, it should check
- * if the location of this receiver is one of the scheduled executors, if not, the register will
+ * It will try to schedule receivers such that they are evenly distributed. ReceiverTracker should
+ * update its `receiverTrackingInfoMap` according to the results of `scheduleReceivers`.
+ * `ReceiverTrackingInfo.scheduledLocations` for each receiver should be set to an location list
+ * that contains the scheduled locations. Then when a receiver is starting, it will send a
+ * register request and `ReceiverTracker.registerReceiver` will be called. In
+ * `ReceiverTracker.registerReceiver`, if a receiver's scheduled locations is set, it should check
+ * if the location of this receiver is one of the scheduled locations, if not, the register will
* be rejected.
* - The second phase is local scheduling when a receiver is restarting. There are two cases of
* receiver restarting:
* - If a receiver is restarting because it's rejected due to the real location and the scheduled
- * executors mismatching, in other words, it fails to start in one of the locations that
+ * locations mismatching, in other words, it fails to start in one of the locations that
* `scheduleReceivers` suggested, `ReceiverTracker` should firstly choose the executors that are
- * still alive in the list of scheduled executors, then use them to launch the receiver job.
- * - If a receiver is restarting without a scheduled executors list, or the executors in the list
+ * still alive in the list of scheduled locations, then use them to launch the receiver job.
+ * - If a receiver is restarting without a scheduled locations list, or the executors in the list
* are dead, `ReceiverTracker` should call `rescheduleReceiver`. If so, `ReceiverTracker` should
- * not set `ReceiverTrackingInfo.scheduledExecutors` for this executor, instead, it should clear
+ * not set `ReceiverTrackingInfo.scheduledLocations` for this receiver, instead, it should clear
* it. Then when this receiver is registering, we can know this is a local scheduling, and
* `ReceiverTrackingInfo` should call `rescheduleReceiver` again to check if the launching
* location is matching.
@@ -69,9 +69,12 @@ private[streaming] class ReceiverSchedulingPolicy {
* </ol>
*
* This method is called when we start to launch receivers at the first time.
+ *
+ * @return a map for receivers and their scheduled locations
*/
def scheduleReceivers(
- receivers: Seq[Receiver[_]], executors: Seq[String]): Map[Int, Seq[String]] = {
+ receivers: Seq[Receiver[_]],
+ executors: Seq[ExecutorCacheTaskLocation]): Map[Int, Seq[TaskLocation]] = {
if (receivers.isEmpty) {
return Map.empty
}
@@ -80,16 +83,16 @@ private[streaming] class ReceiverSchedulingPolicy {
return receivers.map(_.streamId -> Seq.empty).toMap
}
- val hostToExecutors = executors.groupBy(executor => Utils.parseHostPort(executor)._1)
- val scheduledExecutors = Array.fill(receivers.length)(new mutable.ArrayBuffer[String])
- val numReceiversOnExecutor = mutable.HashMap[String, Int]()
+ val hostToExecutors = executors.groupBy(_.host)
+ val scheduledLocations = Array.fill(receivers.length)(new mutable.ArrayBuffer[TaskLocation])
+ val numReceiversOnExecutor = mutable.HashMap[ExecutorCacheTaskLocation, Int]()
// Set the initial value to 0
executors.foreach(e => numReceiversOnExecutor(e) = 0)
// Firstly, we need to respect "preferredLocation". So if a receiver has "preferredLocation",
// we need to make sure the "preferredLocation" is in the candidate scheduled executor list.
for (i <- 0 until receivers.length) {
- // Note: preferredLocation is host but executors are host:port
+ // Note: preferredLocation is host but executors are host_executorId
receivers(i).preferredLocation.foreach { host =>
hostToExecutors.get(host) match {
case Some(executorsOnHost) =>
@@ -97,7 +100,7 @@ private[streaming] class ReceiverSchedulingPolicy {
// this host
val leastScheduledExecutor =
executorsOnHost.minBy(executor => numReceiversOnExecutor(executor))
- scheduledExecutors(i) += leastScheduledExecutor
+ scheduledLocations(i) += leastScheduledExecutor
numReceiversOnExecutor(leastScheduledExecutor) =
numReceiversOnExecutor(leastScheduledExecutor) + 1
case None =>
@@ -106,17 +109,20 @@ private[streaming] class ReceiverSchedulingPolicy {
// 1. This executor is not up. But it may be up later.
// 2. This executor is dead, or it's not a host in the cluster.
// Currently, simply add host to the scheduled executors.
- scheduledExecutors(i) += host
+
+ // Note: host could be `HDFSCacheTaskLocation`, so use `TaskLocation.apply` to handle
+ // this case
+ scheduledLocations(i) += TaskLocation(host)
}
}
}
// For those receivers that don't have preferredLocation, make sure we assign at least one
// executor to them.
- for (scheduledExecutorsForOneReceiver <- scheduledExecutors.filter(_.isEmpty)) {
+ for (scheduledLocationsForOneReceiver <- scheduledLocations.filter(_.isEmpty)) {
// Select the executor that has the least receivers
val (leastScheduledExecutor, numReceivers) = numReceiversOnExecutor.minBy(_._2)
- scheduledExecutorsForOneReceiver += leastScheduledExecutor
+ scheduledLocationsForOneReceiver += leastScheduledExecutor
numReceiversOnExecutor(leastScheduledExecutor) = numReceivers + 1
}
@@ -124,22 +130,22 @@ private[streaming] class ReceiverSchedulingPolicy {
val idleExecutors = numReceiversOnExecutor.filter(_._2 == 0).map(_._1)
for (executor <- idleExecutors) {
// Assign an idle executor to the receiver that has least candidate executors.
- val leastScheduledExecutors = scheduledExecutors.minBy(_.size)
+ val leastScheduledExecutors = scheduledLocations.minBy(_.size)
leastScheduledExecutors += executor
}
- receivers.map(_.streamId).zip(scheduledExecutors).toMap
+ receivers.map(_.streamId).zip(scheduledLocations).toMap
}
/**
- * Return a list of candidate executors to run the receiver. If the list is empty, the caller can
+ * Return a list of candidate locations to run the receiver. If the list is empty, the caller can
* run this receiver in arbitrary executor.
*
* This method tries to balance executors' load. Here is the approach to schedule executors
* for a receiver.
* <ol>
* <li>
- * If preferredLocation is set, preferredLocation should be one of the candidate executors.
+ * If preferredLocation is set, preferredLocation should be one of the candidate locations.
* </li>
* <li>
* Every executor will be assigned to a weight according to the receivers running or
@@ -163,40 +169,58 @@ private[streaming] class ReceiverSchedulingPolicy {
receiverId: Int,
preferredLocation: Option[String],
receiverTrackingInfoMap: Map[Int, ReceiverTrackingInfo],
- executors: Seq[String]): Seq[String] = {
+ executors: Seq[ExecutorCacheTaskLocation]): Seq[TaskLocation] = {
if (executors.isEmpty) {
return Seq.empty
}
// Always try to schedule to the preferred locations
- val scheduledExecutors = mutable.Set[String]()
- scheduledExecutors ++= preferredLocation
-
- val executorWeights = receiverTrackingInfoMap.values.flatMap { receiverTrackingInfo =>
- receiverTrackingInfo.state match {
- case ReceiverState.INACTIVE => Nil
- case ReceiverState.SCHEDULED =>
- val scheduledExecutors = receiverTrackingInfo.scheduledExecutors.get
- // The probability that a scheduled receiver will run in an executor is
- // 1.0 / scheduledLocations.size
- scheduledExecutors.map(location => location -> (1.0 / scheduledExecutors.size))
- case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningExecutor.get -> 1.0)
- }
- }.groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor
+ val scheduledLocations = mutable.Set[TaskLocation]()
+ // Note: preferredLocation could be `HDFSCacheTaskLocation`, so use `TaskLocation.apply` to
+ // handle this case
+ scheduledLocations ++= preferredLocation.map(TaskLocation(_))
+
+ val executorWeights: Map[ExecutorCacheTaskLocation, Double] = {
+ receiverTrackingInfoMap.values.flatMap(convertReceiverTrackingInfoToExecutorWeights)
+ .groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor
+ }
val idleExecutors = executors.toSet -- executorWeights.keys
if (idleExecutors.nonEmpty) {
- scheduledExecutors ++= idleExecutors
+ scheduledLocations ++= idleExecutors
} else {
// There is no idle executor. So select all executors that have the minimum weight.
val sortedExecutors = executorWeights.toSeq.sortBy(_._2)
if (sortedExecutors.nonEmpty) {
val minWeight = sortedExecutors(0)._2
- scheduledExecutors ++= sortedExecutors.takeWhile(_._2 == minWeight).map(_._1)
+ scheduledLocations ++= sortedExecutors.takeWhile(_._2 == minWeight).map(_._1)
} else {
// This should not happen since "executors" is not empty
}
}
- scheduledExecutors.toSeq
+ scheduledLocations.toSeq
+ }
+
+ /**
+ * This method tries to convert a receiver tracking info to executor weights. Every executor will
+ * be assigned to a weight according to the receivers running or scheduling on it:
+ *
+ * - If a receiver is running on an executor, it contributes 1.0 to the executor's weight.
+ * - If a receiver is scheduled to an executor but has not yet run, it contributes
+ * `1.0 / #candidate_executors_of_this_receiver` to the executor's weight.
+ */
+ private def convertReceiverTrackingInfoToExecutorWeights(
+ receiverTrackingInfo: ReceiverTrackingInfo): Seq[(ExecutorCacheTaskLocation, Double)] = {
+ receiverTrackingInfo.state match {
+ case ReceiverState.INACTIVE => Nil
+ case ReceiverState.SCHEDULED =>
+ val scheduledLocations = receiverTrackingInfo.scheduledLocations.get
+ // The probability that a scheduled receiver will run in an executor is
+ // 1.0 / scheduledLocations.size
+ scheduledLocations.filter(_.isInstanceOf[ExecutorCacheTaskLocation]).map { location =>
+ location.asInstanceOf[ExecutorCacheTaskLocation] -> (1.0 / scheduledLocations.size)
+ }
+ case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningExecutor.get -> 1.0)
+ }
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 2ce80d618b..b183d856f5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -17,20 +17,21 @@
package org.apache.spark.streaming.scheduler
-import java.util.concurrent.{TimeUnit, CountDownLatch}
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.mutable.HashMap
import scala.concurrent.ExecutionContext
import scala.language.existentials
import scala.util.{Failure, Success}
-import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.rpc._
+import org.apache.spark.scheduler.{TaskLocation, ExecutorCacheTaskLocation}
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.receiver._
-import org.apache.spark.util.{Utils, ThreadUtils, SerializableConfiguration}
+import org.apache.spark.streaming.util.WriteAheadLogUtils
+import org.apache.spark.util.{SerializableConfiguration, ThreadUtils, Utils}
/** Enumeration to identify current state of a Receiver */
@@ -47,7 +48,8 @@ private[streaming] sealed trait ReceiverTrackerMessage
private[streaming] case class RegisterReceiver(
streamId: Int,
typ: String,
- hostPort: String,
+ host: String,
+ executorId: String,
receiverEndpoint: RpcEndpointRef
) extends ReceiverTrackerMessage
private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)
@@ -235,7 +237,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
private def registerReceiver(
streamId: Int,
typ: String,
- hostPort: String,
+ host: String,
+ executorId: String,
receiverEndpoint: RpcEndpointRef,
senderAddress: RpcAddress
): Boolean = {
@@ -247,18 +250,23 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
return false
}
- val scheduledExecutors = receiverTrackingInfos(streamId).scheduledExecutors
- val accetableExecutors = if (scheduledExecutors.nonEmpty) {
+ val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations
+ val acceptableExecutors = if (scheduledLocations.nonEmpty) {
// This receiver is registering and it's scheduled by
- // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledExecutors" to check it.
- scheduledExecutors.get
+ // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.
+ scheduledLocations.get
} else {
// This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
// "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
scheduleReceiver(streamId)
}
- if (!accetableExecutors.contains(hostPort)) {
+ def isAcceptable: Boolean = acceptableExecutors.exists {
+ case loc: ExecutorCacheTaskLocation => loc.executorId == executorId
+ case loc: TaskLocation => loc.host == host
+ }
+
+ if (!isAcceptable) {
// Refuse it since it's scheduled to a wrong executor
false
} else {
@@ -266,8 +274,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
val receiverTrackingInfo = ReceiverTrackingInfo(
streamId,
ReceiverState.ACTIVE,
- scheduledExecutors = None,
- runningExecutor = Some(hostPort),
+ scheduledLocations = None,
+ runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
name = Some(name),
endpoint = Some(receiverEndpoint))
receiverTrackingInfos.put(streamId, receiverTrackingInfo)
@@ -338,25 +346,25 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
logWarning(s"Error reported by receiver for stream $streamId: $messageWithError")
}
- private def scheduleReceiver(receiverId: Int): Seq[String] = {
+ private def scheduleReceiver(receiverId: Int): Seq[TaskLocation] = {
val preferredLocation = receiverPreferredLocations.getOrElse(receiverId, None)
- val scheduledExecutors = schedulingPolicy.rescheduleReceiver(
+ val scheduledLocations = schedulingPolicy.rescheduleReceiver(
receiverId, preferredLocation, receiverTrackingInfos, getExecutors)
- updateReceiverScheduledExecutors(receiverId, scheduledExecutors)
- scheduledExecutors
+ updateReceiverScheduledExecutors(receiverId, scheduledLocations)
+ scheduledLocations
}
private def updateReceiverScheduledExecutors(
- receiverId: Int, scheduledExecutors: Seq[String]): Unit = {
+ receiverId: Int, scheduledLocations: Seq[TaskLocation]): Unit = {
val newReceiverTrackingInfo = receiverTrackingInfos.get(receiverId) match {
case Some(oldInfo) =>
oldInfo.copy(state = ReceiverState.SCHEDULED,
- scheduledExecutors = Some(scheduledExecutors))
+ scheduledLocations = Some(scheduledLocations))
case None =>
ReceiverTrackingInfo(
receiverId,
ReceiverState.SCHEDULED,
- Some(scheduledExecutors),
+ Some(scheduledLocations),
runningExecutor = None)
}
receiverTrackingInfos.put(receiverId, newReceiverTrackingInfo)
@@ -370,13 +378,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
/**
* Get the list of executors excluding driver
*/
- private def getExecutors: Seq[String] = {
+ private def getExecutors: Seq[ExecutorCacheTaskLocation] = {
if (ssc.sc.isLocal) {
- Seq(ssc.sparkContext.env.blockManager.blockManagerId.hostPort)
+ val blockManagerId = ssc.sparkContext.env.blockManager.blockManagerId
+ Seq(ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId))
} else {
ssc.sparkContext.env.blockManager.master.getMemoryStatus.filter { case (blockManagerId, _) =>
blockManagerId.executorId != SparkContext.DRIVER_IDENTIFIER // Ignore the driver location
- }.map { case (blockManagerId, _) => blockManagerId.hostPort }.toSeq
+ }.map { case (blockManagerId, _) =>
+ ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId)
+ }.toSeq
}
}
@@ -431,9 +442,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
override def receive: PartialFunction[Any, Unit] = {
// Local messages
case StartAllReceivers(receivers) =>
- val scheduledExecutors = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
+ val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
for (receiver <- receivers) {
- val executors = scheduledExecutors(receiver.streamId)
+ val executors = scheduledLocations(receiver.streamId)
updateReceiverScheduledExecutors(receiver.streamId, executors)
receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
startReceiver(receiver, executors)
@@ -441,14 +452,14 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
case RestartReceiver(receiver) =>
// Old scheduled executors minus the ones that are not active any more
val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
- val scheduledExecutors = if (oldScheduledExecutors.nonEmpty) {
+ val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
// Try global scheduling again
oldScheduledExecutors
} else {
val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
- // Clear "scheduledExecutors" to indicate we are going to do local scheduling
+ // Clear "scheduledLocations" to indicate we are going to do local scheduling
val newReceiverInfo = oldReceiverInfo.copy(
- state = ReceiverState.INACTIVE, scheduledExecutors = None)
+ state = ReceiverState.INACTIVE, scheduledLocations = None)
receiverTrackingInfos(receiver.streamId) = newReceiverInfo
schedulingPolicy.rescheduleReceiver(
receiver.streamId,
@@ -458,7 +469,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
// Assume there is one receiver restarting at one time, so we don't need to update
// receiverTrackingInfos
- startReceiver(receiver, scheduledExecutors)
+ startReceiver(receiver, scheduledLocations)
case c: CleanupOldBlocks =>
receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
case UpdateReceiverRateLimit(streamUID, newRate) =>
@@ -472,9 +483,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
// Remote messages
- case RegisterReceiver(streamId, typ, hostPort, receiverEndpoint) =>
+ case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
val successful =
- registerReceiver(streamId, typ, hostPort, receiverEndpoint, context.senderAddress)
+ registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
context.reply(successful)
case AddBlock(receivedBlockInfo) =>
context.reply(addBlock(receivedBlockInfo))
@@ -493,13 +504,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
/**
* Return the stored scheduled executors that are still alive.
*/
- private def getStoredScheduledExecutors(receiverId: Int): Seq[String] = {
+ private def getStoredScheduledExecutors(receiverId: Int): Seq[TaskLocation] = {
if (receiverTrackingInfos.contains(receiverId)) {
- val scheduledExecutors = receiverTrackingInfos(receiverId).scheduledExecutors
- if (scheduledExecutors.nonEmpty) {
+ val scheduledLocations = receiverTrackingInfos(receiverId).scheduledLocations
+ if (scheduledLocations.nonEmpty) {
val executors = getExecutors.toSet
// Only return the alive executors
- scheduledExecutors.get.filter(executors)
+ scheduledLocations.get.filter {
+ case loc: ExecutorCacheTaskLocation => executors(loc)
+ case loc: TaskLocation => true
+ }
} else {
Nil
}
@@ -511,7 +525,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
/**
* Start a receiver along with its scheduled executors
*/
- private def startReceiver(receiver: Receiver[_], scheduledExecutors: Seq[String]): Unit = {
+ private def startReceiver(
+ receiver: Receiver[_],
+ scheduledLocations: Seq[TaskLocation]): Unit = {
def shouldStartReceiver: Boolean = {
// It's okay to start when trackerState is Initialized or Started
!(isTrackerStopping || isTrackerStopped)
@@ -546,13 +562,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
}
- // Create the RDD using the scheduledExecutors to run the receiver in a Spark job
+ // Create the RDD using the scheduledLocations to run the receiver in a Spark job
val receiverRDD: RDD[Receiver[_]] =
- if (scheduledExecutors.isEmpty) {
+ if (scheduledLocations.isEmpty) {
ssc.sc.makeRDD(Seq(receiver), 1)
} else {
- val preferredLocations =
- scheduledExecutors.map(hostPort => Utils.parseHostPort(hostPort)._1).distinct
+ val preferredLocations = scheduledLocations.map(_.toString).distinct
ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
}
receiverRDD.setName(s"Receiver $receiverId")
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
index 043ff4d0ff..ab0a84f052 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming.scheduler
import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, TaskLocation}
import org.apache.spark.streaming.scheduler.ReceiverState._
private[streaming] case class ReceiverErrorInfo(
@@ -28,7 +29,7 @@ private[streaming] case class ReceiverErrorInfo(
*
* @param receiverId the unique receiver id
* @param state the current Receiver state
- * @param scheduledExecutors the scheduled executors provided by ReceiverSchedulingPolicy
+ * @param scheduledLocations the scheduled locations provided by ReceiverSchedulingPolicy
* @param runningExecutor the running executor if the receiver is active
* @param name the receiver name
* @param endpoint the receiver endpoint. It can be used to send messages to the receiver
@@ -37,8 +38,8 @@ private[streaming] case class ReceiverErrorInfo(
private[streaming] case class ReceiverTrackingInfo(
receiverId: Int,
state: ReceiverState,
- scheduledExecutors: Option[Seq[String]],
- runningExecutor: Option[String],
+ scheduledLocations: Option[Seq[TaskLocation]],
+ runningExecutor: Option[ExecutorCacheTaskLocation],
name: Option[String] = None,
endpoint: Option[RpcEndpointRef] = None,
errorInfo: Option[ReceiverErrorInfo] = None) {
@@ -47,7 +48,7 @@ private[streaming] case class ReceiverTrackingInfo(
receiverId,
name.getOrElse(""),
state == ReceiverState.ACTIVE,
- location = runningExecutor.getOrElse(""),
+ location = runningExecutor.map(_.host).getOrElse(""),
lastErrorMessage = errorInfo.map(_.lastErrorMessage).getOrElse(""),
lastError = errorInfo.map(_.lastError).getOrElse(""),
lastErrorTime = errorInfo.map(_.lastErrorTime).getOrElse(-1L)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
index b2a51d72ba..05b4e66c63 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
@@ -20,73 +20,96 @@ package org.apache.spark.streaming.scheduler
import scala.collection.mutable
import org.apache.spark.SparkFunSuite
+import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, HostTaskLocation, TaskLocation}
class ReceiverSchedulingPolicySuite extends SparkFunSuite {
val receiverSchedulingPolicy = new ReceiverSchedulingPolicy
test("rescheduleReceiver: empty executors") {
- val scheduledExecutors =
+ val scheduledLocations =
receiverSchedulingPolicy.rescheduleReceiver(0, None, Map.empty, executors = Seq.empty)
- assert(scheduledExecutors === Seq.empty)
+ assert(scheduledLocations === Seq.empty)
}
test("rescheduleReceiver: receiver preferredLocation") {
+ val executors = Seq(ExecutorCacheTaskLocation("host2", "2"))
val receiverTrackingInfoMap = Map(
0 -> ReceiverTrackingInfo(0, ReceiverState.INACTIVE, None, None))
- val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
- 0, Some("host1"), receiverTrackingInfoMap, executors = Seq("host2"))
- assert(scheduledExecutors.toSet === Set("host1", "host2"))
+ val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver(
+ 0, Some("host1"), receiverTrackingInfoMap, executors)
+ assert(scheduledLocations.toSet === Set(HostTaskLocation("host1"), executors(0)))
}
test("rescheduleReceiver: return all idle executors if there are any idle executors") {
- val executors = Seq("host1", "host2", "host3", "host4", "host5")
- // host3 is idle
+ val executors = (1 to 5).map(i => ExecutorCacheTaskLocation(s"host$i", s"$i"))
+ // executor 1 is busy, others are idle.
val receiverTrackingInfoMap = Map(
- 0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")))
- val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
+ 0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some(executors(0))))
+ val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver(
1, None, receiverTrackingInfoMap, executors)
- assert(scheduledExecutors.toSet === Set("host2", "host3", "host4", "host5"))
+ assert(scheduledLocations.toSet === executors.tail.toSet)
}
test("rescheduleReceiver: return all executors that have minimum weight if no idle executors") {
- val executors = Seq("host1", "host2", "host3", "host4", "host5")
+ val executors = Seq(
+ ExecutorCacheTaskLocation("host1", "1"),
+ ExecutorCacheTaskLocation("host2", "2"),
+ ExecutorCacheTaskLocation("host3", "3"),
+ ExecutorCacheTaskLocation("host4", "4"),
+ ExecutorCacheTaskLocation("host5", "5")
+ )
// Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0, host4 = 0.5, host5 = 0.5
val receiverTrackingInfoMap = Map(
- 0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")),
- 1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host2", "host3")), None),
- 2 -> ReceiverTrackingInfo(2, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None),
- 3 -> ReceiverTrackingInfo(4, ReceiverState.SCHEDULED, Some(Seq("host4", "host5")), None))
- val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
+ 0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None,
+ Some(ExecutorCacheTaskLocation("host1", "1"))),
+ 1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED,
+ Some(Seq(ExecutorCacheTaskLocation("host2", "2"), ExecutorCacheTaskLocation("host3", "3"))),
+ None),
+ 2 -> ReceiverTrackingInfo(2, ReceiverState.SCHEDULED,
+ Some(Seq(ExecutorCacheTaskLocation("host1", "1"), ExecutorCacheTaskLocation("host3", "3"))),
+ None),
+ 3 -> ReceiverTrackingInfo(4, ReceiverState.SCHEDULED,
+ Some(Seq(ExecutorCacheTaskLocation("host4", "4"),
+ ExecutorCacheTaskLocation("host5", "5"))), None))
+ val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver(
4, None, receiverTrackingInfoMap, executors)
- assert(scheduledExecutors.toSet === Set("host2", "host4", "host5"))
+ val expectedScheduledLocations = Set(
+ ExecutorCacheTaskLocation("host2", "2"),
+ ExecutorCacheTaskLocation("host4", "4"),
+ ExecutorCacheTaskLocation("host5", "5")
+ )
+ assert(scheduledLocations.toSet === expectedScheduledLocations)
}
test("scheduleReceivers: " +
"schedule receivers evenly when there are more receivers than executors") {
val receivers = (0 until 6).map(new RateTestReceiver(_))
- val executors = (10000 until 10003).map(port => s"localhost:${port}")
- val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
- val numReceiversOnExecutor = mutable.HashMap[String, Int]()
+ val executors = (0 until 3).map(executorId =>
+ ExecutorCacheTaskLocation("localhost", executorId.toString))
+ val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
+ val numReceiversOnExecutor = mutable.HashMap[TaskLocation, Int]()
// There should be 2 receivers running on each executor and each receiver has one executor
- scheduledExecutors.foreach { case (receiverId, executors) =>
- assert(executors.size == 1)
- numReceiversOnExecutor(executors(0)) = numReceiversOnExecutor.getOrElse(executors(0), 0) + 1
+ scheduledLocations.foreach { case (receiverId, locations) =>
+ assert(locations.size == 1)
+ assert(locations(0).isInstanceOf[ExecutorCacheTaskLocation])
+ numReceiversOnExecutor(locations(0)) = numReceiversOnExecutor.getOrElse(locations(0), 0) + 1
}
assert(numReceiversOnExecutor === executors.map(_ -> 2).toMap)
}
-
test("scheduleReceivers: " +
"schedule receivers evenly when there are more executors than receivers") {
val receivers = (0 until 3).map(new RateTestReceiver(_))
- val executors = (10000 until 10006).map(port => s"localhost:${port}")
- val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
- val numReceiversOnExecutor = mutable.HashMap[String, Int]()
+ val executors = (0 until 6).map(executorId =>
+ ExecutorCacheTaskLocation("localhost", executorId.toString))
+ val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
+ val numReceiversOnExecutor = mutable.HashMap[TaskLocation, Int]()
// There should be 1 receiver running on each executor and each receiver has two executors
- scheduledExecutors.foreach { case (receiverId, executors) =>
- assert(executors.size == 2)
- executors.foreach { l =>
+ scheduledLocations.foreach { case (receiverId, locations) =>
+ assert(locations.size == 2)
+ locations.foreach { l =>
+ assert(l.isInstanceOf[ExecutorCacheTaskLocation])
numReceiversOnExecutor(l) = numReceiversOnExecutor.getOrElse(l, 0) + 1
}
}
@@ -96,34 +119,41 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
test("scheduleReceivers: schedule receivers evenly when the preferredLocations are even") {
val receivers = (0 until 3).map(new RateTestReceiver(_)) ++
(3 until 6).map(new RateTestReceiver(_, Some("localhost")))
- val executors = (10000 until 10003).map(port => s"localhost:${port}") ++
- (10003 until 10006).map(port => s"localhost2:${port}")
- val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
- val numReceiversOnExecutor = mutable.HashMap[String, Int]()
+ val executors = (0 until 3).map(executorId =>
+ ExecutorCacheTaskLocation("localhost", executorId.toString)) ++
+ (3 until 6).map(executorId =>
+ ExecutorCacheTaskLocation("localhost2", executorId.toString))
+ val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
+ val numReceiversOnExecutor = mutable.HashMap[TaskLocation, Int]()
// There should be 1 receiver running on each executor and each receiver has 1 executor
- scheduledExecutors.foreach { case (receiverId, executors) =>
+ scheduledLocations.foreach { case (receiverId, executors) =>
assert(executors.size == 1)
executors.foreach { l =>
+ assert(l.isInstanceOf[ExecutorCacheTaskLocation])
numReceiversOnExecutor(l) = numReceiversOnExecutor.getOrElse(l, 0) + 1
}
}
assert(numReceiversOnExecutor === executors.map(_ -> 1).toMap)
// Make sure we schedule the receivers to their preferredLocations
val executorsForReceiversWithPreferredLocation =
- scheduledExecutors.filter { case (receiverId, executors) => receiverId >= 3 }.flatMap(_._2)
+ scheduledLocations.filter { case (receiverId, executors) => receiverId >= 3 }.flatMap(_._2)
// We can simply check the executor set because we only know each receiver only has 1 executor
assert(executorsForReceiversWithPreferredLocation.toSet ===
- (10000 until 10003).map(port => s"localhost:${port}").toSet)
+ (0 until 3).map(executorId =>
+ ExecutorCacheTaskLocation("localhost", executorId.toString)
+ ).toSet)
}
test("scheduleReceivers: return empty if no receiver") {
- assert(receiverSchedulingPolicy.scheduleReceivers(Seq.empty, Seq("localhost:10000")).isEmpty)
+ val scheduledLocations = receiverSchedulingPolicy.
+ scheduleReceivers(Seq.empty, Seq(ExecutorCacheTaskLocation("localhost", "1")))
+ assert(scheduledLocations.isEmpty)
}
test("scheduleReceivers: return empty scheduled executors if no executors") {
val receivers = (0 until 3).map(new RateTestReceiver(_))
- val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, Seq.empty)
- scheduledExecutors.foreach { case (receiverId, executors) =>
+ val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, Seq.empty)
+ scheduledLocations.foreach { case (receiverId, executors) =>
assert(executors.isEmpty)
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index fda86aef45..3bd8d086ab 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -99,8 +99,8 @@ class ReceiverTrackerSuite extends TestSuiteBase {
output.register()
ssc.start()
eventually(timeout(10 seconds), interval(10 millis)) {
- // If preferredLocations is set correctly, receiverTaskLocality should be NODE_LOCAL
- assert(receiverTaskLocality === TaskLocality.NODE_LOCAL)
+ // If preferredLocations is set correctly, receiverTaskLocality should be PROCESS_LOCAL
+ assert(receiverTaskLocality === TaskLocality.PROCESS_LOCAL)
}
}
}