path: root/core/src/main/scala/spark/DoubleRDDFunctions.scala
blob: 1fbf66b7ded3c2e16ed708159be075e12ea0e8e3
package spark

import spark.partial.BoundedDouble
import spark.partial.MeanEvaluator
import spark.partial.PartialResult
import spark.partial.SumEvaluator

import spark.util.StatCounter

/**
 * Extra functions available on RDDs of Doubles through an implicit conversion.
 * A brief usage sketch appears at the end of this file.
 */
class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
  /** Add up the elements in this RDD. */
  def sum(): Double = {
    self.reduce(_ + _)
  }

  /**
   * Return a StatCounter object that captures the mean, variance and count of the RDD's
   * elements in one operation.
   */
  def stats(): StatCounter = {
    self.mapPartitions(nums => Iterator(StatCounter(nums))).reduce((a, b) => a.merge(b))
  }

  /** Compute the mean of this RDD's elements. */
  def mean(): Double = stats().mean

  /** Compute the variance of this RDD's elements. */
  def variance(): Double = stats().variance

  /** Compute the standard deviation of this RDD's elements. */
  def stdev(): Double = stats().stdev

  /** Approximate operation to return the mean of this RDD's elements within a timeout. */
  def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
    val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
    val evaluator = new MeanEvaluator(self.splits.size, confidence)
    self.context.runApproximateJob(self, processPartition, evaluator, timeout)
  }

  /** Approximate operation to return the sum of this RDD's elements within a timeout. */
  def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
    val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
    val evaluator = new SumEvaluator(self.splits.size, confidence)
    self.context.runApproximateJob(self, processPartition, evaluator, timeout)
  }
}
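
// Usage sketch (not part of the original file): a minimal example of calling these functions
// through the implicit conversion. It assumes an existing SparkContext named `sc` and that the
// RDD[Double] -> DoubleRDDFunctions conversion is in scope (e.g. via `import spark.SparkContext._`).
//
//   val nums = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0))
//   nums.sum()        // 10.0
//   nums.mean()       // 2.5
//   nums.variance()   // 1.25 (population variance from StatCounter)
//   nums.stdev()      // ~1.118
//
//   // Approximate mean with a 1-second timeout and 95% confidence. The PartialResult's final
//   // value is a BoundedDouble carrying the estimate and its confidence bounds.
//   val approxMean = nums.meanApprox(timeout = 1000, confidence = 0.95)
//   println(approxMean.getFinalValue())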