From ee2fcb2ce68e01f9d6c41753bf33ccbaa5cb1181 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Tue, 9 Oct 2012 18:38:36 -0700 Subject: Added documentation to all the *RDDFunction classes, and moved them into the spark package to make them more visible. Also documented various other miscellaneous things in the API. --- core/src/main/scala/spark/DoubleRDDFunctions.scala | 55 ++ core/src/main/scala/spark/HadoopWriter.scala | 9 +- core/src/main/scala/spark/PairRDDFunctions.scala | 651 +++++++++++++++++++++ core/src/main/scala/spark/RDD.scala | 13 +- .../scala/spark/SequenceFileRDDFunctions.scala | 79 +++ core/src/main/scala/spark/SparkContext.scala | 4 - .../main/scala/spark/api/java/JavaPairRDD.scala | 4 +- .../main/scala/spark/rdd/DoubleRDDFunctions.scala | 42 -- .../main/scala/spark/rdd/PairRDDFunctions.scala | 475 --------------- .../scala/spark/rdd/SequenceFileRDDFunctions.scala | 72 --- .../main/scala/spark/storage/StorageLevel.scala | 7 + core/src/main/scala/spark/util/StatCounter.scala | 26 +- 12 files changed, 828 insertions(+), 609 deletions(-) create mode 100644 core/src/main/scala/spark/DoubleRDDFunctions.scala create mode 100644 core/src/main/scala/spark/PairRDDFunctions.scala create mode 100644 core/src/main/scala/spark/SequenceFileRDDFunctions.scala delete mode 100644 core/src/main/scala/spark/rdd/DoubleRDDFunctions.scala delete mode 100644 core/src/main/scala/spark/rdd/PairRDDFunctions.scala delete mode 100644 core/src/main/scala/spark/rdd/SequenceFileRDDFunctions.scala (limited to 'core/src/main') diff --git a/core/src/main/scala/spark/DoubleRDDFunctions.scala b/core/src/main/scala/spark/DoubleRDDFunctions.scala new file mode 100644 index 0000000000..b2a0e2b631 --- /dev/null +++ b/core/src/main/scala/spark/DoubleRDDFunctions.scala @@ -0,0 +1,55 @@ +package spark + +import spark.partial.BoundedDouble +import spark.partial.MeanEvaluator +import spark.partial.PartialResult +import spark.partial.SumEvaluator +import spark.util.StatCounter + +/** + * Extra functions available on RDDs of Doubles through an implicit conversion. + * Import `spark.SparkContext._` at the top of your program to use these functions. + */ +class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { + /** Add up the elements in this RDD. */ + def sum(): Double = { + self.reduce(_ + _) + } + + /** + * Return a [[spark.util.StatCounter]] object that captures the mean, variance and count + * of the RDD's elements in one operation. + */ + def stats(): StatCounter = { + self.mapPartitions(nums => Iterator(StatCounter(nums))).reduce((a, b) => a.merge(b)) + } + + /** Compute the mean of this RDD's elements. */ + def mean(): Double = stats().mean + + /** Compute the variance of this RDD's elements. */ + def variance(): Double = stats().variance + + /** Compute the standard deviation of this RDD's elements. */ + def stdev(): Double = stats().stdev + + /** + * Compute the sample standard deviation of this RDD's elements (which corrects for bias in + * estimating the standard deviation by dividing by N-1 instead of N). + */ + def sampleStdev(): Double = stats().stdev + + /** (Experimental) Approximate operation to return the mean within a timeout. 
*/ + def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = { + val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns) + val evaluator = new MeanEvaluator(self.splits.size, confidence) + self.context.runApproximateJob(self, processPartition, evaluator, timeout) + } + + /** (Experimental) Approximate operation to return the sum within a timeout. */ + def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = { + val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns) + val evaluator = new SumEvaluator(self.splits.size, confidence) + self.context.runApproximateJob(self, processPartition, evaluator, timeout) + } +} diff --git a/core/src/main/scala/spark/HadoopWriter.scala b/core/src/main/scala/spark/HadoopWriter.scala index ebb51607e6..ca584d2d5a 100644 --- a/core/src/main/scala/spark/HadoopWriter.scala +++ b/core/src/main/scala/spark/HadoopWriter.scala @@ -16,9 +16,12 @@ import spark.Logging import spark.SerializableWritable /** - * Saves an RDD using a Hadoop OutputFormat as specified by a JobConf. The JobConf should also - * contain an output key class, an output value class, a filename to write to, etc exactly like in - * a Hadoop job. + * An internal helper class that saves an RDD using a Hadoop OutputFormat. This is only public + * because we need to access this class from the `spark` package to use some package-private Hadoop + * functions, but this class should not be used directly by users. + * + * Saves the RDD using a JobConf, which should contain an output key class, an output value class, + * a filename to write to, etc, exactly like in a Hadoop MapReduce job. */ class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializable { diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala new file mode 100644 index 0000000000..0240fd95c7 --- /dev/null +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -0,0 +1,651 @@ +package spark + +import java.io.EOFException +import java.io.ObjectInputStream +import java.net.URL +import java.util.{Date, HashMap => JHashMap} +import java.util.concurrent.atomic.AtomicLong +import java.text.SimpleDateFormat + +import scala.collection.Map +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.HashMap +import scala.collection.JavaConversions._ + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.BytesWritable +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.io.Text +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred.FileOutputCommitter +import org.apache.hadoop.mapred.FileOutputFormat +import org.apache.hadoop.mapred.HadoopWriter +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapred.OutputCommitter +import org.apache.hadoop.mapred.OutputFormat +import org.apache.hadoop.mapred.SequenceFileOutputFormat +import org.apache.hadoop.mapred.TextOutputFormat + +import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} +import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} +import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter} +import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob} +import org.apache.hadoop.mapreduce.TaskAttemptID +import org.apache.hadoop.mapreduce.TaskAttemptContext + +import spark.partial.BoundedDouble +import spark.partial.PartialResult +import 
spark.rdd._ +import spark.SparkContext._ + +/** + * Extra functions available on RDDs of (key, value) pairs through an implicit conversion. + * Import `spark.SparkContext._` at the top of your program to use these functions. + */ +class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( + self: RDD[(K, V)]) + extends Logging + with Serializable { + + /** + * Generic function to combine the elements for each key using a custom set of aggregation + * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C + * Note that V and C can be different -- for example, one might group an RDD of type + * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions: + * + * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list) + * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list) + * - `mergeCombiners`, to combine two C's into a single one. + * + * In addition, users can control the partitioning of the output RDD, and whether to perform + * map-side aggregation (if a mapper can produce multiple items with the same key). + */ + def combineByKey[C](createCombiner: V => C, + mergeValue: (C, V) => C, + mergeCombiners: (C, C) => C, + partitioner: Partitioner, + mapSideCombine: Boolean = true): RDD[(K, C)] = { + val aggregator = + if (mapSideCombine) { + new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) + } else { + // Don't apply map-side combiner. + // A sanity check to make sure mergeCombiners is not defined. + assert(mergeCombiners == null) + new Aggregator[K, V, C](createCombiner, mergeValue, null, false) + } + new ShuffledAggregatedRDD(self, aggregator, partitioner) + } + + /** + * Simplified version of combineByKey that hash-partitions the output RDD. + */ + def combineByKey[C](createCombiner: V => C, + mergeValue: (C, V) => C, + mergeCombiners: (C, C) => C, + numSplits: Int): RDD[(K, C)] = { + combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numSplits)) + } + + /** + * Merge the values for each key using an associative reduce function. This will also perform + * the merging locally on each mapper before sending results to a reducer, similarly to a + * "combiner" in MapReduce. + */ + def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = { + combineByKey[V]((v: V) => v, func, func, partitioner) + } + + /** + * Merge the values for each key using an associative reduce function, but return the results + * immediately to the master as a Map. This will also perform the merging locally on each mapper + * before sending results to a reducer, similarly to a "combiner" in MapReduce. + */ + def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = { + def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = { + val map = new JHashMap[K, V] + for ((k, v) <- iter) { + val old = map.get(k) + map.put(k, if (old == null) v else func(old, v)) + } + Iterator(map) + } + + def mergeMaps(m1: JHashMap[K, V], m2: JHashMap[K, V]): JHashMap[K, V] = { + for ((k, v) <- m2) { + val old = m1.get(k) + m1.put(k, if (old == null) v else func(old, v)) + } + return m1 + } + + self.mapPartitions(reducePartition).reduce(mergeMaps) + } + + /** Alias for reduceByKeyLocally */ + def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = reduceByKeyLocally(func) + + /** Count the number of elements for each key, and return the result to the master as a Map. 
*/ + def countByKey(): Map[K, Long] = self.map(_._1).countByValue() + + /** + * (Experimental) Approximate version of countByKey that can return a partial result if it does + * not finish within a timeout. + */ + def countByKeyApprox(timeout: Long, confidence: Double = 0.95) + : PartialResult[Map[K, BoundedDouble]] = { + self.map(_._1).countByValueApprox(timeout, confidence) + } + + /** + * Merge the values for each key using an associative reduce function. This will also perform + * the merging locally on each mapper before sending results to a reducer, similarly to a + * "combiner" in MapReduce. Output will be hash-partitioned with numSplits splits. + */ + def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = { + reduceByKey(new HashPartitioner(numSplits), func) + } + + /** + * Group the values for each key in the RDD into a single sequence. Allows controlling the + * partitioning of the resulting key-value pair RDD by passing a Partitioner. + */ + def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = { + def createCombiner(v: V) = ArrayBuffer(v) + def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v + def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2 + val bufs = combineByKey[ArrayBuffer[V]]( + createCombiner _, mergeValue _, mergeCombiners _, partitioner) + bufs.asInstanceOf[RDD[(K, Seq[V])]] + } + + /** + * Group the values for each key in the RDD into a single sequence. Hash-partitions the + * resulting RDD with into `numSplits` partitions. + */ + def groupByKey(numSplits: Int): RDD[(K, Seq[V])] = { + groupByKey(new HashPartitioner(numSplits)) + } + + /** + * Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine` + * is true, Spark will group values of the same key together on the map side before the + * repartitioning, to only send each key over the network once. If a large number of + * duplicated keys are expected, and the size of the keys are large, `mapSideCombine` should + * be set to true. + */ + def partitionBy(partitioner: Partitioner, mapSideCombine: Boolean = false): RDD[(K, V)] = { + if (mapSideCombine) { + def createCombiner(v: V) = ArrayBuffer(v) + def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v + def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2 + val bufs = combineByKey[ArrayBuffer[V]]( + createCombiner _, mergeValue _, mergeCombiners _, partitioner) + bufs.flatMapValues(buf => buf) + } else { + new RepartitionShuffledRDD(self, partitioner) + } + } + + /** + * Merge the values for each key using an associative reduce function. This will also perform + * the merging locally on each mapper before sending results to a reducer, similarly to a + * "combiner" in MapReduce. + */ + def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { + this.cogroup(other, partitioner).flatMapValues { + case (vs, ws) => + for (v <- vs.iterator; w <- ws.iterator) yield (v, w) + } + } + + /** + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to + * partition the output RDD. 
+ */ + def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = { + this.cogroup(other, partitioner).flatMapValues { + case (vs, ws) => + if (ws.isEmpty) { + vs.iterator.map(v => (v, None)) + } else { + for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w)) + } + } + } + + /** + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to + * partition the output RDD. + */ + def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) + : RDD[(K, (Option[V], W))] = { + this.cogroup(other, partitioner).flatMapValues { + case (vs, ws) => + if (vs.isEmpty) { + ws.iterator.map(w => (None, w)) + } else { + for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w) + } + } + } + + /** + * Simplified version of combineByKey that hash-partitions the resulting RDD using the default + * parallelism level. + */ + def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C) + : RDD[(K, C)] = { + combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self)) + } + + /** + * Merge the values for each key using an associative reduce function. This will also perform + * the merging locally on each mapper before sending results to a reducer, similarly to a + * "combiner" in MapReduce. Output will be hash-partitioned with the default parallelism level. + */ + def reduceByKey(func: (V, V) => V): RDD[(K, V)] = { + reduceByKey(defaultPartitioner(self), func) + } + + /** + * Group the values for each key in the RDD into a single sequence. Hash-partitions the + * resulting RDD with the default parallelism level. + */ + def groupByKey(): RDD[(K, Seq[V])] = { + groupByKey(defaultPartitioner(self)) + } + + /** + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Performs a hash join across the cluster. + */ + def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = { + join(other, defaultPartitioner(self, other)) + } + + /** + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Performs a hash join across the cluster. + */ + def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = { + join(other, new HashPartitioner(numSplits)) + } + + /** + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output + * using the default level of parallelism. + */ + def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = { + leftOuterJoin(other, defaultPartitioner(self, other)) + } + + /** + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output + * into `numSplits` partitions. 
+ */ + def leftOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, Option[W]))] = { + leftOuterJoin(other, new HashPartitioner(numSplits)) + } + + /** + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting + * RDD using the default parallelism level. + */ + def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = { + rightOuterJoin(other, defaultPartitioner(self, other)) + } + + /** + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting + * RDD into the given number of partitions. + */ + def rightOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Option[V], W))] = { + rightOuterJoin(other, new HashPartitioner(numSplits)) + } + + /** + * Return the key-value pairs in this RDD to the master as a Map. + */ + def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*) + + /** + * Pass each value in the key-value pair RDD through a map function without changing the keys; + * this also retains the original RDD's partitioning. + */ + def mapValues[U](f: V => U): RDD[(K, U)] = { + val cleanF = self.context.clean(f) + new MappedValuesRDD(self, cleanF) + } + + /** + * Pass each value in the key-value pair RDD through a flatMap function without changing the + * keys; this also retains the original RDD's partitioning. + */ + def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = { + val cleanF = self.context.clean(f) + new FlatMappedValuesRDD(self, cleanF) + } + + /** + * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the + * list of values for that key in `this` as well as `other`. + */ + def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = { + val cg = new CoGroupedRDD[K]( + Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]), + partitioner) + val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest) + prfs.mapValues { + case Seq(vs, ws) => + (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]) + } + } + + /** + * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a + * tuple with the list of values for that key in `this`, `other1` and `other2`. + */ + def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner) + : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + val cg = new CoGroupedRDD[K]( + Seq(self.asInstanceOf[RDD[(_, _)]], + other1.asInstanceOf[RDD[(_, _)]], + other2.asInstanceOf[RDD[(_, _)]]), + partitioner) + val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest) + prfs.mapValues { + case Seq(vs, w1s, w2s) => + (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]) + } + } + + /** + * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the + * list of values for that key in `this` as well as `other`. 
+ */ + def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { + cogroup(other, defaultPartitioner(self, other)) + } + + /** + * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a + * tuple with the list of values for that key in `this`, `other1` and `other2`. + */ + def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) + : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + cogroup(other1, other2, defaultPartitioner(self, other1, other2)) + } + + /** + * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the + * list of values for that key in `this` as well as `other`. + */ + def cogroup[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Seq[V], Seq[W]))] = { + cogroup(other, new HashPartitioner(numSplits)) + } + + /** + * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a + * tuple with the list of values for that key in `this`, `other1` and `other2`. + */ + def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numSplits: Int) + : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + cogroup(other1, other2, new HashPartitioner(numSplits)) + } + + /** Alias for cogroup. */ + def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { + cogroup(other, defaultPartitioner(self, other)) + } + + /** Alias for cogroup. */ + def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) + : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + cogroup(other1, other2, defaultPartitioner(self, other1, other2)) + } + + /** + * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of + * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner. + */ + def defaultPartitioner(rdds: RDD[_]*): Partitioner = { + for (r <- rdds if r.partitioner != None) { + return r.partitioner.get + } + return new HashPartitioner(self.context.defaultParallelism) + } + + /** + * Return the list of values in the RDD for key `key`. This operation is done efficiently if the + * RDD has a known partitioner by only searching the partition that the key maps to. + */ + def lookup(key: K): Seq[V] = { + self.partitioner match { + case Some(p) => + val index = p.getPartition(key) + def process(it: Iterator[(K, V)]): Seq[V] = { + val buf = new ArrayBuffer[V] + for ((k, v) <- it if k == key) { + buf += v + } + buf + } + val res = self.context.runJob(self, process _, Array(index), false) + res(0) + case None => + throw new UnsupportedOperationException("lookup() called on an RDD without a partitioner") + } + } + + /** + * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class + * supporting the key and value types K and V in this RDD. + */ + def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) { + saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) + } + + /** + * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat` + * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD. + */ + def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) { + saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) + } + + /** + * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat` + * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD. 
+ */ + def saveAsNewAPIHadoopFile( + path: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) { + saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, new Configuration) + } + + /** + * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat` + * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD. + */ + def saveAsNewAPIHadoopFile( + path: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: NewOutputFormat[_, _]], + conf: Configuration) { + val job = new NewAPIHadoopJob(conf) + job.setOutputKeyClass(keyClass) + job.setOutputValueClass(valueClass) + val wrappedConf = new SerializableWritable(job.getConfiguration) + NewFileOutputFormat.setOutputPath(job, new Path(path)) + val formatter = new SimpleDateFormat("yyyyMMddHHmm") + val jobtrackerID = formatter.format(new Date()) + val stageId = self.id + def writeShard(context: spark.TaskContext, iter: Iterator[(K,V)]): Int = { + // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it + // around by taking a mod. We expect that no task will be attempted 2 billion times. + val attemptNumber = (context.attemptId % Int.MaxValue).toInt + /* "reduce task" */ + val attemptId = new TaskAttemptID(jobtrackerID, + stageId, false, context.splitId, attemptNumber) + val hadoopContext = new TaskAttemptContext(wrappedConf.value, attemptId) + val format = outputFormatClass.newInstance + val committer = format.getOutputCommitter(hadoopContext) + committer.setupTask(hadoopContext) + val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]] + while (iter.hasNext) { + val (k, v) = iter.next + writer.write(k, v) + } + writer.close(hadoopContext) + committer.commitTask(hadoopContext) + return 1 + } + val jobFormat = outputFormatClass.newInstance + /* apparently we need a TaskAttemptID to construct an OutputCommitter; + * however we're only going to use this local OutputCommitter for + * setupJob/commitJob, so we just use a dummy "map" task. + */ + val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, true, 0, 0) + val jobTaskContext = new TaskAttemptContext(wrappedConf.value, jobAttemptId) + val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext) + jobCommitter.setupJob(jobTaskContext) + val count = self.context.runJob(self, writeShard _).sum + jobCommitter.cleanupJob(jobTaskContext) + } + + /** + * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class + * supporting the key and value types K and V in this RDD. + */ + def saveAsHadoopFile( + path: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[_ <: OutputFormat[_, _]], + conf: JobConf = new JobConf) { + conf.setOutputKeyClass(keyClass) + conf.setOutputValueClass(valueClass) + // conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug + conf.set("mapred.output.format.class", outputFormatClass.getName) + conf.setOutputCommitter(classOf[FileOutputCommitter]) + FileOutputFormat.setOutputPath(conf, HadoopWriter.createPathFromString(path, conf)) + saveAsHadoopDataset(conf) + } + + /** + * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for + * that storage system. The JobConf should set an OutputFormat and any output paths required + * (e.g. 
a table name to write to) in the same way as it would be configured for a Hadoop + * MapReduce job. + */ + def saveAsHadoopDataset(conf: JobConf) { + val outputFormatClass = conf.getOutputFormat + val keyClass = conf.getOutputKeyClass + val valueClass = conf.getOutputValueClass + if (outputFormatClass == null) { + throw new SparkException("Output format class not set") + } + if (keyClass == null) { + throw new SparkException("Output key class not set") + } + if (valueClass == null) { + throw new SparkException("Output value class not set") + } + + logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")") + + val writer = new HadoopWriter(conf) + writer.preSetup() + + def writeToFile(context: TaskContext, iter: Iterator[(K,V)]) { + // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it + // around by taking a mod. We expect that no task will be attempted 2 billion times. + val attemptNumber = (context.attemptId % Int.MaxValue).toInt + + writer.setup(context.stageId, context.splitId, attemptNumber) + writer.open() + + var count = 0 + while(iter.hasNext) { + val record = iter.next + count += 1 + writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef]) + } + + writer.close() + writer.commit() + } + + self.context.runJob(self, writeToFile _) + writer.cleanup() + } + + private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure + + private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure +} + +/** + * Extra functions available on RDDs of (key, value) pairs where the key is sortable through + * an implicit conversion. Import `spark.SparkContext._` at the top of your program to use these + * functions. They will work with any key type that has a `scala.math.Ordered` implementation. + */ +class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest]( + self: RDD[(K, V)]) + extends Logging + with Serializable { + + /** + * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling + * `collect` or `save` on the resulting RDD will return or output an ordered list of records + * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in + * order of the keys). 
+ */ + def sortByKey(ascending: Boolean = true, numSplits: Int = self.splits.size): RDD[(K,V)] = { + new ShuffledSortedRDD(self, ascending, numSplits) + } +} + +private[spark] +class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev.context) { + override def splits = prev.splits + override val dependencies = List(new OneToOneDependency(prev)) + override val partitioner = prev.partitioner + override def compute(split: Split) = prev.iterator(split).map{case (k, v) => (k, f(v))} +} + +private[spark] +class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U]) + extends RDD[(K, U)](prev.context) { + + override def splits = prev.splits + override val dependencies = List(new OneToOneDependency(prev)) + override val partitioner = prev.partitioner + + override def compute(split: Split) = { + prev.iterator(split).flatMap { case (k, v) => f(v).map(x => (k, x)) } + } +} + +private[spark] object Manifests { + val seqSeqManifest = classManifest[Seq[Seq[_]]] +} diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 17869fb31b..984738ef73 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -378,7 +378,8 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial } /** - * Approximate version of count() that returns a potentially incomplete result after a timeout. + * (Experimental) Approximate version of count() that returns a potentially incomplete result + * within a timeout, even if not all tasks have finished. */ def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = { val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) => @@ -394,13 +395,11 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial } /** - * Return the count of each unique value in this RDD as a map of - * (value, count) pairs. The final combine step happens locally on the - * master, equivalent to running a single reduce task. - * - * TODO: This should perhaps be distributed by default. + * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final + * combine step happens locally on the master, equivalent to running a single reduce task. */ def countByValue(): Map[T, Long] = { + // TODO: This should perhaps be distributed by default. def countPartition(iter: Iterator[T]): Iterator[OLMap[T]] = { val map = new OLMap[T] while (iter.hasNext) { @@ -422,7 +421,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial } /** - * Approximate version of countByValue(). + * (Experimental) Approximate version of countByValue(). 
*/ def countByValueApprox( timeout: Long, diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala new file mode 100644 index 0000000000..a34aee69c1 --- /dev/null +++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala @@ -0,0 +1,79 @@ +package spark + +import java.io.EOFException +import java.net.URL +import java.io.ObjectInputStream +import java.util.concurrent.atomic.AtomicLong +import java.util.HashSet +import java.util.Random +import java.util.Date + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.Map +import scala.collection.mutable.HashMap + +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapred.OutputFormat +import org.apache.hadoop.mapred.TextOutputFormat +import org.apache.hadoop.mapred.SequenceFileOutputFormat +import org.apache.hadoop.mapred.OutputCommitter +import org.apache.hadoop.mapred.FileOutputCommitter +import org.apache.hadoop.io.Writable +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.io.BytesWritable +import org.apache.hadoop.io.Text + +import spark.SparkContext._ + +/** + * Extra functions available on RDDs of (key, value) pairs to create a Hadoop SequenceFile, + * through an implicit conversion. Note that this can't be part of PairRDDFunctions because + * we need more implicit parameters to convert our keys and values to Writable. + * + * Users should import `spark.SparkContext._` at the top of their program to use these functions. + */ +class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest]( + self: RDD[(K, V)]) + extends Logging + with Serializable { + + private def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = { + val c = { + if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) { + classManifest[T].erasure + } else { + implicitly[T => Writable].getClass.getMethods()(0).getReturnType + } + // TODO: use something like WritableConverter to avoid reflection + } + c.asInstanceOf[Class[_ <: Writable]] + } + + /** + * Output the RDD as a Hadoop SequenceFile using the Writable types we infer from the RDD's key + * and value types. If the key or value are Writable, then we use their classes directly; + * otherwise we map primitive types such as Int and Double to IntWritable, DoubleWritable, etc, + * byte arrays to BytesWritable, and Strings to Text. The `path` can be on any Hadoop-supported + * file system. 
+ */ + def saveAsSequenceFile(path: String) { + def anyToWritable[U <% Writable](u: U): Writable = u + + val keyClass = getWritableClass[K] + val valueClass = getWritableClass[V] + val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass) + val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass) + + logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" ) + val format = classOf[SequenceFileOutputFormat[Writable, Writable]] + if (!convertKey && !convertValue) { + self.saveAsHadoopFile(path, keyClass, valueClass, format) + } else if (!convertKey && convertValue) { + self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format) + } else if (convertKey && !convertValue) { + self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format) + } else if (convertKey && convertValue) { + self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format) + } + } +} diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 02a08778c3..8739c8bb6d 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -35,12 +35,8 @@ import spark.broadcast._ import spark.deploy.LocalSparkCluster import spark.partial.ApproximateEvaluator import spark.partial.PartialResult -import spark.rdd.DoubleRDDFunctions import spark.rdd.HadoopRDD import spark.rdd.NewHadoopRDD -import spark.rdd.OrderedRDDFunctions -import spark.rdd.PairRDDFunctions -import spark.rdd.SequenceFileRDDFunctions import spark.rdd.UnionRDD import spark.scheduler.ShuffleMapTask import spark.scheduler.DAGScheduler diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala index 3c4399493c..d361de8f8f 100644 --- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala @@ -15,7 +15,7 @@ import spark.api.java.function.{Function2 => JFunction2} import spark.api.java.function.{Function => JFunction} import spark.partial.BoundedDouble import spark.partial.PartialResult -import spark.rdd.OrderedRDDFunctions +import spark.OrderedRDDFunctions import spark.storage.StorageLevel import spark.HashPartitioner import spark.Partitioner @@ -279,4 +279,4 @@ object JavaPairRDD { new JavaPairRDD[K, V](rdd) implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd -} \ No newline at end of file +} diff --git a/core/src/main/scala/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/spark/rdd/DoubleRDDFunctions.scala deleted file mode 100644 index d232ddeb7c..0000000000 --- a/core/src/main/scala/spark/rdd/DoubleRDDFunctions.scala +++ /dev/null @@ -1,42 +0,0 @@ -package spark.rdd - -import spark.partial.BoundedDouble -import spark.partial.MeanEvaluator -import spark.partial.PartialResult -import spark.partial.SumEvaluator - -import spark.Logging -import spark.RDD -import spark.TaskContext -import spark.util.StatCounter - -/** - * Extra functions available on RDDs of Doubles through an implicit conversion. 
- */ -class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { - def sum(): Double = { - self.reduce(_ + _) - } - - def stats(): StatCounter = { - self.mapPartitions(nums => Iterator(StatCounter(nums))).reduce((a, b) => a.merge(b)) - } - - def mean(): Double = stats().mean - - def variance(): Double = stats().variance - - def stdev(): Double = stats().stdev - - def meanApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = { - val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns) - val evaluator = new MeanEvaluator(self.splits.size, confidence) - self.context.runApproximateJob(self, processPartition, evaluator, timeout) - } - - def sumApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = { - val processPartition = (ctx: TaskContext, ns: Iterator[Double]) => StatCounter(ns) - val evaluator = new SumEvaluator(self.splits.size, confidence) - self.context.runApproximateJob(self, processPartition, evaluator, timeout) - } -} diff --git a/core/src/main/scala/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/spark/rdd/PairRDDFunctions.scala deleted file mode 100644 index 2a94ea263a..0000000000 --- a/core/src/main/scala/spark/rdd/PairRDDFunctions.scala +++ /dev/null @@ -1,475 +0,0 @@ -package spark.rdd - -import java.io.EOFException -import java.io.ObjectInputStream -import java.net.URL -import java.util.{Date, HashMap => JHashMap} -import java.util.concurrent.atomic.AtomicLong -import java.text.SimpleDateFormat - -import scala.collection.Map -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.HashMap -import scala.collection.JavaConversions._ - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.hadoop.io.BytesWritable -import org.apache.hadoop.io.NullWritable -import org.apache.hadoop.io.Text -import org.apache.hadoop.io.Writable -import org.apache.hadoop.mapred.FileOutputCommitter -import org.apache.hadoop.mapred.FileOutputFormat -import org.apache.hadoop.mapred.HadoopWriter -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapred.OutputCommitter -import org.apache.hadoop.mapred.OutputFormat -import org.apache.hadoop.mapred.SequenceFileOutputFormat -import org.apache.hadoop.mapred.TextOutputFormat - -import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} -import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} -import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter} -import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob} -import org.apache.hadoop.mapreduce.TaskAttemptID -import org.apache.hadoop.mapreduce.TaskAttemptContext - -import spark.partial.BoundedDouble -import spark.partial.PartialResult -import spark.Aggregator -import spark.HashPartitioner -import spark.Logging -import spark.OneToOneDependency -import spark.Partitioner -import spark.RangePartitioner -import spark.RDD -import spark.SerializableWritable -import spark.SparkContext._ -import spark.SparkException -import spark.Split -import spark.TaskContext - -/** - * Extra functions available on RDDs of (key, value) pairs through an implicit conversion. 
- */ -class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( - self: RDD[(K, V)]) - extends Logging - with Serializable { - - def combineByKey[C](createCombiner: V => C, - mergeValue: (C, V) => C, - mergeCombiners: (C, C) => C, - partitioner: Partitioner, - mapSideCombine: Boolean = true): RDD[(K, C)] = { - val aggregator = - if (mapSideCombine) { - new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) - } else { - // Don't apply map-side combiner. - // A sanity check to make sure mergeCombiners is not defined. - assert(mergeCombiners == null) - new Aggregator[K, V, C](createCombiner, mergeValue, null, false) - } - new ShuffledAggregatedRDD(self, aggregator, partitioner) - } - - def combineByKey[C](createCombiner: V => C, - mergeValue: (C, V) => C, - mergeCombiners: (C, C) => C, - numSplits: Int): RDD[(K, C)] = { - combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numSplits)) - } - - def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = { - combineByKey[V]((v: V) => v, func, func, partitioner) - } - - def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = { - def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = { - val map = new JHashMap[K, V] - for ((k, v) <- iter) { - val old = map.get(k) - map.put(k, if (old == null) v else func(old, v)) - } - Iterator(map) - } - - def mergeMaps(m1: JHashMap[K, V], m2: JHashMap[K, V]): JHashMap[K, V] = { - for ((k, v) <- m2) { - val old = m1.get(k) - m1.put(k, if (old == null) v else func(old, v)) - } - return m1 - } - - self.mapPartitions(reducePartition).reduce(mergeMaps) - } - - // Alias for backwards compatibility - def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = reduceByKeyLocally(func) - - // TODO: This should probably be a distributed version - def countByKey(): Map[K, Long] = self.map(_._1).countByValue() - - // TODO: This should probably be a distributed version - def countByKeyApprox(timeout: Long, confidence: Double = 0.95) - : PartialResult[Map[K, BoundedDouble]] = { - self.map(_._1).countByValueApprox(timeout, confidence) - } - - def reduceByKey(func: (V, V) => V, numSplits: Int): RDD[(K, V)] = { - reduceByKey(new HashPartitioner(numSplits), func) - } - - def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = { - def createCombiner(v: V) = ArrayBuffer(v) - def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v - def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2 - val bufs = combineByKey[ArrayBuffer[V]]( - createCombiner _, mergeValue _, mergeCombiners _, partitioner) - bufs.asInstanceOf[RDD[(K, Seq[V])]] - } - - def groupByKey(numSplits: Int): RDD[(K, Seq[V])] = { - groupByKey(new HashPartitioner(numSplits)) - } - - /** - * Repartition the RDD using the specified partitioner. If mapSideCombine is - * true, Spark will group values of the same key together on the map side - * before the repartitioning. If a large number of duplicated keys are - * expected, and the size of the keys are large, mapSideCombine should be set - * to true. 
- */ - def partitionBy(partitioner: Partitioner, mapSideCombine: Boolean = false): RDD[(K, V)] = { - if (mapSideCombine) { - def createCombiner(v: V) = ArrayBuffer(v) - def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v - def mergeCombiners(b1: ArrayBuffer[V], b2: ArrayBuffer[V]) = b1 ++= b2 - val bufs = combineByKey[ArrayBuffer[V]]( - createCombiner _, mergeValue _, mergeCombiners _, partitioner) - bufs.flatMapValues(buf => buf) - } else { - new RepartitionShuffledRDD(self, partitioner) - } - } - - def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { - this.cogroup(other, partitioner).flatMapValues { - case (vs, ws) => - for (v <- vs.iterator; w <- ws.iterator) yield (v, w) - } - } - - def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = { - this.cogroup(other, partitioner).flatMapValues { - case (vs, ws) => - if (ws.isEmpty) { - vs.iterator.map(v => (v, None)) - } else { - for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w)) - } - } - } - - def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner) - : RDD[(K, (Option[V], W))] = { - this.cogroup(other, partitioner).flatMapValues { - case (vs, ws) => - if (vs.isEmpty) { - ws.iterator.map(w => (None, w)) - } else { - for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w) - } - } - } - - def combineByKey[C](createCombiner: V => C, - mergeValue: (C, V) => C, - mergeCombiners: (C, C) => C) : RDD[(K, C)] = { - combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self)) - } - - def reduceByKey(func: (V, V) => V): RDD[(K, V)] = { - reduceByKey(defaultPartitioner(self), func) - } - - def groupByKey(): RDD[(K, Seq[V])] = { - groupByKey(defaultPartitioner(self)) - } - - def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = { - join(other, defaultPartitioner(self, other)) - } - - def join[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, W))] = { - join(other, new HashPartitioner(numSplits)) - } - - def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = { - leftOuterJoin(other, defaultPartitioner(self, other)) - } - - def leftOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (V, Option[W]))] = { - leftOuterJoin(other, new HashPartitioner(numSplits)) - } - - def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = { - rightOuterJoin(other, defaultPartitioner(self, other)) - } - - def rightOuterJoin[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Option[V], W))] = { - rightOuterJoin(other, new HashPartitioner(numSplits)) - } - - def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*) - - def mapValues[U](f: V => U): RDD[(K, U)] = { - val cleanF = self.context.clean(f) - new MappedValuesRDD(self, cleanF) - } - - def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = { - val cleanF = self.context.clean(f) - new FlatMappedValuesRDD(self, cleanF) - } - - def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = { - val cg = new CoGroupedRDD[K]( - Seq(self.asInstanceOf[RDD[(_, _)]], other.asInstanceOf[RDD[(_, _)]]), - partitioner) - val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest) - prfs.mapValues { - case Seq(vs, ws) => - (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]) - } - } - - def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner) - : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { - val cg = new CoGroupedRDD[K]( - Seq(self.asInstanceOf[RDD[(_, _)]], - other1.asInstanceOf[RDD[(_, 
_)]], - other2.asInstanceOf[RDD[(_, _)]]), - partitioner) - val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest) - prfs.mapValues { - case Seq(vs, w1s, w2s) => - (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]) - } - } - - def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { - cogroup(other, defaultPartitioner(self, other)) - } - - def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) - : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { - cogroup(other1, other2, defaultPartitioner(self, other1, other2)) - } - - def cogroup[W](other: RDD[(K, W)], numSplits: Int): RDD[(K, (Seq[V], Seq[W]))] = { - cogroup(other, new HashPartitioner(numSplits)) - } - - def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numSplits: Int) - : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { - cogroup(other1, other2, new HashPartitioner(numSplits)) - } - - def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { - cogroup(other, defaultPartitioner(self, other)) - } - - def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) - : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { - cogroup(other1, other2, defaultPartitioner(self, other1, other2)) - } - - /** - * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. If any of - * the RDDs already has a partitioner, choose that one, otherwise use a default HashPartitioner. - */ - def defaultPartitioner(rdds: RDD[_]*): Partitioner = { - for (r <- rdds if r.partitioner != None) { - return r.partitioner.get - } - return new HashPartitioner(self.context.defaultParallelism) - } - - def lookup(key: K): Seq[V] = { - self.partitioner match { - case Some(p) => - val index = p.getPartition(key) - def process(it: Iterator[(K, V)]): Seq[V] = { - val buf = new ArrayBuffer[V] - for ((k, v) <- it if k == key) { - buf += v - } - buf - } - val res = self.context.runJob(self, process _, Array(index), false) - res(0) - case None => - throw new UnsupportedOperationException("lookup() called on an RDD without a partitioner") - } - } - - def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) { - saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) - } - - def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) { - saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]]) - } - - def saveAsNewAPIHadoopFile( - path: String, - keyClass: Class[_], - valueClass: Class[_], - outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) { - saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, new Configuration) - } - - def saveAsNewAPIHadoopFile( - path: String, - keyClass: Class[_], - valueClass: Class[_], - outputFormatClass: Class[_ <: NewOutputFormat[_, _]], - conf: Configuration) { - val job = new NewAPIHadoopJob(conf) - job.setOutputKeyClass(keyClass) - job.setOutputValueClass(valueClass) - val wrappedConf = new SerializableWritable(job.getConfiguration) - NewFileOutputFormat.setOutputPath(job, new Path(path)) - val formatter = new SimpleDateFormat("yyyyMMddHHmm") - val jobtrackerID = formatter.format(new Date()) - val stageId = self.id - def writeShard(context: spark.TaskContext, iter: Iterator[(K,V)]): Int = { - // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it - // around by taking a mod. We expect that no task will be attempted 2 billion times. 
- val attemptNumber = (context.attemptId % Int.MaxValue).toInt - /* "reduce task" */ - val attemptId = new TaskAttemptID(jobtrackerID, - stageId, false, context.splitId, attemptNumber) - val hadoopContext = new TaskAttemptContext(wrappedConf.value, attemptId) - val format = outputFormatClass.newInstance - val committer = format.getOutputCommitter(hadoopContext) - committer.setupTask(hadoopContext) - val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]] - while (iter.hasNext) { - val (k, v) = iter.next - writer.write(k, v) - } - writer.close(hadoopContext) - committer.commitTask(hadoopContext) - return 1 - } - val jobFormat = outputFormatClass.newInstance - /* apparently we need a TaskAttemptID to construct an OutputCommitter; - * however we're only going to use this local OutputCommitter for - * setupJob/commitJob, so we just use a dummy "map" task. - */ - val jobAttemptId = new TaskAttemptID(jobtrackerID, stageId, true, 0, 0) - val jobTaskContext = new TaskAttemptContext(wrappedConf.value, jobAttemptId) - val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext) - jobCommitter.setupJob(jobTaskContext) - val count = self.context.runJob(self, writeShard _).sum - jobCommitter.cleanupJob(jobTaskContext) - } - - def saveAsHadoopFile( - path: String, - keyClass: Class[_], - valueClass: Class[_], - outputFormatClass: Class[_ <: OutputFormat[_, _]], - conf: JobConf = new JobConf) { - conf.setOutputKeyClass(keyClass) - conf.setOutputValueClass(valueClass) - // conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug - conf.set("mapred.output.format.class", outputFormatClass.getName) - conf.setOutputCommitter(classOf[FileOutputCommitter]) - FileOutputFormat.setOutputPath(conf, HadoopWriter.createPathFromString(path, conf)) - saveAsHadoopDataset(conf) - } - - def saveAsHadoopDataset(conf: JobConf) { - val outputFormatClass = conf.getOutputFormat - val keyClass = conf.getOutputKeyClass - val valueClass = conf.getOutputValueClass - if (outputFormatClass == null) { - throw new SparkException("Output format class not set") - } - if (keyClass == null) { - throw new SparkException("Output key class not set") - } - if (valueClass == null) { - throw new SparkException("Output value class not set") - } - - logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")") - - val writer = new HadoopWriter(conf) - writer.preSetup() - - def writeToFile(context: TaskContext, iter: Iterator[(K,V)]) { - // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it - // around by taking a mod. We expect that no task will be attempted 2 billion times. 
- val attemptNumber = (context.attemptId % Int.MaxValue).toInt - - writer.setup(context.stageId, context.splitId, attemptNumber) - writer.open() - - var count = 0 - while(iter.hasNext) { - val record = iter.next - count += 1 - writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef]) - } - - writer.close() - writer.commit() - } - - self.context.runJob(self, writeToFile _) - writer.cleanup() - } - - def getKeyClass() = implicitly[ClassManifest[K]].erasure - - def getValueClass() = implicitly[ClassManifest[V]].erasure -} - -class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest]( - self: RDD[(K, V)]) - extends Logging - with Serializable { - - def sortByKey(ascending: Boolean = true, numSplits: Int = self.splits.size): RDD[(K,V)] = { - new ShuffledSortedRDD(self, ascending, numSplits) - } -} - -class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev.context) { - override def splits = prev.splits - override val dependencies = List(new OneToOneDependency(prev)) - override val partitioner = prev.partitioner - override def compute(split: Split) = prev.iterator(split).map{case (k, v) => (k, f(v))} -} - -class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U]) - extends RDD[(K, U)](prev.context) { - - override def splits = prev.splits - override val dependencies = List(new OneToOneDependency(prev)) - override val partitioner = prev.partitioner - - override def compute(split: Split) = { - prev.iterator(split).flatMap { case (k, v) => f(v).map(x => (k, x)) } - } -} - -private[spark] object Manifests { - val seqSeqManifest = classManifest[Seq[Seq[_]]] -} diff --git a/core/src/main/scala/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/rdd/SequenceFileRDDFunctions.scala deleted file mode 100644 index 24c731fa92..0000000000 --- a/core/src/main/scala/spark/rdd/SequenceFileRDDFunctions.scala +++ /dev/null @@ -1,72 +0,0 @@ -package spark.rdd - -import java.io.EOFException -import java.net.URL -import java.io.ObjectInputStream -import java.util.concurrent.atomic.AtomicLong -import java.util.HashSet -import java.util.Random -import java.util.Date - -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.Map -import scala.collection.mutable.HashMap - -import org.apache.hadoop.mapred.JobConf -import org.apache.hadoop.mapred.OutputFormat -import org.apache.hadoop.mapred.TextOutputFormat -import org.apache.hadoop.mapred.SequenceFileOutputFormat -import org.apache.hadoop.mapred.OutputCommitter -import org.apache.hadoop.mapred.FileOutputCommitter -import org.apache.hadoop.io.Writable -import org.apache.hadoop.io.NullWritable -import org.apache.hadoop.io.BytesWritable -import org.apache.hadoop.io.Text - -import spark.Logging -import spark.RDD -import spark.SparkContext._ - -/** - * Extra functions available on RDDs of (key, value) pairs to create a Hadoop SequenceFile, - * through an implicit conversion. Note that this can't be part of PairRDDFunctions because - * we need more implicit parameters to convert our keys and values to Writable. 
- */ -class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest]( - self: RDD[(K, V)]) - extends Logging - with Serializable { - - def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = { - val c = { - if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) { - classManifest[T].erasure - } else { - implicitly[T => Writable].getClass.getMethods()(0).getReturnType - } - // TODO: use something like WritableConverter to avoid reflection - } - c.asInstanceOf[Class[_ <: Writable]] - } - - def saveAsSequenceFile(path: String) { - def anyToWritable[U <% Writable](u: U): Writable = u - - val keyClass = getWritableClass[K] - val valueClass = getWritableClass[V] - val convertKey = !classOf[Writable].isAssignableFrom(self.getKeyClass) - val convertValue = !classOf[Writable].isAssignableFrom(self.getValueClass) - - logInfo("Saving as sequence file of type (" + keyClass.getSimpleName + "," + valueClass.getSimpleName + ")" ) - val format = classOf[SequenceFileOutputFormat[Writable, Writable]] - if (!convertKey && !convertValue) { - self.saveAsHadoopFile(path, keyClass, valueClass, format) - } else if (!convertKey && convertValue) { - self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format) - } else if (convertKey && !convertValue) { - self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile(path, keyClass, valueClass, format) - } else if (convertKey && convertValue) { - self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile(path, keyClass, valueClass, format) - } - } -} diff --git a/core/src/main/scala/spark/storage/StorageLevel.scala b/core/src/main/scala/spark/storage/StorageLevel.scala index 2d52fac1ef..c497f03e0c 100644 --- a/core/src/main/scala/spark/storage/StorageLevel.scala +++ b/core/src/main/scala/spark/storage/StorageLevel.scala @@ -2,6 +2,13 @@ package spark.storage import java.io.{Externalizable, ObjectInput, ObjectOutput} +/** + * Flags for controlling the storage of an RDD. Each StorageLevel records whether to use memory, + * whether to drop the RDD to disk if it falls out of memory, whether to keep the data in memory + * in a serialized format, and whether to replicate the RDD partitions on multiple nodes. + * The [[spark.storage.StorageLevel$]] singleton object contains some static constants for + * commonly useful storage levels. + */ class StorageLevel( var useDisk: Boolean, var useMemory: Boolean, diff --git a/core/src/main/scala/spark/util/StatCounter.scala b/core/src/main/scala/spark/util/StatCounter.scala index 023ec09332..9d7e2b804b 100644 --- a/core/src/main/scala/spark/util/StatCounter.scala +++ b/core/src/main/scala/spark/util/StatCounter.scala @@ -2,10 +2,11 @@ package spark.util /** * A class for tracking the statistics of a set of numbers (count, mean and variance) in a - * numerically robust way. Includes support for merging two StatCounters. Based on Welford and - * Chan's algorithms described at http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance. + * numerically robust way. Includes support for merging two StatCounters. Based on + * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance Welford and Chan's algorithms for running variance]]. + * + * @constructor Initialize the StatCounter with the given values. 
*/ -private[spark] class StatCounter(values: TraversableOnce[Double]) extends Serializable { private var n: Long = 0 // Running count of our values private var mu: Double = 0 // Running mean of our values @@ -13,8 +14,10 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable { merge(values) + /** @constructor Initialize the StatCounter with no values. */ def this() = this(Nil) + /** Add a value into this StatCounter, updating the internal statistics. */ def merge(value: Double): StatCounter = { val delta = value - mu n += 1 @@ -23,11 +26,13 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable { this } + /** Add multiple values into this StatCounter, updating the internal statistics. */ def merge(values: TraversableOnce[Double]): StatCounter = { values.foreach(v => merge(v)) this } + /** Merge another StatCounter into this one, adding up the internal statistics. */ def merge(other: StatCounter): StatCounter = { if (other == this) { merge(other.copy()) // Avoid overwriting fields in a weird order @@ -46,6 +51,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable { } } + /** Clone this StatCounter */ def copy(): StatCounter = { val other = new StatCounter other.n = n @@ -60,6 +66,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable { def sum: Double = n * mu + /** Return the variance of the values. */ def variance: Double = { if (n == 0) Double.NaN @@ -67,6 +74,10 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable { m2 / n } + /** + * Return the sample variance, which corrects for bias in estimating the variance by dividing + * by N-1 instead of N. + */ def sampleVariance: Double = { if (n <= 1) Double.NaN @@ -74,8 +85,13 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable { m2 / (n - 1) } + /** Return the standard deviation of the values. */ def stdev: Double = math.sqrt(variance) + /** + * Return the sample standard deviation of the values, which corrects for bias in estimating the + * variance by dividing by N-1 instead of N. + */ def sampleStdev: Double = math.sqrt(sampleVariance) override def toString: String = { @@ -83,8 +99,10 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable { } } -private[spark] object StatCounter { +object StatCounter { + /** Build a StatCounter from a list of values. */ def apply(values: TraversableOnce[Double]) = new StatCounter(values) + /** Build a StatCounter from a list of values passed as variable-length arguments. */ def apply(values: Double*) = new StatCounter(values) } -- cgit v1.2.3
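As a usage illustration of the PairRDDFunctions documented in this patch, the sketch below shows the implicit conversion in action. It assumes an already-constructed SparkContext named `sc` and uses only 0.6-era methods whose signatures appear above (parallelize, reduceByKey, combineByKey, join, leftOuterJoin, collectAsMap); the data and variable names are illustrative, not part of the patch.

import spark.SparkContext._   // brings the RDD[(K, V)] => PairRDDFunctions conversion into scope

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))   // sc: an existing SparkContext (assumed)

// Merge values per key; the merge also runs map-side, like a MapReduce combiner.
val counts = pairs.reduceByKey(_ + _)

// combineByKey is the general form: create a combiner from the first value,
// merge further values into it, then merge combiners across partitions.
val grouped = pairs.combineByKey[List[Int]](
  v => List(v),                  // createCombiner
  (buf, v) => v :: buf,          // mergeValue
  (b1, b2) => b1 ::: b2)         // mergeCombiners

// Joins and outer joins are hash joins across the cluster.
val other  = sc.parallelize(Seq(("a", "x"), ("c", "y")))
val joined = pairs.join(other)              // RDD[(String, (Int, String))]
val left   = pairs.leftOuterJoin(other)     // RDD[(String, (Int, Option[String]))]

println(counts.collectAsMap())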
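The DoubleRDDFunctions added above can be exercised the same way. This is a minimal sketch under the same assumptions (an existing `sc`); the timeout passed to meanApprox is an arbitrary value in milliseconds.

import spark.SparkContext._   // implicit conversion from RDD[Double] to DoubleRDDFunctions

val nums = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0))

val st = nums.stats()         // one pass: count, mean, variance and stdev in a StatCounter
println(st.mean + " +/- " + st.stdev)

println(nums.sum())           // reduce(_ + _)
println(nums.variance())

// Experimental: returns a PartialResult[BoundedDouble] that may be incomplete
// if the job has not finished within the timeout.
val approxMean = nums.meanApprox(1000L, 0.90)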
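The SequenceFileRDDFunctions section relies on implicit conversions from primitive types to Hadoop Writables, which the class comment says come in through `spark.SparkContext._`. A hedged sketch of saving a pair RDD, with placeholder output paths:

import spark.SparkContext._
import org.apache.hadoop.mapred.TextOutputFormat

val records = sc.parallelize(Seq((1, "one"), (2, "two")))

// Keys and values are mapped to Writables automatically: Int -> IntWritable, String -> Text.
records.saveAsSequenceFile("hdfs://namenode/tmp/seq-out")    // placeholder path

// The plain Hadoop OutputFormat route, using the class-manifest overload from PairRDDFunctions.
records.saveAsHadoopFile[TextOutputFormat[Int, String]]("hdfs://namenode/tmp/text-out")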
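Finally, since the StatCounter documentation cites Welford's and Chan's algorithms, here is a small self-contained Scala sketch of the same idea. It is not the StatCounter source, only the running-update and parallel-merge formulas it is based on (count n, mean mu, and M2 = sum of squared deviations from the mean), and it assumes each side being merged holds at least one value.

case class RunningStats(n: Long, mu: Double, m2: Double) {
  // Chan's parallel merge: combine statistics computed over two disjoint data sets.
  def merge(o: RunningStats): RunningStats = {
    val delta = o.mu - mu
    val total = n + o.n
    val mean  = mu + delta * o.n / total
    val m2New = m2 + o.m2 + delta * delta * n * o.n / total
    RunningStats(total, mean, m2New)
  }
  def variance: Double = if (n == 0) Double.NaN else m2 / n
}

object RunningStatsDemo extends App {
  // Welford's single-pass update for one partition's values.
  def fromValues(xs: Seq[Double]): RunningStats =
    xs.foldLeft(RunningStats(0, 0.0, 0.0)) { (s, x) =>
      val delta = x - s.mu
      val n     = s.n + 1
      val mu    = s.mu + delta / n
      RunningStats(n, mu, s.m2 + delta * (x - mu))
    }

  val a = fromValues(Seq(1.0, 2.0, 3.0))
  val b = fromValues(Seq(10.0, 20.0))
  val merged = a.merge(b)
  println("mean=" + merged.mu + " variance=" + merged.variance)   // mean=7.2 variance=50.96
}

This mirrors how stats() builds one StatCounter per partition with mapPartitions and then reduces them with merge, so the variance stays numerically stable without a second pass over the data.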