aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2012-12-07 13:45:52 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2012-12-07 13:45:52 -0800
commit1f3a75ae9e518c003d84fa38a54583ecd841ffdc (patch)
tree17f262dbba0353a0bf288eb4e9f0563761410474 /core
parent21a08529768a5073bc5c15b6c2642ceef2acd0d5 (diff)
downloadspark-1f3a75ae9e518c003d84fa38a54583ecd841ffdc.tar.gz
spark-1f3a75ae9e518c003d84fa38a54583ecd841ffdc.tar.bz2
spark-1f3a75ae9e518c003d84fa38a54583ecd841ffdc.zip
Modified checkpoint testsuite to more comprehensively test checkpointing of various RDDs. Fixed checkpoint bug (splits referring to parent RDDs or parent splits) in UnionRDD and CoalescedRDD. Fixed bug in testing ShuffledRDD. Removed unnecessary and useless map-side combining step for narrow dependencies in CoGroupedRDD. Removed unncessary WeakReference stuff from many other RDDs.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/rdd/CartesianRDD.scala1
-rw-r--r--core/src/main/scala/spark/rdd/CoGroupedRDD.scala9
-rw-r--r--core/src/main/scala/spark/rdd/CoalescedRDD.scala25
-rw-r--r--core/src/main/scala/spark/rdd/FilteredRDD.scala6
-rw-r--r--core/src/main/scala/spark/rdd/FlatMappedRDD.scala6
-rw-r--r--core/src/main/scala/spark/rdd/GlommedRDD.scala6
-rw-r--r--core/src/main/scala/spark/rdd/MapPartitionsRDD.scala6
-rw-r--r--core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala6
-rw-r--r--core/src/main/scala/spark/rdd/MappedRDD.scala6
-rw-r--r--core/src/main/scala/spark/rdd/PipedRDD.scala10
-rw-r--r--core/src/main/scala/spark/rdd/SampledRDD.scala12
-rw-r--r--core/src/main/scala/spark/rdd/ShuffledRDD.scala16
-rw-r--r--core/src/main/scala/spark/rdd/UnionRDD.scala19
-rw-r--r--core/src/test/scala/spark/CheckpointSuite.scala267
14 files changed, 285 insertions, 110 deletions
diff --git a/core/src/main/scala/spark/rdd/CartesianRDD.scala b/core/src/main/scala/spark/rdd/CartesianRDD.scala
index 9bfc3f8ca3..1d753a5168 100644
--- a/core/src/main/scala/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/spark/rdd/CartesianRDD.scala
@@ -1,7 +1,6 @@
package spark.rdd
import spark._
-import java.lang.ref.WeakReference
private[spark]
class CartesianSplit(idx: Int, val s1: Split, val s2: Split) extends Split with Serializable {
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index adfecea966..57d472666b 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -17,6 +17,7 @@ import java.io.{ObjectOutputStream, IOException}
private[spark] sealed trait CoGroupSplitDep extends Serializable
private[spark] case class NarrowCoGroupSplitDep(rdd: RDD[_], splitIndex: Int, var split: Split = null)
extends CoGroupSplitDep {
+
@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream) {
rdd.synchronized {
@@ -50,12 +51,12 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
var deps_ = {
val deps = new ArrayBuffer[Dependency[_]]
for ((rdd, index) <- rdds.zipWithIndex) {
- val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true)
- if (mapSideCombinedRDD.partitioner == Some(part)) {
- logInfo("Adding one-to-one dependency with " + mapSideCombinedRDD)
- deps += new OneToOneDependency(mapSideCombinedRDD)
+ if (rdd.partitioner == Some(part)) {
+ logInfo("Adding one-to-one dependency with " + rdd)
+ deps += new OneToOneDependency(rdd)
} else {
logInfo("Adding shuffle dependency with " + rdd)
+ val mapSideCombinedRDD = rdd.mapPartitions(aggr.combineValuesByKey(_), true)
deps += new ShuffleDependency[Any, ArrayBuffer[Any]](mapSideCombinedRDD, part)
}
}
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index 90c3b8bfd8..0b4499e2eb 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -1,9 +1,24 @@
package spark.rdd
import spark._
-import java.lang.ref.WeakReference
+import java.io.{ObjectOutputStream, IOException}
-private class CoalescedRDDSplit(val index: Int, val parents: Array[Split]) extends Split
+private[spark] case class CoalescedRDDSplit(
+ index: Int,
+ @transient rdd: RDD[_],
+ parentsIndices: Array[Int]
+ ) extends Split {
+ var parents: Seq[Split] = parentsIndices.map(rdd.splits(_))
+
+ @throws(classOf[IOException])
+ private def writeObject(oos: ObjectOutputStream) {
+ rdd.synchronized {
+ // Update the reference to parent split at the time of task serialization
+ parents = parentsIndices.map(rdd.splits(_))
+ oos.defaultWriteObject()
+ }
+ }
+}
/**
* Coalesce the partitions of a parent RDD (`prev`) into fewer partitions, so that each partition of
@@ -21,12 +36,12 @@ class CoalescedRDD[T: ClassManifest](
@transient var splits_ : Array[Split] = {
val prevSplits = prev.splits
if (prevSplits.length < maxPartitions) {
- prevSplits.zipWithIndex.map{ case (s, idx) => new CoalescedRDDSplit(idx, Array(s)) }
+ prevSplits.map(_.index).map{idx => new CoalescedRDDSplit(idx, prev, Array(idx)) }
} else {
(0 until maxPartitions).map { i =>
val rangeStart = (i * prevSplits.length) / maxPartitions
val rangeEnd = ((i + 1) * prevSplits.length) / maxPartitions
- new CoalescedRDDSplit(i, prevSplits.slice(rangeStart, rangeEnd))
+ new CoalescedRDDSplit(i, prev, (rangeStart until rangeEnd).toArray)
}.toArray
}
}
@@ -42,7 +57,7 @@ class CoalescedRDD[T: ClassManifest](
var deps_ : List[Dependency[_]] = List(
new NarrowDependency(prev) {
def getParents(id: Int): Seq[Int] =
- splits(id).asInstanceOf[CoalescedRDDSplit].parents.map(_.index)
+ splits(id).asInstanceOf[CoalescedRDDSplit].parentsIndices
}
)
diff --git a/core/src/main/scala/spark/rdd/FilteredRDD.scala b/core/src/main/scala/spark/rdd/FilteredRDD.scala
index 1370cf6faf..02f2e7c246 100644
--- a/core/src/main/scala/spark/rdd/FilteredRDD.scala
+++ b/core/src/main/scala/spark/rdd/FilteredRDD.scala
@@ -1,15 +1,13 @@
package spark.rdd
-import spark.OneToOneDependency
import spark.RDD
import spark.Split
-import java.lang.ref.WeakReference
private[spark]
class FilteredRDD[T: ClassManifest](
- prev: WeakReference[RDD[T]],
+ prev: RDD[T],
f: T => Boolean)
- extends RDD[T](prev.get) {
+ extends RDD[T](prev) {
override def splits = firstParent[T].splits
override def compute(split: Split) = firstParent[T].iterator(split).filter(f)
diff --git a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
index 6b2cc67568..cdc8ecdcfe 100644
--- a/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
+++ b/core/src/main/scala/spark/rdd/FlatMappedRDD.scala
@@ -1,15 +1,13 @@
package spark.rdd
-import spark.OneToOneDependency
import spark.RDD
import spark.Split
-import java.lang.ref.WeakReference
private[spark]
class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
- prev: WeakReference[RDD[T]],
+ prev: RDD[T],
f: T => TraversableOnce[U])
- extends RDD[U](prev.get) {
+ extends RDD[U](prev) {
override def splits = firstParent[T].splits
override def compute(split: Split) = firstParent[T].iterator(split).flatMap(f)
diff --git a/core/src/main/scala/spark/rdd/GlommedRDD.scala b/core/src/main/scala/spark/rdd/GlommedRDD.scala
index 0f0b6ab0ff..df6f61c69d 100644
--- a/core/src/main/scala/spark/rdd/GlommedRDD.scala
+++ b/core/src/main/scala/spark/rdd/GlommedRDD.scala
@@ -1,13 +1,11 @@
package spark.rdd
-import spark.OneToOneDependency
import spark.RDD
import spark.Split
-import java.lang.ref.WeakReference
private[spark]
-class GlommedRDD[T: ClassManifest](prev: WeakReference[RDD[T]])
- extends RDD[Array[T]](prev.get) {
+class GlommedRDD[T: ClassManifest](prev: RDD[T])
+ extends RDD[Array[T]](prev) {
override def splits = firstParent[T].splits
override def compute(split: Split) = Array(firstParent[T].iterator(split).toArray).iterator
} \ No newline at end of file
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
index b04f56cfcc..23b9fb023b 100644
--- a/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
+++ b/core/src/main/scala/spark/rdd/MapPartitionsRDD.scala
@@ -1,16 +1,14 @@
package spark.rdd
-import spark.OneToOneDependency
import spark.RDD
import spark.Split
-import java.lang.ref.WeakReference
private[spark]
class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
- prev: WeakReference[RDD[T]],
+ prev: RDD[T],
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false)
- extends RDD[U](prev.get) {
+ extends RDD[U](prev) {
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
diff --git a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
index 7a4b6ffb03..41955c1d7a 100644
--- a/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
+++ b/core/src/main/scala/spark/rdd/MapPartitionsWithSplitRDD.scala
@@ -1,9 +1,7 @@
package spark.rdd
-import spark.OneToOneDependency
import spark.RDD
import spark.Split
-import java.lang.ref.WeakReference
/**
* A variant of the MapPartitionsRDD that passes the split index into the
@@ -12,9 +10,9 @@ import java.lang.ref.WeakReference
*/
private[spark]
class MapPartitionsWithSplitRDD[U: ClassManifest, T: ClassManifest](
- prev: WeakReference[RDD[T]],
+ prev: RDD[T],
f: (Int, Iterator[T]) => Iterator[U])
- extends RDD[U](prev.get) {
+ extends RDD[U](prev) {
override def splits = firstParent[T].splits
override def compute(split: Split) = f(split.index, firstParent[T].iterator(split))
diff --git a/core/src/main/scala/spark/rdd/MappedRDD.scala b/core/src/main/scala/spark/rdd/MappedRDD.scala
index 8fa1872e0a..6f8cb21fd3 100644
--- a/core/src/main/scala/spark/rdd/MappedRDD.scala
+++ b/core/src/main/scala/spark/rdd/MappedRDD.scala
@@ -1,15 +1,13 @@
package spark.rdd
-import spark.OneToOneDependency
import spark.RDD
import spark.Split
-import java.lang.ref.WeakReference
private[spark]
class MappedRDD[U: ClassManifest, T: ClassManifest](
- prev: WeakReference[RDD[T]],
+ prev: RDD[T],
f: T => U)
- extends RDD[U](prev.get) {
+ extends RDD[U](prev) {
override def splits = firstParent[T].splits
override def compute(split: Split) = firstParent[T].iterator(split).map(f)
diff --git a/core/src/main/scala/spark/rdd/PipedRDD.scala b/core/src/main/scala/spark/rdd/PipedRDD.scala
index d9293a9d1a..d2047375ea 100644
--- a/core/src/main/scala/spark/rdd/PipedRDD.scala
+++ b/core/src/main/scala/spark/rdd/PipedRDD.scala
@@ -8,11 +8,9 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
-import spark.OneToOneDependency
import spark.RDD
import spark.SparkEnv
import spark.Split
-import java.lang.ref.WeakReference
/**
@@ -20,16 +18,16 @@ import java.lang.ref.WeakReference
* (printing them one per line) and returns the output as a collection of strings.
*/
class PipedRDD[T: ClassManifest](
- prev: WeakReference[RDD[T]],
+ prev: RDD[T],
command: Seq[String],
envVars: Map[String, String])
- extends RDD[String](prev.get) {
+ extends RDD[String](prev) {
- def this(prev: WeakReference[RDD[T]], command: Seq[String]) = this(prev, command, Map())
+ def this(prev: RDD[T], command: Seq[String]) = this(prev, command, Map())
// Similar to Runtime.exec(), if we are given a single string, split it into words
// using a standard StringTokenizer (i.e. by spaces)
- def this(prev: WeakReference[RDD[T]], command: String) = this(prev, PipedRDD.tokenize(command))
+ def this(prev: RDD[T], command: String) = this(prev, PipedRDD.tokenize(command))
override def splits = firstParent[T].splits
diff --git a/core/src/main/scala/spark/rdd/SampledRDD.scala b/core/src/main/scala/spark/rdd/SampledRDD.scala
index f273f257f8..c622e14a66 100644
--- a/core/src/main/scala/spark/rdd/SampledRDD.scala
+++ b/core/src/main/scala/spark/rdd/SampledRDD.scala
@@ -7,7 +7,6 @@ import cern.jet.random.engine.DRand
import spark.RDD
import spark.OneToOneDependency
import spark.Split
-import java.lang.ref.WeakReference
private[spark]
class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable {
@@ -15,14 +14,14 @@ class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Seriali
}
class SampledRDD[T: ClassManifest](
- prev: WeakReference[RDD[T]],
+ prev: RDD[T],
withReplacement: Boolean,
frac: Double,
seed: Int)
- extends RDD[T](prev.get) {
+ extends RDD[T](prev) {
@transient
- val splits_ = {
+ var splits_ : Array[Split] = {
val rg = new Random(seed)
firstParent[T].splits.map(x => new SampledRDDSplit(x, rg.nextInt))
}
@@ -51,4 +50,9 @@ class SampledRDD[T: ClassManifest](
firstParent[T].iterator(split.prev).filter(x => (rand.nextDouble <= frac))
}
}
+
+ override def changeDependencies(newRDD: RDD[_]) {
+ dependencies_ = List(new OneToOneDependency(newRDD.asInstanceOf[RDD[Any]]))
+ splits_ = newRDD.splits
+ }
}
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index 31774585f4..a9dd3f35ed 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -1,11 +1,7 @@
package spark.rdd
-import spark.Partitioner
-import spark.RDD
-import spark.ShuffleDependency
-import spark.SparkEnv
-import spark.Split
-import java.lang.ref.WeakReference
+import spark._
+import scala.Some
private[spark] class ShuffledRDDSplit(val idx: Int) extends Split {
override val index = idx
@@ -14,15 +10,15 @@ private[spark] class ShuffledRDDSplit(val idx: Int) extends Split {
/**
* The resulting RDD from a shuffle (e.g. repartitioning of data).
- * @param parent the parent RDD.
+ * @param prev the parent RDD.
* @param part the partitioner used to partition the RDD
* @tparam K the key class.
* @tparam V the value class.
*/
class ShuffledRDD[K, V](
- @transient prev: WeakReference[RDD[(K, V)]],
+ prev: RDD[(K, V)],
part: Partitioner)
- extends RDD[(K, V)](prev.get.context, List(new ShuffleDependency(prev.get, part))) {
+ extends RDD[(K, V)](prev.context, List(new ShuffleDependency(prev, part))) {
override val partitioner = Some(part)
@@ -37,7 +33,7 @@ class ShuffledRDD[K, V](
}
override def changeDependencies(newRDD: RDD[_]) {
- dependencies_ = Nil
+ dependencies_ = List(new OneToOneDependency(newRDD.asInstanceOf[RDD[Any]]))
splits_ = newRDD.splits
}
}
diff --git a/core/src/main/scala/spark/rdd/UnionRDD.scala b/core/src/main/scala/spark/rdd/UnionRDD.scala
index 30eb8483b6..a5948dd1f1 100644
--- a/core/src/main/scala/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/spark/rdd/UnionRDD.scala
@@ -3,18 +3,28 @@ package spark.rdd
import scala.collection.mutable.ArrayBuffer
import spark._
-import java.lang.ref.WeakReference
+import java.io.{ObjectOutputStream, IOException}
private[spark] class UnionSplit[T: ClassManifest](
- idx: Int,
+ idx: Int,
rdd: RDD[T],
- split: Split)
+ splitIndex: Int,
+ var split: Split = null)
extends Split
with Serializable {
def iterator() = rdd.iterator(split)
def preferredLocations() = rdd.preferredLocations(split)
override val index: Int = idx
+
+ @throws(classOf[IOException])
+ private def writeObject(oos: ObjectOutputStream) {
+ rdd.synchronized {
+ // Update the reference to parent split at the time of task serialization
+ split = rdd.splits(splitIndex)
+ oos.defaultWriteObject()
+ }
+ }
}
class UnionRDD[T: ClassManifest](
@@ -27,7 +37,7 @@ class UnionRDD[T: ClassManifest](
val array = new Array[Split](rdds.map(_.splits.size).sum)
var pos = 0
for (rdd <- rdds; split <- rdd.splits) {
- array(pos) = new UnionSplit(pos, rdd, split)
+ array(pos) = new UnionSplit(pos, rdd, split.index)
pos += 1
}
array
@@ -52,7 +62,6 @@ class UnionRDD[T: ClassManifest](
override def preferredLocations(s: Split): Seq[String] =
s.asInstanceOf[UnionSplit[T]].preferredLocations()
-
override def changeDependencies(newRDD: RDD[_]) {
deps_ = List(new OneToOneDependency(newRDD))
splits_ = newRDD.splits
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index 2cafef444c..51bd59e2b1 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -2,17 +2,16 @@ package spark
import org.scalatest.{BeforeAndAfter, FunSuite}
import java.io.File
-import rdd.{BlockRDD, CoalescedRDD, MapPartitionsWithSplitRDD}
+import spark.rdd._
import spark.SparkContext._
import storage.StorageLevel
-import java.util.concurrent.Semaphore
-import collection.mutable.ArrayBuffer
class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
initLogging()
var sc: SparkContext = _
var checkpointDir: File = _
+ val partitioner = new HashPartitioner(2)
before {
checkpointDir = File.createTempFile("temp", "")
@@ -40,7 +39,6 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
parCollection.checkpoint()
assert(parCollection.dependencies === Nil)
val result = parCollection.collect()
- sleep(parCollection) // slightly extra time as loading classes for the first can take some time
assert(sc.objectFile[Int](parCollection.getCheckpointFile.get).collect() === result)
assert(parCollection.dependencies != Nil)
assert(parCollection.collect() === result)
@@ -53,7 +51,6 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
val blockRDD = new BlockRDD[String](sc, Array(blockId))
blockRDD.checkpoint()
val result = blockRDD.collect()
- sleep(blockRDD)
assert(sc.objectFile[String](blockRDD.getCheckpointFile.get).collect() === result)
assert(blockRDD.dependencies != Nil)
assert(blockRDD.collect() === result)
@@ -68,79 +65,247 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
testCheckpointing(_.mapPartitions(_.map(_.toString)))
testCheckpointing(r => new MapPartitionsWithSplitRDD(r,
(i: Int, iter: Iterator[Int]) => iter.map(_.toString) ))
- testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).mapValues(_.toString), 1000)
- testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).flatMapValues(x => 1 to x), 1000)
+ testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).mapValues(_.toString))
+ testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).flatMapValues(x => 1 to x))
testCheckpointing(_.pipe(Seq("cat")))
}
test("ShuffledRDD") {
- testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _))
+ // Creating ShuffledRDD directly as PairRDDFunctions.combineByKey produces a MapPartitionedRDD
+ testCheckpointing(rdd => {
+ new ShuffledRDD(rdd.map(x => (x % 2, 1)), partitioner)
+ })
}
test("UnionRDD") {
- testCheckpointing(_.union(sc.makeRDD(5 to 6, 4)))
+ def otherRDD = sc.makeRDD(1 to 10, 4)
+ testCheckpointing(_.union(otherRDD), false, true)
+ testParentCheckpointing(_.union(otherRDD), false, true)
}
test("CartesianRDD") {
- testCheckpointing(_.cartesian(sc.makeRDD(5 to 6, 4)), 1000)
+ def otherRDD = sc.makeRDD(1 to 10, 4)
+ testCheckpointing(_.cartesian(otherRDD))
+ testParentCheckpointing(_.cartesian(otherRDD), true, false)
}
test("CoalescedRDD") {
testCheckpointing(new CoalescedRDD(_, 2))
+
+ // Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed
+ // Current implementation of CoalescedRDDSplit has transient reference to parent RDD,
+ // so does not serialize the RDD (not need to check its size).
+ testParentCheckpointing(new CoalescedRDD(_, 2), true, false)
+
+ // Test that the CoalescedRDDSplit updates parent splits (CoalescedRDDSplit.parents) after
+ // the parent RDD has been checkpointed and parent splits have been changed to HadoopSplits.
+ // Note that this test is very specific to the current implementation of CoalescedRDDSplits
+ val ones = sc.makeRDD(1 to 100, 10).map(x => x)
+ ones.checkpoint // checkpoint that MappedRDD
+ val coalesced = new CoalescedRDD(ones, 2)
+ val splitBeforeCheckpoint =
+ serializeDeserialize(coalesced.splits.head.asInstanceOf[CoalescedRDDSplit])
+ coalesced.count() // do the checkpointing
+ val splitAfterCheckpoint =
+ serializeDeserialize(coalesced.splits.head.asInstanceOf[CoalescedRDDSplit])
+ assert(
+ splitAfterCheckpoint.parents.head != splitBeforeCheckpoint.parents.head,
+ "CoalescedRDDSplit.parents not updated after parent RDD checkpointed"
+ )
}
test("CoGroupedRDD") {
- val rdd2 = sc.makeRDD(5 to 6, 4).map(x => (x % 2, 1))
- testCheckpointing(rdd1 => rdd1.map(x => (x % 2, 1)).cogroup(rdd2))
- testCheckpointing(rdd1 => rdd1.map(x => (x % 2, x)).join(rdd2))
+ // Test serialized size
+ // RDD with long lineage of one-to-one dependencies through cogroup transformations
+ val longLineageRDD1 = generateLongLineageRDDForCoGroupedRDD()
+ testCheckpointing(rdd1 => {
+ CheckpointSuite.cogroup(longLineageRDD1, rdd1.map(x => (x % 2, 1)), partitioner)
+ }, false, true)
- // Special test to make sure that the CoGroupSplit of CoGroupedRDD do not
- // hold on to the splits of its parent RDDs, as the splits of parent RDDs
- // may change while checkpointing. Rather the splits of parent RDDs must
- // be fetched at the time of serialization to ensure the latest splits to
- // be sent along with the task.
+ val longLineageRDD2 = generateLongLineageRDDForCoGroupedRDD()
+ testParentCheckpointing(rdd1 => {
+ CheckpointSuite.cogroup(longLineageRDD2, sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)), partitioner)
+ }, false, true)
+ }
- val add = (x: (Seq[Int], Seq[Int])) => (x._1 ++ x._2).reduce(_ + _)
+ /**
+ * Test checkpointing of the final RDD generated by the given operation. By default,
+ * this method tests whether the size of serialized RDD has reduced after checkpointing or not.
+ * It can also test whether the size of serialized RDD splits has reduced after checkpointing or
+ * not, but this is not done by default as usually the splits do not refer to any RDD and
+ * therefore never store the lineage.
+ */
+ def testCheckpointing[U: ClassManifest](
+ op: (RDD[Int]) => RDD[U],
+ testRDDSize: Boolean = true,
+ testRDDSplitSize: Boolean = false
+ ) {
+ // Generate the final RDD using given RDD operation
+ val baseRDD = generateLongLineageRDD
+ val operatedRDD = op(baseRDD)
+ val parentRDD = operatedRDD.dependencies.headOption.orNull
+ val rddType = operatedRDD.getClass.getSimpleName
- val ones = sc.parallelize(1 to 100, 1).map(x => (x,1))
- val reduced = ones.reduceByKey(_ + _)
- val seqOfCogrouped = new ArrayBuffer[RDD[(Int, Int)]]()
- seqOfCogrouped += reduced.cogroup(ones).mapValues[Int](add)
- for(i <- 1 to 10) {
- seqOfCogrouped += seqOfCogrouped.last.cogroup(ones).mapValues(add)
- }
- val finalCogrouped = seqOfCogrouped.last
- val intermediateCogrouped = seqOfCogrouped(5)
-
- val bytesBeforeCheckpoint = Utils.serialize(finalCogrouped.splits)
- intermediateCogrouped.checkpoint()
- finalCogrouped.count()
- sleep(intermediateCogrouped)
- val bytesAfterCheckpoint = Utils.serialize(finalCogrouped.splits)
- println("Before = " + bytesBeforeCheckpoint.size + ", after = " + bytesAfterCheckpoint.size)
- assert(bytesAfterCheckpoint.size < bytesBeforeCheckpoint.size,
- "CoGroupedSplits still holds on to the splits of its parent RDDs")
- }
-
- def testCheckpointing[U: ClassManifest](op: (RDD[Int]) => RDD[U], sleepTime: Long = 500) {
- val parCollection = sc.makeRDD(1 to 4, 4)
- val operatedRDD = op(parCollection)
+ // Find serialized sizes before and after the checkpoint
+ val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
operatedRDD.checkpoint()
- val parentRDD = operatedRDD.dependencies.head.rdd
val result = operatedRDD.collect()
- sleep(operatedRDD)
- //println(parentRDD + ", " + operatedRDD.dependencies.head.rdd )
+ val (rddSizeAfterCheckpoint, splitSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
+
+ // Test whether the checkpoint file has been created
assert(sc.objectFile[U](operatedRDD.getCheckpointFile.get).collect() === result)
+
+ // Test whether dependencies have been changed from its earlier parent RDD
assert(operatedRDD.dependencies.head.rdd != parentRDD)
+
+ // Test whether the splits have been changed to the new Hadoop splits
+ assert(operatedRDD.splits.toList === operatedRDD.checkpointData.cpRDDSplits.toList)
+
+ // Test whether the data in the checkpointed RDD is same as original
assert(operatedRDD.collect() === result)
+
+ // Test whether serialized size of the RDD has reduced. If the RDD
+ // does not have any dependency to another RDD (e.g., ParallelCollection,
+ // ShuffleRDD with ShuffleDependency), it may not reduce in size after checkpointing.
+ if (testRDDSize) {
+ println("Size of " + rddType +
+ "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]")
+ assert(
+ rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
+ "Size of " + rddType + " did not reduce after checkpointing " +
+ "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
+ )
+ }
+
+ // Test whether serialized size of the splits has reduced. If the splits
+ // do not have any non-transient reference to another RDD or another RDD's splits, it
+ // does not refer to a lineage and therefore may not reduce in size after checkpointing.
+ // However, if the original splits before checkpointing do refer to a parent RDD, the splits
+ // must be forgotten after checkpointing (to remove all reference to parent RDDs) and
+ // replaced with the HadoopSplits of the checkpointed RDD.
+ if (testRDDSplitSize) {
+ println("Size of " + rddType + " splits "
+ + "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]")
+ assert(
+ splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint,
+ "Size of " + rddType + " splits did not reduce after checkpointing " +
+ "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]"
+ )
+ }
}
- def sleep(rdd: RDD[_]) {
- val startTime = System.currentTimeMillis()
- val maxWaitTime = 5000
- while(rdd.isCheckpointed == false && System.currentTimeMillis() < startTime + maxWaitTime) {
- Thread.sleep(50)
+ /**
+ * Test whether checkpointing of the parent of the generated RDD also
+ * truncates the lineage or not. Some RDDs like CoGroupedRDD hold on to its parent
+ * RDDs splits. So even if the parent RDD is checkpointed and its splits changed,
+ * this RDD will remember the splits and therefore potentially the whole lineage.
+ */
+ def testParentCheckpointing[U: ClassManifest](
+ op: (RDD[Int]) => RDD[U],
+ testRDDSize: Boolean,
+ testRDDSplitSize: Boolean
+ ) {
+ // Generate the final RDD using given RDD operation
+ val baseRDD = generateLongLineageRDD
+ val operatedRDD = op(baseRDD)
+ val parentRDD = operatedRDD.dependencies.head.rdd
+ val rddType = operatedRDD.getClass.getSimpleName
+ val parentRDDType = parentRDD.getClass.getSimpleName
+
+ // Find serialized sizes before and after the checkpoint
+ val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
+ parentRDD.checkpoint() // checkpoint the parent RDD, not the generated one
+ val result = operatedRDD.collect()
+ val (rddSizeAfterCheckpoint, splitSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
+
+ // Test whether the data in the checkpointed RDD is same as original
+ assert(operatedRDD.collect() === result)
+
+ // Test whether serialized size of the RDD has reduced because of its parent being
+ // checkpointed. If this RDD or its parent RDD do not have any dependency
+ // to another RDD (e.g., ParallelCollection, ShuffleRDD with ShuffleDependency), it may
+ // not reduce in size after checkpointing.
+ if (testRDDSize) {
+ assert(
+ rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
+ "Size of " + rddType + " did not reduce after parent checkpointing parent " + parentRDDType +
+ "[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]"
+ )
+ }
+
+ // Test whether serialized size of the splits has reduced because of its parent being
+ // checkpointed. If the splits do not have any non-transient reference to another RDD
+ // or another RDD's splits, it does not refer to a lineage and therefore may not reduce
+ // in size after checkpointing. However, if the splits do refer to the *splits* of a parent
+ // RDD, then these splits must update reference to the parent RDD splits as the parent RDD's
+ // splits must have changed after checkpointing.
+ if (testRDDSplitSize) {
+ assert(
+ splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint,
+ "Size of " + rddType + " splits did not reduce after checkpointing parent " + parentRDDType +
+ "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]"
+ )
}
- assert(rdd.isCheckpointed === true, "Waiting for checkpoint to complete took more than " + maxWaitTime + " ms")
+
+ }
+
+ /**
+ * Generate an RDD with a long lineage of one-to-one dependencies.
+ */
+ def generateLongLineageRDD(): RDD[Int] = {
+ var rdd = sc.makeRDD(1 to 100, 4)
+ for (i <- 1 to 20) {
+ rdd = rdd.map(x => x)
+ }
+ rdd
+ }
+
+ /**
+ * Generate an RDD with a long lineage specifically for CoGroupedRDD.
+ * A CoGroupedRDD can have a long lineage only one of its parents have a long lineage
+ * and narrow dependency with this RDD. This method generate such an RDD by a sequence
+ * of cogroups and mapValues which creates a long lineage of narrow dependencies.
+ */
+ def generateLongLineageRDDForCoGroupedRDD() = {
+ val add = (x: (Seq[Int], Seq[Int])) => (x._1 ++ x._2).reduce(_ + _)
+
+ def ones: RDD[(Int, Int)] = sc.makeRDD(1 to 2, 2).map(x => (x % 2, 1)).reduceByKey(partitioner, _ + _)
+
+ var cogrouped: RDD[(Int, (Seq[Int], Seq[Int]))] = ones.cogroup(ones)
+ for(i <- 1 to 10) {
+ cogrouped = cogrouped.mapValues(add).cogroup(ones)
+ }
+ cogrouped.mapValues(add)
+ }
+
+ /**
+ * Get serialized sizes of the RDD and its splits
+ */
+ def getSerializedSizes(rdd: RDD[_]): (Int, Int) = {
+ (Utils.serialize(rdd).size, Utils.serialize(rdd.splits).size)
+ }
+
+ /**
+ * Serialize and deserialize an object. This is useful to verify the objects
+ * contents after deserialization (e.g., the contents of an RDD split after
+ * it is sent to a slave along with a task)
+ */
+ def serializeDeserialize[T](obj: T): T = {
+ val bytes = Utils.serialize(obj)
+ Utils.deserialize[T](bytes)
}
}
+
+
+object CheckpointSuite {
+ // This is a custom cogroup function that does not use mapValues like
+ // the PairRDDFunctions.cogroup()
+ def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner) = {
+ println("First = " + first + ", second = " + second)
+ new CoGroupedRDD[K](
+ Seq(first.asInstanceOf[RDD[(_, _)]], second.asInstanceOf[RDD[(_, _)]]),
+ part
+ ).asInstanceOf[RDD[(K, Seq[Seq[V]])]]
+ }
+
+} \ No newline at end of file