diff options
8 files changed, 217 insertions, 132 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala index 1b65926f5c..1eb6c1614f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskLocation.scala @@ -31,7 +31,9 @@ private[spark] sealed trait TaskLocation { */ private [spark] case class ExecutorCacheTaskLocation(override val host: String, executorId: String) - extends TaskLocation + extends TaskLocation { + override def toString: String = s"${TaskLocation.executorLocationTag}${host}_$executorId" +} /** * A location on a host. @@ -53,6 +55,9 @@ private[spark] object TaskLocation { // confusion. See RFC 952 and RFC 1123 for information about the format of hostnames. val inMemoryLocationTag = "hdfs_cache_" + // Identify locations of executors with this prefix. + val executorLocationTag = "executor_" + def apply(host: String, executorId: String): TaskLocation = { new ExecutorCacheTaskLocation(host, executorId) } @@ -65,7 +70,15 @@ private[spark] object TaskLocation { def apply(str: String): TaskLocation = { val hstr = str.stripPrefix(inMemoryLocationTag) if (hstr.equals(str)) { - new HostTaskLocation(str) + if (str.startsWith(executorLocationTag)) { + val splits = str.split("_") + if (splits.length != 3) { + throw new IllegalArgumentException("Illegal executor location format: " + str) + } + new ExecutorCacheTaskLocation(splits(1), splits(2)) + } else { + new HostTaskLocation(str) + } } else { new HDFSCacheTaskLocation(hstr) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index 695523cc8a..cd6bf723e7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -779,6 +779,7 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg test("Test TaskLocation for different host type.") { assert(TaskLocation("host1") === HostTaskLocation("host1")) assert(TaskLocation("hdfs_cache_host1") === HDFSCacheTaskLocation("host1")) + assert(TaskLocation("executor_host1_3") === ExecutorCacheTaskLocation("host1", "3")) } def createTaskResult(id: Int): DirectTaskResult[Int] = { diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala index 59ef58d232..167f56aa42 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala @@ -47,7 +47,8 @@ private[streaming] class ReceiverSupervisorImpl( checkpointDirOption: Option[String] ) extends ReceiverSupervisor(receiver, env.conf) with Logging { - private val hostPort = SparkEnv.get.blockManager.blockManagerId.hostPort + private val host = SparkEnv.get.blockManager.blockManagerId.host + private val executorId = SparkEnv.get.blockManager.blockManagerId.executorId private val receivedBlockHandler: ReceivedBlockHandler = { if (WriteAheadLogUtils.enableReceiverLog(env.conf)) { @@ -179,7 +180,7 @@ private[streaming] class ReceiverSupervisorImpl( override protected def onReceiverStart(): Boolean = { val msg = RegisterReceiver( - streamId, receiver.getClass.getSimpleName, hostPort, endpoint) + streamId, receiver.getClass.getSimpleName, host, executorId, endpoint) trackerEndpoint.askWithRetry[Boolean](msg) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala index d2b0be7f4a..234bc8660d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala @@ -20,8 +20,8 @@ package org.apache.spark.streaming.scheduler import scala.collection.Map import scala.collection.mutable +import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, TaskLocation} import org.apache.spark.streaming.receiver.Receiver -import org.apache.spark.util.Utils /** * A class that tries to schedule receivers with evenly distributed. There are two phases for @@ -29,23 +29,23 @@ import org.apache.spark.util.Utils * * - The first phase is global scheduling when ReceiverTracker is starting and we need to schedule * all receivers at the same time. ReceiverTracker will call `scheduleReceivers` at this phase. - * It will try to schedule receivers with evenly distributed. ReceiverTracker should update its - * receiverTrackingInfoMap according to the results of `scheduleReceivers`. - * `ReceiverTrackingInfo.scheduledExecutors` for each receiver will set to an executor list that - * contains the scheduled locations. Then when a receiver is starting, it will send a register - * request and `ReceiverTracker.registerReceiver` will be called. In - * `ReceiverTracker.registerReceiver`, if a receiver's scheduled executors is set, it should check - * if the location of this receiver is one of the scheduled executors, if not, the register will + * It will try to schedule receivers such that they are evenly distributed. ReceiverTracker should + * update its `receiverTrackingInfoMap` according to the results of `scheduleReceivers`. + * `ReceiverTrackingInfo.scheduledLocations` for each receiver should be set to an location list + * that contains the scheduled locations. Then when a receiver is starting, it will send a + * register request and `ReceiverTracker.registerReceiver` will be called. In + * `ReceiverTracker.registerReceiver`, if a receiver's scheduled locations is set, it should check + * if the location of this receiver is one of the scheduled locations, if not, the register will * be rejected. * - The second phase is local scheduling when a receiver is restarting. There are two cases of * receiver restarting: * - If a receiver is restarting because it's rejected due to the real location and the scheduled - * executors mismatching, in other words, it fails to start in one of the locations that + * locations mismatching, in other words, it fails to start in one of the locations that * `scheduleReceivers` suggested, `ReceiverTracker` should firstly choose the executors that are - * still alive in the list of scheduled executors, then use them to launch the receiver job. - * - If a receiver is restarting without a scheduled executors list, or the executors in the list + * still alive in the list of scheduled locations, then use them to launch the receiver job. + * - If a receiver is restarting without a scheduled locations list, or the executors in the list * are dead, `ReceiverTracker` should call `rescheduleReceiver`. If so, `ReceiverTracker` should - * not set `ReceiverTrackingInfo.scheduledExecutors` for this executor, instead, it should clear + * not set `ReceiverTrackingInfo.scheduledLocations` for this receiver, instead, it should clear * it. Then when this receiver is registering, we can know this is a local scheduling, and * `ReceiverTrackingInfo` should call `rescheduleReceiver` again to check if the launching * location is matching. @@ -69,9 +69,12 @@ private[streaming] class ReceiverSchedulingPolicy { * </ol> * * This method is called when we start to launch receivers at the first time. + * + * @return a map for receivers and their scheduled locations */ def scheduleReceivers( - receivers: Seq[Receiver[_]], executors: Seq[String]): Map[Int, Seq[String]] = { + receivers: Seq[Receiver[_]], + executors: Seq[ExecutorCacheTaskLocation]): Map[Int, Seq[TaskLocation]] = { if (receivers.isEmpty) { return Map.empty } @@ -80,16 +83,16 @@ private[streaming] class ReceiverSchedulingPolicy { return receivers.map(_.streamId -> Seq.empty).toMap } - val hostToExecutors = executors.groupBy(executor => Utils.parseHostPort(executor)._1) - val scheduledExecutors = Array.fill(receivers.length)(new mutable.ArrayBuffer[String]) - val numReceiversOnExecutor = mutable.HashMap[String, Int]() + val hostToExecutors = executors.groupBy(_.host) + val scheduledLocations = Array.fill(receivers.length)(new mutable.ArrayBuffer[TaskLocation]) + val numReceiversOnExecutor = mutable.HashMap[ExecutorCacheTaskLocation, Int]() // Set the initial value to 0 executors.foreach(e => numReceiversOnExecutor(e) = 0) // Firstly, we need to respect "preferredLocation". So if a receiver has "preferredLocation", // we need to make sure the "preferredLocation" is in the candidate scheduled executor list. for (i <- 0 until receivers.length) { - // Note: preferredLocation is host but executors are host:port + // Note: preferredLocation is host but executors are host_executorId receivers(i).preferredLocation.foreach { host => hostToExecutors.get(host) match { case Some(executorsOnHost) => @@ -97,7 +100,7 @@ private[streaming] class ReceiverSchedulingPolicy { // this host val leastScheduledExecutor = executorsOnHost.minBy(executor => numReceiversOnExecutor(executor)) - scheduledExecutors(i) += leastScheduledExecutor + scheduledLocations(i) += leastScheduledExecutor numReceiversOnExecutor(leastScheduledExecutor) = numReceiversOnExecutor(leastScheduledExecutor) + 1 case None => @@ -106,17 +109,20 @@ private[streaming] class ReceiverSchedulingPolicy { // 1. This executor is not up. But it may be up later. // 2. This executor is dead, or it's not a host in the cluster. // Currently, simply add host to the scheduled executors. - scheduledExecutors(i) += host + + // Note: host could be `HDFSCacheTaskLocation`, so use `TaskLocation.apply` to handle + // this case + scheduledLocations(i) += TaskLocation(host) } } } // For those receivers that don't have preferredLocation, make sure we assign at least one // executor to them. - for (scheduledExecutorsForOneReceiver <- scheduledExecutors.filter(_.isEmpty)) { + for (scheduledLocationsForOneReceiver <- scheduledLocations.filter(_.isEmpty)) { // Select the executor that has the least receivers val (leastScheduledExecutor, numReceivers) = numReceiversOnExecutor.minBy(_._2) - scheduledExecutorsForOneReceiver += leastScheduledExecutor + scheduledLocationsForOneReceiver += leastScheduledExecutor numReceiversOnExecutor(leastScheduledExecutor) = numReceivers + 1 } @@ -124,22 +130,22 @@ private[streaming] class ReceiverSchedulingPolicy { val idleExecutors = numReceiversOnExecutor.filter(_._2 == 0).map(_._1) for (executor <- idleExecutors) { // Assign an idle executor to the receiver that has least candidate executors. - val leastScheduledExecutors = scheduledExecutors.minBy(_.size) + val leastScheduledExecutors = scheduledLocations.minBy(_.size) leastScheduledExecutors += executor } - receivers.map(_.streamId).zip(scheduledExecutors).toMap + receivers.map(_.streamId).zip(scheduledLocations).toMap } /** - * Return a list of candidate executors to run the receiver. If the list is empty, the caller can + * Return a list of candidate locations to run the receiver. If the list is empty, the caller can * run this receiver in arbitrary executor. * * This method tries to balance executors' load. Here is the approach to schedule executors * for a receiver. * <ol> * <li> - * If preferredLocation is set, preferredLocation should be one of the candidate executors. + * If preferredLocation is set, preferredLocation should be one of the candidate locations. * </li> * <li> * Every executor will be assigned to a weight according to the receivers running or @@ -163,40 +169,58 @@ private[streaming] class ReceiverSchedulingPolicy { receiverId: Int, preferredLocation: Option[String], receiverTrackingInfoMap: Map[Int, ReceiverTrackingInfo], - executors: Seq[String]): Seq[String] = { + executors: Seq[ExecutorCacheTaskLocation]): Seq[TaskLocation] = { if (executors.isEmpty) { return Seq.empty } // Always try to schedule to the preferred locations - val scheduledExecutors = mutable.Set[String]() - scheduledExecutors ++= preferredLocation - - val executorWeights = receiverTrackingInfoMap.values.flatMap { receiverTrackingInfo => - receiverTrackingInfo.state match { - case ReceiverState.INACTIVE => Nil - case ReceiverState.SCHEDULED => - val scheduledExecutors = receiverTrackingInfo.scheduledExecutors.get - // The probability that a scheduled receiver will run in an executor is - // 1.0 / scheduledLocations.size - scheduledExecutors.map(location => location -> (1.0 / scheduledExecutors.size)) - case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningExecutor.get -> 1.0) - } - }.groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor + val scheduledLocations = mutable.Set[TaskLocation]() + // Note: preferredLocation could be `HDFSCacheTaskLocation`, so use `TaskLocation.apply` to + // handle this case + scheduledLocations ++= preferredLocation.map(TaskLocation(_)) + + val executorWeights: Map[ExecutorCacheTaskLocation, Double] = { + receiverTrackingInfoMap.values.flatMap(convertReceiverTrackingInfoToExecutorWeights) + .groupBy(_._1).mapValues(_.map(_._2).sum) // Sum weights for each executor + } val idleExecutors = executors.toSet -- executorWeights.keys if (idleExecutors.nonEmpty) { - scheduledExecutors ++= idleExecutors + scheduledLocations ++= idleExecutors } else { // There is no idle executor. So select all executors that have the minimum weight. val sortedExecutors = executorWeights.toSeq.sortBy(_._2) if (sortedExecutors.nonEmpty) { val minWeight = sortedExecutors(0)._2 - scheduledExecutors ++= sortedExecutors.takeWhile(_._2 == minWeight).map(_._1) + scheduledLocations ++= sortedExecutors.takeWhile(_._2 == minWeight).map(_._1) } else { // This should not happen since "executors" is not empty } } - scheduledExecutors.toSeq + scheduledLocations.toSeq + } + + /** + * This method tries to convert a receiver tracking info to executor weights. Every executor will + * be assigned to a weight according to the receivers running or scheduling on it: + * + * - If a receiver is running on an executor, it contributes 1.0 to the executor's weight. + * - If a receiver is scheduled to an executor but has not yet run, it contributes + * `1.0 / #candidate_executors_of_this_receiver` to the executor's weight. + */ + private def convertReceiverTrackingInfoToExecutorWeights( + receiverTrackingInfo: ReceiverTrackingInfo): Seq[(ExecutorCacheTaskLocation, Double)] = { + receiverTrackingInfo.state match { + case ReceiverState.INACTIVE => Nil + case ReceiverState.SCHEDULED => + val scheduledLocations = receiverTrackingInfo.scheduledLocations.get + // The probability that a scheduled receiver will run in an executor is + // 1.0 / scheduledLocations.size + scheduledLocations.filter(_.isInstanceOf[ExecutorCacheTaskLocation]).map { location => + location.asInstanceOf[ExecutorCacheTaskLocation] -> (1.0 / scheduledLocations.size) + } + case ReceiverState.ACTIVE => Seq(receiverTrackingInfo.runningExecutor.get -> 1.0) + } } } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index 2ce80d618b..b183d856f5 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -17,20 +17,21 @@ package org.apache.spark.streaming.scheduler -import java.util.concurrent.{TimeUnit, CountDownLatch} +import java.util.concurrent.{CountDownLatch, TimeUnit} import scala.collection.mutable.HashMap import scala.concurrent.ExecutionContext import scala.language.existentials import scala.util.{Failure, Success} -import org.apache.spark.streaming.util.WriteAheadLogUtils import org.apache.spark._ import org.apache.spark.rdd.RDD import org.apache.spark.rpc._ +import org.apache.spark.scheduler.{TaskLocation, ExecutorCacheTaskLocation} import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.streaming.receiver._ -import org.apache.spark.util.{Utils, ThreadUtils, SerializableConfiguration} +import org.apache.spark.streaming.util.WriteAheadLogUtils +import org.apache.spark.util.{SerializableConfiguration, ThreadUtils, Utils} /** Enumeration to identify current state of a Receiver */ @@ -47,7 +48,8 @@ private[streaming] sealed trait ReceiverTrackerMessage private[streaming] case class RegisterReceiver( streamId: Int, typ: String, - hostPort: String, + host: String, + executorId: String, receiverEndpoint: RpcEndpointRef ) extends ReceiverTrackerMessage private[streaming] case class AddBlock(receivedBlockInfo: ReceivedBlockInfo) @@ -235,7 +237,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false private def registerReceiver( streamId: Int, typ: String, - hostPort: String, + host: String, + executorId: String, receiverEndpoint: RpcEndpointRef, senderAddress: RpcAddress ): Boolean = { @@ -247,18 +250,23 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false return false } - val scheduledExecutors = receiverTrackingInfos(streamId).scheduledExecutors - val accetableExecutors = if (scheduledExecutors.nonEmpty) { + val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations + val acceptableExecutors = if (scheduledLocations.nonEmpty) { // This receiver is registering and it's scheduled by - // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledExecutors" to check it. - scheduledExecutors.get + // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it. + scheduledLocations.get } else { // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it. scheduleReceiver(streamId) } - if (!accetableExecutors.contains(hostPort)) { + def isAcceptable: Boolean = acceptableExecutors.exists { + case loc: ExecutorCacheTaskLocation => loc.executorId == executorId + case loc: TaskLocation => loc.host == host + } + + if (!isAcceptable) { // Refuse it since it's scheduled to a wrong executor false } else { @@ -266,8 +274,8 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false val receiverTrackingInfo = ReceiverTrackingInfo( streamId, ReceiverState.ACTIVE, - scheduledExecutors = None, - runningExecutor = Some(hostPort), + scheduledLocations = None, + runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)), name = Some(name), endpoint = Some(receiverEndpoint)) receiverTrackingInfos.put(streamId, receiverTrackingInfo) @@ -338,25 +346,25 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false logWarning(s"Error reported by receiver for stream $streamId: $messageWithError") } - private def scheduleReceiver(receiverId: Int): Seq[String] = { + private def scheduleReceiver(receiverId: Int): Seq[TaskLocation] = { val preferredLocation = receiverPreferredLocations.getOrElse(receiverId, None) - val scheduledExecutors = schedulingPolicy.rescheduleReceiver( + val scheduledLocations = schedulingPolicy.rescheduleReceiver( receiverId, preferredLocation, receiverTrackingInfos, getExecutors) - updateReceiverScheduledExecutors(receiverId, scheduledExecutors) - scheduledExecutors + updateReceiverScheduledExecutors(receiverId, scheduledLocations) + scheduledLocations } private def updateReceiverScheduledExecutors( - receiverId: Int, scheduledExecutors: Seq[String]): Unit = { + receiverId: Int, scheduledLocations: Seq[TaskLocation]): Unit = { val newReceiverTrackingInfo = receiverTrackingInfos.get(receiverId) match { case Some(oldInfo) => oldInfo.copy(state = ReceiverState.SCHEDULED, - scheduledExecutors = Some(scheduledExecutors)) + scheduledLocations = Some(scheduledLocations)) case None => ReceiverTrackingInfo( receiverId, ReceiverState.SCHEDULED, - Some(scheduledExecutors), + Some(scheduledLocations), runningExecutor = None) } receiverTrackingInfos.put(receiverId, newReceiverTrackingInfo) @@ -370,13 +378,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** * Get the list of executors excluding driver */ - private def getExecutors: Seq[String] = { + private def getExecutors: Seq[ExecutorCacheTaskLocation] = { if (ssc.sc.isLocal) { - Seq(ssc.sparkContext.env.blockManager.blockManagerId.hostPort) + val blockManagerId = ssc.sparkContext.env.blockManager.blockManagerId + Seq(ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId)) } else { ssc.sparkContext.env.blockManager.master.getMemoryStatus.filter { case (blockManagerId, _) => blockManagerId.executorId != SparkContext.DRIVER_IDENTIFIER // Ignore the driver location - }.map { case (blockManagerId, _) => blockManagerId.hostPort }.toSeq + }.map { case (blockManagerId, _) => + ExecutorCacheTaskLocation(blockManagerId.host, blockManagerId.executorId) + }.toSeq } } @@ -431,9 +442,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false override def receive: PartialFunction[Any, Unit] = { // Local messages case StartAllReceivers(receivers) => - val scheduledExecutors = schedulingPolicy.scheduleReceivers(receivers, getExecutors) + val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors) for (receiver <- receivers) { - val executors = scheduledExecutors(receiver.streamId) + val executors = scheduledLocations(receiver.streamId) updateReceiverScheduledExecutors(receiver.streamId, executors) receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation startReceiver(receiver, executors) @@ -441,14 +452,14 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false case RestartReceiver(receiver) => // Old scheduled executors minus the ones that are not active any more val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId) - val scheduledExecutors = if (oldScheduledExecutors.nonEmpty) { + val scheduledLocations = if (oldScheduledExecutors.nonEmpty) { // Try global scheduling again oldScheduledExecutors } else { val oldReceiverInfo = receiverTrackingInfos(receiver.streamId) - // Clear "scheduledExecutors" to indicate we are going to do local scheduling + // Clear "scheduledLocations" to indicate we are going to do local scheduling val newReceiverInfo = oldReceiverInfo.copy( - state = ReceiverState.INACTIVE, scheduledExecutors = None) + state = ReceiverState.INACTIVE, scheduledLocations = None) receiverTrackingInfos(receiver.streamId) = newReceiverInfo schedulingPolicy.rescheduleReceiver( receiver.streamId, @@ -458,7 +469,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false } // Assume there is one receiver restarting at one time, so we don't need to update // receiverTrackingInfos - startReceiver(receiver, scheduledExecutors) + startReceiver(receiver, scheduledLocations) case c: CleanupOldBlocks => receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c)) case UpdateReceiverRateLimit(streamUID, newRate) => @@ -472,9 +483,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { // Remote messages - case RegisterReceiver(streamId, typ, hostPort, receiverEndpoint) => + case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) => val successful = - registerReceiver(streamId, typ, hostPort, receiverEndpoint, context.senderAddress) + registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress) context.reply(successful) case AddBlock(receivedBlockInfo) => context.reply(addBlock(receivedBlockInfo)) @@ -493,13 +504,16 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** * Return the stored scheduled executors that are still alive. */ - private def getStoredScheduledExecutors(receiverId: Int): Seq[String] = { + private def getStoredScheduledExecutors(receiverId: Int): Seq[TaskLocation] = { if (receiverTrackingInfos.contains(receiverId)) { - val scheduledExecutors = receiverTrackingInfos(receiverId).scheduledExecutors - if (scheduledExecutors.nonEmpty) { + val scheduledLocations = receiverTrackingInfos(receiverId).scheduledLocations + if (scheduledLocations.nonEmpty) { val executors = getExecutors.toSet // Only return the alive executors - scheduledExecutors.get.filter(executors) + scheduledLocations.get.filter { + case loc: ExecutorCacheTaskLocation => executors(loc) + case loc: TaskLocation => true + } } else { Nil } @@ -511,7 +525,9 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false /** * Start a receiver along with its scheduled executors */ - private def startReceiver(receiver: Receiver[_], scheduledExecutors: Seq[String]): Unit = { + private def startReceiver( + receiver: Receiver[_], + scheduledLocations: Seq[TaskLocation]): Unit = { def shouldStartReceiver: Boolean = { // It's okay to start when trackerState is Initialized or Started !(isTrackerStopping || isTrackerStopped) @@ -546,13 +562,12 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false } } - // Create the RDD using the scheduledExecutors to run the receiver in a Spark job + // Create the RDD using the scheduledLocations to run the receiver in a Spark job val receiverRDD: RDD[Receiver[_]] = - if (scheduledExecutors.isEmpty) { + if (scheduledLocations.isEmpty) { ssc.sc.makeRDD(Seq(receiver), 1) } else { - val preferredLocations = - scheduledExecutors.map(hostPort => Utils.parseHostPort(hostPort)._1).distinct + val preferredLocations = scheduledLocations.map(_.toString).distinct ssc.sc.makeRDD(Seq(receiver -> preferredLocations)) } receiverRDD.setName(s"Receiver $receiverId") diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala index 043ff4d0ff..ab0a84f052 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTrackingInfo.scala @@ -18,6 +18,7 @@ package org.apache.spark.streaming.scheduler import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, TaskLocation} import org.apache.spark.streaming.scheduler.ReceiverState._ private[streaming] case class ReceiverErrorInfo( @@ -28,7 +29,7 @@ private[streaming] case class ReceiverErrorInfo( * * @param receiverId the unique receiver id * @param state the current Receiver state - * @param scheduledExecutors the scheduled executors provided by ReceiverSchedulingPolicy + * @param scheduledLocations the scheduled locations provided by ReceiverSchedulingPolicy * @param runningExecutor the running executor if the receiver is active * @param name the receiver name * @param endpoint the receiver endpoint. It can be used to send messages to the receiver @@ -37,8 +38,8 @@ private[streaming] case class ReceiverErrorInfo( private[streaming] case class ReceiverTrackingInfo( receiverId: Int, state: ReceiverState, - scheduledExecutors: Option[Seq[String]], - runningExecutor: Option[String], + scheduledLocations: Option[Seq[TaskLocation]], + runningExecutor: Option[ExecutorCacheTaskLocation], name: Option[String] = None, endpoint: Option[RpcEndpointRef] = None, errorInfo: Option[ReceiverErrorInfo] = None) { @@ -47,7 +48,7 @@ private[streaming] case class ReceiverTrackingInfo( receiverId, name.getOrElse(""), state == ReceiverState.ACTIVE, - location = runningExecutor.getOrElse(""), + location = runningExecutor.map(_.host).getOrElse(""), lastErrorMessage = errorInfo.map(_.lastErrorMessage).getOrElse(""), lastError = errorInfo.map(_.lastError).getOrElse(""), lastErrorTime = errorInfo.map(_.lastErrorTime).getOrElse(-1L) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala index b2a51d72ba..05b4e66c63 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicySuite.scala @@ -20,73 +20,96 @@ package org.apache.spark.streaming.scheduler import scala.collection.mutable import org.apache.spark.SparkFunSuite +import org.apache.spark.scheduler.{ExecutorCacheTaskLocation, HostTaskLocation, TaskLocation} class ReceiverSchedulingPolicySuite extends SparkFunSuite { val receiverSchedulingPolicy = new ReceiverSchedulingPolicy test("rescheduleReceiver: empty executors") { - val scheduledExecutors = + val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver(0, None, Map.empty, executors = Seq.empty) - assert(scheduledExecutors === Seq.empty) + assert(scheduledLocations === Seq.empty) } test("rescheduleReceiver: receiver preferredLocation") { + val executors = Seq(ExecutorCacheTaskLocation("host2", "2")) val receiverTrackingInfoMap = Map( 0 -> ReceiverTrackingInfo(0, ReceiverState.INACTIVE, None, None)) - val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver( - 0, Some("host1"), receiverTrackingInfoMap, executors = Seq("host2")) - assert(scheduledExecutors.toSet === Set("host1", "host2")) + val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver( + 0, Some("host1"), receiverTrackingInfoMap, executors) + assert(scheduledLocations.toSet === Set(HostTaskLocation("host1"), executors(0))) } test("rescheduleReceiver: return all idle executors if there are any idle executors") { - val executors = Seq("host1", "host2", "host3", "host4", "host5") - // host3 is idle + val executors = (1 to 5).map(i => ExecutorCacheTaskLocation(s"host$i", s"$i")) + // executor 1 is busy, others are idle. val receiverTrackingInfoMap = Map( - 0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1"))) - val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver( + 0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some(executors(0)))) + val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver( 1, None, receiverTrackingInfoMap, executors) - assert(scheduledExecutors.toSet === Set("host2", "host3", "host4", "host5")) + assert(scheduledLocations.toSet === executors.tail.toSet) } test("rescheduleReceiver: return all executors that have minimum weight if no idle executors") { - val executors = Seq("host1", "host2", "host3", "host4", "host5") + val executors = Seq( + ExecutorCacheTaskLocation("host1", "1"), + ExecutorCacheTaskLocation("host2", "2"), + ExecutorCacheTaskLocation("host3", "3"), + ExecutorCacheTaskLocation("host4", "4"), + ExecutorCacheTaskLocation("host5", "5") + ) // Weights: host1 = 1.5, host2 = 0.5, host3 = 1.0, host4 = 0.5, host5 = 0.5 val receiverTrackingInfoMap = Map( - 0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, Some("host1")), - 1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, Some(Seq("host2", "host3")), None), - 2 -> ReceiverTrackingInfo(2, ReceiverState.SCHEDULED, Some(Seq("host1", "host3")), None), - 3 -> ReceiverTrackingInfo(4, ReceiverState.SCHEDULED, Some(Seq("host4", "host5")), None)) - val scheduledExecutors = receiverSchedulingPolicy.rescheduleReceiver( + 0 -> ReceiverTrackingInfo(0, ReceiverState.ACTIVE, None, + Some(ExecutorCacheTaskLocation("host1", "1"))), + 1 -> ReceiverTrackingInfo(1, ReceiverState.SCHEDULED, + Some(Seq(ExecutorCacheTaskLocation("host2", "2"), ExecutorCacheTaskLocation("host3", "3"))), + None), + 2 -> ReceiverTrackingInfo(2, ReceiverState.SCHEDULED, + Some(Seq(ExecutorCacheTaskLocation("host1", "1"), ExecutorCacheTaskLocation("host3", "3"))), + None), + 3 -> ReceiverTrackingInfo(4, ReceiverState.SCHEDULED, + Some(Seq(ExecutorCacheTaskLocation("host4", "4"), + ExecutorCacheTaskLocation("host5", "5"))), None)) + val scheduledLocations = receiverSchedulingPolicy.rescheduleReceiver( 4, None, receiverTrackingInfoMap, executors) - assert(scheduledExecutors.toSet === Set("host2", "host4", "host5")) + val expectedScheduledLocations = Set( + ExecutorCacheTaskLocation("host2", "2"), + ExecutorCacheTaskLocation("host4", "4"), + ExecutorCacheTaskLocation("host5", "5") + ) + assert(scheduledLocations.toSet === expectedScheduledLocations) } test("scheduleReceivers: " + "schedule receivers evenly when there are more receivers than executors") { val receivers = (0 until 6).map(new RateTestReceiver(_)) - val executors = (10000 until 10003).map(port => s"localhost:${port}") - val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors) - val numReceiversOnExecutor = mutable.HashMap[String, Int]() + val executors = (0 until 3).map(executorId => + ExecutorCacheTaskLocation("localhost", executorId.toString)) + val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors) + val numReceiversOnExecutor = mutable.HashMap[TaskLocation, Int]() // There should be 2 receivers running on each executor and each receiver has one executor - scheduledExecutors.foreach { case (receiverId, executors) => - assert(executors.size == 1) - numReceiversOnExecutor(executors(0)) = numReceiversOnExecutor.getOrElse(executors(0), 0) + 1 + scheduledLocations.foreach { case (receiverId, locations) => + assert(locations.size == 1) + assert(locations(0).isInstanceOf[ExecutorCacheTaskLocation]) + numReceiversOnExecutor(locations(0)) = numReceiversOnExecutor.getOrElse(locations(0), 0) + 1 } assert(numReceiversOnExecutor === executors.map(_ -> 2).toMap) } - test("scheduleReceivers: " + "schedule receivers evenly when there are more executors than receivers") { val receivers = (0 until 3).map(new RateTestReceiver(_)) - val executors = (10000 until 10006).map(port => s"localhost:${port}") - val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors) - val numReceiversOnExecutor = mutable.HashMap[String, Int]() + val executors = (0 until 6).map(executorId => + ExecutorCacheTaskLocation("localhost", executorId.toString)) + val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors) + val numReceiversOnExecutor = mutable.HashMap[TaskLocation, Int]() // There should be 1 receiver running on each executor and each receiver has two executors - scheduledExecutors.foreach { case (receiverId, executors) => - assert(executors.size == 2) - executors.foreach { l => + scheduledLocations.foreach { case (receiverId, locations) => + assert(locations.size == 2) + locations.foreach { l => + assert(l.isInstanceOf[ExecutorCacheTaskLocation]) numReceiversOnExecutor(l) = numReceiversOnExecutor.getOrElse(l, 0) + 1 } } @@ -96,34 +119,41 @@ class ReceiverSchedulingPolicySuite extends SparkFunSuite { test("scheduleReceivers: schedule receivers evenly when the preferredLocations are even") { val receivers = (0 until 3).map(new RateTestReceiver(_)) ++ (3 until 6).map(new RateTestReceiver(_, Some("localhost"))) - val executors = (10000 until 10003).map(port => s"localhost:${port}") ++ - (10003 until 10006).map(port => s"localhost2:${port}") - val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, executors) - val numReceiversOnExecutor = mutable.HashMap[String, Int]() + val executors = (0 until 3).map(executorId => + ExecutorCacheTaskLocation("localhost", executorId.toString)) ++ + (3 until 6).map(executorId => + ExecutorCacheTaskLocation("localhost2", executorId.toString)) + val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, executors) + val numReceiversOnExecutor = mutable.HashMap[TaskLocation, Int]() // There should be 1 receiver running on each executor and each receiver has 1 executor - scheduledExecutors.foreach { case (receiverId, executors) => + scheduledLocations.foreach { case (receiverId, executors) => assert(executors.size == 1) executors.foreach { l => + assert(l.isInstanceOf[ExecutorCacheTaskLocation]) numReceiversOnExecutor(l) = numReceiversOnExecutor.getOrElse(l, 0) + 1 } } assert(numReceiversOnExecutor === executors.map(_ -> 1).toMap) // Make sure we schedule the receivers to their preferredLocations val executorsForReceiversWithPreferredLocation = - scheduledExecutors.filter { case (receiverId, executors) => receiverId >= 3 }.flatMap(_._2) + scheduledLocations.filter { case (receiverId, executors) => receiverId >= 3 }.flatMap(_._2) // We can simply check the executor set because we only know each receiver only has 1 executor assert(executorsForReceiversWithPreferredLocation.toSet === - (10000 until 10003).map(port => s"localhost:${port}").toSet) + (0 until 3).map(executorId => + ExecutorCacheTaskLocation("localhost", executorId.toString) + ).toSet) } test("scheduleReceivers: return empty if no receiver") { - assert(receiverSchedulingPolicy.scheduleReceivers(Seq.empty, Seq("localhost:10000")).isEmpty) + val scheduledLocations = receiverSchedulingPolicy. + scheduleReceivers(Seq.empty, Seq(ExecutorCacheTaskLocation("localhost", "1"))) + assert(scheduledLocations.isEmpty) } test("scheduleReceivers: return empty scheduled executors if no executors") { val receivers = (0 until 3).map(new RateTestReceiver(_)) - val scheduledExecutors = receiverSchedulingPolicy.scheduleReceivers(receivers, Seq.empty) - scheduledExecutors.foreach { case (receiverId, executors) => + val scheduledLocations = receiverSchedulingPolicy.scheduleReceivers(receivers, Seq.empty) + scheduledLocations.foreach { case (receiverId, executors) => assert(executors.isEmpty) } } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala index fda86aef45..3bd8d086ab 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala @@ -99,8 +99,8 @@ class ReceiverTrackerSuite extends TestSuiteBase { output.register() ssc.start() eventually(timeout(10 seconds), interval(10 millis)) { - // If preferredLocations is set correctly, receiverTaskLocality should be NODE_LOCAL - assert(receiverTaskLocality === TaskLocality.NODE_LOCAL) + // If preferredLocations is set correctly, receiverTaskLocality should be PROCESS_LOCAL + assert(receiverTaskLocality === TaskLocality.PROCESS_LOCAL) } } } |