aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/scala/org/apache/spark/CheckpointSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/ShuffleSuite.scala22
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala14
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala2
4 files changed, 23 insertions, 17 deletions
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index f64f3c9036..fc00458083 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -99,7 +99,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
test("ShuffledRDD") {
testRDD(rdd => {
// Creating ShuffledRDD directly as PairRDDFunctions.combineByKey produces a MapPartitionedRDD
- new ShuffledRDD[Int, Int, (Int, Int)](rdd.map(x => (x % 2, 1)), partitioner)
+ new ShuffledRDD[Int, Int, Int, (Int, Int)](rdd.map(x => (x % 2, 1)), partitioner)
})
}
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index 47112ce66d..b40fee7e9a 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -56,8 +56,11 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
}
// If the Kryo serializer is not used correctly, the shuffle would fail because the
// default Java serializer cannot handle the non serializable class.
- val c = new ShuffledRDD[Int, NonJavaSerializableClass, (Int, NonJavaSerializableClass)](
- b, new HashPartitioner(NUM_BLOCKS)).setSerializer(new KryoSerializer(conf))
+ val c = new ShuffledRDD[Int,
+ NonJavaSerializableClass,
+ NonJavaSerializableClass,
+ (Int, NonJavaSerializableClass)](b, new HashPartitioner(NUM_BLOCKS))
+ c.setSerializer(new KryoSerializer(conf))
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
assert(c.count === 10)
@@ -78,8 +81,11 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
}
// If the Kryo serializer is not used correctly, the shuffle would fail because the
// default Java serializer cannot handle the non serializable class.
- val c = new ShuffledRDD[Int, NonJavaSerializableClass, (Int, NonJavaSerializableClass)](
- b, new HashPartitioner(3)).setSerializer(new KryoSerializer(conf))
+ val c = new ShuffledRDD[Int,
+ NonJavaSerializableClass,
+ NonJavaSerializableClass,
+ (Int, NonJavaSerializableClass)](b, new HashPartitioner(3))
+ c.setSerializer(new KryoSerializer(conf))
assert(c.count === 10)
}
@@ -94,7 +100,7 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
// NOTE: The default Java serializer doesn't create zero-sized blocks.
// So, use Kryo
- val c = new ShuffledRDD[Int, Int, (Int, Int)](b, new HashPartitioner(10))
+ val c = new ShuffledRDD[Int, Int, Int, (Int, Int)](b, new HashPartitioner(10))
.setSerializer(new KryoSerializer(conf))
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
@@ -120,7 +126,7 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
val b = a.map(x => (x, x*2))
// NOTE: The default Java serializer should create zero-sized blocks
- val c = new ShuffledRDD[Int, Int, (Int, Int)](b, new HashPartitioner(10))
+ val c = new ShuffledRDD[Int, Int, Int, (Int, Int)](b, new HashPartitioner(10))
val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
assert(c.count === 4)
@@ -141,8 +147,8 @@ class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext {
def p[T1, T2](_1: T1, _2: T2) = MutablePair(_1, _2)
val data = Array(p(1, 1), p(1, 2), p(1, 3), p(2, 1))
val pairs: RDD[MutablePair[Int, Int]] = sc.parallelize(data, 2)
- val results = new ShuffledRDD[Int, Int, MutablePair[Int, Int]](pairs, new HashPartitioner(2))
- .collect()
+ val results = new ShuffledRDD[Int, Int, Int, MutablePair[Int, Int]](pairs,
+ new HashPartitioner(2)).collect()
data.foreach { pair => results should contain (pair) }
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 0e5625b764..0f9cbe213e 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -276,7 +276,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
// we can optionally shuffle to keep the upstream parallel
val coalesced5 = data.coalesce(1, shuffle = true)
val isEquals = coalesced5.dependencies.head.rdd.dependencies.head.rdd.
- asInstanceOf[ShuffledRDD[_, _, _]] != null
+ asInstanceOf[ShuffledRDD[_, _, _, _]] != null
assert(isEquals)
// when shuffling, we can increase the number of partitions
@@ -509,7 +509,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
test("takeSample") {
val n = 1000000
val data = sc.parallelize(1 to n, 2)
-
+
for (num <- List(5, 20, 100)) {
val sample = data.takeSample(withReplacement=false, num=num)
assert(sample.size === num) // Got exactly num elements
@@ -704,11 +704,11 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(ancestors3.count(_.isInstanceOf[MappedRDD[_, _]]) === 2)
// Any ancestors before the shuffle are not considered
- assert(ancestors4.size === 1)
- assert(ancestors4.count(_.isInstanceOf[ShuffledRDD[_, _, _]]) === 1)
- assert(ancestors5.size === 4)
- assert(ancestors5.count(_.isInstanceOf[ShuffledRDD[_, _, _]]) === 1)
- assert(ancestors5.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 1)
+ assert(ancestors4.size === 0)
+ assert(ancestors4.count(_.isInstanceOf[ShuffledRDD[_, _, _, _]]) === 0)
+ assert(ancestors5.size === 3)
+ assert(ancestors5.count(_.isInstanceOf[ShuffledRDD[_, _, _, _]]) === 1)
+ assert(ancestors5.count(_.isInstanceOf[MapPartitionsRDD[_, _]]) === 0)
assert(ancestors5.count(_.isInstanceOf[MappedValuesRDD[_, _, _]]) === 2)
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
index abd7b22310..6df0a08096 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala
@@ -181,7 +181,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
listener.stageInfos.size should be {2} // Shuffle map stage + result stage
val stageInfo3 = listener.stageInfos.keys.find(_.stageId == 2).get
- stageInfo3.rddInfos.size should be {2} // ShuffledRDD, MapPartitionsRDD
+ stageInfo3.rddInfos.size should be {1} // ShuffledRDD
stageInfo3.rddInfos.forall(_.numPartitions == 4) should be {true}
stageInfo3.rddInfos.exists(_.name == "Trois") should be {true}
}