author    Tathagata Das <tathagata.das1565@gmail.com>  2013-01-13 21:28:12 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2013-01-13 21:28:12 -0800
commit    82b0cc90cad4eb0e85a7ff4e14a90afec258a9a1 (patch)
tree      7a267a044de6c068917794d225b3d9412ee2321f /streaming/src
parent    23b54f62b7c4b703a32e96f9436b03f87e7f6710 (diff)
parent    f90f794cde479f4de425e9be0158a136a57666a2 (diff)
Merge pull request #370 from tdas/streaming
Added more documentation and minor change in API for NetworkReceiver
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala                     |  37
-rw-r--r--  streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala         |  34
-rw-r--r--  streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala        | 293
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala   |   5
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala   |   6
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala |  42
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala     |   6
-rw-r--r--  streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala  |   5
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala          |   2
-rw-r--r--  streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala       |   4
10 files changed, 375 insertions, 59 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index c89fb7723e..fbe3cebd6d 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -471,7 +471,7 @@ abstract class DStream[T: ClassManifest] (
* Returns a new DStream in which each RDD has a single element generated by counting the
* number of elements in each RDD of this DStream.
*/
- def count(): DStream[Int] = this.map(_ => 1).reduce(_ + _)
+ def count(): DStream[Long] = this.map(_ => 1L).reduce(_ + _)
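As a quick, hypothetical sketch of the changed signature (assuming a DStream obtained elsewhere; not code from this patch):

import spark.streaming.DStream

object CountExample {
  // Each RDD of the returned stream now holds a single Long: that batch's element count.
  def perBatchCounts[T](stream: DStream[T]): DStream[Long] = stream.count()
}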
/**
* Applies a function to each RDD in this DStream. This is an output operator, so
@@ -529,17 +529,16 @@ abstract class DStream[T: ClassManifest] (
* Return a new DStream which is computed based on windowed batches of this DStream.
* The new DStream generates RDDs with the same interval as this DStream.
* @param windowDuration width of the window; must be a multiple of this DStream's interval.
- * @return
*/
def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration)
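For instance, a hypothetical use of the single-argument form (sketch only; assumes the window is a multiple of the stream's batch interval):

import spark.streaming.{DStream, Seconds}

object WindowExample {
  // Every batch, produce an RDD covering the last 30 seconds of data.
  def last30Seconds[T](stream: DStream[T]): DStream[T] = stream.window(Seconds(30))
}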
/**
* Return a new DStream which is computed based on windowed batches of this DStream.
- * @param windowDuration duration (i.e., width) of the window;
- * must be a multiple of this DStream's interval
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
- * the new DStream will generate RDDs); must be a multiple of this
- * DStream's interval
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
*/
def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = {
new WindowedDStream(this, windowDuration, slideDuration)
@@ -548,16 +547,22 @@ abstract class DStream[T: ClassManifest] (
/**
* Returns a new DStream which is computed based on a tumbling window on this DStream.
* This is equivalent to window(batchTime, batchTime).
- * @param batchTime tumbling window duration; must be a multiple of this DStream's interval
+ * @param batchDuration tumbling window duration; must be a multiple of this DStream's
+ * batching interval
*/
- def tumble(batchTime: Duration): DStream[T] = window(batchTime, batchTime)
+ def tumble(batchDuration: Duration): DStream[T] = window(batchDuration, batchDuration)
/**
* Returns a new DStream in which each RDD has a single element generated by reducing all
- * elements in a window over this DStream. windowDuration and slideDuration are as defined in the
- * window() operation. This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc)
+ * elements in a window over this DStream. windowDuration and slideDuration are as defined
+ * in the window() operation. This is equivalent to
+ * window(windowDuration, slideDuration).reduce(reduceFunc)
*/
- def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T] = {
+ def reduceByWindow(
+ reduceFunc: (T, T) => T,
+ windowDuration: Duration,
+ slideDuration: Duration
+ ): DStream[T] = {
this.window(windowDuration, slideDuration).reduce(reduceFunc)
}
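A hypothetical sketch of reduceByWindow (not from this patch), keeping a sliding maximum:

import spark.streaming.{DStream, Seconds}

object ReduceByWindowExample {
  // Maximum value seen in the last 60 seconds, recomputed every 10 seconds.
  def slidingMax(nums: DStream[Int]): DStream[Int] =
    nums.reduceByWindow((a, b) => math.max(a, b), Seconds(60), Seconds(10))
}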
@@ -577,8 +582,8 @@ abstract class DStream[T: ClassManifest] (
* of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
* window() operation. This is equivalent to window(windowDuration, slideDuration).count()
*/
- def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Int] = {
- this.map(_ => 1).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
+ def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = {
+ this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
}
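For example (illustrative sketch; the count is now a Long):

import spark.streaming.{DStream, Seconds}

object CountByWindowExample {
  // Number of events seen in the last minute, updated every 5 seconds.
  def eventsLastMinute[T](events: DStream[T]): DStream[Long] =
    events.countByWindow(Seconds(60), Seconds(5))
}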
/**
@@ -612,6 +617,8 @@ abstract class DStream[T: ClassManifest] (
/**
* Saves each RDD in this DStream as a Sequence file of serialized objects.
+ * The file name at each batch interval is generated based on `prefix` and
+ * `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsObjectFiles(prefix: String, suffix: String = "") {
val saveFunc = (rdd: RDD[T], time: Time) => {
@@ -622,7 +629,9 @@ abstract class DStream[T: ClassManifest] (
}
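So, as a hypothetical example (the HDFS path is made up), the call below would write one object file per batch, named like .../events-<TIME_IN_MS>.seq:

import spark.streaming.DStream

object SaveExample {
  // One sequence file of serialized objects per batch interval: "<prefix>-<TIME_IN_MS>.<suffix>".
  def archive(events: DStream[String]) {
    events.saveAsObjectFiles("hdfs://namenode:9000/archive/events", "seq")
  }
}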
/**
- * Saves each RDD in this DStream as at text file, using string representation of elements.
+ * Saves each RDD in this DStream as a text file, using the string representation
+ * of elements. The file name at each batch interval is generated based on
+ * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsTextFiles(prefix: String, suffix: String = "") {
val saveFunc = (rdd: RDD[T], time: Time) => {
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index a6ab44271f..e4152f3a61 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -18,7 +18,10 @@ private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: Act
private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[String], metadata: Any) extends NetworkInputTrackerMessage
private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage
-
+/**
+ * This class manages the execution of the receivers of NetworkInputDStreams.
+ */
+private[streaming]
class NetworkInputTracker(
@transient ssc: StreamingContext,
@transient networkInputStreams: Array[NetworkInputDStream[_]])
@@ -32,16 +35,20 @@ class NetworkInputTracker(
var currentTime: Time = null
+ /** Start the actor and receiver execution thread. */
def start() {
ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker")
receiverExecutor.start()
}
+ /** Stop the receiver execution thread. */
def stop() {
+ // TODO: stop the actor as well
receiverExecutor.interrupt()
receiverExecutor.stopReceivers()
}
+ /** Return all the blocks received from a receiver. */
def getBlockIds(receiverId: Int, time: Time): Array[String] = synchronized {
val queue = receivedBlockIds.synchronized {
receivedBlockIds.getOrElse(receiverId, new Queue[String]())
@@ -53,6 +60,7 @@ class NetworkInputTracker(
result.toArray
}
+ /** Actor to receive messages from the receivers. */
private class NetworkInputTrackerActor extends Actor {
def receive = {
case RegisterReceiver(streamId, receiverActor) => {
@@ -83,7 +91,8 @@ class NetworkInputTracker(
}
}
}
-
+
+ /** This thread class runs all the receivers on the cluster. */
class ReceiverExecutor extends Thread {
val env = ssc.env
@@ -97,13 +106,22 @@ class NetworkInputTracker(
stopReceivers()
}
}
-
+
+ /**
+ * Gets the receivers from the NetworkInputDStreams, distributes them to the
+ * worker nodes as a parallel collection, and runs them.
+ */
def startReceivers() {
- val receivers = networkInputStreams.map(_.createReceiver())
+ val receivers = networkInputStreams.map(nis => {
+ val rcvr = nis.createReceiver()
+ rcvr.setStreamId(nis.id)
+ rcvr
+ })
// Right now, we only honor preferences if all receivers have them
val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined).reduce(_ && _)
+ // Create the parallel collection of receivers to distribute them on the worker nodes
val tempRDD =
if (hasLocationPreferences) {
val receiversWithPreferences = receivers.map(r => (r, Seq(r.getLocationPreference().toString)))
@@ -113,21 +131,21 @@ class NetworkInputTracker(
ssc.sc.makeRDD(receivers, receivers.size)
}
+ // Function to start the receiver on the worker node
val startReceiver = (iterator: Iterator[NetworkReceiver[_]]) => {
if (!iterator.hasNext) {
throw new Exception("Could not start receiver as details not found.")
}
iterator.next().start()
}
+ // Distribute the receivers and start them
ssc.sc.runJob(tempRDD, startReceiver)
}
+ /** Stops the receivers. */
def stopReceivers() {
- //implicit val ec = env.actorSystem.dispatcher
+ // Signal the receivers to stop
receiverInfo.values.foreach(_ ! StopReceiver)
- //val listOfFutures = receiverInfo.values.map(_.ask(StopReceiver)(timeout)).toList
- //val futureOfList = Future.sequence(listOfFutures)
- //Await.result(futureOfList, timeout)
}
}
}
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index 482d01300d..3952457339 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -25,34 +25,76 @@ extends Serializable {
new HashPartitioner(numPartitions)
}
+ /**
+ * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
+ * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
+ * single sequence to generate the RDDs of the new DStream. Hash partitioning is
+ * used to generate the RDDs with Spark's default number of partitions.
+ */
def groupByKey(): DStream[(K, Seq[V])] = {
groupByKey(defaultPartitioner())
}
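A hypothetical usage sketch (assumes the pair-DStream implicits from spark.streaming.StreamingContext._):

import spark.streaming.DStream
import spark.streaming.StreamingContext._

object GroupByKeyExample {
  // For each batch, collect all values seen per user into one sequence.
  def eventsPerUser(events: DStream[(String, String)]): DStream[(String, Seq[String])] =
    events.groupByKey()
}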
+ /**
+ * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
+ * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
+ * single sequence to generate the RDDs of the new DStream. Hash partitioning is
+ * used to generate the RDDs with `numPartitions` partitions.
+ */
def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = {
groupByKey(defaultPartitioner(numPartitions))
}
+ /**
+ * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
+ * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
+ * single sequence to generate the RDDs of the new DStream. [[spark.Partitioner]]
+ * is used to control the partitioning of each RDD.
+ */
def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
val createCombiner = (v: V) => ArrayBuffer[V](v)
val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
- combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner).asInstanceOf[DStream[(K, Seq[V])]]
+ combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner)
+ .asInstanceOf[DStream[(K, Seq[V])]]
}
+ /**
+ * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
+ * Therefore, the values for each key in `this` DStream's RDDs are merged using the
+ * associative reduce function to generate the RDDs of the new DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ */
def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = {
reduceByKey(reduceFunc, defaultPartitioner())
}
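For instance, per-batch word counts (sketch only; again assumes the pair-DStream implicits from StreamingContext._):

import spark.streaming.DStream
import spark.streaming.StreamingContext._

object ReduceByKeyExample {
  // Count occurrences of each word within every batch.
  def wordCounts(words: DStream[String]): DStream[(String, Long)] =
    words.map(w => (w, 1L)).reduceByKey(_ + _)
}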
+ /**
+ * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
+ * Therefore, the values for each key in `this` DStream's RDDs are merged using the
+ * associative reduce function to generate the RDDs of the new DStream.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ */
def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = {
reduceByKey(reduceFunc, defaultPartitioner(numPartitions))
}
+ /**
+ * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
+ * Therefore, the values for each key in `this` DStream's RDDs are merged using the
+ * associative reduce function to generate the RDDs of the new DStream.
+ * [[spark.Partitioner]] is used to control the partitioning of each RDD.
+ */
def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner)
}
+ /**
+ * Generic function to combine elements of each key in DStream's RDDs using a custom function.
+ * This is similar to the combineByKey for RDDs. Please refer to combineByKey in
+ * [[spark.PairRDDFunctions]] for more information.
+ */
def combineByKey[C: ClassManifest](
createCombiner: V => C,
mergeValue: (C, V) => C,
@@ -61,14 +103,52 @@ extends Serializable {
new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner)
}
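A hypothetical sketch of combineByKey computing per-key sums and counts (from which averages could be derived); the partition count of 2 is arbitrary:

import spark.HashPartitioner
import spark.streaming.DStream
import spark.streaming.StreamingContext._

object CombineByKeyExample {
  // Per key and per batch: (sum of values, number of values).
  def sumAndCount(values: DStream[(String, Double)]): DStream[(String, (Double, Long))] =
    values.combineByKey[(Double, Long)](
      (v: Double) => (v, 1L),                                               // createCombiner
      (acc: (Double, Long), v: Double) => (acc._1 + v, acc._2 + 1),         // mergeValue
      (a: (Double, Long), b: (Double, Long)) => (a._1 + b._1, a._2 + b._2), // mergeCombiner
      new HashPartitioner(2))
}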
+ /**
+ * Creates a new DStream by counting the number of values of each key in each RDD
+ * of `this` DStream. Hash partitioning is used to generate the RDDs with
+ * `numPartitions` partitions.
+ */
def countByKey(numPartitions: Int = self.ssc.sc.defaultParallelism): DStream[(K, Long)] = {
self.map(x => (x._1, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
}
+ /**
+ * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
+ * The new DStream generates RDDs with the same interval as this DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ */
+ def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Seq[V])] = {
+ groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner())
+ }
+
+ /**
+ * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = {
groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
}
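For example (illustrative sketch):

import spark.streaming.{DStream, Seconds}
import spark.streaming.StreamingContext._

object GroupByKeyAndWindowExample {
  // All clicks per user over the last 30 seconds, updated every 10 seconds.
  def recentClicks(clicks: DStream[(String, String)]): DStream[(String, Seq[String])] =
    clicks.groupByKeyAndWindow(Seconds(30), Seconds(10))
}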
+ /**
+ * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ */
def groupByKeyAndWindow(
windowDuration: Duration,
slideDuration: Duration,
@@ -77,6 +157,16 @@ extends Serializable {
groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions))
}
+ /**
+ * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ */
def groupByKeyAndWindow(
windowDuration: Duration,
slideDuration: Duration,
@@ -85,6 +175,15 @@ extends Serializable {
self.window(windowDuration, slideDuration).groupByKey(partitioner)
}
+ /**
+ * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
+ * The new DStream generates RDDs with the same interval as this DStream.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ */
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
windowDuration: Duration
@@ -92,6 +191,17 @@ extends Serializable {
reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner())
}
+ /**
+ * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
windowDuration: Duration,
@@ -100,6 +210,18 @@ extends Serializable {
reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner())
}
+ /**
+ * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ */
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
windowDuration: Duration,
@@ -109,6 +231,17 @@ extends Serializable {
reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
}
+ /**
+ * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+ * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ */
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
windowDuration: Duration,
@@ -121,12 +254,23 @@ extends Serializable {
.reduceByKey(cleanedReduceFunc, partitioner)
}
- // This method is the efficient sliding window reduce operation,
- // which requires the specification of an inverse reduce function,
- // so that new elements introduced in the window can be "added" using
- // reduceFunc to the previous window's result and old elements can be
- // "subtracted using invReduceFunc.
-
+ /**
+ * Creates a new DStream by reducing over a sliding window in an incremental fashion.
+ * The reduced value for a new window is calculated incrementally from the
+ * old window's reduced value:
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient than reduceByKeyAndWindow without an "inverse reduce" function.
+ * However, it is applicable only to "invertible reduce functions".
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
@@ -138,6 +282,24 @@ extends Serializable {
reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner())
}
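A hypothetical sketch of this incremental form for windowed word counts (checkpointing may need to be enabled on the StreamingContext for this path):

import spark.streaming.{DStream, Seconds}
import spark.streaming.StreamingContext._

object IncrementalWindowExample {
  // New batches are added into the window's counts and expired batches are subtracted.
  def windowedWordCounts(words: DStream[String]): DStream[(String, Long)] =
    words.map(w => (w, 1L))
         .reduceByKeyAndWindow(_ + _, _ - _, Seconds(60), Seconds(10))
}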
+ /**
+ * Creates a new DStream by reducing over a sliding window in an incremental fashion.
+ * The reduced value for a new window is calculated incrementally from the
+ * old window's reduced value:
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient than reduceByKeyAndWindow without an "inverse reduce" function.
+ * However, it is applicable only to "invertible reduce functions".
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ */
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
@@ -150,6 +312,23 @@ extends Serializable {
reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions))
}
+ /**
+ * Creates a new DStream by reducing over a sliding window in an incremental fashion.
+ * The reduced value for a new window is calculated incrementally from the
+ * old window's reduced value:
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient than reduceByKeyAndWindow without an "inverse reduce" function.
+ * However, it is applicable only to "invertible reduce functions".
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ */
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
@@ -164,6 +343,16 @@ extends Serializable {
self, cleanedReduceFunc, cleanedInvReduceFunc, windowDuration, slideDuration, partitioner)
}
+ /**
+ * Creates a new DStream by counting the number of values for each key over a window.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ */
def countByKeyAndWindow(
windowDuration: Duration,
slideDuration: Duration,
@@ -179,17 +368,30 @@ extends Serializable {
)
}
- // TODO:
- //
- //
- //
- //
+ /**
+ * Creates a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of the key from
+ * `this` DStream. Hash partitioning is used to generate the RDDs with Spark's default
+ * number of partitions.
+ * @param updateFunc State update function. If this function returns None, then the
+ * corresponding state key-value pair will be eliminated.
+ * @tparam S State type
+ */
def updateStateByKey[S <: AnyRef : ClassManifest](
updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)] = {
updateStateByKey(updateFunc, defaultPartitioner())
}
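The classic use is a running count per key. A hypothetical sketch (note that the state type is bounded by AnyRef here, so a small wrapper class is used; stateful streams typically also need a checkpoint directory):

import spark.streaming.DStream
import spark.streaming.StreamingContext._

object UpdateStateExample {
  case class Count(value: Long)

  // Maintain a cumulative count per word across all batches seen so far.
  def runningCounts(words: DStream[String]): DStream[(String, Count)] =
    words.map(w => (w, 1L)).updateStateByKey[Count](
      (newValues: Seq[Long], state: Option[Count]) =>
        Some(Count(state.map(_.value).getOrElse(0L) + newValues.sum))
    )
}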
+ /**
+ * Creates a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of the key from
+ * `this` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param updateFunc State update function. If this function returns None, then the
+ * corresponding state key-value pair will be eliminated.
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ * @tparam S State type
+ */
def updateStateByKey[S <: AnyRef : ClassManifest](
updateFunc: (Seq[V], Option[S]) => Option[S],
numPartitions: Int
@@ -197,6 +399,15 @@ extends Serializable {
updateStateByKey(updateFunc, defaultPartitioner(numPartitions))
}
+ /**
+ * Creates a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of the key from
+ * `this` DStream. [[spark.Partitioner]] is used to control the partitioning of each RDD.
+ * @param updateFunc State update function. If this function returns None, then the
+ * corresponding state key-value pair will be eliminated.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @tparam S State type
+ */
def updateStateByKey[S <: AnyRef : ClassManifest](
updateFunc: (Seq[V], Option[S]) => Option[S],
partitioner: Partitioner
@@ -207,6 +418,19 @@ extends Serializable {
updateStateByKey(newUpdateFunc, partitioner, true)
}
+ /**
+ * Creates a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of the key from
+ * `this` DStream. [[spark.Partitioner]] is used to control the partitioning of each RDD.
+ * @param updateFunc State update function. If this function returns None, then the
+ * corresponding state key-value pair will be eliminated. Note that
+ * this function may generate a tuple with a different key
+ * than the input key. It is up to the developer to decide whether to
+ * remember the partitioner despite the key being changed.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @param rememberPartitioner Whether to remember the partitioner object in the generated RDDs.
+ * @tparam S State type
+ */
def updateStateByKey[S <: AnyRef : ClassManifest](
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
@@ -226,10 +450,24 @@ extends Serializable {
new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
}
+ /**
+ * Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will
+ * be generated by cogrouping RDDs from `this` and `other` DStreams. Therefore, for
+ * each key k in the corresponding RDDs of `this` or `other` DStreams, the generated RDD
+ * will contain a tuple with the list of values for that key in both RDDs.
+ * HashPartitioner is used to partition each generated RDD into the default number of partitions.
+ */
def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
cogroup(other, defaultPartitioner())
}
+ /**
+ * Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will
+ * be generated by cogrouping RDDs from `this` and `other` DStreams. Therefore, for
+ * each key k in the corresponding RDDs of `this` or `other` DStreams, the generated RDD
+ * will contain a tuple with the list of values for that key in both RDDs.
+ * The given Partitioner is used to partition each generated RDD.
+ */
def cogroup[W: ClassManifest](
other: DStream[(K, W)],
partitioner: Partitioner
@@ -249,11 +487,24 @@ extends Serializable {
}
}
+ /**
+ * Joins `this` DStream with `other` DStream. Each RDD of the new DStream will
+ * be generated by joining RDDs from `this` and `other` DStreams. HashPartitioner is used
+ * to partition each generated RDD into the default number of partitions.
+ */
def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
join[W](other, defaultPartitioner())
}
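For example (sketch only):

import spark.streaming.DStream
import spark.streaming.StreamingContext._

object JoinExample {
  // Within each batch, pair every click with the impressions recorded for the same user.
  def clicksWithImpressions(
      clicks: DStream[(String, String)],
      impressions: DStream[(String, String)]): DStream[(String, (String, String))] =
    clicks.join(impressions)
}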
- def join[W: ClassManifest](other: DStream[(K, W)], partitioner: Partitioner): DStream[(K, (V, W))] = {
+ /**
+ * Joins `this` DStream with `other` DStream, that is, each RDD of the new DStream will
+ * be generated by joining RDDs from `this` and `other` DStreams. Uses the given
+ * Partitioner to partition each generated RDD.
+ */
+ def join[W: ClassManifest](
+ other: DStream[(K, W)],
+ partitioner: Partitioner
+ ): DStream[(K, (V, W))] = {
this.cogroup(other, partitioner)
.flatMapValues{
case (vs, ws) =>
@@ -261,6 +512,10 @@ extends Serializable {
}
}
+ /**
+ * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
+ * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+ */
def saveAsHadoopFiles[F <: OutputFormat[K, V]](
prefix: String,
suffix: String
@@ -268,6 +523,10 @@ extends Serializable {
saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
}
+ /**
+ * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
+ * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
+ */
def saveAsHadoopFiles(
prefix: String,
suffix: String,
@@ -283,6 +542,10 @@ extends Serializable {
self.foreach(saveFunc)
}
+ /**
+ * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
+ * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
prefix: String,
suffix: String
@@ -290,6 +553,10 @@ extends Serializable {
saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
}
+ /**
+ * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
+ * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
def saveAsNewAPIHadoopFiles(
prefix: String,
suffix: String,
diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
index ca70e72e56..efc7058480 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala
@@ -26,7 +26,7 @@ class FlumeInputDStream[T: ClassManifest](
) extends NetworkInputDStream[SparkFlumeEvent](ssc_) {
override def createReceiver(): NetworkReceiver[SparkFlumeEvent] = {
- new FlumeReceiver(id, host, port, storageLevel)
+ new FlumeReceiver(host, port, storageLevel)
}
}
@@ -112,11 +112,10 @@ class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol {
* Flume Avro interface.*/
private[streaming]
class FlumeReceiver(
- streamId: Int,
host: String,
port: Int,
storageLevel: StorageLevel
- ) extends NetworkReceiver[SparkFlumeEvent](streamId) {
+ ) extends NetworkReceiver[SparkFlumeEvent] {
lazy val blockGenerator = new BlockGenerator(storageLevel)
diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
index 25988a2ce7..2b4740bdf7 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala
@@ -96,15 +96,15 @@ class KafkaInputDStream[T: ClassManifest](
} */
def createReceiver(): NetworkReceiver[T] = {
- new KafkaReceiver(id, host, port, groupId, topics, initialOffsets, storageLevel)
+ new KafkaReceiver(host, port, groupId, topics, initialOffsets, storageLevel)
.asInstanceOf[NetworkReceiver[T]]
}
}
private[streaming]
-class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String,
+class KafkaReceiver(host: String, port: Int, groupId: String,
topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
- storageLevel: StorageLevel) extends NetworkReceiver[Any](streamId) {
+ storageLevel: StorageLevel) extends NetworkReceiver[Any] {
// Timeout for establishing a connection to Zookeper in ms.
val ZK_TIMEOUT = 10000
diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
index 18e62a0e33..aa6be95f30 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
@@ -17,6 +17,15 @@ import akka.util.duration._
import spark.streaming.util.{RecurringTimer, SystemClock}
import java.util.concurrent.ArrayBlockingQueue
+/**
+ * Abstract class for defining any InputDStream that has to start a receiver on worker
+ * nodes to receive external data. Specific implementations of NetworkInputDStream must
+ * define the createReceiver() function that creates the receiver object of type
+ * [[spark.streaming.dstream.NetworkReceiver]] that will be sent to the workers to receive
+ * data.
+ * @param ssc_ Streaming context that will execute this input stream
+ * @tparam T Class type of the object of this stream
+ */
abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {
@@ -25,7 +34,7 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming
val id = ssc.getNewNetworkStreamId()
/**
- * This method creates the receiver object that will be sent to the workers
+ * Creates the receiver object that will be sent to the worker nodes
* to receive data. This method needs to be defined by any specific implementation
* of a NetworkInputDStream.
*/
@@ -48,7 +57,11 @@ private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverM
private[streaming] case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage
private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage
-abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Serializable with Logging {
+/**
+ * Abstract class of a receiver that can be run on worker nodes to receive external data. See
+ * [[spark.streaming.dstream.NetworkInputDStream]] for an explanation.
+ */
+abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Logging {
initLogging()
@@ -59,17 +72,22 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
lazy protected val receivingThread = Thread.currentThread()
- /** This method will be called to start receiving data. */
+ protected var streamId: Int = -1
+
+ /**
+ * This method will be called to start receiving data. All your receiver
+ * starting code should be implemented by defining this function.
+ */
protected def onStart()
/** This method will be called to stop receiving data. */
protected def onStop()
- /** This method conveys a placement preference (hostname) for this receiver. */
+ /** Conveys a placement preference (hostname) for this receiver. */
def getLocationPreference() : Option[String] = None
/**
- * This method starts the receiver. First is accesses all the lazy members to
+ * Starts the receiver. First it accesses all the lazy members to
* materialize them. Then it calls the user-defined onStart() method to start
* other threads, etc. required to receive the data.
*/
@@ -92,7 +110,7 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
}
/**
- * This method stops the receiver. First it interrupts the main receiving thread,
+ * Stops the receiver. First it interrupts the main receiving thread,
* that is, the thread that called receiver.start(). Then it calls the user-defined
* onStop() method to stop other threads and/or do cleanup.
*/
@@ -103,7 +121,7 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
}
/**
- * This method stops the receiver and reports to exception to the tracker.
+ * Stops the receiver and reports the exception to the tracker.
* This should be called whenever an exception has happened on any thread
* of the receiver.
*/
@@ -115,7 +133,7 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
/**
- * This method pushes a block (as iterator of values) into the block manager.
+ * Pushes a block (as iterator of values) into the block manager.
*/
def pushBlock(blockId: String, iterator: Iterator[T], metadata: Any, level: StorageLevel) {
val buffer = new ArrayBuffer[T] ++ iterator
@@ -125,7 +143,7 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
}
/**
- * This method pushes a block (as bytes) into the block manager.
+ * Pushes a block (as bytes) into the block manager.
*/
def pushBlock(blockId: String, bytes: ByteBuffer, metadata: Any, level: StorageLevel) {
env.blockManager.putBytes(blockId, bytes, level)
@@ -157,6 +175,10 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
}
}
+ protected[streaming] def setStreamId(id: Int) {
+ streamId = id
+ }
+
/**
* Batches objects created by a [[spark.streaming.NetworkReceiver]] and puts them into
* appropriately named blocks at regular intervals. This class starts two threads,
@@ -202,7 +224,7 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri
val newBlockBuffer = currentBuffer
currentBuffer = new ArrayBuffer[T]
if (newBlockBuffer.size > 0) {
- val blockId = "input-" + NetworkReceiver.this.streamId + "- " + (time - blockInterval)
+ val blockId = "input-" + NetworkReceiver.this.streamId + "-" + (time - blockInterval)
val newBlock = createBlock(blockId, newBlockBuffer.toIterator)
blocksForPushing.add(newBlock)
}
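To illustrate the API change in this file (receivers no longer take a streamId constructor argument; the tracker injects it through setStreamId), a toy custom receiver might now look like the sketch below. This is purely illustrative and not code from the patch:

import spark.storage.StorageLevel
import spark.streaming.dstream.NetworkReceiver

// A do-nothing receiver: subclasses only define onStart()/onStop(); the stream id
// is set by NetworkInputTracker via setStreamId() instead of the constructor.
class ConstantReceiver(storageLevel: StorageLevel) extends NetworkReceiver[String] {
  protected def onStart() {
    // A real receiver would open its connection here and hand data to the block
    // manager via pushBlock(...) or a BlockGenerator.
    pushBlock("input-" + streamId + "-0", Iterator("hello"), null, storageLevel)
  }
  protected def onStop() { }
}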
diff --git a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
index aa2f31cea8..290fab1ce0 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala
@@ -26,13 +26,13 @@ class RawInputDStream[T: ClassManifest](
) extends NetworkInputDStream[T](ssc_ ) with Logging {
def createReceiver(): NetworkReceiver[T] = {
- new RawNetworkReceiver(id, host, port, storageLevel).asInstanceOf[NetworkReceiver[T]]
+ new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[NetworkReceiver[T]]
}
}
private[streaming]
-class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: StorageLevel)
- extends NetworkReceiver[Any](streamId) {
+class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel)
+ extends NetworkReceiver[Any] {
var blockPushingThread: Thread = null
diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
index 8e4b20ea4c..d42027092b 100644
--- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala
@@ -16,18 +16,17 @@ class SocketInputDStream[T: ClassManifest](
) extends NetworkInputDStream[T](ssc_) {
def createReceiver(): NetworkReceiver[T] = {
- new SocketReceiver(id, host, port, bytesToObjects, storageLevel)
+ new SocketReceiver(host, port, bytesToObjects, storageLevel)
}
}
private[streaming]
class SocketReceiver[T: ClassManifest](
- streamId: Int,
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],
storageLevel: StorageLevel
- ) extends NetworkReceiver[T](streamId) {
+ ) extends NetworkReceiver[T] {
lazy protected val blockGenerator = new BlockGenerator(storageLevel)
diff --git a/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala
index f31ae39a16..03749d4a94 100644
--- a/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala
+++ b/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala
@@ -81,7 +81,7 @@ object RawTextHelper {
* before real workload starts.
*/
def warmUp(sc: SparkContext) {
- for(i <- 0 to 4) {
+ for(i <- 0 to 1) {
sc.parallelize(1 to 200000, 1000)
.map(_ % 1331).map(_.toString)
.mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index fa117cfcf0..f9ba1f20f0 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -224,7 +224,9 @@ class WindowOperationsSuite extends TestSuiteBase {
val windowDuration = Seconds(2)
val slideDuration = Seconds(1)
val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
- val operation = (s: DStream[Int]) => s.countByWindow(windowDuration, slideDuration)
+ val operation = (s: DStream[Int]) => {
+ s.countByWindow(windowDuration, slideDuration).map(_.toInt)
+ }
testOperation(input, operation, expectedOutput, numBatches, true)
}