author    | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-10-09 09:47:58 -0700
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-10-09 09:47:58 -0700
commit    | ad28aebb0adfe3710bfcf741fbc9105282ee67a8
tree      | 9d8a461c142075c44dfbedffa3e5d4f39d24a706
parent    | 4780fee887b98e1bf07a21f6152c83bd03378f66
parent    | 8aec63b0bc36e745926680306b5a2daa1afae11f
Merge pull request #262 from andyk/document-public-apis
Document RDD api (i.e. RDD.scala)
-rw-r--r-- | core/src/main/scala/spark/RDD.scala          | 79
-rw-r--r-- | core/src/main/scala/spark/SparkContext.scala |  8
2 files changed, 80 insertions, 7 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index f32ff475da..17869fb31b 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -164,16 +164,32 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
 
   // Transformations (return a new RDD)
 
+  /**
+   * Return a new RDD by applying a function to all elements of this RDD.
+   */
   def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
-  
+
+  /**
+   * Return a new RDD by first applying a function to all elements of this
+   * RDD, and then flattening the results.
+   */
   def flatMap[U: ClassManifest](f: T => TraversableOnce[U]): RDD[U] =
     new FlatMappedRDD(this, sc.clean(f))
-  
+
+  /**
+   * Return a new RDD containing only the elements that satisfy a predicate.
+   */
   def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))
 
+  /**
+   * Return a new RDD containing the distinct elements in this RDD.
+   */
   def distinct(numSplits: Int = splits.size): RDD[T] =
     map(x => (x, null)).reduceByKey((x, y) => x, numSplits).map(_._1)
 
+  /**
+   * Return a sampled subset of this RDD.
+   */
   def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] =
     new SampledRDD(this, withReplacement, fraction, seed)
 
@@ -222,44 +238,82 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
    */
   def ++(other: RDD[T]): RDD[T] = this.union(other)
 
+  /**
+   * Return an RDD created by coalescing all elements within each partition into an array.
+   */
   def glom(): RDD[Array[T]] = new GlommedRDD(this)
 
   def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
 
+  /**
+   * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
+   * mapping to that key.
+   */
   def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] = {
     val cleanF = sc.clean(f)
     this.map(t => (cleanF(t), t)).groupByKey(numSplits)
   }
 
+  /**
+   * Return an RDD of grouped items.
+   */
   def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] = groupBy[K](f, sc.defaultParallelism)
 
+  /**
+   * Return an RDD created by piping elements to a forked external process.
+   */
   def pipe(command: String): RDD[String] = new PipedRDD(this, command)
 
+  /**
+   * Return an RDD created by piping elements to a forked external process.
+   */
   def pipe(command: Seq[String]): RDD[String] = new PipedRDD(this, command)
 
+  /**
+   * Return an RDD created by piping elements to a forked external process.
+   */
   def pipe(command: Seq[String], env: Map[String, String]): RDD[String] =
     new PipedRDD(this, command, env)
 
+  /**
+   * Return a new RDD by applying a function to each partition of this RDD.
+   */
   def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U]): RDD[U] =
     new MapPartitionsRDD(this, sc.clean(f))
 
+  /**
+   * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
+   * of the original partition.
+   */
   def mapPartitionsWithSplit[U: ClassManifest](f: (Int, Iterator[T]) => Iterator[U]): RDD[U] =
     new MapPartitionsWithSplitRDD(this, sc.clean(f))
 
   // Actions (launch a job to return a value to the user program)
-  
+
+  /**
+   * Applies a function f to all elements of this RDD.
+   */
   def foreach(f: T => Unit) {
     val cleanF = sc.clean(f)
     sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
   }
 
+  /**
+   * Return an array that contains all of the elements in this RDD.
+   */
   def collect(): Array[T] = {
     val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
     Array.concat(results: _*)
   }
 
+  /**
+   * Return an array that contains all of the elements in this RDD.
+   */
   def toArray(): Array[T] = collect()
 
+  /**
+   * Reduces the elements of this RDD using the specified associative binary operator.
+   */
   def reduce(f: (T, T) => T): T = {
     val cleanF = sc.clean(f)
     val reducePartition: Iterator[T] => Option[T] = iter => {
@@ -308,7 +362,10 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
       (iter: Iterator[T]) => iter.aggregate(zeroValue)(cleanSeqOp, cleanCombOp))
     return results.fold(zeroValue)(cleanCombOp)
   }
-  
+
+  /**
+   * Return the number of elements in the RDD.
+   */
   def count(): Long = {
     sc.runJob(this, (iter: Iterator[T]) => {
       var result = 0L
@@ -337,8 +394,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
   }
 
   /**
-   * Count elements equal to each value, returning a map of (value, count) pairs. The final combine
-   * step happens locally on the master, equivalent to running a single reduce task.
+   * Return the count of each unique value in this RDD as a map of
+   * (value, count) pairs. The final combine step happens locally on the
+   * master, equivalent to running a single reduce task.
    *
    * TODO: This should perhaps be distributed by default.
    */
@@ -404,16 +462,25 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
     return buf.toArray
   }
 
+  /*
+   * Return the first element in this RDD.
+   */
   def first(): T = take(1) match {
     case Array(t) => t
     case _ => throw new UnsupportedOperationException("empty collection")
   }
 
+  /**
+   * Save this RDD as a text file, using string representations of elements.
+   */
   def saveAsTextFile(path: String) {
     this.map(x => (NullWritable.get(), new Text(x.toString)))
       .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
   }
 
+  /**
+   * Save this RDD as a SequenceFile of serialized objects.
+   */
   def saveAsObjectFile(path: String) {
     this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
       .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 84fc541f82..47e002201b 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -54,15 +54,21 @@ import spark.storage.BlockManagerMaster
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
  * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
  *
+ * @constructor Returns a new SparkContext.
  * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
  * @param jobName A name for your job, to display on the cluster web UI
- * @param sparkHome Location where Spark is instaled on cluster nodes
+ * @param sparkHome Location where Spark is installed on cluster nodes
  * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
  *   system or HDFS, HTTP, HTTPS, or FTP URLs.
  */
 class SparkContext(master: String, jobName: String, val sparkHome: String, val jars: Seq[String])
   extends Logging {
 
+  /**
+   * @constructor Returns a new SparkContext.
+   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+   * @param jobName A name for your job, to display on the cluster web UI
+   */
   def this(master: String, jobName: String) = this(master, jobName, null, Nil)
 
   // Ensure logging is initialized before we spawn any threads
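For readers skimming the new Scaladoc, here is a minimal sketch of how the documented API reads from user code against the 2012-era spark.* package. The object name, sample data, and master URL below are illustrative assumptions, not part of this commit.

import spark.SparkContext

// Hypothetical driver program; names and data are for illustration only.
object RddDocExample {
  def main(args: Array[String]) {
    // Two-argument SparkContext constructor documented in this commit:
    // master URL and a job name for the cluster web UI.
    val sc = new SparkContext("local[4]", "RddDocExample")

    // Exercise a few of the newly documented transformations.
    val words = sc.parallelize(Seq("spark", "rdd", "spark", "scala"))
    val longWords = words.distinct().map(_.toUpperCase).filter(_.length > 3)

    // Newly documented actions: count and collect.
    println("long distinct words: " + longWords.count())
    longWords.collect().foreach(println)

    sc.stop()
  }
}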