diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-01-10 19:52:45 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-01-14 09:42:36 -0800 |
commit | 280b6d018691810bbb3dd3155f059132b4475995 (patch) | |
tree | 1197a3ec7737ed413e15a8153fd637b3462a96c3 /streaming/src/main | |
parent | c2537057f9ed8723d2c33a1636edf9c9547cdc66 (diff) | |
download | spark-280b6d018691810bbb3dd3155f059132b4475995.tar.gz spark-280b6d018691810bbb3dd3155f059132b4475995.tar.bz2 spark-280b6d018691810bbb3dd3155f059132b4475995.zip |
Porting to new Duration class
Diffstat (limited to 'streaming/src/main')
4 files changed, 68 insertions, 68 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala index 1e5c279e2c..f85864df5d 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala @@ -1,6 +1,6 @@ package spark.streaming.api.java -import spark.streaming.{Time, DStream} +import spark.streaming.{Duration, Time, DStream} import spark.api.java.function.{Function => JFunction} import spark.api.java.JavaRDD import java.util.{List => JList} @@ -22,7 +22,7 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM /** Persists the RDDs of this DStream with the given storage level */ def persist(storageLevel: StorageLevel): JavaDStream[T] = dstream.persist(storageLevel) - /** Method that generates a RDD for the given time */ + /** Method that generates a RDD for the given duration */ def compute(validTime: Time): JavaRDD[T] = { dstream.compute(validTime) match { case Some(rdd) => new JavaRDD(rdd) @@ -33,34 +33,34 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM /** * Return a new DStream which is computed based on windowed batches of this DStream. * The new DStream generates RDDs with the same interval as this DStream. - * @param windowTime width of the window; must be a multiple of this DStream's interval. + * @param windowDuration width of the window; must be a multiple of this DStream's interval. * @return */ - def window(windowTime: Time): JavaDStream[T] = - dstream.window(windowTime) + def window(windowDuration: Duration): JavaDStream[T] = + dstream.window(windowDuration) /** * Return a new DStream which is computed based on windowed batches of this DStream. - * @param windowTime duration (i.e., width) of the window; + * @param windowDuration duration (i.e., width) of the window; * must be a multiple of this DStream's interval - * @param slideTime sliding interval of the window (i.e., the interval after which + * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's interval */ - def window(windowTime: Time, slideTime: Time): JavaDStream[T] = - dstream.window(windowTime, slideTime) + def window(windowDuration: Duration, slideDuration: Duration): JavaDStream[T] = + dstream.window(windowDuration, slideDuration) /** * Returns a new DStream which computed based on tumbling window on this DStream. - * This is equivalent to window(batchTime, batchTime). - * @param batchTime tumbling window duration; must be a multiple of this DStream's interval + * This is equivalent to window(batchDuration, batchDuration). + * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval */ - def tumble(batchTime: Time): JavaDStream[T] = - dstream.tumble(batchTime) + def tumble(batchDuration: Duration): JavaDStream[T] = + dstream.tumble(batchDuration) /** * Returns a new DStream by unifying data of another DStream with this DStream. - * @param that Another DStream having the same interval (i.e., slideTime) as this DStream. + * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream. */ def union(that: JavaDStream[T]): JavaDStream[T] = dstream.union(that.dstream) diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala index 23a0aaaefd..cb58c1351d 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala @@ -30,11 +30,11 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable /** * Returns a new DStream in which each RDD has a single element generated by counting the number - * of elements in a window over this DStream. windowTime and slideTime are as defined in the - * window() operation. This is equivalent to window(windowTime, slideTime).count() + * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the + * window() operation. This is equivalent to window(windowDuration, slideDuration).count() */ - def countByWindow(windowTime: Time, slideTime: Time) : JavaDStream[Int] = { - dstream.countByWindow(windowTime, slideTime) + def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[Int] = { + dstream.countByWindow(windowDuration, slideDuration) } /** @@ -88,22 +88,22 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable /** * Returns a new DStream in which each RDD has a single element generated by reducing all - * elements in a window over this DStream. windowTime and slideTime are as defined in the - * window() operation. This is equivalent to window(windowTime, slideTime).reduce(reduceFunc) + * elements in a window over this DStream. windowDuration and slideDuration are as defined in the + * window() operation. This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc) */ def reduceByWindow( reduceFunc: JFunction2[T, T, T], invReduceFunc: JFunction2[T, T, T], - windowTime: Time, - slideTime: Time): JavaDStream[T] = { - dstream.reduceByWindow(reduceFunc, invReduceFunc, windowTime, slideTime) + windowDuration: Duration, + slideDuration: Duration): JavaDStream[T] = { + dstream.reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration) } /** - * Returns all the RDDs between 'fromTime' to 'toTime' (both included) + * Returns all the RDDs between 'fromDuration' to 'toDuration' (both included) */ - def slice(fromTime: Time, toTime: Time): JList[JavaRDD[T]] = { - new util.ArrayList(dstream.slice(fromTime, toTime).map(new JavaRDD(_)).toSeq) + def slice(fromDuration: Duration, toDuration: Duration): JList[JavaRDD[T]] = { + new util.ArrayList(dstream.slice(fromDuration, toDuration).map(new JavaRDD(_)).toSeq) } /** diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala index fa46ca9267..03336d040d 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala @@ -38,7 +38,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** Persists the RDDs of this DStream with the given storage level */ def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel) - /** Method that generates a RDD for the given time */ + /** Method that generates a RDD for the given Duration */ def compute(validTime: Time): JavaPairRDD[K, V] = { dstream.compute(validTime) match { case Some(rdd) => new JavaPairRDD(rdd) @@ -49,34 +49,34 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( /** * Return a new DStream which is computed based on windowed batches of this DStream. * The new DStream generates RDDs with the same interval as this DStream. - * @param windowTime width of the window; must be a multiple of this DStream's interval. + * @param windowDuration width of the window; must be a multiple of this DStream's interval. * @return */ - def window(windowTime: Time): JavaPairDStream[K, V] = - dstream.window(windowTime) + def window(windowDuration: Duration): JavaPairDStream[K, V] = + dstream.window(windowDuration) /** * Return a new DStream which is computed based on windowed batches of this DStream. - * @param windowTime duration (i.e., width) of the window; + * @param windowDuration duration (i.e., width) of the window; * must be a multiple of this DStream's interval - * @param slideTime sliding interval of the window (i.e., the interval after which + * @param slideDuration sliding interval of the window (i.e., the interval after which * the new DStream will generate RDDs); must be a multiple of this * DStream's interval */ - def window(windowTime: Time, slideTime: Time): JavaPairDStream[K, V] = - dstream.window(windowTime, slideTime) + def window(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, V] = + dstream.window(windowDuration, slideDuration) /** * Returns a new DStream which computed based on tumbling window on this DStream. - * This is equivalent to window(batchTime, batchTime). - * @param batchTime tumbling window duration; must be a multiple of this DStream's interval + * This is equivalent to window(batchDuration, batchDuration). + * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval */ - def tumble(batchTime: Time): JavaPairDStream[K, V] = - dstream.tumble(batchTime) + def tumble(batchDuration: Duration): JavaPairDStream[K, V] = + dstream.tumble(batchDuration) /** * Returns a new DStream by unifying data of another DStream with this DStream. - * @param that Another DStream having the same interval (i.e., slideTime) as this DStream. + * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream. */ def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] = dstream.union(that.dstream) @@ -117,66 +117,66 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( JavaPairDStream.scalaToJavaLong(dstream.countByKey()); } - def groupByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, JList[V]] = { - dstream.groupByKeyAndWindow(windowTime, slideTime).mapValues(seqAsJavaList _) + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, JList[V]] = { + dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(seqAsJavaList _) } - def groupByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int) + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int) :JavaPairDStream[K, JList[V]] = { - dstream.groupByKeyAndWindow(windowTime, slideTime, numPartitions).mapValues(seqAsJavaList _) + dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions).mapValues(seqAsJavaList _) } - def groupByKeyAndWindow(windowTime: Time, slideTime: Time, partitioner: Partitioner) + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner) :JavaPairDStream[K, JList[V]] = { - dstream.groupByKeyAndWindow(windowTime, slideTime, partitioner).mapValues(seqAsJavaList _) + dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner).mapValues(seqAsJavaList _) } - def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time) + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration) :JavaPairDStream[K, V] = { - dstream.reduceByKeyAndWindow(reduceFunc, windowTime) + dstream.reduceByKeyAndWindow(reduceFunc, windowDuration) } - def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time) + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration, slideDuration: Duration) :JavaPairDStream[K, V] = { - dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime) + dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration) } def reduceByKeyAndWindow( reduceFunc: Function2[V, V, V], - windowTime: Time, - slideTime: Time, + windowDuration: Duration, + slideDuration: Duration, numPartitions: Int): JavaPairDStream[K, V] = { - dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, numPartitions) + dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, numPartitions) } - def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time, + def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner): JavaPairDStream[K, V] = { - dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, partitioner) + dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, partitioner) } def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V], - windowTime: Time, slideTime: Time): JavaPairDStream[K, V] = { - dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime) + windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration) } def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V], - windowTime: Time, slideTime: Time, numPartitions: Int): JavaPairDStream[K, V] = { - dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, numPartitions) + windowDuration: Duration, slideDuration: Duration, numPartitions: Int): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, numPartitions) } def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V], - windowTime: Time, slideTime: Time, partitioner: Partitioner) + windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner) : JavaPairDStream[K, V] = { - dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, partitioner) + dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, partitioner) } - def countByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, JLong] = { - JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowTime, slideTime)) + def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, JLong] = { + JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowDuration, slideDuration)) } - def countByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int) + def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int) : JavaPairDStream[K, Long] = { - dstream.countByKeyAndWindow(windowTime, slideTime, numPartitions) + dstream.countByKeyAndWindow(windowDuration, slideDuration, numPartitions) } def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = { diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala index e8cd03847a..5a712d18c7 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala @@ -12,7 +12,7 @@ import java.io.InputStream import java.util.{Map => JMap} class JavaStreamingContext(val ssc: StreamingContext) { - def this(master: String, frameworkName: String, batchDuration: Time) = + def this(master: String, frameworkName: String, batchDuration: Duration) = this(new StreamingContext(master, frameworkName, batchDuration)) // TODOs: @@ -190,18 +190,18 @@ class JavaStreamingContext(val ssc: StreamingContext) { * @param directory HDFS-compatible directory where the checkpoint data will be reliably stored * @param interval checkpoint interval */ - def checkpoint(directory: String, interval: Time = null) { + def checkpoint(directory: String, interval: Duration = null) { ssc.checkpoint(directory, interval) } /** * Sets each DStreams in this context to remember RDDs it generated in the last given duration. - * DStreams remember RDDs only for a limited duration of time and releases them for garbage + * DStreams remember RDDs only for a limited duration of duration and releases them for garbage * collection. This method allows the developer to specify how to long to remember the RDDs ( * if the developer wishes to query old data outside the DStream computation). * @param duration Minimum duration that each DStream should remember its RDDs */ - def remember(duration: Time) { + def remember(duration: Duration) { ssc.remember(duration) } |