author     Patrick Wendell <pwendell@gmail.com>    2013-01-10 19:52:45 -0800
committer  Patrick Wendell <pwendell@gmail.com>    2013-01-14 09:42:36 -0800
commit     280b6d018691810bbb3dd3155f059132b4475995
tree       1197a3ec7737ed413e15a8153fd637b3462a96c3
parent     c2537057f9ed8723d2c33a1636edf9c9547cdc66
Porting to new Duration class
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala          | 28
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala      | 24
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala      | 76
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala |  8
-rw-r--r--  streaming/src/test/scala/spark/streaming/JavaAPISuite.java                   | 20
5 files changed, 78 insertions(+), 78 deletions(-)
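
For context, a minimal sketch of what this rename means for a Java caller, based on the JavaAPISuite change below; the master URL and app name are placeholders, and 1000 is assumed to be milliseconds:

    import spark.streaming.Duration;
    import spark.streaming.api.java.JavaStreamingContext;

    // The batch interval argument is now typed as Duration instead of Time.
    JavaStreamingContext ssc =
        new JavaStreamingContext("local[2]", "example", new Duration(1000));
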
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
index 1e5c279e2c..f85864df5d 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -1,6 +1,6 @@
package spark.streaming.api.java
-import spark.streaming.{Time, DStream}
+import spark.streaming.{Duration, Time, DStream}
import spark.api.java.function.{Function => JFunction}
import spark.api.java.JavaRDD
import java.util.{List => JList}
@@ -22,7 +22,7 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
/** Persists the RDDs of this DStream with the given storage level */
def persist(storageLevel: StorageLevel): JavaDStream[T] = dstream.persist(storageLevel)
- /** Method that generates a RDD for the given time */
+ /** Method that generates an RDD for the given time */
def compute(validTime: Time): JavaRDD[T] = {
dstream.compute(validTime) match {
case Some(rdd) => new JavaRDD(rdd)
@@ -33,34 +33,34 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
/**
* Return a new DStream which is computed based on windowed batches of this DStream.
* The new DStream generates RDDs with the same interval as this DStream.
- * @param windowTime width of the window; must be a multiple of this DStream's interval.
+ * @param windowDuration width of the window; must be a multiple of this DStream's interval.
* @return
*/
- def window(windowTime: Time): JavaDStream[T] =
- dstream.window(windowTime)
+ def window(windowDuration: Duration): JavaDStream[T] =
+ dstream.window(windowDuration)
/**
* Return a new DStream which is computed based on windowed batches of this DStream.
- * @param windowTime duration (i.e., width) of the window;
+ * @param windowDuration duration (i.e., width) of the window;
* must be a multiple of this DStream's interval
- * @param slideTime sliding interval of the window (i.e., the interval after which
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's interval
*/
- def window(windowTime: Time, slideTime: Time): JavaDStream[T] =
- dstream.window(windowTime, slideTime)
+ def window(windowDuration: Duration, slideDuration: Duration): JavaDStream[T] =
+ dstream.window(windowDuration, slideDuration)
/**
* Returns a new DStream which is computed based on a tumbling window on this DStream.
- * This is equivalent to window(batchTime, batchTime).
- * @param batchTime tumbling window duration; must be a multiple of this DStream's interval
+ * This is equivalent to window(batchDuration, batchDuration).
+ * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval
*/
- def tumble(batchTime: Time): JavaDStream[T] =
- dstream.tumble(batchTime)
+ def tumble(batchDuration: Duration): JavaDStream[T] =
+ dstream.tumble(batchDuration)
/**
* Returns a new DStream by unifying data of another DStream with this DStream.
- * @param that Another DStream having the same interval (i.e., slideTime) as this DStream.
+ * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream.
*/
def union(that: JavaDStream[T]): JavaDStream[T] =
dstream.union(that.dstream)
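
A hedged usage sketch of the renamed window operations above; the lines stream and the interval values are hypothetical, and per the docs both durations must be multiples of the stream's batch interval:

    // Assumes an existing JavaDStream<String> named lines.
    // 30 s window recomputed every 10 s.
    JavaDStream<String> windowed = lines.window(new Duration(30000), new Duration(10000));
    // tumble(d) is documented above as equivalent to window(d, d): non-overlapping windows.
    JavaDStream<String> tumbled = lines.tumble(new Duration(30000));
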
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index 23a0aaaefd..cb58c1351d 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -30,11 +30,11 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
/**
* Returns a new DStream in which each RDD has a single element generated by counting the number
- * of elements in a window over this DStream. windowTime and slideTime are as defined in the
- * window() operation. This is equivalent to window(windowTime, slideTime).count()
+ * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
+ * window() operation. This is equivalent to window(windowDuration, slideDuration).count()
*/
- def countByWindow(windowTime: Time, slideTime: Time) : JavaDStream[Int] = {
- dstream.countByWindow(windowTime, slideTime)
+ def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[Int] = {
+ dstream.countByWindow(windowDuration, slideDuration)
}
/**
@@ -88,22 +88,22 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
/**
* Returns a new DStream in which each RDD has a single element generated by reducing all
- * elements in a window over this DStream. windowTime and slideTime are as defined in the
- * window() operation. This is equivalent to window(windowTime, slideTime).reduce(reduceFunc)
+ * elements in a window over this DStream. windowDuration and slideDuration are as defined in the
+ * window() operation. This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc)
*/
def reduceByWindow(
reduceFunc: JFunction2[T, T, T],
invReduceFunc: JFunction2[T, T, T],
- windowTime: Time,
- slideTime: Time): JavaDStream[T] = {
- dstream.reduceByWindow(reduceFunc, invReduceFunc, windowTime, slideTime)
+ windowDuration: Duration,
+ slideDuration: Duration): JavaDStream[T] = {
+ dstream.reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)
}
/**
- * Returns all the RDDs between 'fromTime' to 'toTime' (both included)
+ * Returns all the RDDs between 'fromDuration' and 'toDuration' (both inclusive)
*/
- def slice(fromTime: Time, toTime: Time): JList[JavaRDD[T]] = {
- new util.ArrayList(dstream.slice(fromTime, toTime).map(new JavaRDD(_)).toSeq)
+ def slice(fromDuration: Duration, toDuration: Duration): JList[JavaRDD[T]] = {
+ new util.ArrayList(dstream.slice(fromDuration, toDuration).map(new JavaRDD(_)).toSeq)
}
/**
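
A sketch of the renamed reduceByWindow signature from Java, mirroring the JavaAPISuite test further below; stream, IntegerSum, and IntegerDifference are the suite's own fixtures (Function2<Integer, Integer, Integer> implementations):

    // Sum over a 2 s window that slides every 1 s; the inverse function lets the
    // window be updated incrementally instead of recomputed from scratch.
    JavaDStream<Integer> sums = stream.reduceByWindow(
        new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000));
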
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index fa46ca9267..03336d040d 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -38,7 +38,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/** Persists the RDDs of this DStream with the given storage level */
def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel)
- /** Method that generates a RDD for the given time */
+ /** Method that generates an RDD for the given time */
def compute(validTime: Time): JavaPairRDD[K, V] = {
dstream.compute(validTime) match {
case Some(rdd) => new JavaPairRDD(rdd)
@@ -49,34 +49,34 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Return a new DStream which is computed based on windowed batches of this DStream.
* The new DStream generates RDDs with the same interval as this DStream.
- * @param windowTime width of the window; must be a multiple of this DStream's interval.
+ * @param windowDuration width of the window; must be a multiple of this DStream's interval.
* @return
*/
- def window(windowTime: Time): JavaPairDStream[K, V] =
- dstream.window(windowTime)
+ def window(windowDuration: Duration): JavaPairDStream[K, V] =
+ dstream.window(windowDuration)
/**
* Return a new DStream which is computed based on windowed batches of this DStream.
- * @param windowTime duration (i.e., width) of the window;
+ * @param windowDuration duration (i.e., width) of the window;
* must be a multiple of this DStream's interval
- * @param slideTime sliding interval of the window (i.e., the interval after which
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's interval
*/
- def window(windowTime: Time, slideTime: Time): JavaPairDStream[K, V] =
- dstream.window(windowTime, slideTime)
+ def window(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, V] =
+ dstream.window(windowDuration, slideDuration)
/**
* Returns a new DStream which is computed based on a tumbling window on this DStream.
- * This is equivalent to window(batchTime, batchTime).
- * @param batchTime tumbling window duration; must be a multiple of this DStream's interval
+ * This is equivalent to window(batchDuration, batchDuration).
+ * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval
*/
- def tumble(batchTime: Time): JavaPairDStream[K, V] =
- dstream.tumble(batchTime)
+ def tumble(batchDuration: Duration): JavaPairDStream[K, V] =
+ dstream.tumble(batchDuration)
/**
* Returns a new DStream by unifying data of another DStream with this DStream.
- * @param that Another DStream having the same interval (i.e., slideTime) as this DStream.
+ * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream.
*/
def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] =
dstream.union(that.dstream)
@@ -117,66 +117,66 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
JavaPairDStream.scalaToJavaLong(dstream.countByKey());
}
- def groupByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, JList[V]] = {
- dstream.groupByKeyAndWindow(windowTime, slideTime).mapValues(seqAsJavaList _)
+ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, JList[V]] = {
+ dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(seqAsJavaList _)
}
- def groupByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int)
+ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
:JavaPairDStream[K, JList[V]] = {
- dstream.groupByKeyAndWindow(windowTime, slideTime, numPartitions).mapValues(seqAsJavaList _)
+ dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions).mapValues(seqAsJavaList _)
}
- def groupByKeyAndWindow(windowTime: Time, slideTime: Time, partitioner: Partitioner)
+ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner)
:JavaPairDStream[K, JList[V]] = {
- dstream.groupByKeyAndWindow(windowTime, slideTime, partitioner).mapValues(seqAsJavaList _)
+ dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner).mapValues(seqAsJavaList _)
}
- def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time)
+ def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration)
:JavaPairDStream[K, V] = {
- dstream.reduceByKeyAndWindow(reduceFunc, windowTime)
+ dstream.reduceByKeyAndWindow(reduceFunc, windowDuration)
}
- def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time)
+ def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration, slideDuration: Duration)
:JavaPairDStream[K, V] = {
- dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime)
+ dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)
}
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
- windowTime: Time,
- slideTime: Time,
+ windowDuration: Duration,
+ slideDuration: Duration,
numPartitions: Int): JavaPairDStream[K, V] = {
- dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, numPartitions)
+ dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, numPartitions)
}
- def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time,
+ def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration, slideDuration: Duration,
partitioner: Partitioner): JavaPairDStream[K, V] = {
- dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, partitioner)
+ dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, partitioner)
}
def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V],
- windowTime: Time, slideTime: Time): JavaPairDStream[K, V] = {
- dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime)
+ windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)
}
def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V],
- windowTime: Time, slideTime: Time, numPartitions: Int): JavaPairDStream[K, V] = {
- dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, numPartitions)
+ windowDuration: Duration, slideDuration: Duration, numPartitions: Int): JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, numPartitions)
}
def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V],
- windowTime: Time, slideTime: Time, partitioner: Partitioner)
+ windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner)
: JavaPairDStream[K, V] = {
- dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, partitioner)
+ dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, partitioner)
}
- def countByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, JLong] = {
- JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowTime, slideTime))
+ def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, JLong] = {
+ JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowDuration, slideDuration))
}
- def countByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int)
+ def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
: JavaPairDStream[K, Long] = {
- dstream.countByKeyAndWindow(windowTime, slideTime, numPartitions)
+ dstream.countByKeyAndWindow(windowDuration, slideDuration, numPartitions)
}
def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
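
The same pattern for keyed streams; a hedged sketch using the suite's IntegerSum fixture over a hypothetical (word, count) pair stream:

    // Windowed per-key sums: 2 s window sliding every 1 s, both arguments now Durations.
    JavaPairDStream<String, Integer> counts =
        pairStream.reduceByKeyAndWindow(new IntegerSum(), new Duration(2000), new Duration(1000));
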
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index e8cd03847a..5a712d18c7 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -12,7 +12,7 @@ import java.io.InputStream
import java.util.{Map => JMap}
class JavaStreamingContext(val ssc: StreamingContext) {
- def this(master: String, frameworkName: String, batchDuration: Time) =
+ def this(master: String, frameworkName: String, batchDuration: Duration) =
this(new StreamingContext(master, frameworkName, batchDuration))
// TODOs:
@@ -190,18 +190,18 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
* @param interval checkpoint interval
*/
- def checkpoint(directory: String, interval: Time = null) {
+ def checkpoint(directory: String, interval: Duration = null) {
ssc.checkpoint(directory, interval)
}
/**
* Sets each DStream in this context to remember RDDs it generated in the last given duration.
- * DStreams remember RDDs only for a limited duration of time and releases them for garbage
+ * DStreams remember RDDs only for a limited duration and release them for garbage
* collection. This method allows the developer to specify how long to remember the RDDs (
* if the developer wishes to query old data outside the DStream computation).
* @param duration Minimum duration that each DStream should remember its RDDs
*/
- def remember(duration: Time) {
+ def remember(duration: Duration) {
ssc.remember(duration)
}
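
A hedged sketch of the two retyped context settings; the checkpoint directory and interval values are placeholders:

    // Both interval arguments are now Durations (milliseconds assumed).
    ssc.checkpoint("hdfs:///tmp/checkpoints", new Duration(10000));
    ssc.remember(new Duration(60000));  // keep roughly the last minute of RDDs queryable
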
diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
index 6584d861ed..26ff5b1ccd 100644
--- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java
@@ -29,7 +29,7 @@ public class JavaAPISuite implements Serializable {
@Before
public void setUp() {
- sc = new JavaStreamingContext("local[2]", "test", new Time(1000));
+ sc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
}
@After
@@ -96,7 +96,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(7,8,9));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
- JavaDStream windowed = stream.window(new Time(2000));
+ JavaDStream windowed = stream.window(new Duration(2000));
JavaTestUtils.attachTestOutputStream(windowed);
List<List<Integer>> result = JavaTestUtils.runStreams(sc, 4, 4);
@@ -104,7 +104,7 @@ public class JavaAPISuite implements Serializable {
}
@Test
- public void testWindowWithSlideTime() {
+ public void testWindowWithSlideDuration() {
List<List<Integer>> inputData = Arrays.asList(
Arrays.asList(1,2,3),
Arrays.asList(4,5,6),
@@ -120,7 +120,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(13,14,15,16,17,18));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
- JavaDStream windowed = stream.window(new Time(4000), new Time(2000));
+ JavaDStream windowed = stream.window(new Duration(4000), new Duration(2000));
JavaTestUtils.attachTestOutputStream(windowed);
List<List<Integer>> result = JavaTestUtils.runStreams(sc, 8, 4);
@@ -143,7 +143,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(13,14,15,16,17,18));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
- JavaDStream windowed = stream.tumble(new Time(2000));
+ JavaDStream windowed = stream.tumble(new Duration(2000));
JavaTestUtils.attachTestOutputStream(windowed);
List<List<Integer>> result = JavaTestUtils.runStreams(sc, 6, 3);
@@ -267,7 +267,7 @@ public class JavaAPISuite implements Serializable {
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(),
- new IntegerDifference(), new Time(2000), new Time(1000));
+ new IntegerDifference(), new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reducedWindowed);
List<List<Integer>> result = JavaTestUtils.runStreams(sc, 4, 4);
@@ -517,7 +517,7 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, List<String>> groupWindowed =
- pairStream.groupByKeyAndWindow(new Time(2000), new Time(1000));
+ pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(groupWindowed);
List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(sc, 3, 3);
@@ -540,7 +540,7 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> reduceWindowed =
- pairStream.reduceByKeyAndWindow(new IntegerSum(), new Time(2000), new Time(1000));
+ pairStream.reduceByKeyAndWindow(new IntegerSum(), new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reduceWindowed);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3);
@@ -563,7 +563,7 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> reduceWindowed =
- pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Time(2000), new Time(1000));
+ pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reduceWindowed);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3);
@@ -590,7 +590,7 @@ public class JavaAPISuite implements Serializable {
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Long> counted =
- pairStream.countByKeyAndWindow(new Time(2000), new Time(1000));
+ pairStream.countByKeyAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(counted);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 3, 3);
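
Putting it together, an end-to-end sketch in the style of the suite above; JavaTestUtils and the fixture data are the suite's own scaffolding, and the 1000 ms batch interval matches setUp():

    JavaStreamingContext sc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
    JavaDStream stream = JavaTestUtils.attachTestInputStream(
        sc, Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5, 6)), 1);
    // Every window/slide argument in the ported API takes a Duration rather than a Time.
    JavaDStream windowed = stream.window(new Duration(2000), new Duration(1000));
    JavaTestUtils.attachTestOutputStream(windowed);
    List<List<Integer>> result = JavaTestUtils.runStreams(sc, 2, 2);
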