author     Sean Owen <sowen@cloudera.com>    2016-01-06 17:17:32 -0800
committer  Reynold Xin <rxin@databricks.com>    2016-01-06 17:17:32 -0800
commit     ac56cf605b61803c26e0004b43c703cca7e02d61 (patch)
tree       948bf3a6d0a70b44cb9bce47d0e70f46258ad963 /core
parent     917d3fc069fb9ea1c1487119c9c12b373f4f9b77 (diff)
download   spark-ac56cf605b61803c26e0004b43c703cca7e02d61.tar.gz
           spark-ac56cf605b61803c26e0004b43c703cca7e02d61.tar.bz2
           spark-ac56cf605b61803c26e0004b43c703cca7e02d61.zip
[SPARK-12604][CORE] Java count(ApproxDistinct)ByKey methods return Scala Long not Java

Change the Java countByKey and countApproxDistinctByKey return types to use Java Long, not Scala Long; update similar methods for consistency on java.lang.Long.valueOf, with no API change.

Author: Sean Owen <sowen@cloudera.com>

Closes #10554 from srowen/SPARK-12604.
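For Java callers, the practical effect is that countByKey now yields java.lang.Long values directly. A minimal illustrative sketch (not part of this patch; assumes an existing JavaSparkContext sc):

    // Hypothetical usage; variable names are illustrative only.
    JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(Arrays.asList(
        new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3)));
    // Previously the unchecked double cast (Map<String, Long>) (Object) pairs.countByKey()
    // was needed because the map values were scala.Long; now the assignment is direct:
    Map<String, Long> counts = pairs.countByKey();
    long countA = counts.get("a");  // java.lang.Long auto-unboxes as expected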
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala  | 32
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala  | 16
-rw-r--r--  core/src/test/java/org/apache/spark/JavaAPISuite.java            | 18
3 files changed, 36 insertions(+), 30 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 91dc18697c..76752e1fde 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -17,8 +17,9 @@
package org.apache.spark.api.java
+import java.{lang => jl}
import java.lang.{Iterable => JIterable}
-import java.util.{Comparator, List => JList, Map => JMap}
+import java.util.{Comparator, List => JList}
import scala.collection.JavaConverters._
import scala.language.implicitConversions
@@ -139,7 +140,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* math.ceil(numItems * samplingRate) over all key values.
*/
def sampleByKey(withReplacement: Boolean,
- fractions: JMap[K, Double],
+ fractions: java.util.Map[K, Double],
seed: Long): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions.asScala, seed))
@@ -154,7 +155,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* Use Utils.random.nextLong as the default seed for the random number generator.
*/
def sampleByKey(withReplacement: Boolean,
- fractions: JMap[K, Double]): JavaPairRDD[K, V] =
+ fractions: java.util.Map[K, Double]): JavaPairRDD[K, V] =
sampleByKey(withReplacement, fractions, Utils.random.nextLong)
/**
@@ -168,7 +169,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* two additional passes.
*/
def sampleByKeyExact(withReplacement: Boolean,
- fractions: JMap[K, Double],
+ fractions: java.util.Map[K, Double],
seed: Long): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd.sampleByKeyExact(withReplacement, fractions.asScala, seed))
@@ -184,7 +185,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
*
* Use Utils.random.nextLong as the default seed for the random number generator.
*/
- def sampleByKeyExact(withReplacement: Boolean, fractions: JMap[K, Double]): JavaPairRDD[K, V] =
+ def sampleByKeyExact(
+ withReplacement: Boolean,
+ fractions: java.util.Map[K, Double]): JavaPairRDD[K, V] =
sampleByKeyExact(withReplacement, fractions, Utils.random.nextLong)
/**
@@ -292,7 +295,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
mapAsSerializableJavaMap(rdd.reduceByKeyLocally(func))
/** Count the number of elements for each key, and return the result to the master as a Map. */
- def countByKey(): java.util.Map[K, Long] = mapAsSerializableJavaMap(rdd.countByKey())
+ def countByKey(): java.util.Map[K, jl.Long] =
+ mapAsSerializableJavaMap(rdd.countByKey().mapValues(jl.Long.valueOf))
/**
* Approximate version of countByKey that can return a partial result if it does
@@ -934,9 +938,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* It must be greater than 0.000017.
* @param partitioner partitioner of the resulting RDD.
*/
- def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] =
- {
- fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner))
+ def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner)
+ : JavaPairRDD[K, jl.Long] = {
+ fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner)).
+ asInstanceOf[JavaPairRDD[K, jl.Long]]
}
/**
@@ -950,8 +955,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* It must be greater than 0.000017.
* @param numPartitions number of partitions of the resulting RDD.
*/
- def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = {
- fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions))
+ def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, jl.Long] = {
+ fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions)).
+ asInstanceOf[JavaPairRDD[K, jl.Long]]
}
/**
@@ -964,8 +970,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
* It must be greater than 0.000017.
*/
- def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = {
- fromRDD(rdd.countApproxDistinctByKey(relativeSD))
+ def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, jl.Long] = {
+ fromRDD(rdd.countApproxDistinctByKey(relativeSD)).asInstanceOf[JavaPairRDD[K, jl.Long]]
}
/** Assign a name to this RDD */
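The three countApproxDistinctByKey overloads above now type their values as java.lang.Long. An illustrative Java sketch of the numPartitions overload (hypothetical; assumes a JavaPairRDD<Integer, Integer> pairRdd is in scope):

    double relativeSD = 0.001;  // relative accuracy; must be greater than 0.000017
    JavaPairRDD<Integer, Long> approx = pairRdd.countApproxDistinctByKey(relativeSD, 8);
    for (Tuple2<Integer, Long> entry : approx.collect()) {
        long distinct = entry._2();  // unboxes cleanly; no cast through Object
    }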
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 6d3485d88a..1b1a9dce39 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -18,7 +18,7 @@
package org.apache.spark.api.java
import java.{lang => jl}
-import java.lang.{Iterable => JIterable, Long => JLong}
+import java.lang.{Iterable => JIterable}
import java.util.{Comparator, Iterator => JIterator, List => JList}
import scala.collection.JavaConverters._
@@ -305,8 +305,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
* won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
*/
- def zipWithUniqueId(): JavaPairRDD[T, JLong] = {
- JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, JLong]]
+ def zipWithUniqueId(): JavaPairRDD[T, jl.Long] = {
+ JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, jl.Long]]
}
/**
@@ -316,8 +316,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
* This method needs to trigger a spark job when this RDD contains more than one partition.
*/
- def zipWithIndex(): JavaPairRDD[T, JLong] = {
- JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, JLong]]
+ def zipWithIndex(): JavaPairRDD[T, jl.Long] = {
+ JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, jl.Long]]
}
// Actions (launch a job to return a value to the user program)
@@ -448,7 +448,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* combine step happens locally on the master, equivalent to running a single reduce task.
*/
def countByValue(): java.util.Map[T, jl.Long] =
- mapAsSerializableJavaMap(rdd.countByValue().map((x => (x._1, new jl.Long(x._2)))))
+ mapAsSerializableJavaMap(rdd.countByValue().mapValues(jl.Long.valueOf))
/**
* (Experimental) Approximate version of countByValue().
@@ -631,8 +631,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* The asynchronous version of `count`, which returns a
* future for counting the number of elements in this RDD.
*/
- def countAsync(): JavaFutureAction[JLong] = {
- new JavaFutureActionWrapper[Long, JLong](rdd.countAsync(), JLong.valueOf)
+ def countAsync(): JavaFutureAction[jl.Long] = {
+ new JavaFutureActionWrapper[Long, jl.Long](rdd.countAsync(), jl.Long.valueOf)
}
/**
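From Java, the JavaRDDLike changes above surface the same way: zipWithIndex, zipWithUniqueId, and countAsync all expose java.lang.Long. A hedged sketch (assumes a JavaRDD<String> rdd; get() throws the usual checked Future exceptions):

    JavaPairRDD<String, Long> indexed = rdd.zipWithIndex();  // indices are java.lang.Long
    JavaFutureAction<Long> pending = rdd.countAsync();
    Long total = pending.get();  // blocks until the count job completes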
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 502f86f178..47382e4231 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -1580,11 +1580,11 @@ public class JavaAPISuite implements Serializable {
}
double relativeSD = 0.001;
JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData);
- List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(relativeSD, 8).collect();
- for (Tuple2<Integer, Object> resItem : res) {
- double count = (double)resItem._1();
- Long resCount = (Long)resItem._2();
- Double error = Math.abs((resCount - count) / count);
+ List<Tuple2<Integer, Long>> res = pairRdd.countApproxDistinctByKey(relativeSD, 8).collect();
+ for (Tuple2<Integer, Long> resItem : res) {
+ double count = resItem._1();
+ long resCount = resItem._2();
+ double error = Math.abs((resCount - count) / count);
Assert.assertTrue(error < 0.1);
}
@@ -1633,12 +1633,12 @@ public class JavaAPISuite implements Serializable {
fractions.put(0, 0.5);
fractions.put(1, 1.0);
JavaPairRDD<Integer, Integer> wr = rdd2.sampleByKey(true, fractions, 1L);
- Map<Integer, Long> wrCounts = (Map<Integer, Long>) (Object) wr.countByKey();
+ Map<Integer, Long> wrCounts = wr.countByKey();
Assert.assertEquals(2, wrCounts.size());
Assert.assertTrue(wrCounts.get(0) > 0);
Assert.assertTrue(wrCounts.get(1) > 0);
JavaPairRDD<Integer, Integer> wor = rdd2.sampleByKey(false, fractions, 1L);
- Map<Integer, Long> worCounts = (Map<Integer, Long>) (Object) wor.countByKey();
+ Map<Integer, Long> worCounts = wor.countByKey();
Assert.assertEquals(2, worCounts.size());
Assert.assertTrue(worCounts.get(0) > 0);
Assert.assertTrue(worCounts.get(1) > 0);
@@ -1659,12 +1659,12 @@ public class JavaAPISuite implements Serializable {
fractions.put(0, 0.5);
fractions.put(1, 1.0);
JavaPairRDD<Integer, Integer> wrExact = rdd2.sampleByKeyExact(true, fractions, 1L);
- Map<Integer, Long> wrExactCounts = (Map<Integer, Long>) (Object) wrExact.countByKey();
+ Map<Integer, Long> wrExactCounts = wrExact.countByKey();
Assert.assertEquals(2, wrExactCounts.size());
Assert.assertTrue(wrExactCounts.get(0) == 2);
Assert.assertTrue(wrExactCounts.get(1) == 4);
JavaPairRDD<Integer, Integer> worExact = rdd2.sampleByKeyExact(false, fractions, 1L);
- Map<Integer, Long> worExactCounts = (Map<Integer, Long>) (Object) worExact.countByKey();
+ Map<Integer, Long> worExactCounts = worExact.countByKey();
Assert.assertEquals(2, worExactCounts.size());
Assert.assertTrue(worExactCounts.get(0) == 2);
Assert.assertTrue(worExactCounts.get(1) == 4);
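For completeness, a sketch of the sampleByKey-plus-countByKey pattern that the tests above now express without casts (illustrative; rdd2 is assumed to be a JavaPairRDD<Integer, Integer>):

    Map<Integer, Double> fractions = new HashMap<>();
    fractions.put(0, 0.5);  // sample about half the records with key 0
    fractions.put(1, 1.0);  // keep every record with key 1
    JavaPairRDD<Integer, Integer> sampled = rdd2.sampleByKey(true, fractions, 1L);
    Map<Integer, Long> counts = sampled.countByKey();  // direct assignment after this patch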