diff options
author | Sean Owen <sowen@cloudera.com> | 2016-01-06 17:17:32 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-01-06 17:17:32 -0800 |
commit | ac56cf605b61803c26e0004b43c703cca7e02d61 (patch) | |
tree | 948bf3a6d0a70b44cb9bce47d0e70f46258ad963 /core | |
parent | 917d3fc069fb9ea1c1487119c9c12b373f4f9b77 (diff) | |
download | spark-ac56cf605b61803c26e0004b43c703cca7e02d61.tar.gz spark-ac56cf605b61803c26e0004b43c703cca7e02d61.tar.bz2 spark-ac56cf605b61803c26e0004b43c703cca7e02d61.zip |
[SPARK-12604][CORE] Java count(AprroxDistinct)ByKey methods return Scala Long not Java
Change Java countByKey, countApproxDistinctByKey return types to use Java Long, not Scala; update similar methods for consistency on java.long.Long.valueOf with no API change
Author: Sean Owen <sowen@cloudera.com>
Closes #10554 from srowen/SPARK-12604.
Diffstat (limited to 'core')
3 files changed, 36 insertions, 30 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 91dc18697c..76752e1fde 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -17,8 +17,9 @@ package org.apache.spark.api.java +import java.{lang => jl} import java.lang.{Iterable => JIterable} -import java.util.{Comparator, List => JList, Map => JMap} +import java.util.{Comparator, List => JList} import scala.collection.JavaConverters._ import scala.language.implicitConversions @@ -139,7 +140,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * math.ceil(numItems * samplingRate) over all key values. */ def sampleByKey(withReplacement: Boolean, - fractions: JMap[K, Double], + fractions: java.util.Map[K, Double], seed: Long): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions.asScala, seed)) @@ -154,7 +155,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * Use Utils.random.nextLong as the default seed for the random number generator. */ def sampleByKey(withReplacement: Boolean, - fractions: JMap[K, Double]): JavaPairRDD[K, V] = + fractions: java.util.Map[K, Double]): JavaPairRDD[K, V] = sampleByKey(withReplacement, fractions, Utils.random.nextLong) /** @@ -168,7 +169,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * two additional passes. */ def sampleByKeyExact(withReplacement: Boolean, - fractions: JMap[K, Double], + fractions: java.util.Map[K, Double], seed: Long): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.sampleByKeyExact(withReplacement, fractions.asScala, seed)) @@ -184,7 +185,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * * Use Utils.random.nextLong as the default seed for the random number generator. */ - def sampleByKeyExact(withReplacement: Boolean, fractions: JMap[K, Double]): JavaPairRDD[K, V] = + def sampleByKeyExact( + withReplacement: Boolean, + fractions: java.util.Map[K, Double]): JavaPairRDD[K, V] = sampleByKeyExact(withReplacement, fractions, Utils.random.nextLong) /** @@ -292,7 +295,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) mapAsSerializableJavaMap(rdd.reduceByKeyLocally(func)) /** Count the number of elements for each key, and return the result to the master as a Map. */ - def countByKey(): java.util.Map[K, Long] = mapAsSerializableJavaMap(rdd.countByKey()) + def countByKey(): java.util.Map[K, jl.Long] = + mapAsSerializableJavaMap(rdd.countByKey().mapValues(jl.Long.valueOf)) /** * Approximate version of countByKey that can return a partial result if it does @@ -934,9 +938,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * It must be greater than 0.000017. * @param partitioner partitioner of the resulting RDD. */ - def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] = - { - fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner)) + def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner) + : JavaPairRDD[K, jl.Long] = { + fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner)). + asInstanceOf[JavaPairRDD[K, jl.Long]] } /** @@ -950,8 +955,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * It must be greater than 0.000017. * @param numPartitions number of partitions of the resulting RDD. */ - def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = { - fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions)) + def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, jl.Long] = { + fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions)). + asInstanceOf[JavaPairRDD[K, jl.Long]] } /** @@ -964,8 +970,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * @param relativeSD Relative accuracy. Smaller values create counters that require more space. * It must be greater than 0.000017. */ - def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = { - fromRDD(rdd.countApproxDistinctByKey(relativeSD)) + def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, jl.Long] = { + fromRDD(rdd.countApproxDistinctByKey(relativeSD)).asInstanceOf[JavaPairRDD[K, jl.Long]] } /** Assign a name to this RDD */ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 6d3485d88a..1b1a9dce39 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -18,7 +18,7 @@ package org.apache.spark.api.java import java.{lang => jl} -import java.lang.{Iterable => JIterable, Long => JLong} +import java.lang.{Iterable => JIterable} import java.util.{Comparator, Iterator => JIterator, List => JList} import scala.collection.JavaConverters._ @@ -305,8 +305,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method * won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]]. */ - def zipWithUniqueId(): JavaPairRDD[T, JLong] = { - JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, JLong]] + def zipWithUniqueId(): JavaPairRDD[T, jl.Long] = { + JavaPairRDD.fromRDD(rdd.zipWithUniqueId()).asInstanceOf[JavaPairRDD[T, jl.Long]] } /** @@ -316,8 +316,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type. * This method needs to trigger a spark job when this RDD contains more than one partitions. */ - def zipWithIndex(): JavaPairRDD[T, JLong] = { - JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, JLong]] + def zipWithIndex(): JavaPairRDD[T, jl.Long] = { + JavaPairRDD.fromRDD(rdd.zipWithIndex()).asInstanceOf[JavaPairRDD[T, jl.Long]] } // Actions (launch a job to return a value to the user program) @@ -448,7 +448,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * combine step happens locally on the master, equivalent to running a single reduce task. */ def countByValue(): java.util.Map[T, jl.Long] = - mapAsSerializableJavaMap(rdd.countByValue().map((x => (x._1, new jl.Long(x._2))))) + mapAsSerializableJavaMap(rdd.countByValue().mapValues(jl.Long.valueOf)) /** * (Experimental) Approximate version of countByValue(). @@ -631,8 +631,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * The asynchronous version of `count`, which returns a * future for counting the number of elements in this RDD. */ - def countAsync(): JavaFutureAction[JLong] = { - new JavaFutureActionWrapper[Long, JLong](rdd.countAsync(), JLong.valueOf) + def countAsync(): JavaFutureAction[jl.Long] = { + new JavaFutureActionWrapper[Long, jl.Long](rdd.countAsync(), jl.Long.valueOf) } /** diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 502f86f178..47382e4231 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -1580,11 +1580,11 @@ public class JavaAPISuite implements Serializable { } double relativeSD = 0.001; JavaPairRDD<Integer, Integer> pairRdd = sc.parallelizePairs(arrayData); - List<Tuple2<Integer, Object>> res = pairRdd.countApproxDistinctByKey(relativeSD, 8).collect(); - for (Tuple2<Integer, Object> resItem : res) { - double count = (double)resItem._1(); - Long resCount = (Long)resItem._2(); - Double error = Math.abs((resCount - count) / count); + List<Tuple2<Integer, Long>> res = pairRdd.countApproxDistinctByKey(relativeSD, 8).collect(); + for (Tuple2<Integer, Long> resItem : res) { + double count = resItem._1(); + long resCount = resItem._2(); + double error = Math.abs((resCount - count) / count); Assert.assertTrue(error < 0.1); } @@ -1633,12 +1633,12 @@ public class JavaAPISuite implements Serializable { fractions.put(0, 0.5); fractions.put(1, 1.0); JavaPairRDD<Integer, Integer> wr = rdd2.sampleByKey(true, fractions, 1L); - Map<Integer, Long> wrCounts = (Map<Integer, Long>) (Object) wr.countByKey(); + Map<Integer, Long> wrCounts = wr.countByKey(); Assert.assertEquals(2, wrCounts.size()); Assert.assertTrue(wrCounts.get(0) > 0); Assert.assertTrue(wrCounts.get(1) > 0); JavaPairRDD<Integer, Integer> wor = rdd2.sampleByKey(false, fractions, 1L); - Map<Integer, Long> worCounts = (Map<Integer, Long>) (Object) wor.countByKey(); + Map<Integer, Long> worCounts = wor.countByKey(); Assert.assertEquals(2, worCounts.size()); Assert.assertTrue(worCounts.get(0) > 0); Assert.assertTrue(worCounts.get(1) > 0); @@ -1659,12 +1659,12 @@ public class JavaAPISuite implements Serializable { fractions.put(0, 0.5); fractions.put(1, 1.0); JavaPairRDD<Integer, Integer> wrExact = rdd2.sampleByKeyExact(true, fractions, 1L); - Map<Integer, Long> wrExactCounts = (Map<Integer, Long>) (Object) wrExact.countByKey(); + Map<Integer, Long> wrExactCounts = wrExact.countByKey(); Assert.assertEquals(2, wrExactCounts.size()); Assert.assertTrue(wrExactCounts.get(0) == 2); Assert.assertTrue(wrExactCounts.get(1) == 4); JavaPairRDD<Integer, Integer> worExact = rdd2.sampleByKeyExact(false, fractions, 1L); - Map<Integer, Long> worExactCounts = (Map<Integer, Long>) (Object) worExact.countByKey(); + Map<Integer, Long> worExactCounts = worExact.countByKey(); Assert.assertEquals(2, worExactCounts.size()); Assert.assertTrue(worExactCounts.get(0) == 2); Assert.assertTrue(worExactCounts.get(1) == 4); |