aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala167
1 files changed, 167 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
new file mode 100644
index 0000000000..5fd1fab580
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext.doubleRDDToDoubleRDDFunctions
+import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.util.StatCounter
+import org.apache.spark.partial.{BoundedDouble, PartialResult}
+import org.apache.spark.storage.StorageLevel
+import java.lang.Double
+import org.apache.spark.Partitioner
+
+class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, JavaDoubleRDD] {
+
+ override val classManifest: ClassManifest[Double] = implicitly[ClassManifest[Double]]
+
+ override val rdd: RDD[Double] = srdd.map(x => Double.valueOf(x))
+
+ override def wrapRDD(rdd: RDD[Double]): JavaDoubleRDD =
+ new JavaDoubleRDD(rdd.map(_.doubleValue))
+
+ // Common RDD functions
+
+ import JavaDoubleRDD.fromRDD
+
+ /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+ def cache(): JavaDoubleRDD = fromRDD(srdd.cache())
+
+ /**
+ * Set this RDD's storage level to persist its values across operations after the first time
+ * it is computed. Can only be called once on each RDD.
+ */
+ def persist(newLevel: StorageLevel): JavaDoubleRDD = fromRDD(srdd.persist(newLevel))
+
+ // first() has to be overriden here in order for its return type to be Double instead of Object.
+ override def first(): Double = srdd.first()
+
+ // Transformations (return a new RDD)
+
+ /**
+ * Return a new RDD containing the distinct elements in this RDD.
+ */
+ def distinct(): JavaDoubleRDD = fromRDD(srdd.distinct())
+
+ /**
+ * Return a new RDD containing the distinct elements in this RDD.
+ */
+ def distinct(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.distinct(numPartitions))
+
+ /**
+ * Return a new RDD containing only the elements that satisfy a predicate.
+ */
+ def filter(f: JFunction[Double, java.lang.Boolean]): JavaDoubleRDD =
+ fromRDD(srdd.filter(x => f(x).booleanValue()))
+
+ /**
+ * Return a new RDD that is reduced into `numPartitions` partitions.
+ */
+ def coalesce(numPartitions: Int): JavaDoubleRDD = fromRDD(srdd.coalesce(numPartitions))
+
+ /**
+ * Return a new RDD that is reduced into `numPartitions` partitions.
+ */
+ def coalesce(numPartitions: Int, shuffle: Boolean): JavaDoubleRDD =
+ fromRDD(srdd.coalesce(numPartitions, shuffle))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ *
+ * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
+ * RDD will be <= us.
+ */
+ def subtract(other: JavaDoubleRDD): JavaDoubleRDD =
+ fromRDD(srdd.subtract(other))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaDoubleRDD, numPartitions: Int): JavaDoubleRDD =
+ fromRDD(srdd.subtract(other, numPartitions))
+
+ /**
+ * Return an RDD with the elements from `this` that are not in `other`.
+ */
+ def subtract(other: JavaDoubleRDD, p: Partitioner): JavaDoubleRDD =
+ fromRDD(srdd.subtract(other, p))
+
+ /**
+ * Return a sampled subset of this RDD.
+ */
+ def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaDoubleRDD =
+ fromRDD(srdd.sample(withReplacement, fraction, seed))
+
+ /**
+ * Return the union of this RDD and another one. Any identical elements will appear multiple
+ * times (use `.distinct()` to eliminate them).
+ */
+ def union(other: JavaDoubleRDD): JavaDoubleRDD = fromRDD(srdd.union(other.srdd))
+
+ // Double RDD functions
+
+ /** Add up the elements in this RDD. */
+ def sum(): Double = srdd.sum()
+
+ /**
+ * Return a [[org.apache.spark.util.StatCounter]] object that captures the mean, variance and count
+ * of the RDD's elements in one operation.
+ */
+ def stats(): StatCounter = srdd.stats()
+
+ /** Compute the mean of this RDD's elements. */
+ def mean(): Double = srdd.mean()
+
+ /** Compute the variance of this RDD's elements. */
+ def variance(): Double = srdd.variance()
+
+ /** Compute the standard deviation of this RDD's elements. */
+ def stdev(): Double = srdd.stdev()
+
+ /**
+ * Compute the sample standard deviation of this RDD's elements (which corrects for bias in
+ * estimating the standard deviation by dividing by N-1 instead of N).
+ */
+ def sampleStdev(): Double = srdd.sampleStdev()
+
+ /**
+ * Compute the sample variance of this RDD's elements (which corrects for bias in
+ * estimating the standard variance by dividing by N-1 instead of N).
+ */
+ def sampleVariance(): Double = srdd.sampleVariance()
+
+ /** Return the approximate mean of the elements in this RDD. */
+ def meanApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] =
+ srdd.meanApprox(timeout, confidence)
+
+ /** (Experimental) Approximate operation to return the mean within a timeout. */
+ def meanApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.meanApprox(timeout)
+
+ /** (Experimental) Approximate operation to return the sum within a timeout. */
+ def sumApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] =
+ srdd.sumApprox(timeout, confidence)
+
+ /** (Experimental) Approximate operation to return the sum within a timeout. */
+ def sumApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.sumApprox(timeout)
+}
+
+object JavaDoubleRDD {
+ def fromRDD(rdd: RDD[scala.Double]): JavaDoubleRDD = new JavaDoubleRDD(rdd)
+
+ implicit def toRDD(rdd: JavaDoubleRDD): RDD[scala.Double] = rdd.srdd
+}