path: root/streaming/src/main
author    Patrick Wendell <pwendell@gmail.com>    2013-01-10 19:52:45 -0800
committer Patrick Wendell <pwendell@gmail.com>    2013-01-14 09:42:36 -0800
commit    280b6d018691810bbb3dd3155f059132b4475995 (patch)
tree      1197a3ec7737ed413e15a8153fd637b3462a96c3 /streaming/src/main
parent    c2537057f9ed8723d2c33a1636edf9c9547cdc66 (diff)
Porting to new Duration class
Diffstat (limited to 'streaming/src/main')
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala          | 28
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala      | 24
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala      | 76
-rw-r--r--  streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala |  8
4 files changed, 68 insertions(+), 68 deletions(-)
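The changes below are a mechanical rename of Time-typed window and batch parameters to the new Duration type. For orientation, here is a minimal sketch of the kind of millisecond-backed value class the port targets, with illustrative Seconds/Minutes helpers; this is an assumed shape, not the actual spark.streaming.Duration source, which may differ in detail:

    // Sketch only: a millisecond-backed duration value class (assumed shape).
    case class Duration(millis: Long) {
      def +(that: Duration): Duration = Duration(millis + that.millis)
      def -(that: Duration): Duration = Duration(millis - that.millis)
      def *(times: Int): Duration = Duration(millis * times)
      // Window widths and slide intervals must be multiples of the batch interval.
      def isMultipleOf(that: Duration): Boolean = millis % that.millis == 0
    }
    object Seconds { def apply(s: Long): Duration = Duration(s * 1000) }
    object Minutes { def apply(m: Long): Duration = Duration(m * 60000) }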
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
index 1e5c279e2c..f85864df5d 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStream.scala
@@ -1,6 +1,6 @@
package spark.streaming.api.java
-import spark.streaming.{Time, DStream}
+import spark.streaming.{Duration, Time, DStream}
import spark.api.java.function.{Function => JFunction}
import spark.api.java.JavaRDD
import java.util.{List => JList}
@@ -22,7 +22,7 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
/** Persists the RDDs of this DStream with the given storage level */
def persist(storageLevel: StorageLevel): JavaDStream[T] = dstream.persist(storageLevel)
- /** Method that generates a RDD for the given time */
+ /** Method that generates an RDD for the given time */
def compute(validTime: Time): JavaRDD[T] = {
dstream.compute(validTime) match {
case Some(rdd) => new JavaRDD(rdd)
@@ -33,34 +33,34 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
/**
* Return a new DStream which is computed based on windowed batches of this DStream.
* The new DStream generates RDDs with the same interval as this DStream.
- * @param windowTime width of the window; must be a multiple of this DStream's interval.
+ * @param windowDuration width of the window; must be a multiple of this DStream's interval.
* @return
*/
- def window(windowTime: Time): JavaDStream[T] =
- dstream.window(windowTime)
+ def window(windowDuration: Duration): JavaDStream[T] =
+ dstream.window(windowDuration)
/**
* Return a new DStream which is computed based on windowed batches of this DStream.
- * @param windowTime duration (i.e., width) of the window;
+ * @param windowDuration duration (i.e., width) of the window;
* must be a multiple of this DStream's interval
- * @param slideTime sliding interval of the window (i.e., the interval after which
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's interval
*/
- def window(windowTime: Time, slideTime: Time): JavaDStream[T] =
- dstream.window(windowTime, slideTime)
+ def window(windowDuration: Duration, slideDuration: Duration): JavaDStream[T] =
+ dstream.window(windowDuration, slideDuration)
/**
* Returns a new DStream which is computed based on tumbling windows of this DStream.
- * This is equivalent to window(batchTime, batchTime).
- * @param batchTime tumbling window duration; must be a multiple of this DStream's interval
+ * This is equivalent to window(batchDuration, batchDuration).
+ * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval
*/
- def tumble(batchTime: Time): JavaDStream[T] =
- dstream.tumble(batchTime)
+ def tumble(batchDuration: Duration): JavaDStream[T] =
+ dstream.tumble(batchDuration)
/**
* Returns a new DStream by unifying data of another DStream with this DStream.
- * @param that Another DStream having the same interval (i.e., slideTime) as this DStream.
+ * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream.
*/
def union(that: JavaDStream[T]): JavaDStream[T] =
dstream.union(that.dstream)
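With the rename in place, the windowing API takes Duration arguments. A brief usage sketch, assuming a DStream[String] named `lines` and the Seconds helper sketched above:

    // Sketch: sliding and tumbling windows with Duration arguments.
    val windowed = lines.window(Seconds(30), Seconds(10)) // 30s wide, slides every 10s
    val tumbling = lines.tumble(Seconds(10))              // same as window(Seconds(10), Seconds(10))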
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index 23a0aaaefd..cb58c1351d 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -30,11 +30,11 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
/**
* Returns a new DStream in which each RDD has a single element generated by counting the number
- * of elements in a window over this DStream. windowTime and slideTime are as defined in the
- * window() operation. This is equivalent to window(windowTime, slideTime).count()
+ * of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
+ * window() operation. This is equivalent to window(windowDuration, slideDuration).count()
*/
- def countByWindow(windowTime: Time, slideTime: Time) : JavaDStream[Int] = {
- dstream.countByWindow(windowTime, slideTime)
+ def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[Int] = {
+ dstream.countByWindow(windowDuration, slideDuration)
}
/**
@@ -88,22 +88,22 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
/**
* Returns a new DStream in which each RDD has a single element generated by reducing all
- * elements in a window over this DStream. windowTime and slideTime are as defined in the
- * window() operation. This is equivalent to window(windowTime, slideTime).reduce(reduceFunc)
+ * elements in a window over this DStream. windowDuration and slideDuration are as defined in the
+ * window() operation. This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc)
*/
def reduceByWindow(
reduceFunc: JFunction2[T, T, T],
invReduceFunc: JFunction2[T, T, T],
- windowTime: Time,
- slideTime: Time): JavaDStream[T] = {
- dstream.reduceByWindow(reduceFunc, invReduceFunc, windowTime, slideTime)
+ windowDuration: Duration,
+ slideDuration: Duration): JavaDStream[T] = {
+ dstream.reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)
}
/**
- * Returns all the RDDs between 'fromTime' to 'toTime' (both included)
+ * Returns all the RDDs between 'fromDuration' and 'toDuration' (both inclusive)
*/
- def slice(fromTime: Time, toTime: Time): JList[JavaRDD[T]] = {
- new util.ArrayList(dstream.slice(fromTime, toTime).map(new JavaRDD(_)).toSeq)
+ def slice(fromDuration: Duration, toDuration: Duration): JList[JavaRDD[T]] = {
+ new util.ArrayList(dstream.slice(fromDuration, toDuration).map(new JavaRDD(_)).toSeq)
}
/**
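The aggregate operators follow the same pattern. A hedged sketch of countByWindow and the invertible reduceByWindow, assuming a DStream[Int] named `nums`; the inverse function must undo the reduce so elements sliding out of the window can be subtracted incrementally:

    // Sketch: windowed count and incrementally maintained windowed sum.
    val counts = nums.countByWindow(Seconds(60), Seconds(10))
    val totals = nums.reduceByWindow(
      _ + _,        // fold elements entering the window
      _ - _,        // remove elements that slid out of the window
      Seconds(60),  // window width
      Seconds(10))  // slide interval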
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index fa46ca9267..03336d040d 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -38,7 +38,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/** Persists the RDDs of this DStream with the given storage level */
def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel)
- /** Method that generates a RDD for the given time */
+ /** Method that generates an RDD for the given time */
def compute(validTime: Time): JavaPairRDD[K, V] = {
dstream.compute(validTime) match {
case Some(rdd) => new JavaPairRDD(rdd)
@@ -49,34 +49,34 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
/**
* Return a new DStream which is computed based on windowed batches of this DStream.
* The new DStream generates RDDs with the same interval as this DStream.
- * @param windowTime width of the window; must be a multiple of this DStream's interval.
+ * @param windowDuration width of the window; must be a multiple of this DStream's interval.
* @return
*/
- def window(windowTime: Time): JavaPairDStream[K, V] =
- dstream.window(windowTime)
+ def window(windowDuration: Duration): JavaPairDStream[K, V] =
+ dstream.window(windowDuration)
/**
* Return a new DStream which is computed based on windowed batches of this DStream.
- * @param windowTime duration (i.e., width) of the window;
+ * @param windowDuration duration (i.e., width) of the window;
* must be a multiple of this DStream's interval
- * @param slideTime sliding interval of the window (i.e., the interval after which
+ * @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's interval
*/
- def window(windowTime: Time, slideTime: Time): JavaPairDStream[K, V] =
- dstream.window(windowTime, slideTime)
+ def window(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, V] =
+ dstream.window(windowDuration, slideDuration)
/**
* Returns a new DStream which is computed based on tumbling windows of this DStream.
- * This is equivalent to window(batchTime, batchTime).
- * @param batchTime tumbling window duration; must be a multiple of this DStream's interval
+ * This is equivalent to window(batchDuration, batchDuration).
+ * @param batchDuration tumbling window duration; must be a multiple of this DStream's interval
*/
- def tumble(batchTime: Time): JavaPairDStream[K, V] =
- dstream.tumble(batchTime)
+ def tumble(batchDuration: Duration): JavaPairDStream[K, V] =
+ dstream.tumble(batchDuration)
/**
* Returns a new DStream by unifying data of another DStream with this DStream.
- * @param that Another DStream having the same interval (i.e., slideTime) as this DStream.
+ * @param that Another DStream having the same interval (i.e., slideDuration) as this DStream.
*/
def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] =
dstream.union(that.dstream)
@@ -117,66 +117,66 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
JavaPairDStream.scalaToJavaLong(dstream.countByKey());
}
- def groupByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, JList[V]] = {
- dstream.groupByKeyAndWindow(windowTime, slideTime).mapValues(seqAsJavaList _)
+ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, JList[V]] = {
+ dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(seqAsJavaList _)
}
- def groupByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int)
+ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
:JavaPairDStream[K, JList[V]] = {
- dstream.groupByKeyAndWindow(windowTime, slideTime, numPartitions).mapValues(seqAsJavaList _)
+ dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions).mapValues(seqAsJavaList _)
}
- def groupByKeyAndWindow(windowTime: Time, slideTime: Time, partitioner: Partitioner)
+ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner)
:JavaPairDStream[K, JList[V]] = {
- dstream.groupByKeyAndWindow(windowTime, slideTime, partitioner).mapValues(seqAsJavaList _)
+ dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner).mapValues(seqAsJavaList _)
}
- def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time)
+ def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration)
:JavaPairDStream[K, V] = {
- dstream.reduceByKeyAndWindow(reduceFunc, windowTime)
+ dstream.reduceByKeyAndWindow(reduceFunc, windowDuration)
}
- def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time)
+ def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration, slideDuration: Duration)
:JavaPairDStream[K, V] = {
- dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime)
+ dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)
}
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
- windowTime: Time,
- slideTime: Time,
+ windowDuration: Duration,
+ slideDuration: Duration,
numPartitions: Int): JavaPairDStream[K, V] = {
- dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, numPartitions)
+ dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, numPartitions)
}
- def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowTime: Time, slideTime: Time,
+ def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration, slideDuration: Duration,
partitioner: Partitioner): JavaPairDStream[K, V] = {
- dstream.reduceByKeyAndWindow(reduceFunc, windowTime, slideTime, partitioner)
+ dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, partitioner)
}
def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V],
- windowTime: Time, slideTime: Time): JavaPairDStream[K, V] = {
- dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime)
+ windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)
}
def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V],
- windowTime: Time, slideTime: Time, numPartitions: Int): JavaPairDStream[K, V] = {
- dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, numPartitions)
+ windowDuration: Duration, slideDuration: Duration, numPartitions: Int): JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, numPartitions)
}
def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V],
- windowTime: Time, slideTime: Time, partitioner: Partitioner)
+ windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner)
: JavaPairDStream[K, V] = {
- dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, partitioner)
+ dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, partitioner)
}
- def countByKeyAndWindow(windowTime: Time, slideTime: Time): JavaPairDStream[K, JLong] = {
- JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowTime, slideTime))
+ def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, JLong] = {
+ JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowDuration, slideDuration))
}
- def countByKeyAndWindow(windowTime: Time, slideTime: Time, numPartitions: Int)
+ def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
: JavaPairDStream[K, Long] = {
- dstream.countByKeyAndWindow(windowTime, slideTime, numPartitions)
+ dstream.countByKeyAndWindow(windowDuration, slideDuration, numPartitions)
}
def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
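The keyed variants take the same Duration pair, plus optional partitioning. A sketch of the incremental windowed word count that the invertible reduceByKeyAndWindow overloads enable, assuming a DStream[(String, Int)] named `pairs` holding (word, 1) records:

    // Sketch: windowed word counts maintained incrementally via an inverse reduce.
    val windowedCounts = pairs.reduceByKeyAndWindow(
      _ + _,        // add counts for records entering the window
      _ - _,        // subtract counts for records leaving the window
      Seconds(60),  // window width
      Seconds(10))  // slide interval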
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index e8cd03847a..5a712d18c7 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -12,7 +12,7 @@ import java.io.InputStream
import java.util.{Map => JMap}
class JavaStreamingContext(val ssc: StreamingContext) {
- def this(master: String, frameworkName: String, batchDuration: Time) =
+ def this(master: String, frameworkName: String, batchDuration: Duration) =
this(new StreamingContext(master, frameworkName, batchDuration))
// TODOs:
@@ -190,18 +190,18 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
* @param interval checkpoint interval
*/
- def checkpoint(directory: String, interval: Time = null) {
+ def checkpoint(directory: String, interval: Duration = null) {
ssc.checkpoint(directory, interval)
}
/**
* Sets each DStream in this context to remember the RDDs it generated in the last given duration.
- * DStreams remember RDDs only for a limited duration of time and releases them for garbage
+ * DStreams remember RDDs only for a limited duration and release them for garbage
* collection. This method allows the developer to specify how long to remember the RDDs (
* if the developer wishes to query old data outside the DStream computation).
* @param duration Minimum duration that each DStream should remember its RDDs
*/
- def remember(duration: Time) {
+ def remember(duration: Duration) {
ssc.remember(duration)
}
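Taken together, the context-level API now expresses every interval as a Duration. A minimal setup sketch grounded in the signatures above; the master string, application name, and checkpoint path are illustrative:

    // Sketch: Duration-typed batch interval, checkpoint interval, and
    // remember window, per the signatures in this diff.
    val ssc = new StreamingContext("local[2]", "DurationPortDemo", Seconds(1))
    ssc.checkpoint("hdfs:///tmp/checkpoints", Seconds(10))
    ssc.remember(Minutes(5))
    ssc.start()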