From 8d7b77bcb545e7e1167cf6e4a010809d5bd76c5a Mon Sep 17 00:00:00 2001
From: Matei Zaharia
Date: Fri, 12 Oct 2012 17:53:20 -0700
Subject: Some doc and usability improvements:

- Added a StorageLevels class for easy access to StorageLevel constants in Java
- Added doc comments on Function classes in Java
- Updated Accumulator and HadoopWriter docs slightly
---
 core/src/main/scala/spark/Accumulators.scala        | 56 ++++++++++++++--------
 core/src/main/scala/spark/HadoopWriter.scala        |  2 +-
 .../main/scala/spark/api/java/StorageLevels.java    | 20 ++++++++
 .../api/java/function/DoubleFlatMapFunction.java    |  3 ++
 .../spark/api/java/function/DoubleFunction.java     |  3 ++
 .../spark/api/java/function/FlatMapFunction.scala   |  3 ++
 .../scala/spark/api/java/function/Function.java     |  5 +-
 .../scala/spark/api/java/function/Function2.java    |  3 ++
 .../api/java/function/PairFlatMapFunction.java      |  4 ++
 .../spark/api/java/function/PairFunction.java       |  3 ++
 .../spark/api/java/function/VoidFunction.scala      |  3 ++
 11 files changed, 83 insertions(+), 22 deletions(-)
 create mode 100644 core/src/main/scala/spark/api/java/StorageLevels.java

(limited to 'core/src/main')

diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala
index 62186de80d..bacd0ace37 100644
--- a/core/src/main/scala/spark/Accumulators.scala
+++ b/core/src/main/scala/spark/Accumulators.scala
@@ -6,16 +6,17 @@ import scala.collection.mutable.Map
 import scala.collection.generic.Growable
 
 /**
- * A datatype that can be accumulated, i.e. has an commutative and associative "add" operation, but where the result type, `R`, may be different from the element type being added, `T`.
+ * A datatype that can be accumulated, i.e. has a commutative and associative "add" operation,
+ * but where the result type, `R`, may be different from the element type being added, `T`.
  *
- * You must define how to add data, and how to merge two of these together. For some datatypes, these might be
- * the same operation (eg., a counter). In that case, you might want to use [[spark.AccumulatorParam]]. They won't
- * always be the same, though -- eg., imagine you are accumulating a set. You will add items to the set, and you
- * will union two sets together.
+ * You must define how to add data, and how to merge two of these together. For some datatypes,
+ * such as a counter, these might be the same operation. In that case, you can use the simpler
+ * [[spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
+ * accumulating a set. You will add items to the set, and you will union two sets together.
  *
  * @param initialValue initial value of accumulator
- * @param param helper object defining how to add elements of type `T`
- * @tparam R the full accumulated data
+ * @param param helper object defining how to add elements of type `R` and `T`
+ * @tparam R the full accumulated data (result type)
  * @tparam T partial data that can be added in
  */
 class Accumulable[R, T] (
@@ -44,6 +45,10 @@ class Accumulable[R, T] (
    * @param term the other Accumulable that will get merged with this
    */
   def ++= (term: R) { value_ = param.addInPlace(value_, term)}
+
+  /**
+   * Access the accumulator's current value; only allowed on master.
+   */
   def value = {
     if (!deserialized) value_
     else throw new UnsupportedOperationException("Can't read accumulator value in task")
@@ -60,6 +65,9 @@ class Accumulable[R, T] (
    */
   def localValue = value_
 
+  /**
+   * Set the accumulator's value; only allowed on master.
+   */
   def value_= (r: R) {
     if (!deserialized) value_ = r
     else throw new UnsupportedOperationException("Can't assign accumulator value in task")
@@ -77,28 +85,37 @@ class Accumulable[R, T] (
 }
 
 /**
- * Helper object defining how to accumulate values of a particular type.
+ * Helper object defining how to accumulate values of a particular type. An implicit
+ * AccumulableParam needs to be available when you create Accumulables of a specific type.
  *
- * @tparam R the full accumulated data
+ * @tparam R the full accumulated data (result type)
  * @tparam T partial data that can be added in
  */
 trait AccumulableParam[R, T] extends Serializable {
   /**
-   * Add additional data to the accumulator value.
+   * Add additional data to the accumulator value. Is allowed to modify and return `r`
+   * for efficiency (to avoid allocating objects).
+   *
    * @param r the current value of the accumulator
    * @param t the data to be added to the accumulator
    * @return the new value of the accumulator
    */
-  def addAccumulator(r: R, t: T) : R
+  def addAccumulator(r: R, t: T): R
 
   /**
-   * Merge two accumulated values together
+   * Merge two accumulated values together. Is allowed to modify and return the first value
+   * for efficiency (to avoid allocating objects).
+   *
    * @param r1 one set of accumulated data
    * @param r2 another set of accumulated data
    * @return both data sets merged together
   */
   def addInPlace(r1: R, r2: R): R
 
+  /**
+   * Return the "zero" (identity) value for an accumulator type, given its initial value. For
+   * example, if R was a vector of N dimensions, this would return a vector of N zeroes.
+   */
   def zero(initialValue: R): R
 }
 
@@ -106,12 +123,12 @@ private[spark]
 class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
   extends AccumulableParam[R,T] {
 
-  def addAccumulator(growable: R, elem: T) : R = {
+  def addAccumulator(growable: R, elem: T): R = {
     growable += elem
     growable
   }
 
-  def addInPlace(t1: R, t2: R) : R = {
+  def addInPlace(t1: R, t2: R): R = {
     t1 ++= t2
     t1
   }
@@ -134,17 +151,18 @@ class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Ser
  * @param param helper object defining how to add elements of type `T`
  * @tparam T result type
  */
-class Accumulator[T](
-    @transient initialValue: T,
-    param: AccumulatorParam[T]) extends Accumulable[T,T](initialValue, param)
+class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T])
+  extends Accumulable[T,T](initialValue, param)
 
 /**
  * A simpler version of [[spark.AccumulableParam]] where the only datatype you can add in is the same type
- * as the accumulated value
+ * as the accumulated value. An implicit AccumulatorParam object needs to be available when you create
+ * Accumulators of a specific type.
+ *
  * @tparam T type of value to accumulate
  */
 trait AccumulatorParam[T] extends AccumulableParam[T, T] {
-  def addAccumulator(t1: T, t2: T) : T = {
+  def addAccumulator(t1: T, t2: T): T = {
     addInPlace(t1, t2)
   }
 }
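As an illustration (this snippet is not part of the commit), here is a minimal sketch of what the AccumulableParam contract documented above might look like in practice: a hypothetical parameter object that accumulates individual strings into a set, the exact scenario the updated comment describes. Because the trait declares only abstract methods, it should be implementable from Java like an ordinary interface; the class name and types below are made up for the example.

```java
import java.util.HashSet;
import java.util.Set;

import spark.AccumulableParam;

// Hypothetical example, not from this commit: accumulate single strings (T)
// into a Set (R), per the AccumulableParam[R, T] contract shown in the diff above.
public class StringSetParam implements AccumulableParam<Set<String>, String> {
  public Set<String> addAccumulator(Set<String> r, String t) {
    r.add(t);        // modifying and returning r is allowed, per the doc comment
    return r;
  }
  public Set<String> addInPlace(Set<String> r1, Set<String> r2) {
    r1.addAll(r2);   // merge two partial results; reusing r1 avoids an allocation
    return r1;
  }
  public Set<String> zero(Set<String> initialValue) {
    return new HashSet<String>();  // the "zero" (identity) value: an empty set
  }
}
```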
diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala
index ca584d2d5a..ffe0f3c4a1 100644
--- a/core/src/main/scala/spark/HadoopWriter.scala
+++ b/core/src/main/scala/spark/HadoopWriter.scala
@@ -16,7 +16,7 @@ import spark.Logging
 import spark.SerializableWritable
 
 /**
- * An internal helper class that saves an RDD using a Hadoop OutputFormat. This is only public
+ * Internal helper class that saves an RDD using a Hadoop OutputFormat. This is only public
  * because we need to access this class from the `spark` package to use some package-private Hadoop
  * functions, but this class should not be used directly by users.
  *
diff --git a/core/src/main/scala/spark/api/java/StorageLevels.java b/core/src/main/scala/spark/api/java/StorageLevels.java
new file mode 100644
index 0000000000..722af3c06c
--- /dev/null
+++ b/core/src/main/scala/spark/api/java/StorageLevels.java
@@ -0,0 +1,20 @@
+package spark.api.java;
+
+import spark.storage.StorageLevel;
+
+/**
+ * Expose some commonly useful storage level constants.
+ */
+public class StorageLevels {
+  public static final StorageLevel NONE = new StorageLevel(false, false, false, 1);
+  public static final StorageLevel DISK_ONLY = new StorageLevel(true, false, false, 1);
+  public static final StorageLevel DISK_ONLY_2 = new StorageLevel(true, false, false, 2);
+  public static final StorageLevel MEMORY_ONLY = new StorageLevel(false, true, true, 1);
+  public static final StorageLevel MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2);
+  public static final StorageLevel MEMORY_ONLY_SER = new StorageLevel(false, true, false, 1);
+  public static final StorageLevel MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2);
+  public static final StorageLevel MEMORY_AND_DISK = new StorageLevel(true, true, true, 1);
+  public static final StorageLevel MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2);
+  public static final StorageLevel MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, 1);
+  public static final StorageLevel MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2);
+}
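As a side note (not part of the patch), judging from the constants above the StorageLevel constructor arguments appear to be (useDisk, useMemory, deserialized, replication). The sketch below shows how a Java user might pick one of the new constants when persisting an RDD; the JavaSparkContext/JavaRDD calls and the input path are assumptions for illustration, not something this commit adds.

```java
import spark.api.java.JavaRDD;
import spark.api.java.JavaSparkContext;
import spark.api.java.StorageLevels;

public class StorageLevelsExample {
  public static void main(String[] args) {
    // Hypothetical local context and input path, for illustration only.
    JavaSparkContext sc = new JavaSparkContext("local", "StorageLevelsExample");
    JavaRDD<String> lines = sc.textFile("README.md");

    // Keep the data in memory in serialized form, replicated on two nodes.
    // Given the flag order above, this equals new StorageLevel(false, true, false, 2).
    lines.persist(StorageLevels.MEMORY_ONLY_SER_2);

    System.out.println("Cached " + lines.count() + " lines");
  }
}
```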
diff --git a/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java
index 7b6478c2cd..3a8192be3a 100644
--- a/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java
+++ b/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java
@@ -5,6 +5,9 @@ import scala.runtime.AbstractFunction1;
 
 import java.io.Serializable;
 
+/**
+ * A function that returns zero or more records of type Double from each input record.
+ */
 // DoubleFlatMapFunction does not extend FlatMapFunction because flatMap is
 // overloaded for both FlatMapFunction and DoubleFlatMapFunction.
 public abstract class DoubleFlatMapFunction<T> extends AbstractFunction1<T, Iterable<Double>>
diff --git a/core/src/main/scala/spark/api/java/function/DoubleFunction.java b/core/src/main/scala/spark/api/java/function/DoubleFunction.java
index a03a72c835..c6ef76d088 100644
--- a/core/src/main/scala/spark/api/java/function/DoubleFunction.java
+++ b/core/src/main/scala/spark/api/java/function/DoubleFunction.java
@@ -5,6 +5,9 @@ import scala.runtime.AbstractFunction1;
 
 import java.io.Serializable;
 
+/**
+ * A function that returns Doubles, and can be used to construct DoubleRDDs.
+ */
 // DoubleFunction does not extend Function because some UDF functions, like map,
 // are overloaded for both Function and DoubleFunction.
 public abstract class DoubleFunction<T> extends WrappedFunction1<T, Double>
diff --git a/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala b/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala
index bcba38c569..e027cdacd3 100644
--- a/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala
+++ b/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala
@@ -1,5 +1,8 @@
 package spark.api.java.function
 
+/**
+ * A function that returns zero or more output records from each input record.
+ */
 abstract class FlatMapFunction[T, R] extends Function[T, java.lang.Iterable[R]] {
   @throws(classOf[Exception])
   def call(x: T) : java.lang.Iterable[R]
diff --git a/core/src/main/scala/spark/api/java/function/Function.java b/core/src/main/scala/spark/api/java/function/Function.java
index f6f2e5fd76..dae8295f21 100644
--- a/core/src/main/scala/spark/api/java/function/Function.java
+++ b/core/src/main/scala/spark/api/java/function/Function.java
@@ -8,8 +8,9 @@ import java.io.Serializable;
 
 /**
- * Base class for functions whose return types do not have special RDDs; DoubleFunction is
- * handled separately, to allow DoubleRDDs to be constructed when mapping RDDs to doubles.
+ * Base class for functions whose return types do not create special RDDs. PairFunction and
+ * DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed
+ * when mapping RDDs of other types.
  */
 public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable {
   public abstract R call(T t) throws Exception;
diff --git a/core/src/main/scala/spark/api/java/function/Function2.java b/core/src/main/scala/spark/api/java/function/Function2.java
index be48b173b8..69bf12c8c9 100644
--- a/core/src/main/scala/spark/api/java/function/Function2.java
+++ b/core/src/main/scala/spark/api/java/function/Function2.java
@@ -6,6 +6,9 @@ import scala.runtime.AbstractFunction2;
 
 import java.io.Serializable;
 
+/**
+ * A two-argument function that takes arguments of type T1 and T2 and returns an R.
+ */
 public abstract class Function2<T1, T2, R> extends WrappedFunction2<T1, T2, R>
   implements Serializable {
 
diff --git a/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java
index c074b9c717..b3cc4df6aa 100644
--- a/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java
+++ b/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java
@@ -7,6 +7,10 @@ import scala.runtime.AbstractFunction1;
 
 import java.io.Serializable;
 
+/**
+ * A function that returns zero or more key-value pair records from each input record. The
+ * key-value pairs are represented as scala.Tuple2 objects.
+ */
 // PairFlatMapFunction does not extend FlatMapFunction because flatMap is
 // overloaded for both FlatMapFunction and PairFlatMapFunction.
 public abstract class PairFlatMapFunction<T, K, V>
diff --git a/core/src/main/scala/spark/api/java/function/PairFunction.java b/core/src/main/scala/spark/api/java/function/PairFunction.java
index 7f5bb7de13..9fc6df4b88 100644
--- a/core/src/main/scala/spark/api/java/function/PairFunction.java
+++ b/core/src/main/scala/spark/api/java/function/PairFunction.java
@@ -7,6 +7,9 @@ import scala.runtime.AbstractFunction1;
 
 import java.io.Serializable;
 
+/**
+ * A function that returns key-value pairs (Tuple2), and can be used to construct PairRDDs.
+ */
 // PairFunction does not extend Function because some UDF functions, like map,
 // are overloaded for both Function and PairFunction.
 public abstract class PairFunction<T, K, V>
diff --git a/core/src/main/scala/spark/api/java/function/VoidFunction.scala b/core/src/main/scala/spark/api/java/function/VoidFunction.scala
index 0eefe337e8..b0096cf2bf 100644
--- a/core/src/main/scala/spark/api/java/function/VoidFunction.scala
+++ b/core/src/main/scala/spark/api/java/function/VoidFunction.scala
@@ -1,5 +1,8 @@
 package spark.api.java.function
 
+/**
+ * A function with no return value.
+ */
 // This allows Java users to write void methods without having to return Unit.
 abstract class VoidFunction[T] extends Serializable {
   @throws(classOf[Exception])
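Finally, a hedged usage sketch (again, not part of the patch) of the function classes documented above, written in the anonymous-inner-class style of the Java API. The JavaSparkContext/JavaRDD methods shown (parallelize, flatMap, map, foreach) are assumptions drawn from the surrounding Java API, and the data is made up.

```java
import java.util.Arrays;

import spark.api.java.JavaRDD;
import spark.api.java.JavaSparkContext;
import spark.api.java.function.FlatMapFunction;
import spark.api.java.function.Function;
import spark.api.java.function.VoidFunction;

public class FunctionClassesExample {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "FunctionClassesExample");
    JavaRDD<String> lines = sc.parallelize(Arrays.asList("to be", "or not to be"));

    // FlatMapFunction<T, R>: zero or more output records per input record.
    JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      public Iterable<String> call(String line) {
        return Arrays.asList(line.split(" "));
      }
    });

    // Function<T, R>: a plain mapping that produces an ordinary JavaRDD.
    JavaRDD<Integer> lengths = words.map(new Function<String, Integer>() {
      public Integer call(String word) {
        return word.length();
      }
    });

    // VoidFunction<T>: side effects only, no return value.
    lengths.foreach(new VoidFunction<Integer>() {
      public void call(Integer n) {
        System.out.println(n);
      }
    });
  }
}
```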