| author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-01-13 21:28:12 -0800 |
|---|---|---|
| committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-01-13 21:28:12 -0800 |
| commit | 82b0cc90cad4eb0e85a7ff4e14a90afec258a9a1 (patch) | |
| tree | 7a267a044de6c068917794d225b3d9412ee2321f /streaming/src | |
| parent | 23b54f62b7c4b703a32e96f9436b03f87e7f6710 (diff) | |
| parent | f90f794cde479f4de425e9be0158a136a57666a2 (diff) | |
Merge pull request #370 from tdas/streaming
Added more documentation and minor change in API for NetworkReceiver
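For context, here is a hedged sketch of what the NetworkReceiver API change means for a receiver implementer. Only NetworkReceiver, BlockGenerator, onStart and onStop come from the diff below; the class name, constructor parameters and connection logic are invented for illustration, following the pattern of SocketReceiver in this patch.

```scala
// Hypothetical custom receiver under the revised API: the streamId constructor
// parameter is gone; NetworkInputTracker now assigns the id via setStreamId()
// before shipping the receiver to a worker node.
import spark.storage.StorageLevel
import spark.streaming.dstream.NetworkReceiver

class ExampleReceiver(host: String, port: Int, storageLevel: StorageLevel)
  extends NetworkReceiver[String] {   // previously: NetworkReceiver[String](streamId)

  // BlockGenerator batches received objects into named blocks at regular intervals.
  lazy protected val blockGenerator = new BlockGenerator(storageLevel)

  protected def onStart() {
    blockGenerator.start()
    // Open the connection to (host, port) here and push each received record
    // with `blockGenerator += record`, as SocketReceiver does in the diff below.
  }

  protected def onStop() {
    blockGenerator.stop()
  }
}
```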
Diffstat (limited to 'streaming/src')
10 files changed, 375 insertions, 59 deletions
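Before the per-file diffs, a minimal usage sketch of the user-facing change in DStream.scala: count() and countByWindow() now return DStream[Long] instead of DStream[Int]. The `lines` stream and the chosen durations are placeholders; only the count and window calls are taken from the diff.

```scala
import spark.streaming.{DStream, Seconds}

// `lines` stands for any already-created text DStream; how it was created is
// outside the scope of this patch.
def countExample(lines: DStream[String]) {
  // Both operations now return DStream[Long], so counts over large windows
  // can no longer overflow an Int.
  val perBatch:  DStream[Long] = lines.count()
  val perWindow: DStream[Long] = lines.countByWindow(Seconds(10), Seconds(2))

  perBatch.print()
  perWindow.print()
}
```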
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index c89fb7723e..fbe3cebd6d 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -471,7 +471,7 @@ abstract class DStream[T: ClassManifest] ( * Returns a new DStream in which each RDD has a single element generated by counting each RDD * of this DStream. */ - def count(): DStream[Int] = this.map(_ => 1).reduce(_ + _) + def count(): DStream[Long] = this.map(_ => 1L).reduce(_ + _) /** * Applies a function to each RDD in this DStream. This is an output operator, so @@ -529,17 +529,16 @@ abstract class DStream[T: ClassManifest] ( * Return a new DStream which is computed based on windowed batches of this DStream. * The new DStream generates RDDs with the same interval as this DStream. * @param windowDuration width of the window; must be a multiple of this DStream's interval. - * @return */ def window(windowDuration: Duration): DStream[T] = window(windowDuration, this.slideDuration) /** * Return a new DStream which is computed based on windowed batches of this DStream. - * @param windowDuration duration (i.e., width) of the window; - * must be a multiple of this DStream's interval + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which - * the new DStream will generate RDDs); must be a multiple of this - * DStream's interval + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval */ def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = { new WindowedDStream(this, windowDuration, slideDuration) @@ -548,16 +547,22 @@ abstract class DStream[T: ClassManifest] ( /** * Returns a new DStream which computed based on tumbling window on this DStream. * This is equivalent to window(batchTime, batchTime). - * @param batchTime tumbling window duration; must be a multiple of this DStream's interval + * @param batchDuration tumbling window duration; must be a multiple of this DStream's + * batching interval */ - def tumble(batchTime: Duration): DStream[T] = window(batchTime, batchTime) + def tumble(batchDuration: Duration): DStream[T] = window(batchDuration, batchDuration) /** * Returns a new DStream in which each RDD has a single element generated by reducing all - * elements in a window over this DStream. windowDuration and slideDuration are as defined in the - * window() operation. This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc) + * elements in a window over this DStream. windowDuration and slideDuration are as defined + * in the window() operation. This is equivalent to + * window(windowDuration, slideDuration).reduce(reduceFunc) */ - def reduceByWindow(reduceFunc: (T, T) => T, windowDuration: Duration, slideDuration: Duration): DStream[T] = { + def reduceByWindow( + reduceFunc: (T, T) => T, + windowDuration: Duration, + slideDuration: Duration + ): DStream[T] = { this.window(windowDuration, slideDuration).reduce(reduceFunc) } @@ -577,8 +582,8 @@ abstract class DStream[T: ClassManifest] ( * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the * window() operation. 
This is equivalent to window(windowDuration, slideDuration).count() */ - def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Int] = { - this.map(_ => 1).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration) + def countByWindow(windowDuration: Duration, slideDuration: Duration): DStream[Long] = { + this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration) } /** @@ -612,6 +617,8 @@ abstract class DStream[T: ClassManifest] ( /** * Saves each RDD in this DStream as a Sequence file of serialized objects. + * The file name at each batch interval is generated based on `prefix` and + * `suffix`: "prefix-TIME_IN_MS.suffix". */ def saveAsObjectFiles(prefix: String, suffix: String = "") { val saveFunc = (rdd: RDD[T], time: Time) => { @@ -622,7 +629,9 @@ abstract class DStream[T: ClassManifest] ( } /** - * Saves each RDD in this DStream as at text file, using string representation of elements. + * Saves each RDD in this DStream as at text file, using string representation + * of elements. The file name at each batch interval is generated based on + * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ def saveAsTextFiles(prefix: String, suffix: String = "") { val saveFunc = (rdd: RDD[T], time: Time) => { diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala index a6ab44271f..e4152f3a61 100644 --- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala +++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala @@ -18,7 +18,10 @@ private[streaming] case class RegisterReceiver(streamId: Int, receiverActor: Act private[streaming] case class AddBlocks(streamId: Int, blockIds: Seq[String], metadata: Any) extends NetworkInputTrackerMessage private[streaming] case class DeregisterReceiver(streamId: Int, msg: String) extends NetworkInputTrackerMessage - +/** + * This class manages the execution of the receivers of NetworkInputDStreams. + */ +private[streaming] class NetworkInputTracker( @transient ssc: StreamingContext, @transient networkInputStreams: Array[NetworkInputDStream[_]]) @@ -32,16 +35,20 @@ class NetworkInputTracker( var currentTime: Time = null + /** Start the actor and receiver execution thread. */ def start() { ssc.env.actorSystem.actorOf(Props(new NetworkInputTrackerActor), "NetworkInputTracker") receiverExecutor.start() } + /** Stop the receiver execution thread. */ def stop() { + // TODO: stop the actor as well receiverExecutor.interrupt() receiverExecutor.stopReceivers() } + /** Return all the blocks received from a receiver. */ def getBlockIds(receiverId: Int, time: Time): Array[String] = synchronized { val queue = receivedBlockIds.synchronized { receivedBlockIds.getOrElse(receiverId, new Queue[String]()) @@ -53,6 +60,7 @@ class NetworkInputTracker( result.toArray } + /** Actor to receive messages from the receivers. */ private class NetworkInputTrackerActor extends Actor { def receive = { case RegisterReceiver(streamId, receiverActor) => { @@ -83,7 +91,8 @@ class NetworkInputTracker( } } } - + + /** This thread class runs all the receivers on the cluster. */ class ReceiverExecutor extends Thread { val env = ssc.env @@ -97,13 +106,22 @@ class NetworkInputTracker( stopReceivers() } } - + + /** + * Get the receivers from the NetworkInputDStreams, distributes them to the + * worker nodes as a parallel collection, and runs them. 
+ */ def startReceivers() { - val receivers = networkInputStreams.map(_.createReceiver()) + val receivers = networkInputStreams.map(nis => { + val rcvr = nis.createReceiver() + rcvr.setStreamId(nis.id) + rcvr + }) // Right now, we only honor preferences if all receivers have them val hasLocationPreferences = receivers.map(_.getLocationPreference().isDefined).reduce(_ && _) + // Create the parallel collection of receivers to distributed them on the worker nodes val tempRDD = if (hasLocationPreferences) { val receiversWithPreferences = receivers.map(r => (r, Seq(r.getLocationPreference().toString))) @@ -113,21 +131,21 @@ class NetworkInputTracker( ssc.sc.makeRDD(receivers, receivers.size) } + // Function to start the receiver on the worker node val startReceiver = (iterator: Iterator[NetworkReceiver[_]]) => { if (!iterator.hasNext) { throw new Exception("Could not start receiver as details not found.") } iterator.next().start() } + // Distribute the receivers and start them ssc.sc.runJob(tempRDD, startReceiver) } + /** Stops the receivers. */ def stopReceivers() { - //implicit val ec = env.actorSystem.dispatcher + // Signal the receivers to stop receiverInfo.values.foreach(_ ! StopReceiver) - //val listOfFutures = receiverInfo.values.map(_.ask(StopReceiver)(timeout)).toList - //val futureOfList = Future.sequence(listOfFutures) - //Await.result(futureOfList, timeout) } } } diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala index 482d01300d..3952457339 100644 --- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala @@ -25,34 +25,76 @@ extends Serializable { new HashPartitioner(numPartitions) } + /** + * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream. + * Therefore, the values for each key in `this` DStream's RDDs are grouped into a + * single sequence to generate the RDDs of the new DStream. Hash partitioning is + * used to generate the RDDs with Spark's default number of partitions. + */ def groupByKey(): DStream[(K, Seq[V])] = { groupByKey(defaultPartitioner()) } + /** + * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream. + * Therefore, the values for each key in `this` DStream's RDDs are grouped into a + * single sequence to generate the RDDs of the new DStream. Hash partitioning is + * used to generate the RDDs with `numPartitions` partitions. + */ def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = { groupByKey(defaultPartitioner(numPartitions)) } + /** + * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream. + * Therefore, the values for each key in `this` DStream's RDDs are grouped into a + * single sequence to generate the RDDs of the new DStream. [[spark.Partitioner]] + * is used to control the partitioning of each RDD. + */ def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = { val createCombiner = (v: V) => ArrayBuffer[V](v) val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v) val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2) - combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner).asInstanceOf[DStream[(K, Seq[V])]] + combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner) + .asInstanceOf[DStream[(K, Seq[V])]] } + /** + * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream. 
+ * Therefore, the values for each key in `this` DStream's RDDs is merged using the + * associative reduce function to generate the RDDs of the new DStream. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + */ def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = { reduceByKey(reduceFunc, defaultPartitioner()) } + /** + * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream. + * Therefore, the values for each key in `this` DStream's RDDs is merged using the + * associative reduce function to generate the RDDs of the new DStream. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + */ def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = { reduceByKey(reduceFunc, defaultPartitioner(numPartitions)) } + /** + * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream. + * Therefore, the values for each key in `this` DStream's RDDs is merged using the + * associative reduce function to generate the RDDs of the new DStream. + * [[spark.Partitioner]] is used to control the partitioning of each RDD. + */ def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = { val cleanedReduceFunc = ssc.sc.clean(reduceFunc) combineByKey((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, partitioner) } + /** + * Generic function to combine elements of each key in DStream's RDDs using custom function. + * This is similar to the combineByKey for RDDs. Please refer to combineByKey in + * [[spark.PairRDDFunctions]] for more information. + */ def combineByKey[C: ClassManifest]( createCombiner: V => C, mergeValue: (C, V) => C, @@ -61,14 +103,52 @@ extends Serializable { new ShuffledDStream[K, V, C](self, createCombiner, mergeValue, mergeCombiner, partitioner) } + /** + * Creates a new DStream by counting the number of values of each key in each RDD + * of `this` DStream. Hash partitioning is used to generate the RDDs with Spark's + * `numPartitions` partitions. + */ def countByKey(numPartitions: Int = self.ssc.sc.defaultParallelism): DStream[(K, Long)] = { self.map(x => (x._1, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions) } + /** + * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * This is similar to `DStream.groupByKey()` but applies it over a sliding window. + * The new DStream generates RDDs with the same interval as this DStream. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + */ + def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Seq[V])] = { + groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner()) + } + + /** + * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * This is similar to `DStream.groupByKey()` but applies it over a sliding window. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. 
+ * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = { groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner()) } + /** + * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * This is similar to `DStream.groupByKey()` but applies it over a sliding window. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions Number of partitions of each RDD in the new DStream. + */ def groupByKeyAndWindow( windowDuration: Duration, slideDuration: Duration, @@ -77,6 +157,16 @@ extends Serializable { groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions)) } + /** + * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * This is similar to `DStream.groupByKey()` but applies it over a sliding window. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + */ def groupByKeyAndWindow( windowDuration: Duration, slideDuration: Duration, @@ -85,6 +175,15 @@ extends Serializable { self.window(windowDuration, slideDuration).groupByKey(partitioner) } + /** + * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. + * This is similar to `DStream.reduceByKey()` but applies it over a sliding window. + * The new DStream generates RDDs with the same interval as this DStream. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + */ def reduceByKeyAndWindow( reduceFunc: (V, V) => V, windowDuration: Duration @@ -92,6 +191,17 @@ extends Serializable { reduceByKeyAndWindow(reduceFunc, windowDuration, self.slideDuration, defaultPartitioner()) } + /** + * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. + * This is similar to `DStream.reduceByKey()` but applies it over a sliding window. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. 
+ * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ def reduceByKeyAndWindow( reduceFunc: (V, V) => V, windowDuration: Duration, @@ -100,6 +210,18 @@ extends Serializable { reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner()) } + /** + * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. + * This is similar to `DStream.reduceByKey()` but applies it over a sliding window. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions Number of partitions of each RDD in the new DStream. + */ def reduceByKeyAndWindow( reduceFunc: (V, V) => V, windowDuration: Duration, @@ -109,6 +231,17 @@ extends Serializable { reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions)) } + /** + * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. + * This is similar to `DStream.reduceByKey()` but applies it over a sliding window. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + */ def reduceByKeyAndWindow( reduceFunc: (V, V) => V, windowDuration: Duration, @@ -121,12 +254,23 @@ extends Serializable { .reduceByKey(cleanedReduceFunc, partitioner) } - // This method is the efficient sliding window reduce operation, - // which requires the specification of an inverse reduce function, - // so that new elements introduced in the window can be "added" using - // reduceFunc to the previous window's result and old elements can be - // "subtracted using invReduceFunc. - + /** + * Creates a new DStream by reducing over a window in a smarter way. + * The reduced value of over a new window is calculated incrementally by using the + * old window's reduce value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. 
+ * @param reduceFunc associative reduce function + * @param invReduceFunc inverse function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ def reduceByKeyAndWindow( reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, @@ -138,6 +282,24 @@ extends Serializable { reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner()) } + /** + * Creates a new DStream by reducing over a window in a smarter way. + * The reduced value of over a new window is calculated incrementally by using the + * old window's reduce value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * @param reduceFunc associative reduce function + * @param invReduceFunc inverse function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions Number of partitions of each RDD in the new DStream. + */ def reduceByKeyAndWindow( reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, @@ -150,6 +312,23 @@ extends Serializable { reduceFunc, invReduceFunc, windowDuration, slideDuration, defaultPartitioner(numPartitions)) } + /** + * Creates a new DStream by reducing over a window in a smarter way. + * The reduced value of over a new window is calculated incrementally by using the + * old window's reduce value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * @param reduceFunc associative reduce function + * @param invReduceFunc inverse function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + */ def reduceByKeyAndWindow( reduceFunc: (V, V) => V, invReduceFunc: (V, V) => V, @@ -164,6 +343,16 @@ extends Serializable { self, cleanedReduceFunc, cleanedInvReduceFunc, windowDuration, slideDuration, partitioner) } + /** + * Creates a new DStream by counting the number of values for each key over a window. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. 
+ * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions Number of partitions of each RDD in the new DStream. + */ def countByKeyAndWindow( windowDuration: Duration, slideDuration: Duration, @@ -179,17 +368,30 @@ extends Serializable { ) } - // TODO: - // - // - // - // + /** + * Creates a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of the key from + * `this` DStream. Hash partitioning is used to generate the RDDs with Spark's default + * number of partitions. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. + * @tparam S State type + */ def updateStateByKey[S <: AnyRef : ClassManifest]( updateFunc: (Seq[V], Option[S]) => Option[S] ): DStream[(K, S)] = { updateStateByKey(updateFunc, defaultPartitioner()) } + /** + * Creates a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of the key from + * `this` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. + * @param numPartitions Number of partitions of each RDD in the new DStream. + * @tparam S State type + */ def updateStateByKey[S <: AnyRef : ClassManifest]( updateFunc: (Seq[V], Option[S]) => Option[S], numPartitions: Int @@ -197,6 +399,15 @@ extends Serializable { updateStateByKey(updateFunc, defaultPartitioner(numPartitions)) } + /** + * Creates a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of the key from + * `this` DStream. [[spark.Partitioner]] is used to control the partitioning of each RDD. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @tparam S State type + */ def updateStateByKey[S <: AnyRef : ClassManifest]( updateFunc: (Seq[V], Option[S]) => Option[S], partitioner: Partitioner @@ -207,6 +418,19 @@ extends Serializable { updateStateByKey(newUpdateFunc, partitioner, true) } + /** + * Creates a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of the key from + * `this` DStream. [[spark.Partitioner]] is used to control the partitioning of each RDD. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. Note, that + * this function may generate a different a tuple with a different key + * than the input key. It is up to the developer to decide whether to + * remember the partitioner despite the key being changed. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + * @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs. 
+ * @tparam S State type + */ def updateStateByKey[S <: AnyRef : ClassManifest]( updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)], partitioner: Partitioner, @@ -226,10 +450,24 @@ extends Serializable { new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc) } + /** + * Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will + * be generated by cogrouping RDDs from`this`and `other` DStreams. Therefore, for + * each key k in corresponding RDDs of `this` or `other` DStreams, the generated RDD + * will contains a tuple with the list of values for that key in both RDDs. + * HashPartitioner is used to partition each generated RDD into default number of partitions. + */ def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = { cogroup(other, defaultPartitioner()) } + /** + * Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will + * be generated by cogrouping RDDs from`this`and `other` DStreams. Therefore, for + * each key k in corresponding RDDs of `this` or `other` DStreams, the generated RDD + * will contains a tuple with the list of values for that key in both RDDs. + * Partitioner is used to partition each generated RDD. + */ def cogroup[W: ClassManifest]( other: DStream[(K, W)], partitioner: Partitioner @@ -249,11 +487,24 @@ extends Serializable { } } + /** + * Joins `this` DStream with `other` DStream. Each RDD of the new DStream will + * be generated by joining RDDs from `this` and `other` DStreams. HashPartitioner is used + * to partition each generated RDD into default number of partitions. + */ def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = { join[W](other, defaultPartitioner()) } - def join[W: ClassManifest](other: DStream[(K, W)], partitioner: Partitioner): DStream[(K, (V, W))] = { + /** + * Joins `this` DStream with `other` DStream, that is, each RDD of the new DStream will + * be generated by joining RDDs from `this` and other DStream. Uses the given + * Partitioner to partition each generated RDD. + */ + def join[W: ClassManifest]( + other: DStream[(K, W)], + partitioner: Partitioner + ): DStream[(K, (V, W))] = { this.cogroup(other, partitioner) .flatMapValues{ case (vs, ws) => @@ -261,6 +512,10 @@ extends Serializable { } } + /** + * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated + * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" + */ def saveAsHadoopFiles[F <: OutputFormat[K, V]]( prefix: String, suffix: String @@ -268,6 +523,10 @@ extends Serializable { saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) } + /** + * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated + * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" + */ def saveAsHadoopFiles( prefix: String, suffix: String, @@ -283,6 +542,10 @@ extends Serializable { self.foreach(saveFunc) } + /** + * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated + * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]]( prefix: String, suffix: String @@ -290,6 +553,10 @@ extends Serializable { saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) } + /** + * Saves each RDD in `this` DStream as a Hadoop file. 
The file name at each batch interval is generated + * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ def saveAsNewAPIHadoopFiles( prefix: String, suffix: String, diff --git a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala index ca70e72e56..efc7058480 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/FlumeInputDStream.scala @@ -26,7 +26,7 @@ class FlumeInputDStream[T: ClassManifest]( ) extends NetworkInputDStream[SparkFlumeEvent](ssc_) { override def createReceiver(): NetworkReceiver[SparkFlumeEvent] = { - new FlumeReceiver(id, host, port, storageLevel) + new FlumeReceiver(host, port, storageLevel) } } @@ -112,11 +112,10 @@ class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol { * Flume Avro interface.*/ private[streaming] class FlumeReceiver( - streamId: Int, host: String, port: Int, storageLevel: StorageLevel - ) extends NetworkReceiver[SparkFlumeEvent](streamId) { + ) extends NetworkReceiver[SparkFlumeEvent] { lazy val blockGenerator = new BlockGenerator(storageLevel) diff --git a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala index 25988a2ce7..2b4740bdf7 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala @@ -96,15 +96,15 @@ class KafkaInputDStream[T: ClassManifest]( } */ def createReceiver(): NetworkReceiver[T] = { - new KafkaReceiver(id, host, port, groupId, topics, initialOffsets, storageLevel) + new KafkaReceiver(host, port, groupId, topics, initialOffsets, storageLevel) .asInstanceOf[NetworkReceiver[T]] } } private[streaming] -class KafkaReceiver(streamId: Int, host: String, port: Int, groupId: String, +class KafkaReceiver(host: String, port: Int, groupId: String, topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long], - storageLevel: StorageLevel) extends NetworkReceiver[Any](streamId) { + storageLevel: StorageLevel) extends NetworkReceiver[Any] { // Timeout for establishing a connection to Zookeper in ms. val ZK_TIMEOUT = 10000 diff --git a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala index 18e62a0e33..aa6be95f30 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala @@ -17,6 +17,15 @@ import akka.util.duration._ import spark.streaming.util.{RecurringTimer, SystemClock} import java.util.concurrent.ArrayBlockingQueue +/** + * Abstract class for defining any InputDStream that has to start a receiver on worker + * nodes to receive external data. Specific implementations of NetworkInputDStream must + * define the createReceiver() function that creates the receiver object of type + * [[spark.streaming.dstream.NetworkReceiver]] that will be sent to the workers to receive + * data. 
+ * @param ssc_ Streaming context that will execute this input stream + * @tparam T Class type of the object of this stream + */ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext) extends InputDStream[T](ssc_) { @@ -25,7 +34,7 @@ abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : Streaming val id = ssc.getNewNetworkStreamId() /** - * This method creates the receiver object that will be sent to the workers + * Creates the receiver object that will be sent to the worker nodes * to receive data. This method needs to defined by any specific implementation * of a NetworkInputDStream. */ @@ -48,7 +57,11 @@ private[streaming] case class StopReceiver(msg: String) extends NetworkReceiverM private[streaming] case class ReportBlock(blockId: String, metadata: Any) extends NetworkReceiverMessage private[streaming] case class ReportError(msg: String) extends NetworkReceiverMessage -abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Serializable with Logging { +/** + * Abstract class of a receiver that can be run on worker nodes to receive external data. See + * [[spark.streaming.dstream.NetworkInputDStream]] for an explanation. + */ +abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Logging { initLogging() @@ -59,17 +72,22 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri lazy protected val receivingThread = Thread.currentThread() - /** This method will be called to start receiving data. */ + protected var streamId: Int = -1 + + /** + * This method will be called to start receiving data. All your receiver + * starting code should be implemented by defining this function. + */ protected def onStart() /** This method will be called to stop receiving data. */ protected def onStop() - /** This method conveys a placement preference (hostname) for this receiver. */ + /** Conveys a placement preference (hostname) for this receiver. */ def getLocationPreference() : Option[String] = None /** - * This method starts the receiver. First is accesses all the lazy members to + * Starts the receiver. First is accesses all the lazy members to * materialize them. Then it calls the user-defined onStart() method to start * other threads, etc required to receiver the data. */ @@ -92,7 +110,7 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri } /** - * This method stops the receiver. First it interrupts the main receiving thread, + * Stops the receiver. First it interrupts the main receiving thread, * that is, the thread that called receiver.start(). Then it calls the user-defined * onStop() method to stop other threads and/or do cleanup. */ @@ -103,7 +121,7 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri } /** - * This method stops the receiver and reports to exception to the tracker. + * Stops the receiver and reports to exception to the tracker. * This should be called whenever an exception has happened on any thread * of the receiver. */ @@ -115,7 +133,7 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri /** - * This method pushes a block (as iterator of values) into the block manager. + * Pushes a block (as iterator of values) into the block manager. 
*/ def pushBlock(blockId: String, iterator: Iterator[T], metadata: Any, level: StorageLevel) { val buffer = new ArrayBuffer[T] ++ iterator @@ -125,7 +143,7 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri } /** - * This method pushes a block (as bytes) into the block manager. + * Pushes a block (as bytes) into the block manager. */ def pushBlock(blockId: String, bytes: ByteBuffer, metadata: Any, level: StorageLevel) { env.blockManager.putBytes(blockId, bytes, level) @@ -157,6 +175,10 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri } } + protected[streaming] def setStreamId(id: Int) { + streamId = id + } + /** * Batches objects created by a [[spark.streaming.NetworkReceiver]] and puts them into * appropriately named blocks at regular intervals. This class starts two threads, @@ -202,7 +224,7 @@ abstract class NetworkReceiver[T: ClassManifest](val streamId: Int) extends Seri val newBlockBuffer = currentBuffer currentBuffer = new ArrayBuffer[T] if (newBlockBuffer.size > 0) { - val blockId = "input-" + NetworkReceiver.this.streamId + "- " + (time - blockInterval) + val blockId = "input-" + NetworkReceiver.this.streamId + "-" + (time - blockInterval) val newBlock = createBlock(blockId, newBlockBuffer.toIterator) blocksForPushing.add(newBlock) } diff --git a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala index aa2f31cea8..290fab1ce0 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/RawInputDStream.scala @@ -26,13 +26,13 @@ class RawInputDStream[T: ClassManifest]( ) extends NetworkInputDStream[T](ssc_ ) with Logging { def createReceiver(): NetworkReceiver[T] = { - new RawNetworkReceiver(id, host, port, storageLevel).asInstanceOf[NetworkReceiver[T]] + new RawNetworkReceiver(host, port, storageLevel).asInstanceOf[NetworkReceiver[T]] } } private[streaming] -class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: StorageLevel) - extends NetworkReceiver[Any](streamId) { +class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel) + extends NetworkReceiver[Any] { var blockPushingThread: Thread = null diff --git a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala index 8e4b20ea4c..d42027092b 100644 --- a/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/dstream/SocketInputDStream.scala @@ -16,18 +16,17 @@ class SocketInputDStream[T: ClassManifest]( ) extends NetworkInputDStream[T](ssc_) { def createReceiver(): NetworkReceiver[T] = { - new SocketReceiver(id, host, port, bytesToObjects, storageLevel) + new SocketReceiver(host, port, bytesToObjects, storageLevel) } } private[streaming] class SocketReceiver[T: ClassManifest]( - streamId: Int, host: String, port: Int, bytesToObjects: InputStream => Iterator[T], storageLevel: StorageLevel - ) extends NetworkReceiver[T](streamId) { + ) extends NetworkReceiver[T] { lazy protected val blockGenerator = new BlockGenerator(storageLevel) diff --git a/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala index f31ae39a16..03749d4a94 100644 --- a/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala +++ 
b/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala @@ -81,7 +81,7 @@ object RawTextHelper { * before real workload starts. */ def warmUp(sc: SparkContext) { - for(i <- 0 to 4) { + for(i <- 0 to 1) { sc.parallelize(1 to 200000, 1000) .map(_ % 1331).map(_.toString) .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10) diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala index fa117cfcf0..f9ba1f20f0 100644 --- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala @@ -224,7 +224,9 @@ class WindowOperationsSuite extends TestSuiteBase { val windowDuration = Seconds(2) val slideDuration = Seconds(1) val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt - val operation = (s: DStream[Int]) => s.countByWindow(windowDuration, slideDuration) + val operation = (s: DStream[Int]) => { + s.countByWindow(windowDuration, slideDuration).map(_.toInt) + } testOperation(input, operation, expectedOutput, numBatches, true) } |
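To close, a hedged sketch exercising the two PairDStreamFunctions operations whose documentation makes up most of this patch: the incremental reduceByKeyAndWindow with an inverse reduce function, and updateStateByKey. The `words` stream, the durations and the implicit-conversion import are illustrative assumptions; note that in this snapshot the state type of updateStateByKey is bounded by AnyRef, so a reference type such as Seq[Long] is used.

```scala
import spark.streaming.{DStream, Seconds}
import spark.streaming.StreamingContext._   // assumed implicit conversion to PairDStreamFunctions

// `words` stands for any already-created DStream[String].
def pairExample(words: DStream[String]) {
  val pairs = words.map(w => (w, 1L))

  // Incremental windowed count: values entering the window are added and values
  // leaving it are subtracted, so only invertible reduce functions qualify.
  val windowedCounts = pairs.reduceByKeyAndWindow(
    (a: Long, b: Long) => a + b,   // associative reduce function
    (a: Long, b: Long) => a - b,   // inverse reduce function
    Seconds(30),                   // window duration
    Seconds(10))                   // slide duration

  // Per-key state kept across batches; returning None would drop the key.
  // The state type must be an AnyRef here (S <: AnyRef), hence Seq[Long].
  val history = pairs.updateStateByKey[Seq[Long]](
    (newValues: Seq[Long], state: Option[Seq[Long]]) =>
      Some(state.getOrElse(Seq.empty[Long]) ++ newValues)
  )

  windowedCounts.print()
  history.print()
}
```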