path: root/core
author     Matei Zaharia <matei.zaharia@gmail.com>  2013-08-19 18:30:56 -0700
committer  Matei Zaharia <matei.zaharia@gmail.com>  2013-08-19 18:30:56 -0700
commit     abdc1f8bbb20f3d5c3cf95cb5fe7207673125710 (patch)
tree       5887ae715e53534001c78b35ec03db51f2c176bf /core
parent     8fa0747978e2600705657ba21f8d1ccef37dc722 (diff)
parent     5054abd41b4bac4b7c8159dc23c7ee15aeb7ef2a (diff)
Merge pull request #847 from rxin/rdd
Allow subclasses of Product2 in all key-value related classes
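
For illustration only (this note and sketch are not part of the patch): the effect of the change is that shuffle-related classes accept any subclass of Product2, not just Tuple2, so records such as spark.util.MutablePair can flow through a shuffle without first being copied into tuples. A minimal sketch, assuming a local master:

    import spark.{HashPartitioner, SparkContext}
    import spark.rdd.ShuffledRDD
    import spark.util.MutablePair

    val sc = new SparkContext("local", "product2-demo")
    // An RDD of MutablePair (a Product2 subclass) rather than Tuple2.
    val pairs = sc.parallelize(1 to 4, 2).map(i => MutablePair(i % 2, i))
    // ShuffledRDD now takes a third type parameter: the concrete Product2 type it carries.
    val shuffled = new ShuffledRDD[Int, Int, MutablePair[Int, Int]](pairs, new HashPartitioner(2))
    println(shuffled.collect().mkString(", "))
    sc.stop()
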
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/spark/Aggregator.scala                |  18
-rw-r--r--  core/src/main/scala/spark/BlockStoreShuffleFetcher.scala  |  11
-rw-r--r--  core/src/main/scala/spark/Dependency.scala                |   4
-rw-r--r--  core/src/main/scala/spark/PairRDDFunctions.scala          | 136
-rw-r--r--  core/src/main/scala/spark/Partitioner.scala               |   2
-rw-r--r--  core/src/main/scala/spark/RDD.scala                       |  12
-rw-r--r--  core/src/main/scala/spark/ShuffleFetcher.scala            |   5
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala              |  24
-rw-r--r--  core/src/main/scala/spark/api/java/JavaPairRDD.scala      |  15
-rw-r--r--  core/src/main/scala/spark/rdd/CoGroupedRDD.scala          |  18
-rw-r--r--  core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala   |  36
-rw-r--r--  core/src/main/scala/spark/rdd/MappedValuesRDD.scala       |  34
-rw-r--r--  core/src/main/scala/spark/rdd/OrderedRDDFunctions.scala   |  51
-rw-r--r--  core/src/main/scala/spark/rdd/ShuffledRDD.scala           |  16
-rw-r--r--  core/src/main/scala/spark/rdd/SubtractedRDD.scala         |  20
-rw-r--r--  core/src/main/scala/spark/scheduler/ShuffleMapTask.scala  |   2
-rw-r--r--  core/src/main/scala/spark/util/MutablePair.scala          |  36
-rw-r--r--  core/src/test/scala/spark/CheckpointSuite.scala           |   2
-rw-r--r--  core/src/test/scala/spark/PairRDDFunctionsSuite.scala     |   7
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala                  |   2
-rw-r--r--  core/src/test/scala/spark/ShuffleSuite.scala              |  87
21 files changed, 349 insertions, 189 deletions
diff --git a/core/src/main/scala/spark/Aggregator.scala b/core/src/main/scala/spark/Aggregator.scala
index 136b4da61e..9af401986d 100644
--- a/core/src/main/scala/spark/Aggregator.scala
+++ b/core/src/main/scala/spark/Aggregator.scala
@@ -28,18 +28,18 @@ import scala.collection.JavaConversions._
* @param mergeCombiners function to merge outputs from multiple mergeValue function.
*/
case class Aggregator[K, V, C] (
- val createCombiner: V => C,
- val mergeValue: (C, V) => C,
- val mergeCombiners: (C, C) => C) {
+ createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiners: (C, C) => C) {
- def combineValuesByKey(iter: Iterator[(K, V)]) : Iterator[(K, C)] = {
+ def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
val combiners = new JHashMap[K, C]
- for ((k, v) <- iter) {
- val oldC = combiners.get(k)
+ for (kv <- iter) {
+ val oldC = combiners.get(kv._1)
if (oldC == null) {
- combiners.put(k, createCombiner(v))
+ combiners.put(kv._1, createCombiner(kv._2))
} else {
- combiners.put(k, mergeValue(oldC, v))
+ combiners.put(kv._1, mergeValue(oldC, kv._2))
}
}
combiners.iterator
@@ -47,7 +47,7 @@ case class Aggregator[K, V, C] (
def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
val combiners = new JHashMap[K, C]
- for ((k, c) <- iter) {
+ iter.foreach { case(k, c) =>
val oldC = combiners.get(k)
if (oldC == null) {
combiners.put(k, c)
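
The widened signature means the combiner can consume any Product2 record. A hedged driver-side sketch (Spark normally calls this internally during combineByKey, not from user code):

    import spark.Aggregator
    import spark.util.MutablePair

    val agg = new Aggregator[String, Int, Int](v => v, _ + _, _ + _)
    val input = Iterator(MutablePair("a", 1), MutablePair("a", 2), MutablePair("b", 5))
    // combineValuesByKey now accepts Iterator[_ <: Product2[K, V]], so MutablePair works directly.
    println(agg.combineValuesByKey(input).toList)   // e.g. List((a,3), (b,5))
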
diff --git a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
index 8f6953b1f5..1ec95ed9b8 100644
--- a/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
+++ b/core/src/main/scala/spark/BlockStoreShuffleFetcher.scala
@@ -28,8 +28,9 @@ import spark.util.CompletionIterator
private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
- override def fetch[K, V](
- shuffleId: Int, reduceId: Int, metrics: TaskMetrics, serializer: Serializer) = {
+ override def fetch[T](shuffleId: Int, reduceId: Int, metrics: TaskMetrics, serializer: Serializer)
+ : Iterator[T] =
+ {
logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId))
val blockManager = SparkEnv.get.blockManager
@@ -49,12 +50,12 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
(address, splits.map(s => ("shuffle_%d_%d_%d".format(shuffleId, s._1, reduceId), s._2)))
}
- def unpackBlock(blockPair: (String, Option[Iterator[Any]])) : Iterator[(K, V)] = {
+ def unpackBlock(blockPair: (String, Option[Iterator[Any]])) : Iterator[T] = {
val blockId = blockPair._1
val blockOption = blockPair._2
blockOption match {
case Some(block) => {
- block.asInstanceOf[Iterator[(K, V)]]
+ block.asInstanceOf[Iterator[T]]
}
case None => {
val regex = "shuffle_([0-9]*)_([0-9]*)_([0-9]*)".r
@@ -73,7 +74,7 @@ private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Loggin
val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
val itr = blockFetcherItr.flatMap(unpackBlock)
- CompletionIterator[(K,V), Iterator[(K,V)]](itr, {
+ CompletionIterator[T, Iterator[T]](itr, {
val shuffleMetrics = new ShuffleReadMetrics
shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
diff --git a/core/src/main/scala/spark/Dependency.scala b/core/src/main/scala/spark/Dependency.scala
index b1edaa06f8..d5a9606570 100644
--- a/core/src/main/scala/spark/Dependency.scala
+++ b/core/src/main/scala/spark/Dependency.scala
@@ -44,10 +44,10 @@ abstract class NarrowDependency[T](rdd: RDD[T]) extends Dependency(rdd) {
* @param serializerClass class name of the serializer to use
*/
class ShuffleDependency[K, V](
- @transient rdd: RDD[(K, V)],
+ @transient rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializerClass: String = null)
- extends Dependency(rdd) {
+ extends Dependency(rdd.asInstanceOf[RDD[Product2[K, V]]]) {
val shuffleId: Int = rdd.context.newShuffleId()
}
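
Since Tuple2 extends Product2, existing code that builds a ShuffleDependency from an ordinary pair RDD still compiles unchanged; a brief sketch (assuming an existing SparkContext named sc):

    import spark.{HashPartitioner, ShuffleDependency}

    // RDD[(Int, String)] still conforms to RDD[_ <: Product2[Int, String]].
    val kv = sc.parallelize(Seq((1, "a"), (2, "b")))
    val dep = new ShuffleDependency[Int, String](kv, new HashPartitioner(2))
    println(dep.shuffleId)
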
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 0be4b4feb8..e7d4a7f562 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -21,9 +21,8 @@ import java.nio.ByteBuffer
import java.util.{Date, HashMap => JHashMap}
import java.text.SimpleDateFormat
-import scala.collection.Map
+import scala.collection.{mutable, Map}
import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration
@@ -38,7 +37,6 @@ import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil}
-import org.apache.hadoop.security.UserGroupInformation
import spark.partial.BoundedDouble
import spark.partial.PartialResult
@@ -50,8 +48,7 @@ import spark.Partitioner._
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
* Import `spark.SparkContext._` at the top of your program to use these functions.
*/
-class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
- self: RDD[(K, V)])
+class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
extends Logging
with HadoopMapReduceUtil
with Serializable {
@@ -85,18 +82,18 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
if (self.partitioner == Some(partitioner)) {
- self.mapPartitions(aggregator.combineValuesByKey, true)
+ self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
} else if (mapSideCombine) {
- val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey, true)
- val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner)
+ val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
+ val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
.setSerializer(serializerClass)
- partitioned.mapPartitions(aggregator.combineCombinersByKey, true)
+ partitioned.mapPartitions(aggregator.combineCombinersByKey, preservesPartitioning = true)
} else {
// Don't apply map-side combiner.
// A sanity check to make sure mergeCombiners is not defined.
assert(mergeCombiners == null)
- val values = new ShuffledRDD[K, V](self, partitioner).setSerializer(serializerClass)
- values.mapPartitions(aggregator.combineValuesByKey, true)
+ val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
+ values.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
}
}
@@ -168,7 +165,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
def reducePartition(iter: Iterator[(K, V)]): Iterator[JHashMap[K, V]] = {
val map = new JHashMap[K, V]
- for ((k, v) <- iter) {
+ iter.foreach { case (k, v) =>
val old = map.get(k)
map.put(k, if (old == null) v else func(old, v))
}
@@ -176,11 +173,11 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
}
def mergeMaps(m1: JHashMap[K, V], m2: JHashMap[K, V]): JHashMap[K, V] = {
- for ((k, v) <- m2) {
+ m2.foreach { case (k, v) =>
val old = m1.get(k)
m1.put(k, if (old == null) v else func(old, v))
}
- return m1
+ m1
}
self.mapPartitions(reducePartition).reduce(mergeMaps)
@@ -240,7 +237,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
if (getKeyClass().isArray && partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
- new ShuffledRDD[K, V](self, partitioner)
+ new ShuffledRDD[K, V, (K, V)](self, partitioner)
}
/**
@@ -249,9 +246,8 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
* (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
*/
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
- this.cogroup(other, partitioner).flatMapValues {
- case (vs, ws) =>
- for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
+ this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
+ for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
}
}
@@ -262,13 +258,12 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
* partition the output RDD.
*/
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
- this.cogroup(other, partitioner).flatMapValues {
- case (vs, ws) =>
- if (ws.isEmpty) {
- vs.iterator.map(v => (v, None))
- } else {
- for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
- }
+ this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
+ if (ws.isEmpty) {
+ vs.iterator.map(v => (v, None))
+ } else {
+ for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
+ }
}
}
@@ -280,13 +275,12 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
*/
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Option[V], W))] = {
- this.cogroup(other, partitioner).flatMapValues {
- case (vs, ws) =>
- if (vs.isEmpty) {
- ws.iterator.map(w => (None, w))
- } else {
- for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
- }
+ this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
+ if (vs.isEmpty) {
+ ws.iterator.map(w => (None, w))
+ } else {
+ for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
+ }
}
}
@@ -378,7 +372,13 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
/**
* Return the key-value pairs in this RDD to the master as a Map.
*/
- def collectAsMap(): Map[K, V] = HashMap(self.collect(): _*)
+ def collectAsMap(): Map[K, V] = {
+ val data = self.toArray()
+ val map = new mutable.HashMap[K, V]
+ map.sizeHint(data.length)
+ data.foreach { case (k, v) => map.put(k, v) }
+ map
+ }
/**
* Pass each value in the key-value pair RDD through a map function without changing the keys;
@@ -406,13 +406,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
- val cg = new CoGroupedRDD[K](
- Seq(self.asInstanceOf[RDD[(K, _)]], other.asInstanceOf[RDD[(K, _)]]),
- partitioner)
+ val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
- prfs.mapValues {
- case Seq(vs, ws) =>
- (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
+ prfs.mapValues { case Seq(vs, ws) =>
+ (vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
}
}
@@ -425,15 +422,10 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
- val cg = new CoGroupedRDD[K](
- Seq(self.asInstanceOf[RDD[(K, _)]],
- other1.asInstanceOf[RDD[(K, _)]],
- other2.asInstanceOf[RDD[(K, _)]]),
- partitioner)
+ val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
- prfs.mapValues {
- case Seq(vs, w1s, w2s) =>
- (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
+ prfs.mapValues { case Seq(vs, w1s, w2s) =>
+ (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
}
}
@@ -664,7 +656,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
val writer = new HadoopWriter(conf)
writer.preSetup()
- def writeToFile(context: TaskContext, iter: Iterator[(K,V)]) {
+ def writeToFile(context: TaskContext, iter: Iterator[(K, V)]) {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
@@ -703,54 +695,6 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure
}
-/**
- * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
- * an implicit conversion. Import `spark.SparkContext._` at the top of your program to use these
- * functions. They will work with any key type that has a `scala.math.Ordered` implementation.
- */
-class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
- self: RDD[(K, V)])
- extends Logging
- with Serializable {
-
- /**
- * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
- * `collect` or `save` on the resulting RDD will return or output an ordered list of records
- * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
- * order of the keys).
- */
- def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[(K,V)] = {
- val shuffled =
- new ShuffledRDD[K, V](self, new RangePartitioner(numPartitions, self, ascending))
- shuffled.mapPartitions(iter => {
- val buf = iter.toArray
- if (ascending) {
- buf.sortWith((x, y) => x._1 < y._1).iterator
- } else {
- buf.sortWith((x, y) => x._1 > y._1).iterator
- }
- }, true)
- }
-}
-
-private[spark]
-class MappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => U) extends RDD[(K, U)](prev) {
- override def getPartitions = firstParent[(K, V)].partitions
- override val partitioner = firstParent[(K, V)].partitioner
- override def compute(split: Partition, context: TaskContext) =
- firstParent[(K, V)].iterator(split, context).map{ case (k, v) => (k, f(v)) }
-}
-
-private[spark]
-class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U])
- extends RDD[(K, U)](prev) {
-
- override def getPartitions = firstParent[(K, V)].partitions
- override val partitioner = firstParent[(K, V)].partitioner
- override def compute(split: Partition, context: TaskContext) = {
- firstParent[(K, V)].iterator(split, context).flatMap { case (k, v) => f(v).map(x => (k, x)) }
- }
-}
private[spark] object Manifests {
val seqSeqManifest = classManifest[Seq[Seq[_]]]
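
The collectAsMap rewrite avoids the intermediate copies made by HashMap(collect(): _*) by pre-sizing a mutable map. The same pattern in isolation (plain Scala, no Spark types involved):

    import scala.collection.mutable

    def toHashMap[K, V](data: Array[(K, V)]): mutable.HashMap[K, V] = {
      val map = new mutable.HashMap[K, V]
      map.sizeHint(data.length)                      // pre-size to avoid rehashing while filling
      data.foreach { case (k, v) => map.put(k, v) }
      map
    }

    println(toHashMap(Array(1 -> "a", 2 -> "b")))
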
diff --git a/core/src/main/scala/spark/Partitioner.scala b/core/src/main/scala/spark/Partitioner.scala
index 6035bc075e..65da8235d7 100644
--- a/core/src/main/scala/spark/Partitioner.scala
+++ b/core/src/main/scala/spark/Partitioner.scala
@@ -84,7 +84,7 @@ class HashPartitioner(partitions: Int) extends Partitioner {
*/
class RangePartitioner[K <% Ordered[K]: ClassManifest, V](
partitions: Int,
- @transient rdd: RDD[(K,V)],
+ @transient rdd: RDD[_ <: Product2[K,V]],
private val ascending: Boolean = true)
extends Partitioner {
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index 503ea6ccbf..c9a044afab 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -31,9 +31,8 @@ import org.apache.hadoop.mapred.TextOutputFormat
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
-import spark.api.java.JavaRDD
-import spark.broadcast.Broadcast
import spark.Partitioner._
+import spark.api.java.JavaRDD
import spark.partial.BoundedDouble
import spark.partial.CountEvaluator
import spark.partial.GroupedCountEvaluator
@@ -287,7 +286,10 @@ abstract class RDD[T: ClassManifest](
def coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T] = {
if (shuffle) {
// include a shuffle step so that our upstream tasks are still distributed
- new CoalescedRDD(new ShuffledRDD(map(x => (x, null)), new HashPartitioner(numPartitions)), numPartitions).keys
+ new CoalescedRDD(
+ new ShuffledRDD[T, Null, (T, Null)](map(x => (x, null)),
+ new HashPartitioner(numPartitions)),
+ numPartitions).keys
} else {
new CoalescedRDD(this, numPartitions)
}
@@ -302,8 +304,8 @@ abstract class RDD[T: ClassManifest](
def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = {
var fraction = 0.0
var total = 0
- var multiplier = 3.0
- var initialCount = this.count()
+ val multiplier = 3.0
+ val initialCount = this.count()
var maxSelected = 0
if (num < 0) {
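
The coalesce change is internal plumbing only; the user-facing call is unchanged. A sketch (assuming an existing SparkContext named sc):

    val data = sc.parallelize(1 to 100, 10)
    // With shuffle = true, elements are wrapped as (x, null) pairs and redistributed
    // through a ShuffledRDD[T, Null, (T, Null)] before being coalesced.
    val repartitioned = data.coalesce(4, shuffle = true)
    println(repartitioned.partitions.length)   // 4
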
diff --git a/core/src/main/scala/spark/ShuffleFetcher.scala b/core/src/main/scala/spark/ShuffleFetcher.scala
index dcced035e7..a6839cf7a4 100644
--- a/core/src/main/scala/spark/ShuffleFetcher.scala
+++ b/core/src/main/scala/spark/ShuffleFetcher.scala
@@ -22,12 +22,13 @@ import spark.serializer.Serializer
private[spark] abstract class ShuffleFetcher {
+
/**
* Fetch the shuffle outputs for a given ShuffleDependency.
* @return An iterator over the elements of the fetched shuffle outputs.
*/
- def fetch[K, V](shuffleId: Int, reduceId: Int, metrics: TaskMetrics,
- serializer: Serializer = SparkEnv.get.serializerManager.default): Iterator[(K,V)]
+ def fetch[T](shuffleId: Int, reduceId: Int, metrics: TaskMetrics,
+ serializer: Serializer = SparkEnv.get.serializerManager.default): Iterator[T]
/** Stop the fetcher */
def stop() {}
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 80c65dfebd..185c76366f 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -20,19 +20,14 @@ package spark
import java.io._
import java.net.URI
import java.util.Properties
-import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.JavaConversions._
import scala.collection.Map
import scala.collection.generic.Growable
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
import scala.util.DynamicVariable
-import scala.collection.mutable.{ConcurrentMap, HashMap}
-
-import akka.actor.Actor._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@@ -54,23 +49,22 @@ import org.apache.hadoop.mapred.TextInputFormat
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
-import org.apache.hadoop.security.UserGroupInformation
import org.apache.mesos.MesosNativeLibrary
import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import spark.partial.{ApproximateEvaluator, PartialResult}
-import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
+import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD,
+ OrderedRDDFunctions}
import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener,
- SplitInfo, Stage, StageInfo, TaskScheduler, ActiveJob}
+ SplitInfo, Stage, StageInfo, TaskScheduler}
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend,
ClusterScheduler, Schedulable, SchedulingMode}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
+import spark.ui.SparkUI
import spark.util.{MetadataCleaner, TimeStampedHashMap}
-import ui.{SparkUI}
-import spark.metrics._
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -833,11 +827,11 @@ class SparkContext(
/** Default min number of partitions for Hadoop RDDs when not given by user */
def defaultMinSplits: Int = math.min(defaultParallelism, 2)
- private var nextShuffleId = new AtomicInteger(0)
+ private val nextShuffleId = new AtomicInteger(0)
private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement()
- private var nextRddId = new AtomicInteger(0)
+ private val nextRddId = new AtomicInteger(0)
/** Register a new RDD, returning its RDD ID */
private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
@@ -886,7 +880,7 @@ object SparkContext {
implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
rdd: RDD[(K, V)]) =
- new OrderedRDDFunctions(rdd)
+ new OrderedRDDFunctions[K, V, (K, V)](rdd)
implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd)
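
The implicit conversion keeps the same user-facing shape and simply targets the relocated class, so sortByKey through the implicit works as before (sketch assuming an existing SparkContext named sc):

    import spark.SparkContext._   // brings rddToOrderedRDDFunctions into scope

    val kv = sc.parallelize(Seq(3 -> "c", 1 -> "a", 2 -> "b"))
    // Resolves to spark.rdd.OrderedRDDFunctions[Int, String, (Int, String)].sortByKey.
    println(kv.sortByKey().collect().toList)   // List((1,a), (2,b), (3,c))
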
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index c2995b836a..effe6e5e0d 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -30,17 +30,18 @@ import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.conf.Configuration
-import spark.api.java.function.{Function2 => JFunction2}
-import spark.api.java.function.{Function => JFunction}
-import spark.partial.BoundedDouble
-import spark.partial.PartialResult
-import spark.OrderedRDDFunctions
-import spark.storage.StorageLevel
import spark.HashPartitioner
import spark.Partitioner
import spark.Partitioner._
import spark.RDD
import spark.SparkContext.rddToPairRDDFunctions
+import spark.api.java.function.{Function2 => JFunction2}
+import spark.api.java.function.{Function => JFunction}
+import spark.partial.BoundedDouble
+import spark.partial.PartialResult
+import spark.rdd.OrderedRDDFunctions
+import spark.storage.StorageLevel
+
class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManifest[K],
implicit val vManifest: ClassManifest[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
@@ -563,7 +564,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
override def compare(b: K) = comp.compare(a, b)
}
implicit def toOrdered(x: K): Ordered[K] = new KeyOrdering(x)
- fromRDD(new OrderedRDDFunctions(rdd).sortByKey(ascending))
+ fromRDD(new OrderedRDDFunctions[K, V, (K, V)](rdd).sortByKey(ascending))
}
/**
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index c2d95dc060..01b6c23dcc 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -60,7 +60,7 @@ class CoGroupPartition(idx: Int, val deps: Array[CoGroupSplitDep])
* @param rdds parent RDDs.
* @param part partitioner used to partition the shuffle output.
*/
-class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
+class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) {
private var serializerClass: String = null
@@ -71,13 +71,13 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
}
override def getDependencies: Seq[Dependency[_]] = {
- rdds.map { rdd: RDD[(K, _)] =>
+ rdds.map { rdd: RDD[_ <: Product2[K, _]] =>
if (rdd.partitioner == Some(part)) {
- logInfo("Adding one-to-one dependency with " + rdd)
+ logDebug("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
- logInfo("Adding shuffle dependency with " + rdd)
- new ShuffleDependency[Any, Any](rdd.asInstanceOf[RDD[(Any, Any)]], part, serializerClass)
+ logDebug("Adding shuffle dependency with " + rdd)
+ new ShuffleDependency[Any, Any](rdd, part, serializerClass)
}
}
}
@@ -122,15 +122,15 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(K, _)]], part: Partitioner)
for ((dep, depNum) <- split.deps.zipWithIndex) dep match {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
// Read them from the parent
- for ((k, v) <- rdd.iterator(itsSplit, context)) {
- getSeq(k.asInstanceOf[K])(depNum) += v
+ rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, Any]]].foreach { kv =>
+ getSeq(kv._1)(depNum) += kv._2
}
}
case ShuffleCoGroupSplitDep(shuffleId) => {
// Read map outputs of shuffle
val fetcher = SparkEnv.get.shuffleFetcher
- fetcher.fetch[K, Any](shuffleId, split.index, context.taskMetrics, ser).foreach {
- case (key, value) => getSeq(key)(depNum) += value
+ fetcher.fetch[Product2[K, Any]](shuffleId, split.index, context.taskMetrics, ser).foreach {
+ kv => getSeq(kv._1)(depNum) += kv._2
}
}
}
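
Because rdds is now Seq[RDD[_ <: Product2[K, _]]], the parents of a CoGroupedRDD no longer have to share one pair representation. A sketch mixing Tuple2 and MutablePair parents (assuming an existing SparkContext named sc):

    import spark.HashPartitioner
    import spark.rdd.CoGroupedRDD
    import spark.util.MutablePair

    val tupleRdd   = sc.parallelize(Seq((1, "a"), (2, "b")))
    val mutableRdd = sc.parallelize(Seq(MutablePair(1, 10), MutablePair(1, 11)))
    val grouped = new CoGroupedRDD[Int](Seq(tupleRdd, mutableRdd), new HashPartitioner(2))
    // Each key maps to one Seq of values per parent RDD.
    grouped.collect().foreach { case (k, seqs) => println(k + " -> " + seqs.mkString(", ")) }
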
diff --git a/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala b/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala
new file mode 100644
index 0000000000..a6bdce89d8
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/FlatMappedValuesRDD.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.rdd
+
+import spark.{TaskContext, Partition, RDD}
+
+
+private[spark]
+class FlatMappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => TraversableOnce[U])
+ extends RDD[(K, U)](prev) {
+
+ override def getPartitions = firstParent[Product2[K, V]].partitions
+
+ override val partitioner = firstParent[Product2[K, V]].partitioner
+
+ override def compute(split: Partition, context: TaskContext) = {
+ firstParent[Product2[K, V]].iterator(split, context).flatMap { case Product2(k, v) =>
+ f(v).map(x => (k, x))
+ }
+ }
+}
diff --git a/core/src/main/scala/spark/rdd/MappedValuesRDD.scala b/core/src/main/scala/spark/rdd/MappedValuesRDD.scala
new file mode 100644
index 0000000000..8334e3b557
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/MappedValuesRDD.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.rdd
+
+
+import spark.{TaskContext, Partition, RDD}
+
+private[spark]
+class MappedValuesRDD[K, V, U](prev: RDD[_ <: Product2[K, V]], f: V => U)
+ extends RDD[(K, U)](prev) {
+
+ override def getPartitions = firstParent[Product2[K, U]].partitions
+
+ override val partitioner = firstParent[Product2[K, U]].partitioner
+
+ override def compute(split: Partition, context: TaskContext): Iterator[(K, U)] = {
+ firstParent[Product2[K, V]].iterator(split, context).map { case Product2(k ,v) => (k, f(v)) }
+ }
+}
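
These two relocated RDDs back PairRDDFunctions.mapValues and flatMapValues; user code normally goes through those methods rather than the classes directly (sketch assuming an existing SparkContext named sc):

    import spark.SparkContext._

    val kv = sc.parallelize(Seq((1, "ab"), (2, "c")))
    println(kv.mapValues(_.length).collect().toList)      // List((1,2), (2,1))
    println(kv.flatMapValues(_.toSeq).collect().toList)   // List((1,a), (1,b), (2,c))
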
diff --git a/core/src/main/scala/spark/rdd/OrderedRDDFunctions.scala b/core/src/main/scala/spark/rdd/OrderedRDDFunctions.scala
new file mode 100644
index 0000000000..9154b76035
--- /dev/null
+++ b/core/src/main/scala/spark/rdd/OrderedRDDFunctions.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.rdd
+
+import spark.{RangePartitioner, Logging, RDD}
+
+/**
+ * Extra functions available on RDDs of (key, value) pairs where the key is sortable through
+ * an implicit conversion. Import `spark.SparkContext._` at the top of your program to use these
+ * functions. They will work with any key type that has a `scala.math.Ordered` implementation.
+ */
+class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest,
+ V: ClassManifest,
+ P <: Product2[K, V] : ClassManifest](
+ self: RDD[P])
+ extends Logging with Serializable {
+
+ /**
+ * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
+ * `collect` or `save` on the resulting RDD will return or output an ordered list of records
+ * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
+ * order of the keys).
+ */
+ def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size): RDD[P] = {
+ val part = new RangePartitioner(numPartitions, self, ascending)
+ val shuffled = new ShuffledRDD[K, V, P](self, part)
+ shuffled.mapPartitions(iter => {
+ val buf = iter.toArray
+ if (ascending) {
+ buf.sortWith((x, y) => x._1 < y._1).iterator
+ } else {
+ buf.sortWith((x, y) => x._1 > y._1).iterator
+ }
+ }, preservesPartitioning = true)
+ }
+}
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index bcf7d0d89c..51c05af064 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -17,9 +17,7 @@
package spark.rdd
-import spark._
-import scala.Some
-import scala.Some
+import spark.{Dependency, Partitioner, RDD, SparkEnv, ShuffleDependency, Partition, TaskContext}
private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
@@ -34,14 +32,14 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
* @tparam K the key class.
* @tparam V the value class.
*/
-class ShuffledRDD[K, V](
- @transient var prev: RDD[(K, V)],
+class ShuffledRDD[K, V, P <: Product2[K, V] : ClassManifest](
+ @transient var prev: RDD[P],
part: Partitioner)
- extends RDD[(K, V)](prev.context, Nil) {
+ extends RDD[P](prev.context, Nil) {
private var serializerClass: String = null
- def setSerializer(cls: String): ShuffledRDD[K, V] = {
+ def setSerializer(cls: String): ShuffledRDD[K, V, P] = {
serializerClass = cls
this
}
@@ -56,9 +54,9 @@ class ShuffledRDD[K, V](
Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i))
}
- override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
+ override def compute(split: Partition, context: TaskContext): Iterator[P] = {
val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId
- SparkEnv.get.shuffleFetcher.fetch[K, V](shuffledId, split.index, context.taskMetrics,
+ SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context.taskMetrics,
SparkEnv.get.serializerManager.get(serializerClass))
}
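
Because setSerializer now returns the fully parameterized type, chained construction keeps the concrete pair type. A sketch (assuming b is an RDD[(Int, Int)] and Kryo is wanted for the shuffle):

    import spark.HashPartitioner
    import spark.rdd.ShuffledRDD

    val c: ShuffledRDD[Int, Int, (Int, Int)] =
      new ShuffledRDD[Int, Int, (Int, Int)](b, new HashPartitioner(10))
        .setSerializer(classOf[spark.KryoSerializer].getName)
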
diff --git a/core/src/main/scala/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
index 46b8cafaac..dadef5e17d 100644
--- a/core/src/main/scala/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/spark/rdd/SubtractedRDD.scala
@@ -47,8 +47,8 @@ import spark.OneToOneDependency
* out of memory because of the size of `rdd2`.
*/
private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassManifest](
- @transient var rdd1: RDD[(K, V)],
- @transient var rdd2: RDD[(K, W)],
+ @transient var rdd1: RDD[_ <: Product2[K, V]],
+ @transient var rdd2: RDD[_ <: Product2[K, W]],
part: Partitioner)
extends RDD[(K, V)](rdd1.context, Nil) {
@@ -62,11 +62,11 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassM
override def getDependencies: Seq[Dependency[_]] = {
Seq(rdd1, rdd2).map { rdd =>
if (rdd.partitioner == Some(part)) {
- logInfo("Adding one-to-one dependency with " + rdd)
+ logDebug("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
- logInfo("Adding shuffle dependency with " + rdd)
- new ShuffleDependency(rdd.asInstanceOf[RDD[(K, Any)]], part, serializerClass)
+ logDebug("Adding shuffle dependency with " + rdd)
+ new ShuffleDependency(rdd, part, serializerClass)
}
}
}
@@ -103,16 +103,14 @@ private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassM
seq
}
}
- def integrate(dep: CoGroupSplitDep, op: ((K, V)) => Unit) = dep match {
+ def integrate(dep: CoGroupSplitDep, op: Product2[K, V] => Unit) = dep match {
case NarrowCoGroupSplitDep(rdd, _, itsSplit) => {
- for (t <- rdd.iterator(itsSplit, context))
- op(t.asInstanceOf[(K, V)])
+ rdd.iterator(itsSplit, context).asInstanceOf[Iterator[Product2[K, V]]].foreach(op)
}
case ShuffleCoGroupSplitDep(shuffleId) => {
- val iter = SparkEnv.get.shuffleFetcher.fetch(shuffleId, partition.index,
+ val iter = SparkEnv.get.shuffleFetcher.fetch[Product2[K, V]](shuffleId, partition.index,
context.taskMetrics, serializer)
- for (t <- iter)
- op(t.asInstanceOf[(K, V)])
+ iter.foreach(op)
}
}
// the first dep is rdd1; add all values to the map
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index e3bb6d1e60..121ff31121 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -148,7 +148,7 @@ private[spark] class ShuffleMapTask(
// Write the map output to its associated buckets.
for (elem <- rdd.iterator(split, taskContext)) {
- val pair = elem.asInstanceOf[(Any, Any)]
+ val pair = elem.asInstanceOf[Product2[Any, Any]]
val bucketId = dep.partitioner.getPartition(pair._1)
buckets.writers(bucketId).write(pair)
}
diff --git a/core/src/main/scala/spark/util/MutablePair.scala b/core/src/main/scala/spark/util/MutablePair.scala
new file mode 100644
index 0000000000..3063806e83
--- /dev/null
+++ b/core/src/main/scala/spark/util/MutablePair.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.util
+
+
+/**
+ * A tuple of 2 elements. This can be used as an alternative to Scala's Tuple2 when we want to
+ * minimize object allocation.
+ *
+ * @param _1 Element 1 of this MutablePair
+ * @param _2 Element 2 of this MutablePair
+ */
+case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T1,
+ @specialized(Int, Long, Double, Char, Boolean/*, AnyRef*/) T2]
+ (var _1: T1, var _2: T2)
+ extends Product2[T1, T2]
+{
+ override def toString = "(" + _1 + "," + _2 + ")"
+
+ override def canEqual(that: Any): Boolean = that.isInstanceOf[MutablePair[T1, T2]]
+}
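
The point of MutablePair is that one instance can be mutated and re-emitted instead of allocating a new Tuple2 per record, which is only safe when each record is consumed or serialized before the pair is reused. A minimal illustration of the mutation itself:

    import spark.util.MutablePair

    // Reuse one pair object instead of allocating a tuple per record.
    val reused = new MutablePair[Int, Int](0, 0)
    (1 to 3).foreach { i =>
      reused._1 = i
      reused._2 = i * i
      println(reused)   // (1,1), (2,4), (3,9) -- all printed from the same object
    }
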
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index a84c89e3c9..966dede2be 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -99,7 +99,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
test("ShuffledRDD") {
testCheckpointing(rdd => {
// Creating ShuffledRDD directly as PairRDDFunctions.combineByKey produces a MapPartitionedRDD
- new ShuffledRDD(rdd.map(x => (x % 2, 1)), partitioner)
+ new ShuffledRDD[Int, Int, (Int, Int)](rdd.map(x => (x % 2, 1)), partitioner)
})
}
diff --git a/core/src/test/scala/spark/PairRDDFunctionsSuite.scala b/core/src/test/scala/spark/PairRDDFunctionsSuite.scala
index b102eaf4e6..328b3b5497 100644
--- a/core/src/test/scala/spark/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/spark/PairRDDFunctionsSuite.scala
@@ -21,16 +21,11 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashSet
import org.scalatest.FunSuite
-import org.scalatest.prop.Checkers
-import org.scalacheck.Arbitrary._
-import org.scalacheck.Gen
-import org.scalacheck.Prop._
import com.google.common.io.Files
-
-import spark.rdd.ShuffledRDD
import spark.SparkContext._
+
class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
test("groupByKey") {
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index cbddf4e523..75778de1cc 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -170,7 +170,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
// we can optionally shuffle to keep the upstream parallel
val coalesced5 = data.coalesce(1, shuffle = true)
- assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _]] !=
+ assert(coalesced5.dependencies.head.rdd.dependencies.head.rdd.asInstanceOf[ShuffledRDD[_, _, _]] !=
null)
}
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index c686b8cc5a..8745689c70 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -20,8 +20,11 @@ package spark
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
-import spark.rdd.ShuffledRDD
import spark.SparkContext._
+import spark.ShuffleSuite.NonJavaSerializableClass
+import spark.rdd.{SubtractedRDD, CoGroupedRDD, OrderedRDDFunctions, ShuffledRDD}
+import spark.util.MutablePair
+
class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
test("groupByKey without compression") {
@@ -46,12 +49,12 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
val a = sc.parallelize(1 to 10, 2)
val b = a.map { x =>
- (x, new ShuffleSuite.NonJavaSerializableClass(x * 2))
+ (x, new NonJavaSerializableClass(x * 2))
}
// If the Kryo serializer is not used correctly, the shuffle would fail because the
// default Java serializer cannot handle the non serializable class.
- val c = new ShuffledRDD(b, new HashPartitioner(NUM_BLOCKS))
- .setSerializer(classOf[spark.KryoSerializer].getName)
+ val c = new ShuffledRDD[Int, NonJavaSerializableClass, (Int, NonJavaSerializableClass)](
+ b, new HashPartitioner(NUM_BLOCKS)).setSerializer(classOf[spark.KryoSerializer].getName)
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
assert(c.count === 10)
@@ -68,12 +71,12 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
sc = new SparkContext("local-cluster[2,1,512]", "test")
val a = sc.parallelize(1 to 10, 2)
val b = a.map { x =>
- (x, new ShuffleSuite.NonJavaSerializableClass(x * 2))
+ (x, new NonJavaSerializableClass(x * 2))
}
// If the Kryo serializer is not used correctly, the shuffle would fail because the
// default Java serializer cannot handle the non serializable class.
- val c = new ShuffledRDD(b, new HashPartitioner(3))
- .setSerializer(classOf[spark.KryoSerializer].getName)
+ val c = new ShuffledRDD[Int, NonJavaSerializableClass, (Int, NonJavaSerializableClass)](
+ b, new HashPartitioner(3)).setSerializer(classOf[spark.KryoSerializer].getName)
assert(c.count === 10)
}
@@ -88,7 +91,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
// NOTE: The default Java serializer doesn't create zero-sized blocks.
// So, use Kryo
- val c = new ShuffledRDD(b, new HashPartitioner(10))
+ val c = new ShuffledRDD[Int, Int, (Int, Int)](b, new HashPartitioner(10))
.setSerializer(classOf[spark.KryoSerializer].getName)
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
@@ -114,7 +117,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
val b = a.map(x => (x, x*2))
// NOTE: The default Java serializer should create zero-sized blocks
- val c = new ShuffledRDD(b, new HashPartitioner(10))
+ val c = new ShuffledRDD[Int, Int, (Int, Int)](b, new HashPartitioner(10))
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
assert(c.count === 4)
@@ -128,6 +131,72 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
// We should have at most 4 non-zero sized partitions
assert(nonEmptyBlocks.size <= 4)
}
+
+ test("shuffle using mutable pairs") {
+ // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+ sc = new SparkContext("local-cluster[2,1,512]", "test")
+ def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+ val data = Array(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
+ val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
+ val results = new ShuffledRDD[Int, Int, MutablePair[Int, Int]](pairs, new HashPartitioner(2))
+ .collect()
+
+ data.foreach { pair => results should contain (pair) }
+ }
+
+ test("sorting using mutable pairs") {
+ // This is not in SortingSuite because of the local cluster setup.
+ // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+ sc = new SparkContext("local-cluster[2,1,512]", "test")
+ def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+ val data = Array(p(1, 11), p(3, 33), p(100, 100), p(2, 22))
+ val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
+ val results = new OrderedRDDFunctions[Int, Int, MutablePair[Int, Int]](pairs)
+ .sortByKey().collect()
+ results(0) should be (p(1, 11))
+ results(1) should be (p(2, 22))
+ results(2) should be (p(3, 33))
+ results(3) should be (p(100, 100))
+ }
+
+ test("cogroup using mutable pairs") {
+ // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+ sc = new SparkContext("local-cluster[2,1,512]", "test")
+ def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+ val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
+ val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"), p(3, "3"))
+ val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2)
+ val pairs2: RDD[MutablePair[Int, String]] = sc.parallelize(data2, 2)
+ val results = new CoGroupedRDD[Int](Seq(pairs1, pairs2), new HashPartitioner(2)).collectAsMap()
+
+ assert(results(1)(0).length === 3)
+ assert(results(1)(0).contains(1))
+ assert(results(1)(0).contains(2))
+ assert(results(1)(0).contains(3))
+ assert(results(1)(1).length === 2)
+ assert(results(1)(1).contains("11"))
+ assert(results(1)(1).contains("12"))
+ assert(results(2)(0).length === 1)
+ assert(results(2)(0).contains(1))
+ assert(results(2)(1).length === 1)
+ assert(results(2)(1).contains("22"))
+ assert(results(3)(0).length === 0)
+ assert(results(3)(1).contains("3"))
+ }
+
+ test("subtract mutable pairs") {
+ // Use a local cluster with 2 processes to make sure there are both local and remote blocks
+ sc = new SparkContext("local-cluster[2,1,512]", "test")
+ def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
+ val data1 = Seq(p(1, 1), p(1, 2), p(1, 3), p(2, 1), p(3, 33))
+ val data2 = Seq(p(1, "11"), p(1, "12"), p(2, "22"))
+ val pairs1: RDD[MutablePair[Int, Int]] = sc.parallelize(data1, 2)
+ val pairs2: RDD[MutablePair[Int, String]] = sc.parallelize(data2, 2)
+ val results = new SubtractedRDD(pairs1, pairs2, new HashPartitioner(2)).collect()
+ results should have length (1)
+ // substracted rdd return results as Tuple2
+ results(0) should be ((3, 33))
+ }
}
object ShuffleSuite {