diff options
author | Reynold Xin <rxin@apache.org> | 2014-01-14 11:16:08 -0800 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-01-14 11:16:08 -0800 |
commit | 6a12b9ebc53d8851b9e1bc52452d4cc6d4c13ca1 (patch) | |
tree | e8cfbd6b6c5fec421545585db6bc686e3d5a126a | |
parent | 71b3007dbde2211ed1487c1c6d5f877c9a74fdb5 (diff) | |
download | spark-6a12b9ebc53d8851b9e1bc52452d4cc6d4c13ca1.tar.gz spark-6a12b9ebc53d8851b9e1bc52452d4cc6d4c13ca1.tar.bz2 spark-6a12b9ebc53d8851b9e1bc52452d4cc6d4c13ca1.zip |
Updated API doc for Accumulable and Accumulator.
-rw-r--r-- | core/src/main/scala/org/apache/spark/Accumulators.scala | 40 |
1 file changed, 31 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala index 2ba871a600..fe1537a610 100644 --- a/core/src/main/scala/org/apache/spark/Accumulators.scala +++ b/core/src/main/scala/org/apache/spark/Accumulators.scala @@ -17,17 +17,17 @@ package org.apache.spark -import java.io._ +import java.io.ObjectInputStream import scala.collection.mutable.Map import scala.collection.generic.Growable import org.apache.spark.serializer.JavaSerializer /** - * A datatype that can be accumulated, ie has an commutative and associative "add" operation, + * A data type that can be accumulated, ie has an commutative and associative "add" operation, * but where the result type, `R`, may be different from the element type being added, `T`. * - * You must define how to add data, and how to merge two of these together. For some datatypes, + * You must define how to add data, and how to merge two of these together. For some data types, * such as a counter, these might be the same operation. In that case, you can use the simpler * [[org.apache.spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are * accumulating a set. You will add items to the set, and you will union two sets together. @@ -45,7 +45,7 @@ class Accumulable[R, T] ( val id = Accumulators.newId @transient private var value_ = initialValue // Current value on master val zero = param.zero(initialValue) // Zero value to be passed to workers - var deserialized = false + private var deserialized = false Accumulators.register(this, true) @@ -127,7 +127,7 @@ class Accumulable[R, T] ( /** * Helper object defining how to accumulate values of a particular type. An implicit - * AccumulableParam needs to be available when you create Accumulables of a specific type. + * AccumulableParam needs to be available when you create [[Accumulable]]s of a specific type. 
* * @tparam R the full accumulated data (result type) * @tparam T partial data that can be added in @@ -186,7 +186,29 @@ class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Ser /** * A simpler value of [[Accumulable]] where the result type being accumulated is the same - * as the types of elements being merged. + * as the types of elements being merged, i.e. variables that are only "added" to through an + * associative operation and can therefore be efficiently supported in parallel. They can be used + * to implement counters (as in MapReduce) or sums. Spark natively supports accumulators of type + * `Int` and `Double`, and programmers can add support for new types. + * + * An accumulator is created from an initial value `v` by calling [[SparkContext#accumulator]]. + * Tasks running on the cluster can then add to it using the [[Accumulable#+=]] operator. + * However, they cannot read its value. Only the driver program can read the accumulator's value, + * using its value method. + * + * The interpreter session below shows an accumulator being used to add up the elements of an array: + * + * {{{ + * scala> val accum = sc.accumulator(0) + * accum: spark.Accumulator[Int] = 0 + * + * scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x) + * ... + * 10/09/29 18:41:08 INFO SparkContext: Tasks finished in 0.317106 s + * + * scala> accum.value + * res2: Int = 10 + * }}} * * @param initialValue initial value of accumulator * @param param helper object defining how to add elements of type `T` @@ -196,9 +218,9 @@ class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T]) extends Accumulable[T,T](initialValue, param) /** - * A simpler version of [[org.apache.spark.AccumulableParam]] where the only datatype you can add in is the same type - * as the accumulated value. An implicit AccumulatorParam object needs to be available when you create - * Accumulators of a specific type. 
+ * A simpler version of [[org.apache.spark.AccumulableParam]] where the only data type you can add + * in is the same type as the accumulated value. An implicit AccumulatorParam object needs to be + * available when you create Accumulators of a specific type. * * @tparam T type of value to accumulate */ |