aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-10-09 18:38:36 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-10-09 18:38:36 -0700
commitee2fcb2ce68e01f9d6c41753bf33ccbaa5cb1181 (patch)
treecb4be56ac4dc26c43e5f56c175c5c460def0e4b9 /core/src/main
parentbc0bc672d02e8f5f12cd1e14863db36c42acff96 (diff)
downloadspark-ee2fcb2ce68e01f9d6c41753bf33ccbaa5cb1181.tar.gz
spark-ee2fcb2ce68e01f9d6c41753bf33ccbaa5cb1181.tar.bz2
spark-ee2fcb2ce68e01f9d6c41753bf33ccbaa5cb1181.zip
Added documentation to all the *RDDFunction classes, and moved them into
the spark package to make them more visible. Also documented various other miscellaneous things in the API.
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/DoubleRDDFunctions.scala (renamed from core/src/main/scala/spark/rdd/DoubleRDDFunctions.scala)23
-rw-r--r--core/src/main/scala/spark/HadoopWriter.scala9
-rw-r--r--core/src/main/scala/spark/PairRDDFunctions.scala (renamed from core/src/main/scala/spark/rdd/PairRDDFunctions.scala)226
-rw-r--r--core/src/main/scala/spark/RDD.scala13
-rw-r--r--core/src/main/scala/spark/SequenceFileRDDFunctions.scala (renamed from core/src/main/scala/spark/rdd/SequenceFileRDDFunctions.scala)15
-rw-r--r--core/src/main/scala/spark/SparkContext.scala4
-rw-r--r--core/src/main/scala/spark/api/java/JavaPairRDD.scala4
-rw-r--r--core/src/main/scala/spark/storage/StorageLevel.scala7
-rw-r--r--core/src/main/scala/spark/util/StatCounter.scala26
9 files changed, 273 insertions, 54 deletions
diff --git a/core/src/main/scala/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/spark/DoubleRDDFunctions.scala
index d232ddeb7c..b2a0e2b631 100644
--- a/core/src/main/scala/spark/rdd/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/spark/DoubleRDDFunctions.scala
@@ -1,39 +1,52 @@
-package spark.rdd
+package spark
import spark.partial.BoundedDouble
import spark.partial.MeanEvaluator
import spark.partial.PartialResult
import spark.partial.SumEvaluator
-
-import spark.Logging
-import spark.RDD
-import spark.TaskContext
import spark.util.StatCounter
/**
* Extra functions available on RDDs of Doubles through an implicit conversion.
+ * Import `spark.SparkContext._` at the top of your program to use these functions.
*/
class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
+ /** Add up the elements in this RDD. */
def sum(): Double = {
self.reduce(_ + _)
}
+ /**
+ * Return a [[spark.util.StatCounter]] object that captures the mean, variance and count
+ * of the RDD's elements in one operation.
+ */
def stats(): StatCounter = {
self.mapPartitions(nums => Iterator(StatCounter(nums))).reduce((a, b) => a.merge(b))
}
+ /** Compute the mean of this RDD's elements. */
def mean(): Double = stats().mean
+ /** Compute the variance of this RDD's elements. */
def variance(): Double = stats().variance
+ /** Compute the standard deviation of this RDD's elements. */
def stdev(): Double = stats().stdev
+ /**
+ * Compute the sample standard deviation of this RDD's elements (which corrects for bias in
+ * estimating the standard deviation by dividing by N-1 instead of N).
+ */
+ def sampleStdev(): Double = stats().stdev
+
+ /** (Experimental) Approximate operation to return the mean within a timeout. */
def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
val evaluator = new MeanEvaluator(self.splits.size, confidence)
self.context.runApproximateJob(self, processPartition, evaluator, timeout)
}
+ /** (Experimental) Approximate operation to return the sum within a timeout. */
def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns)
val evaluator = new SumEvaluator(self.splits.size, confidence)
diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala
index ebb51607e6..ca584d2d5a 100644
--- a/core/src/main/scala/spark/HadoopWriter.scala
+++ b/core/src/main/scala/spark/HadoopWriter.scala
@@ -16,9 +16,12 @@ import spark.Logging
import spark.SerializableWritable
/**
- * Saves an RDD using a Hadoop OutputFormat as specified by a JobConf. The JobConf should also
- * contain an output key class, an output value class, a filename to write to, etc exactly like in
- * a Hadoop job.
+ * An internal helper class that saves an RDD using a Hadoop OutputFormat. This is only public
+ * because we need to access this class from the `spark` package to use some package-private Hadoop
+ * functions, but this class should not be used directly by users.
+ *
+ * Saves the RDD using a JobConf, which should contain an output key class, an output value class,
+ * a filename to write to, etc, exactly like in a Hadoop MapReduce job.
*/
class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializable {
diff --git a/core/src/main/scala/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 2a94ea263a..0240fd95c7 100644
--- a/core/src/main/scala/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -1,4 +1,4 @@
-package spark.rdd
+package spark
import java.io.EOFException
import java.io.ObjectInputStream
@@ -36,27 +36,31 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext
import spark.partial.BoundedDouble
import spark.partial.PartialResult
-import spark.Aggregator
-import spark.HashPartitioner
-import spark.Logging
-import spark.OneToOneDependency
-import spark.Partitioner
-import spark.RangePartitioner
-import spark.RDD
-import spark.SerializableWritable
+import spark.rdd._
import spark.SparkContext._
-import spark.SparkException
-import spark.Split
-import spark.TaskContext
/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
+ * Import `spark.SparkContext._` at the top of your program to use these functions.
*/
class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
self: RDD[(K, V)])
extends Logging
with Serializable {
+ /**
+ * Generic function to combine the elements for each key using a custom set of aggregation
+ * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
+ * Note that V and C can be different -- for example, one might group an RDD of type
+ * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
+ *
+ * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
+ * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
+ * - `mergeCombiners`, to combine two C's into a single one.
+ *
+ * In addition, users can control the partitioning of the output RDD, and whether to perform
+ * map-side aggregation (if a mapper can produce multiple items with the same key).
+ */
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
@@ -74,6 +78,9 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
new ShuffledAggregatedRDD(self, aggregator, partitioner)
}
+ /**
+ * Simplified version of combineByKey that hash-partitions the output RDD.
+ */
def combineByKey[C](createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
@@ -81,10 +88,20 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numSplits))
}
+ /**
+ * Merge the values for each key using an associative reduce function. This will also perform
+ * the merging locally on each mapper before sending results to a reducer, similarly to a
+ * "combiner" in MapReduce.
+ */
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
combineByKey[V]((v: V) => v, func, func, partitioner)
}
+ /**
+ * Merge the values for each key using an associative reduce function, but return the results
+ * immediately to the master as a Map. This will also perform the merging locally on each mapper
+ * before sending results to a reducer, similarly to a "combiner" in MapReduce.
+ */
def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {
def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
val map = new JHashMap[K, V]
@@ -106,22 +123,34 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
self.mapPartitions(reducePartition).reduce(mergeMaps)
}
- // Alias for backwards compatibility
+ /** Alias for reduceByKeyLocally */
def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = reduceByKeyLocally(func)
- // TODO: This should probably be a distributed version
+ /** Count the number of elements for each key, and return the result to the master as a Map. */
def countByKey(): Map[K, Long] = self.map(_._1).countByValue()
- // TODO: This should probably be a distributed version
+ /**
+ * (Experimental) Approximate version of countByKey that can return a partial result if it does
+ * not finish within a timeout.
+ */
def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
: PartialResult[Map[K, BoundedDouble]] = {
self.map(_._1).countByValueApprox(timeout, confidence)
}
+ /**
+ * Merge the values for each key using an associative reduce function. This will also perform
+ * the merging locally on each mapper before sending results to a reducer, similarly to a
+ * "combiner" in MapReduce. Output will be hash-partitioned with numSplits splits.
+ */
def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = {
reduceByKey(new HashPartitioner(numSplits), func)
}
+ /**
+ * Group the values for each key in the RDD into a single sequence. Allows controlling the
+ * partitioning of the resulting key-value pair RDD by passing a Partitioner.
+ */
def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
def createCombiner(v: V) = ArrayBuffer(v)
def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
@@ -131,16 +160,20 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
bufs.asInstanceOf[RDD[(K, Seq[V])]]
}
+ /**
+ * Group the values for each key in the RDD into a single sequence. Hash-partitions the
+ * resulting RDD with into `numSplits` partitions.
+ */
def groupByKey(numSplits: Int): RDD[(K, Seq[V])] = {
groupByKey(new HashPartitioner(numSplits))
}
/**
- * Repartition the RDD using the specified partitioner. If mapSideCombine is
- * true, Spark will group values of the same key together on the map side
- * before the repartitioning. If a large number of duplicated keys are
- * expected, and the size of the keys are large, mapSideCombine should be set
- * to true.
+ * Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine`
+ * is true, Spark will group values of the same key together on the map side before the
+ * repartitioning, to only send each key over the network once. If a large number of
+ * duplicated keys are expected, and the size of the keys are large, `mapSideCombine` should
+ * be set to true.
*/
def partitionBy(partitioner: Partitioner, mapSideCombine: Boolean = false): RDD[(K, V)] = {
if (mapSideCombine) {
@@ -155,6 +188,11 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
}
+ /**
+ * Merge the values for each key using an associative reduce function. This will also perform
+ * the merging locally on each mapper before sending results to a reducer, similarly to a
+ * "combiner" in MapReduce.
+ */
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
this.cogroup(other, partitioner).flatMapValues {
case (vs, ws) =>
@@ -162,6 +200,12 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
}
+ /**
+ * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
+ * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
+ * partition the output RDD.
+ */
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
this.cogroup(other, partitioner).flatMapValues {
case (vs, ws) =>
@@ -173,6 +217,12 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
}
+ /**
+ * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
+ * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
+ * partition the output RDD.
+ */
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Option[V], W))] = {
this.cogroup(other, partitioner).flatMapValues {
@@ -185,56 +235,117 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
}
- def combineByKey[C](createCombiner: V => C,
- mergeValue: (C, V) => C,
- mergeCombiners: (C, C) => C) : RDD[(K, C)] = {
+ /**
+ * Simplified version of combineByKey that hash-partitions the resulting RDD using the default
+ * parallelism level.
+ */
+ def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
+ : RDD[(K, C)] = {
combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
}
+ /**
+ * Merge the values for each key using an associative reduce function. This will also perform
+ * the merging locally on each mapper before sending results to a reducer, similarly to a
+ * "combiner" in MapReduce. Output will be hash-partitioned with the default parallelism level.
+ */
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
reduceByKey(defaultPartitioner(self), func)
}
+ /**
+ * Group the values for each key in the RDD into a single sequence. Hash-partitions the
+ * resulting RDD with the default parallelism level.
+ */
def groupByKey(): RDD[(K, Seq[V])] = {
groupByKey(defaultPartitioner(self))
}
+ /**
+ * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
+ * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
+ * (k, v2) is in `other`. Performs a hash join across the cluster.
+ */
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
join(other, defaultPartitioner(self, other))
}
+ /**
+ * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
+ * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
+ * (k, v2) is in `other`. Performs a hash join across the cluster.
+ */
def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = {
join(other, new HashPartitioner(numSplits))
}
+ /**
+ * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
+ * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
+ * using the default level of parallelism.
+ */
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
leftOuterJoin(other, defaultPartitioner(self, other))
}
+ /**
+ * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
+ * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
+ * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
+ * into `numSplits` partitions.
+ */
def leftOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, Option[W]))] = {
leftOuterJoin(other, new HashPartitioner(numSplits))
}
+ /**
+ * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
+ * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
+ * RDD using the default parallelism level.
+ */
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
rightOuterJoin(other, defaultPartitioner(self, other))
}
+ /**
+ * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
+ * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
+ * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
+ * RDD into the given number of partitions.
+ */
def rightOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Option[V], W))] = {
rightOuterJoin(other, new HashPartitioner(numSplits))
}
+ /**
+ * Return the key-value pairs in this RDD to the master as a Map.
+ */
def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
+ /**
+ * Pass each value in the key-value pair RDD through a map function without changing the keys;
+ * this also retains the original RDD's partitioning.
+ */
def mapValues[U](f: V => U): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
new MappedValuesRDD(self, cleanF)
}
+ /**
+ * Pass each value in the key-value pair RDD through a flatMap function without changing the
+ * keys; this also retains the original RDD's partitioning.
+ */
def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
val cleanF = self.context.clean(f)
new FlatMappedValuesRDD(self, cleanF)
}
+ /**
+ * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+ * list of values for that key in `this` as well as `other`.
+ */
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
val cg = new CoGroupedRDD[K](
Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]),
@@ -246,6 +357,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
}
+ /**
+ * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+ * tuple with the list of values for that key in `this`, `other1` and `other2`.
+ */
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
val cg = new CoGroupedRDD[K](
@@ -260,28 +375,46 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
}
+ /**
+ * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+ * list of values for that key in `this` as well as `other`.
+ */
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
cogroup(other, defaultPartitioner(self, other))
}
+ /**
+ * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+ * tuple with the list of values for that key in `this`, `other1` and `other2`.
+ */
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}
+ /**
+ * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
+ * list of values for that key in `this` as well as `other`.
+ */
def cogroup[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Seq[V], Seq[W]))] = {
cogroup(other, new HashPartitioner(numSplits))
}
+ /**
+ * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
+ * tuple with the list of values for that key in `this`, `other1` and `other2`.
+ */
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numSplits: Int)
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
cogroup(other1, other2, new HashPartitioner(numSplits))
}
+ /** Alias for cogroup. */
def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
cogroup(other, defaultPartitioner(self, other))
}
+ /** Alias for cogroup. */
def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
@@ -298,6 +431,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
return new HashPartitioner(self.context.defaultParallelism)
}
+ /**
+ * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
+ * RDD has a known partitioner by only searching the partition that the key maps to.
+ */
def lookup(key: K): Seq[V] = {
self.partitioner match {
case Some(p) =>
@@ -316,14 +453,26 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
}
+ /**
+ * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+ * supporting the key and value types K and V in this RDD.
+ */
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
}
+ /**
+ * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
+ * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
+ */
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
}
+ /**
+ * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
+ * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
+ */
def saveAsNewAPIHadoopFile(
path: String,
keyClass: Class[_],
@@ -332,6 +481,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, new Configuration)
}
+ /**
+ * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
+ * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
+ */
def saveAsNewAPIHadoopFile(
path: String,
keyClass: Class[_],
@@ -379,6 +532,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
jobCommitter.cleanupJob(jobTaskContext)
}
+ /**
+ * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
+ * supporting the key and value types K and V in this RDD.
+ */
def saveAsHadoopFile(
path: String,
keyClass: Class[_],
@@ -394,6 +551,12 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
saveAsHadoopDataset(conf)
}
+ /**
+ * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
+ * that storage system. The JobConf should set an OutputFormat and any output paths required
+ * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
+ * MapReduce job.
+ */
def saveAsHadoopDataset(conf: JobConf) {
val outputFormatClass = conf.getOutputFormat
val keyClass = conf.getOutputKeyClass
@@ -436,21 +599,33 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
writer.cleanup()
}
- def getKeyClass() = implicitly[ClassManifest[K]].erasure
+ private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure
- def getValueClass() = implicitly[ClassManifest[V]].erasure
+ private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure
}
+/**
+ * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
+ * an implicit conversion. Import `spark.SparkContext._` at the top of your program to use these
+ * functions. They will work with any key type that has a `scala.math.Ordered` implementation.
+ */
class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
self: RDD[(K, V)])
extends Logging
with Serializable {
+ /**
+ * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
+ * `collect` or `save` on the resulting RDD will return or output an ordered list of records
+ * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
+ * order of the keys).
+ */
def sortByKey(ascending: Boolean = true, numSplits: Int = self.splits.size): RDD[(K,V)] = {
new ShuffledSortedRDD(self, ascending, numSplits)
}
}
+private[spark]
class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev.context) {
override def splits = prev.splits
override val dependencies = List(new OneToOneDependency(prev))
@@ -458,6 +633,7 @@ class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)]
override def compute(split: Split) = prev.iterator(split).map{case (k, v) => (k, f(v))}
}
+private[spark]
class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U])
extends RDD[(K, U)](prev.context) {
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 17869fb31b..984738ef73 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -378,7 +378,8 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
}
/**
- * Approximate version of count() that returns a potentially incomplete result after a timeout.
+ * (Experimental) Approximate version of count() that returns a potentially incomplete result
+ * within a timeout, even if not all tasks have finished.
*/
def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) =>
@@ -394,13 +395,11 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
}
/**
- * Return the count of each unique value in this RDD as a map of
- * (value, count) pairs. The final combine step happens locally on the
- * master, equivalent to running a single reduce task.
- *
- * TODO: This should perhaps be distributed by default.
+ * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
+ * combine step happens locally on the master, equivalent to running a single reduce task.
*/
def countByValue(): Map[T, Long] = {
+ // TODO: This should perhaps be distributed by default.
def countPartition(iter: Iterator[T]): Iterator[OLMap[T]] = {
val map = new OLMap[T]
while (iter.hasNext) {
@@ -422,7 +421,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
}
/**
- * Approximate version of countByValue().
+ * (Experimental) Approximate version of countByValue().
*/
def countByValueApprox(
timeout: Long,
diff --git a/core/src/main/scala/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
index 24c731fa92..a34aee69c1 100644
--- a/core/src/main/scala/spark/rdd/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
@@ -1,4 +1,4 @@
-package spark.rdd
+package spark
import java.io.EOFException
import java.net.URL
@@ -23,21 +23,21 @@ import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.Text
-import spark.Logging
-import spark.RDD
import spark.SparkContext._
/**
* Extra functions available on RDDs of (key, value) pairs to create a Hadoop SequenceFile,
* through an implicit conversion. Note that this can't be part of PairRDDFunctions because
* we need more implicit parameters to convert our keys and values to Writable.
+ *
+ * Users should import `spark.SparkContext._` at the top of their program to use these functions.
*/
class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](
self: RDD[(K, V)])
extends Logging
with Serializable {
- def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
+ private def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
val c = {
if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
classManifest[T].erasure
@@ -49,6 +49,13 @@ class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : Cla
c.asInstanceOf[Class[_ <: Writable]]
}
+ /**
+ * Output the RDD as a Hadoop SequenceFile using the Writable types we infer from the RDD's key
+ * and value types. If the key or value are Writable, then we use their classes directly;
+ * otherwise we map primitive types such as Int and Double to IntWritable, DoubleWritable, etc,
+ * byte arrays to BytesWritable, and Strings to Text. The `path` can be on any Hadoop-supported
+ * file system.
+ */
def saveAsSequenceFile(path: String) {
def anyToWritable[U <% Writable](u: U): Writable = u
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 02a08778c3..8739c8bb6d 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -35,12 +35,8 @@ import spark.broadcast._
import spark.deploy.LocalSparkCluster
import spark.partial.ApproximateEvaluator
import spark.partial.PartialResult
-import spark.rdd.DoubleRDDFunctions
import spark.rdd.HadoopRDD
import spark.rdd.NewHadoopRDD
-import spark.rdd.OrderedRDDFunctions
-import spark.rdd.PairRDDFunctions
-import spark.rdd.SequenceFileRDDFunctions
import spark.rdd.UnionRDD
import spark.scheduler.ShuffleMapTask
import spark.scheduler.DAGScheduler
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index 3c4399493c..d361de8f8f 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -15,7 +15,7 @@ import spark.api.java.function.{Function2 => JFunction2}
import spark.api.java.function.{Function => JFunction}
import spark.partial.BoundedDouble
import spark.partial.PartialResult
-import spark.rdd.OrderedRDDFunctions
+import spark.OrderedRDDFunctions
import spark.storage.StorageLevel
import spark.HashPartitioner
import spark.Partitioner
@@ -279,4 +279,4 @@ object JavaPairRDD {
new JavaPairRDD[K, V](rdd)
implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd
-} \ No newline at end of file
+}
diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala
index 2d52fac1ef..c497f03e0c 100644
--- a/core/src/main/scala/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/spark/storage/StorageLevel.scala
@@ -2,6 +2,13 @@ package spark.storage
import java.io.{Externalizable, ObjectInput, ObjectOutput}
+/**
+ * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory,
+ * whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory
+ * in a serialized format, and whether to replicate the RDD partitions on multiple nodes.
+ * The [[spark.storage.StorageLevel$]] singleton object contains some static constants for
+ * commonly useful storage levels.
+ */
class StorageLevel(
var useDisk: Boolean,
var useMemory: Boolean,
diff --git a/core/src/main/scala/spark/util/StatCounter.scala b/core/src/main/scala/spark/util/StatCounter.scala
index 023ec09332..9d7e2b804b 100644
--- a/core/src/main/scala/spark/util/StatCounter.scala
+++ b/core/src/main/scala/spark/util/StatCounter.scala
@@ -2,10 +2,11 @@ package spark.util
/**
* A class for tracking the statistics of a set of numbers (count, mean and variance) in a
- * numerically robust way. Includes support for merging two StatCounters. Based on Welford and
- * Chan's algorithms described at http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance.
+ * numerically robust way. Includes support for merging two StatCounters. Based on
+ * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance Welford and Chan's algorithms for running variance]].
+ *
+ * @constructor Initialize the StatCounter with the given values.
*/
-private[spark]
class StatCounter(values: TraversableOnce[Double]) extends Serializable {
private var n: Long = 0 // Running count of our values
private var mu: Double = 0 // Running mean of our values
@@ -13,8 +14,10 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
merge(values)
+ /** @constructor Initialize the StatCounter with no values. */
def this() = this(Nil)
+ /** Add a value into this StatCounter, updating the internal statistics. */
def merge(value: Double): StatCounter = {
val delta = value - mu
n += 1
@@ -23,11 +26,13 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
this
}
+ /** Add multiple values into this StatCounter, updating the internal statistics. */
def merge(values: TraversableOnce[Double]): StatCounter = {
values.foreach(v => merge(v))
this
}
+ /** Merge another StatCounter into this one, adding up the internal statistics. */
def merge(other: StatCounter): StatCounter = {
if (other == this) {
merge(other.copy()) // Avoid overwriting fields in a weird order
@@ -46,6 +51,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
}
}
+ /** Clone this StatCounter */
def copy(): StatCounter = {
val other = new StatCounter
other.n = n
@@ -60,6 +66,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
def sum: Double = n * mu
+ /** Return the variance of the values. */
def variance: Double = {
if (n == 0)
Double.NaN
@@ -67,6 +74,10 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
m2 / n
}
+ /**
+ * Return the sample variance, which corrects for bias in estimating the variance by dividing
+ * by N-1 instead of N.
+ */
def sampleVariance: Double = {
if (n <= 1)
Double.NaN
@@ -74,8 +85,13 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
m2 / (n - 1)
}
+ /** Return the standard deviation of the values. */
def stdev: Double = math.sqrt(variance)
+ /**
+ * Return the sample standard deviation of the values, which corrects for bias in estimating the
+ * variance by dividing by N-1 instead of N.
+ */
def sampleStdev: Double = math.sqrt(sampleVariance)
override def toString: String = {
@@ -83,8 +99,10 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
}
}
-private[spark] object StatCounter {
+object StatCounter {
+ /** Build a StatCounter from a list of values. */
def apply(values: TraversableOnce[Double]) = new StatCounter(values)
+ /** Build a StatCounter from a list of values passed as variable-length arguments. */
def apply(values: Double*) = new StatCounter(values)
}