aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorReynold Xin <reynoldx@gmail.com>2013-08-18 20:25:45 -0700
committerReynold Xin <reynoldx@gmail.com>2013-08-18 20:25:45 -0700
commit82bf4c0339808f51c9cdffa6a0a829cb5981d92d (patch)
treef856c38556c96384033dda5f3093581f08f8fa70 /core
parent1e137a5a21b67d0fb74e28e47b8d8b1b4ede931c (diff)
downloadspark-82bf4c0339808f51c9cdffa6a0a829cb5981d92d.tar.gz
spark-82bf4c0339808f51c9cdffa6a0a829cb5981d92d.tar.bz2
spark-82bf4c0339808f51c9cdffa6a0a829cb5981d92d.zip
Allow subclasses of Product2 in all key-value related classes (ShuffleDependency, PairRDDFunctions, etc).
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/Aggregator.scala8
-rw-r--r--core/src/main/scala/spark/Dependency.scala4
-rw-r--r--core/src/main/scala/spark/PairRDDFunctions.scala104
-rw-r--r--core/src/main/scala/spark/Partitioner.scala2
-rw-r--r--core/src/main/scala/spark/RDD.scala5
-rw-r--r--core/src/main/scala/spark/SparkContext.scala9
-rw-r--r--core/src/main/scala/spark/api/java/JavaPairRDD.scala13
-rw-r--r--core/src/main/scala/spark/rdd/CoGroupedRDD.scala6
-rw-r--r--core/src/main/scala/spark/rdd/ShuffledRDD.scala6
-rw-r--r--core/src/main/scala/spark/rdd/SubtractedRDD.scala6
10 files changed, 56 insertions, 107 deletions
diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala
index 136b4da61e..3920f8511c 100644
--- a/core/src/main/scala/spark/Aggregator.scala
+++ b/core/src/main/scala/spark/Aggregator.scala
@@ -28,11 +28,11 @@ import scala.collection.JavaConversions._
* @param mergeCombiners function to merge outputs from multiple mergeValue function.
*/
case class Aggregator[K, V, C] (
- val createCombiner: V => C,
- val mergeValue: (C, V) => C,
- val mergeCombiners: (C, C) => C) {
+ createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiners: (C, C) => C) {
- def combineValuesByKey(iter: Iterator[(K, V)]) : Iterator[(K, C)] = {
+ def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
val combiners = new JHashMap[K, C]
for ((k, v) <- iter) {
val oldC = combiners.get(k)
diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala
index b1edaa06f8..d5a9606570 100644
--- a/core/src/main/scala/spark/Dependency.scala
+++ b/core/src/main/scala/spark/Dependency.scala
@@ -44,10 +44,10 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
* @param serializerClass class name of the serializer to use
*/
class ShuffleDependency[K, V](
- @transient rdd: RDD[(K, V)],
+ @transient rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializerClass: String = null)
- extends Dependency(rdd) {
+ extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
val shuffleId: Int = rdd.context.newShuffleId()
}
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 0be4b4feb8..3ae703ce1a 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -21,9 +21,8 @@ import java.nio.ByteBuffer
import java.util.{Date, HashMap => JHashMap}
import java.text.SimpleDateFormat
-import scala.collection.Map
+import scala.collection.{mutable, Map}
import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration
@@ -38,7 +37,6 @@ import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil}
-import org.apache.hadoop.security.UserGroupInformation
import spark.partial.BoundedDouble
import spark.partial.PartialResult
@@ -50,8 +48,7 @@ import spark.Partitioner._
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
* Import `spark.SparkContext._` at the top of your program to use these functions.
*/
-class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
- self: RDD[(K, V)])
+class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[_ <: Product2[K, V]])
extends Logging
with HadoopMapReduceUtil
with Serializable {
@@ -85,18 +82,17 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
if (self.partitioner == Some(partitioner)) {
- self.mapPartitions(aggregator.combineValuesByKey, true)
+ self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
} else if (mapSideCombine) {
- val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey, true)
- val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner)
- .setSerializer(serializerClass)
- partitioned.mapPartitions(aggregator.combineCombinersByKey, true)
+ val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
+ val partitioned = new ShuffledRDD[K, C](combined, partitioner).setSerializer(serializerClass)
+ partitioned.mapPartitions(aggregator.combineCombinersByKey, preservesPartitioning = true)
} else {
// Don't apply map-side combiner.
// A sanity check to make sure mergeCombiners is not defined.
assert(mergeCombiners == null)
val values = new ShuffledRDD[K, V](self, partitioner).setSerializer(serializerClass)
- values.mapPartitions(aggregator.combineValuesByKey, true)
+ values.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
}
}
@@ -166,7 +162,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
throw new SparkException("reduceByKeyLocally() does not support array keys")
}
- def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
+ def reducePartition(iter: Iterator[Product2[K, V]]): Iterator[JHashMap[K, V]] = {
val map = new JHashMap[K, V]
for ((k, v) <- iter) {
val old = map.get(k)
@@ -180,7 +176,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
val old = m1.get(k)
m1.put(k, if (old == null) v else func(old, v))
}
- return m1
+ m1
}
self.mapPartitions(reducePartition).reduce(mergeMaps)
@@ -378,7 +374,13 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
/**
* Return the key-value pairs in this RDD to the master as a Map.
*/
- def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
+ def collectAsMap(): Map[K, V] = {
+ val data = self.toArray()
+ val map = new mutable.HashMap[K, V]
+ map.sizeHint(data.length)
+ data.foreach { case(k, v) => map.put(k, v) }
+ map
+ }
/**
* Pass each value in the key-value pair RDD through a map function without changing the keys;
@@ -406,13 +408,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
- val cg = new CoGroupedRDD[K](
- Seq(self.asInstanceOf[RDD[(K, _)]], other.asInstanceOf[RDD[(K, _)]]),
- partitioner)
+ val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
- prfs.mapValues {
- case Seq(vs, ws) =>
- (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
+ prfs.mapValues { case Seq(vs, ws) =>
+ (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
}
}
@@ -425,15 +424,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
- val cg = new CoGroupedRDD[K](
- Seq(self.asInstanceOf[RDD[(K, _)]],
- other1.asInstanceOf[RDD[(K, _)]],
- other2.asInstanceOf[RDD[(K, _)]]),
- partitioner)
+ val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
- prfs.mapValues {
- case Seq(vs, w1s, w2s) =>
- (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
+ prfs.mapValues { case Seq(vs, w1s, w2s) =>
+ (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
}
}
@@ -507,7 +501,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
self.partitioner match {
case Some(p) =>
val index = p.getPartition(key)
- def process(it: Iterator[(K, V)]): Seq[V] = {
+ def process(it: Iterator[Product2[K, V]]): Seq[V] = {
val buf = new ArrayBuffer[V]
for ((k, v) <- it if k == key) {
buf += v
@@ -565,7 +559,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
val formatter = new SimpleDateFormat("yyyyMMddHHmm")
val jobtrackerID = formatter.format(new Date())
val stageId = self.id
- def writeShard(context: spark.TaskContext, iter: Iterator[(K,V)]): Int = {
+ def writeShard(context: spark.TaskContext, iter: Iterator[Product2[K,V]]): Int = {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -664,7 +658,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
val writer = new HadoopWriter(conf)
writer.preSetup()
- def writeToFile(context: TaskContext, iter: Iterator[(K,V)]) {
+ def writeToFile(context: TaskContext, iter: Iterator[Product2[K,V]]) {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -703,54 +697,6 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure
}
-/**
- * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
- * an implicit conversion. Import `spark.SparkContext._` at the top of your program to use these
- * functions. They will work with any key type that has a `scala.math.Ordered` implementation.
- */
-class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
- self: RDD[(K, V)])
- extends Logging
- with Serializable {
-
- /**
- * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
- * `collect` or `save` on the resulting RDD will return or output an ordered list of records
- * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
- * order of the keys).
- */
- def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[(K,V)] = {
- val shuffled =
- new ShuffledRDD[K, V](self, new RangePartitioner(numPartitions, self, ascending))
- shuffled.mapPartitions(iter => {
- val buf = iter.toArray
- if (ascending) {
- buf.sortWith((x, y) => x._1 < y._1).iterator
- } else {
- buf.sortWith((x, y) => x._1 > y._1).iterator
- }
- }, true)
- }
-}
-
-private[spark]
-class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev) {
- override def getPartitions = firstParent[(K, V)].partitions
- override val partitioner = firstParent[(K, V)].partitioner
- override def compute(split: Partition, context: TaskContext) =
- firstParent[(K, V)].iterator(split, context).map{ case (k, v) => (k, f(v)) }
-}
-
-private[spark]
-class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U])
- extends RDD[(K, U)](prev) {
-
- override def getPartitions = firstParent[(K, V)].partitions
- override val partitioner = firstParent[(K, V)].partitioner
- override def compute(split: Partition, context: TaskContext) = {
- firstParent[(K, V)].iterator(split, context).flatMap { case (k, v) => f(v).map(x => (k, x)) }
- }
-}
private[spark] object Manifests {
val seqSeqManifest = classManifest[Seq[Seq[_]]]
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
index 6035bc075e..65da8235d7 100644
--- a/core/src/main/scala/spark/Partitioner.scala
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -84,7 +84,7 @@ class HashPartitioner(partitions: Int) extends Partitioner {
*/
class RangePartitioner[K <% Ordered[K]: ClassManifest, V](
partitions: Int,
- @transient rdd: RDD[(K,V)],
+ @transient rdd: RDD[_ <: Product2[K,V]],
private val ascending: Boolean = true)
extends Partitioner {
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 503ea6ccbf..04b37df212 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -287,7 +287,10 @@ abstract class RDD[T: ClassManifest](
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
if (shuffle) {
// include a shuffle step so that our upstream tasks are still distributed
- new CoalescedRDD(new ShuffledRDD(map(x => (x, null)), new HashPartitioner(numPartitions)), numPartitions).keys
+ new CoalescedRDD(
+ new ShuffledRDD(map(x => (x, null)),
+ new HashPartitioner(numPartitions)),
+ numPartitions).keys
} else {
new CoalescedRDD(this, numPartitions)
}
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 80c65dfebd..c049bd3fa9 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -60,7 +60,8 @@ import org.apache.mesos.MesosNativeLibrary
import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import spark.partial.{ApproximateEvaluator, PartialResult}
-import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
+import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD,
+ OrderedRDDFunctions}
import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener,
SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob}
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
@@ -833,11 +834,11 @@ class SparkContext(
/** Default min number of partitions for Hadoop RDDs when not given by user */
def defaultMinSplits: Int = math.min(defaultParallelism, 2)
- private var nextShuffleId = new AtomicInteger(0)
+ private val nextShuffleId = new AtomicInteger(0)
private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement()
- private var nextRddId = new AtomicInteger(0)
+ private val nextRddId = new AtomicInteger(0)
/** Register a new RDD, returning its RDD ID */
private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
@@ -886,7 +887,7 @@ object SparkContext {
implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
rdd: RDD[(K, V)]) =
- new OrderedRDDFunctions(rdd)
+ new OrderedRDDFunctions(rdd.asInstanceOf[RDD[Product2[K, V]]])
implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd)
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index c2995b836a..f5632428e7 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -30,17 +30,18 @@ import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.conf.Configuration
-import spark.api.java.function.{Function2 => JFunction2}
-import spark.api.java.function.{Function => JFunction}
-import spark.partial.BoundedDouble
-import spark.partial.PartialResult
-import spark.OrderedRDDFunctions
-import spark.storage.StorageLevel
import spark.HashPartitioner
import spark.Partitioner
import spark.Partitioner._
import spark.RDD
import spark.SparkContext.rddToPairRDDFunctions
+import spark.api.java.function.{Function2 => JFunction2}
+import spark.api.java.function.{Function => JFunction}
+import spark.partial.BoundedDouble
+import spark.partial.PartialResult
+import spark.rdd.OrderedRDDFunctions
+import spark.storage.StorageLevel
+
class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManifest[K],
implicit val vManifest: ClassManifest[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index c2d95dc060..06e15bb73c 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -60,7 +60,7 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
* @param rdds parent RDDs.
* @param part partitioner used to partition the shuffle output.
*/
-class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
+class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
private var serializerClass: String = null
@@ -71,13 +71,13 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
}
override def getDependencies: Seq[Dependency[_]] = {
- rdds.map { rdd: RDD[(K, _)] =>
+ rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
if (rdd.partitioner == Some(part)) {
logInfo("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
logInfo("Adding shuffle dependency with " + rdd)
- new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part, serializerClass)
+ new ShuffleDependency[Any, Any](rdd, part, serializerClass)
}
}
}
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index bcf7d0d89c..2eac62f9c0 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -17,9 +17,7 @@
package spark.rdd
-import spark._
-import scala.Some
-import scala.Some
+import spark.{Dependency, Partitioner, RDD, SparkEnv, ShuffleDependency, Partition, TaskContext}
private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
@@ -35,7 +33,7 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
* @tparam V the value class.
*/
class ShuffledRDD[K, V](
- @transient var prev: RDD[(K, V)],
+ @transient var prev: RDD[_ <: Product2[K, V]],
part: Partitioner)
extends RDD[(K, V)](prev.context, Nil) {
diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
index 46b8cafaac..200e85d432 100644
--- a/core/src/main/scala/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
@@ -47,8 +47,8 @@ import spark.OneToOneDependency
* out of memory because of the size of `rdd2`.
*/
private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassManifest](
- @transient var rdd1: RDD[(K, V)],
- @transient var rdd2: RDD[(K, W)],
+ @transient var rdd1: RDD[_ <: Product2[K, V]],
+ @transient var rdd2: RDD[_ <: Product2[K, W]],
part: Partitioner)
extends RDD[(K, V)](rdd1.context, Nil) {
@@ -66,7 +66,7 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassM
new OneToOneDependency(rdd)
} else {
logInfo("Adding shuffle dependency with " + rdd)
- new ShuffleDependency(rdd.asInstanceOf[RDD[(K, Any)]], part, serializerClass)
+ new ShuffleDependency(rdd, part, serializerClass)
}
}
}