author     Stephen Haberman <stephen@exigencecorp.com>  2013-02-25 23:48:52 -0600
committer  Stephen Haberman <stephen@exigencecorp.com>  2013-02-25 23:48:52 -0600
commit     a4adeb255c66bbbb8eb7f4abcfd2b4c63906be31 (patch)
tree       b0071ea76237b2f1d882d75f1d24dcd3ecad6c17 /core/src/test
parent     921be765339ac6a1b1a12672d73620855984eade (diff)
parent     d6e6abece306008c50410807669596d73d6d6738 (diff)
Merge branch 'master' into nomocks
Conflicts: core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/spark/CheckpointSuite.scala    126
-rw-r--r--  core/src/test/scala/spark/DistributedSuite.scala    21
-rw-r--r--  core/src/test/scala/spark/JavaAPISuite.java    24
-rw-r--r--  core/src/test/scala/spark/PartitioningSuite.scala    8
-rw-r--r--  core/src/test/scala/spark/RDDSuite.scala    21
-rw-r--r--  core/src/test/scala/spark/ShuffleSuite.scala    70
-rw-r--r--  core/src/test/scala/spark/SortingSuite.scala    10
-rw-r--r--  core/src/test/scala/spark/rdd/ParallelCollectionSplitSuite.scala (renamed from core/src/test/scala/spark/ParallelCollectionSplitSuite.scala)    40
-rw-r--r--  core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala    21
-rw-r--r--  core/src/test/scala/spark/scheduler/TaskContextSuite.scala    10
10 files changed, 240 insertions(+), 111 deletions(-)
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index 0b74607fb8..ca385972fb 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -34,7 +34,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
testCheckpointing(_.sample(false, 0.5, 0))
testCheckpointing(_.glom())
testCheckpointing(_.mapPartitions(_.map(_.toString)))
- testCheckpointing(r => new MapPartitionsWithSplitRDD(r,
+ testCheckpointing(r => new MapPartitionsWithIndexRDD(r,
(i: Int, iter: Iterator[Int]) => iter.map(_.toString), false ))
testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).mapValues(_.toString))
testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).flatMapValues(x => 1 to x))
@@ -43,14 +43,14 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
test("ParallelCollection") {
val parCollection = sc.makeRDD(1 to 4, 2)
- val numSplits = parCollection.splits.size
+ val numPartitions = parCollection.partitions.size
parCollection.checkpoint()
assert(parCollection.dependencies === Nil)
val result = parCollection.collect()
assert(sc.checkpointFile[Int](parCollection.getCheckpointFile.get).collect() === result)
assert(parCollection.dependencies != Nil)
- assert(parCollection.splits.length === numSplits)
- assert(parCollection.splits.toList === parCollection.checkpointData.get.getSplits.toList)
+ assert(parCollection.partitions.length === numPartitions)
+ assert(parCollection.partitions.toList === parCollection.checkpointData.get.getPartitions.toList)
assert(parCollection.collect() === result)
}
@@ -59,13 +59,13 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
val blockManager = SparkEnv.get.blockManager
blockManager.putSingle(blockId, "test", StorageLevel.MEMORY_ONLY)
val blockRDD = new BlockRDD[String](sc, Array(blockId))
- val numSplits = blockRDD.splits.size
+ val numPartitions = blockRDD.partitions.size
blockRDD.checkpoint()
val result = blockRDD.collect()
assert(sc.checkpointFile[String](blockRDD.getCheckpointFile.get).collect() === result)
assert(blockRDD.dependencies != Nil)
- assert(blockRDD.splits.length === numSplits)
- assert(blockRDD.splits.toList === blockRDD.checkpointData.get.getSplits.toList)
+ assert(blockRDD.partitions.length === numPartitions)
+ assert(blockRDD.partitions.toList === blockRDD.checkpointData.get.getPartitions.toList)
assert(blockRDD.collect() === result)
}
@@ -79,9 +79,9 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
test("UnionRDD") {
def otherRDD = sc.makeRDD(1 to 10, 1)
- // Test whether the size of UnionRDDSplits reduce in size after parent RDD is checkpointed.
+ // Test whether the size of UnionRDDPartitions reduce in size after parent RDD is checkpointed.
// Current implementation of UnionRDD has transient reference to parent RDDs,
- // so only the splits will reduce in serialized size, not the RDD.
+ // so only the partitions will reduce in serialized size, not the RDD.
testCheckpointing(_.union(otherRDD), false, true)
testParentCheckpointing(_.union(otherRDD), false, true)
}
@@ -91,21 +91,21 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
testCheckpointing(new CartesianRDD(sc, _, otherRDD))
// Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed
- // Current implementation of CoalescedRDDSplit has transient reference to parent RDD,
- // so only the RDD will reduce in serialized size, not the splits.
+ // Current implementation of CoalescedRDDPartition has transient reference to parent RDD,
+ // so only the RDD will reduce in serialized size, not the partitions.
testParentCheckpointing(new CartesianRDD(sc, _, otherRDD), true, false)
- // Test that the CartesianRDD updates parent splits (CartesianRDD.s1/s2) after
- // the parent RDD has been checkpointed and parent splits have been changed to HadoopSplits.
+ // Test that the CartesianRDD updates parent partitions (CartesianRDD.s1/s2) after
+ // the parent RDD has been checkpointed and parent partitions have been changed to HadoopPartitions.
// Note that this test is very specific to the current implementation of CartesianRDD.
val ones = sc.makeRDD(1 to 100, 10).map(x => x)
ones.checkpoint() // checkpoint that MappedRDD
val cartesian = new CartesianRDD(sc, ones, ones)
val splitBeforeCheckpoint =
- serializeDeserialize(cartesian.splits.head.asInstanceOf[CartesianSplit])
+ serializeDeserialize(cartesian.partitions.head.asInstanceOf[CartesianPartition])
cartesian.count() // do the checkpointing
val splitAfterCheckpoint =
- serializeDeserialize(cartesian.splits.head.asInstanceOf[CartesianSplit])
+ serializeDeserialize(cartesian.partitions.head.asInstanceOf[CartesianPartition])
assert(
(splitAfterCheckpoint.s1 != splitBeforeCheckpoint.s1) &&
(splitAfterCheckpoint.s2 != splitBeforeCheckpoint.s2),
@@ -114,27 +114,27 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
}
test("CoalescedRDD") {
- testCheckpointing(new CoalescedRDD(_, 2))
+ testCheckpointing(_.coalesce(2))
// Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed
- // Current implementation of CoalescedRDDSplit has transient reference to parent RDD,
- // so only the RDD will reduce in serialized size, not the splits.
- testParentCheckpointing(new CoalescedRDD(_, 2), true, false)
+ // Current implementation of CoalescedRDDPartition has transient reference to parent RDD,
+ // so only the RDD will reduce in serialized size, not the partitions.
+ testParentCheckpointing(_.coalesce(2), true, false)
- // Test that the CoalescedRDDSplit updates parent splits (CoalescedRDDSplit.parents) after
- // the parent RDD has been checkpointed and parent splits have been changed to HadoopSplits.
- // Note that this test is very specific to the current implementation of CoalescedRDDSplits
+ // Test that the CoalescedRDDPartition updates parent partitions (CoalescedRDDPartition.parents) after
+ // the parent RDD has been checkpointed and parent partitions have been changed to HadoopPartitions.
+ // Note that this test is very specific to the current implementation of CoalescedRDDPartitions
val ones = sc.makeRDD(1 to 100, 10).map(x => x)
ones.checkpoint() // checkpoint that MappedRDD
val coalesced = new CoalescedRDD(ones, 2)
val splitBeforeCheckpoint =
- serializeDeserialize(coalesced.splits.head.asInstanceOf[CoalescedRDDSplit])
+ serializeDeserialize(coalesced.partitions.head.asInstanceOf[CoalescedRDDPartition])
coalesced.count() // do the checkpointing
val splitAfterCheckpoint =
- serializeDeserialize(coalesced.splits.head.asInstanceOf[CoalescedRDDSplit])
+ serializeDeserialize(coalesced.partitions.head.asInstanceOf[CoalescedRDDPartition])
assert(
splitAfterCheckpoint.parents.head != splitBeforeCheckpoint.parents.head,
- "CoalescedRDDSplit.parents not updated after parent RDD checkpointed"
+ "CoalescedRDDPartition.parents not updated after parent RDD checkpointed"
)
}
@@ -156,30 +156,40 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false)
// Test whether size of ZippedRDD reduce in size after parent RDD is checkpointed
- // Current implementation of ZippedRDDSplit has transient references to parent RDDs,
- // so only the RDD will reduce in serialized size, not the splits.
+ // Current implementation of ZippedRDDPartitions has transient references to parent RDDs,
+ // so only the RDD will reduce in serialized size, not the partitions.
testParentCheckpointing(
rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false)
}
+ test("CheckpointRDD with zero partitions") {
+ val rdd = new BlockRDD[Int](sc, Array[String]())
+ assert(rdd.partitions.size === 0)
+ assert(rdd.isCheckpointed === false)
+ rdd.checkpoint()
+ assert(rdd.count() === 0)
+ assert(rdd.isCheckpointed === true)
+ assert(rdd.partitions.size === 0)
+ }
+
/**
* Test checkpointing of the final RDD generated by the given operation. By default,
* this method tests whether the size of serialized RDD has reduced after checkpointing or not.
- * It can also test whether the size of serialized RDD splits has reduced after checkpointing or
- * not, but this is not done by default as usually the splits do not refer to any RDD and
+ * It can also test whether the size of serialized RDD partitions has reduced after checkpointing or
+ * not, but this is not done by default as usually the partitions do not refer to any RDD and
* therefore never store the lineage.
*/
def testCheckpointing[U: ClassManifest](
op: (RDD[Int]) => RDD[U],
testRDDSize: Boolean = true,
- testRDDSplitSize: Boolean = false
+ testRDDPartitionSize: Boolean = false
) {
// Generate the final RDD using given RDD operation
val baseRDD = generateLongLineageRDD()
val operatedRDD = op(baseRDD)
val parentRDD = operatedRDD.dependencies.headOption.orNull
val rddType = operatedRDD.getClass.getSimpleName
- val numSplits = operatedRDD.splits.length
+ val numPartitions = operatedRDD.partitions.length
// Find serialized sizes before and after the checkpoint
val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
@@ -193,11 +203,11 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
// Test whether dependencies have been changed from its earlier parent RDD
assert(operatedRDD.dependencies.head.rdd != parentRDD)
- // Test whether the splits have been changed to the new Hadoop splits
- assert(operatedRDD.splits.toList === operatedRDD.checkpointData.get.getSplits.toList)
+ // Test whether the partitions have been changed to the new Hadoop partitions
+ assert(operatedRDD.partitions.toList === operatedRDD.checkpointData.get.getPartitions.toList)
- // Test whether the number of splits is same as before
- assert(operatedRDD.splits.length === numSplits)
+ // Test whether the number of partitions is same as before
+ assert(operatedRDD.partitions.length === numPartitions)
// Test whether the data in the checkpointed RDD is same as original
assert(operatedRDD.collect() === result)
@@ -215,18 +225,18 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
)
}
- // Test whether serialized size of the splits has reduced. If the splits
- // do not have any non-transient reference to another RDD or another RDD's splits, it
+ // Test whether serialized size of the partitions has reduced. If the partitions
+ // do not have any non-transient reference to another RDD or another RDD's partitions, it
// does not refer to a lineage and therefore may not reduce in size after checkpointing.
- // However, if the original splits before checkpointing do refer to a parent RDD, the splits
+ // However, if the original partitions before checkpointing do refer to a parent RDD, the partitions
// must be forgotten after checkpointing (to remove all reference to parent RDDs) and
- // replaced with the HadoopSplits of the checkpointed RDD.
- if (testRDDSplitSize) {
- logInfo("Size of " + rddType + " splits "
+ // replaced with the HadoopPartitions of the checkpointed RDD.
+ if (testRDDPartitionSize) {
+ logInfo("Size of " + rddType + " partitions "
+ "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]")
assert(
splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint,
- "Size of " + rddType + " splits did not reduce after checkpointing " +
+ "Size of " + rddType + " partitions did not reduce after checkpointing " +
"[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]"
)
}
@@ -235,13 +245,13 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
/**
* Test whether checkpointing of the parent of the generated RDD also
* truncates the lineage or not. Some RDDs like CoGroupedRDD hold on to its parent
- * RDDs splits. So even if the parent RDD is checkpointed and its splits changed,
- * this RDD will remember the splits and therefore potentially the whole lineage.
+ * RDDs partitions. So even if the parent RDD is checkpointed and its partitions changed,
+ * this RDD will remember the partitions and therefore potentially the whole lineage.
*/
def testParentCheckpointing[U: ClassManifest](
op: (RDD[Int]) => RDD[U],
testRDDSize: Boolean,
- testRDDSplitSize: Boolean
+ testRDDPartitionSize: Boolean
) {
// Generate the final RDD using given RDD operation
val baseRDD = generateLongLineageRDD()
@@ -250,9 +260,9 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
val rddType = operatedRDD.getClass.getSimpleName
val parentRDDType = parentRDD.getClass.getSimpleName
- // Get the splits and dependencies of the parent in case they're lazily computed
+ // Get the partitions and dependencies of the parent in case they're lazily computed
parentRDD.dependencies
- parentRDD.splits
+ parentRDD.partitions
// Find serialized sizes before and after the checkpoint
val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
@@ -275,16 +285,16 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
)
}
- // Test whether serialized size of the splits has reduced because of its parent being
- // checkpointed. If the splits do not have any non-transient reference to another RDD
- // or another RDD's splits, it does not refer to a lineage and therefore may not reduce
- // in size after checkpointing. However, if the splits do refer to the *splits* of a parent
- // RDD, then these splits must update reference to the parent RDD splits as the parent RDD's
- // splits must have changed after checkpointing.
- if (testRDDSplitSize) {
+ // Test whether serialized size of the partitions has reduced because of its parent being
+ // checkpointed. If the partitions do not have any non-transient reference to another RDD
+ // or another RDD's partitions, it does not refer to a lineage and therefore may not reduce
+ // in size after checkpointing. However, if the partitions do refer to the *partitions* of a parent
+ // RDD, then these partitions must update reference to the parent RDD partitions as the parent RDD's
+ // partitions must have changed after checkpointing.
+ if (testRDDPartitionSize) {
assert(
splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint,
- "Size of " + rddType + " splits did not reduce after checkpointing parent " + parentRDDType +
+ "Size of " + rddType + " partitions did not reduce after checkpointing parent " + parentRDDType +
"[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]"
)
}
@@ -321,12 +331,12 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
}
/**
- * Get serialized sizes of the RDD and its splits, in order to test whether the size shrinks
+ * Get serialized sizes of the RDD and its partitions, in order to test whether the size shrinks
* upon checkpointing. Ignores the checkpointData field, which may grow when we checkpoint.
*/
def getSerializedSizes(rdd: RDD[_]): (Int, Int) = {
(Utils.serialize(rdd).length - Utils.serialize(rdd.checkpointData).length,
- Utils.serialize(rdd.splits).length)
+ Utils.serialize(rdd.partitions).length)
}
/**
@@ -347,7 +357,7 @@ object CheckpointSuite {
def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner) = {
//println("First = " + first + ", second = " + second)
new CoGroupedRDD[K](
- Seq(first.asInstanceOf[RDD[(_, _)]], second.asInstanceOf[RDD[(_, _)]]),
+ Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
part
).asInstanceOf[RDD[(K, Seq[Seq[V]])]]
}
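
The size assertions in this suite rest on a single serialization property: a partition that holds only a @transient reference to its parent RDD serializes small, while a hard reference drags the parent's lineage into the serialized bytes. A minimal Scala sketch of that property, with hypothetical Parent/TransientPartition/HardPartition classes that are stand-ins rather than Spark types:

import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Stand-in for a long lineage: a parent object carrying a large payload.
class Parent(val payload: Array[Byte]) extends Serializable

// Keeps its parent only transiently, so serializing it drops the payload.
class TransientPartition(@transient val parent: Parent, val index: Int) extends Serializable

// Keeps a hard reference, so serializing it drags the payload along.
class HardPartition(val parent: Parent, val index: Int) extends Serializable

object TransientPartitionSketch {
  def serializedSize(obj: AnyRef): Int = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    bytes.size()
  }

  def main(args: Array[String]) {
    val parent = new Parent(new Array[Byte](1 << 20)) // roughly 1 MB of fake lineage
    val small = serializedSize(new TransientPartition(parent, 0))
    val large = serializedSize(new HardPartition(parent, 0))
    println("transient: " + small + " bytes, hard: " + large + " bytes")
    assert(small < large)
  }
}

Checkpointing restores the small-size case by swapping the partitions for checkpointed ones, which is what the splitSizeBeforeCheckpoint/splitSizeAfterCheckpoint comparison above measures.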
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 0e2585daa4..caa4ba3a37 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -217,6 +217,27 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
assert(grouped.collect.size === 1)
}
}
+
+ test("recover from node failures with replication") {
+ import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity}
+ DistributedSuite.amMaster = true
+ // Using more than two nodes so we don't have a symmetric communication pattern and might
+ // cache a partially correct list of peers.
+ sc = new SparkContext("local-cluster[3,1,512]", "test")
+ for (i <- 1 to 3) {
+ val data = sc.parallelize(Seq(true, false, false, false), 4)
+ data.persist(StorageLevel.MEMORY_ONLY_2)
+
+ assert(data.count === 4)
+ assert(data.map(markNodeIfIdentity).collect.size === 4)
+ assert(data.map(failOnMarkedIdentity).collect.size === 4)
+
+ // Create a new replicated RDD to make sure that cached peer information doesn't cause
+ // problems.
+ val data2 = sc.parallelize(Seq(true, true), 2).persist(StorageLevel.MEMORY_ONLY_2)
+ assert(data2.count === 2)
+ }
+ }
}
object DistributedSuite {
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 934e4c2f67..9ffe7c5f99 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -696,4 +696,28 @@ public class JavaAPISuite implements Serializable {
JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get());
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
}
+
+ @Test
+ public void mapOnPairRDD() {
+ JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4));
+ JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() {
+ @Override
+ public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+ return new Tuple2<Integer, Integer>(i, i % 2);
+ }
+ });
+ JavaPairRDD<Integer, Integer> rdd3 = rdd2.map(
+ new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
+ @Override
+ public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) throws Exception {
+ return new Tuple2<Integer, Integer>(in._2(), in._1());
+ }
+ });
+ Assert.assertEquals(Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(0, 2),
+ new Tuple2<Integer, Integer>(1, 3),
+ new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
+
+ }
}
diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala
index af1107cd19..60db759c25 100644
--- a/core/src/test/scala/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/spark/PartitioningSuite.scala
@@ -84,10 +84,10 @@ class PartitioningSuite extends FunSuite with LocalSparkContext {
assert(grouped4.groupByKey(3).partitioner != grouped4.partitioner)
assert(grouped4.groupByKey(4).partitioner === grouped4.partitioner)
- assert(grouped2.join(grouped4).partitioner === grouped2.partitioner)
- assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped2.partitioner)
- assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped2.partitioner)
- assert(grouped2.cogroup(grouped4).partitioner === grouped2.partitioner)
+ assert(grouped2.join(grouped4).partitioner === grouped4.partitioner)
+ assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner)
+ assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner)
+ assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner)
assert(grouped2.join(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
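
The flipped expectations (grouped2 to grouped4) reflect the rule that a join or cogroup defaults to the partitioner of the input with the most partitions, the same behaviour the new ShuffleSuite test "default partitioner uses largest partitioner" asserts below. A hedged sketch of that selection rule over stand-in types (FakePartitioner and FakeRDD are illustrative, not Spark classes):

// Illustrative stand-ins; not spark.Partitioner.defaultPartitioner.
case class FakePartitioner(numPartitions: Int)
case class FakeRDD(partitioner: Option[FakePartitioner], numPartitions: Int)

object DefaultPartitionerSketch {
  // Reuse the partitioner of the input with the most partitions, if any input has one.
  def pick(rdds: Seq[FakeRDD]): FakePartitioner = {
    val existing = rdds.flatMap(_.partitioner)
    if (existing.nonEmpty) existing.maxBy(_.numPartitions)
    else FakePartitioner(rdds.map(_.numPartitions).max)
  }

  def main(args: Array[String]) {
    val grouped2 = FakeRDD(Some(FakePartitioner(2)), 2)
    val grouped4 = FakeRDD(Some(FakePartitioner(4)), 4)
    assert(pick(Seq(grouped2, grouped4)) == FakePartitioner(4)) // grouped4 wins, as asserted above
  }
}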
diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala
index fe7deb10d6..9739ba869b 100644
--- a/core/src/test/scala/spark/RDDSuite.scala
+++ b/core/src/test/scala/spark/RDDSuite.scala
@@ -33,6 +33,11 @@ class RDDSuite extends FunSuite with LocalSparkContext {
}
assert(partitionSumsWithSplit.collect().toList === List((0, 3), (1, 7)))
+ val partitionSumsWithIndex = nums.mapPartitionsWithIndex {
+ case(split, iter) => Iterator((split, iter.reduceLeft(_ + _)))
+ }
+ assert(partitionSumsWithIndex.collect().toList === List((0, 3), (1, 7)))
+
intercept[UnsupportedOperationException] {
nums.filter(_ > 5).reduce(_ + _)
}
@@ -97,12 +102,12 @@ class RDDSuite extends FunSuite with LocalSparkContext {
test("caching with failures") {
sc = new SparkContext("local", "test")
- val onlySplit = new Split { override def index: Int = 0 }
+ val onlySplit = new Partition { override def index: Int = 0 }
var shouldFail = true
val rdd = new RDD[Int](sc, Nil) {
- override def getSplits: Array[Split] = Array(onlySplit)
+ override def getPartitions: Array[Partition] = Array(onlySplit)
override val getDependencies = List[Dependency[_]]()
- override def compute(split: Split, context: TaskContext): Iterator[Int] = {
+ override def compute(split: Partition, context: TaskContext): Iterator[Int] = {
if (shouldFail) {
throw new Exception("injected failure")
} else {
@@ -122,7 +127,7 @@ class RDDSuite extends FunSuite with LocalSparkContext {
sc = new SparkContext("local", "test")
val data = sc.parallelize(1 to 10, 10)
- val coalesced1 = new CoalescedRDD(data, 2)
+ val coalesced1 = data.coalesce(2)
assert(coalesced1.collect().toList === (1 to 10).toList)
assert(coalesced1.glom().collect().map(_.toList).toList ===
List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10)))
@@ -133,19 +138,19 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(coalesced1.dependencies.head.asInstanceOf[NarrowDependency[_]].getParents(1).toList ===
List(5, 6, 7, 8, 9))
- val coalesced2 = new CoalescedRDD(data, 3)
+ val coalesced2 = data.coalesce(3)
assert(coalesced2.collect().toList === (1 to 10).toList)
assert(coalesced2.glom().collect().map(_.toList).toList ===
List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9, 10)))
- val coalesced3 = new CoalescedRDD(data, 10)
+ val coalesced3 = data.coalesce(10)
assert(coalesced3.collect().toList === (1 to 10).toList)
assert(coalesced3.glom().collect().map(_.toList).toList ===
(1 to 10).map(x => List(x)).toList)
// If we try to coalesce into more partitions than the original RDD, it should just
// keep the original number of partitions.
- val coalesced4 = new CoalescedRDD(data, 20)
+ val coalesced4 = data.coalesce(20)
assert(coalesced4.collect().toList === (1 to 10).toList)
assert(coalesced4.glom().collect().map(_.toList).toList ===
(1 to 10).map(x => List(x)).toList)
@@ -168,7 +173,7 @@ class RDDSuite extends FunSuite with LocalSparkContext {
val data = sc.parallelize(1 to 10, 10)
// Note that split number starts from 0, so > 8 means only 10th partition left.
val prunedRdd = new PartitionPruningRDD(data, splitNum => splitNum > 8)
- assert(prunedRdd.splits.size === 1)
+ assert(prunedRdd.partitions.size === 1)
val prunedData = prunedRdd.collect()
assert(prunedData.size === 1)
assert(prunedData(0) === 10)
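
The coalesce assertions above pin down two behaviours: parent partitions collapse into contiguous groups, and requesting more partitions than exist keeps the original count. A small sketch of grouping arithmetic that reproduces those expectations, assuming contiguous index ranges; it is illustrative only, not the actual CoalescedRDD code:

// Illustrative grouping arithmetic only; not the actual CoalescedRDD implementation.
object CoalesceGroupingSketch {
  def group(numParents: Int, requested: Int): Seq[Seq[Int]] = {
    val target = math.min(requested, numParents) // never create more partitions than the parent has
    (0 until target).map { i =>
      ((i * numParents / target) until ((i + 1) * numParents / target)).toList
    }
  }

  def main(args: Array[String]) {
    assert(group(10, 2) == Seq(List(0, 1, 2, 3, 4), List(5, 6, 7, 8, 9))) // matches getParents(1) above
    assert(group(10, 3).map(_.size) == Seq(3, 3, 4))                      // matches the glom() groups above
    assert(group(10, 20).size == 10)                                      // coalesce(20) keeps 10 partitions
  }
}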
diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala
index 3493b9511f..8411291b2c 100644
--- a/core/src/test/scala/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/spark/ShuffleSuite.scala
@@ -1,6 +1,7 @@
package spark
import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashSet
import org.scalatest.FunSuite
import org.scalatest.matchers.ShouldMatchers
@@ -98,6 +99,28 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
val sums = pairs.reduceByKey(_+_, 10).collect()
assert(sums.toSet === Set((1, 7), (2, 1)))
}
+
+ test("reduceByKey with partitioner") {
+ sc = new SparkContext("local", "test")
+ val p = new Partitioner() {
+ def numPartitions = 2
+ def getPartition(key: Any) = key.asInstanceOf[Int]
+ }
+ val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).partitionBy(p)
+ val sums = pairs.reduceByKey(_+_)
+ assert(sums.collect().toSet === Set((1, 4), (0, 1)))
+ assert(sums.partitioner === Some(p))
+ // count the dependencies to make sure there is only 1 ShuffledRDD
+ val deps = new HashSet[RDD[_]]()
+ def visit(r: RDD[_]) {
+ for (dep <- r.dependencies) {
+ deps += dep.rdd
+ visit(dep.rdd)
+ }
+ }
+ visit(sums)
+ assert(deps.size === 2) // ShuffledRDD, ParallelCollection
+ }
test("join") {
sc = new SparkContext("local", "test")
@@ -199,7 +222,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
sc = new SparkContext("local", "test")
val emptyDir = Files.createTempDir()
val file = sc.textFile(emptyDir.getAbsolutePath)
- assert(file.splits.size == 0)
+ assert(file.partitions.size == 0)
assert(file.collect().toList === Nil)
// Test that a shuffle on the file works, because this used to be a bug
assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
@@ -211,6 +234,51 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
assert(rdd.keys.collect().toList === List(1, 2))
assert(rdd.values.collect().toList === List("a", "b"))
}
+
+ test("default partitioner uses partition size") {
+ sc = new SparkContext("local", "test")
+ // specify 2000 partitions
+ val a = sc.makeRDD(Array(1, 2, 3, 4), 2000)
+ // do a map, which loses the partitioner
+ val b = a.map(a => (a, (a * 2).toString))
+ // then a group by, and see we didn't revert to 2 partitions
+ val c = b.groupByKey()
+ assert(c.partitions.size === 2000)
+ }
+
+ test("default partitioner uses largest partitioner") {
+ sc = new SparkContext("local", "test")
+ val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2)
+ val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000)
+ val c = a.join(b)
+ assert(c.partitions.size === 2000)
+ }
+
+ test("subtract") {
+ sc = new SparkContext("local", "test")
+ val a = sc.parallelize(Array(1, 2, 3), 2)
+ val b = sc.parallelize(Array(2, 3, 4), 4)
+ val c = a.subtract(b)
+ assert(c.collect().toSet === Set(1))
+ assert(c.partitions.size === a.partitions.size)
+ }
+
+ test("subtract with narrow dependency") {
+ sc = new SparkContext("local", "test")
+ // use a deterministic partitioner
+ val p = new Partitioner() {
+ def numPartitions = 5
+ def getPartition(key: Any) = key.asInstanceOf[Int]
+ }
+ // partitionBy so we have a narrow dependency
+ val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
+ println(sc.runJob(a, (i: Iterator[(Int, String)]) => i.toList).toList)
+ // more partitions/no partitioner so a shuffle dependency
+ val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
+ val c = a.subtract(b)
+ assert(c.collect().toSet === Set((1, "a"), (3, "c")))
+ assert(c.partitioner.get === p)
+ }
}
object ShuffleSuite {
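
The new "reduceByKey with partitioner" test walks rdd.dependencies to confirm that only one ShuffledRDD sits behind the result, and "subtract with narrow dependency" relies on partitionBy producing a narrow dependency. A hedged generalization of that visit() helper, assuming this codebase's spark.ShuffleDependency and spark.NarrowDependency classes; illustrative, not part of the suite:

import spark.{RDD, ShuffleDependency, NarrowDependency}

// Counts shuffle vs. narrow dependencies behind an RDD, e.g. to check how many
// shuffles deep a lineage goes after a partitionBy followed by reduceByKey.
object DependencyCountSketch {
  def count(root: RDD[_]): (Int, Int) = {
    var shuffles = 0
    var narrows = 0
    def visit(r: RDD[_]) {
      for (dep <- r.dependencies) {
        dep match {
          case _: ShuffleDependency[_, _] => shuffles += 1
          case _: NarrowDependency[_] => narrows += 1
          case _ =>
        }
        visit(dep.rdd)
      }
    }
    visit(root)
    (shuffles, narrows)
  }
}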
diff --git a/core/src/test/scala/spark/SortingSuite.scala b/core/src/test/scala/spark/SortingSuite.scala
index edb8c839fc..495f957e53 100644
--- a/core/src/test/scala/spark/SortingSuite.scala
+++ b/core/src/test/scala/spark/SortingSuite.scala
@@ -19,7 +19,7 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr, 2)
val sorted = pairs.sortByKey()
- assert(sorted.splits.size === 2)
+ assert(sorted.partitions.size === 2)
assert(sorted.collect() === pairArr.sortBy(_._1))
}
@@ -29,17 +29,17 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr, 2)
val sorted = pairs.sortByKey(true, 1)
- assert(sorted.splits.size === 1)
+ assert(sorted.partitions.size === 1)
assert(sorted.collect() === pairArr.sortBy(_._1))
}
- test("large array with many splits") {
+ test("large array with many partitions") {
sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr, 2)
val sorted = pairs.sortByKey(true, 20)
- assert(sorted.splits.size === 20)
+ assert(sorted.partitions.size === 20)
assert(sorted.collect() === pairArr.sortBy(_._1))
}
@@ -59,7 +59,7 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
assert(pairs.sortByKey(false, 1).collect() === pairArr.sortWith((x, y) => x._1 > y._1))
}
- test("sort descending with many splits") {
+ test("sort descending with many partitions") {
sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
diff --git a/core/src/test/scala/spark/ParallelCollectionSplitSuite.scala b/core/src/test/scala/spark/rdd/ParallelCollectionSplitSuite.scala
index 450c69bd58..d27a2538e4 100644
--- a/core/src/test/scala/spark/ParallelCollectionSplitSuite.scala
+++ b/core/src/test/scala/spark/rdd/ParallelCollectionSplitSuite.scala
@@ -1,4 +1,4 @@
-package spark
+package spark.rdd
import scala.collection.immutable.NumericRange
@@ -11,7 +11,7 @@ import org.scalacheck.Prop._
class ParallelCollectionSplitSuite extends FunSuite with Checkers {
test("one element per slice") {
val data = Array(1, 2, 3)
- val slices = ParallelCollection.slice(data, 3)
+ val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
assert(slices(0).mkString(",") === "1")
assert(slices(1).mkString(",") === "2")
@@ -20,14 +20,14 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
test("one slice") {
val data = Array(1, 2, 3)
- val slices = ParallelCollection.slice(data, 1)
+ val slices = ParallelCollectionRDD.slice(data, 1)
assert(slices.size === 1)
assert(slices(0).mkString(",") === "1,2,3")
}
test("equal slices") {
val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
- val slices = ParallelCollection.slice(data, 3)
+ val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
assert(slices(0).mkString(",") === "1,2,3")
assert(slices(1).mkString(",") === "4,5,6")
@@ -36,7 +36,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
test("non-equal slices") {
val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
- val slices = ParallelCollection.slice(data, 3)
+ val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
assert(slices(0).mkString(",") === "1,2,3")
assert(slices(1).mkString(",") === "4,5,6")
@@ -45,7 +45,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
test("splitting exclusive range") {
val data = 0 until 100
- val slices = ParallelCollection.slice(data, 3)
+ val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
assert(slices(0).mkString(",") === (0 to 32).mkString(","))
assert(slices(1).mkString(",") === (33 to 65).mkString(","))
@@ -54,7 +54,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
test("splitting inclusive range") {
val data = 0 to 100
- val slices = ParallelCollection.slice(data, 3)
+ val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
assert(slices(0).mkString(",") === (0 to 32).mkString(","))
assert(slices(1).mkString(",") === (33 to 66).mkString(","))
@@ -63,24 +63,24 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
test("empty data") {
val data = new Array[Int](0)
- val slices = ParallelCollection.slice(data, 5)
+ val slices = ParallelCollectionRDD.slice(data, 5)
assert(slices.size === 5)
for (slice <- slices) assert(slice.size === 0)
}
test("zero slices") {
val data = Array(1, 2, 3)
- intercept[IllegalArgumentException] { ParallelCollection.slice(data, 0) }
+ intercept[IllegalArgumentException] { ParallelCollectionRDD.slice(data, 0) }
}
test("negative number of slices") {
val data = Array(1, 2, 3)
- intercept[IllegalArgumentException] { ParallelCollection.slice(data, -5) }
+ intercept[IllegalArgumentException] { ParallelCollectionRDD.slice(data, -5) }
}
test("exclusive ranges sliced into ranges") {
val data = 1 until 100
- val slices = ParallelCollection.slice(data, 3)
+ val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
assert(slices.map(_.size).reduceLeft(_+_) === 99)
assert(slices.forall(_.isInstanceOf[Range]))
@@ -88,7 +88,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
test("inclusive ranges sliced into ranges") {
val data = 1 to 100
- val slices = ParallelCollection.slice(data, 3)
+ val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
assert(slices.map(_.size).reduceLeft(_+_) === 100)
assert(slices.forall(_.isInstanceOf[Range]))
@@ -97,7 +97,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
test("large ranges don't overflow") {
val N = 100 * 1000 * 1000
val data = 0 until N
- val slices = ParallelCollection.slice(data, 40)
+ val slices = ParallelCollectionRDD.slice(data, 40)
assert(slices.size === 40)
for (i <- 0 until 40) {
assert(slices(i).isInstanceOf[Range])
@@ -117,7 +117,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
(tuple: (List[Int], Int)) =>
val d = tuple._1
val n = tuple._2
- val slices = ParallelCollection.slice(d, n)
+ val slices = ParallelCollectionRDD.slice(d, n)
("n slices" |: slices.size == n) &&
("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) &&
("equal sizes" |: slices.map(_.size).forall(x => x==d.size/n || x==d.size/n+1))
@@ -134,7 +134,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
} yield (a until b by step, n)
val prop = forAll(gen) {
case (d: Range, n: Int) =>
- val slices = ParallelCollection.slice(d, n)
+ val slices = ParallelCollectionRDD.slice(d, n)
("n slices" |: slices.size == n) &&
("all ranges" |: slices.forall(_.isInstanceOf[Range])) &&
("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) &&
@@ -152,7 +152,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
} yield (a to b by step, n)
val prop = forAll(gen) {
case (d: Range, n: Int) =>
- val slices = ParallelCollection.slice(d, n)
+ val slices = ParallelCollectionRDD.slice(d, n)
("n slices" |: slices.size == n) &&
("all ranges" |: slices.forall(_.isInstanceOf[Range])) &&
("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) &&
@@ -163,7 +163,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
test("exclusive ranges of longs") {
val data = 1L until 100L
- val slices = ParallelCollection.slice(data, 3)
+ val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
assert(slices.map(_.size).reduceLeft(_+_) === 99)
assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
@@ -171,7 +171,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
test("inclusive ranges of longs") {
val data = 1L to 100L
- val slices = ParallelCollection.slice(data, 3)
+ val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
assert(slices.map(_.size).reduceLeft(_+_) === 100)
assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
@@ -179,7 +179,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
test("exclusive ranges of doubles") {
val data = 1.0 until 100.0 by 1.0
- val slices = ParallelCollection.slice(data, 3)
+ val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
assert(slices.map(_.size).reduceLeft(_+_) === 99)
assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
@@ -187,7 +187,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
test("inclusive ranges of doubles") {
val data = 1.0 to 100.0 by 1.0
- val slices = ParallelCollection.slice(data, 3)
+ val slices = ParallelCollectionRDD.slice(data, 3)
assert(slices.size === 3)
assert(slices.map(_.size).reduceLeft(_+_) === 100)
assert(slices.forall(_.isInstanceOf[NumericRange[_]]))
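
The ScalaCheck properties above say that slicing cuts a sequence into n pieces whose sizes differ by at most one and whose concatenation equals the input. A minimal sketch of that positional arithmetic over plain collections; it is not the actual ParallelCollectionRDD.slice, which additionally keeps Range and NumericRange inputs as ranges:

// Illustrative positional slicing only.
object SliceSketch {
  def slice[T](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    require(numSlices > 0) // mirrors the IllegalArgumentException expected for zero or negative slices
    (0 until numSlices).map { i =>
      val start = i * seq.length / numSlices
      val end = (i + 1) * seq.length / numSlices
      seq.slice(start, end)
    }
  }

  def main(args: Array[String]) {
    val slices = slice(1 to 100, 3)
    assert(slices.map(_.size).sum == 100)                                  // "concat to d"
    assert(slices.forall(s => s.size == 100 / 3 || s.size == 100 / 3 + 1)) // "equal sizes"
  }
}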
diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
index edc5a7dfba..07cccc7ce0 100644
--- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
@@ -14,7 +14,7 @@ import spark.MapOutputTracker
import spark.RDD
import spark.SparkContext
import spark.SparkException
-import spark.Split
+import spark.Partition
import spark.TaskContext
import spark.TaskEndReason
@@ -111,18 +111,18 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter {
* so we can test that DAGScheduler does not try to execute RDDs locally.
*/
private def makeRdd(
- numSplits: Int,
+ numPartitions: Int,
dependencies: List[Dependency[_]],
locations: Seq[Seq[String]] = Nil
): MyRDD = {
- val maxSplit = numSplits - 1
+ val maxPartition = numPartitions - 1
return new MyRDD(sc, dependencies) {
- override def compute(split: Split, context: TaskContext): Iterator[(Int, Int)] =
+ override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
throw new RuntimeException("should not be reached")
- override def getSplits() = (0 to maxSplit).map(i => new Split {
+ override def getPartitions = (0 to maxPartition).map(i => new Partition {
override def index = i
}).toArray
- override def getPreferredLocations(split: Split): Seq[String] =
+ override def getPreferredLocations(split: Partition): Seq[String] =
if (locations.isDefinedAt(split.index))
locations(split.index)
else
@@ -196,9 +196,10 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter {
test("local job") {
val rdd = new MyRDD(sc, Nil) {
- override def compute(split: Split, context: TaskContext) = Array(42 -> 0).iterator
- override def getSplits() = Array(new Split { override def index = 0 })
- override def getPreferredLocations(split: Split) = Nil
+ override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] =
+ Array(42 -> 0).iterator
+ override def getPartitions = Array( new Partition { override def index = 0 } )
+ override def getPreferredLocations(split: Partition) = Nil
override def toString = "DAGSchedulerSuite Local RDD"
}
runEvent(JobSubmitted(rdd, jobComputeFunc, Array(0), true, null, listener))
@@ -397,4 +398,4 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter {
private def makeBlockManagerId(host: String): BlockManagerId =
BlockManagerId("exec-" + host, host, 12345)
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala
index a5db7103f5..647bcaf860 100644
--- a/core/src/test/scala/spark/scheduler/TaskContextSuite.scala
+++ b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala
@@ -5,7 +5,7 @@ import org.scalatest.BeforeAndAfter
import spark.TaskContext
import spark.RDD
import spark.SparkContext
-import spark.Split
+import spark.Partition
import spark.LocalSparkContext
class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
@@ -14,8 +14,8 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkConte
var completed = false
sc = new SparkContext("local", "test")
val rdd = new RDD[String](sc, List()) {
- override def getSplits = Array[Split](StubSplit(0))
- override def compute(split: Split, context: TaskContext) = {
+ override def getPartitions = Array[Partition](StubPartition(0))
+ override def compute(split: Partition, context: TaskContext) = {
context.addOnCompleteCallback(() => completed = true)
sys.error("failed")
}
@@ -28,5 +28,5 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkConte
assert(completed === true)
}
- case class StubSplit(val index: Int) extends Split
-}
\ No newline at end of file
+ case class StubPartition(val index: Int) extends Partition
+}