aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2012-10-08 22:25:03 -0700
committerPatrick Wendell <pwendell@gmail.com>2012-10-08 22:25:11 -0700
commitac310098ef6a195981080a0ae840533141780943 (patch)
tree21ed88bf9c377eb5ef8dbbb8626ea04c1a47fd3a /core/src/main
parentbd688940a187d51c3ce5edd7ffc255a536799327 (diff)
downloadspark-ac310098ef6a195981080a0ae840533141780943.tar.gz
spark-ac310098ef6a195981080a0ae840533141780943.tar.bz2
spark-ac310098ef6a195981080a0ae840533141780943.zip
More docs in RDD class
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/RDD.scala46
1 files changed, 45 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 42d5b821f8..4d984591bd 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -164,6 +164,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
// Transformations (return a new RDD)
+ /**
+ * Return a new RDD by applying a function to all elements of this RDD.
+ */
def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
/**
@@ -184,6 +187,9 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
def distinct(numSplits: Int = splits.size): RDD[T] =
map(x => (x, null)).reduceByKey((x, y) => x, numSplits).map(_._1)
+ /**
+ * Return a sampled subset of this RDD.
+ */
def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] =
new SampledRDD(this, withReplacement, fraction, seed)
@@ -232,27 +238,53 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
*/
def ++(other: RDD[T]): RDD[T] = this.union(other)
+ /**
+ * Return an RDD created by coalescing all elements within each partition into an array.
+ */
def glom(): RDD[Array[T]] = new GlommedRDD(this)
def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
+ /**
+ * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
+ * mapping to that key.
+ */
def groupBy[K: ClassManifest](f: T => K, numSplits: Int): RDD[(K, Seq[T])] = {
val cleanF = sc.clean(f)
this.map(t => (cleanF(t), t)).groupByKey(numSplits)
}
+ /**
+ * Return an RDD of grouped items.
+ */
def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] = groupBy[K](f, sc.defaultParallelism)
+ /**
+ * Return an RDD created by piping elements to a forked external process.
+ */
def pipe(command: String): RDD[String] = new PipedRDD(this, command)
+ /**
+ * Return an RDD created by piping elements to a forked external process.
+ */
def pipe(command: Seq[String]): RDD[String] = new PipedRDD(this, command)
+ /**
+ * Return an RDD created by piping elements to a forked external process.
+ */
def pipe(command: Seq[String], env: Map[String, String]): RDD[String] =
new PipedRDD(this, command, env)
+ /**
+ * Return a new RDD by applying a function to each partition of this RDD.
+ */
def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U]): RDD[U] =
new MapPartitionsRDD(this, sc.clean(f))
+ /**
+ * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
+ * of the original partition.
+ */
def mapPartitionsWithSplit[U: ClassManifest](f: (Int, Iterator[T]) => Iterator[U]): RDD[U] =
new MapPartitionsWithSplitRDD(this, sc.clean(f))
@@ -273,9 +305,15 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
-
+
+ /**
+ * Return an array that contains all of the elements in this RDD.
+ */
def toArray(): Array[T] = collect()
+ /**
+ * Reduces the elements of this RDD using the specified associative binary operator.
+ */
def reduce(f: (T, T) => T): T = {
val cleanF = sc.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
@@ -432,11 +470,17 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial
case _ => throw new UnsupportedOperationException("empty collection")
}
+ /**
+ * Save this RDD as a text file, using string representations of elements.
+ */
def saveAsTextFile(path: String) {
this.map(x => (NullWritable.get(), new Text(x.toString)))
.saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}
+ /**
+ * Save this RDD as a SequenceFile of serialized objects.
+ */
def saveAsObjectFile(path: String) {
this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
.map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))