diff options
Diffstat (limited to 'streaming/src')
8 files changed, 2370 insertions, 120 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index fbe3cebd6d..036763fe2f 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -98,10 +98,10 @@ abstract class DStream[T: ClassManifest] ( this } - /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER) - /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ def cache(): DStream[T] = persist() /** @@ -119,7 +119,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * This method initializes the DStream by setting the "zero" time, based on which + * Initialize the DStream by setting the "zero" time, based on which * the validity of future times is calculated. This method also recursively initializes * its parent DStreams. */ @@ -244,7 +244,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Retrieves a precomputed RDD of this DStream, or computes the RDD. This is an internal + * Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal * method that should not be called directly. */ protected[streaming] def getOrCompute(time: Time): Option[RDD[T]] = { @@ -283,7 +283,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Generates a SparkStreaming job for the given time. This is an internal method that + * Generate a SparkStreaming job for the given time. This is an internal method that * should not be called directly. This default implementation creates a job * that materializes the corresponding RDD. Subclasses of DStream may override this * (eg. ForEachDStream). 
@@ -302,7 +302,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Dereferences RDDs that are older than rememberDuration. + * Dereference RDDs that are older than rememberDuration. */ protected[streaming] def forgetOldRDDs(time: Time) { val keys = generatedRDDs.keys @@ -328,7 +328,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Refreshes the list of checkpointed RDDs that will be saved along with checkpoint of + * Refresh the list of checkpointed RDDs that will be saved along with checkpoint of * this stream. This is an internal method that should not be called directly. This is * a default implementation that saves only the file names of the checkpointed RDDs to * checkpointData. Subclasses of DStream (especially those of InputDStream) may override @@ -373,7 +373,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Restores the RDDs in generatedRDDs from the checkpointData. This is an internal method + * Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method * that should not be called directly. This is a default implementation that recreates RDDs * from the checkpoint file names stored in checkpointData. Subclasses of DStream that * override the updateCheckpointData() method would also need to override this method. @@ -425,20 +425,20 @@ abstract class DStream[T: ClassManifest] ( // DStream operations // ======================================================================= - /** Returns a new DStream by applying a function to all elements of this DStream. */ + /** Return a new DStream by applying a function to all elements of this DStream. 
*/ def map[U: ClassManifest](mapFunc: T => U): DStream[U] = { new MappedDStream(this, ssc.sc.clean(mapFunc)) } /** - * Returns a new DStream by applying a function to all elements of this DStream, + * Return a new DStream by applying a function to all elements of this DStream, * and then flattening the results */ def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = { new FlatMappedDStream(this, ssc.sc.clean(flatMapFunc)) } - /** Returns a new DStream containing only the elements that satisfy a predicate. */ + /** Return a new DStream containing only the elements that satisfy a predicate. */ def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc) /** @@ -461,20 +461,20 @@ abstract class DStream[T: ClassManifest] ( } /** - * Returns a new DStream in which each RDD has a single element generated by reducing each RDD + * Return a new DStream in which each RDD has a single element generated by reducing each RDD * of this DStream. */ def reduce(reduceFunc: (T, T) => T): DStream[T] = this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2) /** - * Returns a new DStream in which each RDD has a single element generated by counting each RDD + * Return a new DStream in which each RDD has a single element generated by counting each RDD * of this DStream. */ def count(): DStream[Long] = this.map(_ => 1L).reduce(_ + _) /** - * Applies a function to each RDD in this DStream. This is an output operator, so + * Apply a function to each RDD in this DStream. This is an output operator, so * this DStream will be registered as an output stream and therefore materialized. */ def foreach(foreachFunc: RDD[T] => Unit) { @@ -482,7 +482,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Applies a function to each RDD in this DStream. This is an output operator, so + * Apply a function to each RDD in this DStream. 
This is an output operator, so * this DStream will be registered as an output stream and therefore materialized. */ def foreach(foreachFunc: (RDD[T], Time) => Unit) { @@ -492,7 +492,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Returns a new DStream in which each RDD is generated by applying a function + * Return a new DStream in which each RDD is generated by applying a function * on each RDD of this DStream. */ def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = { @@ -500,7 +500,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Returns a new DStream in which each RDD is generated by applying a function + * Return a new DStream in which each RDD is generated by applying a function * on each RDD of this DStream. */ def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = { @@ -508,7 +508,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Prints the first ten elements of each RDD generated in this DStream. This is an output + * Print the first ten elements of each RDD generated in this DStream. This is an output * operator, so this DStream will be registered as an output stream and there materialized. */ def print() { @@ -545,7 +545,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Returns a new DStream which computed based on tumbling window on this DStream. + * Return a new DStream which is computed based on tumbling window on this DStream. * This is equivalent to window(batchTime, batchTime). * @param batchDuration tumbling window duration; must be a multiple of this DStream's * batching interval @@ -553,7 +553,7 @@ abstract class DStream[T: ClassManifest] ( def tumble(batchDuration: Duration): DStream[T] = window(batchDuration, batchDuration) /** - * Returns a new DStream in which each RDD has a single element generated by reducing all + * Return a new DStream in which each RDD has a single element generated by reducing all * elements in a window over this DStream. 
windowDuration and slideDuration are as defined * in the window() operation. This is equivalent to * window(windowDuration, slideDuration).reduce(reduceFunc) @@ -578,7 +578,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Returns a new DStream in which each RDD has a single element generated by counting the number + * Return a new DStream in which each RDD has a single element generated by counting the number * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the * window() operation. This is equivalent to window(windowDuration, slideDuration).count() */ @@ -587,20 +587,20 @@ abstract class DStream[T: ClassManifest] ( } /** - * Returns a new DStream by unifying data of another DStream with this DStream. + * Return a new DStream by unifying data of another DStream with this DStream. * @param that Another DStream having the same slideDuration as this DStream. */ def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that)) /** - * Returns all the RDDs defined by the Interval object (both end times included) + * Return all the RDDs defined by the Interval object (both end times included) */ protected[streaming] def slice(interval: Interval): Seq[RDD[T]] = { slice(interval.beginTime, interval.endTime) } /** - * Returns all the RDDs between 'fromTime' to 'toTime' (both included) + * Return all the RDDs between 'fromTime' to 'toTime' (both included) */ def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = { val rdds = new ArrayBuffer[RDD[T]]() @@ -616,7 +616,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Saves each RDD in this DStream as a Sequence file of serialized objects. + * Save each RDD in this DStream as a Sequence file of serialized objects. * The file name at each batch interval is generated based on `prefix` and * `suffix`: "prefix-TIME_IN_MS.suffix". 
*/ @@ -629,7 +629,7 @@ abstract class DStream[T: ClassManifest] ( } /** - * Saves each RDD in this DStream as at text file, using string representation + * Save each RDD in this DStream as at text file, using string representation * of elements. The file name at each batch interval is generated based on * `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala index 3dbef69868..fbcf061126 100644 --- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala @@ -26,29 +26,23 @@ extends Serializable { } /** - * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream. - * Therefore, the values for each key in `this` DStream's RDDs are grouped into a - * single sequence to generate the RDDs of the new DStream. Hash partitioning is - * used to generate the RDDs with Spark's default number of partitions. + * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. */ def groupByKey(): DStream[(K, Seq[V])] = { groupByKey(defaultPartitioner()) } /** - * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream. - * Therefore, the values for each key in `this` DStream's RDDs are grouped into a - * single sequence to generate the RDDs of the new DStream. Hash partitioning is - * used to generate the RDDs with `numPartitions` partitions. + * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * generate the RDDs with `numPartitions` partitions. */ def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = { groupByKey(defaultPartitioner(numPartitions)) } /** - * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream. 
- * Therefore, the values for each key in `this` DStream's RDDs are grouped into a - * single sequence to generate the RDDs of the new DStream. [[spark.Partitioner]] + * Create a new DStream by applying `groupByKey` on each RDD. The supplied [[spark.Partitioner]] * is used to control the partitioning of each RDD. */ def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = { @@ -60,30 +54,27 @@ extends Serializable { } /** - * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream. - * Therefore, the values for each key in `this` DStream's RDDs is merged using the - * associative reduce function to generate the RDDs of the new DStream. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the associative reduce function. Hash partitioning is used to generate the RDDs + * with Spark's default number of partitions. */ def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = { reduceByKey(reduceFunc, defaultPartitioner()) } /** - * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream. - * Therefore, the values for each key in `this` DStream's RDDs is merged using the - * associative reduce function to generate the RDDs of the new DStream. - * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs + * with `numPartitions` partitions. */ def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = { reduceByKey(reduceFunc, defaultPartitioner(numPartitions)) } /** - * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream. 
- * Therefore, the values for each key in `this` DStream's RDDs is merged using the - * associative reduce function to generate the RDDs of the new DStream. - * [[spark.Partitioner]] is used to control the partitioning of each RDD. + * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the supplied reduce function. [[spark.Partitioner]] is used to control the + * partitioning of each RDD. */ def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = { val cleanedReduceFunc = ssc.sc.clean(reduceFunc) @@ -91,9 +82,9 @@ extends Serializable { } /** - * Generic function to combine elements of each key in DStream's RDDs using custom function. - * This is similar to the combineByKey for RDDs. Please refer to combineByKey in - * [[spark.PairRDDFunctions]] for more information. + * Combine elements of each key in DStream's RDDs using custom function. This is similar to the + * combineByKey for RDDs. Please refer to combineByKey in [[spark.PairRDDFunctions]] for more + * information. */ def combineByKey[C: ClassManifest]( createCombiner: V => C, @@ -104,19 +95,18 @@ extends Serializable { } /** - * Creates a new DStream by counting the number of values of each key in each RDD - * of `this` DStream. Hash partitioning is used to generate the RDDs with Spark's - * `numPartitions` partitions. + * Create a new DStream by counting the number of values of each key in each RDD. Hash + * partitioning is used to generate the RDDs with Spark's `numPartitions` partitions. */ def countByKey(numPartitions: Int = self.ssc.sc.defaultParallelism): DStream[(K, Long)] = { self.map(x => (x._1, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions) } /** - * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.groupByKey()` but applies it over a sliding window. - * The new DStream generates RDDs with the same interval as this DStream. 
- * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * Create a new DStream by applying `groupByKey` over a sliding window. This is similar to + * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs + * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with + * Spark's default number of partitions. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval */ @@ -125,9 +115,9 @@ extends Serializable { } /** - * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.groupByKey()` but applies it over a sliding window. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * Create a new DStream by applying `groupByKey` over a sliding window. Similar to + * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which @@ -139,8 +129,8 @@ extends Serializable { } /** - * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.groupByKey()` but applies it over a sliding window. + * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Similar to `DStream.groupByKey()`, but applies it over a sliding window. * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -158,8 +148,8 @@ extends Serializable { } /** - * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream. 
- * This is similar to `DStream.groupByKey()` but applies it over a sliding window. + * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Similar to `DStream.groupByKey()`, but applies it over a sliding window. * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval * @param slideDuration sliding interval of the window (i.e., the interval after which @@ -176,10 +166,10 @@ extends Serializable { } /** - * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.reduceByKey()` but applies it over a sliding window. - * The new DStream generates RDDs with the same interval as this DStream. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * Create a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. + * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream + * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate + * the RDDs with Spark's default number of partitions. * @param reduceFunc associative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -192,9 +182,9 @@ extends Serializable { } /** - * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.reduceByKey()` but applies it over a sliding window. - * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. 
* @param reduceFunc associative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -211,9 +201,9 @@ extends Serializable { } /** - * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.reduceByKey()` but applies it over a sliding window. - * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with `numPartitions` partitions. * @param reduceFunc associative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -232,8 +222,8 @@ extends Serializable { } /** - * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. - * This is similar to `DStream.reduceByKey()` but applies it over a sliding window. + * Create a new DStream by applying `reduceByKey` over a sliding window. Similar to + * `DStream.reduceByKey()`, but applies it over a sliding window. * @param reduceFunc associative reduce function * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -255,9 +245,8 @@ extends Serializable { } /** - * Creates a new DStream by reducing over a window in a smarter way. - * The reduced value of over a new window is calculated incrementally by using the - * old window's reduce value : + * Create a new DStream by reducing over a window using incremental computation. + * The reduced value over a new window is calculated using the old window's reduce value : * 1. reduce the new values that entered the window (e.g., adding new counts) * 2. 
"inverse reduce" the old values that left the window (e.g., subtracting old counts) * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. @@ -283,9 +272,8 @@ extends Serializable { } /** - * Creates a new DStream by reducing over a window in a smarter way. - * The reduced value of over a new window is calculated incrementally by using the - * old window's reduce value : + * Create a new DStream by reducing over a window using incremental computation. + * The reduced value over a new window is calculated using the old window's reduce value : * 1. reduce the new values that entered the window (e.g., adding new counts) * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. @@ -313,9 +301,8 @@ extends Serializable { } /** - * Creates a new DStream by reducing over a window in a smarter way. - * The reduced value of over a new window is calculated incrementally by using the - * old window's reduce value : + * Create a new DStream by reducing over a window using incremental computation. + * The reduced value over a new window is calculated using the old window's reduce value : * 1. reduce the new values that entered the window (e.g., adding new counts) * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. @@ -344,7 +331,7 @@ extends Serializable { } /** - * Creates a new DStream by counting the number of values for each key over a window. + * Create a new DStream by counting the number of values for each key over a window. * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. 
* @param windowDuration width of the window; must be a multiple of this DStream's * batching interval @@ -369,10 +356,9 @@ extends Serializable { } /** - * Creates a new "state" DStream where the state for each key is updated by applying - * the given function on the previous state of the key and the new values of the key from - * `this` DStream. Hash partitioning is used to generate the RDDs with Spark's default - * number of partitions. + * Create a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. * @tparam S State type @@ -384,9 +370,9 @@ extends Serializable { } /** - * Creates a new "state" DStream where the state for each key is updated by applying - * the given function on the previous state of the key and the new values of the key from - * `this` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * Create a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. * @param numPartitions Number of partitions of each RDD in the new DStream. @@ -400,9 +386,9 @@ extends Serializable { } /** - * Creates a new "state" DStream where the state for each key is updated by applying - * the given function on the previous state of the key and the new values of the key from - * `this` DStream. [[spark.Partitioner]] is used to control the partitioning of each RDD. 
+ * Create a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of the key. + * [[spark.Partitioner]] is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. @@ -419,9 +405,9 @@ extends Serializable { } /** - * Creates a new "state" DStream where the state for each key is updated by applying - * the given function on the previous state of the key and the new values of the key from - * `this` DStream. [[spark.Partitioner]] is used to control the partitioning of each RDD. + * Create a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * [[spark.Partitioner]] is used to control the partitioning of each RDD. * @param updateFunc State update function. If `this` function returns None, then * corresponding state key-value pair will be eliminated. Note, that * this function may generate a different a tuple with a different key @@ -451,22 +437,19 @@ extends Serializable { } /** - * Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will - * be generated by cogrouping RDDs from`this`and `other` DStreams. Therefore, for - * each key k in corresponding RDDs of `this` or `other` DStreams, the generated RDD - * will contains a tuple with the list of values for that key in both RDDs. - * HashPartitioner is used to partition each generated RDD into default number of partitions. + * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` + * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that + * key in both RDDs. 
HashPartitioner is used to partition each generated RDD into default number + * of partitions. */ def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = { cogroup(other, defaultPartitioner()) } /** - * Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will - * be generated by cogrouping RDDs from`this`and `other` DStreams. Therefore, for - * each key k in corresponding RDDs of `this` or `other` DStreams, the generated RDD - * will contains a tuple with the list of values for that key in both RDDs. - * Partitioner is used to partition each generated RDD. + * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` + * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that + * key in both RDDs. Partitioner is used to partition each generated RDD. */ def cogroup[W: ClassManifest]( other: DStream[(K, W)], @@ -488,8 +471,7 @@ extends Serializable { } /** - * Joins `this` DStream with `other` DStream. Each RDD of the new DStream will - * be generated by joining RDDs from `this` and `other` DStreams. HashPartitioner is used + * Join `this` DStream with `other` DStream. HashPartitioner is used * to partition each generated RDD into default number of partitions. */ def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = { @@ -497,7 +479,7 @@ extends Serializable { } /** - * Joins `this` DStream with `other` DStream, that is, each RDD of the new DStream will + * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will * be generated by joining RDDs from `this` and other DStream. Uses the given * Partitioner to partition each generated RDD. */ @@ -513,7 +495,7 @@ extends Serializable { } /** - * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated + * Save each RDD in `this` DStream as a Hadoop file. 
The file name at each batch interval is generated * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" */ def saveAsHadoopFiles[F <: OutputFormat[K, V]]( @@ -524,7 +506,7 @@ extends Serializable { } /** - * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix" */ def saveAsHadoopFiles( @@ -543,8 +525,8 @@ extends Serializable { } /** - * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated - * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". */ def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]]( prefix: String, @@ -554,8 +536,8 @@ extends Serializable { } /** - * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated - * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". 
*/ def saveAsNewAPIHadoopFiles( prefix: String, diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala new file mode 100644 index 0000000000..2e7466b16c --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala @@ -0,0 +1,91 @@ +package spark.streaming.api.java + +import spark.streaming.{Duration, Time, DStream} +import spark.api.java.function.{Function => JFunction} +import spark.api.java.JavaRDD +import spark.storage.StorageLevel + +/** + * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous + * sequence of RDDs (of the same type) representing a continuous stream of data (see [[spark.RDD]] + * for more details on RDDs). DStreams can either be created from live data (such as, data from + * HDFS, Kafka or Flume) or it can be generated by transformation existing DStreams using operations + * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each + * DStream periodically generates a RDD, either from live data or by transforming the RDD generated + * by a parent DStream. + * + * This class contains the basic operations available on all DStreams, such as `map`, `filter` and + * `window`. In addition, [[spark.streaming.api.java.JavaPairDStream]] contains operations available + * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations + * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)] through + * implicit conversions when `spark.streaming.StreamingContext._` is imported. 
+ * + * DStreams internally is characterized by a few basic properties: + * - A list of other DStreams that the DStream depends on + * - A time interval at which the DStream generates an RDD + * - A function that is used to generate an RDD after each time interval + */ +class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T]) + extends JavaDStreamLike[T, JavaDStream[T]] { + + /** Return a new DStream containing only the elements that satisfy a predicate. */ + def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] = + dstream.filter((x => f(x).booleanValue())) + + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + def cache(): JavaDStream[T] = dstream.cache() + + /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + def persist(): JavaDStream[T] = dstream.cache() + + /** Persist the RDDs of this DStream with the given storage level */ + def persist(storageLevel: StorageLevel): JavaDStream[T] = dstream.persist(storageLevel) + + /** Generate an RDD for the given duration */ + def compute(validTime: Time): JavaRDD[T] = { + dstream.compute(validTime) match { + case Some(rdd) => new JavaRDD(rdd) + case None => null + } + } + + /** + * Return a new DStream which is computed based on windowed batches of this DStream. + * The new DStream generates RDDs with the same interval as this DStream. + * @param windowDuration width of the window; must be a multiple of this DStream's interval. + * @return + */ + def window(windowDuration: Duration): JavaDStream[T] = + dstream.window(windowDuration) + + /** + * Return a new DStream which is computed based on windowed batches of this DStream. 
+ * @param windowDuration duration (i.e., width) of the window; + * must be a multiple of this DStream's interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's interval + */ + def window(windowDuration: Duration, slideDuration: Duration): JavaDStream[T] = + dstream.window(windowDuration, slideDuration) + + /** + * Return a new DStream which computed based on tumbling window on this DStream. + * This is equivalent to window(batchDuration, batchDuration). + * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval + */ + def tumble(batchDuration: Duration): JavaDStream[T] = + dstream.tumble(batchDuration) + + /** + * Return a new DStream by unifying data of another DStream with this DStream. + * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream. + */ + def union(that: JavaDStream[T]): JavaDStream[T] = + dstream.union(that.dstream) +} + +object JavaDStream { + implicit def fromDStream[T: ClassManifest](dstream: DStream[T]): JavaDStream[T] = + new JavaDStream[T](dstream) +}
\ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala new file mode 100644 index 0000000000..b93cb7865a --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala @@ -0,0 +1,183 @@ +package spark.streaming.api.java + +import java.util.{List => JList} +import java.lang.{Long => JLong} + +import scala.collection.JavaConversions._ + +import spark.streaming._ +import spark.api.java.JavaRDD +import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _} +import java.util +import spark.RDD +import JavaDStream._ + +trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable { + implicit val classManifest: ClassManifest[T] + + def dstream: DStream[T] + + implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[JLong] = { + in.map(new JLong(_)) + } + + /** + * Print the first ten elements of each RDD generated in this DStream. This is an output + * operator, so this DStream will be registered as an output stream and there materialized. + */ + def print() = dstream.print() + + /** + * Return a new DStream in which each RDD has a single element generated by counting each RDD + * of this DStream. + */ + def count(): JavaDStream[JLong] = dstream.count() + + /** + * Return a new DStream in which each RDD has a single element generated by counting the number + * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the + * window() operation. This is equivalent to window(windowDuration, slideDuration).count() + */ + def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JLong] = { + dstream.countByWindow(windowDuration, slideDuration) + } + + /** + * Return a new DStream in which each RDD is generated by applying glom() to each RDD of + * this DStream. 
Applying glom() to an RDD coalesces all elements within each partition into + * an array. + */ + def glom(): JavaDStream[JList[T]] = + new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq))) + + /** Return the StreamingContext associated with this DStream */ + def context(): StreamingContext = dstream.context() + + /** Return a new DStream by applying a function to all elements of this DStream. */ + def map[R](f: JFunction[T, R]): JavaDStream[R] = { + new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType()) + } + + /** Return a new DStream by applying a function to all elements of this DStream. */ + def map[K, V](f: PairFunction[T, K, V]): JavaPairDStream[K, V] = { + def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] + new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType()) + } + + /** + * Return a new DStream by applying a function to all elements of this DStream, + * and then flattening the results + */ + def flatMap[U](f: FlatMapFunction[T, U]): JavaDStream[U] = { + import scala.collection.JavaConverters._ + def fn = (x: T) => f.apply(x).asScala + new JavaDStream(dstream.flatMap(fn)(f.elementType()))(f.elementType()) + } + + /** + * Return a new DStream by applying a function to all elements of this DStream, + * and then flattening the results + */ + def flatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairDStream[K, V] = { + import scala.collection.JavaConverters._ + def fn = (x: T) => f.apply(x).asScala + def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] + new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType()) + } + + /** + * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs + * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition + * of the RDD. 
+ */ + def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = { + def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) + new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType()) + } + + /** + * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDDs + * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition + * of the RDD. + */ + def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V]) + : JavaPairDStream[K, V] = { + def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) + new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType()) + } + + /** + * Return a new DStream in which each RDD has a single element generated by reducing each RDD + * of this DStream. + */ + def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f) + + /** + * Return a new DStream in which each RDD has a single element generated by reducing all + * elements in a window over this DStream. windowDuration and slideDuration are as defined in the + * window() operation. This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc) + */ + def reduceByWindow( + reduceFunc: JFunction2[T, T, T], + invReduceFunc: JFunction2[T, T, T], + windowDuration: Duration, + slideDuration: Duration + ): JavaDStream[T] = { + dstream.reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration) + } + + /** + * Return all the RDDs between 'fromDuration' to 'toDuration' (both included) + */ + def slice(fromDuration: Duration, toDuration: Duration): JList[JavaRDD[T]] = { + new util.ArrayList(dstream.slice(fromDuration, toDuration).map(new JavaRDD(_)).toSeq) + } + + /** + * Apply a function to each RDD in this DStream. This is an output operator, so + * this DStream will be registered as an output stream and therefore materialized. 
+ */ + def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) { + dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd))) + } + + /** + * Apply a function to each RDD in this DStream. This is an output operator, so + * this DStream will be registered as an output stream and therefore materialized. + */ + def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) { + dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this DStream. + */ + def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = { + implicit val cm: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + def scalaTransform (in: RDD[T]): RDD[U] = + transformFunc.call(new JavaRDD[T](in)).rdd + dstream.transform(scalaTransform(_)) + } + + /** + * Return a new DStream in which each RDD is generated by applying a function + * on each RDD of this DStream. + */ + def transform[U](transformFunc: JFunction2[JavaRDD[T], Time, JavaRDD[U]]): JavaDStream[U] = { + implicit val cm: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + def scalaTransform (in: RDD[T], time: Time): RDD[U] = + transformFunc.call(new JavaRDD[T](in), time).rdd + dstream.transform(scalaTransform(_, _)) + } + + /** + * Enable periodic checkpointing of RDDs of this DStream + * @param interval Time interval after which generated RDD will be checkpointed + */ + def checkpoint(interval: Duration) = { + dstream.checkpoint(interval) + } +}
\ No newline at end of file diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala new file mode 100644 index 0000000000..ef10c091ca --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala @@ -0,0 +1,638 @@ +package spark.streaming.api.java + +import java.util.{List => JList} +import java.lang.{Long => JLong} + +import scala.collection.JavaConversions._ + +import spark.streaming._ +import spark.streaming.StreamingContext._ +import spark.api.java.function.{Function => JFunction, Function2 => JFunction2} +import spark.Partitioner +import org.apache.hadoop.mapred.{JobConf, OutputFormat} +import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} +import org.apache.hadoop.conf.Configuration +import spark.api.java.JavaPairRDD +import spark.storage.StorageLevel +import com.google.common.base.Optional + +class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( + implicit val kManifiest: ClassManifest[K], + implicit val vManifest: ClassManifest[V]) + extends JavaDStreamLike[(K, V), JavaPairDStream[K, V]] { + + // ======================================================================= + // Methods common to all DStream's + // ======================================================================= + + /** Returns a new DStream containing only the elements that satisfy a predicate. 
*/ + def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] = + dstream.filter((x => f(x).booleanValue())) + + /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + def cache(): JavaPairDStream[K, V] = dstream.cache() + + /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */ + def persist(): JavaPairDStream[K, V] = dstream.cache() + + /** Persists the RDDs of this DStream with the given storage level */ + def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel) + + /** Method that generates a RDD for the given Duration */ + def compute(validTime: Time): JavaPairRDD[K, V] = { + dstream.compute(validTime) match { + case Some(rdd) => new JavaPairRDD(rdd) + case None => null + } + } + + /** + * Return a new DStream which is computed based on windowed batches of this DStream. + * The new DStream generates RDDs with the same interval as this DStream. + * @param windowDuration width of the window; must be a multiple of this DStream's interval. + * @return + */ + def window(windowDuration: Duration): JavaPairDStream[K, V] = + dstream.window(windowDuration) + + /** + * Return a new DStream which is computed based on windowed batches of this DStream. + * @param windowDuration duration (i.e., width) of the window; + * must be a multiple of this DStream's interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's interval + */ + def window(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, V] = + dstream.window(windowDuration, slideDuration) + + /** + * Returns a new DStream which computed based on tumbling window on this DStream. + * This is equivalent to window(batchDuration, batchDuration). 
+ * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval + */ + def tumble(batchDuration: Duration): JavaPairDStream[K, V] = + dstream.tumble(batchDuration) + + /** + * Returns a new DStream by unifying data of another DStream with this DStream. + * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream. + */ + def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] = + dstream.union(that.dstream) + + // ======================================================================= + // Methods only for PairDStream's + // ======================================================================= + + /** + * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. + */ + def groupByKey(): JavaPairDStream[K, JList[V]] = + dstream.groupByKey().mapValues(seqAsJavaList _) + + /** + * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to + * generate the RDDs with `numPartitions` partitions. + */ + def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] = + dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _) + + /** + * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream. + * Therefore, the values for each key in `this` DStream's RDDs are grouped into a + * single sequence to generate the RDDs of the new DStream. [[spark.Partitioner]] + * is used to control the partitioning of each RDD. + */ + def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] = + dstream.groupByKey(partitioner).mapValues(seqAsJavaList _) + + /** + * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the associative reduce function. Hash partitioning is used to generate the RDDs + * with Spark's default number of partitions. 
+ */ + def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] = + dstream.reduceByKey(func) + + /** + * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs + * with `numPartitions` partitions. + */ + def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairDStream[K, V] = + dstream.reduceByKey(func, numPartitions) + + /** + * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are + * merged using the supplied reduce function. [[spark.Partitioner]] is used to control the + * partitioning of each RDD. + */ + def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = { + dstream.reduceByKey(func, partitioner) + } + + /** + * Combine elements of each key in DStream's RDDs using custom function. This is similar to the + * combineByKey for RDDs. Please refer to combineByKey in [[spark.PairRDDFunctions]] for more + * information. + */ + def combineByKey[C](createCombiner: JFunction[V, C], + mergeValue: JFunction2[C, V, C], + mergeCombiners: JFunction2[C, C, C], + partitioner: Partitioner + ): JavaPairDStream[K, C] = { + implicit val cm: ClassManifest[C] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]] + dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) + } + + /** + * Create a new DStream by counting the number of values of each key in each RDD. Hash + * partitioning is used to generate the RDDs with Spark's `numPartitions` partitions. + */ + def countByKey(numPartitions: Int): JavaPairDStream[K, JLong] = { + JavaPairDStream.scalaToJavaLong(dstream.countByKey(numPartitions)); + } + + + /** + * Create a new DStream by counting the number of values of each key in each RDD. Hash + * partitioning is used to generate the RDDs with the default number of partitions. 
+ */ + def countByKey(): JavaPairDStream[K, JLong] = { + JavaPairDStream.scalaToJavaLong(dstream.countByKey()); + } + + /** + * Creates a new DStream by applying `groupByKey` over a sliding window. This is similar to + * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs + * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with + * Spark's default number of partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + */ + def groupByKeyAndWindow(windowDuration: Duration): JavaPairDStream[K, JList[V]] = { + dstream.groupByKeyAndWindow(windowDuration).mapValues(seqAsJavaList _) + } + + /** + * Create a new DStream by applying `groupByKey` over a sliding window. Similar to + * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration) + : JavaPairDStream[K, JList[V]] = { + dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(seqAsJavaList _) + } + + /** + * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Similar to `DStream.groupByKey()`, but applies it over a sliding window. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. 
+ * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions Number of partitions of each RDD in the new DStream. + */ + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int) + :JavaPairDStream[K, JList[V]] = { + dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions) + .mapValues(seqAsJavaList _) + } + + /** + * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream. + * Similar to `DStream.groupByKey()`, but applies it over a sliding window. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + */ + def groupByKeyAndWindow( + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner + ):JavaPairDStream[K, JList[V]] = { + dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner) + .mapValues(seqAsJavaList _) + } + + /** + * Create a new DStream by applying `reduceByKey` over a sliding window on `this` DStream. + * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream + * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate + * the RDDs with Spark's default number of partitions. 
+ * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + */ + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration) + :JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, windowDuration) + } + + /** + * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with Spark's default number of partitions. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def reduceByKeyAndWindow( + reduceFunc: Function2[V, V, V], + windowDuration: Duration, + slideDuration: Duration + ):JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration) + } + + /** + * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to + * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to + * generate the RDDs with `numPartitions` partitions. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions Number of partitions of each RDD in the new DStream. 
+ */ + def reduceByKeyAndWindow( + reduceFunc: Function2[V, V, V], + windowDuration: Duration, + slideDuration: Duration, + numPartitions: Int + ): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, numPartitions) + } + + /** + * Create a new DStream by applying `reduceByKey` over a sliding window. Similar to + * `DStream.reduceByKey()`, but applies it over a sliding window. + * @param reduceFunc associative reduce function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + */ + def reduceByKeyAndWindow( + reduceFunc: Function2[V, V, V], + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner + ): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, partitioner) + } + + /** + * Create a new DStream by reducing over a using incremental computation. + * The reduced value of over a new window is calculated using the old window's reduce value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. 
+ * @param reduceFunc associative reduce function + * @param invReduceFunc inverse function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def reduceByKeyAndWindow( + reduceFunc: Function2[V, V, V], + invReduceFunc: Function2[V, V, V], + windowDuration: Duration, + slideDuration: Duration + ): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration) + } + + /** + * Create a new DStream by reducing over a using incremental computation. + * The reduced value of over a new window is calculated using the old window's reduce value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * @param reduceFunc associative reduce function + * @param invReduceFunc inverse function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions Number of partitions of each RDD in the new DStream. 
+ */ + def reduceByKeyAndWindow( + reduceFunc: Function2[V, V, V], + invReduceFunc: Function2[V, V, V], + windowDuration: Duration, + slideDuration: Duration, + numPartitions: Int + ): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow( + reduceFunc, + invReduceFunc, + windowDuration, + slideDuration, + numPartitions) + } + + /** + * Create a new DStream by reducing over a using incremental computation. + * The reduced value of over a new window is calculated using the old window's reduce value : + * 1. reduce the new values that entered the window (e.g., adding new counts) + * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) + * This is more efficient that reduceByKeyAndWindow without "inverse reduce" function. + * However, it is applicable to only "invertible reduce functions". + * @param reduceFunc associative reduce function + * @param invReduceFunc inverse function + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. + */ + def reduceByKeyAndWindow( + reduceFunc: Function2[V, V, V], + invReduceFunc: Function2[V, V, V], + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner + ): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow( + reduceFunc, + invReduceFunc, + windowDuration, + slideDuration, + partitioner) + } + + /** + * Create a new DStream by counting the number of values for each key over a window. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. 
+ * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration) + : JavaPairDStream[K, JLong] = { + JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowDuration, slideDuration)) + } + + /** + * Create a new DStream by counting the number of values for each key over a window. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + * @param numPartitions Number of partitions of each RDD in the new DStream. + */ + def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int) + : JavaPairDStream[K, Long] = { + dstream.countByKeyAndWindow(windowDuration, slideDuration, numPartitions) + } + + private def convertUpdateStateFunction[S](in: JFunction2[JList[V], Optional[S], Optional[S]]): + (Seq[V], Option[S]) => Option[S] = { + val scalaFunc: (Seq[V], Option[S]) => Option[S] = (values, state) => { + val list: JList[V] = values + val scalaState: Optional[S] = state match { + case Some(s) => Optional.of(s) + case _ => Optional.absent() + } + val result: Optional[S] = in.apply(list, scalaState) + result.isPresent match { + case true => Some(result.get()) + case _ => None + } + } + scalaFunc + } + + /** + * Create a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. 
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. + * @tparam S State type + */ + def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]]) + : JavaPairDStream[K, S] = { + implicit val cm: ClassManifest[S] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]] + dstream.updateStateByKey(convertUpdateStateFunction(updateFunc)) + } + + /** + * Create a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of each key. + * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. + * @param numPartitions Number of partitions of each RDD in the new DStream. + * @tparam S State type + */ + def updateStateByKey[S: ClassManifest]( + updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], + numPartitions: Int) + : JavaPairDStream[K, S] = { + dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions) + } + + /** + * Create a new "state" DStream where the state for each key is updated by applying + * the given function on the previous state of the key and the new values of the key. + * [[spark.Partitioner]] is used to control the partitioning of each RDD. + * @param updateFunc State update function. If `this` function returns None, then + * corresponding state key-value pair will be eliminated. + * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream. 
+ * @tparam S State type + */ + def updateStateByKey[S: ClassManifest]( + updateFunc: JFunction2[JList[V], Optional[S], Optional[S]], + partitioner: Partitioner + ): JavaPairDStream[K, S] = { + dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner) + } + + def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = { + implicit val cm: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + dstream.mapValues(f) + } + + def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = { + import scala.collection.JavaConverters._ + def fn = (x: V) => f.apply(x).asScala + implicit val cm: ClassManifest[U] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] + dstream.flatMapValues(fn) + } + + /** + * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` + * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that + * key in both RDDs. HashPartitioner is used to partition each generated RDD into default number + * of partitions. + */ + def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + } + + /** + * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this` + * or `other` DStreams, the generated RDD will contains a tuple with the list of values for that + * key in both RDDs. Partitioner is used to partition each generated RDD. 
+ */ + def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner) + : JavaPairDStream[K, (JList[V], JList[W])] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + dstream.cogroup(other.dstream, partitioner) + .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + } + + /** + * Join `this` DStream with `other` DStream. HashPartitioner is used + * to partition each generated RDD into default number of partitions. + */ + def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + dstream.join(other.dstream) + } + + /** + * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will + * be generated by joining RDDs from `this` and other DStream. Uses the given + * Partitioner to partition each generated RDD. + */ + def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner) + : JavaPairDStream[K, (V, W)] = { + implicit val cm: ClassManifest[W] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]] + dstream.join(other.dstream, partitioner) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String) { + dstream.saveAsHadoopFiles(prefix, suffix) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". 
+ */ + def saveAsHadoopFiles( + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: OutputFormat[_, _]]) { + dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsHadoopFiles( + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: OutputFormat[_, _]], + conf: JobConf) { + dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](prefix: String, suffix: String) { + dstream.saveAsNewAPIHadoopFiles(prefix, suffix) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". + */ + def saveAsNewAPIHadoopFiles( + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) { + dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass) + } + + /** + * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is + * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix". 
+ */ + def saveAsNewAPIHadoopFiles( + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: NewOutputFormat[_, _]], + conf: Configuration = new Configuration) { + dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf) + } + + override val classManifest: ClassManifest[(K, V)] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]] +} + +object JavaPairDStream { + implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)]) + :JavaPairDStream[K, V] = + new JavaPairDStream[K, V](dstream) + + def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = { + implicit val cmk: ClassManifest[K] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] + implicit val cmv: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + new JavaPairDStream[K, V](dstream.dstream) + } + + def scalaToJavaLong[K: ClassManifest](dstream: JavaPairDStream[K, Long]) + : JavaPairDStream[K, JLong] = { + StreamingContext.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_)) + } +} diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala new file mode 100644 index 0000000000..accac82e09 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -0,0 +1,288 @@ +package spark.streaming.api.java + +import scala.collection.JavaConversions._ +import java.lang.{Long => JLong, Integer => JInt} + +import spark.streaming._ +import dstream._ +import spark.storage.StorageLevel +import spark.api.java.function.{Function => JFunction, Function2 => JFunction2} +import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} +import java.io.InputStream +import java.util.{Map => JMap} + +/** + * A StreamingContext is the main entry point 
for Spark Streaming functionality. Besides the basic + * information (such as, cluster URL and job name) to internally create a SparkContext, it provides + * methods used to create DStream from various input sources. + */ +class JavaStreamingContext(val ssc: StreamingContext) { + + // TODOs: + // - Test to/from Hadoop functions + // - Support creating and registering InputStreams + + + /** + * Creates a StreamingContext. + * @param master Name of the Spark Master + * @param frameworkName Name to be used when registering with the scheduler + * @param batchDuration The time interval at which streaming data will be divided into batches + */ + def this(master: String, frameworkName: String, batchDuration: Duration) = + this(new StreamingContext(master, frameworkName, batchDuration)) + + /** + * Re-creates a StreamingContext from a checkpoint file. + * @param path Path either to the directory that was specified as the checkpoint directory, or + * to the checkpoint file 'graph' or 'graph.bk'. + */ + def this(path: String) = this (new StreamingContext(path)) + + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param hostname Zookeper hostname. + * @param port Zookeper port. + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + */ + def kafkaStream[T]( + hostname: String, + port: Int, + groupId: String, + topics: JMap[String, JInt]) + : JavaDStream[T] = { + implicit val cmt: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + ssc.kafkaStream[T](hostname, port, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*)) + } + + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param hostname Zookeper hostname. + * @param port Zookeper port. + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. 
Each partition is consumed + * in its own thread. + * @param initialOffsets Optional initial offsets for each of the partitions to consume. + * By default the value is pulled from zookeper. + */ + def kafkaStream[T]( + hostname: String, + port: Int, + groupId: String, + topics: JMap[String, JInt], + initialOffsets: JMap[KafkaPartitionKey, JLong]) + : JavaDStream[T] = { + implicit val cmt: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + ssc.kafkaStream[T]( + hostname, + port, + groupId, + Map(topics.mapValues(_.intValue()).toSeq: _*), + Map(initialOffsets.mapValues(_.longValue()).toSeq: _*)) + } + + /** + * Create an input stream that pulls messages form a Kafka Broker. + * @param hostname Zookeper hostname. + * @param port Zookeper port. + * @param groupId The group id for this consumer. + * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed + * in its own thread. + * @param initialOffsets Optional initial offsets for each of the partitions to consume. + * By default the value is pulled from zookeper. + * @param storageLevel RDD storage level. Defaults to memory-only + */ + def kafkaStream[T]( + hostname: String, + port: Int, + groupId: String, + topics: JMap[String, JInt], + initialOffsets: JMap[KafkaPartitionKey, JLong], + storageLevel: StorageLevel) + : JavaDStream[T] = { + implicit val cmt: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + ssc.kafkaStream[T]( + hostname, + port, + groupId, + Map(topics.mapValues(_.intValue()).toSeq: _*), + Map(initialOffsets.mapValues(_.longValue()).toSeq: _*), + storageLevel) + } + + /** + * Create a input stream from network source hostname:port. Data is received using + * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited + * lines. 
+ * @param hostname Hostname to connect to for receiving data + * @param port Port to connect to for receiving data + * @param storageLevel Storage level to use for storing the received objects + * (default: StorageLevel.MEMORY_AND_DISK_SER_2) + */ + def networkTextStream(hostname: String, port: Int, storageLevel: StorageLevel) + : JavaDStream[String] = { + ssc.networkTextStream(hostname, port, storageLevel) + } + + /** + * Create a input stream from network source hostname:port. Data is received using + * a TCP socket and the receive bytes is interpreted as UTF8 encoded \n delimited + * lines. + * @param hostname Hostname to connect to for receiving data + * @param port Port to connect to for receiving data + */ + def networkTextStream(hostname: String, port: Int): JavaDStream[String] = { + ssc.networkTextStream(hostname, port) + } + + /** + * Create a input stream from network source hostname:port. Data is received using + * a TCP socket and the receive bytes it interepreted as object using the given + * converter. + * @param hostname Hostname to connect to for receiving data + * @param port Port to connect to for receiving data + * @param converter Function to convert the byte stream to objects + * @param storageLevel Storage level to use for storing the received objects + * @tparam T Type of the objects received (after converting bytes to objects) + */ + def networkStream[T]( + hostname: String, + port: Int, + converter: JFunction[InputStream, java.lang.Iterable[T]], + storageLevel: StorageLevel) + : JavaDStream[T] = { + def fn = (x: InputStream) => converter.apply(x).toIterator + implicit val cmt: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + ssc.networkStream(hostname, port, fn, storageLevel) + } + + /** + * Creates a input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them as text files (using key as LongWritable, value + * as Text and input format as TextInputFormat). 
File names starting with . are ignored. + * @param directory HDFS directory to monitor for new file + */ + def textFileStream(directory: String): JavaDStream[String] = { + ssc.textFileStream(directory) + } + + /** + * Create a input stream from network source hostname:port, where data is received + * as serialized blocks (serialized using the Spark's serializer) that can be directly + * pushed into the block manager without deserializing them. This is the most efficient + * way to receive data. + * @param hostname Hostname to connect to for receiving data + * @param port Port to connect to for receiving data + * @param storageLevel Storage level to use for storing the received objects + * @tparam T Type of the objects in the received blocks + */ + def rawNetworkStream[T]( + hostname: String, + port: Int, + storageLevel: StorageLevel): JavaDStream[T] = { + implicit val cmt: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port, storageLevel)) + } + + /** + * Create a input stream from network source hostname:port, where data is received + * as serialized blocks (serialized using the Spark's serializer) that can be directly + * pushed into the block manager without deserializing them. This is the most efficient + * way to receive data. + * @param hostname Hostname to connect to for receiving data + * @param port Port to connect to for receiving data + * @tparam T Type of the objects in the received blocks + */ + def rawNetworkStream[T](hostname: String, port: Int): JavaDStream[T] = { + implicit val cmt: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port)) + } + + /** + * Creates a input stream that monitors a Hadoop-compatible filesystem + * for new files and reads them using the given key-value types and input format. + * File names starting with . are ignored. 
+ * @param directory HDFS directory to monitor for new file + * @tparam K Key type for reading HDFS file + * @tparam V Value type for reading HDFS file + * @tparam F Input format for reading HDFS file + */ + def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): JavaPairDStream[K, V] = { + implicit val cmk: ClassManifest[K] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] + implicit val cmv: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val cmf: ClassManifest[F] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[F]] + ssc.fileStream[K, V, F](directory); + } + + /** + * Creates a input stream from a Flume source. + * @param hostname Hostname of the slave machine to which the flume data will be sent + * @param port Port of the slave machine to which the flume data will be sent + * @param storageLevel Storage level to use for storing the received objects + */ + def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel): + JavaDStream[SparkFlumeEvent] = { + ssc.flumeStream(hostname, port, storageLevel) + } + + + /** + * Creates a input stream from a Flume source. + * @param hostname Hostname of the slave machine to which the flume data will be sent + * @param port Port of the slave machine to which the flume data will be sent + */ + def flumeStream(hostname: String, port: Int): + JavaDStream[SparkFlumeEvent] = { + ssc.flumeStream(hostname, port) + } + + /** + * Registers an output stream that will be computed every interval + */ + def registerOutputStream(outputStream: JavaDStreamLike[_, _]) { + ssc.registerOutputStream(outputStream.dstream) + } + + /** + * Sets the context to periodically checkpoint the DStream operations for master + * fault-tolerance. By default, the graph will be checkpointed every batch interval. 
+ * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored + * @param interval checkpoint interval + */ + def checkpoint(directory: String, interval: Duration = null) { + ssc.checkpoint(directory, interval) + } + + /** + * Sets each DStreams in this context to remember RDDs it generated in the last given duration. + * DStreams remember RDDs only for a limited duration of duration and releases them for garbage + * collection. This method allows the developer to specify how to long to remember the RDDs ( + * if the developer wishes to query old data outside the DStream computation). + * @param duration Minimum duration that each DStream should remember its RDDs + */ + def remember(duration: Duration) { + ssc.remember(duration) + } + + /** + * Starts the execution of the streams. + */ + def start() = ssc.start() + + /** + * Sstops the execution of the streams. + */ + def stop() = ssc.stop() + +} diff --git a/streaming/src/test/scala/JavaTestUtils.scala b/streaming/src/test/scala/JavaTestUtils.scala new file mode 100644 index 0000000000..56349837e5 --- /dev/null +++ b/streaming/src/test/scala/JavaTestUtils.scala @@ -0,0 +1,65 @@ +package spark.streaming + +import collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import java.util.{List => JList} +import spark.streaming.api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext} +import spark.streaming._ +import java.util.ArrayList +import collection.JavaConversions._ + +/** Exposes streaming test functionality in a Java-friendly way. */ +trait JavaTestBase extends TestSuiteBase { + + /** + * Create a [[spark.streaming.TestInputStream]] and attach it to the supplied context. + * The stream will be derived from the supplied lists of Java objects. 
+ **/ + def attachTestInputStream[T]( + ssc: JavaStreamingContext, + data: JList[JList[T]], + numPartitions: Int) = { + val seqData = data.map(Seq(_:_*)) + + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions) + ssc.ssc.registerInputStream(dstream) + new JavaDStream[T](dstream) + } + + /** + * Attach a provided stream to it's associated StreamingContext as a + * [[spark.streaming.TestOutputStream]]. + **/ + def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]]( + dstream: JavaDStreamLike[T, This]) = { + implicit val cm: ClassManifest[T] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + val ostream = new TestOutputStream(dstream.dstream, + new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]) + dstream.dstream.ssc.registerOutputStream(ostream) + } + + /** + * Process all registered streams for a numBatches batches, failing if + * numExpectedOutput RDD's are not generated. Generated RDD's are collected + * and returned, represented as a list for each batch interval. + */ + def runStreams[V]( + ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { + implicit val cm: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput) + val out = new ArrayList[JList[V]]() + res.map(entry => out.append(new ArrayList[V](entry))) + out + } +} + +object JavaTestUtils extends JavaTestBase { + +} + +object JavaCheckpointTestUtils extends JavaTestBase { + override def actuallyWait = true +}
\ No newline at end of file diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java new file mode 100644 index 0000000000..374793b57e --- /dev/null +++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java @@ -0,0 +1,1003 @@ +package spark.streaming; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.io.Files; +import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import scala.Tuple2; +import spark.HashPartitioner; +import spark.api.java.JavaRDD; +import spark.api.java.function.*; +import spark.storage.StorageLevel; +import spark.streaming.api.java.JavaDStream; +import spark.streaming.api.java.JavaPairDStream; +import spark.streaming.api.java.JavaStreamingContext; +import spark.streaming.JavaTestUtils; +import spark.streaming.JavaCheckpointTestUtils; +import spark.streaming.dstream.KafkaPartitionKey; + +import java.io.*; +import java.util.*; + +// The test suite itself is Serializable so that anonymous Function implementations can be +// serialized, as an alternative to converting these anonymous classes to static inner classes; +// see http://stackoverflow.com/questions/758570/. 
+public class JavaAPISuite implements Serializable { + private transient JavaStreamingContext sc; + + @Before + public void setUp() { + sc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); + } + + @After + public void tearDown() { + sc.stop(); + sc = null; + // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown + System.clearProperty("spark.master.port"); + } + + @Test + public void testCount() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3,4), + Arrays.asList(3,4,5), + Arrays.asList(3)); + + List<List<Long>> expected = Arrays.asList( + Arrays.asList(4L), + Arrays.asList(3L), + Arrays.asList(1L)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream count = stream.count(); + JavaTestUtils.attachTestOutputStream(count); + List<List<Long>> result = JavaTestUtils.runStreams(sc, 3, 3); + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testMap() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("hello", "world"), + Arrays.asList("goodnight", "moon")); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(5,5), + Arrays.asList(9,4)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream letterCount = stream.map(new Function<String, Integer>() { + @Override + public Integer call(String s) throws Exception { + return s.length(); + } + }); + JavaTestUtils.attachTestOutputStream(letterCount); + List<List<Integer>> result = JavaTestUtils.runStreams(sc, 2, 2); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testWindow() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6,1,2,3), + Arrays.asList(7,8,9,4,5,6), + Arrays.asList(7,8,9)); + + JavaDStream stream = 
JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream windowed = stream.window(new Duration(2000)); + JavaTestUtils.attachTestOutputStream(windowed); + List<List<Integer>> result = JavaTestUtils.runStreams(sc, 4, 4); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testWindowWithSlideDuration() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9), + Arrays.asList(10,11,12), + Arrays.asList(13,14,15), + Arrays.asList(16,17,18)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(1,2,3,4,5,6), + Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12), + Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18), + Arrays.asList(13,14,15,16,17,18)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream windowed = stream.window(new Duration(4000), new Duration(2000)); + JavaTestUtils.attachTestOutputStream(windowed); + List<List<Integer>> result = JavaTestUtils.runStreams(sc, 8, 4); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testTumble() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9), + Arrays.asList(10,11,12), + Arrays.asList(13,14,15), + Arrays.asList(16,17,18)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(1,2,3,4,5,6), + Arrays.asList(7,8,9,10,11,12), + Arrays.asList(13,14,15,16,17,18)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream windowed = stream.tumble(new Duration(2000)); + JavaTestUtils.attachTestOutputStream(windowed); + List<List<Integer>> result = JavaTestUtils.runStreams(sc, 6, 3); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testFilter() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red socks")); + + List<List<String>> expected = 
Arrays.asList( + Arrays.asList("giants"), + Arrays.asList("yankees")); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream filtered = stream.filter(new Function<String, Boolean>() { + @Override + public Boolean call(String s) throws Exception { + return s.contains("a"); + } + }); + JavaTestUtils.attachTestOutputStream(filtered); + List<List<String>> result = JavaTestUtils.runStreams(sc, 2, 2); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testGlom() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red socks")); + + List<List<List<String>>> expected = Arrays.asList( + Arrays.asList(Arrays.asList("giants", "dodgers")), + Arrays.asList(Arrays.asList("yankees", "red socks"))); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream glommed = stream.glom(); + JavaTestUtils.attachTestOutputStream(glommed); + List<List<List<String>>> result = JavaTestUtils.runStreams(sc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testMapPartitions() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("giants", "dodgers"), + Arrays.asList("yankees", "red socks")); + + List<List<String>> expected = Arrays.asList( + Arrays.asList("GIANTSDODGERS"), + Arrays.asList("YANKEESRED SOCKS")); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() { + @Override + public Iterable<String> call(Iterator<String> in) { + String out = ""; + while (in.hasNext()) { + out = out + in.next().toUpperCase(); + } + return Lists.newArrayList(out); + } + }); + JavaTestUtils.attachTestOutputStream(mapped); + List<List<List<String>>> result = JavaTestUtils.runStreams(sc, 2, 2); + + Assert.assertEquals(expected, result); + } + + private class IntegerSum extends Function2<Integer, 
Integer, Integer> { + @Override + public Integer call(Integer i1, Integer i2) throws Exception { + return i1 + i2; + } + } + + private class IntegerDifference extends Function2<Integer, Integer, Integer> { + @Override + public Integer call(Integer i1, Integer i2) throws Exception { + return i1 - i2; + } + } + + @Test + public void testReduce() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(6), + Arrays.asList(15), + Arrays.asList(24)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream reduced = stream.reduce(new IntegerSum()); + JavaTestUtils.attachTestOutputStream(reduced); + List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testReduceByWindow() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(6), + Arrays.asList(21), + Arrays.asList(39), + Arrays.asList(24)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(), + new IntegerDifference(), new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(reducedWindowed); + List<List<Integer>> result = JavaTestUtils.runStreams(sc, 4, 4); + + Assert.assertEquals(expected, result); + } + + @Test + public void testTransform() { + List<List<Integer>> inputData = Arrays.asList( + Arrays.asList(1,2,3), + Arrays.asList(4,5,6), + Arrays.asList(7,8,9)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(3,4,5), + Arrays.asList(6,7,8), + Arrays.asList(9,10,11)); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream transformed = stream.transform(new 
Function<JavaRDD<Integer>, JavaRDD<Integer>>() { + @Override + public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception { + return in.map(new Function<Integer, Integer>() { + @Override + public Integer call(Integer i) throws Exception { + return i + 2; + } + }); + }}); + JavaTestUtils.attachTestOutputStream(transformed); + List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testFlatMap() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("go", "giants"), + Arrays.asList("boo", "dodgers"), + Arrays.asList("athletics")); + + List<List<String>> expected = Arrays.asList( + Arrays.asList("g","o","g","i","a","n","t","s"), + Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"), + Arrays.asList("a","t","h","l","e","t","i","c","s")); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream flatMapped = stream.flatMap(new FlatMapFunction<String, String>() { + @Override + public Iterable<String> call(String x) { + return Lists.newArrayList(x.split("(?!^)")); + } + }); + JavaTestUtils.attachTestOutputStream(flatMapped); + List<List<String>> result = JavaTestUtils.runStreams(sc, 3, 3); + + assertOrderInvariantEquals(expected, result); + } + + @Test + public void testPairFlatMap() { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("giants"), + Arrays.asList("dodgers"), + Arrays.asList("athletics")); + + List<List<Tuple2<Integer, String>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<Integer, String>(6, "g"), + new Tuple2<Integer, String>(6, "i"), + new Tuple2<Integer, String>(6, "a"), + new Tuple2<Integer, String>(6, "n"), + new Tuple2<Integer, String>(6, "t"), + new Tuple2<Integer, String>(6, "s")), + Arrays.asList( + new Tuple2<Integer, String>(7, "d"), + new Tuple2<Integer, String>(7, "o"), + new Tuple2<Integer, String>(7, "d"), + new Tuple2<Integer, String>(7, "g"), + new Tuple2<Integer, 
String>(7, "e"), + new Tuple2<Integer, String>(7, "r"), + new Tuple2<Integer, String>(7, "s")), + Arrays.asList( + new Tuple2<Integer, String>(9, "a"), + new Tuple2<Integer, String>(9, "t"), + new Tuple2<Integer, String>(9, "h"), + new Tuple2<Integer, String>(9, "l"), + new Tuple2<Integer, String>(9, "e"), + new Tuple2<Integer, String>(9, "t"), + new Tuple2<Integer, String>(9, "i"), + new Tuple2<Integer, String>(9, "c"), + new Tuple2<Integer, String>(9, "s"))); + + JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() { + @Override + public Iterable<Tuple2<Integer, String>> call(String in) throws Exception { + List<Tuple2<Integer, String>> out = Lists.newArrayList(); + for (String letter: in.split("(?!^)")) { + out.add(new Tuple2<Integer, String>(in.length(), letter)); + } + return out; + } + }); + JavaTestUtils.attachTestOutputStream(flatMapped); + List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(sc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testUnion() { + List<List<Integer>> inputData1 = Arrays.asList( + Arrays.asList(1,1), + Arrays.asList(2,2), + Arrays.asList(3,3)); + + List<List<Integer>> inputData2 = Arrays.asList( + Arrays.asList(4,4), + Arrays.asList(5,5), + Arrays.asList(6,6)); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(1,1,4,4), + Arrays.asList(2,2,5,5), + Arrays.asList(3,3,6,6)); + + JavaDStream stream1 = JavaTestUtils.attachTestInputStream(sc, inputData1, 2); + JavaDStream stream2 = JavaTestUtils.attachTestInputStream(sc, inputData2, 2); + + JavaDStream unioned = stream1.union(stream2); + JavaTestUtils.attachTestOutputStream(unioned); + List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3); + + assertOrderInvariantEquals(expected, result); + } + + /* + * Performs an order-invariant comparison of lists representing two RDD streams. 
This allows
 * us to account for ordering variation within individual RDDs, which occurs during windowing.
 * The lists passed in are not modified: each batch is copied before it is sorted.
 */
public static <T extends Comparable> void assertOrderInvariantEquals(
    List<List<T>> expected, List<List<T>> actual) {
  Assert.assertEquals(sortedCopies(expected), sortedCopies(actual));
}

/** Builds a new outer list holding an independently sorted copy of every inner list. */
private static <T extends Comparable> List<List<T>> sortedCopies(List<List<T>> batches) {
  List<List<T>> sortedBatches = new ArrayList<List<T>>(batches.size());
  for (List<T> batch : batches) {
    // Copy first so that Collections.sort never touches the caller's list.
    List<T> copy = new ArrayList<T>(batch);
    Collections.sort(copy);
    sortedBatches.add(copy);
  }
  return sortedBatches;
}


// PairDStream Functions

/** Maps each string to a (string, length) pair and keeps only pairs whose key contains "a". */
@Test
public void testPairFilter() {
  List<List<String>> inputData = Arrays.asList(
      Arrays.asList("giants", "dodgers"),
      Arrays.asList("yankees", "red socks"));

  List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
      Arrays.asList(new Tuple2<String, Integer>("giants", 6)),
      Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));

  JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
  JavaPairDStream<String, Integer> pairStream = stream.map(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String in) throws Exception {
          return new Tuple2<String, Integer>(in, in.length());
        }
      });

  JavaPairDStream<String, Integer> filtered = pairStream.filter(
      new Function<Tuple2<String, Integer>, Boolean>() {
        @Override
        public Boolean call(Tuple2<String, Integer> in) throws Exception {
          return in._1().contains("a");
        }
      });
  JavaTestUtils.attachTestOutputStream(filtered);
  List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2);

  Assert.assertEquals(expected, result);
}

/** Shared two-batch fixture of (String, String) pairs keyed by "california" / "new york". */
List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
    Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
        new Tuple2<String, String>("california", "giants"),
        new Tuple2<String, String>("new york", "yankees"),
        new Tuple2<String, String>("new york", "mets")),
    Arrays.asList(new Tuple2<String, String>("california", "sharks"),
        new Tuple2<String, String>("california", "ducks"),
        new Tuple2<String, String>("new york", "rangers"),
        new Tuple2<String, String>("new york", "islanders")));

/** Shared two-batch fixture of (String, Integer) pairs keyed by "california" / "new york". */
List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
    Arrays.asList(
        new Tuple2<String, Integer>("california", 1),
        new Tuple2<String, Integer>("california", 3),
        new Tuple2<String, Integer>("new york", 4),
        new Tuple2<String, Integer>("new york", 1)),
    Arrays.asList(
        new Tuple2<String, Integer>("california", 5),
        new Tuple2<String, Integer>("california", 5),
        new Tuple2<String, Integer>("new york", 3),
        new Tuple2<String, Integer>("new york", 1)));

/** groupByKey() should collect, per batch, all values sharing a key into one list. */
@Test
public void testPairGroupByKey() {
  List<List<Tuple2<String, String>>> inputData = stringStringKVStream;

  List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
      Arrays.asList(
          new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")),
          new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))),
      Arrays.asList(
          new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
          new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));

  JavaDStream<Tuple2<String, String>> stream =
      JavaTestUtils.attachTestInputStream(sc, inputData, 1);
  JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);

  JavaPairDStream<String, List<String>> grouped = pairStream.groupByKey();
  JavaTestUtils.attachTestOutputStream(grouped);
  List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(sc, 2, 2);

  Assert.assertEquals(expected, result);
}

/** reduceByKey(IntegerSum) should yield one summed value per key in every batch. */
@Test
public void testPairReduceByKey() {
  List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;

  List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
      Arrays.asList(
          new Tuple2<String, Integer>("california", 4),
          new Tuple2<String, Integer>("new york", 5)),
      Arrays.asList(
          new Tuple2<String, Integer>("california", 10),
          new Tuple2<String, Integer>("new york", 4)));

  JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
      sc, inputData, 1);
  JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);

  JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey(new IntegerSum());

  JavaTestUtils.attachTestOutputStream(reduced);
  List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2);

  Assert.assertEquals(expected, result);
}

/**
 * combineByKey() with an identity combiner-creator and IntegerSum for both merge steps
 * should produce the same per-key sums as reduceByKey.
 */
@Test
public void testCombineByKey() {
  List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;

  List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
      Arrays.asList(
          new Tuple2<String, Integer>("california", 4),
          new Tuple2<String, Integer>("new york", 5)),
      Arrays.asList(
          new Tuple2<String, Integer>("california", 10),
          new Tuple2<String, Integer>("new york", 4)));

  JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
      sc, inputData, 1);
  JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);

  JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(
      new Function<Integer, Integer>() {
        @Override
        public Integer call(Integer i) throws Exception {
          return i;
        }
      }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2));

  JavaTestUtils.attachTestOutputStream(combined);
  List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2);

  Assert.assertEquals(expected, result);
}

/** countByKey() should report, per batch, how many pairs carry each key. */
@Test
public void testCountByKey() {
  List<List<Tuple2<String, String>>> inputData = stringStringKVStream;

  List<List<Tuple2<String, Long>>> expected = Arrays.asList(
      Arrays.asList(
          new Tuple2<String, Long>("california", 2L),
          new Tuple2<String, Long>("new york", 2L)),
      Arrays.asList(
          new Tuple2<String, Long>("california", 2L),
          new Tuple2<String, Long>("new york", 2L)));

  JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
      sc, inputData, 1);
  JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);

  JavaPairDStream<String, Long> counted = pairStream.countByKey();
  JavaTestUtils.attachTestOutputStream(counted);
  List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 2, 2);

  Assert.assertEquals(expected, result);
}

/**
 * groupByKeyAndWindow(Duration(2000), Duration(1000)): the second output is expected to
 * merge both input batches, the third to contain only the second input batch.
 */
@Test
public void testGroupByKeyAndWindow() {
  List<List<Tuple2<String, String>>> inputData = stringStringKVStream;

  List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
      Arrays.asList(
          new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")),
          new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))),
      Arrays.asList(
          new Tuple2<String, List<String>>("california",
              Arrays.asList("sharks", "ducks", "dodgers", "giants")),
          new Tuple2<String, List<String>>("new york",
              Arrays.asList("rangers", "islanders", "yankees", "mets"))),
      Arrays.asList(
          new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
          new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));

  JavaDStream<Tuple2<String, String>> stream =
      JavaTestUtils.attachTestInputStream(sc, inputData, 1);
  JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);

  JavaPairDStream<String, List<String>> groupWindowed =
      pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000));
  JavaTestUtils.attachTestOutputStream(groupWindowed);
  List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(sc, 3, 3);

  Assert.assertEquals(expected, result);
}

/** reduceByKeyAndWindow(IntegerSum, Duration(2000), Duration(1000)): windowed per-key sums. */
@Test
public void testReduceByKeyAndWindow() {
  List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;

  List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
      Arrays.asList(new Tuple2<String, Integer>("california", 4),
          new Tuple2<String, Integer>("new york", 5)),
      Arrays.asList(new Tuple2<String, Integer>("california", 14),
          new Tuple2<String, Integer>("new york", 9)),
      Arrays.asList(new Tuple2<String, Integer>("california", 10),
          new Tuple2<String, Integer>("new york", 4)));

  JavaDStream<Tuple2<String, Integer>> stream =
      JavaTestUtils.attachTestInputStream(sc, inputData, 1);
  JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);

  JavaPairDStream<String, Integer> reduceWindowed =
      pairStream.reduceByKeyAndWindow(new IntegerSum(), new Duration(2000), new Duration(1000));
  JavaTestUtils.attachTestOutputStream(reduceWindowed);
  List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3);

  Assert.assertEquals(expected, result);
}

/**
 * updateStateByKey() with a running-sum update function: the state for each key is the
 * previous state (if present) plus all new values, so the third output repeats the second.
 */
@Test
public void testUpdateStateByKey() {
  List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;

  List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
      Arrays.asList(new Tuple2<String, Integer>("california", 4),
          new Tuple2<String, Integer>("new york", 5)),
      Arrays.asList(new Tuple2<String, Integer>("california", 14),
          new Tuple2<String, Integer>("new york", 9)),
      Arrays.asList(new Tuple2<String, Integer>("california", 14),
          new Tuple2<String, Integer>("new york", 9)));

  JavaDStream<Tuple2<String, Integer>> stream =
      JavaTestUtils.attachTestInputStream(sc, inputData, 1);
  JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);

  JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
      new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
        @Override
        public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
          int out = 0;
          if (state.isPresent()) {
            out = out + state.get();
          }
          for (Integer v : values) {
            out = out + v;
          }
          return Optional.of(out);
        }
      });
  JavaTestUtils.attachTestOutputStream(updated);
  List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3);

  Assert.assertEquals(expected, result);
}
+ + @Test + public void testReduceByKeyAndWindowWithInverse() { + List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream; + + List<List<Tuple2<String, Integer>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<String, Integer>("california", 4), + new Tuple2<String, Integer>("new york", 5)), + Arrays.asList(new Tuple2<String, Integer>("california", 14), + new Tuple2<String, Integer>("new york", 9)), + Arrays.asList(new Tuple2<String, Integer>("california", 10), + new Tuple2<String, Integer>("new york", 4))); + + JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); + JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, Integer> reduceWindowed = + pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(reduceWindowed); + List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCountByKeyAndWindow() { + List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + + List<List<Tuple2<String, Long>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, Long>("california", 2L), + new Tuple2<String, Long>("new york", 2L)), + Arrays.asList( + new Tuple2<String, Long>("california", 4L), + new Tuple2<String, Long>("new york", 4L)), + Arrays.asList( + new Tuple2<String, Long>("california", 2L), + new Tuple2<String, Long>("new york", 2L))); + + JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( + sc, inputData, 1); + JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, Long> counted = + pairStream.countByKeyAndWindow(new Duration(2000), new Duration(1000)); + JavaTestUtils.attachTestOutputStream(counted); + List<List<Tuple2<String, Long>>> result = 
JavaTestUtils.runStreams(sc, 3, 3); + + Assert.assertEquals(expected, result); + } + + @Test + public void testMapValues() { + List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + + List<List<Tuple2<String, String>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "DODGERS"), + new Tuple2<String, String>("california", "GIANTS"), + new Tuple2<String, String>("new york", "YANKEES"), + new Tuple2<String, String>("new york", "METS")), + Arrays.asList(new Tuple2<String, String>("california", "SHARKS"), + new Tuple2<String, String>("california", "DUCKS"), + new Tuple2<String, String>("new york", "RANGERS"), + new Tuple2<String, String>("new york", "ISLANDERS"))); + + JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( + sc, inputData, 1); + JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + + JavaPairDStream<String, String> mapped = pairStream.mapValues(new Function<String, String>() { + @Override + public String call(String s) throws Exception { + return s.toUpperCase(); + } + }); + + JavaTestUtils.attachTestOutputStream(mapped); + List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(sc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testFlatMapValues() { + List<List<Tuple2<String, String>>> inputData = stringStringKVStream; + + List<List<Tuple2<String, String>>> expected = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "dodgers1"), + new Tuple2<String, String>("california", "dodgers2"), + new Tuple2<String, String>("california", "giants1"), + new Tuple2<String, String>("california", "giants2"), + new Tuple2<String, String>("new york", "yankees1"), + new Tuple2<String, String>("new york", "yankees2"), + new Tuple2<String, String>("new york", "mets1"), + new Tuple2<String, String>("new york", "mets2")), + Arrays.asList(new Tuple2<String, String>("california", "sharks1"), + new 
Tuple2<String, String>("california", "sharks2"), + new Tuple2<String, String>("california", "ducks1"), + new Tuple2<String, String>("california", "ducks2"), + new Tuple2<String, String>("new york", "rangers1"), + new Tuple2<String, String>("new york", "rangers2"), + new Tuple2<String, String>("new york", "islanders1"), + new Tuple2<String, String>("new york", "islanders2"))); + + JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream( + sc, inputData, 1); + JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream); + + + JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues( + new Function<String, Iterable<String>>() { + @Override + public Iterable<String> call(String in) { + List<String> out = new ArrayList<String>(); + out.add(in + "1"); + out.add(in + "2"); + return out; + } + }); + + JavaTestUtils.attachTestOutputStream(flatMapped); + List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(sc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCoGroup() { + List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "dodgers"), + new Tuple2<String, String>("new york", "yankees")), + Arrays.asList(new Tuple2<String, String>("california", "sharks"), + new Tuple2<String, String>("new york", "rangers"))); + + List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "giants"), + new Tuple2<String, String>("new york", "mets")), + Arrays.asList(new Tuple2<String, String>("california", "ducks"), + new Tuple2<String, String>("new york", "islanders"))); + + + List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expected = Arrays.asList( + Arrays.asList( + new Tuple2<String, Tuple2<List<String>, List<String>>>("california", + new Tuple2<List<String>, List<String>>(Arrays.asList("dodgers"), 
Arrays.asList("giants"))), + new Tuple2<String, Tuple2<List<String>, List<String>>>("new york", + new Tuple2<List<String>, List<String>>(Arrays.asList("yankees"), Arrays.asList("mets")))), + Arrays.asList( + new Tuple2<String, Tuple2<List<String>, List<String>>>("california", + new Tuple2<List<String>, List<String>>(Arrays.asList("sharks"), Arrays.asList("ducks"))), + new Tuple2<String, Tuple2<List<String>, List<String>>>("new york", + new Tuple2<List<String>, List<String>>(Arrays.asList("rangers"), Arrays.asList("islanders"))))); + + + JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( + sc, stringStringKVStream1, 1); + JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream( + sc, stringStringKVStream2, 1); + JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2); + JavaTestUtils.attachTestOutputStream(grouped); + List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(sc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testJoin() { + List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "dodgers"), + new Tuple2<String, String>("new york", "yankees")), + Arrays.asList(new Tuple2<String, String>("california", "sharks"), + new Tuple2<String, String>("new york", "rangers"))); + + List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList( + Arrays.asList(new Tuple2<String, String>("california", "giants"), + new Tuple2<String, String>("new york", "mets")), + Arrays.asList(new Tuple2<String, String>("california", "ducks"), + new Tuple2<String, String>("new york", "islanders"))); + + + List<List<Tuple2<String, Tuple2<String, String>>>> expected = 
Arrays.asList( + Arrays.asList( + new Tuple2<String, Tuple2<String, String>>("california", + new Tuple2<String, String>("dodgers", "giants")), + new Tuple2<String, Tuple2<String, String>>("new york", + new Tuple2<String, String>("yankees", "mets"))), + Arrays.asList( + new Tuple2<String, Tuple2<String, String>>("california", + new Tuple2<String, String>("sharks", "ducks")), + new Tuple2<String, Tuple2<String, String>>("new york", + new Tuple2<String, String>("rangers", "islanders")))); + + + JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream( + sc, stringStringKVStream1, 1); + JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1); + + JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream( + sc, stringStringKVStream2, 1); + JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2); + + JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.join(pairStream2); + JavaTestUtils.attachTestOutputStream(joined); + List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 2, 2); + + Assert.assertEquals(expected, result); + } + + @Test + public void testCheckpointMasterRecovery() throws InterruptedException { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("this", "is"), + Arrays.asList("a", "test"), + Arrays.asList("counting", "letters")); + + List<List<Integer>> expectedInitial = Arrays.asList( + Arrays.asList(4,2)); + List<List<Integer>> expectedFinal = Arrays.asList( + Arrays.asList(1,4), + Arrays.asList(8,7)); + + + File tempDir = Files.createTempDir(); + sc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000)); + + JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream letterCount = stream.map(new Function<String, Integer>() { + @Override + public Integer call(String s) throws Exception { + return s.length(); + } + }); + 
JavaCheckpointTestUtils.attachTestOutputStream(letterCount); + List<List<Integer>> initialResult = JavaTestUtils.runStreams(sc, 1, 1); + + assertOrderInvariantEquals(expectedInitial, initialResult); + Thread.sleep(1000); + + sc.stop(); + sc = new JavaStreamingContext(tempDir.getAbsolutePath()); + sc.start(); + List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(sc, 2, 2); + assertOrderInvariantEquals(expectedFinal, finalResult); + } + + /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD + @Test + public void testCheckpointofIndividualStream() throws InterruptedException { + List<List<String>> inputData = Arrays.asList( + Arrays.asList("this", "is"), + Arrays.asList("a", "test"), + Arrays.asList("counting", "letters")); + + List<List<Integer>> expected = Arrays.asList( + Arrays.asList(4,2), + Arrays.asList(1,4), + Arrays.asList(8,7)); + + JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(sc, inputData, 1); + JavaDStream letterCount = stream.map(new Function<String, Integer>() { + @Override + public Integer call(String s) throws Exception { + return s.length(); + } + }); + JavaCheckpointTestUtils.attachTestOutputStream(letterCount); + + letterCount.checkpoint(new Duration(1000)); + + List<List<Integer>> result1 = JavaCheckpointTestUtils.runStreams(sc, 3, 3); + assertOrderInvariantEquals(expected, result1); + } + */ + + // Input stream tests. These mostly just test that we can instantiate a given InputStream with + // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the + // InputStream functionality is deferred to the existing Scala tests. 
+ @Test + public void testKafkaStream() { + HashMap<String, Integer> topics = Maps.newHashMap(); + HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap(); + JavaDStream test1 = sc.kafkaStream("localhost", 12345, "group", topics); + JavaDStream test2 = sc.kafkaStream("localhost", 12345, "group", topics, offsets); + JavaDStream test3 = sc.kafkaStream("localhost", 12345, "group", topics, offsets, + StorageLevel.MEMORY_AND_DISK()); + } + + @Test + public void testNetworkTextStream() { + JavaDStream test = sc.networkTextStream("localhost", 12345); + } + + @Test + public void testNetworkString() { + class Converter extends Function<InputStream, Iterable<String>> { + public Iterable<String> call(InputStream in) { + BufferedReader reader = new BufferedReader(new InputStreamReader(in)); + List<String> out = new ArrayList<String>(); + try { + while (true) { + String line = reader.readLine(); + if (line == null) { break; } + out.add(line); + } + } catch (IOException e) { } + return out; + } + } + + JavaDStream test = sc.networkStream( + "localhost", + 12345, + new Converter(), + StorageLevel.MEMORY_ONLY()); + } + + @Test + public void testTextFileStream() { + JavaDStream test = sc.textFileStream("/tmp/foo"); + } + + @Test + public void testRawNetworkStream() { + JavaDStream test = sc.rawNetworkStream("localhost", 12345); + } + + @Test + public void testFlumeStream() { + JavaDStream test = sc.flumeStream("localhost", 12345); + } + + @Test + public void testFileStream() { + JavaPairDStream<String, String> foo = + sc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo"); + } +} |