From dc8adbd359c24b414adf26127dfd58850e0c0f99 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Thu, 11 Oct 2012 00:49:03 -0700 Subject: Adding Java documentation --- core/src/main/scala/spark/RDD.scala | 2 +- .../main/scala/spark/api/java/JavaDoubleRDD.scala | 32 +++- .../main/scala/spark/api/java/JavaPairRDD.scala | 212 ++++++++++++++++++++- core/src/main/scala/spark/api/java/JavaRDD.scala | 25 ++- .../main/scala/spark/api/java/JavaRDDLike.scala | 124 +++++++++++- .../scala/spark/api/java/JavaSparkContext.scala | 69 ++++++- 6 files changed, 454 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 984738ef73..c9334f68a8 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -461,7 +461,7 @@ abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serial return buf.toArray } - /* + /** * Return the first element in this RDD. */ def first(): T = take(1) match { diff --git a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala index 9731bb4eac..4ad006c4a7 100644 --- a/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaDoubleRDD.scala @@ -22,8 +22,13 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav import JavaDoubleRDD.fromRDD + /** Persist this RDD with the default storage level (MEMORY_ONLY). */ def cache(): JavaDoubleRDD = fromRDD(srdd.cache()) + /** + * Set this RDD's storage level to persist its values across operations after the first time + * it is computed. Can only be called once on each RDD. + */ def persist(newLevel: StorageLevel): JavaDoubleRDD = fromRDD(srdd.persist(newLevel)) // first() has to be overriden here in order for its return type to be Double instead of Object. @@ -31,38 +36,63 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav // Transformations (return a new RDD) + /** + * Return a new RDD containing the distinct elements in this RDD. + */ def distinct(): JavaDoubleRDD = fromRDD(srdd.distinct()) + /** + * Return a new RDD containing the distinct elements in this RDD. + */ def distinct(numSplits: Int): JavaDoubleRDD = fromRDD(srdd.distinct(numSplits)) + /** + * Return a new RDD containing only the elements that satisfy a predicate. + */ def filter(f: JFunction[Double, java.lang.Boolean]): JavaDoubleRDD = fromRDD(srdd.filter(x => f(x).booleanValue())) + /** + * Return a sampled subset of this RDD. + */ def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaDoubleRDD = fromRDD(srdd.sample(withReplacement, fraction, seed)) + /** + * Return the union of this RDD and another one. Any identical elements will appear multiple + * times (use `.distinct()` to eliminate them). + */ def union(other: JavaDoubleRDD): JavaDoubleRDD = fromRDD(srdd.union(other.srdd)) // Double RDD functions + /** Return the sum of the elements in this RDD. */ def sum(): Double = srdd.sum() + /** Return a [[spark.StatCounter]] describing the elements in this RDD. */ def stats(): StatCounter = srdd.stats() + /** Return the mean of the elements in this RDD. */ def mean(): Double = srdd.mean() + /** Return the variance of the elements in this RDD. */ def variance(): Double = srdd.variance() + /** Return the standard deviation of the elements in this RDD. */ def stdev(): Double = srdd.stdev() + /** Return the approximate mean of the elements in this RDD. 
*/ def meanApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] = srdd.meanApprox(timeout, confidence) + /** Return the approximate mean of the elements in this RDD. */ def meanApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.meanApprox(timeout) + /** Return the approximate sum of the elements in this RDD. */ def sumApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] = srdd.sumApprox(timeout, confidence) - + + /** Return the approximate sum of the elements in this RDD. */ def sumApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.sumApprox(timeout) } diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala index d361de8f8f..cfaa26ff52 100644 --- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala @@ -34,23 +34,44 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif // Common RDD functions + /** Persist this RDD with the default storage level (MEMORY_ONLY). */ def cache(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.cache()) + /** + * Set this RDD's storage level to persist its values across operations after the first time + * it is computed. Can only be called once on each RDD. + */ def persist(newLevel: StorageLevel): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.persist(newLevel)) // Transformations (return a new RDD) + /** + * Return a new RDD containing the distinct elements in this RDD. + */ def distinct(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct()) + /** + * Return a new RDD containing the distinct elements in this RDD. + */ def distinct(numSplits: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.distinct(numSplits)) + /** + * Return a new RDD containing only the elements that satisfy a predicate. + */ def filter(f: Function[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue())) + /** + * Return a sampled subset of this RDD. + */ def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.sample(withReplacement, fraction, seed)) + /** + * Return the union of this RDD and another one. Any identical elements will appear multiple + * times (use `.distinct()` to eliminate them). + */ def union(other: JavaPairRDD[K, V]): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.union(other.rdd)) @@ -61,7 +82,21 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif override def first(): (K, V) = rdd.first() // Pair RDD functions - + + /** + * Generic function to combine the elements for each key using a custom set of aggregation + * functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a + * "combined type" C * Note that V and C can be different -- for example, one might group an + * RDD of type (Int, Int) into an RDD of type (Int, List[Int]). Users provide three + * functions: + * + * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list) + * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list) + * - `mergeCombiners`, to combine two C's into a single one. + * + * In addition, users can control the partitioning of the output RDD, and whether to perform + * map-side aggregation (if a mapper can produce multiple items with the same key). 
+ */ def combineByKey[C](createCombiner: Function[V, C], mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C], @@ -76,50 +111,113 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif )) } + /** + * Simplified version of combineByKey that hash-partitions the output RDD. + */ def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C], numSplits: Int): JavaPairRDD[K, C] = combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numSplits)) + /** + * Merge the values for each key using an associative reduce function. This will also perform + * the merging locally on each mapper before sending results to a reducer, similarly to a + * "combiner" in MapReduce. + */ def reduceByKey(partitioner: Partitioner, func: JFunction2[V, V, V]): JavaPairRDD[K, V] = fromRDD(rdd.reduceByKey(partitioner, func)) + /** + * Merge the values for each key using an associative reduce function, but return the results + * immediately to the master as a Map. This will also perform the merging locally on each mapper + * before sending results to a reducer, similarly to a "combiner" in MapReduce. + */ def reduceByKeyLocally(func: JFunction2[V, V, V]): java.util.Map[K, V] = mapAsJavaMap(rdd.reduceByKeyLocally(func)) + /** Count the number of elements for each key, and return the result to the master as a Map. */ def countByKey(): java.util.Map[K, Long] = mapAsJavaMap(rdd.countByKey()) + /** + * (Experimental) Approximate version of countByKey that can return a partial result if it does + * not finish within a timeout. + */ def countByKeyApprox(timeout: Long): PartialResult[java.util.Map[K, BoundedDouble]] = rdd.countByKeyApprox(timeout).map(mapAsJavaMap) + /** + * (Experimental) Approximate version of countByKey that can return a partial result if it does + * not finish within a timeout. + */ def countByKeyApprox(timeout: Long, confidence: Double = 0.95) : PartialResult[java.util.Map[K, BoundedDouble]] = rdd.countByKeyApprox(timeout, confidence).map(mapAsJavaMap) + /** + * Merge the values for each key using an associative reduce function. This will also perform + * the merging locally on each mapper before sending results to a reducer, similarly to a + * "combiner" in MapReduce. Output will be hash-partitioned with numSplits splits. + */ def reduceByKey(func: JFunction2[V, V, V], numSplits: Int): JavaPairRDD[K, V] = fromRDD(rdd.reduceByKey(func, numSplits)) + /** + * Group the values for each key in the RDD into a single sequence. Allows controlling the + * partitioning of the resulting key-value pair RDD by passing a Partitioner. + */ def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JList[V]] = fromRDD(groupByResultToJava(rdd.groupByKey(partitioner))) + /** + * Group the values for each key in the RDD into a single sequence. Hash-partitions the + * resulting RDD into `numSplits` partitions. + */ def groupByKey(numSplits: Int): JavaPairRDD[K, JList[V]] = fromRDD(groupByResultToJava(rdd.groupByKey(numSplits))) + /** + * Return a copy of the RDD partitioned using the specified partitioner. If `mapSideCombine` + * is true, Spark will group values of the same key together on the map side before the + * repartitioning, to only send each key over the network once. If a large number of + * duplicated keys are expected, and the size of the keys is large, `mapSideCombine` should + * be set to true.
+ */ def partitionBy(partitioner: Partitioner): JavaPairRDD[K, V] = fromRDD(rdd.partitionBy(partitioner)) + /** + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. + */ def join[W](other: JavaPairRDD[K, W], partitioner: Partitioner): JavaPairRDD[K, (V, W)] = fromRDD(rdd.join(other, partitioner)) + /** + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to + * partition the output RDD. + */ def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner) : JavaPairRDD[K, (V, Option[W])] = fromRDD(rdd.leftOuterJoin(other, partitioner)) + /** + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to + * partition the output RDD. + */ def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner) : JavaPairRDD[K, (Option[V], W)] = fromRDD(rdd.rightOuterJoin(other, partitioner)) + /** + * Simplified version of combineByKey that hash-partitions the resulting RDD using the default + * parallelism level. + */ def combineByKey[C](createCombiner: JFunction[V, C], mergeValue: JFunction2[C, V, C], mergeCombiners: JFunction2[C, C, C]): JavaPairRDD[K, C] = { @@ -128,40 +226,94 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif fromRDD(combineByKey(createCombiner, mergeValue, mergeCombiners)) } + /** + * Merge the values for each key using an associative reduce function. This will also perform + * the merging locally on each mapper before sending results to a reducer, similarly to a + * "combiner" in MapReduce. Output will be hash-partitioned with the default parallelism level. + */ def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = { val partitioner = rdd.defaultPartitioner(rdd) fromRDD(reduceByKey(partitioner, func)) } + /** + * Group the values for each key in the RDD into a single sequence. Hash-partitions the + * resulting RDD with the default parallelism level. + */ def groupByKey(): JavaPairRDD[K, JList[V]] = fromRDD(groupByResultToJava(rdd.groupByKey())) + /** + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Performs a hash join across the cluster. + */ def join[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, W)] = fromRDD(rdd.join(other)) + /** + * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each + * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and + * (k, v2) is in `other`. Performs a hash join across the cluster. + */ def join[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (V, W)] = fromRDD(rdd.join(other, numSplits)) + /** + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k.
Hash-partitions the output + * using the default level of parallelism. + */ def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Option[W])] = fromRDD(rdd.leftOuterJoin(other)) + /** + * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the + * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the + * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output + * into `numSplits` partitions. + */ def leftOuterJoin[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (V, Option[W])] = fromRDD(rdd.leftOuterJoin(other, numSplits)) + /** + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting + * RDD using the default parallelism level. + */ def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Option[V], W)] = fromRDD(rdd.rightOuterJoin(other)) + /** + * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the + * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the + * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting + * RDD into the given number of partitions. + */ def rightOuterJoin[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (Option[V], W)] = fromRDD(rdd.rightOuterJoin(other, numSplits)) + /** + * Return the key-value pairs in this RDD to the master as a Map. + */ def collectAsMap(): java.util.Map[K, V] = mapAsJavaMap(rdd.collectAsMap()) + /** + * Pass each value in the key-value pair RDD through a map function without changing the keys; + * this also retains the original RDD's partitioning. + */ def mapValues[U](f: Function[V, U]): JavaPairRDD[K, U] = { implicit val cm: ClassManifest[U] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]] fromRDD(rdd.mapValues(f)) } + /** + * Pass each value in the key-value pair RDD through a flatMap function without changing the + * keys; this also retains the original RDD's partitioning. + */ def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = { import scala.collection.JavaConverters._ def fn = (x: V) => f.apply(x).asScala @@ -170,37 +322,68 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif fromRDD(rdd.flatMapValues(fn)) } + /** + * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the + * list of values for that key in `this` as well as `other`. + */ def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner) : JavaPairRDD[K, (JList[V], JList[W])] = fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner))) + /** + * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a + * tuple with the list of values for that key in `this`, `other1` and `other2`. + */ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], partitioner: Partitioner) : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner))) + /** + * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the + * list of values for that key in `this` as well as `other`. 
+ */ def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = fromRDD(cogroupResultToJava(rdd.cogroup(other))) + /** + * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a + * tuple with the list of values for that key in `this`, `other1` and `other2`. + */ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2]) : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2))) + /** + * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the + * list of values for that key in `this` as well as `other`. + */ def cogroup[W](other: JavaPairRDD[K, W], numSplits: Int): JavaPairRDD[K, (JList[V], JList[W])] = fromRDD(cogroupResultToJava(rdd.cogroup(other, numSplits))) + /** + * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a + * tuple with the list of values for that key in `this`, `other1` and `other2`. + */ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numSplits: Int) : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numSplits))) + /** Alias for cogroup. */ def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = fromRDD(cogroupResultToJava(rdd.groupWith(other))) + /** Alias for cogroup. */ def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2]) : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2))) + /** + * Return the list of values in the RDD for key `key`. This operation is done efficiently if the + * RDD has a known partitioner by only searching the partition that the key maps to. + */ def lookup(key: K): JList[V] = seqAsJavaList(rdd.lookup(key)) + /** Output the RDD to any Hadoop-supported file system. */ def saveAsHadoopFile[F <: OutputFormat[_, _]]( path: String, keyClass: Class[_], @@ -210,6 +393,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf) } + /** Output the RDD to any Hadoop-supported file system. */ def saveAsHadoopFile[F <: OutputFormat[_, _]]( path: String, keyClass: Class[_], @@ -218,6 +402,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass) } + /** Output the RDD to any Hadoop-supported file system. */ def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]]( path: String, keyClass: Class[_], @@ -227,6 +412,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf) } + /** Output the RDD to any Hadoop-supported file system. */ def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]]( path: String, keyClass: Class[_], @@ -235,6 +421,12 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass) } + /** + * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for + * that storage system. The JobConf should set an OutputFormat and any output paths required + * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop + * MapReduce job. 
+ */ def saveAsHadoopDataset(conf: JobConf) { rdd.saveAsHadoopDataset(conf) } @@ -243,13 +435,31 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif // Ordered RDD Functions def sortByKey(): JavaPairRDD[K, V] = sortByKey(true) + /** + * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling + * `collect` or `save` on the resulting RDD will return or output an ordered list of records + * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in + * order of the keys). + */ def sortByKey(ascending: Boolean): JavaPairRDD[K, V] = { val comp = com.google.common.collect.Ordering.natural().asInstanceOf[Comparator[K]] sortByKey(comp, true) } + /** + * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling + * `collect` or `save` on the resulting RDD will return or output an ordered list of records + * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in + * order of the keys). + */ def sortByKey(comp: Comparator[K]): JavaPairRDD[K, V] = sortByKey(comp, true) + /** + * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling + * `collect` or `save` on the resulting RDD will return or output an ordered list of records + * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in + * order of the keys). + */ def sortByKey(comp: Comparator[K], ascending: Boolean): JavaPairRDD[K, V] = { class KeyOrdering(val a: K) extends Ordered[K] { override def compare(b: K) = comp.compare(a, b) diff --git a/core/src/main/scala/spark/api/java/JavaRDD.scala b/core/src/main/scala/spark/api/java/JavaRDD.scala index b3e1977bcb..50be5a6cbe 100644 --- a/core/src/main/scala/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaRDD.scala @@ -11,22 +11,43 @@ JavaRDDLike[T, JavaRDD[T]] { // Common RDD functions + /** Persist this RDD with the default storage level (MEMORY_ONLY). */ def cache(): JavaRDD[T] = wrapRDD(rdd.cache()) + /** + * Set this RDD's storage level to persist its values across operations after the first time + * it is computed. Can only be called once on each RDD. + */ def persist(newLevel: StorageLevel): JavaRDD[T] = wrapRDD(rdd.persist(newLevel)) // Transformations (return a new RDD) + /** + * Return a new RDD containing the distinct elements in this RDD. + */ def distinct(): JavaRDD[T] = wrapRDD(rdd.distinct()) + /** + * Return a new RDD containing the distinct elements in this RDD. + */ def distinct(numSplits: Int): JavaRDD[T] = wrapRDD(rdd.distinct(numSplits)) - + + /** + * Return a new RDD containing only the elements that satisfy a predicate. + */ def filter(f: JFunction[T, java.lang.Boolean]): JavaRDD[T] = wrapRDD(rdd.filter((x => f(x).booleanValue()))) + /** + * Return a sampled subset of this RDD. + */ def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaRDD[T] = wrapRDD(rdd.sample(withReplacement, fraction, seed)) - + + /** + * Return the union of this RDD and another one. Any identical elements will appear multiple + * times (use `.distinct()` to eliminate them). 
+ */ def union(other: JavaRDD[T]): JavaRDD[T] = wrapRDD(rdd.union(other.rdd)) } diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala index 785dd96394..43d0ca0e2f 100644 --- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala @@ -19,41 +19,71 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def rdd: RDD[T] + /** Set of partitions in this RDD. */ def splits: JList[Split] = new java.util.ArrayList(rdd.splits.toSeq) + /** The [[spark.SparkContext]] that this RDD was created on. */ def context: SparkContext = rdd.context - + + /** A unique ID for this RDD (within its SparkContext). */ def id: Int = rdd.id + /** Get the RDD's current storage level, or StorageLevel.NONE if none is set. */ def getStorageLevel: StorageLevel = rdd.getStorageLevel + /** + * Internal method to this RDD; will read from cache if applicable, or otherwise compute it. + * This should ''not'' be called by users directly, but is available for implementors of custom + * subclasses of RDD. + */ def iterator(split: Split): java.util.Iterator[T] = asJavaIterator(rdd.iterator(split)) // Transformations (return a new RDD) + /** + * Return a new RDD by applying a function to all elements of this RDD. + */ def map[R](f: JFunction[T, R]): JavaRDD[R] = new JavaRDD(rdd.map(f)(f.returnType()))(f.returnType()) + /** + * Return a new RDD by applying a function to all elements of this RDD. + */ def map[R](f: DoubleFunction[T]): JavaDoubleRDD = new JavaDoubleRDD(rdd.map(x => f(x).doubleValue())) + /** + * Return a new RDD by applying a function to all elements of this RDD. + */ def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = { def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]] new JavaPairRDD(rdd.map(f)(cm))(f.keyType(), f.valueType()) } + /** + * Return a new RDD by first applying a function to all elements of this + * RDD, and then flattening the results. + */ def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = { import scala.collection.JavaConverters._ def fn = (x: T) => f.apply(x).asScala JavaRDD.fromRDD(rdd.flatMap(fn)(f.elementType()))(f.elementType()) } + /** + * Return a new RDD by first applying a function to all elements of this + * RDD, and then flattening the results. + */ def flatMap(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = { import scala.collection.JavaConverters._ def fn = (x: T) => f.apply(x).asScala new JavaDoubleRDD(rdd.flatMap(fn).map((x: java.lang.Double) => x.doubleValue())) } + /** + * Return a new RDD by first applying a function to all elements of this + * RDD, and then flattening the results. + */ def flatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairRDD[K, V] = { import scala.collection.JavaConverters._ def fn = (x: T) => f.apply(x).asScala @@ -61,22 +91,35 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType()) } + /** + * Return a new RDD by applying a function to each partition of this RDD. + */ def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = { def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) JavaRDD.fromRDD(rdd.mapPartitions(fn)(f.elementType()))(f.elementType()) } + + /** + * Return a new RDD by applying a function to each partition of this RDD. 
+ */ def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = { def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: java.lang.Double) => x.doubleValue())) } + /** + * Return a new RDD by applying a function to each partition of this RDD. + */ def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V]): JavaPairRDD[K, V] = { def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType()) } + /** + * Return an RDD created by coalescing all elements within each partition into an array. + */ def glom(): JavaRDD[JList[T]] = new JavaRDD(rdd.glom().map(x => new java.util.ArrayList[T](x.toSeq))) @@ -84,6 +127,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { JavaPairRDD.fromRDD(rdd.cartesian(other.rdd)(other.classManifest))(classManifest, other.classManifest) + /** + * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements + * mapping to that key. + */ def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = { implicit val kcm: ClassManifest[K] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] @@ -92,6 +139,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(f.returnType)))(kcm, vcm) } + /** + * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements + * mapping to that key. + */ def groupBy[K](f: JFunction[T, K], numSplits: Int): JavaPairRDD[K, JList[T]] = { implicit val kcm: ClassManifest[K] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] @@ -100,56 +151,114 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numSplits)(f.returnType)))(kcm, vcm) } + /** + * Return an RDD created by piping elements to a forked external process. + */ def pipe(command: String): JavaRDD[String] = rdd.pipe(command) + /** + * Return an RDD created by piping elements to a forked external process. + */ def pipe(command: JList[String]): JavaRDD[String] = rdd.pipe(asScalaBuffer(command)) + /** + * Return an RDD created by piping elements to a forked external process. + */ def pipe(command: JList[String], env: java.util.Map[String, String]): JavaRDD[String] = rdd.pipe(asScalaBuffer(command), mapAsScalaMap(env)) // Actions (launch a job to return a value to the user program) - + + /** + * Applies a function f to all elements of this RDD. + */ def foreach(f: VoidFunction[T]) { val cleanF = rdd.context.clean(f) rdd.foreach(cleanF) } + /** + * Return an array that contains all of the elements in this RDD. + */ def collect(): JList[T] = { import scala.collection.JavaConversions._ val arr: java.util.Collection[T] = rdd.collect().toSeq new java.util.ArrayList(arr) } - + + /** + * Reduces the elements of this RDD using the specified associative binary operator. + */ def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f) + /** + * Aggregate the elements of each partition, and then the results for all the partitions, using a + * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to + * modify t1 and return it as its result value to avoid object allocation; however, it should not + * modify t2. 
+ */ def fold(zeroValue: T)(f: JFunction2[T, T, T]): T = rdd.fold(zeroValue)(f) + /** + * Aggregate the elements of each partition, and then the results for all the partitions, using + * given combine functions and a neutral "zero value". This function can return a different result + * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U + * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are + * allowed to modify and return their first argument instead of creating a new U to avoid memory + * allocation. + */ def aggregate[U](zeroValue: U)(seqOp: JFunction2[U, T, U], combOp: JFunction2[U, U, U]): U = rdd.aggregate(zeroValue)(seqOp, combOp)(seqOp.returnType) + /** + * Return the number of elements in the RDD. + */ def count(): Long = rdd.count() + /** + * (Experimental) Approximate version of count() that returns a potentially incomplete result + * within a timeout, even if not all tasks have finished. + */ def countApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] = rdd.countApprox(timeout, confidence) + /** + * (Experimental) Approximate version of count() that returns a potentially incomplete result + * within a timeout, even if not all tasks have finished. + */ def countApprox(timeout: Long): PartialResult[BoundedDouble] = rdd.countApprox(timeout) + /** + * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final + * combine step happens locally on the master, equivalent to running a single reduce task. + */ def countByValue(): java.util.Map[T, java.lang.Long] = mapAsJavaMap(rdd.countByValue().map((x => (x._1, new lang.Long(x._2))))) + /** + * (Experimental) Approximate version of countByValue(). + */ def countByValueApprox( timeout: Long, confidence: Double ): PartialResult[java.util.Map[T, BoundedDouble]] = rdd.countByValueApprox(timeout, confidence).map(mapAsJavaMap) + /** + * (Experimental) Approximate version of countByValue(). + */ def countByValueApprox(timeout: Long): PartialResult[java.util.Map[T, BoundedDouble]] = rdd.countByValueApprox(timeout).map(mapAsJavaMap) + /** + * Take the first num elements of the RDD. This currently scans the partitions *one by one*, so + * it will be slow if a lot of partitions are required. In that case, use collect() to get the + * whole RDD instead. + */ def take(num: Int): JList[T] = { import scala.collection.JavaConversions._ val arr: java.util.Collection[T] = rdd.take(num).toSeq @@ -162,9 +271,18 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { new java.util.ArrayList(arr) } + /** + * Return the first element in this RDD. + */ def first(): T = rdd.first() + /** + * Save this RDD as a text file, using string representations of elements. + */ def saveAsTextFile(path: String) = rdd.saveAsTextFile(path) + /** + * Save this RDD as a SequenceFile of serialized objects. + */ def saveAsObjectFile(path: String) = rdd.saveAsObjectFile(path) } diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala index 4a7d945a8d..c78b09c750 100644 --- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala @@ -18,26 +18,47 @@ import scala.collection.JavaConversions class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround { + /** + * @constructor Returns a new SparkContext. + * @param master Cluster URL to connect to (e.g. 
mesos://host:port, spark://host:port, local[4]). + * @param frameworkName A name for your job, to display on the cluster web UI + */ def this(master: String, frameworkName: String) = this(new SparkContext(master, frameworkName)) + /** + * @constructor Returns a new SparkContext. + * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). + * @param frameworkName A name for your job, to display on the cluster web UI + * @param sparkHome The SPARK_HOME directory on the slave nodes + * @param jarFile A path to a local jar file containing this job + */ def this(master: String, frameworkName: String, sparkHome: String, jarFile: String) = this(new SparkContext(master, frameworkName, sparkHome, Seq(jarFile))) + /** + * @constructor Returns a new SparkContext. + * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). + * @param frameworkName A name for your job, to display on the cluster web UI + * @param sparkHome The SPARK_HOME directory on the slave nodes + * @param jars A set of jar files relating to this job + */ def this(master: String, frameworkName: String, sparkHome: String, jars: Array[String]) = this(new SparkContext(master, frameworkName, sparkHome, jars.toSeq)) val env = sc.env + /** Distribute a local Scala collection to form an RDD. */ def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = { implicit val cm: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices) } + /** Distribute a local Scala collection to form an RDD. */ def parallelize[T](list: java.util.List[T]): JavaRDD[T] = parallelize(list, sc.defaultParallelism) - + /** Distribute a local Scala collection to form an RDD. */ def parallelizePairs[K, V](list: java.util.List[Tuple2[K, V]], numSlices: Int) : JavaPairRDD[K, V] = { implicit val kcm: ClassManifest[K] = @@ -47,21 +68,32 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork JavaPairRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices)) } + /** Distribute a local Scala collection to form an RDD. */ def parallelizePairs[K, V](list: java.util.List[Tuple2[K, V]]): JavaPairRDD[K, V] = parallelizePairs(list, sc.defaultParallelism) + /** Distribute a local Scala collection to form an RDD. */ def parallelizeDoubles(list: java.util.List[java.lang.Double], numSlices: Int): JavaDoubleRDD = JavaDoubleRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list).map(_.doubleValue()), numSlices)) + /** Distribute a local Scala collection to form an RDD. */ def parallelizeDoubles(list: java.util.List[java.lang.Double]): JavaDoubleRDD = parallelizeDoubles(list, sc.defaultParallelism) + /** + * Read a text file from HDFS, a local file system (available on all nodes), or any + * Hadoop-supported file system URI, and return it as an RDD of Strings. + */ def textFile(path: String): JavaRDD[String] = sc.textFile(path) + /** + * Read a text file from HDFS, a local file system (available on all nodes), or any + * Hadoop-supported file system URI, and return it as an RDD of Strings. + */ def textFile(path: String, minSplits: Int): JavaRDD[String] = sc.textFile(path, minSplits) - /**Get an RDD for a Hadoop SequenceFile with given key and value types */ + /**Get an RDD for a Hadoop SequenceFile with given key and value types. 
*/ def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V], @@ -72,6 +104,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits)) } + /**Get an RDD for a Hadoop SequenceFile. */ def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): JavaPairRDD[K, V] = { implicit val kcm = ClassManifest.fromClass(keyClass) @@ -92,6 +125,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork sc.objectFile(path, minSplits)(cm) } + /** + * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and + * BytesWritable values that contain a serialized partition. This is still an experimental storage + * format and may not be supported exactly as is in future Spark releases. It will also be pretty + * slow if you use the default serializer (Java serialization), though the nice thing about it is + * that there's very little effort required to save arbitrary objects. + */ def objectFile[T](path: String): JavaRDD[T] = { implicit val cm: ClassManifest[T] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] @@ -180,12 +220,14 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass)) } + /** Build the union of two or more RDDs. */ override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = { val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd) implicit val cm: ClassManifest[T] = first.classManifest sc.union(rdds)(cm) } + /** Build the union of two or more RDDs. */ override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]]) : JavaPairRDD[K, V] = { val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd) @@ -195,26 +237,49 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork new JavaPairRDD(sc.union(rdds)(cm))(kcm, vcm) } + /** Build the union of two or more RDDs. */ override def union(first: JavaDoubleRDD, rest: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = { val rdds: Seq[RDD[Double]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.srdd) new JavaDoubleRDD(sc.union(rdds)) } + /** + * Create an [[spark.Accumulator]] integer variable, which tasks can "add" values + * to using the `+=` method. Only the master can access the accumulator's `value`. + */ def intAccumulator(initialValue: Int): Accumulator[Int] = sc.accumulator(initialValue)(IntAccumulatorParam) + /** + * Create an [[spark.Accumulator]] double variable, which tasks can "add" values + * to using the `+=` method. Only the master can access the accumulator's `value`. + */ def doubleAccumulator(initialValue: Double): Accumulator[Double] = sc.accumulator(initialValue)(DoubleAccumulatorParam) + /** + * Create an [[spark.Accumulator]] variable of a given type, which tasks can "add" values + * to using the `+=` method. Only the master can access the accumulator's `value`. + */ def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T] = sc.accumulator(initialValue)(accumulatorParam) + /** + * Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for + * reading it in distributed functions. The variable will be sent to each cluster only once. + */ def broadcast[T](value: T): Broadcast[T] = sc.broadcast(value) + /** Shut down the SparkContext. 
*/ def stop() { sc.stop() } + /** + * Get Spark's home location from either a value set through the constructor, + * or the spark.home Java property, or the SPARK_HOME environment variable + * (in that order of preference). If none of these is set, return None. + */ def getSparkHome(): Option[String] = sc.getSparkHome() } -- cgit v1.2.3
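
For reference, a minimal usage sketch of the Java API documented by this patch: a word count that exercises the JavaSparkContext constructor, textFile, flatMap, map with a PairFunction, reduceByKey and collect, whose signatures are taken from the declarations above. It assumes the Spark 0.6-era layout in which the function adapter classes (FlatMapFunction, PairFunction, Function2) live in spark.api.java.function and are used by overriding call(); the class name and input path are placeholders, not part of the patch.

    import java.util.Arrays;
    import java.util.List;

    import scala.Tuple2;

    import spark.api.java.JavaPairRDD;
    import spark.api.java.JavaRDD;
    import spark.api.java.JavaSparkContext;
    import spark.api.java.function.FlatMapFunction;
    import spark.api.java.function.Function2;
    import spark.api.java.function.PairFunction;

    public class JavaWordCount {
      public static void main(String[] args) {
        // Master URL and job name, per the JavaSparkContext constructor docs.
        JavaSparkContext sc = new JavaSparkContext("local", "JavaWordCount");

        // Read a text file as an RDD of lines ("input.txt" is a placeholder path).
        JavaRDD<String> lines = sc.textFile("input.txt");

        // Split each line into words.
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
          public Iterable<String> call(String line) {
            return Arrays.asList(line.split(" "));
          }
        });

        // Map each word to a (word, 1) pair; map with a PairFunction yields a JavaPairRDD.
        JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
          public Tuple2<String, Integer> call(String word) {
            return new Tuple2<String, Integer>(word, 1);
          }
        });

        // Merge the counts for each key with an associative reduce function.
        JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
          public Integer call(Integer a, Integer b) {
            return a + b;
          }
        });

        // Return the results to the master and print them.
        List<Tuple2<String, Integer>> output = counts.collect();
        for (Tuple2<String, Integer> pair : output) {
          System.out.println(pair._1() + ": " + pair._2());
        }

        sc.stop();
      }
    }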
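
The combineByKey documentation above is easiest to follow with a worked example. The sketch below computes a per-key average, using an int[] {sum, count} pair as the "combined type" C. It relies only on the combineByKey, mapValues and parallelizePairs signatures declared in this patch, and again assumes the spark.api.java.function package for the Function/Function2 adapters; the class name and sample data are illustrative only.

    import java.util.Arrays;
    import java.util.List;

    import scala.Tuple2;

    import spark.api.java.JavaPairRDD;
    import spark.api.java.JavaSparkContext;
    import spark.api.java.function.Function;
    import spark.api.java.function.Function2;

    public class JavaPerKeyAverage {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "JavaPerKeyAverage");

        // Example (key, score) pairs distributed as a JavaPairRDD via parallelizePairs.
        List<Tuple2<String, Integer>> data = Arrays.asList(
          new Tuple2<String, Integer>("a", 3),
          new Tuple2<String, Integer>("a", 5),
          new Tuple2<String, Integer>("b", 4));
        JavaPairRDD<String, Integer> scores = sc.parallelizePairs(data);

        // createCombiner: turn a single value V into a combiner C = {sum, count}.
        Function<Integer, int[]> createCombiner = new Function<Integer, int[]>() {
          public int[] call(Integer x) {
            return new int[] { x, 1 };
          }
        };

        // mergeValue: fold one more V into an existing C.
        Function2<int[], Integer, int[]> mergeValue = new Function2<int[], Integer, int[]>() {
          public int[] call(int[] acc, Integer x) {
            return new int[] { acc[0] + x, acc[1] + 1 };
          }
        };

        // mergeCombiners: combine two C's produced on different partitions.
        Function2<int[], int[], int[]> mergeCombiners = new Function2<int[], int[], int[]>() {
          public int[] call(int[] a, int[] b) {
            return new int[] { a[0] + b[0], a[1] + b[1] };
          }
        };

        JavaPairRDD<String, int[]> sumCounts =
          scores.combineByKey(createCombiner, mergeValue, mergeCombiners);

        // mapValues keeps the keys (and partitioning) and turns each {sum, count} into an average.
        JavaPairRDD<String, Double> averages = sumCounts.mapValues(new Function<int[], Double>() {
          public Double call(int[] sumCount) {
            return (double) sumCount[0] / sumCount[1];
          }
        });

        for (Tuple2<String, Double> pair : averages.collect()) {
          System.out.println(pair._1() + " -> " + pair._2());
        }

        sc.stop();
      }
    }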