diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-20 00:41:47 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-20 00:41:47 -0800 |
commit | 61f4bbda0d4e3ecbd8b955232a741231936a25de (patch) | |
tree | cc0da62bfcc95670c942ed5dcb080f55cc7e5590 /core/src/test/scala | |
parent | de41c436a0088efc83bc4705dcd279e61b085759 (diff) | |
download | spark-61f4bbda0d4e3ecbd8b955232a741231936a25de.tar.gz spark-61f4bbda0d4e3ecbd8b955232a741231936a25de.tar.bz2 spark-61f4bbda0d4e3ecbd8b955232a741231936a25de.zip |
Added tests for PartitionerAwareUnionRDD in the CheckpointSuite. Refactored CheckpointSuite to make the tests simpler and more reliable. Added missing test for ZippedRDD.
Diffstat (limited to 'core/src/test/scala')
-rw-r--r-- | core/src/test/scala/org/apache/spark/CheckpointSuite.scala | 361 |
1 files changed, 205 insertions, 156 deletions
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index f25d921d3f..81046af9f3 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -57,15 +57,15 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { } test("RDDs with one-to-one dependencies") { - testCheckpointing(_.map(x => x.toString)) - testCheckpointing(_.flatMap(x => 1 to x)) - testCheckpointing(_.filter(_ % 2 == 0)) - testCheckpointing(_.sample(false, 0.5, 0)) - testCheckpointing(_.glom()) - testCheckpointing(_.mapPartitions(_.map(_.toString))) - testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).mapValues(_.toString)) - testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).flatMapValues(x => 1 to x)) - testCheckpointing(_.pipe(Seq("cat"))) + testRDD(_.map(x => x.toString)) + testRDD(_.flatMap(x => 1 to x)) + testRDD(_.filter(_ % 2 == 0)) + testRDD(_.sample(false, 0.5, 0)) + testRDD(_.glom()) + testRDD(_.mapPartitions(_.map(_.toString))) + testRDD(_.map(x => (x % 2, 1)).reduceByKey(_ + _).mapValues(_.toString)) + testRDD(_.map(x => (x % 2, 1)).reduceByKey(_ + _).flatMapValues(x => 1 to x)) + testRDD(_.pipe(Seq("cat"))) } test("ParallelCollection") { @@ -97,7 +97,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { } test("ShuffledRDD") { - testCheckpointing(rdd => { + testRDD(rdd => { // Creating ShuffledRDD directly as PairRDDFunctions.combineByKey produces a MapPartitionedRDD new ShuffledRDD[Int, Int, (Int, Int)](rdd.map(x => (x % 2, 1)), partitioner) }) @@ -105,25 +105,17 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { test("UnionRDD") { def otherRDD = sc.makeRDD(1 to 10, 1) - - // Test whether the size of UnionRDDPartitions reduce in size after parent RDD is checkpointed. - // Current implementation of UnionRDD has transient reference to parent RDDs, - // so only the partitions will reduce in serialized size, not the RDD. - testCheckpointing(_.union(otherRDD), false, true) - testParentCheckpointing(_.union(otherRDD), false, true) + testRDD(_.union(otherRDD)) + testRDDPartitions(_.union(otherRDD)) } test("CartesianRDD") { def otherRDD = sc.makeRDD(1 to 10, 1) - testCheckpointing(new CartesianRDD(sc, _, otherRDD)) - - // Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed - // Current implementation of CoalescedRDDPartition has transient reference to parent RDD, - // so only the RDD will reduce in serialized size, not the partitions. - testParentCheckpointing(new CartesianRDD(sc, _, otherRDD), true, false) + testRDD(new CartesianRDD(sc, _, otherRDD)) + testRDDPartitions(new CartesianRDD(sc, _, otherRDD)) // Test that the CartesianRDD updates parent partitions (CartesianRDD.s1/s2) after - // the parent RDD has been checkpointed and parent partitions have been changed to HadoopPartitions. + // the parent RDD has been checkpointed and parent partitions have been changed. // Note that this test is very specific to the current implementation of CartesianRDD. val ones = sc.makeRDD(1 to 100, 10).map(x => x) ones.checkpoint() // checkpoint that MappedRDD @@ -134,23 +126,20 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { val splitAfterCheckpoint = serializeDeserialize(cartesian.partitions.head.asInstanceOf[CartesianPartition]) assert( - (splitAfterCheckpoint.s1 != splitBeforeCheckpoint.s1) && - (splitAfterCheckpoint.s2 != splitBeforeCheckpoint.s2), - "CartesianRDD.parents not updated after parent RDD checkpointed" + (splitAfterCheckpoint.s1.getClass != splitBeforeCheckpoint.s1.getClass) && + (splitAfterCheckpoint.s2.getClass != splitBeforeCheckpoint.s2.getClass), + "CartesianRDD.s1 and CartesianRDD.s2 not updated after parent RDD is checkpointed" ) } test("CoalescedRDD") { - testCheckpointing(_.coalesce(2)) + testRDD(_.coalesce(2)) + testRDDPartitions(_.coalesce(2)) - // Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed - // Current implementation of CoalescedRDDPartition has transient reference to parent RDD, - // so only the RDD will reduce in serialized size, not the partitions. - testParentCheckpointing(_.coalesce(2), true, false) - - // Test that the CoalescedRDDPartition updates parent partitions (CoalescedRDDPartition.parents) after - // the parent RDD has been checkpointed and parent partitions have been changed to HadoopPartitions. - // Note that this test is very specific to the current implementation of CoalescedRDDPartitions + // Test that the CoalescedRDDPartition updates parent partitions (CoalescedRDDPartition.parents) + // after the parent RDD has been checkpointed and parent partitions have been changed. + // Note that this test is very specific to the current implementation of + // CoalescedRDDPartitions. val ones = sc.makeRDD(1 to 100, 10).map(x => x) ones.checkpoint() // checkpoint that MappedRDD val coalesced = new CoalescedRDD(ones, 2) @@ -160,33 +149,78 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { val splitAfterCheckpoint = serializeDeserialize(coalesced.partitions.head.asInstanceOf[CoalescedRDDPartition]) assert( - splitAfterCheckpoint.parents.head != splitBeforeCheckpoint.parents.head, - "CoalescedRDDPartition.parents not updated after parent RDD checkpointed" + splitAfterCheckpoint.parents.head.getClass != splitBeforeCheckpoint.parents.head.getClass, + "CoalescedRDDPartition.parents not updated after parent RDD is checkpointed" ) } test("CoGroupedRDD") { - val longLineageRDD1 = generateLongLineageRDDForCoGroupedRDD() - testCheckpointing(rdd => { + val longLineageRDD1 = generateFatPairRDD() + testRDD(rdd => { CheckpointSuite.cogroup(longLineageRDD1, rdd.map(x => (x % 2, 1)), partitioner) - }, false, true) + }) - val longLineageRDD2 = generateLongLineageRDDForCoGroupedRDD() - testParentCheckpointing(rdd => { + val longLineageRDD2 = generateFatPairRDD() + testRDDPartitions(rdd => { CheckpointSuite.cogroup( longLineageRDD2, sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)), partitioner) - }, false, true) + }) } test("ZippedRDD") { - testCheckpointing( - rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false) - - // Test whether size of ZippedRDD reduce in size after parent RDD is checkpointed - // Current implementation of ZippedRDDPartitions has transient references to parent RDDs, - // so only the RDD will reduce in serialized size, not the partitions. - testParentCheckpointing( - rdd => new ZippedRDD(sc, rdd, rdd.map(x => x)), true, false) + testRDD(rdd => new ZippedRDD(sc, rdd, rdd.map(x => x))) + testRDDPartitions(rdd => new ZippedRDD(sc, rdd, rdd.map(x => x))) + + // Test that the ZippedPartition updates parent partitions + // after the parent RDD has been checkpointed and parent partitions have been changed. + // Note that this test is very specific to the current implementation of ZippedRDD. + val rdd = generateFatRDD() + val zippedRDD = new ZippedRDD(sc, rdd, rdd.map(x => x)) + zippedRDD.rdd1.checkpoint() + zippedRDD.rdd2.checkpoint() + val partitionBeforeCheckpoint = + serializeDeserialize(zippedRDD.partitions.head.asInstanceOf[ZippedPartition[_, _]]) + zippedRDD.count() + val partitionAfterCheckpoint = + serializeDeserialize(zippedRDD.partitions.head.asInstanceOf[ZippedPartition[_, _]]) + assert( + partitionAfterCheckpoint.partition1.getClass != partitionBeforeCheckpoint.partition1.getClass && + partitionAfterCheckpoint.partition2.getClass != partitionBeforeCheckpoint.partition2.getClass, + "ZippedRDD.partition1 and ZippedRDD.partition2 not updated after parent RDD is checkpointed" + ) + } + + test("PartitionerAwareUnionRDD") { + testRDD(rdd => { + new PartitionerAwareUnionRDD[(Int, Int)](sc, Array( + generateFatPairRDD(), + rdd.map(x => (x % 2, 1)).reduceByKey(partitioner, _ + _) + )) + }) + + testRDDPartitions(rdd => { + new PartitionerAwareUnionRDD[(Int, Int)](sc, Array( + generateFatPairRDD(), + rdd.map(x => (x % 2, 1)).reduceByKey(partitioner, _ + _) + )) + }) + + // Test that the PartitionerAwareUnionRDD updates parent partitions + // (PartitionerAwareUnionRDD.parents) after the parent RDD has been checkpointed and parent + // partitions have been changed. Note that this test is very specific to the current + // implementation of PartitionerAwareUnionRDD. + val pairRDD = generateFatPairRDD() + pairRDD.checkpoint() + val unionRDD = new PartitionerAwareUnionRDD(sc, Array(pairRDD)) + val partitionBeforeCheckpoint = serializeDeserialize( + unionRDD.partitions.head.asInstanceOf[PartitionerAwareUnionRDDPartition]) + pairRDD.count() + val partitionAfterCheckpoint = serializeDeserialize( + unionRDD.partitions.head.asInstanceOf[PartitionerAwareUnionRDDPartition]) + assert( + partitionBeforeCheckpoint.parents.head.getClass != partitionAfterCheckpoint.parents.head.getClass, + "PartitionerAwareUnionRDDPartition.parents not updated after parent RDD is checkpointed" + ) } test("CheckpointRDD with zero partitions") { @@ -200,29 +234,32 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { } /** - * Test checkpointing of the final RDD generated by the given operation. By default, - * this method tests whether the size of serialized RDD has reduced after checkpointing or not. - * It can also test whether the size of serialized RDD partitions has reduced after checkpointing or - * not, but this is not done by default as usually the partitions do not refer to any RDD and - * therefore never store the lineage. + * Test checkpointing of the RDD generated by the given operation. It tests whether the + * serialized size of the RDD is reduce after checkpointing or not. This function should be called + * on all RDDs that have a parent RDD (i.e., do not call on ParallelCollection, BlockRDD, etc.). */ - def testCheckpointing[U: ClassTag]( - op: (RDD[Int]) => RDD[U], - testRDDSize: Boolean = true, - testRDDPartitionSize: Boolean = false - ) { + def testRDD[U: ClassTag](op: (RDD[Int]) => RDD[U]) { // Generate the final RDD using given RDD operation - val baseRDD = generateLongLineageRDD() + val baseRDD = generateFatRDD() val operatedRDD = op(baseRDD) val parentRDD = operatedRDD.dependencies.headOption.orNull val rddType = operatedRDD.getClass.getSimpleName val numPartitions = operatedRDD.partitions.length + // Force initialization of all the data structures in RDDs + // Without this, serializing the RDD will give a wrong estimate of the size of the RDD + initializeRdd(operatedRDD) + + val partitionsBeforeCheckpoint = operatedRDD.partitions + // Find serialized sizes before and after the checkpoint - val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD) + logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString) + val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD) operatedRDD.checkpoint() val result = operatedRDD.collect() - val (rddSizeAfterCheckpoint, splitSizeAfterCheckpoint) = getSerializedSizes(operatedRDD) + operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables + val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD) + logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString) // Test whether the checkpoint file has been created assert(sc.checkpointFile[U](operatedRDD.getCheckpointFile.get).collect() === result) @@ -230,6 +267,9 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { // Test whether dependencies have been changed from its earlier parent RDD assert(operatedRDD.dependencies.head.rdd != parentRDD) + // Test whether the partitions have been changed from its earlier partitions + assert(operatedRDD.partitions.toList != partitionsBeforeCheckpoint.toList) + // Test whether the partitions have been changed to the new Hadoop partitions assert(operatedRDD.partitions.toList === operatedRDD.checkpointData.get.getPartitions.toList) @@ -239,122 +279,72 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { // Test whether the data in the checkpointed RDD is same as original assert(operatedRDD.collect() === result) - // Test whether serialized size of the RDD has reduced. If the RDD - // does not have any dependency to another RDD (e.g., ParallelCollection, - // ShuffleRDD with ShuffleDependency), it may not reduce in size after checkpointing. - if (testRDDSize) { - logInfo("Size of " + rddType + - "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]") - assert( - rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint, - "Size of " + rddType + " did not reduce after checkpointing " + - "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]" - ) - } + // Test whether serialized size of the RDD has reduced. + logInfo("Size of " + rddType + + " [" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]") + assert( + rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint, + "Size of " + rddType + " did not reduce after checkpointing " + + " [" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]" + ) - // Test whether serialized size of the partitions has reduced. If the partitions - // do not have any non-transient reference to another RDD or another RDD's partitions, it - // does not refer to a lineage and therefore may not reduce in size after checkpointing. - // However, if the original partitions before checkpointing do refer to a parent RDD, the partitions - // must be forgotten after checkpointing (to remove all reference to parent RDDs) and - // replaced with the HadooPartitions of the checkpointed RDD. - if (testRDDPartitionSize) { - logInfo("Size of " + rddType + " partitions " - + "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]") - assert( - splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint, - "Size of " + rddType + " partitions did not reduce after checkpointing " + - "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]" - ) - } } /** * Test whether checkpointing of the parent of the generated RDD also * truncates the lineage or not. Some RDDs like CoGroupedRDD hold on to its parent * RDDs partitions. So even if the parent RDD is checkpointed and its partitions changed, - * this RDD will remember the partitions and therefore potentially the whole lineage. + * the generated RDD will remember the partitions and therefore potentially the whole lineage. + * This function should be called only those RDD whose partitions refer to parent RDD's + * partitions (i.e., do not call it on simple RDD like MappedRDD). + * */ - def testParentCheckpointing[U: ClassTag]( - op: (RDD[Int]) => RDD[U], - testRDDSize: Boolean, - testRDDPartitionSize: Boolean - ) { + def testRDDPartitions[U: ClassTag](op: (RDD[Int]) => RDD[U]) { // Generate the final RDD using given RDD operation - val baseRDD = generateLongLineageRDD() + val baseRDD = generateFatRDD() val operatedRDD = op(baseRDD) - val parentRDD = operatedRDD.dependencies.head.rdd + val parentRDDs = operatedRDD.dependencies.map(_.rdd) val rddType = operatedRDD.getClass.getSimpleName - val parentRDDType = parentRDD.getClass.getSimpleName - // Get the partitions and dependencies of the parent in case they're lazily computed - parentRDD.dependencies - parentRDD.partitions + // Force initialization of all the data structures in RDDs + // Without this, serializing the RDD will give a wrong estimate of the size of the RDD + initializeRdd(operatedRDD) // Find serialized sizes before and after the checkpoint - val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD) - parentRDD.checkpoint() // checkpoint the parent RDD, not the generated one - val result = operatedRDD.collect() - val (rddSizeAfterCheckpoint, splitSizeAfterCheckpoint) = getSerializedSizes(operatedRDD) + logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString) + val (rddSizeBeforeCheckpoint, partitionSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD) + parentRDDs.foreach(_.checkpoint()) // checkpoint the parent RDD, not the generated one + val result = operatedRDD.collect() // force checkpointing + operatedRDD.collect() // force re-initialization of post-checkpoint lazy variables + val (rddSizeAfterCheckpoint, partitionSizeAfterCheckpoint) = getSerializedSizes(operatedRDD) + logInfo("RDD after checkpoint: " + operatedRDD + "\n" + operatedRDD.toDebugString) // Test whether the data in the checkpointed RDD is same as original assert(operatedRDD.collect() === result) - // Test whether serialized size of the RDD has reduced because of its parent being - // checkpointed. If this RDD or its parent RDD do not have any dependency - // to another RDD (e.g., ParallelCollection, ShuffleRDD with ShuffleDependency), it may - // not reduce in size after checkpointing. - if (testRDDSize) { - assert( - rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint, - "Size of " + rddType + " did not reduce after checkpointing parent " + parentRDDType + - "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]" - ) - } - - // Test whether serialized size of the partitions has reduced because of its parent being - // checkpointed. If the partitions do not have any non-transient reference to another RDD - // or another RDD's partitions, it does not refer to a lineage and therefore may not reduce - // in size after checkpointing. However, if the partitions do refer to the *partitions* of a parent - // RDD, then these partitions must update reference to the parent RDD partitions as the parent RDD's - // partitions must have changed after checkpointing. - if (testRDDPartitionSize) { - assert( - splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint, - "Size of " + rddType + " partitions did not reduce after checkpointing parent " + parentRDDType + - "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]" - ) - } - + // Test whether serialized size of the partitions has reduced + logInfo("Size of partitions of " + rddType + + " [" + partitionSizeBeforeCheckpoint + " --> " + partitionSizeAfterCheckpoint + "]") + assert( + partitionSizeAfterCheckpoint < partitionSizeBeforeCheckpoint, + "Size of " + rddType + " partitions did not reduce after checkpointing parent RDDs" + + " [" + partitionSizeBeforeCheckpoint + " --> " + partitionSizeAfterCheckpoint + "]" + ) } /** - * Generate an RDD with a long lineage of one-to-one dependencies. + * Generate an RDD such that both the RDD and its partitions have large size. */ - def generateLongLineageRDD(): RDD[Int] = { - var rdd = sc.makeRDD(1 to 100, 4) - for (i <- 1 to 50) { - rdd = rdd.map(x => x + 1) - } - rdd + def generateFatRDD(): RDD[Int] = { + new FatRDD(sc.makeRDD(1 to 100, 4)).map(x => x) } /** - * Generate an RDD with a long lineage specifically for CoGroupedRDD. - * A CoGroupedRDD can have a long lineage only one of its parents have a long lineage - * and narrow dependency with this RDD. This method generate such an RDD by a sequence - * of cogroups and mapValues which creates a long lineage of narrow dependencies. + * Generate an pair RDD (with partitioner) such that both the RDD and its partitions + * have large size. */ - def generateLongLineageRDDForCoGroupedRDD() = { - val add = (x: (Seq[Int], Seq[Int])) => (x._1 ++ x._2).reduce(_ + _) - - def ones: RDD[(Int, Int)] = sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)).reduceByKey(partitioner, _ + _) - - var cogrouped: RDD[(Int, (Seq[Int], Seq[Int]))] = ones.cogroup(ones) - for(i <- 1 to 10) { - cogrouped = cogrouped.mapValues(add).cogroup(ones) - } - cogrouped.mapValues(add) + def generateFatPairRDD() = { + new FatPairRDD(sc.makeRDD(1 to 100, 4), partitioner).mapValues(x => x) } /** @@ -362,8 +352,26 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { * upon checkpointing. Ignores the checkpointData field, which may grow when we checkpoint. */ def getSerializedSizes(rdd: RDD[_]): (Int, Int) = { - (Utils.serialize(rdd).length - Utils.serialize(rdd.checkpointData).length, - Utils.serialize(rdd.partitions).length) + val rddSize = Utils.serialize(rdd).size + val rddCpDataSize = Utils.serialize(rdd.checkpointData).size + val rddPartitionSize = Utils.serialize(rdd.partitions).size + val rddDependenciesSize = Utils.serialize(rdd.dependencies).size + + // Print detailed size, helps in debugging + logInfo("Serialized sizes of " + rdd + + ": RDD = " + rddSize + + ", RDD checkpoint data = " + rddCpDataSize + + ", RDD partitions = " + rddPartitionSize + + ", RDD dependencies = " + rddDependenciesSize + ) + // this makes sure that serializing the RDD's checkpoint data does not + // serialize the whole RDD as well + assert( + rddSize > rddCpDataSize, + "RDD's checkpoint data (" + rddCpDataSize + ") is equal or larger than the " + + "whole RDD with checkpoint data (" + rddSize + ")" + ) + (rddSize - rddCpDataSize, rddPartitionSize) } /** @@ -375,8 +383,49 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { val bytes = Utils.serialize(obj) Utils.deserialize[T](bytes) } + + /** + * Recursively force the initialization of the all members of an RDD and it parents. + */ + def initializeRdd(rdd: RDD[_]) { + rdd.partitions // forces the + rdd.dependencies.map(_.rdd).foreach(initializeRdd(_)) + } } +/** RDD partition that has large serialized size. */ +class FatPartition(val partition: Partition) extends Partition { + val bigData = new Array[Byte](10000) + def index: Int = partition.index +} + +/** RDD that has large serialized size. */ +class FatRDD(parent: RDD[Int]) extends RDD[Int](parent) { + val bigData = new Array[Byte](100000) + + protected def getPartitions: Array[Partition] = { + parent.partitions.map(p => new FatPartition(p)) + } + + def compute(split: Partition, context: TaskContext): Iterator[Int] = { + parent.compute(split.asInstanceOf[FatPartition].partition, context) + } +} + +/** Pair RDD that has large serialized size. */ +class FatPairRDD(parent: RDD[Int], _partitioner: Partitioner) extends RDD[(Int, Int)](parent) { + val bigData = new Array[Byte](100000) + + protected def getPartitions: Array[Partition] = { + parent.partitions.map(p => new FatPartition(p)) + } + + @transient override val partitioner = Some(_partitioner) + + def compute(split: Partition, context: TaskContext): Iterator[(Int, Int)] = { + parent.compute(split.asInstanceOf[FatPartition].partition, context).map(x => (x, x)) + } +} object CheckpointSuite { // This is a custom cogroup function that does not use mapValues like |