author     Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-12 17:53:20 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-12 17:53:20 -0700
commit     8d7b77bcb545e7e1167cf6e4a010809d5bd76c5a (patch)
tree       2088f18ec9c9c4db88208a9de77e90476ade6245 /core/src/main
parent     682b2d9329ebfd82f5231acd940f9dbd2037d2ae (diff)
Some doc and usability improvements:
- Added a StorageLevels class for easy access to StorageLevel constants in Java
- Added doc comments on Function classes in Java
- Updated Accumulator and HadoopWriter docs slightly
Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/scala/spark/Accumulators.scala | 56
-rw-r--r--  core/src/main/scala/spark/HadoopWriter.scala |  2
-rw-r--r--  core/src/main/scala/spark/api/java/StorageLevels.java | 20
-rw-r--r--  core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java |  3
-rw-r--r--  core/src/main/scala/spark/api/java/function/DoubleFunction.java |  3
-rw-r--r--  core/src/main/scala/spark/api/java/function/FlatMapFunction.scala |  3
-rw-r--r--  core/src/main/scala/spark/api/java/function/Function.java |  5
-rw-r--r--  core/src/main/scala/spark/api/java/function/Function2.java |  3
-rw-r--r--  core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java |  4
-rw-r--r--  core/src/main/scala/spark/api/java/function/PairFunction.java |  3
-rw-r--r--  core/src/main/scala/spark/api/java/function/VoidFunction.scala |  3
11 files changed, 83 insertions(+), 22 deletions(-)
diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala
index 62186de80d..bacd0ace37 100644
--- a/core/src/main/scala/spark/Accumulators.scala
+++ b/core/src/main/scala/spark/Accumulators.scala
@@ -6,16 +6,17 @@ import scala.collection.mutable.Map
import scala.collection.generic.Growable
/**
- * A datatype that can be accumulated, i.e. has an commutative and associative +.
+ * A datatype that can be accumulated, i.e. has an commutative and associative "add" operation,
+ * but where the result type, `R`, may be different from the element type being added, `T`.
*
- * You must define how to add data, and how to merge two of these together. For some datatypes, these might be
- * the same operation (eg., a counter). In that case, you might want to use [[spark.AccumulatorParam]]. They won't
- * always be the same, though -- eg., imagine you are accumulating a set. You will add items to the set, and you
- * will union two sets together.
+ * You must define how to add data, and how to merge two of these together. For some datatypes,
+ * such as a counter, these might be the same operation. In that case, you can use the simpler
+ * [[spark.Accumulator]]. They won't always be the same, though -- e.g., imagine you are
+ * accumulating a set. You will add items to the set, and you will union two sets together.
*
* @param initialValue initial value of accumulator
- * @param param helper object defining how to add elements of type `T`
- * @tparam R the full accumulated data
+ * @param param helper object defining how to add elements of type `R` and `T`
+ * @tparam R the full accumulated data (result type)
* @tparam T partial data that can be added in
*/
class Accumulable[R, T] (
@@ -44,6 +45,10 @@ class Accumulable[R, T] (
* @param term the other Accumulable that will get merged with this
*/
def ++= (term: R) { value_ = param.addInPlace(value_, term)}
+
+ /**
+ * Access the accumulator's current value; only allowed on master.
+ */
def value = {
if (!deserialized) value_
else throw new UnsupportedOperationException("Can't read accumulator value in task")
@@ -60,6 +65,9 @@ class Accumulable[R, T] (
*/
def localValue = value_
+ /**
+ * Set the accumulator's value; only allowed on master.
+ */
def value_= (r: R) {
if (!deserialized) value_ = r
else throw new UnsupportedOperationException("Can't assign accumulator value in task")
@@ -77,28 +85,37 @@ class Accumulable[R, T] (
}
/**
- * Helper object defining how to accumulate values of a particular type.
+ * Helper object defining how to accumulate values of a particular type. An implicit
+ * AccumulableParam needs to be available when you create Accumulables of a specific type.
*
- * @tparam R the full accumulated data
+ * @tparam R the full accumulated data (result type)
* @tparam T partial data that can be added in
*/
trait AccumulableParam[R, T] extends Serializable {
/**
- * Add additional data to the accumulator value.
+ * Add additional data to the accumulator value. Is allowed to modify and return `r`
+ * for efficiency (to avoid allocating objects).
+ *
* @param r the current value of the accumulator
* @param t the data to be added to the accumulator
* @return the new value of the accumulator
*/
- def addAccumulator(r: R, t: T) : R
+ def addAccumulator(r: R, t: T): R
/**
- * Merge two accumulated values together
+ * Merge two accumulated values together. Is allowed to modify and return the first value
+ * for efficiency (to avoid allocating objects).
+ *
* @param r1 one set of accumulated data
* @param r2 another set of accumulated data
* @return both data sets merged together
*/
def addInPlace(r1: R, r2: R): R
+ /**
+ * Return the "zero" (identity) value for an accumulator type, given its initial value. For
+ * example, if R was a vector of N dimensions, this would return a vector of N zeroes.
+ */
def zero(initialValue: R): R
}
@@ -106,12 +123,12 @@ private[spark]
class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
extends AccumulableParam[R,T] {
- def addAccumulator(growable: R, elem: T) : R = {
+ def addAccumulator(growable: R, elem: T): R = {
growable += elem
growable
}
- def addInPlace(t1: R, t2: R) : R = {
+ def addInPlace(t1: R, t2: R): R = {
t1 ++= t2
t1
}
@@ -134,17 +151,18 @@ class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Ser
* @param param helper object defining how to add elements of type `T`
* @tparam T result type
*/
-class Accumulator[T](
- @transient initialValue: T,
- param: AccumulatorParam[T]) extends Accumulable[T,T](initialValue, param)
+class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T])
+ extends Accumulable[T,T](initialValue, param)
/**
* A simpler version of [[spark.AccumulableParam]] where the only datatype you can add in is the same type
- * as the accumulated value
+ * as the accumulated value. An implicit AccumulatorParam object needs to be available when you create
+ * Accumulators of a specific type.
+ *
* @tparam T type of value to accumulate
*/
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
- def addAccumulator(t1: T, t2: T) : T = {
+ def addAccumulator(t1: T, t2: T): T = {
addInPlace(t1, t2)
}
}
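
To make the AccumulableParam contract above concrete, here is a minimal sketch of a custom param that accumulates individual strings into a set, the example the updated doc comment alludes to. It assumes the pure-abstract AccumulableParam trait can be implemented as a plain interface from Java; the SetAccumulableParam class name is made up for illustration.

    import java.util.HashSet;
    import java.util.Set;

    import spark.AccumulableParam;

    // Hypothetical example: accumulate String elements (T) into a Set<String>
    // result (R). Both methods mutate and return their first argument, which
    // the updated docs explicitly permit for efficiency.
    public class SetAccumulableParam implements AccumulableParam<Set<String>, String> {
      public Set<String> addAccumulator(Set<String> set, String elem) {
        set.add(elem);                  // add one element to the partial result
        return set;
      }

      public Set<String> addInPlace(Set<String> s1, Set<String> s2) {
        s1.addAll(s2);                  // union two partial results
        return s1;
      }

      public Set<String> zero(Set<String> initialValue) {
        return new HashSet<String>();   // identity value: the empty set
      }
    }
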
diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala
index ca584d2d5a..ffe0f3c4a1 100644
--- a/core/src/main/scala/spark/HadoopWriter.scala
+++ b/core/src/main/scala/spark/HadoopWriter.scala
@@ -16,7 +16,7 @@ import spark.Logging
import spark.SerializableWritable
/**
- * An internal helper class that saves an RDD using a Hadoop OutputFormat. This is only public
+ * Internal helper class that saves an RDD using a Hadoop OutputFormat. This is only public
* because we need to access this class from the `spark` package to use some package-private Hadoop
* functions, but this class should not be used directly by users.
*
diff --git a/core/src/main/scala/spark/api/java/StorageLevels.java b/core/src/main/scala/spark/api/java/StorageLevels.java
new file mode 100644
index 0000000000..722af3c06c
--- /dev/null
+++ b/core/src/main/scala/spark/api/java/StorageLevels.java
@@ -0,0 +1,20 @@
+package spark.api.java;
+
+import spark.storage.StorageLevel;
+
+/**
+ * Expose some commonly useful storage level constants.
+ */
+public class StorageLevels {
+ public static final StorageLevel NONE = new StorageLevel(false, false, false, 1);
+ public static final StorageLevel DISK_ONLY = new StorageLevel(true, false, false, 1);
+ public static final StorageLevel DISK_ONLY_2 = new StorageLevel(true, false, false, 2);
+ public static final StorageLevel MEMORY_ONLY = new StorageLevel(false, true, true, 1);
+ public static final StorageLevel MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2);
+ public static final StorageLevel MEMORY_ONLY_SER = new StorageLevel(false, true, false, 1);
+ public static final StorageLevel MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2);
+ public static final StorageLevel MEMORY_AND_DISK = new StorageLevel(true, true, true, 1);
+ public static final StorageLevel MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2);
+ public static final StorageLevel MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, 1);
+ public static final StorageLevel MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2);
+}
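
From the constants above, the StorageLevel constructor arguments can be read as (useDisk, useMemory, deserialized, replication); the class just names the common combinations so Java code does not have to spell out the flags. A small usage sketch follows, assuming JavaRDD exposes a persist(StorageLevel) method like the Scala RDD does; the class and method names here are hypothetical.

    import spark.api.java.JavaRDD;
    import spark.api.java.StorageLevels;
    import spark.storage.StorageLevel;

    public class StorageLevelsExample {
      // Hypothetical sketch: keep the RDD serialized in memory with a disk
      // fallback, replicated on two nodes, i.e. the same flags as
      // new StorageLevel(true, true, false, 2).
      static <T> void persistForReuse(JavaRDD<T> rdd) {
        StorageLevel level = StorageLevels.MEMORY_AND_DISK_SER_2;
        rdd.persist(level);  // assumes JavaRDD.persist(StorageLevel) exists
      }
    }
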
diff --git a/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java
index 7b6478c2cd..3a8192be3a 100644
--- a/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java
+++ b/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java
@@ -5,6 +5,9 @@ import scala.runtime.AbstractFunction1;
import java.io.Serializable;
+/**
+ * A function that returns zero or more records of type Double from each input record.
+ */
// DoubleFlatMapFunction does not extend FlatMapFunction because flatMap is
// overloaded for both FlatMapFunction and DoubleFlatMapFunction.
public abstract class DoubleFlatMapFunction<T> extends AbstractFunction1<T, Iterable<Double>>
diff --git a/core/src/main/scala/spark/api/java/function/DoubleFunction.java b/core/src/main/scala/spark/api/java/function/DoubleFunction.java
index a03a72c835..c6ef76d088 100644
--- a/core/src/main/scala/spark/api/java/function/DoubleFunction.java
+++ b/core/src/main/scala/spark/api/java/function/DoubleFunction.java
@@ -5,6 +5,9 @@ import scala.runtime.AbstractFunction1;
import java.io.Serializable;
+/**
+ * A function that returns Doubles, and can be used to construct DoubleRDDs.
+ */
// DoubleFunction does not extend Function because some UDF functions, like map,
// are overloaded for both Function and DoubleFunction.
public abstract class DoubleFunction<T> extends WrappedFunction1<T, Double>
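
For illustration, a minimal subclass of DoubleFunction that measures each record's length; the Length class name is hypothetical. Because map is overloaded, passing such a function yields a double-valued RDD rather than a generic one.

    import spark.api.java.function.DoubleFunction;

    // Hypothetical sketch: map each String to a Double.
    public class Length extends DoubleFunction<String> {
      @Override
      public Double call(String s) {
        return (double) s.length();
      }
    }
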
diff --git a/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala b/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala
index bcba38c569..e027cdacd3 100644
--- a/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala
+++ b/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala
@@ -1,5 +1,8 @@
package spark.api.java.function
+/**
+ * A function that returns zero or more output records from each input record.
+ */
abstract class FlatMapFunction[T, R] extends Function[T, java.lang.Iterable[R]] {
@throws(classOf[Exception])
def call(x: T) : java.lang.Iterable[R]
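
A minimal sketch of a FlatMapFunction subclass from Java that splits lines into words, so one input record can produce zero or more output records; the SplitWords name is hypothetical.

    import java.util.Arrays;
    import spark.api.java.function.FlatMapFunction;

    // Hypothetical sketch: one input line yields zero or more output words.
    public class SplitWords extends FlatMapFunction<String, String> {
      @Override
      public Iterable<String> call(String line) {
        return Arrays.asList(line.split(" "));
      }
    }
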
diff --git a/core/src/main/scala/spark/api/java/function/Function.java b/core/src/main/scala/spark/api/java/function/Function.java
index f6f2e5fd76..dae8295f21 100644
--- a/core/src/main/scala/spark/api/java/function/Function.java
+++ b/core/src/main/scala/spark/api/java/function/Function.java
@@ -8,8 +8,9 @@ import java.io.Serializable;
/**
- * Base class for functions whose return types do not have special RDDs; DoubleFunction is
- * handled separately, to allow DoubleRDDs to be constructed when mapping RDDs to doubles.
+ * Base class for functions whose return types do not create special RDDs. PairFunction and
+ * DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed
+ * when mapping RDDs of other types.
*/
public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable {
public abstract R call(T t) throws Exception;
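
A minimal sketch of the base Function class in use, for a return type (String) that has no special RDD; the ToUpper name is hypothetical.

    import spark.api.java.function.Function;

    // Hypothetical sketch: a plain one-argument mapping function.
    public class ToUpper extends Function<String, String> {
      @Override
      public String call(String s) {
        return s.toUpperCase();
      }
    }
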
diff --git a/core/src/main/scala/spark/api/java/function/Function2.java b/core/src/main/scala/spark/api/java/function/Function2.java
index be48b173b8..69bf12c8c9 100644
--- a/core/src/main/scala/spark/api/java/function/Function2.java
+++ b/core/src/main/scala/spark/api/java/function/Function2.java
@@ -6,6 +6,9 @@ import scala.runtime.AbstractFunction2;
import java.io.Serializable;
+/**
+ * A two-argument function that takes arguments of type T1 and T2 and returns an R.
+ */
public abstract class Function2<T1, T2, R> extends WrappedFunction2<T1, T2, R>
implements Serializable {
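
For example, a two-argument function that sums integers, the shape typically passed to reduce-style operations in the Java API; a hypothetical sketch with a made-up Sum class name, assuming the single abstract method is call(T1, T2).

    import spark.api.java.function.Function2;

    // Hypothetical sketch: combine two partial results into one.
    public class Sum extends Function2<Integer, Integer, Integer> {
      @Override
      public Integer call(Integer a, Integer b) {
        return a + b;
      }
    }
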
diff --git a/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java
index c074b9c717..b3cc4df6aa 100644
--- a/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java
+++ b/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java
@@ -7,6 +7,10 @@ import scala.runtime.AbstractFunction1;
import java.io.Serializable;
+/**
+ * A function that returns zero or more key-value pair records from each input record. The
+ * key-value pairs are represented as scala.Tuple2 objects.
+ */
// PairFlatMapFunction does not extend FlatMapFunction because flatMap is
// overloaded for both FlatMapFunction and PairFlatMapFunction.
public abstract class PairFlatMapFunction<T, K, V>
diff --git a/core/src/main/scala/spark/api/java/function/PairFunction.java b/core/src/main/scala/spark/api/java/function/PairFunction.java
index 7f5bb7de13..9fc6df4b88 100644
--- a/core/src/main/scala/spark/api/java/function/PairFunction.java
+++ b/core/src/main/scala/spark/api/java/function/PairFunction.java
@@ -7,6 +7,9 @@ import scala.runtime.AbstractFunction1;
import java.io.Serializable;
+/**
+ * A function that returns key-value pairs (Tuple2<K, V>), and can be used to construct PairRDDs.
+ */
// PairFunction does not extend Function because some UDF functions, like map,
// are overloaded for both Function and PairFunction.
public abstract class PairFunction<T, K, V>
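
A minimal sketch of a PairFunction that keys each word by itself with a count of one, the usual word-count mapping; the WordAndOne name is hypothetical, and only call() is implemented on the assumption that the other members have defaults.

    import scala.Tuple2;
    import spark.api.java.function.PairFunction;

    // Hypothetical sketch: map each input record to a single key-value pair.
    public class WordAndOne extends PairFunction<String, String, Integer> {
      @Override
      public Tuple2<String, Integer> call(String word) {
        return new Tuple2<String, Integer>(word, 1);
      }
    }
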
diff --git a/core/src/main/scala/spark/api/java/function/VoidFunction.scala b/core/src/main/scala/spark/api/java/function/VoidFunction.scala
index 0eefe337e8..b0096cf2bf 100644
--- a/core/src/main/scala/spark/api/java/function/VoidFunction.scala
+++ b/core/src/main/scala/spark/api/java/function/VoidFunction.scala
@@ -1,5 +1,8 @@
package spark.api.java.function
+/**
+ * A function with no return value.
+ */
// This allows Java users to write void methods without having to return Unit.
abstract class VoidFunction[T] extends Serializable {
@throws(classOf[Exception])