author    Matei Zaharia <matei@databricks.com>    2014-06-11 20:45:29 -0700
committer Reynold Xin <rxin@apache.org>           2014-06-11 20:45:29 -0700
commit    508fd371d6dbb826fd8a00787d347235b549e189 (patch)
tree      2cc77c43ab7b9f30a9b1afc6f55d9438779e46ab /core/src/test
parent    d9203350b06a9c737421ba244162b1365402c01b (diff)
[SPARK-2044] Pluggable interface for shuffles
This is a first cut at moving shuffle logic behind a pluggable interface, as described at https://issues.apache.org/jira/browse/SPARK-2044, to let us more easily experiment with new shuffle implementations. It moves the existing shuffle code to a class HashShuffleManager behind a general ShuffleManager interface.

Two things are still missing to make this complete:

* MapOutputTracker needs to be hidden behind the ShuffleManager interface; this will also require adding methods to ShuffleManager that will let the DAGScheduler interact with it as it does with the MapOutputTracker today
* The code to do map-side and reduce-side combine in ShuffledRDD, PairRDDFunctions, etc needs to be moved into the ShuffleManager's readers and writers

However, some of these may also be done later after we merge the current interface.

Author: Matei Zaharia <matei@databricks.com>

Closes #1009 from mateiz/pluggable-shuffle and squashes the following commits:

7a09862 [Matei Zaharia] review comments
be33d3f [Matei Zaharia] review comments
1513d4e [Matei Zaharia] Add ASF header
ac56831 [Matei Zaharia] Bug fix and better error message
4f681ba [Matei Zaharia] Move write part of ShuffleMapTask to ShuffleManager
f6f011d [Matei Zaharia] Move hash shuffle reader behind ShuffleManager interface
55c7717 [Matei Zaharia] Changed RDD code to use ShuffleReader
75cc044 [Matei Zaharia] Partial work to move hash shuffle in
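For orientation, a minimal sketch of the interface shape the commit message describes: a ShuffleManager that hands out writers for the map side and readers for the reduce side. The trait and method names below are illustrative assumptions, not the exact signatures introduced by this patch.

// Illustrative sketch only; names and signatures are assumptions,
// not the exact API added by this commit.
trait ShuffleWriter[K, V] {
  // Write one map task's output for this shuffle.
  def write(records: Iterator[Product2[K, V]]): Unit
}

trait ShuffleReader[K, C] {
  // Read the combined values for a reducer's range of partitions.
  def read(): Iterator[Product2[K, C]]
}

trait ShuffleManager {
  def getWriter[K, V](shuffleId: Int, mapId: Int): ShuffleWriter[K, V]
  def getReader[K, C](shuffleId: Int, startPartition: Int, endPartition: Int): ShuffleReader[K, C]
  def stop(): Unit
}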
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala  |  6
-rw-r--r--  core/src/test/scala/org/apache/spark/ShuffleSuite.scala         |  6
2 files changed, 6 insertions, 6 deletions
diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
index dc2db66df6..13b415cccb 100644
--- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala
@@ -201,7 +201,7 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
def newPairRDD = newRDD.map(_ -> 1)
def newShuffleRDD = newPairRDD.reduceByKey(_ + _)
def newBroadcast = sc.broadcast(1 to 100)
- def newRDDWithShuffleDependencies: (RDD[_], Seq[ShuffleDependency[_, _]]) = {
+ def newRDDWithShuffleDependencies: (RDD[_], Seq[ShuffleDependency[_, _, _]]) = {
def getAllDependencies(rdd: RDD[_]): Seq[Dependency[_]] = {
rdd.dependencies ++ rdd.dependencies.flatMap { dep =>
getAllDependencies(dep.rdd)
@@ -211,8 +211,8 @@ class ContextCleanerSuite extends FunSuite with BeforeAndAfter with LocalSparkCo
// Get all the shuffle dependencies
val shuffleDeps = getAllDependencies(rdd)
- .filter(_.isInstanceOf[ShuffleDependency[_, _]])
- .map(_.asInstanceOf[ShuffleDependency[_, _]])
+ .filter(_.isInstanceOf[ShuffleDependency[_, _, _]])
+ .map(_.asInstanceOf[ShuffleDependency[_, _, _]])
(rdd, shuffleDeps)
}
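The two hunks above exist because ShuffleDependency now takes three type parameters (key, value, and combined-value types), so the test must filter and cast with three wildcards. As a sketch, not part of the patch, the same traversal can be written with collect, which folds the isInstanceOf filter and asInstanceOf cast into a single pattern:

import org.apache.spark.{Dependency, ShuffleDependency}
import org.apache.spark.rdd.RDD

// Recursively gather all dependencies, then keep only the shuffle ones.
def shuffleDeps(rdd: RDD[_]): Seq[ShuffleDependency[_, _, _]] = {
  def all(r: RDD[_]): Seq[Dependency[_]] =
    r.dependencies ++ r.dependencies.flatMap(d => all(d.rdd))
  all(rdd).collect { case s: ShuffleDependency[_, _, _] => s }
}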
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 7b0607dd3e..47112ce66d 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -58,7 +58,7 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
// default Java serializer cannot handle the non serializable class.
val c = new ShuffledRDD[Int, NonJavaSerializableClass, (Int, NonJavaSerializableClass)](
b, new HashPartitioner(NUM_BLOCKS)).setSerializer(new KryoSerializer(conf))
- val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
+ val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
assert(c.count === 10)
@@ -97,7 +97,7 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
val c = new ShuffledRDD[Int, Int, (Int, Int)](b, new HashPartitioner(10))
.setSerializer(new KryoSerializer(conf))
- val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
+ val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
assert(c.count === 4)
val blockSizes = (0 until NUM_BLOCKS).flatMap { id =>
@@ -122,7 +122,7 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
// NOTE: The default Java serializer should create zero-sized blocks
val c = new ShuffledRDD[Int, Int, (Int, Int)](b, new HashPartitioner(10))
- val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[Int, Int]].shuffleId
+ val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
assert(c.count === 4)
val blockSizes = (0 until NUM_BLOCKS).flatMap { id =>
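The three ShuffleSuite hunks make the same change: the cast used to reach shuffleId switches from ShuffleDependency[Int, Int] to ShuffleDependency[_, _, _]. Wildcards suffice because shuffleId does not depend on the type parameters. A self-contained sketch of the idiom, using a hypothetical Dep class (not Spark's):

// Dep stands in for a generic class whose field we want to read
// regardless of its type arguments.
class Dep[K, V, C](val shuffleId: Int)

val dep: Any = new Dep[Int, String, String](42)
// Existential wildcards let us cast without naming concrete type arguments.
val id = dep.asInstanceOf[Dep[_, _, _]].shuffleId  // id == 42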