From 82bf4c0339808f51c9cdffa6a0a829cb5981d92d Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sun, 18 Aug 2013 20:25:45 -0700 Subject: Allow subclasses of Product2 in all key-value related classes (ShuffleDependency, PairRDDFunctions, etc). --- core/src/main/scala/spark/Aggregator.scala | 8 +- core/src/main/scala/spark/Dependency.scala | 4 +- core/src/main/scala/spark/PairRDDFunctions.scala | 104 +++++---------------- core/src/main/scala/spark/Partitioner.scala | 2 +- core/src/main/scala/spark/RDD.scala | 5 +- core/src/main/scala/spark/SparkContext.scala | 9 +- .../main/scala/spark/api/java/JavaPairRDD.scala | 13 +-- core/src/main/scala/spark/rdd/CoGroupedRDD.scala | 6 +- core/src/main/scala/spark/rdd/ShuffledRDD.scala | 6 +- core/src/main/scala/spark/rdd/SubtractedRDD.scala | 6 +- 10 files changed, 56 insertions(+), 107 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala index 136b4da61e..3920f8511c 100644 --- a/core/src/main/scala/spark/Aggregator.scala +++ b/core/src/main/scala/spark/Aggregator.scala @@ -28,11 +28,11 @@ import scala.collection.JavaConversions._ * @param mergeCombiners function to merge outputs from multiple mergeValue function. */ case class Aggregator[K, V, C] ( - val createCombiner: V => C, - val mergeValue: (C, V) => C, - val mergeCombiners: (C, C) => C) { + createCombiner: V => C, + mergeValue: (C, V) => C, + mergeCombiners: (C, C) => C) { - def combineValuesByKey(iter: Iterator[(K, V)]) : Iterator[(K, C)] = { + def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = { val combiners = new JHashMap[K, C] for ((k, v) <- iter) { val oldC = combiners.get(k) diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala index b1edaa06f8..d5a9606570 100644 --- a/core/src/main/scala/spark/Dependency.scala +++ b/core/src/main/scala/spark/Dependency.scala @@ -44,10 +44,10 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) { * @param serializerClass class name of the serializer to use */ class ShuffleDependency[K, V]( - @transient rdd: RDD[(K, V)], + @transient rdd: RDD[_ <: Product2[K, V]], val partitioner: Partitioner, val serializerClass: String = null) - extends Dependency(rdd) { + extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) { val shuffleId: Int = rdd.context.newShuffleId() } diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala index 0be4b4feb8..3ae703ce1a 100644 --- a/core/src/main/scala/spark/PairRDDFunctions.scala +++ b/core/src/main/scala/spark/PairRDDFunctions.scala @@ -21,9 +21,8 @@ import java.nio.ByteBuffer import java.util.{Date, HashMap => JHashMap} import java.text.SimpleDateFormat -import scala.collection.Map +import scala.collection.{mutable, Map} import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.HashMap import scala.collection.JavaConversions._ import org.apache.hadoop.conf.Configuration @@ -38,7 +37,6 @@ import org.apache.hadoop.mapred.OutputFormat import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil} -import org.apache.hadoop.security.UserGroupInformation import spark.partial.BoundedDouble import spark.partial.PartialResult @@ -50,8 +48,7 @@ import spark.Partitioner._ * Extra functions available on RDDs of (key, 
value) pairs through an implicit conversion. * Import `spark.SparkContext._` at the top of your program to use these functions. */ -class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( - self: RDD[(K, V)]) +class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[_ <: Product2[K, V]]) extends Logging with HadoopMapReduceUtil with Serializable { @@ -85,18 +82,17 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( } val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners) if (self.partitioner == Some(partitioner)) { - self.mapPartitions(aggregator.combineValuesByKey, true) + self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true) } else if (mapSideCombine) { - val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey, true) - val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner) - .setSerializer(serializerClass) - partitioned.mapPartitions(aggregator.combineCombinersByKey, true) + val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true) + val partitioned = new ShuffledRDD[K, C](combined, partitioner).setSerializer(serializerClass) + partitioned.mapPartitions(aggregator.combineCombinersByKey, preservesPartitioning = true) } else { // Don't apply map-side combiner. // A sanity check to make sure mergeCombiners is not defined. assert(mergeCombiners == null) val values = new ShuffledRDD[K, V](self, partitioner).setSerializer(serializerClass) - values.mapPartitions(aggregator.combineValuesByKey, true) + values.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true) } } @@ -166,7 +162,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( throw new SparkException("reduceByKeyLocally() does not support array keys") } - def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = { + def reducePartition(iter: Iterator[Product2[K, V]]): Iterator[JHashMap[K, V]] = { val map = new JHashMap[K, V] for ((k, v) <- iter) { val old = map.get(k) @@ -180,7 +176,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( val old = m1.get(k) m1.put(k, if (old == null) v else func(old, v)) } - return m1 + m1 } self.mapPartitions(reducePartition).reduce(mergeMaps) @@ -378,7 +374,13 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( /** * Return the key-value pairs in this RDD to the master as a Map. 
*/ - def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*) + def collectAsMap(): Map[K, V] = { + val data = self.toArray() + val map = new mutable.HashMap[K, V] + map.sizeHint(data.length) + data.foreach { case(k, v) => map.put(k, v) } + map + } /** * Pass each value in the key-value pair RDD through a map function without changing the keys; @@ -406,13 +408,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } - val cg = new CoGroupedRDD[K]( - Seq(self.asInstanceOf[RDD[(K, _)]], other.asInstanceOf[RDD[(K, _)]]), - partitioner) + val cg = new CoGroupedRDD[K](Seq(self, other), partitioner) val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest) - prfs.mapValues { - case Seq(vs, ws) => - (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]) + prfs.mapValues { case Seq(vs, ws) => + (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]]) } } @@ -425,15 +424,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } - val cg = new CoGroupedRDD[K]( - Seq(self.asInstanceOf[RDD[(K, _)]], - other1.asInstanceOf[RDD[(K, _)]], - other2.asInstanceOf[RDD[(K, _)]]), - partitioner) + val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner) val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest) - prfs.mapValues { - case Seq(vs, w1s, w2s) => - (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]) + prfs.mapValues { case Seq(vs, w1s, w2s) => + (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]) } } @@ -507,7 +501,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( self.partitioner match { case Some(p) => val index = p.getPartition(key) - def process(it: Iterator[(K, V)]): Seq[V] = { + def process(it: Iterator[Product2[K, V]]): Seq[V] = { val buf = new ArrayBuffer[V] for ((k, v) <- it if k == key) { buf += v @@ -565,7 +559,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( val formatter = new SimpleDateFormat("yyyyMMddHHmm") val jobtrackerID = formatter.format(new Date()) val stageId = self.id - def writeShard(context: spark.TaskContext, iter: Iterator[(K,V)]): Int = { + def writeShard(context: spark.TaskContext, iter: Iterator[Product2[K,V]]): Int = { // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it // around by taking a mod. We expect that no task will be attempted 2 billion times. val attemptNumber = (context.attemptId % Int.MaxValue).toInt @@ -664,7 +658,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( val writer = new HadoopWriter(conf) writer.preSetup() - def writeToFile(context: TaskContext, iter: Iterator[(K,V)]) { + def writeToFile(context: TaskContext, iter: Iterator[Product2[K,V]]) { // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it // around by taking a mod. We expect that no task will be attempted 2 billion times. 
val attemptNumber = (context.attemptId % Int.MaxValue).toInt @@ -703,54 +697,6 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest]( private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure } -/** - * Extra functions available on RDDs of (key, value) pairs where the key is sortable through - * an implicit conversion. Import `spark.SparkContext._` at the top of your program to use these - * functions. They will work with any key type that has a `scala.math.Ordered` implementation. - */ -class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest]( - self: RDD[(K, V)]) - extends Logging - with Serializable { - - /** - * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling - * `collect` or `save` on the resulting RDD will return or output an ordered list of records - * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in - * order of the keys). - */ - def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[(K,V)] = { - val shuffled = - new ShuffledRDD[K, V](self, new RangePartitioner(numPartitions, self, ascending)) - shuffled.mapPartitions(iter => { - val buf = iter.toArray - if (ascending) { - buf.sortWith((x, y) => x._1 < y._1).iterator - } else { - buf.sortWith((x, y) => x._1 > y._1).iterator - } - }, true) - } -} - -private[spark] -class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev) { - override def getPartitions = firstParent[(K, V)].partitions - override val partitioner = firstParent[(K, V)].partitioner - override def compute(split: Partition, context: TaskContext) = - firstParent[(K, V)].iterator(split, context).map{ case (k, v) => (k, f(v)) } -} - -private[spark] -class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U]) - extends RDD[(K, U)](prev) { - - override def getPartitions = firstParent[(K, V)].partitions - override val partitioner = firstParent[(K, V)].partitioner - override def compute(split: Partition, context: TaskContext) = { - firstParent[(K, V)].iterator(split, context).flatMap { case (k, v) => f(v).map(x => (k, x)) } - } -} private[spark] object Manifests { val seqSeqManifest = classManifest[Seq[Seq[_]]] diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala index 6035bc075e..65da8235d7 100644 --- a/core/src/main/scala/spark/Partitioner.scala +++ b/core/src/main/scala/spark/Partitioner.scala @@ -84,7 +84,7 @@ class HashPartitioner(partitions: Int) extends Partitioner { */ class RangePartitioner[K <% Ordered[K]: ClassManifest, V]( partitions: Int, - @transient rdd: RDD[(K,V)], + @transient rdd: RDD[_ <: Product2[K,V]], private val ascending: Boolean = true) extends Partitioner { diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index 503ea6ccbf..04b37df212 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -287,7 +287,10 @@ abstract class RDD[T: ClassManifest]( def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = { if (shuffle) { // include a shuffle step so that our upstream tasks are still distributed - new CoalescedRDD(new ShuffledRDD(map(x => (x, null)), new HashPartitioner(numPartitions)), numPartitions).keys + new CoalescedRDD( + new ShuffledRDD(map(x => (x, null)), + new HashPartitioner(numPartitions)), + numPartitions).keys } else { new CoalescedRDD(this, numPartitions) } diff --git 
a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 80c65dfebd..c049bd3fa9 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -60,7 +60,8 @@ import org.apache.mesos.MesosNativeLibrary import spark.deploy.{LocalSparkCluster, SparkHadoopUtil} import spark.partial.{ApproximateEvaluator, PartialResult} -import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD} +import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD, + OrderedRDDFunctions} import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob} import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, @@ -833,11 +834,11 @@ class SparkContext( /** Default min number of partitions for Hadoop RDDs when not given by user */ def defaultMinSplits: Int = math.min(defaultParallelism, 2) - private var nextShuffleId = new AtomicInteger(0) + private val nextShuffleId = new AtomicInteger(0) private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement() - private var nextRddId = new AtomicInteger(0) + private val nextRddId = new AtomicInteger(0) /** Register a new RDD, returning its RDD ID */ private[spark] def newRddId(): Int = nextRddId.getAndIncrement() @@ -886,7 +887,7 @@ object SparkContext { implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest]( rdd: RDD[(K, V)]) = - new OrderedRDDFunctions(rdd) + new OrderedRDDFunctions(rdd.asInstanceOf[RDD[Product2[K, V]]]) implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd) diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala index c2995b836a..f5632428e7 100644 --- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala @@ -30,17 +30,18 @@ import org.apache.hadoop.mapred.OutputFormat import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} import org.apache.hadoop.conf.Configuration -import spark.api.java.function.{Function2 => JFunction2} -import spark.api.java.function.{Function => JFunction} -import spark.partial.BoundedDouble -import spark.partial.PartialResult -import spark.OrderedRDDFunctions -import spark.storage.StorageLevel import spark.HashPartitioner import spark.Partitioner import spark.Partitioner._ import spark.RDD import spark.SparkContext.rddToPairRDDFunctions +import spark.api.java.function.{Function2 => JFunction2} +import spark.api.java.function.{Function => JFunction} +import spark.partial.BoundedDouble +import spark.partial.PartialResult +import spark.rdd.OrderedRDDFunctions +import spark.storage.StorageLevel + class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManifest[K], implicit val vManifest: ClassManifest[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] { diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala index c2d95dc060..06e15bb73c 100644 --- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala +++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala @@ -60,7 +60,7 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep]) * @param rdds parent RDDs. * @param part partitioner used to partition the shuffle output. 
*/ -class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner) +class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner) extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) { private var serializerClass: String = null @@ -71,13 +71,13 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner) } override def getDependencies: Seq[Dependency[_]] = { - rdds.map { rdd: RDD[(K, _)] => + rdds.map { rdd: RDD[_ <: Product2[K, _]] => if (rdd.partitioner == Some(part)) { logInfo("Adding one-to-one dependency with " + rdd) new OneToOneDependency(rdd) } else { logInfo("Adding shuffle dependency with " + rdd) - new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part, serializerClass) + new ShuffleDependency[Any, Any](rdd, part, serializerClass) } } } diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala index bcf7d0d89c..2eac62f9c0 100644 --- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala @@ -17,9 +17,7 @@ package spark.rdd -import spark._ -import scala.Some -import scala.Some +import spark.{Dependency, Partitioner, RDD, SparkEnv, ShuffleDependency, Partition, TaskContext} private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition { @@ -35,7 +33,7 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition { * @tparam V the value class. */ class ShuffledRDD[K, V]( - @transient var prev: RDD[(K, V)], + @transient var prev: RDD[_ <: Product2[K, V]], part: Partitioner) extends RDD[(K, V)](prev.context, Nil) { diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala index 46b8cafaac..200e85d432 100644 --- a/core/src/main/scala/spark/rdd/SubtractedRDD.scala +++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala @@ -47,8 +47,8 @@ import spark.OneToOneDependency * out of memory because of the size of `rdd2`. */ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassManifest]( - @transient var rdd1: RDD[(K, V)], - @transient var rdd2: RDD[(K, W)], + @transient var rdd1: RDD[_ <: Product2[K, V]], + @transient var rdd2: RDD[_ <: Product2[K, W]], part: Partitioner) extends RDD[(K, V)](rdd1.context, Nil) { @@ -66,7 +66,7 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassM new OneToOneDependency(rdd) } else { logInfo("Adding shuffle dependency with " + rdd) - new ShuffleDependency(rdd.asInstanceOf[RDD[(K, Any)]], part, serializerClass) + new ShuffleDependency(rdd, part, serializerClass) } } } -- cgit v1.2.3
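
The sketch below (plain Scala, no Spark dependency) illustrates the pattern this patch adopts: typing key-value operations against Iterator[_ <: Product2[K, V]] instead of Iterator[(K, V)], so that plain tuples and custom pair classes flow through the same code path. MutableEntry, Product2Sketch, and the standalone combineValuesByKey are hypothetical stand-ins that only mirror the shape of the Spark methods touched here; they are not Spark API.

    import scala.collection.mutable

    // A reusable mutable pair type. Tuple2 already extends Product2, so both
    // plain tuples and this class satisfy the widened bound used in the patch.
    // (Illustrative stand-in, not part of the Spark API at this commit.)
    case class MutableEntry[K, V](var _1: K, var _2: V) extends Product2[K, V]

    object Product2Sketch {
      // Mirrors the shape of Aggregator.combineValuesByKey after this change:
      // the element bound `_ <: Product2[K, V]` admits iterators of tuples and
      // of any other pair class alike.
      def combineValuesByKey[K, V, C](
          iter: Iterator[_ <: Product2[K, V]],
          createCombiner: V => C,
          mergeValue: (C, V) => C): Iterator[(K, C)] = {
        val combiners = mutable.HashMap.empty[K, C]
        for (kv <- iter) {
          combiners(kv._1) = combiners.get(kv._1) match {
            case Some(c) => mergeValue(c, kv._2)
            case None    => createCombiner(kv._2)
          }
        }
        combiners.iterator
      }

      def main(args: Array[String]): Unit = {
        val tuples  = Iterator(("a", 1), ("b", 2), ("a", 3))
        val entries = Iterator(MutableEntry("a", 1), MutableEntry("a", 2))

        // Both calls type-check against the same signature.
        println(combineValuesByKey[String, Int, Int](tuples, v => v, _ + _).toList)
        println(combineValuesByKey[String, Int, Int](entries, v => v, _ + _).toList)
      }
    }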
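
A second small sketch, mirroring the reworked collectAsMap body in PairRDDFunctions: assuming the collected data arrives on the driver as an array of Product2 pairs, a mutable HashMap is pre-sized with sizeHint and filled in a single pass instead of being built via HashMap(pairs: _*). pairsToMap and CollectAsMapSketch are hypothetical names used only for illustration.

    import scala.collection.mutable

    object CollectAsMapSketch {
      // `data` stands in for the array a driver would get back from
      // RDD.toArray(); any Product2 element type is accepted.
      def pairsToMap[K, V](data: Array[_ <: Product2[K, V]]): scala.collection.Map[K, V] = {
        val map = new mutable.HashMap[K, V]
        map.sizeHint(data.length)
        data.foreach { kv => map.put(kv._1, kv._2) }
        map
      }

      def main(args: Array[String]): Unit = {
        val pairs = Array("a" -> 1, "b" -> 2, "a" -> 3)
        // Later occurrences of a key overwrite earlier ones: a -> 3, b -> 2.
        println(pairsToMap[String, Int](pairs))
      }
    }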