Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala                          52
-rw-r--r--  streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala            170
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala             91
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala        183
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala        638
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala   288
-rw-r--r--  streaming/src/test/scala/JavaTestUtils.scala                                    65
-rw-r--r--  streaming/src/test/scala/spark/streaming/JavaAPISuite.java                    1003
8 files changed, 2370 insertions(+), 120 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index fbe3cebd6d..036763fe2f 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -98,10 +98,10 @@ abstract class DStream[T: ClassManifest] (
this
}
- /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)
- /** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): DStream[T] = persist()
/**
@@ -119,7 +119,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * This method initializes the DStream by setting the "zero" time, based on which
+ * Initialize the DStream by setting the "zero" time, based on which
* the validity of future times is calculated. This method also recursively initializes
* its parent DStreams.
*/
@@ -244,7 +244,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Retrieves a precomputed RDD of this DStream, or computes the RDD. This is an internal
+ * Retrieve a precomputed RDD of this DStream, or compute the RDD. This is an internal
* method that should not be called directly.
*/
protected[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
@@ -283,7 +283,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Generates a SparkStreaming job for the given time. This is an internal method that
+ * Generate a Spark Streaming job for the given time. This is an internal method that
* should not be called directly. This default implementation creates a job
* that materializes the corresponding RDD. Subclasses of DStream may override this
* (eg. ForEachDStream).
@@ -302,7 +302,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Dereferences RDDs that are older than rememberDuration.
+ * Dereference RDDs that are older than rememberDuration.
*/
protected[streaming] def forgetOldRDDs(time: Time) {
val keys = generatedRDDs.keys
@@ -328,7 +328,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Refreshes the list of checkpointed RDDs that will be saved along with checkpoint of
+ * Refresh the list of checkpointed RDDs that will be saved along with the checkpoint of
* this stream. This is an internal method that should not be called directly. This is
* a default implementation that saves only the file names of the checkpointed RDDs to
* checkpointData. Subclasses of DStream (especially those of InputDStream) may override
@@ -373,7 +373,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Restores the RDDs in generatedRDDs from the checkpointData. This is an internal method
+ * Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method
* that should not be called directly. This is a default implementation that recreates RDDs
* from the checkpoint file names stored in checkpointData. Subclasses of DStream that
* override the updateCheckpointData() method would also need to override this method.
@@ -425,20 +425,20 @@ abstract class DStream[T: ClassManifest] (
// DStream operations
// =======================================================================
- /** Returns a new DStream by applying a function to all elements of this DStream. */
+ /** Return a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassManifest](mapFunc: T => U): DStream[U] = {
new MappedDStream(this, ssc.sc.clean(mapFunc))
}
/**
- * Returns a new DStream by applying a function to all elements of this DStream,
+ * Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = {
new FlatMappedDStream(this, ssc.sc.clean(flatMapFunc))
}
- /** Returns a new DStream containing only the elements that satisfy a predicate. */
+ /** Return a new DStream containing only the elements that satisfy a predicate. */
def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc)
/**
@@ -461,20 +461,20 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Returns a new DStream in which each RDD has a single element generated by reducing each RDD
+ * Return a new DStream in which each RDD has a single element generated by reducing each RDD
* of this DStream.
*/
def reduce(reduceFunc: (T, T) => T): DStream[T] =
this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
/**
- * Returns a new DStream in which each RDD has a single element generated by counting each RDD
+ * Return a new DStream in which each RDD has a single element generated by counting each RDD
* of this DStream.
*/
def count(): DStream[Long] = this.map(_ => 1L).reduce(_ + _)
/**
- * Applies a function to each RDD in this DStream. This is an output operator, so
+ * Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: RDD[T] => Unit) {
@@ -482,7 +482,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Applies a function to each RDD in this DStream. This is an output operator, so
+ * Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: (RDD[T], Time) => Unit) {
@@ -492,7 +492,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Returns a new DStream in which each RDD is generated by applying a function
+ * Return a new DStream in which each RDD is generated by applying a function
* on each RDD of this DStream.
*/
def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
@@ -500,7 +500,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Returns a new DStream in which each RDD is generated by applying a function
+ * Return a new DStream in which each RDD is generated by applying a function
* on each RDD of this DStream.
*/
def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
@@ -508,7 +508,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Prints the first ten elements of each RDD generated in this DStream. This is an output
+ * Print the first ten elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and therefore materialized.
*/
def print() {
@@ -545,7 +545,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Returns a new DStream which computed based on tumbling window on this DStream.
+ * Return a new DStream which is computed based on a tumbling window of this DStream.
* This is equivalent to window(batchTime, batchTime).
* @param batchDuration tumbling window duration; must be a multiple of this DStream's
* batching interval
@@ -553,7 +553,7 @@ abstract class DStream[T: ClassManifest] (
def tumble(batchDuration: Duration): DStream[T] = window(batchDuration, batchDuration)
/**
- * Returns a new DStream in which each RDD has a single element generated by reducing all
+ * Return a new DStream in which each RDD has a single element generated by reducing all
* elements in a window over this DStream. windowDuration and slideDuration are as defined
* in the window() operation. This is equivalent to
* window(windowDuration, slideDuration).reduce(reduceFunc)
@@ -578,7 +578,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Returns a new DStream in which each RDD has a single element generated by counting the number
+ * Return a new DStream in which each RDD has a single element generated by counting the number
* of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
* window() operation. This is equivalent to window(windowDuration, slideDuration).count()
*/
@@ -587,20 +587,20 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Returns a new DStream by unifying data of another DStream with this DStream.
+ * Return a new DStream by unifying data of another DStream with this DStream.
* @param that Another DStream having the same slideDuration as this DStream.
*/
def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))
/**
- * Returns all the RDDs defined by the Interval object (both end times included)
+ * Return all the RDDs defined by the Interval object (both end times included)
*/
protected[streaming] def slice(interval: Interval): Seq[RDD[T]] = {
slice(interval.beginTime, interval.endTime)
}
/**
- * Returns all the RDDs between 'fromTime' to 'toTime' (both included)
+ * Return all the RDDs between 'fromTime' to 'toTime' (both included)
*/
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
val rdds = new ArrayBuffer[RDD[T]]()
@@ -616,7 +616,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Saves each RDD in this DStream as a Sequence file of serialized objects.
+ * Save each RDD in this DStream as a Sequence file of serialized objects.
* The file name at each batch interval is generated based on `prefix` and
* `suffix`: "prefix-TIME_IN_MS.suffix".
*/
@@ -629,7 +629,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
- * Saves each RDD in this DStream as at text file, using string representation
+ * Save each RDD in this DStream as a text file, using string representation
* of elements. The file name at each batch interval is generated based on
* `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
diff --git a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
index 3dbef69868..fbcf061126 100644
--- a/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/spark/streaming/PairDStreamFunctions.scala
@@ -26,29 +26,23 @@ extends Serializable {
}
/**
- * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
- * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
- * single sequence to generate the RDDs of the new DStream. Hash partitioning is
- * used to generate the RDDs with Spark's default number of partitions.
+ * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
*/
def groupByKey(): DStream[(K, Seq[V])] = {
groupByKey(defaultPartitioner())
}
/**
- * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
- * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
- * single sequence to generate the RDDs of the new DStream. Hash partitioning is
- * used to generate the RDDs with `numPartitions` partitions.
+ * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+ * generate the RDDs with `numPartitions` partitions.
*/
def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = {
groupByKey(defaultPartitioner(numPartitions))
}
/**
- * Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
- * Therefore, the values for each key in `this` DStream's RDDs are grouped into a
- * single sequence to generate the RDDs of the new DStream. [[spark.Partitioner]]
+ * Create a new DStream by applying `groupByKey` on each RDD. The supplied [[spark.Partitioner]]
* is used to control the partitioning of each RDD.
*/
def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
@@ -60,30 +54,27 @@ extends Serializable {
}
/**
- * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
- * Therefore, the values for each key in `this` DStream's RDDs is merged using the
- * associative reduce function to generate the RDDs of the new DStream.
- * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
+ * with Spark's default number of partitions.
*/
def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = {
reduceByKey(reduceFunc, defaultPartitioner())
}
/**
- * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
- * Therefore, the values for each key in `this` DStream's RDDs is merged using the
- * associative reduce function to generate the RDDs of the new DStream.
- * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
+ * with `numPartitions` partitions.
*/
def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = {
reduceByKey(reduceFunc, defaultPartitioner(numPartitions))
}
/**
- * Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
- * Therefore, the values for each key in `this` DStream's RDDs is merged using the
- * associative reduce function to generate the RDDs of the new DStream.
- * [[spark.Partitioner]] is used to control the partitioning of each RDD.
+ * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * merged using the supplied reduce function. [[spark.Partitioner]] is used to control the
+ * partitioning of each RDD.
*/
def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
@@ -91,9 +82,9 @@ extends Serializable {
}
/**
- * Generic function to combine elements of each key in DStream's RDDs using custom function.
- * This is similar to the combineByKey for RDDs. Please refer to combineByKey in
- * [[spark.PairRDDFunctions]] for more information.
+ * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the
+ * combineByKey for RDDs. Please refer to combineByKey in [[spark.PairRDDFunctions]] for more
+ * information.
*/
def combineByKey[C: ClassManifest](
createCombiner: V => C,
@@ -104,19 +95,18 @@ extends Serializable {
}
/**
- * Creates a new DStream by counting the number of values of each key in each RDD
- * of `this` DStream. Hash partitioning is used to generate the RDDs with Spark's
- * `numPartitions` partitions.
+ * Create a new DStream by counting the number of values of each key in each RDD. Hash
+ * partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
def countByKey(numPartitions: Int = self.ssc.sc.defaultParallelism): DStream[(K, Long)] = {
self.map(x => (x._1, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
}
/**
- * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
- * The new DStream generates RDDs with the same interval as this DStream.
- * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * Create a new DStream by applying `groupByKey` over a sliding window. This is similar to
+ * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
+ * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with
+ * Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
*/
@@ -125,9 +115,9 @@ extends Serializable {
}
/**
- * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
- * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * Create a new DStream by applying `groupByKey` over a sliding window. Similar to
+ * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -139,8 +129,8 @@ extends Serializable {
}
/**
- * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
+ * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@@ -158,8 +148,8 @@ extends Serializable {
}
/**
- * Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.groupByKey()` but applies it over a sliding window.
+ * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
@@ -176,10 +166,10 @@ extends Serializable {
}
/**
- * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
- * The new DStream generates RDDs with the same interval as this DStream.
- * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * Create a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+ * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
+ * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
+ * the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@@ -192,9 +182,9 @@ extends Serializable {
}
/**
- * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
- * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+ * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@@ -211,9 +201,9 @@ extends Serializable {
}
/**
- * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
- * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+ * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
+ * generate the RDDs with `numPartitions` partitions.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@@ -232,8 +222,8 @@ extends Serializable {
}
/**
- * Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
- * This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
+ * Create a new DStream by applying `reduceByKey` over a sliding window. Similar to
+ * `DStream.reduceByKey()`, but applies it over a sliding window.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@@ -255,9 +245,8 @@ extends Serializable {
}
/**
- * Creates a new DStream by reducing over a window in a smarter way.
- * The reduced value of over a new window is calculated incrementally by using the
- * old window's reduce value :
+ * Create a new DStream by reducing over a window using incremental computation.
+ * The reduced value over a new window is calculated using the old window's reduced value:
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
* This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
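A concrete sketch of the incremental pattern just described, assuming a pair stream `wordCounts: DStream[(String, Long)]` built elsewhere (Seconds comes from spark.streaming):

    // The reduce function adds the counts entering the window; the inverse
    // reduce subtracts the counts leaving it, so each new window is derived
    // from the previous one instead of being recomputed from scratch.
    val windowedCounts = wordCounts.reduceByKeyAndWindow(
      (a: Long, b: Long) => a + b,   // reduce function
      (a: Long, b: Long) => a - b,   // "inverse reduce" function
      Seconds(60),                   // windowDuration
      Seconds(10))                   // slideDuration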
@@ -283,9 +272,8 @@ extends Serializable {
}
/**
- * Creates a new DStream by reducing over a window in a smarter way.
- * The reduced value of over a new window is calculated incrementally by using the
- * old window's reduce value :
+ * Create a new DStream by reducing over a window using incremental computation.
+ * The reduced value over a new window is calculated using the old window's reduced value:
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
* This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
@@ -313,9 +301,8 @@ extends Serializable {
}
/**
- * Creates a new DStream by reducing over a window in a smarter way.
- * The reduced value of over a new window is calculated incrementally by using the
- * old window's reduce value :
+ * Create a new DStream by reducing over a window using incremental computation.
+ * The reduced value over a new window is calculated using the old window's reduced value:
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
* This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
@@ -344,7 +331,7 @@ extends Serializable {
}
/**
- * Creates a new DStream by counting the number of values for each key over a window.
+ * Create a new DStream by counting the number of values for each key over a window.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@@ -369,10 +356,9 @@ extends Serializable {
}
/**
- * Creates a new "state" DStream where the state for each key is updated by applying
- * the given function on the previous state of the key and the new values of the key from
- * `this` DStream. Hash partitioning is used to generate the RDDs with Spark's default
- * number of partitions.
+ * Create a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of each key.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
* @tparam S State type
@@ -384,9 +370,9 @@ extends Serializable {
}
/**
- * Creates a new "state" DStream where the state for each key is updated by applying
- * the given function on the previous state of the key and the new values of the key from
- * `this` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * Create a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of each key.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
* @param numPartitions Number of partitions of each RDD in the new DStream.
@@ -400,9 +386,9 @@ extends Serializable {
}
/**
- * Creates a new "state" DStream where the state for each key is updated by applying
- * the given function on the previous state of the key and the new values of the key from
- * `this` DStream. [[spark.Partitioner]] is used to control the partitioning of each RDD.
+ * Create a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of the key.
+ * [[spark.Partitioner]] is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
@@ -419,9 +405,9 @@ extends Serializable {
}
/**
- * Creates a new "state" DStream where the state for each key is updated by applying
- * the given function on the previous state of the key and the new values of the key from
- * `this` DStream. [[spark.Partitioner]] is used to control the partitioning of each RDD.
+ * Create a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of each key.
+ * [[spark.Partitioner]] is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated. Note that
* this function may generate a tuple with a different key
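The update contract described in this hunk can be sketched as a running per-key sum, assuming an input stream `events: DStream[(String, Long)]` (returning None from the function would instead discard the key's state):

    val updateFunc = (newValues: Seq[Long], state: Option[Long]) =>
      Some(state.getOrElse(0L) + newValues.sum)  // fold the batch into the old state

    val runningTotals = events.updateStateByKey[Long](updateFunc)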
@@ -451,22 +437,19 @@ extends Serializable {
}
/**
- * Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will
- * be generated by cogrouping RDDs from`this`and `other` DStreams. Therefore, for
- * each key k in corresponding RDDs of `this` or `other` DStreams, the generated RDD
- * will contains a tuple with the list of values for that key in both RDDs.
- * HashPartitioner is used to partition each generated RDD into default number of partitions.
+ * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
+ * or `other` DStreams, the generated RDD will contain a tuple with the list of values for that
+ * key in both RDDs. HashPartitioner is used to partition each generated RDD into the default number
+ * of partitions.
*/
def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
cogroup(other, defaultPartitioner())
}
/**
- * Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will
- * be generated by cogrouping RDDs from`this`and `other` DStreams. Therefore, for
- * each key k in corresponding RDDs of `this` or `other` DStreams, the generated RDD
- * will contains a tuple with the list of values for that key in both RDDs.
- * Partitioner is used to partition each generated RDD.
+ * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
+ * or `other` DStreams, the generated RDD will contain a tuple with the list of values for that
+ * key in both RDDs. Partitioner is used to partition each generated RDD.
*/
def cogroup[W: ClassManifest](
other: DStream[(K, W)],
@@ -488,8 +471,7 @@ extends Serializable {
}
/**
- * Joins `this` DStream with `other` DStream. Each RDD of the new DStream will
- * be generated by joining RDDs from `this` and `other` DStreams. HashPartitioner is used
+ * Join `this` DStream with `other` DStream. HashPartitioner is used
* to partition each generated RDD into default number of partitions.
*/
def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
@@ -497,7 +479,7 @@ extends Serializable {
}
/**
- * Joins `this` DStream with `other` DStream, that is, each RDD of the new DStream will
+ * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will
* be generated by joining RDDs from `this` and other DStream. Uses the given
* Partitioner to partition each generated RDD.
*/
@@ -513,7 +495,7 @@ extends Serializable {
}
/**
- * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
* based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
*/
def saveAsHadoopFiles[F <: OutputFormat[K, V]](
@@ -524,7 +506,7 @@ extends Serializable {
}
/**
- * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
* based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
*/
def saveAsHadoopFiles(
@@ -543,8 +525,8 @@ extends Serializable {
}
/**
- * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
- * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
prefix: String,
@@ -554,8 +536,8 @@ extends Serializable {
}
/**
- * Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
- * based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsNewAPIHadoopFiles(
prefix: String,
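The cogroup and join semantics documented above, as a short sketch on two assumed pair streams with the same slide duration:

    // left: DStream[(String, Int)] and right: DStream[(String, String)] are assumed.
    val joined: DStream[(String, (Int, String))] = left.join(right)  // per-batch join on key
    // cogroup keeps all values per key from both sides, matched or not.
    val grouped: DStream[(String, (Seq[Int], Seq[String]))] = left.cogroup(right)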
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
new file mode 100644
index 0000000000..2e7466b16c
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -0,0 +1,91 @@
+package spark.streaming.api.java
+
+import spark.streaming.{Duration, Time, DStream}
+import spark.api.java.function.{Function => JFunction}
+import spark.api.java.JavaRDD
+import spark.storage.StorageLevel
+
+/**
+ * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
+ * sequence of RDDs (of the same type) representing a continuous stream of data (see [[spark.RDD]]
+ * for more details on RDDs). DStreams can either be created from live data (such as data from
+ * HDFS, Kafka or Flume) or they can be generated by transforming existing DStreams using operations
+ * such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
+ * DStream periodically generates an RDD, either from live data or by transforming the RDD generated
+ * by a parent DStream.
+ *
+ * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
+ * `window`. In addition, [[spark.streaming.api.java.JavaPairDStream]] contains operations available
+ * only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations
+ * are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)]) through
+ * implicit conversions when `spark.streaming.StreamingContext._` is imported.
+ *
+ * Internally, a DStream is characterized by a few basic properties:
+ * - A list of other DStreams that the DStream depends on
+ * - A time interval at which the DStream generates an RDD
+ * - A function that is used to generate an RDD after each time interval
+ */
+class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T])
+ extends JavaDStreamLike[T, JavaDStream[T]] {
+
+ /** Return a new DStream containing only the elements that satisfy a predicate. */
+ def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] =
+ dstream.filter((x => f(x).booleanValue()))
+
+ /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ def cache(): JavaDStream[T] = dstream.cache()
+
+ /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ def persist(): JavaDStream[T] = dstream.cache()
+
+ /** Persist the RDDs of this DStream with the given storage level */
+ def persist(storageLevel: StorageLevel): JavaDStream[T] = dstream.persist(storageLevel)
+
+ /** Generate an RDD for the given time */
+ def compute(validTime: Time): JavaRDD[T] = {
+ dstream.compute(validTime) match {
+ case Some(rdd) => new JavaRDD(rdd)
+ case None => null
+ }
+ }
+
+ /**
+ * Return a new DStream which is computed based on windowed batches of this DStream.
+ * The new DStream generates RDDs with the same interval as this DStream.
+ * @param windowDuration width of the window; must be a multiple of this DStream's interval.
+ */
+ def window(windowDuration: Duration): JavaDStream[T] =
+ dstream.window(windowDuration)
+
+ /**
+ * Return a new DStream which is computed based on windowed batches of this DStream.
+ * @param windowDuration duration (i.e., width) of the window;
+ * must be a multiple of this DStream's interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's interval
+ */
+ def window(windowDuration: Duration, slideDuration: Duration): JavaDStream[T] =
+ dstream.window(windowDuration, slideDuration)
+
+ /**
+ * Return a new DStream which is computed based on a tumbling window of this DStream.
+ * This is equivalent to window(batchDuration, batchDuration).
+ * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval
+ */
+ def tumble(batchDuration: Duration): JavaDStream[T] =
+ dstream.tumble(batchDuration)
+
+ /**
+ * Return a new DStream by unifying data of another DStream with this DStream.
+ * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream.
+ */
+ def union(that: JavaDStream[T]): JavaDStream[T] =
+ dstream.union(that.dstream)
+}
+
+object JavaDStream {
+ implicit def fromDStream[T: ClassManifest](dstream: DStream[T]): JavaDStream[T] =
+ new JavaDStream[T](dstream)
+}
\ No newline at end of file
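The companion object's implicit means Scala code can hand a plain DStream to Java-facing callers without explicit wrapping; a minimal sketch under that assumption:

    import spark.streaming.DStream
    import spark.streaming.api.java.JavaDStream
    import spark.streaming.api.java.JavaDStream._

    // fromDStream converts implicitly: DStream[String] => JavaDStream[String].
    def toJava(lines: DStream[String]): JavaDStream[String] = lines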
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
new file mode 100644
index 0000000000..b93cb7865a
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -0,0 +1,183 @@
+package spark.streaming.api.java
+
+import java.util.{List => JList}
+import java.lang.{Long => JLong}
+
+import scala.collection.JavaConversions._
+
+import spark.streaming._
+import spark.api.java.JavaRDD
+import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
+import java.util
+import spark.RDD
+import JavaDStream._
+
+trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable {
+ implicit val classManifest: ClassManifest[T]
+
+ def dstream: DStream[T]
+
+ implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[JLong] = {
+ in.map(new JLong(_))
+ }
+
+ /**
+ * Print the first ten elements of each RDD generated in this DStream. This is an output
+ * operator, so this DStream will be registered as an output stream and therefore materialized.
+ */
+ def print() = dstream.print()
+
+ /**
+ * Return a new DStream in which each RDD has a single element generated by counting each RDD
+ * of this DStream.
+ */
+ def count(): JavaDStream[JLong] = dstream.count()
+
+ /**
+ * Return a new DStream in which each RDD has a single element generated by counting the number
+ * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
+ * window() operation. This is equivalent to window(windowDuration, slideDuration).count()
+ */
+ def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JLong] = {
+ dstream.countByWindow(windowDuration, slideDuration)
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying glom() to each RDD of
+ * this DStream. Applying glom() to an RDD coalesces all elements within each partition into
+ * an array.
+ */
+ def glom(): JavaDStream[JList[T]] =
+ new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
+
+ /** Return the StreamingContext associated with this DStream */
+ def context(): StreamingContext = dstream.context()
+
+ /** Return a new DStream by applying a function to all elements of this DStream. */
+ def map[R](f: JFunction[T, R]): JavaDStream[R] = {
+ new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType())
+ }
+
+ /** Return a new DStream by applying a function to all elements of this DStream. */
+ def map[K, V](f: PairFunction[T, K, V]): JavaPairDStream[K, V] = {
+ def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+ new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
+ }
+
+ /**
+ * Return a new DStream by applying a function to all elements of this DStream,
+ * and then flattening the results
+ */
+ def flatMap[U](f: FlatMapFunction[T, U]): JavaDStream[U] = {
+ import scala.collection.JavaConverters._
+ def fn = (x: T) => f.apply(x).asScala
+ new JavaDStream(dstream.flatMap(fn)(f.elementType()))(f.elementType())
+ }
+
+ /**
+ * Return a new DStream by applying a function to all elements of this DStream,
+ * and then flattening the results
+ */
+ def flatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairDStream[K, V] = {
+ import scala.collection.JavaConverters._
+ def fn = (x: T) => f.apply(x).asScala
+ def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+ new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDD
+ * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
+ * of the RDD.
+ */
+ def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
+ def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+ new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType())
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDD
+ * of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
+ * of the RDD.
+ */
+ def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V])
+ : JavaPairDStream[K, V] = {
+ def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
+ new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType())
+ }
+
+ /**
+ * Return a new DStream in which each RDD has a single element generated by reducing each RDD
+ * of this DStream.
+ */
+ def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f)
+
+ /**
+ * Return a new DStream in which each RDD has a single element generated by reducing all
+ * elements in a window over this DStream. windowDuration and slideDuration are as defined in the
+ * window() operation. This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc)
+ */
+ def reduceByWindow(
+ reduceFunc: JFunction2[T, T, T],
+ invReduceFunc: JFunction2[T, T, T],
+ windowDuration: Duration,
+ slideDuration: Duration
+ ): JavaDStream[T] = {
+ dstream.reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)
+ }
+
+ /**
+ * Return all the RDDs from 'fromDuration' to 'toDuration' (both included)
+ */
+ def slice(fromDuration: Duration, toDuration: Duration): JList[JavaRDD[T]] = {
+ new util.ArrayList(dstream.slice(fromDuration, toDuration).map(new JavaRDD(_)).toSeq)
+ }
+
+ /**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * this DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) {
+ dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd)))
+ }
+
+ /**
+ * Apply a function to each RDD in this DStream. This is an output operator, so
+ * this DStream will be registered as an output stream and therefore materialized.
+ */
+ def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) {
+ dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
+ def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = {
+ implicit val cm: ClassManifest[U] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ def scalaTransform (in: RDD[T]): RDD[U] =
+ transformFunc.call(new JavaRDD[T](in)).rdd
+ dstream.transform(scalaTransform(_))
+ }
+
+ /**
+ * Return a new DStream in which each RDD is generated by applying a function
+ * on each RDD of this DStream.
+ */
+ def transform[U](transformFunc: JFunction2[JavaRDD[T], Time, JavaRDD[U]]): JavaDStream[U] = {
+ implicit val cm: ClassManifest[U] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ def scalaTransform (in: RDD[T], time: Time): RDD[U] =
+ transformFunc.call(new JavaRDD[T](in), time).rdd
+ dstream.transform(scalaTransform(_, _))
+ }
+
+ /**
+ * Enable periodic checkpointing of RDDs of this DStream
+ * @param interval Time interval after which the generated RDDs will be checkpointed
+ */
+ def checkpoint(interval: Duration) = {
+ dstream.checkpoint(interval)
+ }
+}
\ No newline at end of file
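Among the operations above, transform() is the escape hatch for arbitrary RDD-to-RDD logic. A hedged sketch on the underlying Scala stream, assuming `pairs: DStream[(String, Int)]`:

    // Each batch's RDD is deduplicated; any RDD-to-RDD function fits here.
    val deduped = pairs.transform(rdd => rdd.distinct())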
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
new file mode 100644
index 0000000000..ef10c091ca
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -0,0 +1,638 @@
+package spark.streaming.api.java
+
+import java.util.{List => JList}
+import java.lang.{Long => JLong}
+
+import scala.collection.JavaConversions._
+
+import spark.streaming._
+import spark.streaming.StreamingContext._
+import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import spark.Partitioner
+import org.apache.hadoop.mapred.{JobConf, OutputFormat}
+import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
+import org.apache.hadoop.conf.Configuration
+import spark.api.java.JavaPairRDD
+import spark.storage.StorageLevel
+import com.google.common.base.Optional
+
+class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
+ implicit val kManifest: ClassManifest[K],
+ implicit val vManifest: ClassManifest[V])
+ extends JavaDStreamLike[(K, V), JavaPairDStream[K, V]] {
+
+ // =======================================================================
+ // Methods common to all DStreams
+ // =======================================================================
+
+ /** Return a new DStream containing only the elements that satisfy a predicate. */
+ def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] =
+ dstream.filter((x => f(x).booleanValue()))
+
+ /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ def cache(): JavaPairDStream[K, V] = dstream.cache()
+
+ /** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
+ def persist(): JavaPairDStream[K, V] = dstream.cache()
+
+ /** Persist the RDDs of this DStream with the given storage level */
+ def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel)
+
+ /** Generate an RDD for the given time */
+ def compute(validTime: Time): JavaPairRDD[K, V] = {
+ dstream.compute(validTime) match {
+ case Some(rdd) => new JavaPairRDD(rdd)
+ case None => null
+ }
+ }
+
+ /**
+ * Return a new DStream which is computed based on windowed batches of this DStream.
+ * The new DStream generates RDDs with the same interval as this DStream.
+ * @param windowDuration width of the window; must be a multiple of this DStream's interval.
+ */
+ def window(windowDuration: Duration): JavaPairDStream[K, V] =
+ dstream.window(windowDuration)
+
+ /**
+ * Return a new DStream which is computed based on windowed batches of this DStream.
+ * @param windowDuration duration (i.e., width) of the window;
+ * must be a multiple of this DStream's interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's interval
+ */
+ def window(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, V] =
+ dstream.window(windowDuration, slideDuration)
+
+ /**
+ * Return a new DStream which is computed based on a tumbling window of this DStream.
+ * This is equivalent to window(batchDuration, batchDuration).
+ * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval
+ */
+ def tumble(batchDuration: Duration): JavaPairDStream[K, V] =
+ dstream.tumble(batchDuration)
+
+ /**
+ * Return a new DStream by unifying data of another DStream with this DStream.
+ * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream.
+ */
+ def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] =
+ dstream.union(that.dstream)
+
+ // =======================================================================
+ // Methods only for PairDStreams
+ // =======================================================================
+
+ /**
+ * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
+ */
+ def groupByKey(): JavaPairDStream[K, JList[V]] =
+ dstream.groupByKey().mapValues(seqAsJavaList _)
+
+ /**
+ * Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
+ * generate the RDDs with `numPartitions` partitions.
+ */
+ def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] =
+ dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _)
+
+ /**
+ * Create a new DStream by applying `groupByKey` on each RDD. The supplied
+ * [[spark.Partitioner]] is used to control the partitioning of each RDD.
+ */
+ def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] =
+ dstream.groupByKey(partitioner).mapValues(seqAsJavaList _)
+
+ /**
+ * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * merged using the associative reduce function. Hash partitioning is used to generate the RDDs
+ * with Spark's default number of partitions.
+ */
+ def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] =
+ dstream.reduceByKey(func)
+
+ /**
+ * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
+ * with `numPartitions` partitions.
+ */
+ def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairDStream[K, V] =
+ dstream.reduceByKey(func, numPartitions)
+
+ /**
+ * Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
+ * merged using the supplied reduce function. [[spark.Partitioner]] is used to control the
+ * partitioning of each RDD.
+ */
+ def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = {
+ dstream.reduceByKey(func, partitioner)
+ }
+
+ /**
+ * Combine elements of each key in DStream's RDDs using custom functions. This is similar to the
+ * combineByKey for RDDs. Please refer to combineByKey in [[spark.PairRDDFunctions]] for more
+ * information.
+ */
+ def combineByKey[C](createCombiner: JFunction[V, C],
+ mergeValue: JFunction2[C, V, C],
+ mergeCombiners: JFunction2[C, C, C],
+ partitioner: Partitioner
+ ): JavaPairDStream[K, C] = {
+ implicit val cm: ClassManifest[C] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]]
+ dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
+ }
+
+ /**
+ * Create a new DStream by counting the number of values of each key in each RDD. Hash
+ * partitioning is used to generate the RDDs with `numPartitions` partitions.
+ */
+ def countByKey(numPartitions: Int): JavaPairDStream[K, JLong] = {
+ JavaPairDStream.scalaToJavaLong(dstream.countByKey(numPartitions))
+ }
+
+ /**
+ * Create a new DStream by counting the number of values of each key in each RDD. Hash
+ * partitioning is used to generate the RDDs with the default number of partitions.
+ */
+ def countByKey(): JavaPairDStream[K, JLong] = {
+ JavaPairDStream.scalaToJavaLong(dstream.countByKey())
+ }
+
+ /**
+ * Create a new DStream by applying `groupByKey` over a sliding window. This is similar to
+ * `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
+ * with the same interval as this DStream. Hash partitioning is used to generate the RDDs with
+ * Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ */
+ def groupByKeyAndWindow(windowDuration: Duration): JavaPairDStream[K, JList[V]] = {
+ dstream.groupByKeyAndWindow(windowDuration).mapValues(seqAsJavaList _)
+ }
+
+ /**
+ * Create a new DStream by applying `groupByKey` over a sliding window. Similar to
+ * `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
+ : JavaPairDStream[K, JList[V]] = {
+ dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(seqAsJavaList _)
+ }
+
+ /**
+ * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ */
+ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
+ :JavaPairDStream[K, JList[V]] = {
+ dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions)
+ .mapValues(seqAsJavaList _)
+ }
+
+ /**
+ * Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
+ * Similar to `DStream.groupByKey()`, but applies it over a sliding window.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ */
+ def groupByKeyAndWindow(
+ windowDuration: Duration,
+ slideDuration: Duration,
+ partitioner: Partitioner
+ ):JavaPairDStream[K, JList[V]] = {
+ dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner)
+ .mapValues(seqAsJavaList _)
+ }
+
+ /**
+ * Create a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
+ * Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
+ * generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
+ * the RDDs with Spark's default number of partitions.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ */
+ def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration)
+ :JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(reduceFunc, windowDuration)
+ }
+
+ /**
+ * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+ * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
+ * generate the RDDs with Spark's default number of partitions.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: Function2[V, V, V],
+ windowDuration: Duration,
+ slideDuration: Duration
+ ):JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)
+ }
+
+ /**
+ * Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to
+ * `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
+ * generate the RDDs with `numPartitions` partitions.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: Function2[V, V, V],
+ windowDuration: Duration,
+ slideDuration: Duration,
+ numPartitions: Int
+ ): JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, numPartitions)
+ }
+
+ /**
+ * Create a new DStream by applying `reduceByKey` over a sliding window. Similar to
+ * `DStream.reduceByKey()`, but applies it over a sliding window.
+ * @param reduceFunc associative reduce function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: Function2[V, V, V],
+ windowDuration: Duration,
+ slideDuration: Duration,
+ partitioner: Partitioner
+ ): JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, partitioner)
+ }
+
+ /**
+ * Create a new DStream by reducing over a window using incremental computation.
+ * The reduced value over a new window is calculated using the old window's reduced value:
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
+ * However, it is applicable to only "invertible reduce functions".
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: Function2[V, V, V],
+ invReduceFunc: Function2[V, V, V],
+ windowDuration: Duration,
+ slideDuration: Duration
+ ): JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)
+ }
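+
+ // A sketch of the incremental form, again assuming a JavaPairDStream<String, Integer>
+ // named `pairs`: sums are added as batches enter the window and subtracted as they leave.
+ //
+ //   JavaPairDStream<String, Integer> windowedSums = pairs.reduceByKeyAndWindow(
+ //     new Function2<Integer, Integer, Integer>() {
+ //       public Integer call(Integer i1, Integer i2) { return i1 + i2; }
+ //     },
+ //     new Function2<Integer, Integer, Integer>() {
+ //       public Integer call(Integer i1, Integer i2) { return i1 - i2; }
+ //     }, new Duration(30000), new Duration(10000));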
+
+ /**
+ * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
+ * The reduced value for a new window is calculated using the old window's reduced value:
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient than reduceByKeyAndWindow without the "inverse reduce" function.
+ * However, it is applicable only to "invertible reduce functions".
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: Function2[V, V, V],
+ invReduceFunc: Function2[V, V, V],
+ windowDuration: Duration,
+ slideDuration: Duration,
+ numPartitions: Int
+ ): JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(
+ reduceFunc,
+ invReduceFunc,
+ windowDuration,
+ slideDuration,
+ numPartitions)
+ }
+
+ /**
+ * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
+ * The reduced value for a new window is calculated using the old window's reduced value:
+ * 1. reduce the new values that entered the window (e.g., adding new counts)
+ * 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
+ * This is more efficient than reduceByKeyAndWindow without the "inverse reduce" function.
+ * However, it is applicable only to "invertible reduce functions".
+ * @param reduceFunc associative reduce function
+ * @param invReduceFunc inverse function
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ */
+ def reduceByKeyAndWindow(
+ reduceFunc: Function2[V, V, V],
+ invReduceFunc: Function2[V, V, V],
+ windowDuration: Duration,
+ slideDuration: Duration,
+ partitioner: Partitioner
+ ): JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(
+ reduceFunc,
+ invReduceFunc,
+ windowDuration,
+ slideDuration,
+ partitioner)
+ }
+
+ /**
+ * Create a new DStream by counting the number of values for each key over a window.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ */
+ def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
+ : JavaPairDStream[K, JLong] = {
+ JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowDuration, slideDuration))
+ }
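+
+ // For example (the stream name is illustrative), counting occurrences of each key over
+ // the last 30 seconds of data, every 10 seconds:
+ //
+ //   JavaPairDStream<String, Long> windowedCounts =
+ //     pairs.countByKeyAndWindow(new Duration(30000), new Duration(10000));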
+
+ /**
+ * Create a new DStream by counting the number of values for each key over a window.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param windowDuration width of the window; must be a multiple of this DStream's
+ * batching interval
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
+ * the new DStream will generate RDDs); must be a multiple of this
+ * DStream's batching interval
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ */
+ def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
+ : JavaPairDStream[K, JLong] = {
+ JavaPairDStream.scalaToJavaLong(
+ dstream.countByKeyAndWindow(windowDuration, slideDuration, numPartitions))
+ }
+
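+ /**
+ * Convert a Java state update function, which uses Guava's Optional to represent absent
+ * state, into the (Seq[V], Option[S]) => Option[S] form expected by the Scala API.
+ */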
+ private def convertUpdateStateFunction[S](in: JFunction2[JList[V], Optional[S], Optional[S]]):
+ (Seq[V], Option[S]) => Option[S] = {
+ val scalaFunc: (Seq[V], Option[S]) => Option[S] = (values, state) => {
+ val list: JList[V] = values
+ val scalaState: Optional[S] = state match {
+ case Some(s) => Optional.of(s)
+ case _ => Optional.absent()
+ }
+ val result: Optional[S] = in.apply(list, scalaState)
+ result.isPresent match {
+ case true => Some(result.get())
+ case _ => None
+ }
+ }
+ scalaFunc
+ }
+
+ /**
+ * Create a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of each key.
+ * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
+ * @param updateFunc State update function. If this function returns None, then the
+ * corresponding state key-value pair will be eliminated.
+ * @tparam S State type
+ */
+ def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]])
+ : JavaPairDStream[K, S] = {
+ implicit val cm: ClassManifest[S] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]]
+ dstream.updateStateByKey(convertUpdateStateFunction(updateFunc))
+ }
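+
+ // A usage sketch keeping a running count per key, mirroring the update function used in
+ // JavaAPISuite (the stream name `pairs` is illustrative):
+ //
+ //   JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(
+ //     new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
+ //       public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
+ //         int sum = state.isPresent() ? state.get() : 0;
+ //         for (Integer v : values) { sum += v; }
+ //         return Optional.of(sum);
+ //       }
+ //     });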
+
+ /**
+ * Create a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of each key.
+ * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
+ * @param updateFunc State update function. If this function returns None, then the
+ * corresponding state key-value pair will be eliminated.
+ * @param numPartitions Number of partitions of each RDD in the new DStream.
+ * @tparam S State type
+ */
+ def updateStateByKey[S: ClassManifest](
+ updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
+ numPartitions: Int)
+ : JavaPairDStream[K, S] = {
+ dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions)
+ }
+
+ /**
+ * Create a new "state" DStream where the state for each key is updated by applying
+ * the given function on the previous state of the key and the new values of the key.
+ * [[spark.Partitioner]] is used to control the partitioning of each RDD.
+ * @param updateFunc State update function. If this function returns None, then the
+ * corresponding state key-value pair will be eliminated.
+ * @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
+ * @tparam S State type
+ */
+ def updateStateByKey[S: ClassManifest](
+ updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
+ partitioner: Partitioner
+ ): JavaPairDStream[K, S] = {
+ dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner)
+ }
+
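+ /**
+ * Create a new DStream by applying a map function to the value of each key-value pair in
+ * `this` DStream without changing the key.
+ */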
+ def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
+ implicit val cm: ClassManifest[U] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ dstream.mapValues(f)
+ }
+
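+ /**
+ * Create a new DStream by applying a flatmap function to the value of each key-value pair
+ * in `this` DStream without changing the key.
+ */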
+ def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = {
+ import scala.collection.JavaConverters._
+ def fn = (x: V) => f.apply(x).asScala
+ implicit val cm: ClassManifest[U] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
+ dstream.flatMapValues(fn)
+ }
+
+ /**
+ * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
+ * or `other` DStreams, the generated RDD will contain a tuple with the list of values for that
+ * key in both RDDs. HashPartitioner is used to partition each generated RDD into the default
+ * number of partitions.
+ */
+ def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList(t._2)))
+ }
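+
+ // For example (stream names are illustrative), cogrouping two String-keyed streams yields,
+ // per key, the lists of values seen in each stream during that batch:
+ //
+ //   JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped =
+ //     pairStream1.cogroup(pairStream2);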
+
+ /**
+ * Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
+ * or `other` DStreams, the generated RDD will contain a tuple with the list of values for that
+ * key in both RDDs. The supplied Partitioner is used to partition each generated RDD.
+ */
+ def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
+ : JavaPairDStream[K, (JList[V], JList[W])] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ dstream.cogroup(other.dstream, partitioner)
+ .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList(t._2)))
+ }
+
+ /**
+ * Join `this` DStream with `other` DStream. HashPartitioner is used
+ * to partition each generated RDD into default number of partitions.
+ */
+ def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ dstream.join(other.dstream)
+ }
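+
+ // For example (stream names are illustrative), joining two String-keyed streams:
+ //
+ //   JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.join(pairStream2);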
+
+ /**
+ * Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will
+ * be generated by joining RDDs from `this` and `other` DStreams. Uses the given
+ * Partitioner to partition each generated RDD.
+ */
+ def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
+ : JavaPairDStream[K, (V, W)] = {
+ implicit val cm: ClassManifest[W] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
+ dstream.join(other.dstream, partitioner)
+ }
+
+ /**
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
+ def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String) {
+ dstream.saveAsHadoopFiles(prefix, suffix)
+ }
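+
+ // For example, with prefix "output/counts", suffix "txt", and a 1000 ms batch interval,
+ // successive batches are written as output/counts-1357040400000.txt,
+ // output/counts-1357040401000.txt, and so on (the timestamps here are illustrative).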
+
+ /**
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
+ def saveAsHadoopFiles(
+ prefix: String,
+ suffix: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: OutputFormat[_, _]]) {
+ dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
+ }
+
+ /**
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
+ def saveAsHadoopFiles(
+ prefix: String,
+ suffix: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: OutputFormat[_, _]],
+ conf: JobConf) {
+ dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
+ }
+
+ /**
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
+ def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](prefix: String, suffix: String) {
+ dstream.saveAsNewAPIHadoopFiles(prefix, suffix)
+ }
+
+ /**
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
+ def saveAsNewAPIHadoopFiles(
+ prefix: String,
+ suffix: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) {
+ dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
+ }
+
+ /**
+ * Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
+ * generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
+ */
+ def saveAsNewAPIHadoopFiles(
+ prefix: String,
+ suffix: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+ conf: Configuration = new Configuration) {
+ dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
+ }
+
+ override val classManifest: ClassManifest[(K, V)] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
+}
+
+object JavaPairDStream {
+ implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)])
+ : JavaPairDStream[K, V] =
+ new JavaPairDStream[K, V](dstream)
+
+ def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = {
+ implicit val cmk: ClassManifest[K] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+ implicit val cmv: ClassManifest[V] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ new JavaPairDStream[K, V](dstream.dstream)
+ }
+
+ def scalaToJavaLong[K: ClassManifest](dstream: JavaPairDStream[K, Long])
+ : JavaPairDStream[K, JLong] = {
+ StreamingContext.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_))
+ }
+}
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
new file mode 100644
index 0000000000..accac82e09
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -0,0 +1,288 @@
+package spark.streaming.api.java
+
+import scala.collection.JavaConversions._
+import java.lang.{Long => JLong, Integer => JInt}
+
+import spark.streaming._
+import dstream._
+import spark.storage.StorageLevel
+import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import java.io.InputStream
+import java.util.{Map => JMap}
+
+/**
+ * A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
+ * information (such as cluster URL and job name) needed to internally create a SparkContext, it
+ * provides methods used to create DStreams from various input sources.
+ */
+class JavaStreamingContext(val ssc: StreamingContext) {
+
+ // TODOs:
+ // - Test to/from Hadoop functions
+ // - Support creating and registering InputStreams
+
+
+ /**
+ * Creates a StreamingContext.
+ * @param master Name of the Spark Master
+ * @param frameworkName Name to be used when registering with the scheduler
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ */
+ def this(master: String, frameworkName: String, batchDuration: Duration) =
+ this(new StreamingContext(master, frameworkName, batchDuration))
+
+ /**
+ * Re-creates a StreamingContext from a checkpoint file.
+ * @param path Path either to the directory that was specified as the checkpoint directory, or
+ * to the checkpoint file 'graph' or 'graph.bk'.
+ */
+ def this(path: String) = this(new StreamingContext(path))
+
+ /**
+ * Create an input stream that pulls messages from a Kafka broker.
+ * @param hostname ZooKeeper hostname.
+ * @param port ZooKeeper port.
+ * @param groupId The group id for this consumer.
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ */
+ def kafkaStream[T](
+ hostname: String,
+ port: Int,
+ groupId: String,
+ topics: JMap[String, JInt])
+ : JavaDStream[T] = {
+ implicit val cmt: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ ssc.kafkaStream[T](hostname, port, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
+ }
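+
+ // A usage sketch (the ZooKeeper address, group id, and topic map are illustrative):
+ // consume the topic "events" with two threads, receiving each message as a String.
+ //
+ //   Map<String, Integer> topics = new HashMap<String, Integer>();
+ //   topics.put("events", 2);
+ //   JavaDStream<String> messages =
+ //     jssc.kafkaStream("zk.example.com", 2181, "my-consumer-group", topics);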
+
+ /**
+ * Create an input stream that pulls messages from a Kafka broker.
+ * @param hostname ZooKeeper hostname.
+ * @param port ZooKeeper port.
+ * @param groupId The group id for this consumer.
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param initialOffsets Optional initial offsets for each of the partitions to consume.
+ * By default the value is pulled from ZooKeeper.
+ */
+ def kafkaStream[T](
+ hostname: String,
+ port: Int,
+ groupId: String,
+ topics: JMap[String, JInt],
+ initialOffsets: JMap[KafkaPartitionKey, JLong])
+ : JavaDStream[T] = {
+ implicit val cmt: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ ssc.kafkaStream[T](
+ hostname,
+ port,
+ groupId,
+ Map(topics.mapValues(_.intValue()).toSeq: _*),
+ Map(initialOffsets.mapValues(_.longValue()).toSeq: _*))
+ }
+
+ /**
+ * Create an input stream that pulls messages from a Kafka broker.
+ * @param hostname ZooKeeper hostname.
+ * @param port ZooKeeper port.
+ * @param groupId The group id for this consumer.
+ * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
+ * in its own thread.
+ * @param initialOffsets Optional initial offsets for each of the partitions to consume.
+ * By default the value is pulled from ZooKeeper.
+ * @param storageLevel RDD storage level. Defaults to memory-only
+ */
+ def kafkaStream[T](
+ hostname: String,
+ port: Int,
+ groupId: String,
+ topics: JMap[String, JInt],
+ initialOffsets: JMap[KafkaPartitionKey, JLong],
+ storageLevel: StorageLevel)
+ : JavaDStream[T] = {
+ implicit val cmt: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ ssc.kafkaStream[T](
+ hostname,
+ port,
+ groupId,
+ Map(topics.mapValues(_.intValue()).toSeq: _*),
+ Map(initialOffsets.mapValues(_.longValue()).toSeq: _*),
+ storageLevel)
+ }
+
+ /**
+ * Create an input stream from a network source hostname:port. Data is received using
+ * a TCP socket and the received bytes are interpreted as UTF8-encoded, \n-delimited
+ * lines.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param storageLevel Storage level to use for storing the received objects
+ * (default: StorageLevel.MEMORY_AND_DISK_SER_2)
+ */
+ def networkTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
+ : JavaDStream[String] = {
+ ssc.networkTextStream(hostname, port, storageLevel)
+ }
+
+ /**
+ * Create an input stream from a network source hostname:port. Data is received using
+ * a TCP socket and the received bytes are interpreted as UTF8-encoded, \n-delimited
+ * lines.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ */
+ def networkTextStream(hostname: String, port: Int): JavaDStream[String] = {
+ ssc.networkTextStream(hostname, port)
+ }
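+
+ // For example (hostname and port are illustrative), receiving newline-delimited UTF8 text
+ // from a socket:
+ //
+ //   JavaDStream<String> lines = jssc.networkTextStream("localhost", 9999);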
+
+ /**
+ * Create an input stream from a network source hostname:port. Data is received using
+ * a TCP socket and the received bytes are interpreted as objects using the given
+ * converter.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param converter Function to convert the byte stream to objects
+ * @param storageLevel Storage level to use for storing the received objects
+ * @tparam T Type of the objects received (after converting bytes to objects)
+ */
+ def networkStream[T](
+ hostname: String,
+ port: Int,
+ converter: JFunction[InputStream, java.lang.Iterable[T]],
+ storageLevel: StorageLevel)
+ : JavaDStream[T] = {
+ def fn = (x: InputStream) => converter.apply(x).toIterator
+ implicit val cmt: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ ssc.networkStream(hostname, port, fn, storageLevel)
+ }
+
+ /**
+ * Create an input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them as text files (using key as LongWritable, value
+ * as Text and input format as TextInputFormat). File names starting with . are ignored.
+ * @param directory HDFS directory to monitor for new files
+ */
+ def textFileStream(directory: String): JavaDStream[String] = {
+ ssc.textFileStream(directory)
+ }
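+
+ // For example (the directory is illustrative), watching an HDFS directory for new
+ // text files:
+ //
+ //   JavaDStream<String> lines = jssc.textFileStream("hdfs:///user/logs/incoming");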
+
+ /**
+ * Create an input stream from a network source hostname:port, where data is received
+ * as serialized blocks (serialized using Spark's serializer) that can be directly
+ * pushed into the block manager without deserializing them. This is the most efficient
+ * way to receive data.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @param storageLevel Storage level to use for storing the received objects
+ * @tparam T Type of the objects in the received blocks
+ */
+ def rawNetworkStream[T](
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel): JavaDStream[T] = {
+ implicit val cmt: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port, storageLevel))
+ }
+
+ /**
+ * Create an input stream from a network source hostname:port, where data is received
+ * as serialized blocks (serialized using Spark's serializer) that can be directly
+ * pushed into the block manager without deserializing them. This is the most efficient
+ * way to receive data.
+ * @param hostname Hostname to connect to for receiving data
+ * @param port Port to connect to for receiving data
+ * @tparam T Type of the objects in the received blocks
+ */
+ def rawNetworkStream[T](hostname: String, port: Int): JavaDStream[T] = {
+ implicit val cmt: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port))
+ }
+
+ /**
+ * Create an input stream that monitors a Hadoop-compatible filesystem
+ * for new files and reads them using the given key-value types and input format.
+ * File names starting with . are ignored.
+ * @param directory HDFS directory to monitor for new files
+ * @tparam K Key type for reading HDFS file
+ * @tparam V Value type for reading HDFS file
+ * @tparam F Input format for reading HDFS file
+ */
+ def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): JavaPairDStream[K, V] = {
+ implicit val cmk: ClassManifest[K] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+ implicit val cmv: ClassManifest[V] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ implicit val cmf: ClassManifest[F] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[F]]
+ ssc.fileStream[K, V, F](directory)
+ }
+
+ /**
+ * Create an input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+ */
+ def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
+ JavaDStream[SparkFlumeEvent] = {
+ ssc.flumeStream(hostname, port, storageLevel)
+ }
+
+
+ /**
+ * Create an input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ */
+ def flumeStream(hostname: String, port: Int):
+ JavaDStream[SparkFlumeEvent] = {
+ ssc.flumeStream(hostname, port)
+ }
+
+ /**
+ * Register an output stream that will be computed every interval.
+ */
+ def registerOutputStream(outputStream: JavaDStreamLike[_, _]) {
+ ssc.registerOutputStream(outputStream.dstream)
+ }
+
+ /**
+ * Set the context to periodically checkpoint the DStream operations for master
+ * fault-tolerance. By default, the graph will be checkpointed every batch interval.
+ * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
+ * @param interval checkpoint interval
+ */
+ def checkpoint(directory: String, interval: Duration = null) {
+ ssc.checkpoint(directory, interval)
+ }
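+
+ // For example (the directory and interval are illustrative), checkpointing to HDFS every
+ // 10 seconds:
+ //
+ //   jssc.checkpoint("hdfs:///checkpoints", new Duration(10000));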
+
+ /**
+ * Set each DStream in this context to remember the RDDs it generated in the last given
+ * duration. DStreams remember RDDs only for a limited duration of time and release them
+ * for garbage collection. This method allows the developer to specify how long to remember
+ * the RDDs (if the developer wishes to query old data outside the DStream computation).
+ * @param duration Minimum duration that each DStream should remember its RDDs
+ */
+ def remember(duration: Duration) {
+ ssc.remember(duration)
+ }
+
+ /**
+ * Start the execution of the streams.
+ */
+ def start() = ssc.start()
+
+ /**
+ * Stop the execution of the streams.
+ */
+ def stop() = ssc.stop()
+
+}
diff --git a/streaming/src/test/scala/JavaTestUtils.scala b/streaming/src/test/scala/JavaTestUtils.scala
new file mode 100644
index 0000000000..56349837e5
--- /dev/null
+++ b/streaming/src/test/scala/JavaTestUtils.scala
@@ -0,0 +1,65 @@
+package spark.streaming
+
+import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import java.util.{List => JList}
+import spark.streaming.api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext}
+import spark.streaming._
+import java.util.ArrayList
+import collection.JavaConversions._
+
+/** Exposes streaming test functionality in a Java-friendly way. */
+trait JavaTestBase extends TestSuiteBase {
+
+ /**
+ * Create a [[spark.streaming.TestInputStream]] and attach it to the supplied context.
+ * The stream will be derived from the supplied lists of Java objects.
+ **/
+ def attachTestInputStream[T](
+ ssc: JavaStreamingContext,
+ data: JList[JList[T]],
+ numPartitions: Int) = {
+ val seqData = data.map(Seq(_:_*))
+
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions)
+ ssc.ssc.registerInputStream(dstream)
+ new JavaDStream[T](dstream)
+ }
+
+ /**
+ * Attach a provided stream to its associated StreamingContext as a
+ * [[spark.streaming.TestOutputStream]].
+ **/
+ def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]](
+ dstream: JavaDStreamLike[T, This]) = {
+ implicit val cm: ClassManifest[T] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ val ostream = new TestOutputStream(dstream.dstream,
+ new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]])
+ dstream.dstream.ssc.registerOutputStream(ostream)
+ }
+
+ /**
+ * Process all registered streams for numBatches batches, failing if
+ * numExpectedOutput RDDs are not generated. Generated RDDs are collected
+ * and returned, represented as a list for each batch interval.
+ */
+ def runStreams[V](
+ ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
+ implicit val cm: ClassManifest[V] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
+ val out = new ArrayList[JList[V]]()
+ res.foreach(entry => out.add(new ArrayList[V](entry)))
+ out
+ }
+}
+
+object JavaTestUtils extends JavaTestBase {
+
+}
+
+object JavaCheckpointTestUtils extends JavaTestBase {
+ override def actuallyWait = true
+} \ No newline at end of file
diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
new file mode 100644
index 0000000000..374793b57e
--- /dev/null
+++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
@@ -0,0 +1,1003 @@
+package spark.streaming;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.io.Files;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import scala.Tuple2;
+import spark.HashPartitioner;
+import spark.api.java.JavaRDD;
+import spark.api.java.function.*;
+import spark.storage.StorageLevel;
+import spark.streaming.api.java.JavaDStream;
+import spark.streaming.api.java.JavaPairDStream;
+import spark.streaming.api.java.JavaStreamingContext;
+import spark.streaming.JavaTestUtils;
+import spark.streaming.JavaCheckpointTestUtils;
+import spark.streaming.dstream.KafkaPartitionKey;
+
+import java.io.*;
+import java.util.*;
+
+// The test suite itself is Serializable so that anonymous Function implementations can be
+// serialized, as an alternative to converting these anonymous classes to static inner classes;
+// see http://stackoverflow.com/questions/758570/.
+public class JavaAPISuite implements Serializable {
+ private transient JavaStreamingContext sc;
+
+ @Before
+ public void setUp() {
+ sc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ }
+
+ @After
+ public void tearDown() {
+ sc.stop();
+ sc = null;
+ // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
+ System.clearProperty("spark.master.port");
+ }
+
+ @Test
+ public void testCount() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3,4),
+ Arrays.asList(3,4,5),
+ Arrays.asList(3));
+
+ List<List<Long>> expected = Arrays.asList(
+ Arrays.asList(4L),
+ Arrays.asList(3L),
+ Arrays.asList(1L));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream count = stream.count();
+ JavaTestUtils.attachTestOutputStream(count);
+ List<List<Long>> result = JavaTestUtils.runStreams(sc, 3, 3);
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testMap() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("hello", "world"),
+ Arrays.asList("goodnight", "moon"));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(5,5),
+ Arrays.asList(9,4));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream letterCount = stream.map(new Function<String, Integer>() {
+ @Override
+ public Integer call(String s) throws Exception {
+ return s.length();
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(letterCount);
+ List<List<Integer>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testWindow() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6),
+ Arrays.asList(7,8,9));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6,1,2,3),
+ Arrays.asList(7,8,9,4,5,6),
+ Arrays.asList(7,8,9));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream windowed = stream.window(new Duration(2000));
+ JavaTestUtils.attachTestOutputStream(windowed);
+ List<List<Integer>> result = JavaTestUtils.runStreams(sc, 4, 4);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testWindowWithSlideDuration() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6),
+ Arrays.asList(7,8,9),
+ Arrays.asList(10,11,12),
+ Arrays.asList(13,14,15),
+ Arrays.asList(16,17,18));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(1,2,3,4,5,6),
+ Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12),
+ Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18),
+ Arrays.asList(13,14,15,16,17,18));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream windowed = stream.window(new Duration(4000), new Duration(2000));
+ JavaTestUtils.attachTestOutputStream(windowed);
+ List<List<Integer>> result = JavaTestUtils.runStreams(sc, 8, 4);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testTumble() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6),
+ Arrays.asList(7,8,9),
+ Arrays.asList(10,11,12),
+ Arrays.asList(13,14,15),
+ Arrays.asList(16,17,18));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(1,2,3,4,5,6),
+ Arrays.asList(7,8,9,10,11,12),
+ Arrays.asList(13,14,15,16,17,18));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream windowed = stream.tumble(new Duration(2000));
+ JavaTestUtils.attachTestOutputStream(windowed);
+ List<List<Integer>> result = JavaTestUtils.runStreams(sc, 6, 3);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testFilter() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants", "dodgers"),
+ Arrays.asList("yankees", "red socks"));
+
+ List<List<String>> expected = Arrays.asList(
+ Arrays.asList("giants"),
+ Arrays.asList("yankees"));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream filtered = stream.filter(new Function<String, Boolean>() {
+ @Override
+ public Boolean call(String s) throws Exception {
+ return s.contains("a");
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(filtered);
+ List<List<String>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testGlom() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants", "dodgers"),
+ Arrays.asList("yankees", "red socks"));
+
+ List<List<List<String>>> expected = Arrays.asList(
+ Arrays.asList(Arrays.asList("giants", "dodgers")),
+ Arrays.asList(Arrays.asList("yankees", "red socks")));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream glommed = stream.glom();
+ JavaTestUtils.attachTestOutputStream(glommed);
+ List<List<List<String>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testMapPartitions() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants", "dodgers"),
+ Arrays.asList("yankees", "red socks"));
+
+ List<List<String>> expected = Arrays.asList(
+ Arrays.asList("GIANTSDODGERS"),
+ Arrays.asList("YANKEESRED SOCKS"));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
+ @Override
+ public Iterable<String> call(Iterator<String> in) {
+ String out = "";
+ while (in.hasNext()) {
+ out = out + in.next().toUpperCase();
+ }
+ return Lists.newArrayList(out);
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(mapped);
+ List<List<String>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ private class IntegerSum extends Function2<Integer, Integer, Integer> {
+ @Override
+ public Integer call(Integer i1, Integer i2) throws Exception {
+ return i1 + i2;
+ }
+ }
+
+ private class IntegerDifference extends Function2<Integer, Integer, Integer> {
+ @Override
+ public Integer call(Integer i1, Integer i2) throws Exception {
+ return i1 - i2;
+ }
+ }
+
+ @Test
+ public void testReduce() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6),
+ Arrays.asList(7,8,9));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(6),
+ Arrays.asList(15),
+ Arrays.asList(24));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream reduced = stream.reduce(new IntegerSum());
+ JavaTestUtils.attachTestOutputStream(reduced);
+ List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReduceByWindow() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6),
+ Arrays.asList(7,8,9));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(6),
+ Arrays.asList(21),
+ Arrays.asList(39),
+ Arrays.asList(24));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(),
+ new IntegerDifference(), new Duration(2000), new Duration(1000));
+ JavaTestUtils.attachTestOutputStream(reducedWindowed);
+ List<List<Integer>> result = JavaTestUtils.runStreams(sc, 4, 4);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testTransform() {
+ List<List<Integer>> inputData = Arrays.asList(
+ Arrays.asList(1,2,3),
+ Arrays.asList(4,5,6),
+ Arrays.asList(7,8,9));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(3,4,5),
+ Arrays.asList(6,7,8),
+ Arrays.asList(9,10,11));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream transformed = stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
+ @Override
+ public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
+ return in.map(new Function<Integer, Integer>() {
+ @Override
+ public Integer call(Integer i) throws Exception {
+ return i + 2;
+ }
+ });
+ }});
+ JavaTestUtils.attachTestOutputStream(transformed);
+ List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testFlatMap() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("go", "giants"),
+ Arrays.asList("boo", "dodgers"),
+ Arrays.asList("athletics"));
+
+ List<List<String>> expected = Arrays.asList(
+ Arrays.asList("g","o","g","i","a","n","t","s"),
+ Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"),
+ Arrays.asList("a","t","h","l","e","t","i","c","s"));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
+ @Override
+ public Iterable<String> call(String x) {
+ return Lists.newArrayList(x.split("(?!^)"));
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<String>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ @Test
+ public void testPairFlatMap() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants"),
+ Arrays.asList("dodgers"),
+ Arrays.asList("athletics"));
+
+ List<List<Tuple2<Integer, String>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<Integer, String>(6, "g"),
+ new Tuple2<Integer, String>(6, "i"),
+ new Tuple2<Integer, String>(6, "a"),
+ new Tuple2<Integer, String>(6, "n"),
+ new Tuple2<Integer, String>(6, "t"),
+ new Tuple2<Integer, String>(6, "s")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(7, "d"),
+ new Tuple2<Integer, String>(7, "o"),
+ new Tuple2<Integer, String>(7, "d"),
+ new Tuple2<Integer, String>(7, "g"),
+ new Tuple2<Integer, String>(7, "e"),
+ new Tuple2<Integer, String>(7, "r"),
+ new Tuple2<Integer, String>(7, "s")),
+ Arrays.asList(
+ new Tuple2<Integer, String>(9, "a"),
+ new Tuple2<Integer, String>(9, "t"),
+ new Tuple2<Integer, String>(9, "h"),
+ new Tuple2<Integer, String>(9, "l"),
+ new Tuple2<Integer, String>(9, "e"),
+ new Tuple2<Integer, String>(9, "t"),
+ new Tuple2<Integer, String>(9, "i"),
+ new Tuple2<Integer, String>(9, "c"),
+ new Tuple2<Integer, String>(9, "s")));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() {
+ @Override
+ public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
+ List<Tuple2<Integer, String>> out = Lists.newArrayList();
+ for (String letter: in.split("(?!^)")) {
+ out.add(new Tuple2<Integer, String>(in.length(), letter));
+ }
+ return out;
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testUnion() {
+ List<List<Integer>> inputData1 = Arrays.asList(
+ Arrays.asList(1,1),
+ Arrays.asList(2,2),
+ Arrays.asList(3,3));
+
+ List<List<Integer>> inputData2 = Arrays.asList(
+ Arrays.asList(4,4),
+ Arrays.asList(5,5),
+ Arrays.asList(6,6));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(1,1,4,4),
+ Arrays.asList(2,2,5,5),
+ Arrays.asList(3,3,6,6));
+
+ JavaDStream stream1 = JavaTestUtils.attachTestInputStream(sc, inputData1, 2);
+ JavaDStream stream2 = JavaTestUtils.attachTestInputStream(sc, inputData2, 2);
+
+ JavaDStream unioned = stream1.union(stream2);
+ JavaTestUtils.attachTestOutputStream(unioned);
+ List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+ assertOrderInvariantEquals(expected, result);
+ }
+
+ /*
+ * Performs an order-invariant comparison of lists representing two RDD streams. This allows
+ * us to account for ordering variation within individual RDDs which occurs during windowing.
+ */
+ public static <T extends Comparable> void assertOrderInvariantEquals(
+ List<List<T>> expected, List<List<T>> actual) {
+ for (List<T> list: expected) {
+ Collections.sort(list);
+ }
+ for (List<T> list: actual) {
+ Collections.sort(list);
+ }
+ Assert.assertEquals(expected, actual);
+ }
+
+
+ // PairDStream Functions
+ @Test
+ public void testPairFilter() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("giants", "dodgers"),
+ Arrays.asList("yankees", "red socks"));
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, Integer>("giants", 6)),
+ Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
+
+ JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = stream.map(
+ new PairFunction<String, String, Integer>() {
+ @Override
+ public Tuple2 call(String in) throws Exception {
+ return new Tuple2<String, Integer>(in, in.length());
+ }
+ });
+
+ JavaPairDStream<String, Integer> filtered = pairStream.filter(
+ new Function<Tuple2<String, Integer>, Boolean>() {
+ @Override
+ public Boolean call(Tuple2<String, Integer> in) throws Exception {
+ return in._1().contains("a");
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(filtered);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ List<List<Tuple2<String, String>>> stringStringKVStream = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("california", "giants"),
+ new Tuple2<String, String>("new york", "yankees"),
+ new Tuple2<String, String>("new york", "mets")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks"),
+ new Tuple2<String, String>("california", "ducks"),
+ new Tuple2<String, String>("new york", "rangers"),
+ new Tuple2<String, String>("new york", "islanders")));
+
+ List<List<Tuple2<String, Integer>>> stringIntKVStream = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 1),
+ new Tuple2<String, Integer>("california", 3),
+ new Tuple2<String, Integer>("new york", 4),
+ new Tuple2<String, Integer>("new york", 1)),
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 5),
+ new Tuple2<String, Integer>("california", 5),
+ new Tuple2<String, Integer>("new york", 3),
+ new Tuple2<String, Integer>("new york", 1)));
+
+ @Test
+ public void testPairGroupByKey() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")),
+ new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))),
+ Arrays.asList(
+ new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
+ new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, List<String>> grouped = pairStream.groupByKey();
+ JavaTestUtils.attachTestOutputStream(grouped);
+ List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testPairReduceByKey() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 10),
+ new Tuple2<String, Integer>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ sc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey(new IntegerSum());
+
+ JavaTestUtils.attachTestOutputStream(reduced);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testCombineByKey() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(
+ new Tuple2<String, Integer>("california", 10),
+ new Tuple2<String, Integer>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
+ sc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(
+ new Function<Integer, Integer>() {
+ @Override
+ public Integer call(Integer i) throws Exception {
+ return i;
+ }
+ }, new IntegerSum(), new IntegerSum(), new HashPartitioner(2));
+
+ JavaTestUtils.attachTestOutputStream(combined);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testCountByKey() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, Long>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Long>("california", 2L),
+ new Tuple2<String, Long>("new york", 2L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("california", 2L),
+ new Tuple2<String, Long>("new york", 2L)));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ sc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Long> counted = pairStream.countByKey();
+ JavaTestUtils.attachTestOutputStream(counted);
+ List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testGroupByKeyAndWindow() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")),
+ new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))),
+ Arrays.asList(new Tuple2<String, List<String>>("california",
+ Arrays.asList("sharks", "ducks", "dodgers", "giants")),
+ new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders", "yankees", "mets"))),
+ Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
+ new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, List<String>> groupWindowed =
+ pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000));
+ JavaTestUtils.attachTestOutputStream(groupWindowed);
+ List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReduceByKeyAndWindow() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 14),
+ new Tuple2<String, Integer>("new york", 9)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 10),
+ new Tuple2<String, Integer>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> reduceWindowed =
+ pairStream.reduceByKeyAndWindow(new IntegerSum(), new Duration(2000), new Duration(1000));
+ JavaTestUtils.attachTestOutputStream(reduceWindowed);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testUpdateStateByKey() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 14),
+ new Tuple2<String, Integer>("new york", 9)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 14),
+ new Tuple2<String, Integer>("new york", 9)));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
+ new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>(){
+ @Override
+ public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
+ int out = 0;
+ if (state.isPresent()) {
+ out = out + state.get();
+ }
+ for (Integer v: values) {
+ out = out + v;
+ }
+ return Optional.of(out);
+ }
+ });
+ JavaTestUtils.attachTestOutputStream(updated);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testReduceByKeyAndWindowWithInverse() {
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
+
+ List<List<Tuple2<String, Integer>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, Integer>("california", 4),
+ new Tuple2<String, Integer>("new york", 5)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 14),
+ new Tuple2<String, Integer>("new york", 9)),
+ Arrays.asList(new Tuple2<String, Integer>("california", 10),
+ new Tuple2<String, Integer>("new york", 4)));
+
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Integer> reduceWindowed =
+ pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000));
+ JavaTestUtils.attachTestOutputStream(reduceWindowed);
+ List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testCountByKeyAndWindow() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, Long>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Long>("california", 2L),
+ new Tuple2<String, Long>("new york", 2L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("california", 4L),
+ new Tuple2<String, Long>("new york", 4L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("california", 2L),
+ new Tuple2<String, Long>("new york", 2L)));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ sc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, Long> counted =
+ pairStream.countByKeyAndWindow(new Duration(2000), new Duration(1000));
+ JavaTestUtils.attachTestOutputStream(counted);
+ List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 3, 3);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testMapValues() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, String>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "DODGERS"),
+ new Tuple2<String, String>("california", "GIANTS"),
+ new Tuple2<String, String>("new york", "YANKEES"),
+ new Tuple2<String, String>("new york", "METS")),
+ Arrays.asList(new Tuple2<String, String>("california", "SHARKS"),
+ new Tuple2<String, String>("california", "DUCKS"),
+ new Tuple2<String, String>("new york", "RANGERS"),
+ new Tuple2<String, String>("new york", "ISLANDERS")));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ sc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+ JavaPairDStream<String, String> mapped = pairStream.mapValues(new Function<String, String>() {
+ @Override
+ public String call(String s) throws Exception {
+ return s.toUpperCase();
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(mapped);
+ List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testFlatMapValues() {
+ List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+
+ List<List<Tuple2<String, String>>> expected = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers1"),
+ new Tuple2<String, String>("california", "dodgers2"),
+ new Tuple2<String, String>("california", "giants1"),
+ new Tuple2<String, String>("california", "giants2"),
+ new Tuple2<String, String>("new york", "yankees1"),
+ new Tuple2<String, String>("new york", "yankees2"),
+ new Tuple2<String, String>("new york", "mets1"),
+ new Tuple2<String, String>("new york", "mets2")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks1"),
+ new Tuple2<String, String>("california", "sharks2"),
+ new Tuple2<String, String>("california", "ducks1"),
+ new Tuple2<String, String>("california", "ducks2"),
+ new Tuple2<String, String>("new york", "rangers1"),
+ new Tuple2<String, String>("new york", "rangers2"),
+ new Tuple2<String, String>("new york", "islanders1"),
+ new Tuple2<String, String>("new york", "islanders2")));
+
+ JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ sc, inputData, 1);
+ JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+
+
+ JavaPairDStream<String, String> flatMapped = pairStream.flatMapValues(
+ new Function<String, Iterable<String>>() {
+ @Override
+ public Iterable<String> call(String in) {
+ List<String> out = new ArrayList<String>();
+ out.add(in + "1");
+ out.add(in + "2");
+ return out;
+ }
+ });
+
+ JavaTestUtils.attachTestOutputStream(flatMapped);
+ List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
+ @Test
+ public void testCoGroup() {
+ List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("new york", "yankees")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks"),
+ new Tuple2<String, String>("new york", "rangers")));
+
+ List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "giants"),
+ new Tuple2<String, String>("new york", "mets")),
+ Arrays.asList(new Tuple2<String, String>("california", "ducks"),
+ new Tuple2<String, String>("new york", "islanders")));
+
+
+ List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Tuple2<List<String>, List<String>>>("california",
+ new Tuple2<List<String>, List<String>>(Arrays.asList("dodgers"), Arrays.asList("giants"))),
+ new Tuple2<String, Tuple2<List<String>, List<String>>>("new york",
+ new Tuple2<List<String>, List<String>>(Arrays.asList("yankees"), Arrays.asList("mets")))),
+ Arrays.asList(
+ new Tuple2<String, Tuple2<List<String>, List<String>>>("california",
+ new Tuple2<List<String>, List<String>>(Arrays.asList("sharks"), Arrays.asList("ducks"))),
+ new Tuple2<String, Tuple2<List<String>, List<String>>>("new york",
+ new Tuple2<List<String>, List<String>>(Arrays.asList("rangers"), Arrays.asList("islanders")))));
+
+ JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
+ sc, stringStringKVStream1, 1);
+ JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
+
+ JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
+ sc, stringStringKVStream2, 1);
+ JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
+
+ JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2);
+ JavaTestUtils.attachTestOutputStream(grouped);
+ List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> result =
+ JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
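+ // join emits, per key and per batch, one pair combining the matching
+ // values from the two streams.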
+ @Test
+ public void testJoin() {
+ List<List<Tuple2<String, String>>> stringStringKVStream1 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "dodgers"),
+ new Tuple2<String, String>("new york", "yankees")),
+ Arrays.asList(new Tuple2<String, String>("california", "sharks"),
+ new Tuple2<String, String>("new york", "rangers")));
+
+ List<List<Tuple2<String, String>>> stringStringKVStream2 = Arrays.asList(
+ Arrays.asList(new Tuple2<String, String>("california", "giants"),
+ new Tuple2<String, String>("new york", "mets")),
+ Arrays.asList(new Tuple2<String, String>("california", "ducks"),
+ new Tuple2<String, String>("new york", "islanders")));
+
+ List<List<Tuple2<String, Tuple2<String, String>>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, Tuple2<String, String>>("california",
+ new Tuple2<String, String>("dodgers", "giants")),
+ new Tuple2<String, Tuple2<String, String>>("new york",
+ new Tuple2<String, String>("yankees", "mets"))),
+ Arrays.asList(
+ new Tuple2<String, Tuple2<String, String>>("california",
+ new Tuple2<String, String>("sharks", "ducks")),
+ new Tuple2<String, Tuple2<String, String>>("new york",
+ new Tuple2<String, String>("rangers", "islanders"))));
+
+ JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
+ sc, stringStringKVStream1, 1);
+ JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
+
+ JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
+ sc, stringStringKVStream2, 1);
+ JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
+
+ JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.join(pairStream2);
+ JavaTestUtils.attachTestOutputStream(joined);
+ List<List<Tuple2<String, Tuple2<String, String>>>> result = JavaTestUtils.runStreams(sc, 2, 2);
+
+ Assert.assertEquals(expected, result);
+ }
+
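+ // Run one batch, stop the context, then rebuild it from the checkpoint
+ // directory and verify that the remaining batches still produce the
+ // expected letter counts.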
+ @Test
+ public void testCheckpointMasterRecovery() throws InterruptedException {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("this", "is"),
+ Arrays.asList("a", "test"),
+ Arrays.asList("counting", "letters"));
+
+ List<List<Integer>> expectedInitial = Arrays.asList(
+ Arrays.asList(4,2));
+ List<List<Integer>> expectedFinal = Arrays.asList(
+ Arrays.asList(1,4),
+ Arrays.asList(8,7));
+
+ File tempDir = Files.createTempDir();
+ sc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000));
+
+ JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
+ @Override
+ public Integer call(String s) throws Exception {
+ return s.length();
+ }
+ });
+ JavaCheckpointTestUtils.attachTestOutputStream(letterCount);
+ List<List<Integer>> initialResult = JavaTestUtils.runStreams(sc, 1, 1);
+
+ assertOrderInvariantEquals(expectedInitial, initialResult);
+ Thread.sleep(1000);
+
+ sc.stop();
+ sc = new JavaStreamingContext(tempDir.getAbsolutePath());
+ sc.start();
+ List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(sc, 2, 2);
+ assertOrderInvariantEquals(expectedFinal, finalResult);
+ }
+
+ /** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
+ @Test
+ public void testCheckpointOfIndividualStream() throws InterruptedException {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("this", "is"),
+ Arrays.asList("a", "test"),
+ Arrays.asList("counting", "letters"));
+
+ List<List<Integer>> expected = Arrays.asList(
+ Arrays.asList(4,2),
+ Arrays.asList(1,4),
+ Arrays.asList(8,7));
+
+ JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(sc, inputData, 1);
+ JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
+ @Override
+ public Integer call(String s) throws Exception {
+ return s.length();
+ }
+ });
+ JavaCheckpointTestUtils.attachTestOutputStream(letterCount);
+
+ letterCount.checkpoint(new Duration(1000));
+
+ List<List<Integer>> result1 = JavaCheckpointTestUtils.runStreams(sc, 3, 3);
+ assertOrderInvariantEquals(expected, result1);
+ }
+ */
+
+ // Input stream tests. These mostly just test that we can instantiate a given InputStream with
+ // Java arguments and assign it to a JavaDStream without producing type errors. Testing of the
+ // InputStream functionality is deferred to the existing Scala tests.
+ @Test
+ public void testKafkaStream() {
+ HashMap<String, Integer> topics = Maps.newHashMap();
+ HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap();
+ JavaDStream test1 = sc.kafkaStream("localhost", 12345, "group", topics);
+ JavaDStream test2 = sc.kafkaStream("localhost", 12345, "group", topics, offsets);
+ JavaDStream test3 = sc.kafkaStream("localhost", 12345, "group", topics, offsets,
+ StorageLevel.MEMORY_AND_DISK());
+ }
+
+ @Test
+ public void testNetworkTextStream() {
+ JavaDStream test = sc.networkTextStream("localhost", 12345);
+ }
+
+ @Test
+ public void testNetworkString() {
+ class Converter extends Function<InputStream, Iterable<String>> {
+ public Iterable<String> call(InputStream in) {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+ List<String> out = new ArrayList<String>();
+ try {
+ while (true) {
+ String line = reader.readLine();
+ if (line == null) { break; }
+ out.add(line);
+ }
+ } catch (IOException e) {
+ // For this test converter, treat an I/O error as end of input.
+ }
+ return out;
+ }
+ }
+
+ JavaDStream test = sc.networkStream(
+ "localhost",
+ 12345,
+ new Converter(),
+ StorageLevel.MEMORY_ONLY());
+ }
+
+ @Test
+ public void testTextFileStream() {
+ JavaDStream test = sc.textFileStream("/tmp/foo");
+ }
+
+ @Test
+ public void testRawNetworkStream() {
+ JavaDStream test = sc.rawNetworkStream("localhost", 12345);
+ }
+
+ @Test
+ public void testFlumeStream() {
+ JavaDStream test = sc.flumeStream("localhost", 12345);
+ }
+
+ @Test
+ public void testFileStream() {
+ JavaPairDStream<String, String> foo =
+ sc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo");
+ }
+}