From ca7e460f7d6fb898dc29236a85520bbe954c8a13 Mon Sep 17 00:00:00 2001
From: nishkamravi2
Date: Tue, 30 Jun 2015 11:12:15 -0700
Subject: [SPARK-7988] [STREAMING] Round-robin scheduling of receivers by default

Minimal PR for round-robin scheduling of receivers. Dense scheduling can be
enabled by setting preferredLocation, so a new config parameter isn't really
needed. Tested this on a cluster of 6 nodes and observed a 20-25% gain in
throughput compared to random scheduling.

tdas pwendell

Author: nishkamravi2
Author: Nishkam Ravi

Closes #6607 from nishkamravi2/master_nravi and squashes the following commits:

1918819 [Nishkam Ravi] Update ReceiverTrackerSuite.scala
f747739 [Nishkam Ravi] Update ReceiverTrackerSuite.scala
6127e58 [Nishkam Ravi] Update ReceiverTracker and ReceiverTrackerSuite
9f1abc2 [nishkamravi2] Update ReceiverTrackerSuite.scala
ae29152 [Nishkam Ravi] Update test suite with TD's suggestions
48a4a97 [nishkamravi2] Update ReceiverTracker.scala
bc23907 [nishkamravi2] Update ReceiverTracker.scala
68e8540 [nishkamravi2] Update SchedulerSuite.scala
4604f28 [nishkamravi2] Update SchedulerSuite.scala
179b90f [nishkamravi2] Update ReceiverTracker.scala
242e677 [nishkamravi2] Update SchedulerSuite.scala
7f3e028 [Nishkam Ravi] Update ReceiverTracker.scala, add unit test cases in SchedulerSuite
f8a3e05 [nishkamravi2] Update ReceiverTracker.scala
4cf97b6 [nishkamravi2] Update ReceiverTracker.scala
16e84ec [Nishkam Ravi] Update ReceiverTracker.scala
45e3a99 [Nishkam Ravi] Merge branch 'master_nravi' of https://github.com/nishkamravi2/spark into master_nravi
02dbdb8 [Nishkam Ravi] Update ReceiverTracker.scala
07b9dfa [nishkamravi2] Update ReceiverTracker.scala
6caeefe [nishkamravi2] Update ReceiverTracker.scala
7888257 [nishkamravi2] Update ReceiverTracker.scala
6e3515c [Nishkam Ravi] Minor changes
975b8d8 [Nishkam Ravi] Merge branch 'master_nravi' of https://github.com/nishkamravi2/spark into master_nravi
3cac21b [Nishkam Ravi] Generalize the scheduling algorithm
b05ee2f [nishkamravi2] Update ReceiverTracker.scala
bb5e09b [Nishkam Ravi] Add a new var in receiver to store location information for round-robin scheduling
41705de [nishkamravi2] Update ReceiverTracker.scala
fff1b2e [Nishkam Ravi] Round-robin scheduling of streaming receivers
---
 .../streaming/scheduler/ReceiverTracker.scala      | 64 +++++++++++----
 .../streaming/scheduler/ReceiverTrackerSuite.scala | 90 ++++++++++++++++++++++
 2 files changed, 141 insertions(+), 13 deletions(-)
 create mode 100644 streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index e6cdbec11e..644e581cd8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -17,8 +17,10 @@
 package org.apache.spark.streaming.scheduler
 
-import scala.collection.mutable.{HashMap, SynchronizedMap}
+import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedMap}
 import scala.language.existentials
+import scala.math.max
 
+import org.apache.spark.rdd._
 import org.apache.spark.streaming.util.WriteAheadLogUtils
 import org.apache.spark.{Logging, SparkEnv, SparkException}
 
@@ -272,6 +274,41 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
     }
   }
 
+  /**
+   * Get the list of executors, excluding the driver
+   */
+  private def getExecutors(ssc: StreamingContext): List[String] = {
+    val executors = ssc.sparkContext.getExecutorMemoryStatus.map(_._1.split(":")(0)).toList
+    val driver = ssc.sparkContext.getConf.get("spark.driver.host")
+    executors.diff(List(driver))
+  }
+
+  /** Set host location(s) for each receiver so as to distribute them over
+   *  executors in a round-robin fashion, taking into account preferredLocation if set
+   */
+  private[streaming] def scheduleReceivers(receivers: Seq[Receiver[_]],
+    executors: List[String]): Array[ArrayBuffer[String]] = {
+    val locations = new Array[ArrayBuffer[String]](receivers.length)
+    var i = 0
+    for (i <- 0 until receivers.length) {
+      locations(i) = new ArrayBuffer[String]()
+      if (receivers(i).preferredLocation.isDefined) {
+        locations(i) += receivers(i).preferredLocation.get
+      }
+    }
+    var count = 0
+    for (i <- 0 until max(receivers.length, executors.length)) {
+      if (!receivers(i % receivers.length).preferredLocation.isDefined) {
+        locations(i % receivers.length) += executors(count)
+        count += 1
+        if (count == executors.length) {
+          count = 0
+        }
+      }
+    }
+    locations
+  }
+
   /**
    * Get the receivers from the ReceiverInputDStreams, distributes them to the
    * worker nodes as a parallel collection, and runs them.
@@ -283,18 +320,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
       rcvr
     })
 
-    // Right now, we only honor preferences if all receivers have them
-    val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
-
-    // Create the parallel collection of receivers to distributed them on the worker nodes
-    val tempRDD =
-      if (hasLocationPreferences) {
-        val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
-        ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
-      } else {
-        ssc.sc.makeRDD(receivers, receivers.size)
-      }
-
     val checkpointDirOption = Option(ssc.checkpointDir)
     val serializableHadoopConf =
       new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
@@ -311,12 +336,25 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
         supervisor.start()
         supervisor.awaitTermination()
       }
+
     // Run the dummy Spark job to ensure that all slaves have registered.
     // This avoids all the receivers to be scheduled on the same node.
     if (!ssc.sparkContext.isLocal) {
       ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
     }
 
+    // Get the list of executors and schedule receivers
+    val executors = getExecutors(ssc)
+    val tempRDD =
+      if (!executors.isEmpty) {
+        val locations = scheduleReceivers(receivers, executors)
+        val roundRobinReceivers = (0 until receivers.length).map(i =>
+          (receivers(i), locations(i)))
+        ssc.sc.makeRDD[Receiver[_]](roundRobinReceivers)
+      } else {
+        ssc.sc.makeRDD(receivers, receivers.size)
+      }
+
     // Distribute the receivers and start them
     logInfo("Starting " + receivers.length + " receivers")
     running = true
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
new file mode 100644
index 0000000000..a6e783861d
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import org.apache.spark.streaming._
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver._
+import org.apache.spark.util.Utils
+
+/** Test suite for receiver scheduling */
+class ReceiverTrackerSuite extends TestSuiteBase {
+  val sparkConf = new SparkConf().setMaster("local[8]").setAppName("test")
+  val ssc = new StreamingContext(sparkConf, Milliseconds(100))
+  val tracker = new ReceiverTracker(ssc)
+  val launcher = new tracker.ReceiverLauncher()
+  val executors: List[String] = List("0", "1", "2", "3")
+
+  test("receiver scheduling - all or none have preferred location") {
+
+    def parse(s: String): Array[Array[String]] = {
+      val outerSplit = s.split("\\|")
+      val loc = new Array[Array[String]](outerSplit.length)
+      var i = 0
+      for (i <- 0 until outerSplit.length) {
+        loc(i) = outerSplit(i).split("\\,")
+      }
+      loc
+    }
+
+    def testScheduler(numReceivers: Int, preferredLocation: Boolean, allocation: String) {
+      val receivers =
+        if (preferredLocation) {
+          Array.tabulate(numReceivers)(i => new DummyReceiver(host =
+            Some(((i + 1) % executors.length).toString)))
+        } else {
+          Array.tabulate(numReceivers)(_ => new DummyReceiver)
+        }
+      val locations = launcher.scheduleReceivers(receivers, executors)
+      val expectedLocations = parse(allocation)
+      assert(locations.deep === expectedLocations.deep)
+    }
+
+    testScheduler(numReceivers = 5, preferredLocation = false, allocation = "0|1|2|3|0")
+    testScheduler(numReceivers = 3, preferredLocation = false, allocation = "0,3|1|2")
+    testScheduler(numReceivers = 4, preferredLocation = true, allocation = "1|2|3|0")
+  }
+
+  test("receiver scheduling - some have preferred location") {
+    val numReceivers = 4
+    val receivers: Seq[Receiver[_]] = Seq(new DummyReceiver(host = Some("1")),
+      new DummyReceiver, new DummyReceiver, new DummyReceiver)
+    val locations = launcher.scheduleReceivers(receivers, executors)
+    assert(locations(0)(0) === "1")
+    assert(locations(1)(0) === "0")
+    assert(locations(2)(0) === "1")
+    assert(locations(0).length === 1)
+    assert(locations(3).length === 1)
+  }
+}
+
+/**
+ * Dummy receiver implementation
+ */
+private class DummyReceiver(host: Option[String] = None)
+  extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
+
+  def onStart() {
+  }
+
+  def onStop() {
+  }
+
+  override def preferredLocation: Option[String] = host
+}
--
cgit v1.2.3
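
The round-robin policy that scheduleReceivers implements above is easy to try outside Spark. Below is a minimal, self-contained Scala sketch of the same assignment logic, with no Spark dependency. The object name RoundRobinSketch, the host names, and the modeling of each receiver as just an Option[String] preference are illustrative assumptions, and the sketch presumes a non-empty executor list (the patch only calls scheduleReceivers when !executors.isEmpty).

// RoundRobinSketch.scala -- hypothetical standalone illustration, not part of the patch.
import scala.collection.mutable.ArrayBuffer
import scala.math.max

object RoundRobinSketch {

  // Mirrors the scheduleReceivers logic: each receiver is modeled only by its
  // optional preferred host; the result is one candidate-host list per receiver.
  def scheduleReceivers(
      preferredLocations: Seq[Option[String]],
      executors: List[String]): Array[ArrayBuffer[String]] = {
    require(executors.nonEmpty, "sketch assumes a non-empty executor list")
    val locations = Array.fill(preferredLocations.length)(new ArrayBuffer[String]())
    // Pass 1: receivers with an explicit preference keep it.
    for (i <- preferredLocations.indices) {
      preferredLocations(i).foreach(host => locations(i) += host)
    }
    // Pass 2: rotate the remaining receivers over the executors. Iterating up to
    // max(#receivers, #executors) hands spare executors to early receivers when
    // there are more executors than receivers.
    var count = 0
    for (i <- 0 until max(preferredLocations.length, executors.length)) {
      if (preferredLocations(i % preferredLocations.length).isEmpty) {
        locations(i % preferredLocations.length) += executors(count)
        count = (count + 1) % executors.length
      }
    }
    locations
  }

  def main(args: Array[String]): Unit = {
    val executors = List("host0", "host1", "host2", "host3")
    // Five unpinned receivers: mirrors the "0|1|2|3|0" case in ReceiverTrackerSuite.
    println(scheduleReceivers(Seq.fill(5)(None), executors).map(_.mkString(",")).mkString("|"))
    // One pinned receiver keeps its host; the rest are rotated from executor 0,
    // mirroring the mixed-preference test.
    println(scheduleReceivers(Seq(Some("host1"), None, None, None), executors)
      .map(_.mkString(",")).mkString("|"))
  }
}

Compiled with scalac and run, the first line prints host0|host1|host2|host3|host0 and the second prints host1|host0|host1|host2, matching the allocations asserted in the test suite.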