From 56eb8af187b19f09810baafb314b21e07cf0a79c Mon Sep 17 00:00:00 2001
From: jerryshao
Date: Mon, 23 Jun 2014 20:25:46 -0700
Subject: [SPARK-2124] Move aggregation into shuffle implementations

This PR is a sub-task of SPARK-2044 that moves the execution of aggregation into the shuffle implementations.

I leave `CoGroupedRDD` and `SubtractedRDD` unchanged because they have their own implementations of aggregation. I'm not sure whether it is suitable to change these two RDDs.

Also, I do not move the sort-related code of `OrderedRDDFunctions` into the shuffle; this will be handled in another sub-task.

Author: jerryshao

Closes #1064 from jerryshao/SPARK-2124 and squashes the following commits:

4a05a40 [jerryshao] Modify according to comments
1f7dcc8 [jerryshao] Style changes
50a2fd6 [jerryshao] Fix test suite issue after moving aggregator to Shuffle reader and writer
1a96190 [jerryshao] Code modification related to the ShuffledRDD
308f635 [jerryshao] initial works of move combiner to ShuffleManager's reader and writer
---
 .../scala/org/apache/spark/CheckpointSuite.scala   |  2 +-
 .../test/scala/org/apache/spark/ShuffleSuite.scala | 22 ++++++++++++++--------
 .../test/scala/org/apache/spark/rdd/RDDSuite.scala | 14 +++++++-------
 .../spark/scheduler/SparkListenerSuite.scala       |  2 +-
 4 files changed, 23 insertions(+), 17 deletions(-)

(limited to 'core/src/test')

diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index f64f3c9036..fc00458083 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -99,7 +99,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
   test("ShuffledRDD") {
     testRDD(rdd => {
       // Creating ShuffledRDD directly as PairRDDFunctions.combineByKey produces a MapPartitionedRDD
-      new ShuffledRDD[Int, Int, (Int, Int)](rdd.map(x => (x % 2, 1)), partitioner)
+      new ShuffledRDD[Int, Int, Int, (Int, Int)](rdd.map(x => (x % 2, 1)), partitioner)
     })
   }
 
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 47112ce66d..b40fee7e9a 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -56,8 +56,11 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
     }
     // If the Kryo serializer is not used correctly, the shuffle would fail because the
     // default Java serializer cannot handle the non serializable class.
-    val c = new ShuffledRDD[Int, NonJavaSerializableClass, (Int, NonJavaSerializableClass)](
-      b, new HashPartitioner(NUM_BLOCKS)).setSerializer(new KryoSerializer(conf))
+    val c = new ShuffledRDD[Int,
+      NonJavaSerializableClass,
+      NonJavaSerializableClass,
+      (Int, NonJavaSerializableClass)](b, new HashPartitioner(NUM_BLOCKS))
+    c.setSerializer(new KryoSerializer(conf))
     val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
 
     assert(c.count === 10)
@@ -78,8 +81,11 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
     }
     // If the Kryo serializer is not used correctly, the shuffle would fail because the
     // default Java serializer cannot handle the non serializable class.
-    val c = new ShuffledRDD[Int, NonJavaSerializableClass, (Int, NonJavaSerializableClass)](
-      b, new HashPartitioner(3)).setSerializer(new KryoSerializer(conf))
+    val c = new ShuffledRDD[Int,
+      NonJavaSerializableClass,
+      NonJavaSerializableClass,
+      (Int, NonJavaSerializableClass)](b, new HashPartitioner(3))
+    c.setSerializer(new KryoSerializer(conf))
 
     assert(c.count === 10)
   }
@@ -94,7 +100,7 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
 
     // NOTE: The default Java serializer doesn't create zero-sized blocks.
    // So, use Kryo
-    val c = new ShuffledRDD[Int, Int, (Int, Int)](b, new HashPartitioner(10))
+    val c = new ShuffledRDD[Int, Int, Int, (Int, Int)](b, new HashPartitioner(10))
       .setSerializer(new KryoSerializer(conf))
     val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
 
@@ -120,7 +126,7 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
     val b = a.map(x => (x, x*2))
 
     // NOTE: The default Java serializer should create zero-sized blocks
-    val c = new ShuffledRDD[Int, Int, (Int, Int)](b, new HashPartitioner(10))
+    val c = new ShuffledRDD[Int, Int, Int, (Int, Int)](b, new HashPartitioner(10))
     val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
 
     assert(c.count === 4)
@@ -141,8 +147,8 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
     def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
     val data = Array(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
     val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
-    val results = new ShuffledRDD[Int, Int, MutablePair[Int, Int]](pairs, new HashPartitioner(2))
-      .collect()
+    val results = new ShuffledRDD[Int, Int, Int, MutablePair[Int, Int]](pairs,
+      new HashPartitioner(2)).collect()
 
     data.foreach { pair => results should contain (pair) }
   }
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 0e5625b764..0f9cbe213e 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -276,7 +276,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     // we can optionally shuffle to keep the upstream parallel
     val coalesced5 = data.coalesce(1, shuffle = true)
     val isEquals = coalesced5.dependencies.head.rdd.dependencies.head.rdd.
-      asInstanceOf[ShuffledRDD[_, _, _]] != null
+      asInstanceOf[ShuffledRDD[_, _, _, _]] != null
     assert(isEquals)
 
     // when shuffling, we can increase the number of partitions
@@ -509,7 +509,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
   test("takeSample") {
     val n = 1000000
     val data = sc.parallelize(1 to n, 2)
-    
+
     for (num <- List(5, 20, 100)) {
       val sample = data.takeSample(withReplacement=false, num=num)
       assert(sample.size === num) // Got exactly num elements
@@ -704,11 +704,11 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(ancestors3.count(_.isInstanceOf[MappedRDD[_, _]]) === 2)
 
     // Any ancestors before the shuffle are not considered
-    assert(ancestors4.size === 1)
-    assert(ancestors4.count(_.isInstanceOf[ShuffledRDD[_, _, _]]) === 1)
-    assert(ancestors5.size === 4)
-    assert(ancestors5.count(_.isInstanceOf[ShuffledRDD[_, _, _]]) === 1)
-    assert(ancestors5.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 1)
+    assert(ancestors4.size === 0)
+    assert(ancestors4.count(_.isInstanceOf[ShuffledRDD[_, _, _, _]]) === 0)
+    assert(ancestors5.size === 3)
+    assert(ancestors5.count(_.isInstanceOf[ShuffledRDD[_, _, _, _]]) === 1)
+    assert(ancestors5.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 0)
     assert(ancestors5.count(_.isInstanceOf[MappedValuesRDD[_, _, _]]) === 2)
   }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index abd7b22310..6df0a08096 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -181,7 +181,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     listener.stageInfos.size should be {2} // Shuffle map stage + result stage
     val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 2).get
-    stageInfo3.rddInfos.size should be {2} // ShuffledRDD, MapPartitionsRDD
+    stageInfo3.rddInfos.size should be {1} // ShuffledRDD
     stageInfo3.rddInfos.forall(_.numPartitions == 4) should be {true}
     stageInfo3.rddInfos.exists(_.name == "Trois") should be {true}
   }
-- 
cgit v1.2.3
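
The test updates above all stem from one signature change: ShuffledRDD gains a separate combiner type parameter, so callers construct ShuffledRDD[K, V, C, P] instead of ShuffledRDD[K, V, P]. The sketch below is not part of the patch; it only illustrates the call pattern exercised by these suites as of this change (the object name, sample RDD, and local master URL are made up for the example), with the combiner type equal to the value type because no aggregation is applied:

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.ShuffledRDD
import org.apache.spark.serializer.KryoSerializer

object ShuffledRddSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ShuffledRddSketch").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Ten key-value pairs, keyed by parity, similar to the ShuffleSuite tests.
    val pairs = sc.parallelize(1 to 10).map(x => (x % 2, x))

    // Type parameters are now [key, value, combiner, pair]; with no map-side
    // aggregation the combiner type is simply the value type.
    val shuffled = new ShuffledRDD[Int, Int, Int, (Int, Int)](pairs, new HashPartitioner(2))
    shuffled.setSerializer(new KryoSerializer(conf))

    // No aggregator is set, so all ten records survive the shuffle.
    assert(shuffled.count() == 10)
    sc.stop()
  }
}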