author     jerryshao <saisai.shao@intel.com>  2014-06-23 20:25:46 -0700
committer  Matei Zaharia <matei@databricks.com>  2014-06-23 20:25:46 -0700
commit     56eb8af187b19f09810baafb314b21e07cf0a79c (patch)
tree       30bcca83114d3c28659876c6da593eda2241f30d /core/src/test
parent     51c8168377a89d20d0b2d7b9a28af58593a0fe0c (diff)
[SPARK-2124] Move aggregation into shuffle implementations
This PR is a sub-task of SPARK-2044 that moves the execution of aggregation into the shuffle implementations. I left `CoGroupedRDD` and `SubtractedRDD` unchanged because they have their own implementations of aggregation, and I am not sure it is suitable to change those two RDDs. I also did not move the sort-related code of `OrderedRDDFunctions` into the shuffle; that will be handled in another sub-task.

Author: jerryshao <saisai.shao@intel.com>

Closes #1064 from jerryshao/SPARK-2124 and squashes the following commits:

4a05a40 [jerryshao] Modify according to comments
1f7dcc8 [jerryshao] Style changes
50a2fd6 [jerryshao] Fix test suite issue after moving aggregator to Shuffle reader and writer
1a96190 [jerryshao] Code modification related to the ShuffledRDD
308f635 [jerryshao] Initial work of moving the combiner to ShuffleManager's reader and writer
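As a minimal sketch of the API change the test diffs below track (the four-parameter constructor and `setSerializer` are taken from the hunks; everything else here is illustrative):

import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.rdd.{RDD, ShuffledRDD}
import org.apache.spark.serializer.KryoSerializer

def shuffleSketch(pairs: RDD[(Int, Int)], conf: SparkConf): Long = {
  // Before this patch: three type parameters, ShuffledRDD[K, V, P].
  //   new ShuffledRDD[Int, Int, (Int, Int)](pairs, new HashPartitioner(2))
  // After it: four, ShuffledRDD[K, V, C, P], where the new C is the combiner
  // type produced by the aggregator that now runs inside the shuffle.
  new ShuffledRDD[Int, Int, Int, (Int, Int)](pairs, new HashPartitioner(2))
    .setSerializer(new KryoSerializer(conf))
    .count()
}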
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/CheckpointSuite.scala              |  2
-rw-r--r--  core/src/test/scala/org/apache/spark/ShuffleSuite.scala                 | 22
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala                 | 14
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala |  2
4 files changed, 23 insertions(+), 17 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index f64f3c9036..fc00458083 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -99,7 +99,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
test("ShuffledRDD") {
testRDD(rdd => {
// Creating ShuffledRDD directly as PairRDDFunctions.combineByKey produces a MapPartitionedRDD
- new ShuffledRDD[Int, Int, (Int, Int)](rdd.map(x => (x % 2, 1)), partitioner)
+ new ShuffledRDD[Int, Int, Int, (Int, Int)](rdd.map(x => (x % 2, 1)), partitioner)
})
}
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 47112ce66d..b40fee7e9a 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -56,8 +56,11 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
}
// If the Kryo serializer is not used correctly, the shuffle would fail because the
// default Java serializer cannot handle the non serializable class.
- val c = new ShuffledRDD[Int, NonJavaSerializableClass, (Int, NonJavaSerializableClass)](
- b, new HashPartitioner(NUM_BLOCKS)).setSerializer(new KryoSerializer(conf))
+ val c = new ShuffledRDD[Int,
+ NonJavaSerializableClass,
+ NonJavaSerializableClass,
+ (Int, NonJavaSerializableClass)](b, new HashPartitioner(NUM_BLOCKS))
+ c.setSerializer(new KryoSerializer(conf))
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
assert(c.count === 10)
@@ -78,8 +81,11 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
}
// If the Kryo serializer is not used correctly, the shuffle would fail because the
// default Java serializer cannot handle the non serializable class.
- val c = new ShuffledRDD[Int, NonJavaSerializableClass, (Int, NonJavaSerializableClass)](
- b, new HashPartitioner(3)).setSerializer(new KryoSerializer(conf))
+ val c = new ShuffledRDD[Int,
+ NonJavaSerializableClass,
+ NonJavaSerializableClass,
+ (Int, NonJavaSerializableClass)](b, new HashPartitioner(3))
+ c.setSerializer(new KryoSerializer(conf))
assert(c.count === 10)
}
@@ -94,7 +100,7 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
// NOTE: The default Java serializer doesn't create zero-sized blocks.
// So, use Kryo
- val c = new ShuffledRDD[Int, Int, (Int, Int)](b, new HashPartitioner(10))
+ val c = new ShuffledRDD[Int, Int, Int, (Int, Int)](b, new HashPartitioner(10))
.setSerializer(new KryoSerializer(conf))
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
@@ -120,7 +126,7 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
val b = a.map(x => (x, x*2))
// NOTE: The default Java serializer should create zero-sized blocks
- val c = new ShuffledRDD[Int, Int, (Int, Int)](b, new HashPartitioner(10))
+ val c = new ShuffledRDD[Int, Int, Int, (Int, Int)](b, new HashPartitioner(10))
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
assert(c.count === 4)
@@ -141,8 +147,8 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
val data = Array(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
- val results = new ShuffledRDD[Int, Int, MutablePair[Int, Int]](pairs, new HashPartitioner(2))
- .collect()
+ val results = new ShuffledRDD[Int, Int, Int, MutablePair[Int, Int]](pairs,
+ new HashPartitioner(2)).collect()
data.foreach { pair => results should contain (pair) }
}
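The Kryo hunks above only matter because the value class lacks java.io.Serializable. A minimal sketch of that precondition (`NonJavaSerializableClass` is the suite's name; this body is assumed):

// No `extends Serializable` here: Java serialization would fail with
// NotSerializableException while writing shuffle blocks, whereas Kryo can
// handle such a class by default. That is why each ShuffledRDD above calls
// setSerializer(new KryoSerializer(conf)).
class NonJavaSerializableClass(val value: Int)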
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 0e5625b764..0f9cbe213e 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -276,7 +276,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
// we can optionally shuffle to keep the upstream parallel
val coalesced5 = data.coalesce(1, shuffle = true)
val isEquals = coalesced5.dependencies.head.rdd.dependencies.head.rdd.
- asInstanceOf[ShuffledRDD[_, _, _]] != null
+ asInstanceOf[ShuffledRDD[_, _, _, _]] != null
assert(isEquals)
// when shuffling, we can increase the number of partitions
@@ -509,7 +509,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
test("takeSample") {
val n = 1000000
val data = sc.parallelize(1 to n, 2)
-
+
for (num <- List(5, 20, 100)) {
val sample = data.takeSample(withReplacement=false, num=num)
assert(sample.size === num) // Got exactly num elements
@@ -704,11 +704,11 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(ancestors3.count(_.isInstanceOf[MappedRDD[_, _]]) === 2)
// Any ancestors before the shuffle are not considered
- assert(ancestors4.size === 1)
- assert(ancestors4.count(_.isInstanceOf[ShuffledRDD[_, _, _]]) === 1)
- assert(ancestors5.size === 4)
- assert(ancestors5.count(_.isInstanceOf[ShuffledRDD[_, _, _]]) === 1)
- assert(ancestors5.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 1)
+ assert(ancestors4.size === 0)
+ assert(ancestors4.count(_.isInstanceOf[ShuffledRDD[_, _, _, _]]) === 0)
+ assert(ancestors5.size === 3)
+ assert(ancestors5.count(_.isInstanceOf[ShuffledRDD[_, _, _, _]]) === 1)
+ assert(ancestors5.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 0)
assert(ancestors5.count(_.isInstanceOf[MappedValuesRDD[_, _, _]]) === 2)
}
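The revised counts fall straight out of the patch: map-side combining no longer materializes a separate MapPartitionsRDD in front of the shuffle, so one narrow ancestor disappears on each side of it. A hedged sketch of what `combineByKey` plausibly builds after this change (`setAggregator` and `setMapSideCombine` are assumed setter names, not confirmed by this diff):

import org.apache.spark.{Aggregator, Partitioner}
import org.apache.spark.rdd.{RDD, ShuffledRDD}

def combineSketch[K, V, C](self: RDD[(K, V)],
                           partitioner: Partitioner,
                           aggregator: Aggregator[K, V, C]): RDD[(K, C)] = {
  // A single ShuffledRDD and no wrapping MapPartitionsRDD: the aggregator
  // travels with the ShuffleDependency and is applied by the shuffle
  // reader and writer.
  new ShuffledRDD[K, V, C, (K, C)](self, partitioner)
    .setAggregator(aggregator)
    .setMapSideCombine(true)
}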
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index abd7b22310..6df0a08096 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -181,7 +181,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.stageInfos.size should be {2} // Shuffle map stage + result stage
val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 2).get
- stageInfo3.rddInfos.size should be {2} // ShuffledRDD, MapPartitionsRDD
+ stageInfo3.rddInfos.size should be {1} // ShuffledRDD
stageInfo3.rddInfos.forall(_.numPartitions == 4) should be {true}
stageInfo3.rddInfos.exists(_.name == "Trois") should be {true}
}