author    Sean Owen <sowen@cloudera.com>       2016-01-06 17:17:32 -0800
committer Reynold Xin <rxin@databricks.com>    2016-01-06 17:17:32 -0800
commit    ac56cf605b61803c26e0004b43c703cca7e02d61 (patch)
tree      948bf3a6d0a70b44cb9bce47d0e70f46258ad963 /streaming/src/main
parent    917d3fc069fb9ea1c1487119c9c12b373f4f9b77 (diff)
[SPARK-12604][CORE] Java count(ApproxDistinct)ByKey methods return Scala Long not Java
Change the Java countByKey and countApproxDistinctByKey return types to use Java Long rather than Scala Long, and update similar methods to box consistently via java.lang.Long.valueOf, with no API change. Author: Sean Owen <sowen@cloudera.com> Closes #10554 from srowen/SPARK-12604.
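The streaming-side changes below are a mechanical refactor: alias the whole java.lang package instead of the single Long class, and box with the java.lang.Long.valueOf factory instead of the Long constructor. A minimal standalone sketch of that pattern (the box helper and sample values are illustrative, not part of the patch):

    // Alias the whole java.lang package once; boxed types then read as
    // jl.Long, jl.Integer, etc., without a separate alias per class.
    import java.{lang => jl}

    object BoxingPattern {
      // Box Scala Longs the way the patch does: via the valueOf factory,
      // which Scala eta-expands to a Long => jl.Long function.
      def box(xs: Seq[Long]): Seq[jl.Long] = xs.map(jl.Long.valueOf)

      def main(args: Array[String]): Unit =
        println(box(Seq(1L, 2L, 3L)))  // List(1, 2, 3), elements boxed as jl.Long
    }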
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala | 18
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala  |  7
2 files changed, 13 insertions(+), 12 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
index 84acec7d8e..733147f63e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaDStreamLike.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.api.java
-import java.lang.{Long => JLong}
+import java.{lang => jl}
import java.util.{List => JList}
import scala.collection.JavaConverters._
@@ -50,8 +50,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
def wrapRDD(in: RDD[T]): R
- implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[JLong] = {
- in.map(new JLong(_))
+ implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[jl.Long] = {
+ in.map(jl.Long.valueOf)
}
/**
@@ -74,14 +74,14 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* Return a new DStream in which each RDD has a single element generated by counting each RDD
* of this DStream.
*/
- def count(): JavaDStream[JLong] = dstream.count()
+ def count(): JavaDStream[jl.Long] = dstream.count()
/**
* Return a new DStream in which each RDD contains the counts of each distinct value in
* each RDD of this DStream. Hash partitioning is used to generate the RDDs with
* Spark's default number of partitions.
*/
- def countByValue(): JavaPairDStream[T, JLong] = {
+ def countByValue(): JavaPairDStream[T, jl.Long] = {
JavaPairDStream.scalaToJavaLong(dstream.countByValue())
}
@@ -91,7 +91,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* partitions.
* @param numPartitions number of partitions of each RDD in the new DStream.
*/
- def countByValue(numPartitions: Int): JavaPairDStream[T, JLong] = {
+ def countByValue(numPartitions: Int): JavaPairDStream[T, jl.Long] = {
JavaPairDStream.scalaToJavaLong(dstream.countByValue(numPartitions))
}
@@ -101,7 +101,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* of elements in a window over this DStream. windowDuration and slideDuration are as defined in
* the window() operation. This is equivalent to window(windowDuration, slideDuration).count()
*/
- def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JLong] = {
+ def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[jl.Long] = {
dstream.countByWindow(windowDuration, slideDuration)
}
@@ -116,7 +116,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* DStream's batching interval
*/
def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration)
- : JavaPairDStream[T, JLong] = {
+ : JavaPairDStream[T, jl.Long] = {
JavaPairDStream.scalaToJavaLong(
dstream.countByValueAndWindow(windowDuration, slideDuration))
}
@@ -133,7 +133,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* @param numPartitions number of partitions of each RDD in the new DStream.
*/
def countByValueAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
- : JavaPairDStream[T, JLong] = {
+ : JavaPairDStream[T, jl.Long] = {
JavaPairDStream.scalaToJavaLong(
dstream.countByValueAndWindow(windowDuration, slideDuration, numPartitions))
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index 2bf3ccec6b..af0d84b332 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -17,7 +17,8 @@
package org.apache.spark.streaming.api.java
-import java.lang.{Iterable => JIterable, Long => JLong}
+import java.{lang => jl}
+import java.lang.{Iterable => JIterable}
import java.util.{List => JList}
import scala.collection.JavaConverters._
@@ -847,7 +848,7 @@ object JavaPairDStream {
}
def scalaToJavaLong[K: ClassTag](dstream: JavaPairDStream[K, Long])
- : JavaPairDStream[K, JLong] = {
- DStream.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_))
+ : JavaPairDStream[K, jl.Long] = {
+ DStream.toPairDStreamFunctions(dstream.dstream).mapValues(jl.Long.valueOf)
}
}
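Why valueOf rather than new Long: the factory reuses cached boxes for small values and avoids the boxed-primitive constructors, which were later deprecated in Java 9. A standalone sketch of the difference (not from the patch; relies on the JDK's documented Long cache for values in [-128, 127]):

    import java.{lang => jl}

    object ValueOfVsNew {
      def main(args: Array[String]): Unit = {
        // valueOf returns the same cached object for small values...
        val a = jl.Long.valueOf(42L)
        val b = jl.Long.valueOf(42L)
        println(a eq b)  // true: both references point at the cached box

        // ...while the constructor always allocates a fresh box.
        val c = new jl.Long(42L)
        val d = new jl.Long(42L)
        println(c eq d)  // false: two distinct allocations
      }
    }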