From d182a57cae6455804773db23d9498d2dcdd02172 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Mon, 14 Jan 2013 10:03:55 -0800
Subject: Two changes:
- Updating countByX() types based on bug fix
- Porting new documentation to Java

---
 .../spark/streaming/api/java/JavaDStreamLike.scala |  10 +-
 .../spark/streaming/api/java/JavaPairDStream.scala | 264 +++++++++++++++++++++
 2 files changed, 269 insertions(+), 5 deletions(-)

diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index 80d8865725..4257ecd583 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -1,7 +1,7 @@
 package spark.streaming.api.java
 
 import java.util.{List => JList}
-import java.lang.{Integer => JInt}
+import java.lang.{Long => JLong}
 
 import scala.collection.JavaConversions._
 
@@ -17,8 +17,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
 
   def dstream: DStream[T]
 
-  implicit def scalaIntToJavaInteger(in: DStream[Int]): JavaDStream[JInt] = {
-    in.map(new JInt(_))
+  implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[JLong] = {
+    in.map(new JLong(_))
   }
 
   /**
@@ -31,14 +31,14 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
    * Returns a new DStream in which each RDD has a single element generated by counting each RDD
    * of this DStream.
    */
-  def count(): JavaDStream[JInt] = dstream.count()
+  def count(): JavaDStream[JLong] = dstream.count()
 
   /**
    * Returns a new DStream in which each RDD has a single element generated by counting the number
    * of elements in a window over this DStream. windowDuration and slideDuration are as defined
    * in the window() operation. This is equivalent to window(windowDuration, slideDuration).count().
    */
-  def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JInt] = {
+  def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JLong] = {
     dstream.countByWindow(windowDuration, slideDuration)
   }
 
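For orientation, a minimal caller-side sketch of the change above: with the countByX() operations now mapped through JLong, the Java API hands back java.lang.Long counts rather than Integer. The class and variable names here are hypothetical, and the imports assume the spark.streaming package layout shown in this patch.

import spark.streaming.Duration;
import spark.streaming.api.java.JavaDStream;

public class CountSketch {
  // After this patch, count() and countByWindow() produce JavaDStream<Long>,
  // not JavaDStream<Integer>.
  static JavaDStream<Long> windowedCount(JavaDStream<String> lines) {
    // 30-second window recomputed every 10 seconds; both durations must be
    // multiples of the stream's batch interval.
    return lines.countByWindow(new Duration(30000), new Duration(10000));
  }
}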
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index eeb1f07939..c761fdd3bd 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -85,21 +85,66 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
   // Methods only for PairDStream's
   // =======================================================================
 
+  /**
+   * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
+   * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
+   * single sequence to generate the RDDs of the new DStream. Hash partitioning is
+   * used to generate the RDDs with Spark's default number of partitions.
+   */
   def groupByKey(): JavaPairDStream[K, JList[V]] =
     dstream.groupByKey().mapValues(seqAsJavaList _)
 
+  /**
+   * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
+   * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
+   * single sequence to generate the RDDs of the new DStream. Hash partitioning is
+   * used to generate the RDDs with `numPartitions` partitions.
+   */
   def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] =
     dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _)
 
+  /**
+   * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
+   * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
+   * single sequence to generate the RDDs of the new DStream. [[spark.Partitioner]]
+   * is used to control the partitioning of each RDD.
+   */
   def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] =
     dstream.groupByKey(partitioner).mapValues(seqAsJavaList _)
 
+  /**
+   * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
+   * Therefore, the values for each key in `this` DStream's RDDs are merged using the
+   * associative reduce function to generate the RDDs of the new DStream.
+   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+   */
   def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] =
     dstream.reduceByKey(func)
 
+  /**
+   * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
+   * Therefore, the values for each key in `this` DStream's RDDs are merged using the
+   * associative reduce function to generate the RDDs of the new DStream.
+   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+   */
   def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairDStream[K, V] =
     dstream.reduceByKey(func, numPartitions)
 
+  /**
+   * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
+   * Therefore, the values for each key in `this` DStream's RDDs are merged using the
+   * associative reduce function to generate the RDDs of the new DStream.
+   * [[spark.Partitioner]] is used to control the partitioning of each RDD.
+   */
+  def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = {
+    dstream.reduceByKey(func, partitioner)
+  }
+
+  /**
+   * Generic function to combine the elements of each key in `this` DStream's RDDs using a
+   * custom function. This is similar to combineByKey for RDDs. Please refer to combineByKey
+   * in [[spark.PairRDDFunctions]] for more information.
+   */
   def combineByKey[C](createCombiner: JFunction[V, C],
       mergeValue: JFunction2[C, V, C],
       mergeCombiners: JFunction2[C, C, C],
@@ -110,25 +155,78 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
   }
 
+  /**
+   * Creates a new DStream by counting the number of values of each key in each RDD
+   * of `this` DStream. Hash partitioning is used to generate the RDDs with
+   * `numPartitions` partitions.
+   */
   def countByKey(numPartitions: Int): JavaPairDStream[K, JLong] = {
     JavaPairDStream.scalaToJavaLong(dstream.countByKey(numPartitions));
   }
+
+  /**
+   * Creates a new DStream by counting the number of values of each key in each RDD
+   * of `this` DStream. Hash partitioning is used to generate the RDDs with Spark's
+   * default number of partitions.
+   */
   def countByKey(): JavaPairDStream[K, JLong] = {
     JavaPairDStream.scalaToJavaLong(dstream.countByKey());
   }
 
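As a usage illustration for the reduceByKey family documented above, a hypothetical Java word-count fragment; the Function2 import path is an assumption based on Spark's Java API of this era, and the input stream is assumed to carry (word, 1) pairs.

import spark.api.java.function.Function2;
import spark.streaming.api.java.JavaPairDStream;

public class ReduceSketch {
  // Merges the values of each key with an associative function; the resulting
  // RDDs are hash-partitioned with Spark's default number of partitions.
  static JavaPairDStream<String, Integer> wordCounts(JavaPairDStream<String, Integer> pairs) {
    return pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer a, Integer b) { return a + b; }
    });
  }
}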
+  /**
+   * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+   * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
+   * The new DStream generates RDDs with the same interval as this DStream.
+   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   */
+  def groupByKeyAndWindow(windowDuration: Duration): JavaPairDStream[K, JList[V]] = {
+    dstream.groupByKeyAndWindow(windowDuration).mapValues(seqAsJavaList _)
+  }
+
+  /**
+   * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+   * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
+   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   */
   def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
   : JavaPairDStream[K, JList[V]] = {
     dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(seqAsJavaList _)
   }
 
+  /**
+   * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+   * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
+   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   * @param numPartitions  number of partitions of each RDD in the new DStream
+   */
   def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
   :JavaPairDStream[K, JList[V]] = {
     dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions)
       .mapValues(seqAsJavaList _)
   }
 
+  /**
+   * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+   * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   * @param partitioner    partitioner for controlling the partitioning of each RDD in the
+   *                       new DStream
+   */
   def groupByKeyAndWindow(
       windowDuration: Duration,
       slideDuration: Duration,
@@ -138,11 +236,31 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       .mapValues(seqAsJavaList _)
   }
 
+  /**
+   * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+   * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
+   * The new DStream generates RDDs with the same interval as this DStream.
+   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+   * @param reduceFunc     associative reduce function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   */
   def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration)
   :JavaPairDStream[K, V] = {
     dstream.reduceByKeyAndWindow(reduceFunc, windowDuration)
   }
 
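A short, hypothetical sketch of groupByKeyAndWindow from the Java side, matching the signatures above; the 60s/10s durations are arbitrary and must be multiples of the batch interval.

import java.util.List;
import spark.streaming.Duration;
import spark.streaming.api.java.JavaPairDStream;

public class GroupWindowSketch {
  // Collects every value seen for each key during the last 60 seconds,
  // recomputing the grouping every 10 seconds.
  static JavaPairDStream<String, List<Integer>> recent(JavaPairDStream<String, Integer> pairs) {
    return pairs.groupByKeyAndWindow(new Duration(60000), new Duration(10000));
  }
}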
+  /**
+   * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+   * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
+   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+   * @param reduceFunc     associative reduce function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   */
   def reduceByKeyAndWindow(
       reduceFunc: Function2[V, V, V],
       windowDuration: Duration,
@@ -151,6 +269,18 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)
   }
 
+  /**
+   * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+   * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
+   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+   * @param reduceFunc     associative reduce function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   * @param numPartitions  number of partitions of each RDD in the new DStream
+   */
   def reduceByKeyAndWindow(
       reduceFunc: Function2[V, V, V],
       windowDuration: Duration,
@@ -160,6 +290,17 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, numPartitions)
   }
 
+  /**
+   * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+   * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
+   * @param reduceFunc     associative reduce function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   * @param partitioner    partitioner for controlling the partitioning of each RDD in the
+   *                       new DStream
+   */
   def reduceByKeyAndWindow(
       reduceFunc: Function2[V, V, V],
       windowDuration: Duration,
@@ -169,6 +310,24 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, partitioner)
   }
 
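For illustration, a hedged Java sketch of the plain (non-incremental) reduceByKeyAndWindow documented above; every slide recomputes the full window from scratch. Names are invented.

import spark.api.java.function.Function2;
import spark.streaming.Duration;
import spark.streaming.api.java.JavaPairDStream;

public class ReduceWindowSketch {
  // Sums each key's values over a 60-second window, recomputing the whole
  // window on every 10-second slide.
  static JavaPairDStream<String, Integer> windowedSums(JavaPairDStream<String, Integer> pairs) {
    Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer a, Integer b) { return a + b; }
    };
    return pairs.reduceByKeyAndWindow(add, new Duration(60000), new Duration(10000));
  }
}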
 
+
+  /**
+   * Creates a new DStream by applying incremental `reduceByKey` over a sliding window.
+   * The reduced value for a new window is calculated incrementally from the old
+   * window's reduced value:
+   *  1. reduce the new values that entered the window (e.g., adding new counts)
+   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+   * This is more efficient than reduceByKeyAndWindow without an "inverse reduce" function.
+   * However, it is only applicable to "invertible reduce functions".
+   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+   * @param reduceFunc     associative reduce function
+   * @param invReduceFunc  inverse of the reduce function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   */
   def reduceByKeyAndWindow(
       reduceFunc: Function2[V, V, V],
       invReduceFunc: Function2[V, V, V],
@@ -178,6 +337,24 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)
   }
 
+  /**
+   * Creates a new DStream by applying incremental `reduceByKey` over a sliding window.
+   * The reduced value for a new window is calculated incrementally from the old
+   * window's reduced value:
+   *  1. reduce the new values that entered the window (e.g., adding new counts)
+   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+   * This is more efficient than reduceByKeyAndWindow without an "inverse reduce" function.
+   * However, it is only applicable to "invertible reduce functions".
+   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+   * @param reduceFunc     associative reduce function
+   * @param invReduceFunc  inverse of the reduce function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   * @param numPartitions  number of partitions of each RDD in the new DStream
+   */
   def reduceByKeyAndWindow(
       reduceFunc: Function2[V, V, V],
       invReduceFunc: Function2[V, V, V],
@@ -193,6 +370,23 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       numPartitions)
   }
 
+  /**
+   * Creates a new DStream by applying incremental `reduceByKey` over a sliding window.
+   * The reduced value for a new window is calculated incrementally from the old
+   * window's reduced value:
+   *  1. reduce the new values that entered the window (e.g., adding new counts)
+   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+   * This is more efficient than reduceByKeyAndWindow without an "inverse reduce" function.
+   * However, it is only applicable to "invertible reduce functions".
+   * @param reduceFunc     associative reduce function
+   * @param invReduceFunc  inverse of the reduce function
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   * @param partitioner    partitioner for controlling the partitioning of each RDD in the
+   *                       new DStream
+   */
   def reduceByKeyAndWindow(
       reduceFunc: Function2[V, V, V],
       invReduceFunc: Function2[V, V, V],
@@ -208,16 +402,38 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       partitioner)
   }
 
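A sketch of the incremental variant above: because addition is invertible, the old window's sums can be updated by adding the batches that entered the window and subtracting the ones that left it. Names are hypothetical; the signature is the one shown in this patch.

import spark.api.java.function.Function2;
import spark.streaming.Duration;
import spark.streaming.api.java.JavaPairDStream;

public class IncrementalWindowSketch {
  // Maintains a 60-second windowed sum per key, updated every 10 seconds
  // incrementally instead of recomputing the whole window.
  static JavaPairDStream<String, Integer> runningSums(JavaPairDStream<String, Integer> pairs) {
    Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer a, Integer b) { return a + b; }
    };
    Function2<Integer, Integer, Integer> subtract = new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer a, Integer b) { return a - b; }
    };
    return pairs.reduceByKeyAndWindow(add, subtract, new Duration(60000), new Duration(10000));
  }
}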
+  /**
+   * Creates a new DStream by counting the number of values for each key over a window.
+   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   */
   def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
   : JavaPairDStream[K, JLong] = {
     JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowDuration, slideDuration))
   }
 
+  /**
+   * Creates a new DStream by counting the number of values for each key over a window.
+   * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration  sliding interval of the window (i.e., the interval after which
+   *                       the new DStream will generate RDDs); must be a multiple of this
+   *                       DStream's batching interval
+   * @param numPartitions  number of partitions of each RDD in the new DStream
+   */
   def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
   : JavaPairDStream[K, JLong] = {
     JavaPairDStream.scalaToJavaLong(
       dstream.countByKeyAndWindow(windowDuration, slideDuration, numPartitions))
   }
 
+
+  // TODO: Update State
+
   def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
     implicit val cm: ClassManifest[U] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
@@ -232,12 +448,26 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     dstream.flatMapValues(fn)
   }
 
+  /**
+   * Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will
+   * be generated by cogrouping RDDs from `this` and `other` DStreams. Therefore, for
+   * each key k in corresponding RDDs of `this` or `other` DStreams, the generated RDD
+   * will contain a tuple with the list of values for that key in both RDDs.
+   * HashPartitioner is used to partition each generated RDD into the default number
+   * of partitions.
+   */
   def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
     implicit val cm: ClassManifest[W] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
     dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
   }
 
+  /**
+   * Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will
+   * be generated by cogrouping RDDs from `this` and `other` DStreams. Therefore, for
+   * each key k in corresponding RDDs of `this` or `other` DStreams, the generated RDD
+   * will contain a tuple with the list of values for that key in both RDDs.
+   * The given Partitioner is used to partition each generated RDD.
+   */
   def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
   : JavaPairDStream[K, (JList[V], JList[W])] = {
     implicit val cm: ClassManifest[W] =
@@ -246,12 +476,22 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
       .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
   }
 
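To illustrate the cogroup signature above from the Java side, a hypothetical sketch; note that the value type is a scala.Tuple2 of java.util.Lists, since the wrapper converts each Seq via seqAsJavaList. Stream names and element types are invented.

import java.util.List;
import scala.Tuple2;
import spark.streaming.api.java.JavaPairDStream;

public class CogroupSketch {
  // For every key, pairs the list of values seen in `views` with the list of
  // values seen in `clicks` during the same batch.
  static JavaPairDStream<String, Tuple2<List<Integer>, List<String>>> combine(
      JavaPairDStream<String, Integer> views,
      JavaPairDStream<String, String> clicks) {
    return views.cogroup(clicks);
  }
}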
+  /**
+   * Joins `this` DStream with `other` DStream. Each RDD of the new DStream will
+   * be generated by joining RDDs from `this` and `other` DStreams. HashPartitioner is
+   * used to partition each generated RDD into the default number of partitions.
+   */
   def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
     implicit val cm: ClassManifest[W] =
       implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
     dstream.join(other.dstream)
   }
 
+  /**
+   * Joins `this` DStream with `other` DStream, that is, each RDD of the new DStream will
+   * be generated by joining RDDs from `this` and `other` DStreams. Uses the given
+   * Partitioner to partition each generated RDD.
+   */
   def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
   : JavaPairDStream[K, (V, W)] = {
     implicit val cm: ClassManifest[W] =
@@ -259,10 +499,18 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     dstream.join(other.dstream, partitioner)
   }
 
+  /**
+   * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch
+   * interval is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+   */
   def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String) {
     dstream.saveAsHadoopFiles(prefix, suffix)
   }
 
+  /**
+   * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch
+   * interval is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+   */
   def saveAsHadoopFiles(
       prefix: String,
       suffix: String,
@@ -272,6 +520,10 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
   }
 
+  /**
+   * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch
+   * interval is generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+   */
   def saveAsHadoopFiles(
       prefix: String,
       suffix: String,
@@ -282,10 +534,18 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
   }
 
+  /**
+   * Saves each RDD in `this` DStream as a Hadoop file (using the new Hadoop API). The
+   * file name at each batch interval is generated based on `prefix` and `suffix`:
+   * "prefix-TIME_IN_MS.suffix".
+   */
   def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](prefix: String, suffix: String) {
     dstream.saveAsNewAPIHadoopFiles(prefix, suffix)
   }
 
+  /**
+   * Saves each RDD in `this` DStream as a Hadoop file (using the new Hadoop API). The
+   * file name at each batch interval is generated based on `prefix` and `suffix`:
+   * "prefix-TIME_IN_MS.suffix".
+   */
   def saveAsNewAPIHadoopFiles(
       prefix: String,
       suffix: String,
@@ -295,6 +555,10 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
     dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
   }
 
+  /**
+   * Saves each RDD in `this` DStream as a Hadoop file (using the new Hadoop API). The
+   * file name at each batch interval is generated based on `prefix` and `suffix`:
+   * "prefix-TIME_IN_MS.suffix".
+   */
   def saveAsNewAPIHadoopFiles(
       prefix: String,
       suffix: String,
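Finally, to make the file-naming convention above concrete, a hypothetical save sketch; the key/value/output-format classes are illustrative choices, and the exact Class parameter bounds of the five-argument overload are assumed rather than shown in this patch.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextOutputFormat;
import spark.streaming.api.java.JavaPairDStream;

public class SaveSketch {
  static void save(JavaPairDStream<Text, IntWritable> counts) {
    // Each batch interval produces a new file named "counts-<TIME_IN_MS>.out".
    counts.saveAsHadoopFiles("counts", "out", Text.class, IntWritable.class,
        TextOutputFormat.class);
  }
}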