diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-01-11 12:34:36 -0800 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-01-14 09:42:36 -0800 |
commit | a292ed8d8af069ee1318cdf7c00d3db8d3ba8db9 (patch) | |
tree | 158e165b3bd60dc196542f8679fa7f70e4253091 | |
parent | 3461cd99b7b680be9c9dc263382b42f30c9edd7d (diff) | |
download | spark-a292ed8d8af069ee1318cdf7c00d3db8d3ba8db9.tar.gz spark-a292ed8d8af069ee1318cdf7c00d3db8d3ba8db9.tar.bz2 spark-a292ed8d8af069ee1318cdf7c00d3db8d3ba8db9.zip |
Some style cleanup
-rw-r--r-- | streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala | 9 | ||||
-rw-r--r-- | streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala | 137 |
2 files changed, 93 insertions, 53 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala index 91bcca9afa..80d8865725 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaDStreamLike.scala @@ -97,10 +97,11 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable * window() operation. This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc) */ def reduceByWindow( - reduceFunc: JFunction2[T, T, T], - invReduceFunc: JFunction2[T, T, T], - windowDuration: Duration, - slideDuration: Duration): JavaDStream[T] = { + reduceFunc: JFunction2[T, T, T], + invReduceFunc: JFunction2[T, T, T], + windowDuration: Duration, + slideDuration: Duration + ): JavaDStream[T] = { dstream.reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration) } diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala index b25a3f109c..eeb1f07939 100644 --- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala @@ -101,9 +101,10 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( dstream.reduceByKey(func, numPartitions) def combineByKey[C](createCombiner: JFunction[V, C], - mergeValue: JFunction2[C, V, C], - mergeCombiners: JFunction2[C, C, C], - partitioner: Partitioner): JavaPairDStream[K, C] = { + mergeValue: JFunction2[C, V, C], + mergeCombiners: JFunction2[C, C, C], + partitioner: Partitioner + ): JavaPairDStream[K, C] = { implicit val cm: ClassManifest[C] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]] dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner) @@ -117,18 +118,24 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( JavaPairDStream.scalaToJavaLong(dstream.countByKey()); } - def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, JList[V]] = { + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration) + : JavaPairDStream[K, JList[V]] = { dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(seqAsJavaList _) } def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int) :JavaPairDStream[K, JList[V]] = { - dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions).mapValues(seqAsJavaList _) + dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions) + .mapValues(seqAsJavaList _) } - def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner) - :JavaPairDStream[K, JList[V]] = { - dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner).mapValues(seqAsJavaList _) + def groupByKeyAndWindow( + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner + ):JavaPairDStream[K, JList[V]] = { + dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner) + .mapValues(seqAsJavaList _) } def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration) @@ -136,46 +143,78 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( dstream.reduceByKeyAndWindow(reduceFunc, windowDuration) } - def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration, slideDuration: Duration) - :JavaPairDStream[K, V] = { + def reduceByKeyAndWindow( + reduceFunc: Function2[V, V, V], + windowDuration: Duration, + slideDuration: Duration + ):JavaPairDStream[K, V] = { dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration) } def reduceByKeyAndWindow( - reduceFunc: Function2[V, V, V], - windowDuration: Duration, - slideDuration: Duration, - numPartitions: Int): JavaPairDStream[K, V] = { + reduceFunc: Function2[V, V, V], + windowDuration: Duration, + slideDuration: Duration, + numPartitions: Int + ): JavaPairDStream[K, V] = { dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, numPartitions) } - def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration, slideDuration: Duration, - partitioner: Partitioner): JavaPairDStream[K, V] = { + def reduceByKeyAndWindow( + reduceFunc: Function2[V, V, V], + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner + ): JavaPairDStream[K, V] = { dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, partitioner) } - def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V], - windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, V] = { + def reduceByKeyAndWindow( + reduceFunc: Function2[V, V, V], + invReduceFunc: Function2[V, V, V], + windowDuration: Duration, + slideDuration: Duration + ): JavaPairDStream[K, V] = { dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration) } - def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V], - windowDuration: Duration, slideDuration: Duration, numPartitions: Int): JavaPairDStream[K, V] = { - dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, numPartitions) + def reduceByKeyAndWindow( + reduceFunc: Function2[V, V, V], + invReduceFunc: Function2[V, V, V], + windowDuration: Duration, + slideDuration: Duration, + numPartitions: Int + ): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow( + reduceFunc, + invReduceFunc, + windowDuration, + slideDuration, + numPartitions) } - def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], invReduceFunc: Function2[V, V, V], - windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner) - : JavaPairDStream[K, V] = { - dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, partitioner) + def reduceByKeyAndWindow( + reduceFunc: Function2[V, V, V], + invReduceFunc: Function2[V, V, V], + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner + ): JavaPairDStream[K, V] = { + dstream.reduceByKeyAndWindow( + reduceFunc, + invReduceFunc, + windowDuration, + slideDuration, + partitioner) } - def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, JLong] = { + def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration) + : JavaPairDStream[K, JLong] = { JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowDuration, slideDuration)) } def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int) - : JavaPairDStream[K, Long] = { + : JavaPairDStream[K, Long] = { dstream.countByKeyAndWindow(windowDuration, slideDuration, numPartitions) } @@ -225,21 +264,21 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } def saveAsHadoopFiles( - prefix: String, - suffix: String, - keyClass: Class[_], - valueClass: Class[_], - outputFormatClass: Class[_ <: OutputFormat[_, _]]) { + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: OutputFormat[_, _]]) { dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass) } def saveAsHadoopFiles( - prefix: String, - suffix: String, - keyClass: Class[_], - valueClass: Class[_], - outputFormatClass: Class[_ <: OutputFormat[_, _]], - conf: JobConf) { + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: OutputFormat[_, _]], + conf: JobConf) { dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf) } @@ -248,21 +287,21 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( } def saveAsNewAPIHadoopFiles( - prefix: String, - suffix: String, - keyClass: Class[_], - valueClass: Class[_], - outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) { + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) { dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass) } def saveAsNewAPIHadoopFiles( - prefix: String, - suffix: String, - keyClass: Class[_], - valueClass: Class[_], - outputFormatClass: Class[_ <: NewOutputFormat[_, _]], - conf: Configuration = new Configuration) { + prefix: String, + suffix: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: NewOutputFormat[_, _]], + conf: Configuration = new Configuration) { dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf) } |