/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.rdd

import java.util.Random

import scala.collection.Map
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.reflect.{classTag, ClassTag}

import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.mapred.TextOutputFormat

import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}

import org.apache.spark.Partitioner._
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.partial.BoundedDouble
import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{Utils, BoundedPriorityQueue}

import org.apache.spark.SparkContext._
import org.apache.spark._

/**
 * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
 * partitioned collection of elements that can be operated on in parallel. This class contains the
 * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition,
 * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value
 * pairs, such as `groupByKey` and `join`; [[org.apache.spark.rdd.DoubleRDDFunctions]] contains
 * operations available only on RDDs of Doubles; and [[org.apache.spark.rdd.SequenceFileRDDFunctions]]
 * contains operations available on RDDs that can be saved as SequenceFiles. These operations are
 * automatically available on any RDD of the right type (e.g. RDD[(Int, Int)]) through implicit
 * conversions when you `import org.apache.spark.SparkContext._`.
 *
 * Internally, each RDD is characterized by five main properties:
 *
 *  - A list of partitions
 *  - A function for computing each split
 *  - A list of dependencies on other RDDs
 *  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
 *  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for
 *    an HDFS file)
 *
 * All of the scheduling and execution in Spark is done based on these methods, allowing each RDD
 * to implement its own way of computing itself. Indeed, users can implement custom RDDs (e.g. for
 * reading data from a new storage system) by overriding these functions. Please refer to the
 * [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details
 * on RDD internals.
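 *
 * As a purely illustrative sketch of the implicit conversions mentioned above (assuming a live
 * SparkContext named `sc`):
 *
 * {{{
 * import org.apache.spark.SparkContext._
 *
 * val pairs = sc.parallelize(Seq((1, "a"), (1, "b"), (2, "c")))  // RDD[(Int, String)]
 * val grouped = pairs.groupByKey()                               // via PairRDDFunctions
 * val average = sc.parallelize(Seq(1.0, 2.0, 3.0)).mean()        // via DoubleRDDFunctions
 * }}}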
 */
abstract class RDD[T: ClassTag](
    @transient private var sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {

  /** Construct an RDD with just a one-to-one dependency on one parent */
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context, List(new OneToOneDependency(oneParent)))

  // =======================================================================
  // Methods that should be implemented by subclasses of RDD
  // =======================================================================

  /** Implemented by subclasses to compute a given partition. */
  def compute(split: Partition, context: TaskContext): Iterator[T]

  /**
   * Implemented by subclasses to return the set of partitions in this RDD. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getPartitions: Array[Partition]

  /**
   * Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
   * be called once, so it is safe to implement a time-consuming computation in it.
   */
  protected def getDependencies: Seq[Dependency[_]] = deps

  /** Optionally overridden by subclasses to specify placement preferences. */
  protected def getPreferredLocations(split: Partition): Seq[String] = Nil

  /** Optionally overridden by subclasses to specify how they are partitioned. */
  @transient val partitioner: Option[Partitioner] = None

  // =======================================================================
  // Methods and fields available on all RDDs
  // =======================================================================

  /** The SparkContext that created this RDD. */
  def sparkContext: SparkContext = sc

  /** A unique ID for this RDD (within its SparkContext). */
  val id: Int = sc.newRddId()

  /** A friendly name for this RDD */
  @transient var name: String = null

  /** Assign a name to this RDD */
  def setName(_name: String) = {
    name = _name
    this
  }

  /** User-defined generator of this RDD */
  @transient var generator = Utils.getCallSiteInfo.firstUserClass

  /** Reset generator */
  def setGenerator(_generator: String) = {
    generator = _generator
  }

  /**
   * Set this RDD's storage level to persist its values across operations after the first time
   * it is computed. This can only be used to assign a new storage level if the RDD does not
   * have a storage level set yet.
   */
  def persist(newLevel: StorageLevel): RDD[T] = {
    // TODO: Handle changes of StorageLevel
    if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
      throw new UnsupportedOperationException(
        "Cannot change storage level of an RDD after it was already assigned a level")
    }
    storageLevel = newLevel
    // Register the RDD with the SparkContext
    sc.persistentRdds(id) = this
    this
  }

  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)

  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
  def cache(): RDD[T] = persist()

  /**
   * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
   *
   * @param blocking Whether to block until all blocks are deleted.
   * @return This RDD.
   */
  def unpersist(blocking: Boolean = true): RDD[T] = {
    logInfo("Removing RDD " + id + " from persistence list")
    sc.env.blockManager.master.removeRdd(id, blocking)
    sc.persistentRdds.remove(id)
    storageLevel = StorageLevel.NONE
    this
  }

  /** Get the RDD's current storage level, or StorageLevel.NONE if none is set.
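   *
   * A hypothetical usage sketch (assumes a SparkContext named `sc` and an input path of your own):
   * {{{
   * val lines = sc.textFile("path/to/input.txt").cache()
   * lines.getStorageLevel   // StorageLevel.MEMORY_ONLY once cache()/persist() has been called
   * }}}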
   */
  def getStorageLevel = storageLevel

  // Our dependencies and partitions will be gotten by calling subclass's methods below, and will
  // be overwritten when we're checkpointed
  private var dependencies_ : Seq[Dependency[_]] = null
  @transient private var partitions_ : Array[Partition] = null

  /** An Option holding our checkpoint RDD, if we are checkpointed */
  private def checkpointRDD: Option[RDD[T]] = checkpointData.flatMap(_.checkpointRDD)

  /**
   * Get the list of dependencies of this RDD, taking into account whether the
   * RDD is checkpointed or not.
   */
  final def dependencies: Seq[Dependency[_]] = {
    checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
      if (dependencies_ == null) {
        dependencies_ = getDependencies
      }
      dependencies_
    }
  }

  /**
   * Get the array of partitions of this RDD, taking into account whether the
   * RDD is checkpointed or not.
   */
  final def partitions: Array[Partition] = {
    checkpointRDD.map(_.partitions).getOrElse {
      if (partitions_ == null) {
        partitions_ = getPartitions
      }
      partitions_
    }
  }

  /**
   * Get the preferred locations of a partition (as hostnames), taking into account whether the
   * RDD is checkpointed.
   */
  final def preferredLocations(split: Partition): Seq[String] = {
    checkpointRDD.map(_.getPreferredLocations(split)).getOrElse {
      getPreferredLocations(split)
    }
  }

  /**
   * Internal method to this RDD; will read from cache if applicable, or otherwise compute it.
   * This should ''not'' be called by users directly, but is available for implementors of custom
   * subclasses of RDD.
   */
  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

  /**
   * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing.
   */
  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
    if (isCheckpointed) {
      firstParent[T].iterator(split, context)
    } else {
      compute(split, context)
    }
  }

  // Transformations (return a new RDD)

  /**
   * Return a new RDD by applying a function to all elements of this RDD.
   */
  def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))

  /**
   * Return a new RDD by first applying a function to all elements of this
   * RDD, and then flattening the results.
   */
  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =
    new FlatMappedRDD(this, sc.clean(f))

  /**
   * Return a new RDD containing only the elements that satisfy a predicate.
   */
  def filter(f: T => Boolean): RDD[T] = new FilteredRDD(this, sc.clean(f))

  /**
   * Return a new RDD containing the distinct elements in this RDD.
   */
  def distinct(numPartitions: Int): RDD[T] =
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)

  def distinct(): RDD[T] = distinct(partitions.size)

  /**
   * Return a new RDD that has exactly numPartitions partitions.
   *
   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
   * a shuffle to redistribute data.
   *
   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
   * which can avoid performing a shuffle.
   */
  def repartition(numPartitions: Int): RDD[T] = {
    coalesce(numPartitions, true)
  }

  /**
   * Return a new RDD that is reduced into `numPartitions` partitions.
   *
   * This results in a narrow dependency, e.g. if you go from 1000 partitions
   * to 100 partitions, there will not be a shuffle, instead each of the 100
   * new partitions will claim 10 of the current partitions.
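   *
   * A minimal sketch (illustrative only; assumes an existing RDD `rdd` with 1000 partitions):
   * {{{
   * val merged = rdd.coalesce(100)   // narrow dependency: each new partition claims 10 parents
   * }}}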
   *
   * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,
   * this may result in your computation taking place on fewer nodes than
   * you like (e.g. one node in the case of numPartitions = 1). To avoid this,
   * you can pass shuffle = true. This will add a shuffle step, but means the
   * current upstream partitions will be executed in parallel (per whatever
   * the current partitioning is).
   *
   * Note: With shuffle = true, you can actually coalesce to a larger number
   * of partitions. This is useful if you have a small number of partitions,
   * say 100, potentially with a few partitions being abnormally large. Calling
   * coalesce(1000, shuffle = true) will result in 1000 partitions with the
   * data distributed using a hash partitioner.
   */
  def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
    if (shuffle) {
      // include a shuffle step so that our upstream tasks are still distributed
      new CoalescedRDD(
        new ShuffledRDD[T, Null, (T, Null)](map(x => (x, null)),
          new HashPartitioner(numPartitions)),
        numPartitions).keys
    } else {
      new CoalescedRDD(this, numPartitions)
    }
  }

  /**
   * Return a sampled subset of this RDD.
   */
  def sample(withReplacement: Boolean, fraction: Double, seed: Int): RDD[T] =
    new SampledRDD(this, withReplacement, fraction, seed)

  def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = {
    var fraction = 0.0
    var total = 0
    val multiplier = 3.0
    val initialCount = this.count()
    var maxSelected = 0

    if (num < 0) {
      throw new IllegalArgumentException("Negative number of elements requested")
    }

    if (initialCount > Integer.MAX_VALUE - 1) {
      maxSelected = Integer.MAX_VALUE - 1
    } else {
      maxSelected = initialCount.toInt
    }

    if (num > initialCount && !withReplacement) {
      total = maxSelected
      fraction = multiplier * (maxSelected + 1) / initialCount
    } else {
      fraction = multiplier * (num + 1) / initialCount
      total = num
    }

    val rand = new Random(seed)
    var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()

    // If the first sample didn't turn out large enough, keep trying to take samples;
    // this shouldn't happen often because we use a big multiplier for the initial size
    while (samples.length < total) {
      samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
    }

    Utils.randomizeInPlace(samples, rand).take(total)
  }

  /**
   * Return the union of this RDD and another one. Any identical elements will appear multiple
   * times (use `.distinct()` to eliminate them).
   */
  def union(other: RDD[T]): RDD[T] = new UnionRDD(sc, Array(this, other))

  /**
   * Return the union of this RDD and another one. Any identical elements will appear multiple
   * times (use `.distinct()` to eliminate them).
   */
  def ++(other: RDD[T]): RDD[T] = this.union(other)

  /**
   * Return an RDD created by coalescing all elements within each partition into an array.
   */
  def glom(): RDD[Array[T]] = new GlommedRDD(this)

  /**
   * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
   * elements (a, b) where a is in `this` and b is in `other`.
   */
  def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)

  /**
   * Return an RDD of grouped items.
   */
  def groupBy[K: ClassTag](f: T => K): RDD[(K, Seq[T])] = groupBy[K](f, defaultPartitioner(this))

  /**
   * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
   * mapping to that key.
   */
  def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] =
    groupBy(f, new HashPartitioner(numPartitions))

  /**
   * Return an RDD of grouped items.
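   *
   * Illustrative sketch only (assumes a SparkContext named `sc`):
   * {{{
   * val words = sc.parallelize(Seq("apple", "avocado", "banana"))
   * val byInitial = words.groupBy(_.head)   // RDD[(Char, Seq[String])]
   * }}}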
   */
  def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
    val cleanF = sc.clean(f)
    this.map(t => (cleanF(t), t)).groupByKey(p)
  }

  /**
   * Return an RDD created by piping elements to a forked external process.
   */
  def pipe(command: String): RDD[String] = new PipedRDD(this, command)

  /**
   * Return an RDD created by piping elements to a forked external process.
   */
  def pipe(command: String, env: Map[String, String]): RDD[String] =
    new PipedRDD(this, command, env)

  /**
   * Return an RDD created by piping elements to a forked external process.
   * The print behavior can be customized by providing two functions.
   *
   * @param command command to run in forked process.
   * @param env environment variables to set.
   * @param printPipeContext Before piping elements, this function is called as an opportunity
   *                         to pipe context data. Print line function (like out.println) will be
   *                         passed as printPipeContext's parameter.
   * @param printRDDElement Use this function to customize how to pipe elements. This function
   *                        will be called with each RDD element as the 1st parameter, and the
   *                        print line function (like out.println()) as the 2nd parameter.
   *                        An example of piping the RDD data of groupBy() in a streaming way,
   *                        instead of constructing a huge String to concat all the elements:
   *                        def printRDDElement(record:(String, Seq[String]), f:String=>Unit) =
   *                          for (e <- record._2){f(e)}
   * @return the result RDD
   */
  def pipe(
      command: Seq[String],
      env: Map[String, String] = Map(),
      printPipeContext: (String => Unit) => Unit = null,
      printRDDElement: (T, String => Unit) => Unit = null): RDD[String] = {
    new PipedRDD(this, command, env,
      if (printPipeContext ne null) sc.clean(printPipeContext) else null,
      if (printRDDElement ne null) sc.clean(printRDDElement) else null)
  }

  /**
   * Return a new RDD by applying a function to each partition of this RDD.
   */
  def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
    val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(iter)
    new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
  }

  /**
   * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
   * of the original partition.
   */
  def mapPartitionsWithIndex[U: ClassTag](
      f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
    val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(index, iter)
    new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
  }

  /**
   * Return a new RDD by applying a function to each partition of this RDD. This is a variant of
   * mapPartitions that also passes the TaskContext into the closure.
   */
  def mapPartitionsWithContext[U: ClassTag](
      f: (TaskContext, Iterator[T]) => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U] = {
    val func = (context: TaskContext, index: Int, iter: Iterator[T]) => f(context, iter)
    new MapPartitionsRDD(this, sc.clean(func), preservesPartitioning)
  }

  /**
   * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
   * of the original partition.
   */
  @deprecated("use mapPartitionsWithIndex", "0.7.0")
  def mapPartitionsWithSplit[U: ClassTag](
      f: (Int, Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = {
    mapPartitionsWithIndex(f, preservesPartitioning)
  }

  /**
   * Maps f over this RDD, where f takes an additional parameter of type A. This
   * additional parameter is produced by constructA, which is called in each
   * partition with the index of that partition.
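   *
   * A minimal, hypothetical sketch (a per-partition random generator seeded by the partition
   * index; assumes an existing RDD `rdd`):
   * {{{
   * rdd.mapWith(index => new scala.util.Random(index)) { (item, rng) =>
   *   (item, rng.nextDouble())
   * }
   * }}}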
   */
  def mapWith[A: ClassTag, U: ClassTag]
      (constructA: Int => A, preservesPartitioning: Boolean = false)
      (f: (T, A) => U): RDD[U] = {
    mapPartitionsWithIndex((index, iter) => {
      val a = constructA(index)
      iter.map(t => f(t, a))
    }, preservesPartitioning)
  }

  /**
   * FlatMaps f over this RDD, where f takes an additional parameter of type A. This
   * additional parameter is produced by constructA, which is called in each
   * partition with the index of that partition.
   */
  def flatMapWith[A: ClassTag, U: ClassTag]
      (constructA: Int => A, preservesPartitioning: Boolean = false)
      (f: (T, A) => Seq[U]): RDD[U] = {
    mapPartitionsWithIndex((index, iter) => {
      val a = constructA(index)
      iter.flatMap(t => f(t, a))
    }, preservesPartitioning)
  }

  /**
   * Applies f to each element of this RDD, where f takes an additional parameter of type A.
   * This additional parameter is produced by constructA, which is called in each
   * partition with the index of that partition.
   */
  def foreachWith[A: ClassTag](constructA: Int => A)(f: (T, A) => Unit) {
    mapPartitionsWithIndex { (index, iter) =>
      val a = constructA(index)
      iter.map(t => {f(t, a); t})
    }.foreach(_ => {})
  }

  /**
   * Filters this RDD with p, where p takes an additional parameter of type A. This
   * additional parameter is produced by constructA, which is called in each
   * partition with the index of that partition.
   */
  def filterWith[A: ClassTag](constructA: Int => A)(p: (T, A) => Boolean): RDD[T] = {
    mapPartitionsWithIndex((index, iter) => {
      val a = constructA(index)
      iter.filter(t => p(t, a))
    }, preservesPartitioning = true)
  }

  /**
   * Zips this RDD with another one, returning key-value pairs with the first element in each RDD,
   * second element in each RDD, etc. Assumes that the two RDDs have the *same number of
   * partitions* and the *same number of elements in each partition* (e.g. one was made through
   * a map on the other).
   */
  def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other)

  /**
   * Zip this RDD's partitions with one (or more) RDD(s) and return a new RDD by
   * applying a function to the zipped partitions. Assumes that all the RDDs have the
   * *same number of partitions*, but does *not* require them to have the same number
   * of elements in each partition.
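   *
   * An illustrative sketch (assumes two RDDs `xs` and `ys` built with the same number of partitions):
   * {{{
   * xs.zipPartitions(ys) { (xIter, yIter) =>
   *   Iterator(xIter.size + yIter.size)   // one combined element count per partition
   * }
   * }}}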
   */
  def zipPartitions[B: ClassTag, V: ClassTag]
      (rdd2: RDD[B])
      (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] =
    new ZippedPartitionsRDD2(sc, sc.clean(f), this, rdd2, false)

  def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
      (rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)
      (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
    new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, preservesPartitioning)

  def zipPartitions[B: ClassTag, C: ClassTag, V: ClassTag]
      (rdd2: RDD[B], rdd3: RDD[C])
      (f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V]): RDD[V] =
    new ZippedPartitionsRDD3(sc, sc.clean(f), this, rdd2, rdd3, false)

  def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
      (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)
      (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
    new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, preservesPartitioning)

  def zipPartitions[B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]
      (rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])
      (f: (Iterator[T], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V]): RDD[V] =
    new ZippedPartitionsRDD4(sc, sc.clean(f), this, rdd2, rdd3, rdd4, false)


  // Actions (launch a job to return a value to the user program)

  /**
   * Applies a function f to all elements of this RDD.
   */
  def foreach(f: T => Unit) {
    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(f))
  }

  /**
   * Applies a function f to each partition of this RDD.
   */
  def foreachPartition(f: Iterator[T] => Unit) {
    sc.runJob(this, (iter: Iterator[T]) => f(iter))
  }

  /**
   * Return an array that contains all of the elements in this RDD.
   */
  def collect(): Array[T] = {
    val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
    Array.concat(results: _*)
  }

  /**
   * Return an array that contains all of the elements in this RDD.
   */
  def toArray(): Array[T] = collect()

  /**
   * Return an RDD that contains all matching values by applying `f`.
   */
  def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = {
    filter(f.isDefinedAt).map(f)
  }

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   *
   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
   * RDD will be no larger than this one.
   */
  def subtract(other: RDD[T]): RDD[T] =
    subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.size)))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(other: RDD[T], numPartitions: Int): RDD[T] =
    subtract(other, new HashPartitioner(numPartitions))

  /**
   * Return an RDD with the elements from `this` that are not in `other`.
   */
  def subtract(other: RDD[T], p: Partitioner): RDD[T] = {
    if (partitioner == Some(p)) {
      // Our partitioner knows how to handle T (which, since we have a partitioner, is
      // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
      val p2 = new Partitioner() {
        override def numPartitions = p.numPartitions
        override def getPartition(k: Any) = p.getPartition(k.asInstanceOf[(Any, _)]._1)
      }
      // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
      // anyway, and when calling .keys, will not have a partitioner set, even though
      // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
      // partitioned by the right/real keys (e.g. p).
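      // Added note (illustrative): if p sends key k to partition p.getPartition(k), then p2 sends
      // the fake tuple (k, null) to that same partition, so the subtracted data physically lands
      // where p would have put it, even though no partitioner is recorded on the resulting RDD.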
      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
    } else {
      this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
    }
  }

  /**
   * Reduces the elements of this RDD using the specified commutative and associative binary operator.
   */
  def reduce(f: (T, T) => T): T = {
    val cleanF = sc.clean(f)
    val reducePartition: Iterator[T] => Option[T] = iter => {
      if (iter.hasNext) {
        Some(iter.reduceLeft(cleanF))
      } else {
        None
      }
    }
    var jobResult: Option[T] = None
    val mergeResult = (index: Int, taskResult: Option[T]) => {
      if (taskResult != None) {
        jobResult = jobResult match {
          case Some(value) => Some(f(value, taskResult.get))
          case None => taskResult
        }
      }
    }
    sc.runJob(this, reducePartition, mergeResult)
    // Get the final result out of our Option, or throw an exception if the RDD was empty
    jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
  }

  /**
   * Aggregate the elements of each partition, and then the results for all the partitions, using a
   * given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
   * modify t1 and return it as its result value to avoid object allocation; however, it should not
   * modify t2.
   */
  def fold(zeroValue: T)(op: (T, T) => T): T = {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    val cleanOp = sc.clean(op)
    val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
    val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
    sc.runJob(this, foldPartition, mergeResult)
    jobResult
  }

  /**
   * Aggregate the elements of each partition, and then the results for all the partitions, using
   * given combine functions and a neutral "zero value". This function can return a different result
   * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
   * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
   * allowed to modify and return their first argument instead of creating a new U to avoid memory
   * allocation.
   */
  def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
    // Clone the zero value since we will also be serializing it as part of tasks
    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    val cleanSeqOp = sc.clean(seqOp)
    val cleanCombOp = sc.clean(combOp)
    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
    sc.runJob(this, aggregatePartition, mergeResult)
    jobResult
  }

  /**
   * Return the number of elements in the RDD.
   */
  def count(): Long = {
    sc.runJob(this, (iter: Iterator[T]) => {
      // Use a while loop to count the number of elements rather than iter.size because
      // iter.size uses a for loop, which is slightly slower in current version of Scala.
      var result = 0L
      while (iter.hasNext) {
        result += 1L
        iter.next()
      }
      result
    }).sum
  }

  /**
   * (Experimental) Approximate version of count() that returns a potentially incomplete result
   * within a timeout, even if not all tasks have finished.
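   *
   * A hypothetical usage sketch (assumes an existing RDD `rdd`): `rdd.countApprox(timeout = 1000L)`
   * launches the job and, after at most one second, returns a partial result holding a
   * [[org.apache.spark.partial.BoundedDouble]] range for the count observed so far (95% confidence
   * by default).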
   */
  def countApprox(timeout: Long, confidence: Double = 0.95): PartialResult[BoundedDouble] = {
    val countElements: (TaskContext, Iterator[T]) => Long = { (ctx, iter) =>
      var result = 0L
      while (iter.hasNext) {
        result += 1L
        iter.next()
      }
      result
    }
    val evaluator = new CountEvaluator(partitions.size, confidence)
    sc.runApproximateJob(this, countElements, evaluator, timeout)
  }

  /**
   * Return the count of each unique value in this RDD as a map of (value, count) pairs. The final
   * combine step happens locally on the master, equivalent to running a single reduce task.
   */
  def countByValue(): Map[T, Long] = {
    if (elementClassTag.runtimeClass.isArray) {
      throw new SparkException("countByValue() does not support arrays")
    }
    // TODO: This should perhaps be distributed by default.
    def countPartition(iter: Iterator[T]): Iterator[OLMap[T]] = {
      val map = new OLMap[T]
      while (iter.hasNext) {
        val v = iter.next()
        map.put(v, map.getLong(v) + 1L)
      }
      Iterator(map)
    }
    def mergeMaps(m1: OLMap[T], m2: OLMap[T]): OLMap[T] = {
      val iter = m2.object2LongEntrySet.fastIterator()
      while (iter.hasNext) {
        val entry = iter.next()
        m1.put(entry.getKey, m1.getLong(entry.getKey) + entry.getLongValue)
      }
      return m1
    }
    val myResult = mapPartitions(countPartition).reduce(mergeMaps)
    myResult.asInstanceOf[java.util.Map[T, Long]]   // Will be wrapped as a Scala mutable Map
  }

  /**
   * (Experimental) Approximate version of countByValue().
   */
  def countByValueApprox(
      timeout: Long,
      confidence: Double = 0.95
      ): PartialResult[Map[T, BoundedDouble]] = {
    if (elementClassTag.runtimeClass.isArray) {
      throw new SparkException("countByValueApprox() does not support arrays")
    }
    val countPartition: (TaskContext, Iterator[T]) => OLMap[T] = { (ctx, iter) =>
      val map = new OLMap[T]
      while (iter.hasNext) {
        val v = iter.next()
        map.put(v, map.getLong(v) + 1L)
      }
      map
    }
    val evaluator = new GroupedCountEvaluator[T](partitions.size, confidence)
    sc.runApproximateJob(this, countPartition, evaluator, timeout)
  }

  /**
   * Take the first num elements of the RDD. It works by first scanning one partition, and using
   * the results from that partition to estimate the number of additional partitions needed to
   * satisfy the limit.
   */
  def take(num: Int): Array[T] = {
    if (num == 0) {
      return new Array[T](0)
    }
    val buf = new ArrayBuffer[T]
    val totalParts = this.partitions.length
    var partsScanned = 0
    while (buf.size < num && partsScanned < totalParts) {
      // The number of partitions to try in this iteration. It is ok for this number to be
      // greater than totalParts because we actually cap it at totalParts in runJob.
      var numPartsToTry = 1
      if (partsScanned > 0) {
        // If we didn't find any rows after the first iteration, just try all partitions next.
        // Otherwise, interpolate the number of partitions we need to try, but overestimate it
        // by 50%.
        if (buf.size == 0) {
          numPartsToTry = totalParts - 1
        } else {
          numPartsToTry = (1.5 * num * partsScanned / buf.size).toInt
        }
      }
      numPartsToTry = math.max(0, numPartsToTry)  // guard against negative num of partitions

      val left = num - buf.size
      val p = partsScanned until math.min(partsScanned + numPartsToTry, totalParts)
      val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p, allowLocal = true)

      res.foreach(buf ++= _.take(num - buf.size))
      partsScanned += numPartsToTry
    }

    return buf.toArray
  }

  /**
   * Return the first element in this RDD.
   */
  def first(): T = take(1) match {
    case Array(t) => t
    case _ => throw new UnsupportedOperationException("empty collection")
  }

  /**
   * Returns the top K elements from this RDD as defined by
   * the specified implicit Ordering[T].
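   *
   * For example (an illustrative sketch; assumes a SparkContext named `sc`):
   * {{{
   * sc.parallelize(Seq(10, 4, 2, 12, 3)).top(2)   // returns Array(12, 10)
   * }}}
   *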
   * @param num the number of top elements to return
   * @param ord the implicit ordering for T
   * @return an array of top elements
   */
  def top(num: Int)(implicit ord: Ordering[T]): Array[T] = {
    mapPartitions { items =>
      val queue = new BoundedPriorityQueue[T](num)
      queue ++= items
      Iterator.single(queue)
    }.reduce { (queue1, queue2) =>
      queue1 ++= queue2
      queue1
    }.toArray.sorted(ord.reverse)
  }

  /**
   * Returns the first K elements from this RDD as defined by
   * the specified implicit Ordering[T] and maintains the
   * ordering.
   * @param num the number of top elements to return
   * @param ord the implicit ordering for T
   * @return an array of top elements
   */
  def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = top(num)(ord.reverse)

  /**
   * Save this RDD as a text file, using string representations of elements.
   */
  def saveAsTextFile(path: String) {
    this.map(x => (NullWritable.get(), new Text(x.toString)))
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
  }

  /**
   * Save this RDD as a compressed text file, using string representations of elements.
   */
  def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) {
    this.map(x => (NullWritable.get(), new Text(x.toString)))
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path, codec)
  }

  /**
   * Save this RDD as a SequenceFile of serialized objects.
   */
  def saveAsObjectFile(path: String) {
    this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
      .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
      .saveAsSequenceFile(path)
  }

  /**
   * Creates tuples of the elements in this RDD by applying `f`.
   */
  def keyBy[K](f: T => K): RDD[(K, T)] = {
    map(x => (f(x), x))
  }

  /** A private method for tests, to look at the contents of each partition */
  private[spark] def collectPartitions(): Array[Array[T]] = {
    sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  }

  /**
   * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint
   * directory set with SparkContext.setCheckpointDir() and all references to its parent
   * RDDs will be removed. This function must be called before any job has been
   * executed on this RDD. It is strongly recommended that this RDD is persisted in
   * memory, otherwise saving it to a file will require recomputation.
   */
  def checkpoint() {
    if (context.checkpointDir.isEmpty) {
      throw new Exception("Checkpoint directory has not been set in the SparkContext")
    } else if (checkpointData.isEmpty) {
      checkpointData = Some(new RDDCheckpointData(this))
      checkpointData.get.markForCheckpoint()
    }
  }

  /**
   * Return whether this RDD has been checkpointed or not
   */
  def isCheckpointed: Boolean = {
    checkpointData.map(_.isCheckpointed).getOrElse(false)
  }

  /**
   * Gets the name of the file to which this RDD was checkpointed
   */
  def getCheckpointFile: Option[String] = {
    checkpointData.flatMap(_.getCheckpointFile)
  }

  // =======================================================================
  // Other internal methods and fields
  // =======================================================================

  private var storageLevel: StorageLevel = StorageLevel.NONE

  /** Record user function generating this RDD. */
  @transient private[spark] val origin = Utils.formatSparkCallSite

  private[spark] def elementClassTag: ClassTag[T] = classTag[T]

  private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None

  /** Returns the first parent RDD */
  protected[spark] def firstParent[U: ClassTag] = {
    dependencies.head.rdd.asInstanceOf[RDD[U]]
  }

  /** The [[org.apache.spark.SparkContext]] that this RDD was created on.
   */
  def context = sc

  // Avoid handling doCheckpoint multiple times to prevent excessive recursion
  @transient private var doCheckpointCalled = false

  /**
   * Performs the checkpointing of this RDD by saving this. It is called by the DAGScheduler
   * after a job using this RDD has completed (therefore the RDD has been materialized and
   * potentially stored in memory). doCheckpoint() is called recursively on the parent RDDs.
   */
  private[spark] def doCheckpoint() {
    if (!doCheckpointCalled) {
      doCheckpointCalled = true
      if (checkpointData.isDefined) {
        checkpointData.get.doCheckpoint()
      } else {
        dependencies.foreach(_.rdd.doCheckpoint())
      }
    }
  }

  /**
   * Changes the dependencies of this RDD from its original parents to a new RDD (`newRDD`)
   * created from the checkpoint file, and forgets its old dependencies and partitions.
   */
  private[spark] def markCheckpointed(checkpointRDD: RDD[_]) {
    clearDependencies()
    partitions_ = null
    deps = null    // Forget the constructor argument for dependencies too
  }

  /**
   * Clears the dependencies of this RDD. This method must ensure that all references
   * to the original parent RDDs are removed to enable the parent RDDs to be garbage
   * collected. Subclasses of RDD may override this method for implementing their own cleaning
   * logic. See [[org.apache.spark.rdd.UnionRDD]] for an example.
   */
  protected def clearDependencies() {
    dependencies_ = null
  }

  /** A description of this RDD and its recursive dependencies for debugging. */
  def toDebugString: String = {
    def debugString(rdd: RDD[_], prefix: String = ""): Seq[String] = {
      Seq(prefix + rdd + " (" + rdd.partitions.size + " partitions)") ++
        rdd.dependencies.flatMap(d => debugString(d.rdd, prefix + "  "))
    }
    debugString(this).mkString("\n")
  }

  override def toString: String = "%s%s[%d] at %s".format(
    Option(name).map(_ + " ").getOrElse(""), getClass.getSimpleName, id, origin)

  def toJavaRDD(): JavaRDD[T] = {
    new JavaRDD(this)(elementClassTag)
  }
}
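
// ---------------------------------------------------------------------------
// Illustrative sketch (not part of the original source): how the abstract
// members above fit together in a minimal custom RDD. The names ConstantRDD
// and ConstantPartition are hypothetical; error handling is omitted.
//
//   class ConstantPartition(val index: Int) extends Partition
//
//   class ConstantRDD(sc: SparkContext, value: Int, numSlices: Int)
//     extends RDD[Int](sc, Nil) {
//
//     override def getPartitions: Array[Partition] =
//       Array.tabulate[Partition](numSlices)(i => new ConstantPartition(i))
//
//     override def compute(split: Partition, context: TaskContext): Iterator[Int] =
//       Iterator.single(value)
//   }
//
//   // new ConstantRDD(sc, 42, 4).collect() would yield Array(42, 42, 42, 42)
// ---------------------------------------------------------------------------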