aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main
diff options
context:
space:
mode:
authorIulian Dragos <jaguarul@gmail.com>2015-07-29 13:47:37 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-07-29 13:47:37 -0700
commit819be46e5a73f2d19230354ebba30c58538590f5 (patch)
treebbbfc1016fe8bdb553864cea1bb036bdc27188ae /streaming/src/main
parent069a4c414db4612d7bdb6f5615c1ba36998e5a49 (diff)
downloadspark-819be46e5a73f2d19230354ebba30c58538590f5.tar.gz
spark-819be46e5a73f2d19230354ebba30c58538590f5.tar.bz2
spark-819be46e5a73f2d19230354ebba30c58538590f5.zip
[SPARK-8977] [STREAMING] Defines the RateEstimator interface, and impements the RateController
Based on #7471. - [x] add a test that exercises the publish path from driver to receiver - [ ] remove Serializable from `RateController` and `RateEstimator` Author: Iulian Dragos <jaguarul@gmail.com> Author: François Garillot <francois@garillot.net> Closes #7600 from dragos/topic/streaming-bp/rate-controller and squashes the following commits: f168c94 [Iulian Dragos] Latest review round. 5125e60 [Iulian Dragos] Fix style. a2eb3b9 [Iulian Dragos] Merge remote-tracking branch 'upstream/master' into topic/streaming-bp/rate-controller 475e346 [Iulian Dragos] Latest round of reviews. e9fb45e [Iulian Dragos] - Add a test for checkpointing - fixed serialization for RateController.executionContext 715437a [Iulian Dragos] Review comments and added a `reset` call in ReceiverTrackerTest. e57c66b [Iulian Dragos] Added a couple of tests for the full scenario from driver to receivers, with several rate updates. b425d32 [Iulian Dragos] Removed DeveloperAPI, removed rateEstimator field, removed Noop rate estimator, changed logic for initialising rate estimator. 238cfc6 [Iulian Dragos] Merge remote-tracking branch 'upstream/master' into topic/streaming-bp/rate-controller 34a389d [Iulian Dragos] Various style changes and a first test for the rate controller. d32ca36 [François Garillot] [SPARK-8977][Streaming] Defines the RateEstimator interface, and implements the ReceiverRateController 8941cf9 [Iulian Dragos] Renames and other nitpicks. 162d9e5 [Iulian Dragos] Use Reflection for accessing truly private `executor` method and use the listener bus to know when receivers have registered (`onStart` is called before receivers have registered, leading to flaky behavior). 210f495 [Iulian Dragos] Revert "Added a few tests that measure the receiver’s rate." 0c51959 [Iulian Dragos] Added a few tests that measure the receiver’s rate. 261a051 [Iulian Dragos] - removed field to hold the current rate limit in rate limiter - made rate limit a Long and default to Long.MaxValue (consequence of the above) - removed custom `waitUntil` and replaced it by `eventually` cd1397d [Iulian Dragos] Add a test for the propagation of a new rate limit from driver to receivers. 6369b30 [Iulian Dragos] Merge pull request #15 from huitseeker/SPARK-8975 d15de42 [François Garillot] [SPARK-8975][Streaming] Adds Ratelimiter unit tests w.r.t. spark.streaming.receiver.maxRate 4721c7d [François Garillot] [SPARK-8975][Streaming] Add a mechanism to send a new rate from the driver to the block generator
Diffstat (limited to 'streaming/src/main')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala7
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala26
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala6
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala90
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala59
5 files changed, 185 insertions, 3 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
index d58c99a8ff..a6c4cd220e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala
@@ -21,7 +21,9 @@ import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDDOperationScope
-import org.apache.spark.streaming.{Time, Duration, StreamingContext}
+import org.apache.spark.streaming.{Duration, StreamingContext, Time}
+import org.apache.spark.streaming.scheduler.RateController
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
import org.apache.spark.util.Utils
/**
@@ -47,6 +49,9 @@ abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
/** This is an unique identifier for the input stream. */
val id = ssc.getNewInputStreamId()
+ // Keep track of the freshest rate for this stream using the rateEstimator
+ protected[streaming] val rateController: Option[RateController] = None
+
/** A human-readable name of this InputDStream */
private[streaming] def name: String = {
// e.g. FlumePollingDStream -> "Flume polling stream"
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index a50f0efc03..646a8c3530 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -21,10 +21,11 @@ import scala.reflect.ClassTag
import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.BlockId
-import org.apache.spark.streaming._
+import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.StreamInputInfo
+import org.apache.spark.streaming.scheduler.{RateController, StreamInputInfo}
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
import org.apache.spark.streaming.util.WriteAheadLogUtils
/**
@@ -41,6 +42,17 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
extends InputDStream[T](ssc_) {
/**
+ * Asynchronously maintains & sends new rate limits to the receiver through the receiver tracker.
+ */
+ override protected[streaming] val rateController: Option[RateController] = {
+ if (RateController.isBackPressureEnabled(ssc.conf)) {
+ RateEstimator.create(ssc.conf).map { new ReceiverRateController(id, _) }
+ } else {
+ None
+ }
+ }
+
+ /**
* Gets the receiver object that will be sent to the worker nodes
* to receive data. This method needs to defined by any specific implementation
* of a ReceiverInputDStream.
@@ -110,4 +122,14 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
}
Some(blockRDD)
}
+
+ /**
+ * A RateController that sends the new rate to receivers, via the receiver tracker.
+ */
+ private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator)
+ extends RateController(id, estimator) {
+ override def publish(rate: Long): Unit =
+ ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
+ }
}
+
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 4af9b6d3b5..58bdda7794 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -66,6 +66,12 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
eventLoop.start()
+ // attach rate controllers of input streams to receive batch completion updates
+ for {
+ inputDStream <- ssc.graph.getInputStreams
+ rateController <- inputDStream.rateController
+ } ssc.addStreamingListener(rateController)
+
listenerBus.start(ssc.sparkContext)
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala
new file mode 100644
index 0000000000..882ca0676b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/RateController.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import java.io.ObjectInputStream
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.concurrent.{ExecutionContext, Future}
+
+import org.apache.spark.SparkConf
+import org.apache.spark.streaming.scheduler.rate.RateEstimator
+import org.apache.spark.util.{ThreadUtils, Utils}
+
+/**
+ * A StreamingListener that receives batch completion updates, and maintains
+ * an estimate of the speed at which this stream should ingest messages,
+ * given an estimate computation from a `RateEstimator`
+ */
+private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator)
+ extends StreamingListener with Serializable {
+
+ init()
+
+ protected def publish(rate: Long): Unit
+
+ @transient
+ implicit private var executionContext: ExecutionContext = _
+
+ @transient
+ private var rateLimit: AtomicLong = _
+
+ /**
+ * An initialization method called both from the constructor and Serialization code.
+ */
+ private def init() {
+ executionContext = ExecutionContext.fromExecutorService(
+ ThreadUtils.newDaemonSingleThreadExecutor("stream-rate-update"))
+ rateLimit = new AtomicLong(-1L)
+ }
+
+ private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
+ ois.defaultReadObject()
+ init()
+ }
+
+ /**
+ * Compute the new rate limit and publish it asynchronously.
+ */
+ private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
+ Future[Unit] {
+ val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
+ newRate.foreach { s =>
+ rateLimit.set(s.toLong)
+ publish(getLatestRate())
+ }
+ }
+
+ def getLatestRate(): Long = rateLimit.get()
+
+ override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
+ val elements = batchCompleted.batchInfo.streamIdToInputInfo
+
+ for {
+ processingEnd <- batchCompleted.batchInfo.processingEndTime;
+ workDelay <- batchCompleted.batchInfo.processingDelay;
+ waitDelay <- batchCompleted.batchInfo.schedulingDelay;
+ elems <- elements.get(streamUID).map(_.numRecords)
+ } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
+ }
+}
+
+object RateController {
+ def isBackPressureEnabled(conf: SparkConf): Boolean =
+ conf.getBoolean("spark.streaming.backpressure.enable", false)
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
new file mode 100644
index 0000000000..a08685119e
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/rate/RateEstimator.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler.rate
+
+import org.apache.spark.SparkConf
+import org.apache.spark.SparkException
+
+/**
+ * A component that estimates the rate at wich an InputDStream should ingest
+ * elements, based on updates at every batch completion.
+ */
+private[streaming] trait RateEstimator extends Serializable {
+
+ /**
+ * Computes the number of elements the stream attached to this `RateEstimator`
+ * should ingest per second, given an update on the size and completion
+ * times of the latest batch.
+ *
+ * @param time The timetamp of the current batch interval that just finished
+ * @param elements The number of elements that were processed in this batch
+ * @param processingDelay The time in ms that took for the job to complete
+ * @param schedulingDelay The time in ms that the job spent in the scheduling queue
+ */
+ def compute(
+ time: Long,
+ elements: Long,
+ processingDelay: Long,
+ schedulingDelay: Long): Option[Double]
+}
+
+object RateEstimator {
+
+ /**
+ * Return a new RateEstimator based on the value of `spark.streaming.RateEstimator`.
+ *
+ * @return None if there is no configured estimator, otherwise an instance of RateEstimator
+ * @throws IllegalArgumentException if there is a configured RateEstimator that doesn't match any
+ * known estimators.
+ */
+ def create(conf: SparkConf): Option[RateEstimator] =
+ conf.getOption("spark.streaming.backpressure.rateEstimator").map { estimator =>
+ throw new IllegalArgumentException(s"Unkown rate estimator: $estimator")
+ }
+}