From 280b6d018691810bbb3dd3155f059132b4475995 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 10 Jan 2013 19:52:45 -0800 Subject: Porting to new Duration class --- .../spark/streaming/api/java/JavaDStream.scala | 28 ++++---- .../spark/streaming/api/java/JavaDStreamLike.scala | 24 +++---- .../spark/streaming/api/java/JavaPairDStream.scala | 76 +++++++++++----------- .../streaming/api/java/JavaStreamingContext.scala | 8 +-- .../test/scala/spark/streaming/JavaAPISuite.java | 20 +++--- 5 files changed, 78 insertions(+), 78 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala index 1e5c279e2c..f85864df5d 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala @@ -1,6 +1,6 @@ package spark.streaming.api.java -import spark.streaming.{Time, DStream} +import spark.streaming.{Duration, Time, DStream} import spark.api.java.function.{Function => JFunction} import spark.api.java.JavaRDD import java.util.{List => JList} @@ -22,7 +22,7 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM /** Persists the RDDs of this DStream with the given storage level */ def persist(storageLevel: StorageLevel): JavaDStream[T] = dstream.persist(storageLevel) - /** Method that generates a RDD for the given time */ + /** Method that generates a RDD for the given duration */ def compute(validTime: Time): JavaRDD[T] = { dstream.compute(validTime) match { case Some(rdd) => new JavaRDD(rdd) @@ -33,34 +33,34 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM /** * Return a new DStream which is computed based on windowed batches of this DStream. * The new DStream generates RDDs with the same interval as this DStream. - * @param windowTime width of the window; must be a multiple of this DStream's interval. + * @param windowDuration width of the window; must be a multiple of this DStream's interval. * @return */ - def window(windowTime: Time): JavaDStream[T] = - dstream.window(windowTime) + def window(windowDuration: Duration): JavaDStream[T] = + dstream.window(windowDuration) /** * Return a new DStream which is computed based on windowed batches of this DStream. - * @param windowTime duration (i.e., width) of the window; + * @param windowDuration duration (i.e., width) of the window; * must be a multiple of this DStream's interval - * @param slideTime sliding interval of the window (i.e., the interval after which + * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's interval */ - def window(windowTime: Time, slideTime: Time): JavaDStream[T] = - dstream.window(windowTime, slideTime) + def window(windowDuration: Duration, slideDuration: Duration): JavaDStream[T] = + dstream.window(windowDuration, slideDuration) /** * Returns a new DStream which computed based on tumbling window on this DStream. - * This is equivalent to window(batchTime, batchTime). - * @param batchTime tumbling window duration; must be a multiple of this DStream's interval + * This is equivalent to window(batchDuration, batchDuration). + * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval */ - def tumble(batchTime: Time): JavaDStream[T] = - dstream.tumble(batchTime) + def tumble(batchDuration: Duration): JavaDStream[T] = + dstream.tumble(batchDuration) /** * Returns a new DStream by unifying data of another DStream with this DStream. - * @param that Another DStream having the same interval (i.e., slideTime) as this DStream. + * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream. */ def union(that: JavaDStream[T]): JavaDStream[T] = dstream.union(that.dstream) diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala index 23a0aaaefd..cb58c1351d 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala @@ -30,11 +30,11 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable /** * Returns a new DStream in which each RDD has a single element generated by counting the number - * of elements in a window over this DStream. windowTime and slideTime are as defined in the - * window() operation. This is equivalent to window(windowTime, slideTime).count() + * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the + * window() operation. This is equivalent to window(windowDuration, slideDuration).count() */ - def countByWindow(windowTime: Time, slideTime: Time) : JavaDStream[Int] = { - dstream.countByWindow(windowTime, slideTime) + def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[Int] = { + dstream.countByWindow(windowDuration, slideDuration) } /** @@ -88,22 +88,22 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable /** * Returns a new DStream in which each RDD has a single element generated by reducing all - * elements in a window over this DStream. windowTime and slideTime are as defined in the - * window() operation. This is equivalent to window(windowTime, slideTime).reduce(reduceFunc) + * elements in a window over this DStream. windowDuration and slideDuration are as defined in the + * window() operation. This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc) */ def reduceByWindow( reduceFunc: JFunction2[T, T, T], invReduceFunc: JFunction2[T, T, T], - windowTime: Time, - slideTime: Time): JavaDStream[T] = { - dstream.reduceByWindow(reduceFunc, invReduceFunc, windowTime, slideTime) + windowDuration: Duration, + slideDuration: Duration): JavaDStream[T] = { + dstream.reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration) } /** - * Returns all the RDDs between 'fromTime' to 'toTime' (both included) + * Returns all the RDDs between 'fromDuration' to 'toDuration' (both included) */ - def slice(fromTime: Time, toTime: Time): JList[JavaRDD[T]] = { - new util.ArrayList(dstream.slice(fromTime, toTime).map(new JavaRDD(_)).toSeq) + def slice(fromDuration: Duration, toDuration: Duration): JList[JavaRDD[T]] = { + new util.ArrayList(dstream.slice(fromDuration, toDuration).map(new JavaRDD(_)).toSeq) } /** diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala index fa46ca9267..03336d040d 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala @@ -38,7 +38,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** Persists the RDDs of this DStream with the given storage level */ def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel) - /** Method that generates a RDD for the given time */ + /** Method that generates a RDD for the given Duration */ def compute(validTime: Time): JavaPairRDD[K, V] = { dstream.compute(validTime) match { case Some(rdd) => new JavaPairRDD(rdd) @@ -49,34 +49,34 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new DStream which is computed based on windowed batches of this DStream. * The new DStream generates RDDs with the same interval as this DStream. - * @param windowTime width of the window; must be a multiple of this DStream's interval. + * @param windowDuration width of the window; must be a multiple of this DStream's interval. * @return */ - def window(windowTime: Time): JavaPairDStream[K, V] = - dstream.window(windowTime) + def window(windowDuration: Duration): JavaPairDStream[K, V] = + dstream.window(windowDuration) /** * Return a new DStream which is computed based on windowed batches of this DStream. - * @param windowTime duration (i.e., width) of the window; + * @param windowDuration duration (i.e., width) of the window; * must be a multiple of this DStream's interval - * @param slideTime sliding interval of the window (i.e., the interval after which + * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's interval */ - def window(windowTime: Time, slideTime: Time): JavaPairDStream[K, V] = - dstream.window(windowTime, slideTime) + def window(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, V] = + dstream.window(windowDuration, slideDuration) /** * Returns a new DStream which computed based on tumbling window on this DStream. - * This is equivalent to window(batchTime, batchTime). - * @param batchTime tumbling window duration; must be a multiple of this DStream's interval + * This is equivalent to window(batchDuration, batchDuration). + * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval */ - def tumble(batchTime: Time): JavaPairDStream[K, V] = - dstream.tumble(batchTime) + def tumble(batchDuration: Duration): JavaPairDStream[K, V] = + dstream.tumble(batchDuration) /** * Returns a new DStream by unifying data of another DStream with this DStream. - * @param that Another DStream having the same interval (i.e., slideTime) as this DStream. + * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream. */ def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] = dstream.union(that.dstream) @@ -117,66 +117,66 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( JavaPairDStream.scalaToJavaLong(dstream.countByKey()); } - def groupByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, JList[V]] = { - dstream.groupByKeyAndWindow(windowTime, slideTime).mapValues(seqAsJavaList _) + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, JList[V]] = { + dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(seqAsJavaList _) } - def groupByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int) + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int) :JavaPairDStream[K, JList[V]] = { - dstream.groupByKeyAndWindow(windowTime, slideTime, numPartitions).mapValues(seqAsJavaList _) + dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions).mapValues(seqAsJavaList _) } - def groupByKeyAndWindow(windowTime: Time, slideTime: Time, partitioner: Partitioner) + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner) :JavaPairDStream[K, JList[V]] = { - dstream.groupByKeyAndWindow(windowTime, slideTime, partitioner).mapValues(seqAsJavaList _) + dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner).mapValues(seqAsJavaList _) } - def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time) + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration) :JavaPairDStream[K, V] = { - dstream.reduceByKeyAndWindow(reduceFunc, windowTime) + dstream.reduceByKeyAndWindow(reduceFunc, windowDuration) } - def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time) + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration, slideDuration: Duration) :JavaPairDStream[K, V] = { - dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime) + dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration) } def reduceByKeyAndWindow( reduceFunc: Function2[V, V, V], - windowTime: Time, - slideTime: Time, + windowDuration: Duration, + slideDuration: Duration, numPartitions: Int): JavaPairDStream[K, V] = { - dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, numPartitions) + dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, numPartitions) } - def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time, + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner): JavaPairDStream[K, V] = { - dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, partitioner) + dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, partitioner) } def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V], - windowTime: Time, slideTime: Time): JavaPairDStream[K, V] = { - dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime) + windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration) } def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V], - windowTime: Time, slideTime: Time, numPartitions: Int): JavaPairDStream[K, V] = { - dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, numPartitions) + windowDuration: Duration, slideDuration: Duration, numPartitions: Int): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, numPartitions) } def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V], - windowTime: Time, slideTime: Time, partitioner: Partitioner) + windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner) : JavaPairDStream[K, V] = { - dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, partitioner) + dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, partitioner) } - def countByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, JLong] = { - JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowTime, slideTime)) + def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, JLong] = { + JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowDuration, slideDuration)) } - def countByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int) + def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int) : JavaPairDStream[K, Long] = { - dstream.countByKeyAndWindow(windowTime, slideTime, numPartitions) + dstream.countByKeyAndWindow(windowDuration, slideDuration, numPartitions) } def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = { diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index e8cd03847a..5a712d18c7 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -12,7 +12,7 @@ import java.io.InputStream import java.util.{Map => JMap} class JavaStreamingContext(val ssc: StreamingContext) { - def this(master: String, frameworkName: String, batchDuration: Time) = + def this(master: String, frameworkName: String, batchDuration: Duration) = this(new StreamingContext(master, frameworkName, batchDuration)) // TODOs: @@ -190,18 +190,18 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored * @param interval checkpoint interval */ - def checkpoint(directory: String, interval: Time = null) { + def checkpoint(directory: String, interval: Duration = null) { ssc.checkpoint(directory, interval) } /** * Sets each DStreams in this context to remember RDDs it generated in the last given duration. - * DStreams remember RDDs only for a limited duration of time and releases them for garbage + * DStreams remember RDDs only for a limited duration of duration and releases them for garbage * collection. This method allows the developer to specify how to long to remember the RDDs ( * if the developer wishes to query old data outside the DStream computation). * @param duration Minimum duration that each DStream should remember its RDDs */ - def remember(duration: Time) { + def remember(duration: Duration) { ssc.remember(duration) } diff --git a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java index 6584d861ed..26ff5b1ccd 100644 --- a/streaming/src/test/scala/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/scala/spark/streaming/JavaAPISuite.java @@ -29,7 +29,7 @@ public class JavaAPISuite implements Serializable { @Before public void setUp() { - sc = new JavaStreamingContext("local[2]", "test", new Time(1000)); + sc = new JavaStreamingContext("local[2]", "test", new Duration(1000)); } @After @@ -96,7 +96,7 @@ public class JavaAPISuite implements Serializable { Arrays.asList(7,8,9)); JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); - JavaDStream windowed = stream.window(new Time(2000)); + JavaDStream windowed = stream.window(new Duration(2000)); JavaTestUtils.attachTestOutputStream(windowed); List> result = JavaTestUtils.runStreams(sc, 4, 4); @@ -104,7 +104,7 @@ public class JavaAPISuite implements Serializable { } @Test - public void testWindowWithSlideTime() { + public void testWindowWithSlideDuration() { List> inputData = Arrays.asList( Arrays.asList(1,2,3), Arrays.asList(4,5,6), @@ -120,7 +120,7 @@ public class JavaAPISuite implements Serializable { Arrays.asList(13,14,15,16,17,18)); JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); - JavaDStream windowed = stream.window(new Time(4000), new Time(2000)); + JavaDStream windowed = stream.window(new Duration(4000), new Duration(2000)); JavaTestUtils.attachTestOutputStream(windowed); List> result = JavaTestUtils.runStreams(sc, 8, 4); @@ -143,7 +143,7 @@ public class JavaAPISuite implements Serializable { Arrays.asList(13,14,15,16,17,18)); JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); - JavaDStream windowed = stream.tumble(new Time(2000)); + JavaDStream windowed = stream.tumble(new Duration(2000)); JavaTestUtils.attachTestOutputStream(windowed); List> result = JavaTestUtils.runStreams(sc, 6, 3); @@ -267,7 +267,7 @@ public class JavaAPISuite implements Serializable { JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1); JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(), - new IntegerDifference(), new Time(2000), new Time(1000)); + new IntegerDifference(), new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(reducedWindowed); List> result = JavaTestUtils.runStreams(sc, 4, 4); @@ -517,7 +517,7 @@ public class JavaAPISuite implements Serializable { JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream> groupWindowed = - pairStream.groupByKeyAndWindow(new Time(2000), new Time(1000)); + pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(groupWindowed); List>>> result = JavaTestUtils.runStreams(sc, 3, 3); @@ -540,7 +540,7 @@ public class JavaAPISuite implements Serializable { JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream reduceWindowed = - pairStream.reduceByKeyAndWindow(new IntegerSum(), new Time(2000), new Time(1000)); + pairStream.reduceByKeyAndWindow(new IntegerSum(), new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(reduceWindowed); List>> result = JavaTestUtils.runStreams(sc, 3, 3); @@ -563,7 +563,7 @@ public class JavaAPISuite implements Serializable { JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream reduceWindowed = - pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Time(2000), new Time(1000)); + pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(reduceWindowed); List>> result = JavaTestUtils.runStreams(sc, 3, 3); @@ -590,7 +590,7 @@ public class JavaAPISuite implements Serializable { JavaPairDStream pairStream = JavaPairDStream.fromJavaDStream(stream); JavaPairDStream counted = - pairStream.countByKeyAndWindow(new Time(2000), new Time(1000)); + pairStream.countByKeyAndWindow(new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(counted); List>> result = JavaTestUtils.runStreams(sc, 3, 3); -- cgit v1.2.3