 core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala                                 |  17
 core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala                          |   1
 streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala         |   5
 streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala      | 110
 streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala               |  93
 streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala          |   9
 streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala | 110
 streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala          |   4
 8 files changed, 217 insertions(+), 132 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
index 1b65926f5c..1eb6c1614f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala
@@ -31,7 +31,9 @@ private[spark] sealed trait TaskLocation {
*/
private [spark]
case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
- extends TaskLocation
+ extends TaskLocation {
+ override def toString: String = s"${TaskLocation.executorLocationTag}${host}_$executorId"
+}
/**
* A location on a host.
@@ -53,6 +55,9 @@ private[spark] object TaskLocation {
// confusion. See RFC 952 and RFC 1123 for information about the format of hostnames.
val inMemoryLocationTag = "hdfs_cache_"
+ // Identify locations of executors with this prefix.
+ val executorLocationTag = "executor_"
+
def apply(host: String, executorId: String): TaskLocation = {
new ExecutorCacheTaskLocation(host, executorId)
}
@@ -65,7 +70,15 @@ private[spark] object TaskLocation {
def apply(str: String): TaskLocation = {
val hstr = str.stripPrefix(inMemoryLocationTag)
if (hstr.equals(str)) {
- new HostTaskLocation(str)
+ if (str.startsWith(executorLocationTag)) {
+ val splits = str.split("_")
+ if (splits.length != 3) {
+ throw new IllegalArgumentException("Illegal executor location format: " + str)
+ }
+ new ExecutorCacheTaskLocation(splits(1), splits(2))
+ } else {
+ new HostTaskLocation(str)
+ }
} else {
new HDFSCacheTaskLocation(hstr)
}
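
The net effect of this hunk is that an executor-pinned location now round-trips through a plain string of the form executor_<host>_<executorId>, which the extended TaskLocation.apply parses back into an ExecutorCacheTaskLocation. A minimal standalone sketch of that round-trip, using simplified stand-in types since the real classes are private[spark]:

```scala
object TaskLocationRoundTripSketch {
  val executorLocationTag = "executor_"
  val inMemoryLocationTag = "hdfs_cache_"

  sealed trait Loc
  case class ExecutorLoc(host: String, executorId: String) extends Loc {
    // Mirrors the new ExecutorCacheTaskLocation.toString format
    override def toString: String = s"$executorLocationTag${host}_$executorId"
  }
  case class HostLoc(host: String) extends Loc
  case class HdfsCacheLoc(host: String) extends Loc

  // Mirrors TaskLocation.apply(str) after this patch
  def parse(str: String): Loc = {
    val hstr = str.stripPrefix(inMemoryLocationTag)
    if (hstr == str) {
      if (str.startsWith(executorLocationTag)) {
        val splits = str.split("_")
        require(splits.length == 3, s"Illegal executor location format: $str")
        ExecutorLoc(splits(1), splits(2))
      } else {
        HostLoc(str)
      }
    } else {
      HdfsCacheLoc(hstr)
    }
  }

  def main(args: Array[String]): Unit = {
    val loc = ExecutorLoc("host1", "3")
    assert(loc.toString == "executor_host1_3")   // format produced by toString
    assert(parse(loc.toString) == loc)           // and parsed back by apply(str)
    assert(parse("hdfs_cache_host1") == HdfsCacheLoc("host1"))
    assert(parse("host1") == HostLoc("host1"))
  }
}
```

As in the real parser, this format assumes neither the host name nor the executor id contains an underscore.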
diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
index 695523cc8a..cd6bf723e7 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala
@@ -779,6 +779,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
test("Test TaskLocation for different host type.") {
assert(TaskLocation("host1") === HostTaskLocation("host1"))
assert(TaskLocation("hdfs_cache_host1") === HDFSCacheTaskLocation("host1"))
+ assert(TaskLocation("executor_host1_3") === ExecutorCacheTaskLocation("host1", "3"))
}
def createTaskResult(id: Int): DirectTaskResult[Int] = {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index 59ef58d232..167f56aa42 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -47,7 +47,8 @@ private[streaming] class ReceiverSupervisorImpl(
checkpointDirOption: Option[String]
) extends ReceiverSupervisor(receiver, env.conf) with Logging {
- private val hostPort = SparkEnv.get.blockManager.blockManagerId.hostPort
+ private val host = SparkEnv.get.blockManager.blockManagerId.host
+ private val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
private val receivedBlockHandler: ReceivedBlockHandler = {
if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
@@ -179,7 +180,7 @@ private[streaming] class ReceiverSupervisorImpl(
override protected def onReceiverStart(): Boolean = {
val msg = RegisterReceiver(
- streamId, receiver.getClass.getSimpleName, hostPort, endpoint)
+ streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
trackerEndpoint.askWithRetry[Boolean](msg)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
index d2b0be7f4a..234bc8660d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
@@ -20,8 +20,8 @@ package org.apache.spark.streaming.scheduler
import scala.collection.Map
import scala.collection.mutable
+import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, TaskLocation}
import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.util.Utils
/**
* A class that tries to schedule receivers with evenly distributed. There are two phases for
@@ -29,23 +29,23 @@ import org.apache.spark.util.Utils
*
* - The first phase is global scheduling when ReceiverTracker is starting and we need to schedule
* all receivers at the same time. ReceiverTracker will call `scheduleReceivers` at this phase.
- * It will try to schedule receivers with evenly distributed. ReceiverTracker should update its
- * receiverTrackingInfoMap according to the results of `scheduleReceivers`.
- * `ReceiverTrackingInfo.scheduledExecutors` for each receiver will set to an executor list that
- * contains the scheduled locations. Then when a receiver is starting, it will send a register
- * request and `ReceiverTracker.registerReceiver` will be called. In
- * `ReceiverTracker.registerReceiver`, if a receiver's scheduled executors is set, it should check
- * if the location of this receiver is one of the scheduled executors, if not, the register will
+ * It will try to schedule receivers such that they are evenly distributed. ReceiverTracker should
+ * update its `receiverTrackingInfoMap` according to the results of `scheduleReceivers`.
+ * `ReceiverTrackingInfo.scheduledLocations` for each receiver should be set to a location list
+ * that contains the scheduled locations. Then when a receiver is starting, it will send a
+ * register request and `ReceiverTracker.registerReceiver` will be called. In
+ * `ReceiverTracker.registerReceiver`, if a receiver's scheduled locations are set, it should check
+ * if the location of this receiver is one of the scheduled locations; if not, the registration will
* be rejected.
* - The second phase is local scheduling when a receiver is restarting. There are two cases of
* receiver restarting:
* - If a receiver is restarting because it's rejected due to the real location and the scheduled
- * executors mismatching, in other words, it fails to start in one of the locations that
+ * locations mismatching, in other words, it fails to start in one of the locations that
* `scheduleReceivers` suggested, `ReceiverTracker` should firstly choose the executors that are
- * still alive in the list of scheduled executors, then use them to launch the receiver job.
- * - If a receiver is restarting without a scheduled executors list, or the executors in the list
+ * still alive in the list of scheduled locations, then use them to launch the receiver job.
+ * - If a receiver is restarting without a scheduled locations list, or the executors in the list
* are dead, `ReceiverTracker` should call `rescheduleReceiver`. If so, `ReceiverTracker` should
- * not set `ReceiverTrackingInfo.scheduledExecutors` for this executor, instead, it should clear
+ * not set `ReceiverTrackingInfo.scheduledLocations` for this receiver, instead, it should clear
* it. Then when this receiver is registering, we can know this is a local scheduling, and
* `ReceiverTrackingInfo` should call `rescheduleReceiver` again to check if the launching
* location is matching.
@@ -69,9 +69,12 @@ private[streaming] class ReceiverSchedulingPolicy {
* </ol>
*
* This method is called when we start to launch receivers at the first time.
+ *
+ * @return a map for receivers and their scheduled locations
*/
def scheduleReceivers(
- receivers: Seq[Receiver[_]], executors: Seq[String]): Map[Int, Seq[String]] = {
+ receivers: Seq[Receiver[_]],
+ executors: Seq[ExecutorCacheTaskLocation]): Map[Int, Seq[TaskLocation]] = {
if (receivers.isEmpty) {
return Map.empty
}
@@ -80,16 +83,16 @@ private[streaming] class ReceiverSchedulingPolicy {
return receivers.map(_.streamId -> Seq.empty).toMap
}
- val hostToExecutors = executors.groupBy(executor => Utils.parseHostPort(executor)._1)
- val scheduledExecutors = Array.fill(receivers.length)(new mutable.ArrayBuffer[String])
- val numReceiversOnExecutor = mutable.HashMap[String, Int]()
+ val hostToExecutors = executors.groupBy(_.host)
+ val scheduledLocations = Array.fill(receivers.length)(new mutable.ArrayBuffer[TaskLocation])
+ val numReceiversOnExecutor = mutable.HashMap[ExecutorCacheTaskLocation, Int]()
// Set the initial value to 0
executors.foreach(e => numReceiversOnExecutor(e) = 0)
// Firstly, we need to respect "preferredLocation". So if a receiver has "preferredLocation",
// we need to make sure the "preferredLocation" is in the candidate scheduled executor list.
for (i <- 0 until receivers.length) {
- // Note: preferredLocation is host but executors are host:port
+ // Note: preferredLocation is host but executors are host_executorId
receivers(i).preferredLocation.foreach { host =>
hostToExecutors.get(host) match {
case Some(executorsOnHost) =>
@@ -97,7 +100,7 @@ private[streaming] class ReceiverSchedulingPolicy {
// this host
val leastScheduledExecutor =
executorsOnHost.minBy(executor => numReceiversOnExecutor(executor))
- scheduledExecutors(i) += leastScheduledExecutor
+ scheduledLocations(i) += leastScheduledExecutor
numReceiversOnExecutor(leastScheduledExecutor) =
numReceiversOnExecutor(leastScheduledExecutor) + 1
case None =>
@@ -106,17 +109,20 @@ private[streaming] class ReceiverSchedulingPolicy {
// 1. This executor is not up. But it may be up later.
// 2. This executor is dead, or it's not a host in the cluster.
// Currently, simply add host to the scheduled executors.
- scheduledExecutors(i) += host
+
+ // Note: host could be `HDFSCacheTaskLocation`, so use `TaskLocation.apply` to handle
+ // this case
+ scheduledLocations(i) += TaskLocation(host)
}
}
}
// For those receivers that don't have preferredLocation, make sure we assign at least one
// executor to them.
- for (scheduledExecutorsForOneReceiver <- scheduledExecutors.filter(_.isEmpty)) {
+ for (scheduledLocationsForOneReceiver <- scheduledLocations.filter(_.isEmpty)) {
// Select the executor that has the least receivers
val (leastScheduledExecutor, numReceivers) = numReceiversOnExecutor.minBy(_._2)
- scheduledExecutorsForOneReceiver += leastScheduledExecutor
+ scheduledLocationsForOneReceiver += leastScheduledExecutor
numReceiversOnExecutor(leastScheduledExecutor) = numReceivers + 1
}
@@ -124,22 +130,22 @@ private[streaming] class ReceiverSchedulingPolicy {
val idleExecutors = numReceiversOnExecutor.filter(_._2 == 0).map(_._1)
for (executor <- idleExecutors) {
// Assign an idle executor to the receiver that has least candidate executors.
- val leastScheduledExecutors = scheduledExecutors.minBy(_.size)
+ val leastScheduledExecutors = scheduledLocations.minBy(_.size)
leastScheduledExecutors += executor
}
- receivers.map(_.streamId).zip(scheduledExecutors).toMap
+ receivers.map(_.streamId).zip(scheduledLocations).toMap
}
/**
- * Return a list of candidate executors to run the receiver. If the list is empty, the caller can
+ * Return a list of candidate locations to run the receiver. If the list is empty, the caller can
* run this receiver in arbitrary executor.
*
* This method tries to balance executors' load. Here is the approach to schedule executors
* for a receiver.
* <ol>
* <li>
- * If preferredLocation is set, preferredLocation should be one of the candidate executors.
+ * If preferredLocation is set, preferredLocation should be one of the candidate locations.
* </li>
* <li>
* Every executor will be assigned to a weight according to the receivers running or
@@ -163,40 +169,58 @@ private[streaming] class ReceiverSchedulingPolicy {
receiverId: Int,
preferredLocation: Option[String],
receiverTrackingInfoMap: Map[Int, ReceiverTrackingInfo],
- executors: Seq[String]): Seq[String] = {
+ executors: Seq[ExecutorCacheTaskLocation]): Seq[TaskLocation] = {
if (executors.isEmpty) {
return Seq.empty
}
// Always try to schedule to the preferred locations
- val scheduledExecutors = mutable.Set[String]()
- scheduledExecutors ++= preferredLocation
-
- val executorWeights = receiverTrackingInfoMap.values.flatMap { receiverTrackingInfo =>
- receiverTrackingInfo.state match {
- case ReceiverState.INACTIVE => Nil
- case ReceiverState.SCHEDULED =>
- val scheduledExecutors = receiverTrackingInfo.scheduledExecutors.get
- // The probability that a scheduled receiver will run in an executor is
- // 1.0 / scheduledLocations.size
- scheduledExecutors.map(location => location -> (1.0 / scheduledExecutors.size))
- case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningExecutor.get -> 1.0)
- }
- }.groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor
+ val scheduledLocations = mutable.Set[TaskLocation]()
+ // Note: preferredLocation could be `HDFSCacheTaskLocation`, so use `TaskLocation.apply` to
+ // handle this case
+ scheduledLocations ++= preferredLocation.map(TaskLocation(_))
+
+ val executorWeights: Map[ExecutorCacheTaskLocation, Double] = {
+ receiverTrackingInfoMap.values.flatMap(convertReceiverTrackingInfoToExecutorWeights)
+ .groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor
+ }
val idleExecutors = executors.toSet -- executorWeights.keys
if (idleExecutors.nonEmpty) {
- scheduledExecutors ++= idleExecutors
+ scheduledLocations ++= idleExecutors
} else {
// There is no idle executor. So select all executors that have the minimum weight.
val sortedExecutors = executorWeights.toSeq.sortBy(_._2)
if (sortedExecutors.nonEmpty) {
val minWeight = sortedExecutors(0)._2
- scheduledExecutors ++= sortedExecutors.takeWhile(_._2 == minWeight).map(_._1)
+ scheduledLocations ++= sortedExecutors.takeWhile(_._2 == minWeight).map(_._1)
} else {
// This should not happen since "executors" is not empty
}
}
- scheduledExecutors.toSeq
+ scheduledLocations.toSeq
+ }
+
+ /**
+ * This method tries to convert a receiver tracking info to executor weights. Every executor will
+ * be assigned to a weight according to the receivers running or scheduling on it:
+ *
+ * - If a receiver is running on an executor, it contributes 1.0 to the executor's weight.
+ * - If a receiver is scheduled to an executor but has not yet run, it contributes
+ * `1.0 / #candidate_executors_of_this_receiver` to the executor's weight.
+ */
+ private def convertReceiverTrackingInfoToExecutorWeights(
+ receiverTrackingInfo: ReceiverTrackingInfo): Seq[(ExecutorCacheTaskLocation, Double)] = {
+ receiverTrackingInfo.state match {
+ case ReceiverState.INACTIVE => Nil
+ case ReceiverState.SCHEDULED =>
+ val scheduledLocations = receiverTrackingInfo.scheduledLocations.get
+ // The probability that a scheduled receiver will run in an executor is
+ // 1.0 / scheduledLocations.size
+ scheduledLocations.filter(_.isInstanceOf[ExecutorCacheTaskLocation]).map { location =>
+ location.asInstanceOf[ExecutorCacheTaskLocation] -> (1.0 / scheduledLocations.size)
+ }
+ case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningExecutor.get -> 1.0)
+ }
}
}
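
The weighting rule documented in rescheduleReceiver and convertReceiverTrackingInfoToExecutorWeights (a running receiver adds 1.0 to its executor; a scheduled but not yet running receiver adds 1.0 / #candidates to each candidate) can be worked through with a small self-contained sketch. Exec, Running and Scheduled below are hypothetical simplifications of ExecutorCacheTaskLocation and ReceiverTrackingInfo, not the real types:

```scala
object ExecutorWeightSketch {
  case class Exec(host: String, executorId: String)

  sealed trait ReceiverInfo
  case class Running(on: Exec) extends ReceiverInfo
  case class Scheduled(candidates: Seq[Exec]) extends ReceiverInfo

  // Sum, per executor, the contributions of all tracked receivers
  def executorWeights(infos: Seq[ReceiverInfo]): Map[Exec, Double] =
    infos.flatMap {
      case Running(e)       => Seq(e -> 1.0)                    // active receiver: full weight
      case Scheduled(cands) => cands.map(_ -> 1.0 / cands.size) // scheduled: weight split evenly
    }.groupBy(_._1).map { case (e, ws) => e -> ws.map(_._2).sum }

  def main(args: Array[String]): Unit = {
    val e1 = Exec("host1", "1"); val e2 = Exec("host2", "2"); val e3 = Exec("host3", "3")
    val weights = executorWeights(Seq(
      Running(e1),             // +1.0 to e1
      Scheduled(Seq(e1, e2)),  // +0.5 to e1 and e2
      Scheduled(Seq(e2, e3))   // +0.5 to e2 and e3
    ))
    assert(weights == Map(e1 -> 1.5, e2 -> 1.0, e3 -> 0.5))
    // With no idle executor, rescheduleReceiver would return the minimum-weight executor(s),
    // here e3, as the candidate locations for the receiver being rescheduled.
  }
}
```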
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 2ce80d618b..b183d856f5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -17,20 +17,21 @@
package org.apache.spark.streaming.scheduler
-import java.util.concurrent.{TimeUnit, CountDownLatch}
+import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.mutable.HashMap
import scala.concurrent.ExecutionContext
import scala.language.existentials
import scala.util.{Failure, Success}
-import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark._
import org.apache.spark.rdd.RDD
import org.apache.spark.rpc._
+import org.apache.spark.scheduler.{TaskLocation, ExecutorCacheTaskLocation}
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.receiver._
-import org.apache.spark.util.{Utils, ThreadUtils, SerializableConfiguration}
+import org.apache.spark.streaming.util.WriteAheadLogUtils
+import org.apache.spark.util.{SerializableConfiguration, ThreadUtils, Utils}
/** Enumeration to identify current state of a Receiver */
@@ -47,7 +48,8 @@ private[streaming] sealed trait ReceiverTrackerMessage
private[streaming] case class RegisterReceiver(
streamId: Int,
typ: String,
- hostPort: String,
+ host: String,
+ executorId: String,
receiverEndpoint: RpcEndpointRef
) extends ReceiverTrackerMessage
private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo)
@@ -235,7 +237,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
private def registerReceiver(
streamId: Int,
typ: String,
- hostPort: String,
+ host: String,
+ executorId: String,
receiverEndpoint: RpcEndpointRef,
senderAddress: RpcAddress
): Boolean = {
@@ -247,18 +250,23 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
return false
}
- val scheduledExecutors = receiverTrackingInfos(streamId).scheduledExecutors
- val accetableExecutors = if (scheduledExecutors.nonEmpty) {
+ val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations
+ val acceptableExecutors = if (scheduledLocations.nonEmpty) {
// This receiver is registering and it's scheduled by
- // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledExecutors" to check it.
- scheduledExecutors.get
+ // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.
+ scheduledLocations.get
} else {
// This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
// "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
scheduleReceiver(streamId)
}
- if (!accetableExecutors.contains(hostPort)) {
+ def isAcceptable: Boolean = acceptableExecutors.exists {
+ case loc: ExecutorCacheTaskLocation => loc.executorId == executorId
+ case loc: TaskLocation => loc.host == host
+ }
+
+ if (!isAcceptable) {
// Refuse it since it's scheduled to a wrong executor
false
} else {
@@ -266,8 +274,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
val receiverTrackingInfo = ReceiverTrackingInfo(
streamId,
ReceiverState.ACTIVE,
- scheduledExecutors = None,
- runningExecutor = Some(hostPort),
+ scheduledLocations = None,
+ runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
name = Some(name),
endpoint = Some(receiverEndpoint))
receiverTrackingInfos.put(streamId, receiverTrackingInfo)
@@ -338,25 +346,25 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
logWarning(s"Error reported by receiver for stream $streamId: $messageWithError")
}
- private def scheduleReceiver(receiverId: Int): Seq[String] = {
+ private def scheduleReceiver(receiverId: Int): Seq[TaskLocation] = {
val preferredLocation = receiverPreferredLocations.getOrElse(receiverId, None)
- val scheduledExecutors = schedulingPolicy.rescheduleReceiver(
+ val scheduledLocations = schedulingPolicy.rescheduleReceiver(
receiverId, preferredLocation, receiverTrackingInfos, getExecutors)
- updateReceiverScheduledExecutors(receiverId, scheduledExecutors)
- scheduledExecutors
+ updateReceiverScheduledExecutors(receiverId, scheduledLocations)
+ scheduledLocations
}
private def updateReceiverScheduledExecutors(
- receiverId: Int, scheduledExecutors: Seq[String]): Unit = {
+ receiverId: Int, scheduledLocations: Seq[TaskLocation]): Unit = {
val newReceiverTrackingInfo = receiverTrackingInfos.get(receiverId) match {
case Some(oldInfo) =>
oldInfo.copy(state = ReceiverState.SCHEDULED,
- scheduledExecutors = Some(scheduledExecutors))
+ scheduledLocations = Some(scheduledLocations))
case None =>
ReceiverTrackingInfo(
receiverId,
ReceiverState.SCHEDULED,
- Some(scheduledExecutors),
+ Some(scheduledLocations),
runningExecutor = None)
}
receiverTrackingInfos.put(receiverId, newReceiverTrackingInfo)
@@ -370,13 +378,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
/**
* Get the list of executors excluding driver
*/
- private def getExecutors: Seq[String] = {
+ private def getExecutors: Seq[ExecutorCacheTaskLocation] = {
if (ssc.sc.isLocal) {
- Seq(ssc.sparkContext.env.blockManager.blockManagerId.hostPort)
+ val blockManagerId = ssc.sparkContext.env.blockManager.blockManagerId
+ Seq(ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId))
} else {
ssc.sparkContext.env.blockManager.master.getMemoryStatus.filter { case (blockManagerId, _) =>
blockManagerId.executorId != SparkContext.DRIVER_IDENTIFIER // Ignore the driver location
- }.map { case (blockManagerId, _) => blockManagerId.hostPort }.toSeq
+ }.map { case (blockManagerId, _) =>
+ ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId)
+ }.toSeq
}
}
@@ -431,9 +442,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
override def receive: PartialFunction[Any, Unit] = {
// Local messages
case StartAllReceivers(receivers) =>
- val scheduledExecutors = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
+ val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
for (receiver <- receivers) {
- val executors = scheduledExecutors(receiver.streamId)
+ val executors = scheduledLocations(receiver.streamId)
updateReceiverScheduledExecutors(receiver.streamId, executors)
receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
startReceiver(receiver, executors)
@@ -441,14 +452,14 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
case RestartReceiver(receiver) =>
// Old scheduled executors minus the ones that are not active any more
val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
- val scheduledExecutors = if (oldScheduledExecutors.nonEmpty) {
+ val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
// Try global scheduling again
oldScheduledExecutors
} else {
val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
- // Clear "scheduledExecutors" to indicate we are going to do local scheduling
+ // Clear "scheduledLocations" to indicate we are going to do local scheduling
val newReceiverInfo = oldReceiverInfo.copy(
- state = ReceiverState.INACTIVE, scheduledExecutors = None)
+ state = ReceiverState.INACTIVE, scheduledLocations = None)
receiverTrackingInfos(receiver.streamId) = newReceiverInfo
schedulingPolicy.rescheduleReceiver(
receiver.streamId,
@@ -458,7 +469,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
// Assume there is one receiver restarting at one time, so we don't need to update
// receiverTrackingInfos
- startReceiver(receiver, scheduledExecutors)
+ startReceiver(receiver, scheduledLocations)
case c: CleanupOldBlocks =>
receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
case UpdateReceiverRateLimit(streamUID, newRate) =>
@@ -472,9 +483,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
// Remote messages
- case RegisterReceiver(streamId, typ, hostPort, receiverEndpoint) =>
+ case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
val successful =
- registerReceiver(streamId, typ, hostPort, receiverEndpoint, context.senderAddress)
+ registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
context.reply(successful)
case AddBlock(receivedBlockInfo) =>
context.reply(addBlock(receivedBlockInfo))
@@ -493,13 +504,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
/**
* Return the stored scheduled executors that are still alive.
*/
- private def getStoredScheduledExecutors(receiverId: Int): Seq[String] = {
+ private def getStoredScheduledExecutors(receiverId: Int): Seq[TaskLocation] = {
if (receiverTrackingInfos.contains(receiverId)) {
- val scheduledExecutors = receiverTrackingInfos(receiverId).scheduledExecutors
- if (scheduledExecutors.nonEmpty) {
+ val scheduledLocations = receiverTrackingInfos(receiverId).scheduledLocations
+ if (scheduledLocations.nonEmpty) {
val executors = getExecutors.toSet
// Only return the alive executors
- scheduledExecutors.get.filter(executors)
+ scheduledLocations.get.filter {
+ case loc: ExecutorCacheTaskLocation => executors(loc)
+ case loc: TaskLocation => true
+ }
} else {
Nil
}
@@ -511,7 +525,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
/**
* Start a receiver along with its scheduled executors
*/
- private def startReceiver(receiver: Receiver[_], scheduledExecutors: Seq[String]): Unit = {
+ private def startReceiver(
+ receiver: Receiver[_],
+ scheduledLocations: Seq[TaskLocation]): Unit = {
def shouldStartReceiver: Boolean = {
// It's okay to start when trackerState is Initialized or Started
!(isTrackerStopping || isTrackerStopped)
@@ -546,13 +562,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
}
- // Create the RDD using the scheduledExecutors to run the receiver in a Spark job
+ // Create the RDD using the scheduledLocations to run the receiver in a Spark job
val receiverRDD: RDD[Receiver[_]] =
- if (scheduledExecutors.isEmpty) {
+ if (scheduledLocations.isEmpty) {
ssc.sc.makeRDD(Seq(receiver), 1)
} else {
- val preferredLocations =
- scheduledExecutors.map(hostPort => Utils.parseHostPort(hostPort)._1).distinct
+ val preferredLocations = scheduledLocations.map(_.toString).distinct
ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
}
receiverRDD.setName(s"Receiver $receiverId")
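
Two behaviors in this file are worth tracing. First, registerReceiver now accepts a registration if some scheduled location pins the same executor id, or is a host-level location naming the same host. Second, startReceiver passes scheduledLocations.map(_.toString) as the receiver RDD's preferred locations, so an executor_<host>_<executorId> string survives into the task scheduler, is parsed by TaskLocation.apply, and yields process-local placement (which is what the ReceiverTrackerSuite change at the end of this patch asserts). A rough sketch of the acceptance check, with stand-in types for the private[spark] ones:

```scala
object ReceiverAcceptanceSketch {
  sealed trait Loc { def host: String }
  case class ExecutorLoc(host: String, executorId: String) extends Loc
  case class HostLoc(host: String) extends Loc

  // A registering receiver reports (host, executorId); it is acceptable if some scheduled
  // location pins that exact executor, or is a plain host location on the same host.
  def isAcceptable(scheduled: Seq[Loc], host: String, executorId: String): Boolean =
    scheduled.exists {
      case ExecutorLoc(_, id) => id == executorId
      case loc                => loc.host == host
    }

  def main(args: Array[String]): Unit = {
    val scheduled = Seq(ExecutorLoc("host1", "1"), HostLoc("host2"))
    assert(isAcceptable(scheduled, "host1", "1"))    // exact executor match
    assert(isAcceptable(scheduled, "host2", "7"))    // host-level match
    assert(!isAcceptable(scheduled, "host3", "3"))   // neither: registration is refused
  }
}
```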
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
index 043ff4d0ff..ab0a84f052 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming.scheduler
import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, TaskLocation}
import org.apache.spark.streaming.scheduler.ReceiverState._
private[streaming] case class ReceiverErrorInfo(
@@ -28,7 +29,7 @@ private[streaming] case class ReceiverErrorInfo(
*
* @param receiverId the unique receiver id
* @param state the current Receiver state
- * @param scheduledExecutors the scheduled executors provided by ReceiverSchedulingPolicy
+ * @param scheduledLocations the scheduled locations provided by ReceiverSchedulingPolicy
* @param runningExecutor the running executor if the receiver is active
* @param name the receiver name
* @param endpoint the receiver endpoint. It can be used to send messages to the receiver
@@ -37,8 +38,8 @@ private[streaming] case class ReceiverErrorInfo(
private[streaming] case class ReceiverTrackingInfo(
receiverId: Int,
state: ReceiverState,
- scheduledExecutors: Option[Seq[String]],
- runningExecutor: Option[String],
+ scheduledLocations: Option[Seq[TaskLocation]],
+ runningExecutor: Option[ExecutorCacheTaskLocation],
name: Option[String] = None,
endpoint: Option[RpcEndpointRef] = None,
errorInfo: Option[ReceiverErrorInfo] = None) {
@@ -47,7 +48,7 @@ private[streaming] case class ReceiverTrackingInfo(
receiverId,
name.getOrElse(""),
state == ReceiverState.ACTIVE,
- location = runningExecutor.getOrElse(""),
+ location = runningExecutor.map(_.host).getOrElse(""),
lastErrorMessage = errorInfo.map(_.lastErrorMessage).getOrElse(""),
lastError = errorInfo.map(_.lastError).getOrElse(""),
lastErrorTime = errorInfo.map(_.lastErrorTime).getOrElse(-1L)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
index b2a51d72ba..05b4e66c63 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala
@@ -20,73 +20,96 @@ package org.apache.spark.streaming.scheduler
import scala.collection.mutable
import org.apache.spark.SparkFunSuite
+import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, HostTaskLocation, TaskLocation}
class ReceiverSchedulingPolicySuite extends SparkFunSuite {
val receiverSchedulingPolicy = new ReceiverSchedulingPolicy
test("rescheduleReceiver: empty executors") {
- val scheduledExecutors =
+ val scheduledLocations =
receiverSchedulingPolicy.rescheduleReceiver(0, None, Map.empty, executors = Seq.empty)
- assert(scheduledExecutors === Seq.empty)
+ assert(scheduledLocations === Seq.empty)
}
test("rescheduleReceiver: receiver preferredLocation") {
+ val executors = Seq(ExecutorCacheTaskLocation("host2", "2"))
val receiverTrackingInfoMap = Map(
0 -> ReceiverTrackingInfo(0, ReceiverState.INACTIVE, None, None))
- val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
- 0, Some("host1"), receiverTrackingInfoMap, executors = Seq("host2"))
- assert(scheduledExecutors.toSet === Set("host1", "host2"))
+ val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver(
+ 0, Some("host1"), receiverTrackingInfoMap, executors)
+ assert(scheduledLocations.toSet === Set(HostTaskLocation("host1"), executors(0)))
}
test("rescheduleReceiver: return all idle executors if there are any idle executors") {
- val executors = Seq("host1", "host2", "host3", "host4", "host5")
- // host3 is idle
+ val executors = (1 to 5).map(i => ExecutorCacheTaskLocation(s"host$i", s"$i"))
+ // executor 1 is busy, others are idle.
val receiverTrackingInfoMap = Map(
- 0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")))
- val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
+ 0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some(executors(0))))
+ val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver(
1, None, receiverTrackingInfoMap, executors)
- assert(scheduledExecutors.toSet === Set("host2", "host3", "host4", "host5"))
+ assert(scheduledLocations.toSet === executors.tail.toSet)
}
test("rescheduleReceiver: return all executors that have minimum weight if no idle executors") {
- val executors = Seq("host1", "host2", "host3", "host4", "host5")
+ val executors = Seq(
+ ExecutorCacheTaskLocation("host1", "1"),
+ ExecutorCacheTaskLocation("host2", "2"),
+ ExecutorCacheTaskLocation("host3", "3"),
+ ExecutorCacheTaskLocation("host4", "4"),
+ ExecutorCacheTaskLocation("host5", "5")
+ )
// Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0, host4 = 0.5, host5 = 0.5
val receiverTrackingInfoMap = Map(
- 0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")),
- 1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host2", "host3")), None),
- 2 -> ReceiverTrackingInfo(2, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None),
- 3 -> ReceiverTrackingInfo(4, ReceiverState.SCHEDULED, Some(Seq("host4", "host5")), None))
- val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver(
+ 0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None,
+ Some(ExecutorCacheTaskLocation("host1", "1"))),
+ 1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED,
+ Some(Seq(ExecutorCacheTaskLocation("host2", "2"), ExecutorCacheTaskLocation("host3", "3"))),
+ None),
+ 2 -> ReceiverTrackingInfo(2, ReceiverState.SCHEDULED,
+ Some(Seq(ExecutorCacheTaskLocation("host1", "1"), ExecutorCacheTaskLocation("host3", "3"))),
+ None),
+ 3 -> ReceiverTrackingInfo(4, ReceiverState.SCHEDULED,
+ Some(Seq(ExecutorCacheTaskLocation("host4", "4"),
+ ExecutorCacheTaskLocation("host5", "5"))), None))
+ val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver(
4, None, receiverTrackingInfoMap, executors)
- assert(scheduledExecutors.toSet === Set("host2", "host4", "host5"))
+ val expectedScheduledLocations = Set(
+ ExecutorCacheTaskLocation("host2", "2"),
+ ExecutorCacheTaskLocation("host4", "4"),
+ ExecutorCacheTaskLocation("host5", "5")
+ )
+ assert(scheduledLocations.toSet === expectedScheduledLocations)
}
test("scheduleReceivers: " +
"schedule receivers evenly when there are more receivers than executors") {
val receivers = (0 until 6).map(new RateTestReceiver(_))
- val executors = (10000 until 10003).map(port => s"localhost:${port}")
- val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
- val numReceiversOnExecutor = mutable.HashMap[String, Int]()
+ val executors = (0 until 3).map(executorId =>
+ ExecutorCacheTaskLocation("localhost", executorId.toString))
+ val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
+ val numReceiversOnExecutor = mutable.HashMap[TaskLocation, Int]()
// There should be 2 receivers running on each executor and each receiver has one executor
- scheduledExecutors.foreach { case (receiverId, executors) =>
- assert(executors.size == 1)
- numReceiversOnExecutor(executors(0)) = numReceiversOnExecutor.getOrElse(executors(0), 0) + 1
+ scheduledLocations.foreach { case (receiverId, locations) =>
+ assert(locations.size == 1)
+ assert(locations(0).isInstanceOf[ExecutorCacheTaskLocation])
+ numReceiversOnExecutor(locations(0)) = numReceiversOnExecutor.getOrElse(locations(0), 0) + 1
}
assert(numReceiversOnExecutor === executors.map(_ -> 2).toMap)
}
-
test("scheduleReceivers: " +
"schedule receivers evenly when there are more executors than receivers") {
val receivers = (0 until 3).map(new RateTestReceiver(_))
- val executors = (10000 until 10006).map(port => s"localhost:${port}")
- val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
- val numReceiversOnExecutor = mutable.HashMap[String, Int]()
+ val executors = (0 until 6).map(executorId =>
+ ExecutorCacheTaskLocation("localhost", executorId.toString))
+ val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
+ val numReceiversOnExecutor = mutable.HashMap[TaskLocation, Int]()
// There should be 1 receiver running on each executor and each receiver has two executors
- scheduledExecutors.foreach { case (receiverId, executors) =>
- assert(executors.size == 2)
- executors.foreach { l =>
+ scheduledLocations.foreach { case (receiverId, locations) =>
+ assert(locations.size == 2)
+ locations.foreach { l =>
+ assert(l.isInstanceOf[ExecutorCacheTaskLocation])
numReceiversOnExecutor(l) = numReceiversOnExecutor.getOrElse(l, 0) + 1
}
}
@@ -96,34 +119,41 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite {
test("scheduleReceivers: schedule receivers evenly when the preferredLocations are even") {
val receivers = (0 until 3).map(new RateTestReceiver(_)) ++
(3 until 6).map(new RateTestReceiver(_, Some("localhost")))
- val executors = (10000 until 10003).map(port => s"localhost:${port}") ++
- (10003 until 10006).map(port => s"localhost2:${port}")
- val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
- val numReceiversOnExecutor = mutable.HashMap[String, Int]()
+ val executors = (0 until 3).map(executorId =>
+ ExecutorCacheTaskLocation("localhost", executorId.toString)) ++
+ (3 until 6).map(executorId =>
+ ExecutorCacheTaskLocation("localhost2", executorId.toString))
+ val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors)
+ val numReceiversOnExecutor = mutable.HashMap[TaskLocation, Int]()
// There should be 1 receiver running on each executor and each receiver has 1 executor
- scheduledExecutors.foreach { case (receiverId, executors) =>
+ scheduledLocations.foreach { case (receiverId, executors) =>
assert(executors.size == 1)
executors.foreach { l =>
+ assert(l.isInstanceOf[ExecutorCacheTaskLocation])
numReceiversOnExecutor(l) = numReceiversOnExecutor.getOrElse(l, 0) + 1
}
}
assert(numReceiversOnExecutor === executors.map(_ -> 1).toMap)
// Make sure we schedule the receivers to their preferredLocations
val executorsForReceiversWithPreferredLocation =
- scheduledExecutors.filter { case (receiverId, executors) => receiverId >= 3 }.flatMap(_._2)
+ scheduledLocations.filter { case (receiverId, executors) => receiverId >= 3 }.flatMap(_._2)
// We can simply check the executor set because we only know each receiver only has 1 executor
assert(executorsForReceiversWithPreferredLocation.toSet ===
- (10000 until 10003).map(port => s"localhost:${port}").toSet)
+ (0 until 3).map(executorId =>
+ ExecutorCacheTaskLocation("localhost", executorId.toString)
+ ).toSet)
}
test("scheduleReceivers: return empty if no receiver") {
- assert(receiverSchedulingPolicy.scheduleReceivers(Seq.empty, Seq("localhost:10000")).isEmpty)
+ val scheduledLocations = receiverSchedulingPolicy.
+ scheduleReceivers(Seq.empty, Seq(ExecutorCacheTaskLocation("localhost", "1")))
+ assert(scheduledLocations.isEmpty)
}
test("scheduleReceivers: return empty scheduled executors if no executors") {
val receivers = (0 until 3).map(new RateTestReceiver(_))
- val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, Seq.empty)
- scheduledExecutors.foreach { case (receiverId, executors) =>
+ val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, Seq.empty)
+ scheduledLocations.foreach { case (receiverId, executors) =>
assert(executors.isEmpty)
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index fda86aef45..3bd8d086ab 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -99,8 +99,8 @@ class ReceiverTrackerSuite extends TestSuiteBase {
output.register()
ssc.start()
eventually(timeout(10 seconds), interval(10 millis)) {
- // If preferredLocations is set correctly, receiverTaskLocality should be NODE_LOCAL
- assert(receiverTaskLocality === TaskLocality.NODE_LOCAL)
+ // If preferredLocations is set correctly, receiverTaskLocality should be PROCESS_LOCAL
+ assert(receiverTaskLocality === TaskLocality.PROCESS_LOCAL)
}
}
}