diff options
author | Stephen Haberman <stephen@exigencecorp.com> | 2013-02-25 23:48:52 -0600 |
---|---|---|
committer | Stephen Haberman <stephen@exigencecorp.com> | 2013-02-25 23:48:52 -0600 |
commit | a4adeb255c66bbbb8eb7f4abcfd2b4c63906be31 (patch) | |
tree | b0071ea76237b2f1d882d75f1d24dcd3ecad6c17 /core/src/test | |
parent | 921be765339ac6a1b1a12672d73620855984eade (diff) | |
parent | d6e6abece306008c50410807669596d73d6d6738 (diff) | |
download | spark-a4adeb255c66bbbb8eb7f4abcfd2b4c63906be31.tar.gz spark-a4adeb255c66bbbb8eb7f4abcfd2b4c63906be31.tar.bz2 spark-a4adeb255c66bbbb8eb7f4abcfd2b4c63906be31.zip |
Merge branch 'master' into nomocks
Conflicts:
core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala
Diffstat (limited to 'core/src/test')
-rw-r--r-- | core/src/test/scala/spark/CheckpointSuite.scala | 126 | ||||
-rw-r--r-- | core/src/test/scala/spark/DistributedSuite.scala | 21 | ||||
-rw-r--r-- | core/src/test/scala/spark/JavaAPISuite.java | 24 | ||||
-rw-r--r-- | core/src/test/scala/spark/PartitioningSuite.scala | 8 | ||||
-rw-r--r-- | core/src/test/scala/spark/RDDSuite.scala | 21 | ||||
-rw-r--r-- | core/src/test/scala/spark/ShuffleSuite.scala | 70 | ||||
-rw-r--r-- | core/src/test/scala/spark/SortingSuite.scala | 10 | ||||
-rw-r--r-- | core/src/test/scala/spark/rdd/ParallelCollectionSplitSuite.scala (renamed from core/src/test/scala/spark/ParallelCollectionSplitSuite.scala) | 40 | ||||
-rw-r--r-- | core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala | 21 | ||||
-rw-r--r-- | core/src/test/scala/spark/scheduler/TaskContextSuite.scala | 10 |
10 files changed, 240 insertions, 111 deletions
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala index 0b74607fb8..ca385972fb 100644 --- a/core/src/test/scala/spark/CheckpointSuite.scala +++ b/core/src/test/scala/spark/CheckpointSuite.scala @@ -34,7 +34,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { testCheckpointing(_.sample(false, 0.5, 0)) testCheckpointing(_.glom()) testCheckpointing(_.mapPartitions(_.map(_.toString))) - testCheckpointing(r => new MapPartitionsWithSplitRDD(r, + testCheckpointing(r => new MapPartitionsWithIndexRDD(r, (i: Int, iter: Iterator[Int]) => iter.map(_.toString), false )) testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).mapValues(_.toString)) testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).flatMapValues(x => 1 to x)) @@ -43,14 +43,14 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { test("ParallelCollection") { val parCollection = sc.makeRDD(1 to 4, 2) - val numSplits = parCollection.splits.size + val numPartitions = parCollection.partitions.size parCollection.checkpoint() assert(parCollection.dependencies === Nil) val result = parCollection.collect() assert(sc.checkpointFile[Int](parCollection.getCheckpointFile.get).collect() === result) assert(parCollection.dependencies != Nil) - assert(parCollection.splits.length === numSplits) - assert(parCollection.splits.toList === parCollection.checkpointData.get.getSplits.toList) + assert(parCollection.partitions.length === numPartitions) + assert(parCollection.partitions.toList === parCollection.checkpointData.get.getPartitions.toList) assert(parCollection.collect() === result) } @@ -59,13 +59,13 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { val blockManager = SparkEnv.get.blockManager blockManager.putSingle(blockId, "test", StorageLevel.MEMORY_ONLY) val blockRDD = new BlockRDD[String](sc, Array(blockId)) - val numSplits = blockRDD.splits.size + val numPartitions = blockRDD.partitions.size blockRDD.checkpoint() val result = blockRDD.collect() assert(sc.checkpointFile[String](blockRDD.getCheckpointFile.get).collect() === result) assert(blockRDD.dependencies != Nil) - assert(blockRDD.splits.length === numSplits) - assert(blockRDD.splits.toList === blockRDD.checkpointData.get.getSplits.toList) + assert(blockRDD.partitions.length === numPartitions) + assert(blockRDD.partitions.toList === blockRDD.checkpointData.get.getPartitions.toList) assert(blockRDD.collect() === result) } @@ -79,9 +79,9 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { test("UnionRDD") { def otherRDD = sc.makeRDD(1 to 10, 1) - // Test whether the size of UnionRDDSplits reduce in size after parent RDD is checkpointed. + // Test whether the size of UnionRDDPartitions reduce in size after parent RDD is checkpointed. // Current implementation of UnionRDD has transient reference to parent RDDs, - // so only the splits will reduce in serialized size, not the RDD. + // so only the partitions will reduce in serialized size, not the RDD. testCheckpointing(_.union(otherRDD), false, true) testParentCheckpointing(_.union(otherRDD), false, true) } @@ -91,21 +91,21 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { testCheckpointing(new CartesianRDD(sc, _, otherRDD)) // Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed - // Current implementation of CoalescedRDDSplit has transient reference to parent RDD, - // so only the RDD will reduce in serialized size, not the splits. + // Current implementation of CoalescedRDDPartition has transient reference to parent RDD, + // so only the RDD will reduce in serialized size, not the partitions. testParentCheckpointing(new CartesianRDD(sc, _, otherRDD), true, false) - // Test that the CartesianRDD updates parent splits (CartesianRDD.s1/s2) after - // the parent RDD has been checkpointed and parent splits have been changed to HadoopSplits. + // Test that the CartesianRDD updates parent partitions (CartesianRDD.s1/s2) after + // the parent RDD has been checkpointed and parent partitions have been changed to HadoopPartitions. // Note that this test is very specific to the current implementation of CartesianRDD. val ones = sc.makeRDD(1 to 100, 10).map(x => x) ones.checkpoint() // checkpoint that MappedRDD val cartesian = new CartesianRDD(sc, ones, ones) val splitBeforeCheckpoint = - serializeDeserialize(cartesian.splits.head.asInstanceOf[CartesianSplit]) + serializeDeserialize(cartesian.partitions.head.asInstanceOf[CartesianPartition]) cartesian.count() // do the checkpointing val splitAfterCheckpoint = - serializeDeserialize(cartesian.splits.head.asInstanceOf[CartesianSplit]) + serializeDeserialize(cartesian.partitions.head.asInstanceOf[CartesianPartition]) assert( (splitAfterCheckpoint.s1 != splitBeforeCheckpoint.s1) && (splitAfterCheckpoint.s2 != splitBeforeCheckpoint.s2), @@ -114,27 +114,27 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { } test("CoalescedRDD") { - testCheckpointing(new CoalescedRDD(_, 2)) + testCheckpointing(_.coalesce(2)) // Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed - // Current implementation of CoalescedRDDSplit has transient reference to parent RDD, - // so only the RDD will reduce in serialized size, not the splits. - testParentCheckpointing(new CoalescedRDD(_, 2), true, false) + // Current implementation of CoalescedRDDPartition has transient reference to parent RDD, + // so only the RDD will reduce in serialized size, not the partitions. + testParentCheckpointing(_.coalesce(2), true, false) - // Test that the CoalescedRDDSplit updates parent splits (CoalescedRDDSplit.parents) after - // the parent RDD has been checkpointed and parent splits have been changed to HadoopSplits. - // Note that this test is very specific to the current implementation of CoalescedRDDSplits + // Test that the CoalescedRDDPartition updates parent partitions (CoalescedRDDPartition.parents) after + // the parent RDD has been checkpointed and parent partitions have been changed to HadoopPartitions. + // Note that this test is very specific to the current implementation of CoalescedRDDPartitions val ones = sc.makeRDD(1 to 100, 10).map(x => x) ones.checkpoint() // checkpoint that MappedRDD val coalesced = new CoalescedRDD(ones, 2) val splitBeforeCheckpoint = - serializeDeserialize(coalesced.splits.head.asInstanceOf[CoalescedRDDSplit]) + serializeDeserialize(coalesced.partitions.head.asInstanceOf[CoalescedRDDPartition]) coalesced.count() // do the checkpointing val splitAfterCheckpoint = - serializeDeserialize(coalesced.splits.head.asInstanceOf[CoalescedRDDSplit]) + serializeDeserialize(coalesced.partitions.head.asInstanceOf[CoalescedRDDPartition]) assert( splitAfterCheckpoint.parents.head != splitBeforeCheckpoint.parents.head, - "CoalescedRDDSplit.parents not updated after parent RDD checkpointed" + "CoalescedRDDPartition.parents not updated after parent RDD checkpointed" ) } @@ -156,30 +156,40 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false) // Test whether size of ZippedRDD reduce in size after parent RDD is checkpointed - // Current implementation of ZippedRDDSplit has transient references to parent RDDs, - // so only the RDD will reduce in serialized size, not the splits. + // Current implementation of ZippedRDDPartitions has transient references to parent RDDs, + // so only the RDD will reduce in serialized size, not the partitions. testParentCheckpointing( rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false) } + test("CheckpointRDD with zero partitions") { + val rdd = new BlockRDD[Int](sc, Array[String]()) + assert(rdd.partitions.size === 0) + assert(rdd.isCheckpointed === false) + rdd.checkpoint() + assert(rdd.count() === 0) + assert(rdd.isCheckpointed === true) + assert(rdd.partitions.size === 0) + } + /** * Test checkpointing of the final RDD generated by the given operation. By default, * this method tests whether the size of serialized RDD has reduced after checkpointing or not. - * It can also test whether the size of serialized RDD splits has reduced after checkpointing or - * not, but this is not done by default as usually the splits do not refer to any RDD and + * It can also test whether the size of serialized RDD partitions has reduced after checkpointing or + * not, but this is not done by default as usually the partitions do not refer to any RDD and * therefore never store the lineage. */ def testCheckpointing[U: ClassManifest]( op: (RDD[Int]) => RDD[U], testRDDSize: Boolean = true, - testRDDSplitSize: Boolean = false + testRDDPartitionSize: Boolean = false ) { // Generate the final RDD using given RDD operation val baseRDD = generateLongLineageRDD() val operatedRDD = op(baseRDD) val parentRDD = operatedRDD.dependencies.headOption.orNull val rddType = operatedRDD.getClass.getSimpleName - val numSplits = operatedRDD.splits.length + val numPartitions = operatedRDD.partitions.length // Find serialized sizes before and after the checkpoint val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD) @@ -193,11 +203,11 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { // Test whether dependencies have been changed from its earlier parent RDD assert(operatedRDD.dependencies.head.rdd != parentRDD) - // Test whether the splits have been changed to the new Hadoop splits - assert(operatedRDD.splits.toList === operatedRDD.checkpointData.get.getSplits.toList) + // Test whether the partitions have been changed to the new Hadoop partitions + assert(operatedRDD.partitions.toList === operatedRDD.checkpointData.get.getPartitions.toList) - // Test whether the number of splits is same as before - assert(operatedRDD.splits.length === numSplits) + // Test whether the number of partitions is same as before + assert(operatedRDD.partitions.length === numPartitions) // Test whether the data in the checkpointed RDD is same as original assert(operatedRDD.collect() === result) @@ -215,18 +225,18 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { ) } - // Test whether serialized size of the splits has reduced. If the splits - // do not have any non-transient reference to another RDD or another RDD's splits, it + // Test whether serialized size of the partitions has reduced. If the partitions + // do not have any non-transient reference to another RDD or another RDD's partitions, it // does not refer to a lineage and therefore may not reduce in size after checkpointing. - // However, if the original splits before checkpointing do refer to a parent RDD, the splits + // However, if the original partitions before checkpointing do refer to a parent RDD, the partitions // must be forgotten after checkpointing (to remove all reference to parent RDDs) and - // replaced with the HadoopSplits of the checkpointed RDD. - if (testRDDSplitSize) { - logInfo("Size of " + rddType + " splits " + // replaced with the HadooPartitions of the checkpointed RDD. + if (testRDDPartitionSize) { + logInfo("Size of " + rddType + " partitions " + "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]") assert( splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint, - "Size of " + rddType + " splits did not reduce after checkpointing " + + "Size of " + rddType + " partitions did not reduce after checkpointing " + "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]" ) } @@ -235,13 +245,13 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { /** * Test whether checkpointing of the parent of the generated RDD also * truncates the lineage or not. Some RDDs like CoGroupedRDD hold on to its parent - * RDDs splits. So even if the parent RDD is checkpointed and its splits changed, - * this RDD will remember the splits and therefore potentially the whole lineage. + * RDDs partitions. So even if the parent RDD is checkpointed and its partitions changed, + * this RDD will remember the partitions and therefore potentially the whole lineage. */ def testParentCheckpointing[U: ClassManifest]( op: (RDD[Int]) => RDD[U], testRDDSize: Boolean, - testRDDSplitSize: Boolean + testRDDPartitionSize: Boolean ) { // Generate the final RDD using given RDD operation val baseRDD = generateLongLineageRDD() @@ -250,9 +260,9 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { val rddType = operatedRDD.getClass.getSimpleName val parentRDDType = parentRDD.getClass.getSimpleName - // Get the splits and dependencies of the parent in case they're lazily computed + // Get the partitions and dependencies of the parent in case they're lazily computed parentRDD.dependencies - parentRDD.splits + parentRDD.partitions // Find serialized sizes before and after the checkpoint val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD) @@ -275,16 +285,16 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { ) } - // Test whether serialized size of the splits has reduced because of its parent being - // checkpointed. If the splits do not have any non-transient reference to another RDD - // or another RDD's splits, it does not refer to a lineage and therefore may not reduce - // in size after checkpointing. However, if the splits do refer to the *splits* of a parent - // RDD, then these splits must update reference to the parent RDD splits as the parent RDD's - // splits must have changed after checkpointing. - if (testRDDSplitSize) { + // Test whether serialized size of the partitions has reduced because of its parent being + // checkpointed. If the partitions do not have any non-transient reference to another RDD + // or another RDD's partitions, it does not refer to a lineage and therefore may not reduce + // in size after checkpointing. However, if the partitions do refer to the *partitions* of a parent + // RDD, then these partitions must update reference to the parent RDD partitions as the parent RDD's + // partitions must have changed after checkpointing. + if (testRDDPartitionSize) { assert( splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint, - "Size of " + rddType + " splits did not reduce after checkpointing parent " + parentRDDType + + "Size of " + rddType + " partitions did not reduce after checkpointing parent " + parentRDDType + "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]" ) } @@ -321,12 +331,12 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { } /** - * Get serialized sizes of the RDD and its splits, in order to test whether the size shrinks + * Get serialized sizes of the RDD and its partitions, in order to test whether the size shrinks * upon checkpointing. Ignores the checkpointData field, which may grow when we checkpoint. */ def getSerializedSizes(rdd: RDD[_]): (Int, Int) = { (Utils.serialize(rdd).length - Utils.serialize(rdd.checkpointData).length, - Utils.serialize(rdd.splits).length) + Utils.serialize(rdd.partitions).length) } /** @@ -347,7 +357,7 @@ object CheckpointSuite { def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner) = { //println("First = " + first + ", second = " + second) new CoGroupedRDD[K]( - Seq(first.asInstanceOf[RDD[(_, _)]], second.asInstanceOf[RDD[(_, _)]]), + Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]), part ).asInstanceOf[RDD[(K, Seq[Seq[V]])]] } diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala index 0e2585daa4..caa4ba3a37 100644 --- a/core/src/test/scala/spark/DistributedSuite.scala +++ b/core/src/test/scala/spark/DistributedSuite.scala @@ -217,6 +217,27 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter assert(grouped.collect.size === 1) } } + + test("recover from node failures with replication") { + import DistributedSuite.{markNodeIfIdentity, failOnMarkedIdentity} + DistributedSuite.amMaster = true + // Using more than two nodes so we don't have a symmetric communication pattern and might + // cache a partially correct list of peers. + sc = new SparkContext("local-cluster[3,1,512]", "test") + for (i <- 1 to 3) { + val data = sc.parallelize(Seq(true, false, false, false), 4) + data.persist(StorageLevel.MEMORY_ONLY_2) + + assert(data.count === 4) + assert(data.map(markNodeIfIdentity).collect.size === 4) + assert(data.map(failOnMarkedIdentity).collect.size === 4) + + // Create a new replicated RDD to make sure that cached peer information doesn't cause + // problems. + val data2 = sc.parallelize(Seq(true, true), 2).persist(StorageLevel.MEMORY_ONLY_2) + assert(data2.count === 2) + } + } } object DistributedSuite { diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java index 934e4c2f67..9ffe7c5f99 100644 --- a/core/src/test/scala/spark/JavaAPISuite.java +++ b/core/src/test/scala/spark/JavaAPISuite.java @@ -696,4 +696,28 @@ public class JavaAPISuite implements Serializable { JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get()); Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect()); } + + @Test + public void mapOnPairRDD() { + JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1,2,3,4)); + JavaPairRDD<Integer, Integer> rdd2 = rdd1.map(new PairFunction<Integer, Integer, Integer>() { + @Override + public Tuple2<Integer, Integer> call(Integer i) throws Exception { + return new Tuple2<Integer, Integer>(i, i % 2); + } + }); + JavaPairRDD<Integer, Integer> rdd3 = rdd2.map( + new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() { + @Override + public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> in) throws Exception { + return new Tuple2<Integer, Integer>(in._2(), in._1()); + } + }); + Assert.assertEquals(Arrays.asList( + new Tuple2<Integer, Integer>(1, 1), + new Tuple2<Integer, Integer>(0, 2), + new Tuple2<Integer, Integer>(1, 3), + new Tuple2<Integer, Integer>(0, 4)), rdd3.collect()); + + } } diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala index af1107cd19..60db759c25 100644 --- a/core/src/test/scala/spark/PartitioningSuite.scala +++ b/core/src/test/scala/spark/PartitioningSuite.scala @@ -84,10 +84,10 @@ class PartitioningSuite extends FunSuite with LocalSparkContext { assert(grouped4.groupByKey(3).partitioner != grouped4.partitioner) assert(grouped4.groupByKey(4).partitioner === grouped4.partitioner) - assert(grouped2.join(grouped4).partitioner === grouped2.partitioner) - assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped2.partitioner) - assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped2.partitioner) - assert(grouped2.cogroup(grouped4).partitioner === grouped2.partitioner) + assert(grouped2.join(grouped4).partitioner === grouped4.partitioner) + assert(grouped2.leftOuterJoin(grouped4).partitioner === grouped4.partitioner) + assert(grouped2.rightOuterJoin(grouped4).partitioner === grouped4.partitioner) + assert(grouped2.cogroup(grouped4).partitioner === grouped4.partitioner) assert(grouped2.join(reduced2).partitioner === grouped2.partitioner) assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner) diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index fe7deb10d6..9739ba869b 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -33,6 +33,11 @@ class RDDSuite extends FunSuite with LocalSparkContext { } assert(partitionSumsWithSplit.collect().toList === List((0, 3), (1, 7))) + val partitionSumsWithIndex = nums.mapPartitionsWithIndex { + case(split, iter) => Iterator((split, iter.reduceLeft(_ + _))) + } + assert(partitionSumsWithIndex.collect().toList === List((0, 3), (1, 7))) + intercept[UnsupportedOperationException] { nums.filter(_ > 5).reduce(_ + _) } @@ -97,12 +102,12 @@ class RDDSuite extends FunSuite with LocalSparkContext { test("caching with failures") { sc = new SparkContext("local", "test") - val onlySplit = new Split { override def index: Int = 0 } + val onlySplit = new Partition { override def index: Int = 0 } var shouldFail = true val rdd = new RDD[Int](sc, Nil) { - override def getSplits: Array[Split] = Array(onlySplit) + override def getPartitions: Array[Partition] = Array(onlySplit) override val getDependencies = List[Dependency[_]]() - override def compute(split: Split, context: TaskContext): Iterator[Int] = { + override def compute(split: Partition, context: TaskContext): Iterator[Int] = { if (shouldFail) { throw new Exception("injected failure") } else { @@ -122,7 +127,7 @@ class RDDSuite extends FunSuite with LocalSparkContext { sc = new SparkContext("local", "test") val data = sc.parallelize(1 to 10, 10) - val coalesced1 = new CoalescedRDD(data, 2) + val coalesced1 = data.coalesce(2) assert(coalesced1.collect().toList === (1 to 10).toList) assert(coalesced1.glom().collect().map(_.toList).toList === List(List(1, 2, 3, 4, 5), List(6, 7, 8, 9, 10))) @@ -133,19 +138,19 @@ class RDDSuite extends FunSuite with LocalSparkContext { assert(coalesced1.dependencies.head.asInstanceOf[NarrowDependency[_]].getParents(1).toList === List(5, 6, 7, 8, 9)) - val coalesced2 = new CoalescedRDD(data, 3) + val coalesced2 = data.coalesce(3) assert(coalesced2.collect().toList === (1 to 10).toList) assert(coalesced2.glom().collect().map(_.toList).toList === List(List(1, 2, 3), List(4, 5, 6), List(7, 8, 9, 10))) - val coalesced3 = new CoalescedRDD(data, 10) + val coalesced3 = data.coalesce(10) assert(coalesced3.collect().toList === (1 to 10).toList) assert(coalesced3.glom().collect().map(_.toList).toList === (1 to 10).map(x => List(x)).toList) // If we try to coalesce into more partitions than the original RDD, it should just // keep the original number of partitions. - val coalesced4 = new CoalescedRDD(data, 20) + val coalesced4 = data.coalesce(20) assert(coalesced4.collect().toList === (1 to 10).toList) assert(coalesced4.glom().collect().map(_.toList).toList === (1 to 10).map(x => List(x)).toList) @@ -168,7 +173,7 @@ class RDDSuite extends FunSuite with LocalSparkContext { val data = sc.parallelize(1 to 10, 10) // Note that split number starts from 0, so > 8 means only 10th partition left. val prunedRdd = new PartitionPruningRDD(data, splitNum => splitNum > 8) - assert(prunedRdd.splits.size === 1) + assert(prunedRdd.partitions.size === 1) val prunedData = prunedRdd.collect() assert(prunedData.size === 1) assert(prunedData(0) === 10) diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index 3493b9511f..8411291b2c 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -1,6 +1,7 @@ package spark import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.HashSet import org.scalatest.FunSuite import org.scalatest.matchers.ShouldMatchers @@ -98,6 +99,28 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { val sums = pairs.reduceByKey(_+_, 10).collect() assert(sums.toSet === Set((1, 7), (2, 1))) } + + test("reduceByKey with partitioner") { + sc = new SparkContext("local", "test") + val p = new Partitioner() { + def numPartitions = 2 + def getPartition(key: Any) = key.asInstanceOf[Int] + } + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).partitionBy(p) + val sums = pairs.reduceByKey(_+_) + assert(sums.collect().toSet === Set((1, 4), (0, 1))) + assert(sums.partitioner === Some(p)) + // count the dependencies to make sure there is only 1 ShuffledRDD + val deps = new HashSet[RDD[_]]() + def visit(r: RDD[_]) { + for (dep <- r.dependencies) { + deps += dep.rdd + visit(dep.rdd) + } + } + visit(sums) + assert(deps.size === 2) // ShuffledRDD, ParallelCollection + } test("join") { sc = new SparkContext("local", "test") @@ -199,7 +222,7 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { sc = new SparkContext("local", "test") val emptyDir = Files.createTempDir() val file = sc.textFile(emptyDir.getAbsolutePath) - assert(file.splits.size == 0) + assert(file.partitions.size == 0) assert(file.collect().toList === Nil) // Test that a shuffle on the file works, because this used to be a bug assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) @@ -211,6 +234,51 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { assert(rdd.keys.collect().toList === List(1, 2)) assert(rdd.values.collect().toList === List("a", "b")) } + + test("default partitioner uses partition size") { + sc = new SparkContext("local", "test") + // specify 2000 partitions + val a = sc.makeRDD(Array(1, 2, 3, 4), 2000) + // do a map, which loses the partitioner + val b = a.map(a => (a, (a * 2).toString)) + // then a group by, and see we didn't revert to 2 partitions + val c = b.groupByKey() + assert(c.partitions.size === 2000) + } + + test("default partitioner uses largest partitioner") { + sc = new SparkContext("local", "test") + val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2) + val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000) + val c = a.join(b) + assert(c.partitions.size === 2000) + } + + test("subtract") { + sc = new SparkContext("local", "test") + val a = sc.parallelize(Array(1, 2, 3), 2) + val b = sc.parallelize(Array(2, 3, 4), 4) + val c = a.subtract(b) + assert(c.collect().toSet === Set(1)) + assert(c.partitions.size === a.partitions.size) + } + + test("subtract with narrow dependency") { + sc = new SparkContext("local", "test") + // use a deterministic partitioner + val p = new Partitioner() { + def numPartitions = 5 + def getPartition(key: Any) = key.asInstanceOf[Int] + } + // partitionBy so we have a narrow dependency + val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p) + println(sc.runJob(a, (i: Iterator[(Int, String)]) => i.toList).toList) + // more partitions/no partitioner so a shuffle dependency + val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4) + val c = a.subtract(b) + assert(c.collect().toSet === Set((1, "a"), (3, "c"))) + assert(c.partitioner.get === p) + } } object ShuffleSuite { diff --git a/core/src/test/scala/spark/SortingSuite.scala b/core/src/test/scala/spark/SortingSuite.scala index edb8c839fc..495f957e53 100644 --- a/core/src/test/scala/spark/SortingSuite.scala +++ b/core/src/test/scala/spark/SortingSuite.scala @@ -19,7 +19,7 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr, 2) val sorted = pairs.sortByKey() - assert(sorted.splits.size === 2) + assert(sorted.partitions.size === 2) assert(sorted.collect() === pairArr.sortBy(_._1)) } @@ -29,17 +29,17 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr, 2) val sorted = pairs.sortByKey(true, 1) - assert(sorted.splits.size === 1) + assert(sorted.partitions.size === 1) assert(sorted.collect() === pairArr.sortBy(_._1)) } - test("large array with many splits") { + test("large array with many partitions") { sc = new SparkContext("local", "test") val rand = new scala.util.Random() val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr, 2) val sorted = pairs.sortByKey(true, 20) - assert(sorted.splits.size === 20) + assert(sorted.partitions.size === 20) assert(sorted.collect() === pairArr.sortBy(_._1)) } @@ -59,7 +59,7 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w assert(pairs.sortByKey(false, 1).collect() === pairArr.sortWith((x, y) => x._1 > y._1)) } - test("sort descending with many splits") { + test("sort descending with many partitions") { sc = new SparkContext("local", "test") val rand = new scala.util.Random() val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } diff --git a/core/src/test/scala/spark/ParallelCollectionSplitSuite.scala b/core/src/test/scala/spark/rdd/ParallelCollectionSplitSuite.scala index 450c69bd58..d27a2538e4 100644 --- a/core/src/test/scala/spark/ParallelCollectionSplitSuite.scala +++ b/core/src/test/scala/spark/rdd/ParallelCollectionSplitSuite.scala @@ -1,4 +1,4 @@ -package spark +package spark.rdd import scala.collection.immutable.NumericRange @@ -11,7 +11,7 @@ import org.scalacheck.Prop._ class ParallelCollectionSplitSuite extends FunSuite with Checkers { test("one element per slice") { val data = Array(1, 2, 3) - val slices = ParallelCollection.slice(data, 3) + val slices = ParallelCollectionRDD.slice(data, 3) assert(slices.size === 3) assert(slices(0).mkString(",") === "1") assert(slices(1).mkString(",") === "2") @@ -20,14 +20,14 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { test("one slice") { val data = Array(1, 2, 3) - val slices = ParallelCollection.slice(data, 1) + val slices = ParallelCollectionRDD.slice(data, 1) assert(slices.size === 1) assert(slices(0).mkString(",") === "1,2,3") } test("equal slices") { val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9) - val slices = ParallelCollection.slice(data, 3) + val slices = ParallelCollectionRDD.slice(data, 3) assert(slices.size === 3) assert(slices(0).mkString(",") === "1,2,3") assert(slices(1).mkString(",") === "4,5,6") @@ -36,7 +36,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { test("non-equal slices") { val data = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) - val slices = ParallelCollection.slice(data, 3) + val slices = ParallelCollectionRDD.slice(data, 3) assert(slices.size === 3) assert(slices(0).mkString(",") === "1,2,3") assert(slices(1).mkString(",") === "4,5,6") @@ -45,7 +45,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { test("splitting exclusive range") { val data = 0 until 100 - val slices = ParallelCollection.slice(data, 3) + val slices = ParallelCollectionRDD.slice(data, 3) assert(slices.size === 3) assert(slices(0).mkString(",") === (0 to 32).mkString(",")) assert(slices(1).mkString(",") === (33 to 65).mkString(",")) @@ -54,7 +54,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { test("splitting inclusive range") { val data = 0 to 100 - val slices = ParallelCollection.slice(data, 3) + val slices = ParallelCollectionRDD.slice(data, 3) assert(slices.size === 3) assert(slices(0).mkString(",") === (0 to 32).mkString(",")) assert(slices(1).mkString(",") === (33 to 66).mkString(",")) @@ -63,24 +63,24 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { test("empty data") { val data = new Array[Int](0) - val slices = ParallelCollection.slice(data, 5) + val slices = ParallelCollectionRDD.slice(data, 5) assert(slices.size === 5) for (slice <- slices) assert(slice.size === 0) } test("zero slices") { val data = Array(1, 2, 3) - intercept[IllegalArgumentException] { ParallelCollection.slice(data, 0) } + intercept[IllegalArgumentException] { ParallelCollectionRDD.slice(data, 0) } } test("negative number of slices") { val data = Array(1, 2, 3) - intercept[IllegalArgumentException] { ParallelCollection.slice(data, -5) } + intercept[IllegalArgumentException] { ParallelCollectionRDD.slice(data, -5) } } test("exclusive ranges sliced into ranges") { val data = 1 until 100 - val slices = ParallelCollection.slice(data, 3) + val slices = ParallelCollectionRDD.slice(data, 3) assert(slices.size === 3) assert(slices.map(_.size).reduceLeft(_+_) === 99) assert(slices.forall(_.isInstanceOf[Range])) @@ -88,7 +88,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { test("inclusive ranges sliced into ranges") { val data = 1 to 100 - val slices = ParallelCollection.slice(data, 3) + val slices = ParallelCollectionRDD.slice(data, 3) assert(slices.size === 3) assert(slices.map(_.size).reduceLeft(_+_) === 100) assert(slices.forall(_.isInstanceOf[Range])) @@ -97,7 +97,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { test("large ranges don't overflow") { val N = 100 * 1000 * 1000 val data = 0 until N - val slices = ParallelCollection.slice(data, 40) + val slices = ParallelCollectionRDD.slice(data, 40) assert(slices.size === 40) for (i <- 0 until 40) { assert(slices(i).isInstanceOf[Range]) @@ -117,7 +117,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { (tuple: (List[Int], Int)) => val d = tuple._1 val n = tuple._2 - val slices = ParallelCollection.slice(d, n) + val slices = ParallelCollectionRDD.slice(d, n) ("n slices" |: slices.size == n) && ("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) && ("equal sizes" |: slices.map(_.size).forall(x => x==d.size/n || x==d.size/n+1)) @@ -134,7 +134,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { } yield (a until b by step, n) val prop = forAll(gen) { case (d: Range, n: Int) => - val slices = ParallelCollection.slice(d, n) + val slices = ParallelCollectionRDD.slice(d, n) ("n slices" |: slices.size == n) && ("all ranges" |: slices.forall(_.isInstanceOf[Range])) && ("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) && @@ -152,7 +152,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { } yield (a to b by step, n) val prop = forAll(gen) { case (d: Range, n: Int) => - val slices = ParallelCollection.slice(d, n) + val slices = ParallelCollectionRDD.slice(d, n) ("n slices" |: slices.size == n) && ("all ranges" |: slices.forall(_.isInstanceOf[Range])) && ("concat to d" |: Seq.concat(slices: _*).mkString(",") == d.mkString(",")) && @@ -163,7 +163,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { test("exclusive ranges of longs") { val data = 1L until 100L - val slices = ParallelCollection.slice(data, 3) + val slices = ParallelCollectionRDD.slice(data, 3) assert(slices.size === 3) assert(slices.map(_.size).reduceLeft(_+_) === 99) assert(slices.forall(_.isInstanceOf[NumericRange[_]])) @@ -171,7 +171,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { test("inclusive ranges of longs") { val data = 1L to 100L - val slices = ParallelCollection.slice(data, 3) + val slices = ParallelCollectionRDD.slice(data, 3) assert(slices.size === 3) assert(slices.map(_.size).reduceLeft(_+_) === 100) assert(slices.forall(_.isInstanceOf[NumericRange[_]])) @@ -179,7 +179,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { test("exclusive ranges of doubles") { val data = 1.0 until 100.0 by 1.0 - val slices = ParallelCollection.slice(data, 3) + val slices = ParallelCollectionRDD.slice(data, 3) assert(slices.size === 3) assert(slices.map(_.size).reduceLeft(_+_) === 99) assert(slices.forall(_.isInstanceOf[NumericRange[_]])) @@ -187,7 +187,7 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers { test("inclusive ranges of doubles") { val data = 1.0 to 100.0 by 1.0 - val slices = ParallelCollection.slice(data, 3) + val slices = ParallelCollectionRDD.slice(data, 3) assert(slices.size === 3) assert(slices.map(_.size).reduceLeft(_+_) === 100) assert(slices.forall(_.isInstanceOf[NumericRange[_]])) diff --git a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala index edc5a7dfba..07cccc7ce0 100644 --- a/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/spark/scheduler/DAGSchedulerSuite.scala @@ -14,7 +14,7 @@ import spark.MapOutputTracker import spark.RDD import spark.SparkContext import spark.SparkException -import spark.Split +import spark.Partition import spark.TaskContext import spark.TaskEndReason @@ -111,18 +111,18 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter { * so we can test that DAGScheduler does not try to execute RDDs locally. */ private def makeRdd( - numSplits: Int, + numPartitions: Int, dependencies: List[Dependency[_]], locations: Seq[Seq[String]] = Nil ): MyRDD = { - val maxSplit = numSplits - 1 + val maxPartition = numPartitions - 1 return new MyRDD(sc, dependencies) { - override def compute(split: Split, context: TaskContext): Iterator[(Int, Int)] = + override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = throw new RuntimeException("should not be reached") - override def getSplits() = (0 to maxSplit).map(i => new Split { + override def getPartitions = (0 to maxPartition).map(i => new Partition { override def index = i }).toArray - override def getPreferredLocations(split: Split): Seq[String] = + override def getPreferredLocations(split: Partition): Seq[String] = if (locations.isDefinedAt(split.index)) locations(split.index) else @@ -196,9 +196,10 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter { test("local job") { val rdd = new MyRDD(sc, Nil) { - override def compute(split: Split, context: TaskContext) = Array(42 -> 0).iterator - override def getSplits() = Array(new Split { override def index = 0 }) - override def getPreferredLocations(split: Split) = Nil + override def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = + Array(42 -> 0).iterator + override def getPartitions = Array( new Partition { override def index = 0 } ) + override def getPreferredLocations(split: Partition) = Nil override def toString = "DAGSchedulerSuite Local RDD" } runEvent(JobSubmitted(rdd, jobComputeFunc, Array(0), true, null, listener)) @@ -397,4 +398,4 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter { private def makeBlockManagerId(host: String): BlockManagerId = BlockManagerId("exec-" + host, host, 12345) -}
\ No newline at end of file +} diff --git a/core/src/test/scala/spark/scheduler/TaskContextSuite.scala b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala index a5db7103f5..647bcaf860 100644 --- a/core/src/test/scala/spark/scheduler/TaskContextSuite.scala +++ b/core/src/test/scala/spark/scheduler/TaskContextSuite.scala @@ -5,7 +5,7 @@ import org.scalatest.BeforeAndAfter import spark.TaskContext import spark.RDD import spark.SparkContext -import spark.Split +import spark.Partition import spark.LocalSparkContext class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkContext { @@ -14,8 +14,8 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkConte var completed = false sc = new SparkContext("local", "test") val rdd = new RDD[String](sc, List()) { - override def getSplits = Array[Split](StubSplit(0)) - override def compute(split: Split, context: TaskContext) = { + override def getPartitions = Array[Partition](StubPartition(0)) + override def compute(split: Partition, context: TaskContext) = { context.addOnCompleteCallback(() => completed = true) sys.error("failed") } @@ -28,5 +28,5 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkConte assert(completed === true) } - case class StubSplit(val index: Int) extends Split -}
\ No newline at end of file + case class StubPartition(val index: Int) extends Partition +} |