/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.rdd

import java.nio.ByteBuffer
import java.text.SimpleDateFormat
import java.util.Date
import java.util.{HashMap => JHashMap}

import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.io.SequenceFile.CompressionType
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, Job => NewAPIHadoopJob,
  RecordWriter => NewRecordWriter, SparkHadoopMapReduceUtil}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}

import org.apache.spark._
import org.apache.spark.annotation.Experimental
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.SparkHadoopWriter
import org.apache.spark.Partitioner.defaultPartitioner
import org.apache.spark.SparkContext._
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.serializer.Serializer

/**
 * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
 * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
 */
class PairRDDFunctions[K, V](self: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
  extends Logging
  with SparkHadoopMapReduceUtil
  with Serializable {

  /**
   * Generic function to combine the elements for each key using a custom set of aggregation
   * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C.
   * Note that V and C can be different -- for example, one might group an RDD of type
   * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
   *
   *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
   *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
   *  - `mergeCombiners`, to combine two C's into a single one.
   *
   * In addition, users can control the partitioning of the output RDD, and whether to perform
   * map-side aggregation (if a mapper can produce multiple items with the same key).
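   *
   * An illustrative sketch (assuming an existing `SparkContext` named `sc` and
   * `import org.apache.spark.SparkContext._`): per-key averages can be computed by combining
   * each value into a running (sum, count) pair:
   * {{{
   * val scores = sc.parallelize(Seq(("a", 1.0), ("b", 4.0), ("a", 3.0)))
   * val avgByKey = scores.combineByKey(
   *     (v: Double) => (v, 1),                                              // createCombiner
   *     (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),        // mergeValue
   *     (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2)  // mergeCombiners
   *   ).mapValues { case (sum, count) => sum / count }
   * }}}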
   */
  def combineByKey[C](createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null): RDD[(K, C)] = {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("Default partitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else if (mapSideCombine) {
      val combined = self.mapPartitionsWithContext((context, iter) => {
        aggregator.combineValuesByKey(iter, context)
      }, preservesPartitioning = true)
      val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
        .setSerializer(serializer)
      partitioned.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      // Don't apply map-side combiner.
      val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializer)
      values.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    }
  }

  /**
   * Simplified version of combineByKey that hash-partitions the output RDD.
   */
  def combineByKey[C](createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      numPartitions: Int): RDD[(K, C)] = {
    combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
  }

  /**
   * Merge the values for each key using an associative function and a neutral "zero value" which
   * may be added to the result an arbitrary number of times, and must not change the result
   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication).
   */
  def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = {
    // Serialize the zero value to a byte array so that we can get a new clone of it on each key
    val zeroBuffer = SparkEnv.get.closureSerializer.newInstance().serialize(zeroValue)
    val zeroArray = new Array[Byte](zeroBuffer.limit)
    zeroBuffer.get(zeroArray)

    // When deserializing, use a lazy val to create just one instance of the serializer per task
    lazy val cachedSerializer = SparkEnv.get.closureSerializer.newInstance()
    def createZero() = cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

    combineByKey[V]((v: V) => func(createZero(), v), func, func, partitioner)
  }

  /**
   * Merge the values for each key using an associative function and a neutral "zero value" which
   * may be added to the result an arbitrary number of times, and must not change the result
   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication).
   */
  def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = {
    foldByKey(zeroValue, new HashPartitioner(numPartitions))(func)
  }

  /**
   * Merge the values for each key using an associative function and a neutral "zero value" which
   * may be added to the result an arbitrary number of times, and must not change the result
   * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication).
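   *
   * A minimal illustrative sketch (assuming an existing `SparkContext` named `sc`):
   * {{{
   * val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
   * pairs.foldByKey(0)(_ + _).collect()   // e.g. Array(("a", 4), ("b", 2))
   * }}}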
   */
  def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = {
    foldByKey(zeroValue, defaultPartitioner(self))(func)
  }

  /**
   * Merge the values for each key using an associative reduce function. This will also perform
   * the merging locally on each mapper before sending results to a reducer, similarly to a
   * "combiner" in MapReduce.
   */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
    combineByKey[V]((v: V) => v, func, func, partitioner)
  }

  /**
   * Merge the values for each key using an associative reduce function, but return the results
   * immediately to the master as a Map. This will also perform the merging locally on each mapper
   * before sending results to a reducer, similarly to a "combiner" in MapReduce.
   */
  def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = {
    if (keyClass.isArray) {
      throw new SparkException("reduceByKeyLocally() does not support array keys")
    }

    def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
      val map = new JHashMap[K, V]
      iter.foreach { case (k, v) =>
        val old = map.get(k)
        map.put(k, if (old == null) v else func(old, v))
      }
      Iterator(map)
    }

    def mergeMaps(m1: JHashMap[K, V], m2: JHashMap[K, V]): JHashMap[K, V] = {
      m2.foreach { case (k, v) =>
        val old = m1.get(k)
        m1.put(k, if (old == null) v else func(old, v))
      }
      m1
    }

    self.mapPartitions(reducePartition).reduce(mergeMaps)
  }

  /** Alias for reduceByKeyLocally */
  @deprecated("Use reduceByKeyLocally", "1.0.0")
  def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = reduceByKeyLocally(func)

  /** Count the number of elements for each key, and return the result to the master as a Map. */
  def countByKey(): Map[K, Long] = self.map(_._1).countByValue()

  /**
   * :: Experimental ::
   * Approximate version of countByKey that can return a partial result if it does
   * not finish within a timeout.
   */
  @Experimental
  def countByKeyApprox(timeout: Long, confidence: Double = 0.95)
      : PartialResult[Map[K, BoundedDouble]] = {
    self.map(_._1).countByValueApprox(timeout, confidence)
  }

  /**
   * :: Experimental ::
   *
   * Return approximate number of distinct values for each key in this RDD.
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * here.
   *
   * The relative accuracy is approximately `1.054 / sqrt(2^p)`. Setting a nonzero `sp > p`
   * would trigger sparse representation of registers, which may reduce the memory consumption
   * and increase accuracy when the cardinality is small.
   *
   * @param p The precision value for the normal set.
   *          `p` must be a value between 4 and `sp` if `sp` is not zero (32 max).
   * @param sp The precision value for the sparse set, between 0 and 32.
   *           If `sp` equals 0, the sparse representation is skipped.
   * @param partitioner Partitioner to use for the resulting RDD.
   */
  @Experimental
  def countApproxDistinctByKey(p: Int, sp: Int, partitioner: Partitioner): RDD[(K, Long)] = {
    require(p >= 4, s"p ($p) must be >= 4")
    require(sp <= 32, s"sp ($sp) must be <= 32")
    require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
    val createHLL = (v: V) => {
      val hll = new HyperLogLogPlus(p, sp)
      hll.offer(v)
      hll
    }
    val mergeValueHLL = (hll: HyperLogLogPlus, v: V) => {
      hll.offer(v)
      hll
    }
    val mergeHLL = (h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
      h1.addAll(h2)
      h1
    }

    combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality())
  }

  /**
   * Return approximate number of distinct values for each key in this RDD.
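   *
   * A minimal illustrative sketch (assuming an existing `SparkContext` named `sc`):
   * {{{
   * val visits = sc.parallelize(Seq(("page1", "user1"), ("page1", "user2"),
   *                                 ("page1", "user1"), ("page2", "user3")))
   * visits.countApproxDistinctByKey(0.05, new HashPartitioner(2)).collect()
   *   // approximately Array(("page1", 2), ("page2", 1))
   * }}}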
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * here.
   *
   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
   *                   It must be greater than 0.000017.
   * @param partitioner partitioner of the resulting RDD
   */
  def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): RDD[(K, Long)] = {
    require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017")
    val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
    assert(p <= 32)
    countApproxDistinctByKey(if (p < 4) 4 else p, 0, partitioner)
  }

  /**
   * Return approximate number of distinct values for each key in this RDD.
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * here.
   *
   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
   *                   It must be greater than 0.000017.
   * @param numPartitions number of partitions of the resulting RDD
   */
  def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): RDD[(K, Long)] = {
    countApproxDistinctByKey(relativeSD, new HashPartitioner(numPartitions))
  }

  /**
   * Return approximate number of distinct values for each key in this RDD.
   *
   * The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
   * Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
   * here.
   *
   * @param relativeSD Relative accuracy. Smaller values create counters that require more space.
   *                   It must be greater than 0.000017.
   */
  def countApproxDistinctByKey(relativeSD: Double = 0.05): RDD[(K, Long)] = {
    countApproxDistinctByKey(relativeSD, defaultPartitioner(self))
  }

  /**
   * Merge the values for each key using an associative reduce function. This will also perform
   * the merging locally on each mapper before sending results to a reducer, similarly to a
   * "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
   */
  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
    reduceByKey(new HashPartitioner(numPartitions), func)
  }

  /**
   * Group the values for each key in the RDD into a single sequence. Allows controlling the
   * partitioning of the resulting key-value pair RDD by passing a Partitioner.
   *
   * Note: If you are grouping in order to perform an aggregation (such as a sum or average) over
   * each key, using [[PairRDDFunctions.reduceByKey]] or [[PairRDDFunctions.combineByKey]]
   * will provide much better performance.
   */
  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    def createCombiner(v: V) = ArrayBuffer(v)
    def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v
    def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2
    val bufs = combineByKey[ArrayBuffer[V]](
      createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine = false)
    bufs.mapValues(_.toIterable)
  }

  /**
   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
   * resulting RDD into `numPartitions` partitions.
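   *
   * A minimal illustrative sketch (assuming an existing `SparkContext` named `sc`):
   * {{{
   * val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
   * pairs.groupByKey(2).collect()   // e.g. Array(("b", Iterable(2)), ("a", Iterable(1, 3)))
   * }}}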
   *
   * Note: If you are grouping in order to perform an aggregation (such as a sum or average) over
   * each key, using [[PairRDDFunctions.reduceByKey]] or [[PairRDDFunctions.combineByKey]]
   * will provide much better performance.
   */
  def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = {
    groupByKey(new HashPartitioner(numPartitions))
  }

  /**
   * Return a copy of the RDD partitioned using the specified partitioner.
   */
  def partitionBy(partitioner: Partitioner): RDD[(K, V)] = {
    if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
    if (self.partitioner == Some(partitioner)) {
      self
    } else {
      new ShuffledRDD[K, V, (K, V)](self, partitioner)
    }
  }

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
   */
  def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
      for (v <- vs; w <- ws) yield (v, w)
    }
  }

  /**
   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
   * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
   * partition the output RDD.
   */
  def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
      if (ws.isEmpty) {
        vs.map(v => (v, None))
      } else {
        for (v <- vs; w <- ws) yield (v, Some(w))
      }
    }
  }

  /**
   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
   * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
   * partition the output RDD.
   */
  def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
      : RDD[(K, (Option[V], W))] = {
    this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
      if (vs.isEmpty) {
        ws.map(w => (None, w))
      } else {
        for (v <- vs; w <- ws) yield (Some(v), w)
      }
    }
  }

  /**
   * Simplified version of combineByKey that hash-partitions the resulting RDD using the
   * existing partitioner/parallelism level.
   */
  def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
      : RDD[(K, C)] = {
    combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
  }

  /**
   * Merge the values for each key using an associative reduce function. This will also perform
   * the merging locally on each mapper before sending results to a reducer, similarly to a
   * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
   * parallelism level.
   */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
    reduceByKey(defaultPartitioner(self), func)
  }

  /**
   * Group the values for each key in the RDD into a single sequence. Hash-partitions the
   * resulting RDD with the existing partitioner/parallelism level.
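   *
   * An illustrative sketch (assuming an existing `SparkContext` named `sc`); for a per-key sum,
   * `reduceByKey` is usually preferable because it combines map-side first:
   * {{{
   * val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
   * pairs.groupByKey().mapValues(_.sum).collect()   // shuffles every value
   * pairs.reduceByKey(_ + _).collect()              // same result, less shuffling
   * }}}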
   *
   * Note: If you are grouping in order to perform an aggregation (such as a sum or average) over
   * each key, using [[PairRDDFunctions.reduceByKey]] or [[PairRDDFunctions.combineByKey]]
   * will provide much better performance.
   */
  def groupByKey(): RDD[(K, Iterable[V])] = {
    groupByKey(defaultPartitioner(self))
  }

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Performs a hash join across the cluster.
   */
  def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = {
    join(other, defaultPartitioner(self, other))
  }

  /**
   * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
   * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
   * (k, v2) is in `other`. Performs a hash join across the cluster.
   */
  def join[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, W))] = {
    join(other, new HashPartitioner(numPartitions))
  }

  /**
   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
   * using the existing partitioner/parallelism level.
   */
  def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = {
    leftOuterJoin(other, defaultPartitioner(self, other))
  }

  /**
   * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
   * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
   * pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
   * into `numPartitions` partitions.
   */
  def leftOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (V, Option[W]))] = {
    leftOuterJoin(other, new HashPartitioner(numPartitions))
  }

  /**
   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
   * RDD using the existing partitioner/parallelism level.
   */
  def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = {
    rightOuterJoin(other, defaultPartitioner(self, other))
  }

  /**
   * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
   * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
   * pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
   * RDD into the given number of partitions.
   */
  def rightOuterJoin[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Option[V], W))] = {
    rightOuterJoin(other, new HashPartitioner(numPartitions))
  }

  /**
   * Return the key-value pairs in this RDD to the master as a Map.
   */
  def collectAsMap(): Map[K, V] = {
    val data = self.collect()
    val map = new mutable.HashMap[K, V]
    map.sizeHint(data.length)
    data.foreach { case (k, v) => map.put(k, v) }
    map
  }

  /**
   * Pass each value in the key-value pair RDD through a map function without changing the keys;
   * this also retains the original RDD's partitioning.
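   *
   * A minimal illustrative sketch (assuming an existing `SparkContext` named `sc`):
   * {{{
   * val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
   * pairs.mapValues(_ * 10).collect()   // Array(("a", 10), ("b", 20))
   * }}}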
   */
  def mapValues[U](f: V => U): RDD[(K, U)] = {
    val cleanF = self.context.clean(f)
    new MappedValuesRDD(self, cleanF)
  }

  /**
   * Pass each value in the key-value pair RDD through a flatMap function without changing the
   * keys; this also retains the original RDD's partitioning.
   */
  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = {
    val cleanF = self.context.clean(f)
    new FlatMappedValuesRDD(self, cleanF)
  }

  /**
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
  def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
      : RDD[(K, (Iterable[V], Iterable[W]))] = {
    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
    val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
    cg.mapValues { case Seq(vs, ws) =>
      (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
    }
  }

  /**
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
    if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
    val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
    cg.mapValues { case Seq(vs, w1s, w2s) =>
      (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
    }
  }

  /**
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
  def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
    cogroup(other, defaultPartitioner(self, other))
  }

  /**
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
    cogroup(other1, other2, defaultPartitioner(self, other1, other2))
  }

  /**
   * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
   * list of values for that key in `this` as well as `other`.
   */
  def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = {
    cogroup(other, new HashPartitioner(numPartitions))
  }

  /**
   * For each key k in `this` or `other1` or `other2`, return a resulting RDD that contains a
   * tuple with the list of values for that key in `this`, `other1` and `other2`.
   */
  def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
    cogroup(other1, other2, new HashPartitioner(numPartitions))
  }

  /** Alias for cogroup. */
  def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
    cogroup(other, defaultPartitioner(self, other))
  }

  /** Alias for cogroup. */
  def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
      : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
    cogroup(other1, other2, defaultPartitioner(self, other1, other2))
  }

  /**
   * Return an RDD with the pairs from `this` whose keys are not in `other`.
   *
   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
   * RDD will be no larger than `this` one.
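   *
   * A minimal illustrative sketch (assuming an existing `SparkContext` named `sc`):
   * {{{
   * val left  = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
   * val right = sc.parallelize(Seq(("b", 99)))
   * left.subtractByKey(right).collect()   // e.g. Array(("a", 1), ("c", 3))
   * }}}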
   */
  def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] =
    subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.size)))

  /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
  def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] =
    subtractByKey(other, new HashPartitioner(numPartitions))

  /** Return an RDD with the pairs from `this` whose keys are not in `other`. */
  def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] =
    new SubtractedRDD[K, V, W](self, other, p)

  /**
   * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
   * RDD has a known partitioner by only searching the partition that the key maps to.
   */
  def lookup(key: K): Seq[V] = {
    self.partitioner match {
      case Some(p) =>
        val index = p.getPartition(key)
        def process(it: Iterator[(K, V)]): Seq[V] = {
          val buf = new ArrayBuffer[V]
          for ((k, v) <- it if k == key) {
            buf += v
          }
          buf
        }
        val res = self.context.runJob(self, process _, Array(index), false)
        res(0)
      case None =>
        self.filter(_._1 == key).map(_._2).collect()
    }
  }

  /**
   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
   * supporting the key and value types K and V in this RDD.
   */
  def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) {
    saveAsHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
  }

  /**
   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
   * supporting the key and value types K and V in this RDD. Compress the result with the
   * supplied codec.
   */
  def saveAsHadoopFile[F <: OutputFormat[K, V]](
      path: String,
      codec: Class[_ <: CompressionCodec])(implicit fm: ClassTag[F]) {
    val runtimeClass = fm.runtimeClass
    saveAsHadoopFile(path, keyClass, valueClass, runtimeClass.asInstanceOf[Class[F]], codec)
  }

  /**
   * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
   * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
   */
  def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) {
    saveAsNewAPIHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
  }

  /**
   * Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
   * (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
   */
  def saveAsNewAPIHadoopFile(
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
      conf: Configuration = self.context.hadoopConfiguration) {
    val job = new NewAPIHadoopJob(conf)
    job.setOutputKeyClass(keyClass)
    job.setOutputValueClass(valueClass)
    job.setOutputFormatClass(outputFormatClass)
    job.getConfiguration.set("mapred.output.dir", path)
    saveAsNewAPIHadoopDataset(job.getConfiguration)
  }

  /**
   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
   * supporting the key and value types K and V in this RDD. Compress with the supplied codec.
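   *
   * An illustrative sketch (assuming an existing `SparkContext` named `sc`; the output path is
   * hypothetical):
   * {{{
   * import org.apache.hadoop.io.compress.GzipCodec
   * import org.apache.hadoop.mapred.TextOutputFormat
   *
   * val pairs = sc.parallelize(Seq(("a", 1), ("b", 2)))
   * pairs.saveAsHadoopFile("/tmp/pairs-gz", classOf[String], classOf[Int],
   *   classOf[TextOutputFormat[String, Int]], classOf[GzipCodec])
   * }}}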
   */
  def saveAsHadoopFile(
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: OutputFormat[_, _]],
      codec: Class[_ <: CompressionCodec]) {
    saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass,
      new JobConf(self.context.hadoopConfiguration), Some(codec))
  }

  /**
   * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
   * supporting the key and value types K and V in this RDD.
   */
  def saveAsHadoopFile(
      path: String,
      keyClass: Class[_],
      valueClass: Class[_],
      outputFormatClass: Class[_ <: OutputFormat[_, _]],
      conf: JobConf = new JobConf(self.context.hadoopConfiguration),
      codec: Option[Class[_ <: CompressionCodec]] = None) {
    conf.setOutputKeyClass(keyClass)
    conf.setOutputValueClass(valueClass)
    // Doesn't work in Scala 2.9 due to what may be a generics bug
    // TODO: Should we uncomment this for Scala 2.10?
    // conf.setOutputFormat(outputFormatClass)
    conf.set("mapred.output.format.class", outputFormatClass.getName)
    for (c <- codec) {
      conf.setCompressMapOutput(true)
      conf.set("mapred.output.compress", "true")
      conf.setMapOutputCompressorClass(c)
      conf.set("mapred.output.compression.codec", c.getCanonicalName)
      conf.set("mapred.output.compression.type", CompressionType.BLOCK.toString)
    }
    conf.setOutputCommitter(classOf[FileOutputCommitter])
    FileOutputFormat.setOutputPath(conf, SparkHadoopWriter.createPathFromString(path, conf))
    saveAsHadoopDataset(conf)
  }

  /**
   * Output the RDD to any Hadoop-supported storage system with new Hadoop API, using a Hadoop
   * Configuration object for that storage system. The Conf should set an OutputFormat and any
   * output paths required (e.g. a table name to write to) in the same way as it would be
   * configured for a Hadoop MapReduce job.
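   *
   * An illustrative sketch (assuming an existing `SparkContext` named `sc`, a pair RDD
   * `pairs: RDD[(String, Int)]`, and a hypothetical output path):
   * {{{
   * import org.apache.hadoop.fs.Path
   * import org.apache.hadoop.mapreduce.Job
   * import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
   *
   * val job = new Job(sc.hadoopConfiguration)
   * job.setOutputKeyClass(classOf[String])
   * job.setOutputValueClass(classOf[Int])
   * job.setOutputFormatClass(classOf[TextOutputFormat[String, Int]])
   * FileOutputFormat.setOutputPath(job, new Path("/tmp/pairs-new-api"))
   * pairs.saveAsNewAPIHadoopDataset(job.getConfiguration)
   * }}}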
   */
  def saveAsNewAPIHadoopDataset(conf: Configuration) {
    val job = new NewAPIHadoopJob(conf)
    val formatter = new SimpleDateFormat("yyyyMMddHHmm")
    val jobtrackerID = formatter.format(new Date())
    val stageId = self.id
    val wrappedConf = new SerializableWritable(job.getConfiguration)
    val outfmt = job.getOutputFormatClass
    val jobFormat = outfmt.newInstance

    if (jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
      // FileOutputFormat ignores the filesystem parameter
      jobFormat.checkOutputSpecs(job)
    }

    def writeShard(context: TaskContext, iter: Iterator[(K, V)]): Int = {
      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
      // around by taking a mod. We expect that no task will be attempted 2 billion times.
      val attemptNumber = (context.attemptId % Int.MaxValue).toInt
      /* "reduce task" */
      val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
        attemptNumber)
      val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
      val format = outfmt.newInstance
      format match {
        case c: Configurable => c.setConf(wrappedConf.value)
        case _ => ()
      }
      val committer = format.getOutputCommitter(hadoopContext)
      committer.setupTask(hadoopContext)
      val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K, V]]
      try {
        while (iter.hasNext) {
          val (k, v) = iter.next()
          writer.write(k, v)
        }
      } finally {
        writer.close(hadoopContext)
      }
      committer.commitTask(hadoopContext)
      1
    }

    val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0)
    val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId)
    val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext)
    jobCommitter.setupJob(jobTaskContext)
    self.context.runJob(self, writeShard _)
    jobCommitter.commitJob(jobTaskContext)
  }

  /**
   * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
   * that storage system. The JobConf should set an OutputFormat and any output paths required
   * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
   * MapReduce job.
   */
  def saveAsHadoopDataset(conf: JobConf) {
    val outputFormatInstance = conf.getOutputFormat
    val keyClass = conf.getOutputKeyClass
    val valueClass = conf.getOutputValueClass
    if (outputFormatInstance == null) {
      throw new SparkException("Output format class not set")
    }
    if (keyClass == null) {
      throw new SparkException("Output key class not set")
    }
    if (valueClass == null) {
      throw new SparkException("Output value class not set")
    }
    SparkHadoopUtil.get.addCredentials(conf)

    logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
      valueClass.getSimpleName + ")")

    if (outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
      // FileOutputFormat ignores the filesystem parameter
      val ignoredFs = FileSystem.get(conf)
      conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
    }

    val writer = new SparkHadoopWriter(conf)
    writer.preSetup()

    def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
      // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
      // around by taking a mod. We expect that no task will be attempted 2 billion times.
      val attemptNumber = (context.attemptId % Int.MaxValue).toInt

      writer.setup(context.stageId, context.partitionId, attemptNumber)
      writer.open()
      try {
        var count = 0
        while (iter.hasNext) {
          val record = iter.next()
          count += 1
          writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])
        }
      } finally {
        writer.close()
      }
      writer.commit()
    }

    self.context.runJob(self, writeToFile _)
    writer.commitJob()
  }

  /**
   * Return an RDD with the keys of each tuple.
   */
  def keys: RDD[K] = self.map(_._1)

  /**
   * Return an RDD with the values of each tuple.
   */
  def values: RDD[V] = self.map(_._2)

  private[spark] def keyClass: Class[_] = kt.runtimeClass

  private[spark] def valueClass: Class[_] = vt.runtimeClass

  private[spark] def keyOrdering: Option[Ordering[K]] = Option(ord)
}