aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authornishkamravi2 <nishkamravi@gmail.com>2015-06-30 11:12:15 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-06-30 11:12:15 -0700
commitca7e460f7d6fb898dc29236a85520bbe954c8a13 (patch)
treeeb37a75af04badbd044a2db1e8c39ce5154aedbf /streaming
parent9213f73a8ea09ae343af825a6b576c212cf4a0c7 (diff)
downloadspark-ca7e460f7d6fb898dc29236a85520bbe954c8a13.tar.gz
spark-ca7e460f7d6fb898dc29236a85520bbe954c8a13.tar.bz2
spark-ca7e460f7d6fb898dc29236a85520bbe954c8a13.zip
[SPARK-7988] [STREAMING] Round-robin scheduling of receivers by default
Minimal PR for round-robin scheduling of receivers. Dense scheduling can be enabled by setting preferredLocation, so a new config parameter isn't really needed. Tested this on a cluster of 6 nodes and noticed 20-25% gain in throughput compared to random scheduling. tdas pwendell Author: nishkamravi2 <nishkamravi@gmail.com> Author: Nishkam Ravi <nravi@cloudera.com> Closes #6607 from nishkamravi2/master_nravi and squashes the following commits: 1918819 [Nishkam Ravi] Update ReceiverTrackerSuite.scala f747739 [Nishkam Ravi] Update ReceiverTrackerSuite.scala 6127e58 [Nishkam Ravi] Update ReceiverTracker and ReceiverTrackerSuite 9f1abc2 [nishkamravi2] Update ReceiverTrackerSuite.scala ae29152 [Nishkam Ravi] Update test suite with TD's suggestions 48a4a97 [nishkamravi2] Update ReceiverTracker.scala bc23907 [nishkamravi2] Update ReceiverTracker.scala 68e8540 [nishkamravi2] Update SchedulerSuite.scala 4604f28 [nishkamravi2] Update SchedulerSuite.scala 179b90f [nishkamravi2] Update ReceiverTracker.scala 242e677 [nishkamravi2] Update SchedulerSuite.scala 7f3e028 [Nishkam Ravi] Update ReceiverTracker.scala, add unit test cases in SchedulerSuite f8a3e05 [nishkamravi2] Update ReceiverTracker.scala 4cf97b6 [nishkamravi2] Update ReceiverTracker.scala 16e84ec [Nishkam Ravi] Update ReceiverTracker.scala 45e3a99 [Nishkam Ravi] Merge branch 'master_nravi' of https://github.com/nishkamravi2/spark into master_nravi 02dbdb8 [Nishkam Ravi] Update ReceiverTracker.scala 07b9dfa [nishkamravi2] Update ReceiverTracker.scala 6caeefe [nishkamravi2] Update ReceiverTracker.scala 7888257 [nishkamravi2] Update ReceiverTracker.scala 6e3515c [Nishkam Ravi] Minor changes 975b8d8 [Nishkam Ravi] Merge branch 'master_nravi' of https://github.com/nishkamravi2/spark into master_nravi 3cac21b [Nishkam Ravi] Generalize the scheduling algorithm b05ee2f [nishkamravi2] Update ReceiverTracker.scala bb5e09b [Nishkam Ravi] Add a new var in receiver to store location information for round-robin scheduling 41705de [nishkamravi2] Update ReceiverTracker.scala fff1b2e [Nishkam Ravi] Round-robin scheduling of streaming receivers
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala64
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala90
2 files changed, 141 insertions, 13 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index e6cdbec11e..644e581cd8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -17,8 +17,10 @@
package org.apache.spark.streaming.scheduler
-import scala.collection.mutable.{HashMap, SynchronizedMap}
+import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedMap}
import scala.language.existentials
+import scala.math.max
+import org.apache.spark.rdd._
import org.apache.spark.streaming.util.WriteAheadLogUtils
import org.apache.spark.{Logging, SparkEnv, SparkException}
@@ -273,6 +275,41 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
}
/**
+ * Get the list of executors excluding driver
+ */
+ private def getExecutors(ssc: StreamingContext): List[String] = {
+ val executors = ssc.sparkContext.getExecutorMemoryStatus.map(_._1.split(":")(0)).toList
+ val driver = ssc.sparkContext.getConf.get("spark.driver.host")
+ executors.diff(List(driver))
+ }
+
+ /** Set host location(s) for each receiver so as to distribute them over
+ * executors in a round-robin fashion taking into account preferredLocation if set
+ */
+ private[streaming] def scheduleReceivers(receivers: Seq[Receiver[_]],
+ executors: List[String]): Array[ArrayBuffer[String]] = {
+ val locations = new Array[ArrayBuffer[String]](receivers.length)
+ var i = 0
+ for (i <- 0 until receivers.length) {
+ locations(i) = new ArrayBuffer[String]()
+ if (receivers(i).preferredLocation.isDefined) {
+ locations(i) += receivers(i).preferredLocation.get
+ }
+ }
+ var count = 0
+ for (i <- 0 until max(receivers.length, executors.length)) {
+ if (!receivers(i % receivers.length).preferredLocation.isDefined) {
+ locations(i % receivers.length) += executors(count)
+ count += 1
+ if (count == executors.length) {
+ count = 0
+ }
+ }
+ }
+ locations
+ }
+
+ /**
* Get the receivers from the ReceiverInputDStreams, distributes them to the
* worker nodes as a parallel collection, and runs them.
*/
@@ -283,18 +320,6 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
rcvr
})
- // Right now, we only honor preferences if all receivers have them
- val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
-
- // Create the parallel collection of receivers to distributed them on the worker nodes
- val tempRDD =
- if (hasLocationPreferences) {
- val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
- ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
- } else {
- ssc.sc.makeRDD(receivers, receivers.size)
- }
-
val checkpointDirOption = Option(ssc.checkpointDir)
val serializableHadoopConf =
new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
@@ -311,12 +336,25 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
supervisor.start()
supervisor.awaitTermination()
}
+
// Run the dummy Spark job to ensure that all slaves have registered.
// This avoids all the receivers to be scheduled on the same node.
if (!ssc.sparkContext.isLocal) {
ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
}
+ // Get the list of executors and schedule receivers
+ val executors = getExecutors(ssc)
+ val tempRDD =
+ if (!executors.isEmpty) {
+ val locations = scheduleReceivers(receivers, executors)
+ val roundRobinReceivers = (0 until receivers.length).map(i =>
+ (receivers(i), locations(i)))
+ ssc.sc.makeRDD[Receiver[_]](roundRobinReceivers)
+ } else {
+ ssc.sc.makeRDD(receivers, receivers.size)
+ }
+
// Distribute the receivers and start them
logInfo("Starting " + receivers.length + " receivers")
running = true
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
new file mode 100644
index 0000000000..a6e783861d
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import org.apache.spark.streaming._
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receiver._
+import org.apache.spark.util.Utils
+
+/** Testsuite for receiver scheduling */
+class ReceiverTrackerSuite extends TestSuiteBase {
+ val sparkConf = new SparkConf().setMaster("local[8]").setAppName("test")
+ val ssc = new StreamingContext(sparkConf, Milliseconds(100))
+ val tracker = new ReceiverTracker(ssc)
+ val launcher = new tracker.ReceiverLauncher()
+ val executors: List[String] = List("0", "1", "2", "3")
+
+ test("receiver scheduling - all or none have preferred location") {
+
+ def parse(s: String): Array[Array[String]] = {
+ val outerSplit = s.split("\\|")
+ val loc = new Array[Array[String]](outerSplit.length)
+ var i = 0
+ for (i <- 0 until outerSplit.length) {
+ loc(i) = outerSplit(i).split("\\,")
+ }
+ loc
+ }
+
+ def testScheduler(numReceivers: Int, preferredLocation: Boolean, allocation: String) {
+ val receivers =
+ if (preferredLocation) {
+ Array.tabulate(numReceivers)(i => new DummyReceiver(host =
+ Some(((i + 1) % executors.length).toString)))
+ } else {
+ Array.tabulate(numReceivers)(_ => new DummyReceiver)
+ }
+ val locations = launcher.scheduleReceivers(receivers, executors)
+ val expectedLocations = parse(allocation)
+ assert(locations.deep === expectedLocations.deep)
+ }
+
+ testScheduler(numReceivers = 5, preferredLocation = false, allocation = "0|1|2|3|0")
+ testScheduler(numReceivers = 3, preferredLocation = false, allocation = "0,3|1|2")
+ testScheduler(numReceivers = 4, preferredLocation = true, allocation = "1|2|3|0")
+ }
+
+ test("receiver scheduling - some have preferred location") {
+ val numReceivers = 4;
+ val receivers: Seq[Receiver[_]] = Seq(new DummyReceiver(host = Some("1")),
+ new DummyReceiver, new DummyReceiver, new DummyReceiver)
+ val locations = launcher.scheduleReceivers(receivers, executors)
+ assert(locations(0)(0) === "1")
+ assert(locations(1)(0) === "0")
+ assert(locations(2)(0) === "1")
+ assert(locations(0).length === 1)
+ assert(locations(3).length === 1)
+ }
+}
+
+/**
+ * Dummy receiver implementation
+ */
+private class DummyReceiver(host: Option[String] = None)
+ extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
+
+ def onStart() {
+ }
+
+ def onStop() {
+ }
+
+ override def preferredLocation: Option[String] = host
+}