diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala | 73 |
1 files changed, 70 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala index 5fd1fab580..da30cf619a 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala @@ -17,18 +17,23 @@ package org.apache.spark.api.java +import scala.reflect.ClassTag + import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext.doubleRDDToDoubleRDDFunctions import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.util.StatCounter import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.storage.StorageLevel + import java.lang.Double import org.apache.spark.Partitioner +import scala.collection.JavaConverters._ + class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, JavaDoubleRDD] { - override val classManifest: ClassManifest[Double] = implicitly[ClassManifest[Double]] + override val classTag: ClassTag[Double] = implicitly[ClassTag[Double]] override val rdd: RDD[Double] = srdd.map(x => Double.valueOf(x)) @@ -42,12 +47,25 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ def cache(): JavaDoubleRDD = fromRDD(srdd.cache()) - /** + /** * Set this RDD's storage level to persist its values across operations after the first time * it is computed. Can only be called once on each RDD. */ def persist(newLevel: StorageLevel): JavaDoubleRDD = fromRDD(srdd.persist(newLevel)) + /** + * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. + * This method blocks until all blocks are deleted. + */ + def unpersist(): JavaDoubleRDD = fromRDD(srdd.unpersist()) + + /** + * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk. 
+ * + * @param blocking Whether to block until all blocks are deleted. + */ + def unpersist(blocking: Boolean): JavaDoubleRDD = fromRDD(srdd.unpersist(blocking)) + // first() has to be overridden here in order for its return type to be Double instead of Object. override def first(): Double = srdd.first() @@ -81,8 +99,19 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav fromRDD(srdd.coalesce(numPartitions, shuffle)) /** + * Return a new RDD that has exactly numPartitions partitions. + * + * Can increase or decrease the level of parallelism in this RDD. Internally, this uses + * a shuffle to redistribute data. + * + * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, + * which can avoid performing a shuffle. + */ + def repartition(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.repartition(numPartitions)) + + /** * Return an RDD with the elements from `this` that are not in `other`. - + * * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting * RDD will be <= us. */ @@ -158,6 +187,44 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav /** (Experimental) Approximate operation to return the sum within a timeout. */ def sumApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.sumApprox(timeout) + + /** + * Compute a histogram of the data using bucketCount number of buckets evenly + * spaced between the minimum and maximum of the RDD. For example if the min + * value is 0 and the max is 100 and there are two buckets the resulting + * buckets will be [0,50) [50,100]. bucketCount must be at least 1 + * If the RDD contains infinity, NaN throws an exception + * If the elements in RDD do not vary (max == min) always returns a single bucket. 
+ */ + def histogram(bucketCount: Int): Pair[Array[scala.Double], Array[Long]] = { + val result = srdd.histogram(bucketCount) + (result._1, result._2) + } + + /** + * Compute a histogram using the provided buckets. The buckets are all open + * to the left except for the last which is closed + * e.g. for the array + * [1,10,20,50] the buckets are [1,10) [10,20) [20,50] + * e.g. 1<=x<10 , 10<=x<20, 20<=x<50 + * And on the input of 1 and 50 we would have a histogram of 1,0,0 + * + * Note: if your histogram is evenly spaced (e.g. [0, 10, 20, 30]) this can be switched + * from an O(log n) insertion to O(1) per element. (where n = # buckets) if you set evenBuckets + * to true. + * buckets must be sorted and not contain any duplicates. + * buckets array must have at least two elements + * All NaN entries are treated the same. If you have a NaN bucket it must be + * the maximum value of the last position and all NaN entries will be counted + * in that bucket. + */ + def histogram(buckets: Array[scala.Double]): Array[Long] = { + srdd.histogram(buckets, false) + } + + def histogram(buckets: Array[Double], evenBuckets: Boolean): Array[Long] = { + srdd.histogram(buckets.map(_.toDouble), evenBuckets) + } } object JavaDoubleRDD { |