about summary refs log tree commit diff
path: root/streaming/src
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-01-11 12:34:36 -0800
committerPatrick Wendell <pwendell@gmail.com>2013-01-14 09:42:36 -0800
commita292ed8d8af069ee1318cdf7c00d3db8d3ba8db9 (patch)
tree158e165b3bd60dc196542f8679fa7f70e4253091 /streaming/src
parent3461cd99b7b680be9c9dc263382b42f30c9edd7d (diff)
downloadspark-a292ed8d8af069ee1318cdf7c00d3db8d3ba8db9.tar.gz
spark-a292ed8d8af069ee1318cdf7c00d3db8d3ba8db9.tar.bz2
spark-a292ed8d8af069ee1318cdf7c00d3db8d3ba8db9.zip
Some style cleanup
Diffstat (limited to 'streaming/src')
-rw-r--r-- streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala | 9
-rw-r--r-- streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala | 137
2 files changed, 93 insertions, 53 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
index 91bcca9afa..80d8865725 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala
@@ -97,10 +97,11 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
* window() operation. This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc)
*/
def reduceByWindow(
- reduceFunc: JFunction2[T, T, T],
- invReduceFunc: JFunction2[T, T, T],
- windowDuration: Duration,
- slideDuration: Duration): JavaDStream[T] = {
+ reduceFunc: JFunction2[T, T, T],
+ invReduceFunc: JFunction2[T, T, T],
+ windowDuration: Duration,
+ slideDuration: Duration
+ ): JavaDStream[T] = {
dstream.reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)
}
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index b25a3f109c..eeb1f07939 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -101,9 +101,10 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.reduceByKey(func, numPartitions)
def combineByKey[C](createCombiner: JFunction[V, C],
- mergeValue: JFunction2[C, V, C],
- mergeCombiners: JFunction2[C, C, C],
- partitioner: Partitioner): JavaPairDStream[K, C] = {
+ mergeValue: JFunction2[C, V, C],
+ mergeCombiners: JFunction2[C, C, C],
+ partitioner: Partitioner
+ ): JavaPairDStream[K, C] = {
implicit val cm: ClassManifest[C] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]]
dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
@@ -117,18 +118,24 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
JavaPairDStream.scalaToJavaLong(dstream.countByKey());
}
- def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, JList[V]] = {
+ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
+ : JavaPairDStream[K, JList[V]] = {
dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(seqAsJavaList _)
}
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
:JavaPairDStream[K, JList[V]] = {
- dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions).mapValues(seqAsJavaList _)
+ dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions)
+ .mapValues(seqAsJavaList _)
}
- def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner)
- :JavaPairDStream[K, JList[V]] = {
- dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner).mapValues(seqAsJavaList _)
+ def groupByKeyAndWindow(
+ windowDuration: Duration,
+ slideDuration: Duration,
+ partitioner: Partitioner
+ ):JavaPairDStream[K, JList[V]] = {
+ dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner)
+ .mapValues(seqAsJavaList _)
}
def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration)
@@ -136,46 +143,78 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.reduceByKeyAndWindow(reduceFunc, windowDuration)
}
- def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration, slideDuration: Duration)
- :JavaPairDStream[K, V] = {
+ def reduceByKeyAndWindow(
+ reduceFunc: Function2[V, V, V],
+ windowDuration: Duration,
+ slideDuration: Duration
+ ):JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)
}
def reduceByKeyAndWindow(
- reduceFunc: Function2[V, V, V],
- windowDuration: Duration,
- slideDuration: Duration,
- numPartitions: Int): JavaPairDStream[K, V] = {
+ reduceFunc: Function2[V, V, V],
+ windowDuration: Duration,
+ slideDuration: Duration,
+ numPartitions: Int
+ ): JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, numPartitions)
}
- def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration, slideDuration: Duration,
- partitioner: Partitioner): JavaPairDStream[K, V] = {
+ def reduceByKeyAndWindow(
+ reduceFunc: Function2[V, V, V],
+ windowDuration: Duration,
+ slideDuration: Duration,
+ partitioner: Partitioner
+ ): JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, partitioner)
}
- def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V],
- windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, V] = {
+ def reduceByKeyAndWindow(
+ reduceFunc: Function2[V, V, V],
+ invReduceFunc: Function2[V, V, V],
+ windowDuration: Duration,
+ slideDuration: Duration
+ ): JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)
}
- def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V],
- windowDuration: Duration, slideDuration: Duration, numPartitions: Int): JavaPairDStream[K, V] = {
- dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, numPartitions)
+ def reduceByKeyAndWindow(
+ reduceFunc: Function2[V, V, V],
+ invReduceFunc: Function2[V, V, V],
+ windowDuration: Duration,
+ slideDuration: Duration,
+ numPartitions: Int
+ ): JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(
+ reduceFunc,
+ invReduceFunc,
+ windowDuration,
+ slideDuration,
+ numPartitions)
}
- def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V],
- windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner)
- : JavaPairDStream[K, V] = {
- dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, partitioner)
+ def reduceByKeyAndWindow(
+ reduceFunc: Function2[V, V, V],
+ invReduceFunc: Function2[V, V, V],
+ windowDuration: Duration,
+ slideDuration: Duration,
+ partitioner: Partitioner
+ ): JavaPairDStream[K, V] = {
+ dstream.reduceByKeyAndWindow(
+ reduceFunc,
+ invReduceFunc,
+ windowDuration,
+ slideDuration,
+ partitioner)
}
- def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, JLong] = {
+ def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
+ : JavaPairDStream[K, JLong] = {
JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowDuration, slideDuration))
}
def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
- : JavaPairDStream[K, Long] = {
+ : JavaPairDStream[K, Long] = {
dstream.countByKeyAndWindow(windowDuration, slideDuration, numPartitions)
}
@@ -225,21 +264,21 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
def saveAsHadoopFiles(
- prefix: String,
- suffix: String,
- keyClass: Class[_],
- valueClass: Class[_],
- outputFormatClass: Class[_ <: OutputFormat[_, _]]) {
+ prefix: String,
+ suffix: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: OutputFormat[_, _]]) {
dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
}
def saveAsHadoopFiles(
- prefix: String,
- suffix: String,
- keyClass: Class[_],
- valueClass: Class[_],
- outputFormatClass: Class[_ <: OutputFormat[_, _]],
- conf: JobConf) {
+ prefix: String,
+ suffix: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: OutputFormat[_, _]],
+ conf: JobConf) {
dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
}
@@ -248,21 +287,21 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
def saveAsNewAPIHadoopFiles(
- prefix: String,
- suffix: String,
- keyClass: Class[_],
- valueClass: Class[_],
- outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) {
+ prefix: String,
+ suffix: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) {
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
}
def saveAsNewAPIHadoopFiles(
- prefix: String,
- suffix: String,
- keyClass: Class[_],
- valueClass: Class[_],
- outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
- conf: Configuration = new Configuration) {
+ prefix: String,
+ suffix: String,
+ keyClass: Class[_],
+ valueClass: Class[_],
+ outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
+ conf: Configuration = new Configuration) {
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
}