aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2012-12-10 23:36:37 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2012-12-10 23:36:37 -0800
commit746afc2e6513d5f32f261ec0dbf2823f78a5e960 (patch)
tree84c3bc10fcb8931a74a4c7be594b0cc9764d09ab /core
parent1f3a75ae9e518c003d84fa38a54583ecd841ffdc (diff)
downloadspark-746afc2e6513d5f32f261ec0dbf2823f78a5e960.tar.gz
spark-746afc2e6513d5f32f261ec0dbf2823f78a5e960.tar.bz2
spark-746afc2e6513d5f32f261ec0dbf2823f78a5e960.zip
Bunch of bug fixes related to checkpointing in RDDs. RDDCheckpointData object is used to lock all serialization and dependency changes for checkpointing. ResultTask converted to Externalizable and serialized RDD is cached like ShuffleMapTask.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/ParallelCollection.scala10
-rw-r--r--core/src/main/scala/spark/RDD.scala10
-rw-r--r--core/src/main/scala/spark/RDDCheckpointData.scala76
-rw-r--r--core/src/main/scala/spark/SparkContext.scala5
-rw-r--r--core/src/main/scala/spark/rdd/BlockRDD.scala9
-rw-r--r--core/src/main/scala/spark/rdd/CartesianRDD.scala21
-rw-r--r--core/src/main/scala/spark/rdd/CoGroupedRDD.scala18
-rw-r--r--core/src/main/scala/spark/rdd/CoalescedRDD.scala8
-rw-r--r--core/src/main/scala/spark/rdd/HadoopRDD.scala4
-rw-r--r--core/src/main/scala/spark/rdd/UnionRDD.scala15
-rw-r--r--core/src/main/scala/spark/scheduler/ResultTask.scala95
-rw-r--r--core/src/main/scala/spark/scheduler/ShuffleMapTask.scala21
-rw-r--r--core/src/test/scala/spark/CheckpointSuite.scala187
13 files changed, 389 insertions, 90 deletions
diff --git a/core/src/main/scala/spark/ParallelCollection.scala b/core/src/main/scala/spark/ParallelCollection.scala
index 9725017b61..9d12af6912 100644
--- a/core/src/main/scala/spark/ParallelCollection.scala
+++ b/core/src/main/scala/spark/ParallelCollection.scala
@@ -28,10 +28,11 @@ private[spark] class ParallelCollection[T: ClassManifest](
extends RDD[T](sc, Nil) {
// TODO: Right now, each split sends along its full data, even if later down the RDD chain it gets
// cached. It might be worthwhile to write the data to a file in the DFS and read it in the split
- // instead. UPDATE: With the new changes to enable checkpointing, this an be done.
+ // instead.
+ // UPDATE: A parallel collection can be checkpointed to HDFS, which achieves this goal.
@transient
- val splits_ = {
+ var splits_ : Array[Split] = {
val slices = ParallelCollection.slice(data, numSlices).toArray
slices.indices.map(i => new ParallelCollectionSplit(id, i, slices(i))).toArray
}
@@ -41,6 +42,11 @@ private[spark] class ParallelCollection[T: ClassManifest](
override def compute(s: Split) = s.asInstanceOf[ParallelCollectionSplit[T]].iterator
override def preferredLocations(s: Split): Seq[String] = Nil
+
+ override def changeDependencies(newRDD: RDD[_]) {
+ dependencies_ = List(new OneToOneDependency(newRDD.asInstanceOf[RDD[Any]]))
+ splits_ = newRDD.splits
+ }
}
private object ParallelCollection {
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index e9bd131e61..efa03d5185 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -163,7 +163,7 @@ abstract class RDD[T: ClassManifest](
final def iterator(split: Split): Iterator[T] = {
if (isCheckpointed) {
// ASSUMPTION: Checkpoint Hadoop RDD will have same number of splits as original
- checkpointData.iterator(split.index)
+ checkpointData.iterator(split)
} else if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheTracker.getOrCompute[T](this, split, storageLevel)
} else {
@@ -556,16 +556,12 @@ abstract class RDD[T: ClassManifest](
@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream) {
- synchronized {
- oos.defaultWriteObject()
- }
+ oos.defaultWriteObject()
}
@throws(classOf[IOException])
private def readObject(ois: ObjectInputStream) {
- synchronized {
- ois.defaultReadObject()
- }
+ ois.defaultReadObject()
}
}
diff --git a/core/src/main/scala/spark/RDDCheckpointData.scala b/core/src/main/scala/spark/RDDCheckpointData.scala
index eb4482acee..ff2ed4cdfc 100644
--- a/core/src/main/scala/spark/RDDCheckpointData.scala
+++ b/core/src/main/scala/spark/RDDCheckpointData.scala
@@ -1,12 +1,20 @@
package spark
import org.apache.hadoop.fs.Path
+import rdd.CoalescedRDD
+import scheduler.{ResultTask, ShuffleMapTask}
-
+/**
+ * This class contains all the information of the regarding RDD checkpointing.
+ */
private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
-extends Serializable {
+extends Logging with Serializable {
+ /**
+ * This class manages the state transition of an RDD through checkpointing
+ * [ Not checkpointed --> marked for checkpointing --> checkpointing in progress --> checkpointed ]
+ */
class CheckpointState extends Serializable {
var state = 0
@@ -20,24 +28,30 @@ extends Serializable {
}
val cpState = new CheckpointState()
- var cpFile: Option[String] = None
- var cpRDD: Option[RDD[T]] = None
- var cpRDDSplits: Seq[Split] = Nil
+ @transient var cpFile: Option[String] = None
+ @transient var cpRDD: Option[RDD[T]] = None
+ @transient var cpRDDSplits: Seq[Split] = Nil
+ // Mark the RDD for checkpointing
def markForCheckpoint() = {
- rdd.synchronized { cpState.mark() }
+ RDDCheckpointData.synchronized { cpState.mark() }
}
+ // Is the RDD already checkpointed
def isCheckpointed() = {
- rdd.synchronized { cpState.isCheckpointed }
+ RDDCheckpointData.synchronized { cpState.isCheckpointed }
}
+ // Get the file to which this RDD was checkpointed to as a Option
def getCheckpointFile() = {
- rdd.synchronized { cpFile }
+ RDDCheckpointData.synchronized { cpFile }
}
+ // Do the checkpointing of the RDD. Called after the first job using that RDD is over.
def doCheckpoint() {
- rdd.synchronized {
+ // If it is marked for checkpointing AND checkpointing is not already in progress,
+ // then set it to be in progress, else return
+ RDDCheckpointData.synchronized {
if (cpState.isMarked && !cpState.isInProgress) {
cpState.start()
} else {
@@ -45,24 +59,56 @@ extends Serializable {
}
}
+ // Save to file, and reload it as an RDD
val file = new Path(rdd.context.checkpointDir, "rdd-" + rdd.id).toString
rdd.saveAsObjectFile(file)
- val newRDD = rdd.context.objectFile[T](file, rdd.splits.size)
- rdd.synchronized {
- rdd.changeDependencies(newRDD)
+ val newRDD = {
+ val hadoopRDD = rdd.context.objectFile[T](file, rdd.splits.size)
+
+ val oldSplits = rdd.splits.size
+ val newSplits = hadoopRDD.splits.size
+
+ logDebug("RDD splits = " + oldSplits + " --> " + newSplits)
+ if (newSplits < oldSplits) {
+ throw new Exception("# splits after checkpointing is less than before " +
+ "[" + oldSplits + " --> " + newSplits)
+ } else if (newSplits > oldSplits) {
+ new CoalescedRDD(hadoopRDD, rdd.splits.size)
+ } else {
+ hadoopRDD
+ }
+ }
+ logDebug("New RDD has " + newRDD.splits.size + " splits")
+
+ // Change the dependencies and splits of the RDD
+ RDDCheckpointData.synchronized {
cpFile = Some(file)
cpRDD = Some(newRDD)
cpRDDSplits = newRDD.splits
+ rdd.changeDependencies(newRDD)
cpState.finish()
+ RDDCheckpointData.checkpointCompleted()
+ logInfo("Done checkpointing RDD " + rdd.id + ", new parent is RDD " + newRDD.id)
}
}
+ // Get preferred location of a split after checkpointing
def preferredLocations(split: Split) = {
- cpRDD.get.preferredLocations(split)
+ RDDCheckpointData.synchronized {
+ cpRDD.get.preferredLocations(split)
+ }
}
- def iterator(splitIndex: Int): Iterator[T] = {
- cpRDD.get.iterator(cpRDDSplits(splitIndex))
+ // Get iterator. This is called at the worker nodes.
+ def iterator(split: Split): Iterator[T] = {
+ rdd.firstParent[T].iterator(split)
+ }
+}
+
+private[spark] object RDDCheckpointData {
+ def checkpointCompleted() {
+ ShuffleMapTask.clearCache()
+ ResultTask.clearCache()
}
}
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index d7b46bee38..654b1c2eb7 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -40,9 +40,7 @@ import spark.partial.PartialResult
import spark.rdd.HadoopRDD
import spark.rdd.NewHadoopRDD
import spark.rdd.UnionRDD
-import spark.scheduler.ShuffleMapTask
-import spark.scheduler.DAGScheduler
-import spark.scheduler.TaskScheduler
+import scheduler.{ResultTask, ShuffleMapTask, DAGScheduler, TaskScheduler}
import spark.scheduler.local.LocalScheduler
import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
@@ -486,6 +484,7 @@ class SparkContext(
clearJars()
SparkEnv.set(null)
ShuffleMapTask.clearCache()
+ ResultTask.clearCache()
logInfo("Successfully stopped SparkContext")
}
diff --git a/core/src/main/scala/spark/rdd/BlockRDD.scala b/core/src/main/scala/spark/rdd/BlockRDD.scala
index 590f9eb738..0c8cdd10dd 100644
--- a/core/src/main/scala/spark/rdd/BlockRDD.scala
+++ b/core/src/main/scala/spark/rdd/BlockRDD.scala
@@ -2,7 +2,7 @@ package spark.rdd
import scala.collection.mutable.HashMap
-import spark.Dependency
+import spark.OneToOneDependency
import spark.RDD
import spark.SparkContext
import spark.SparkEnv
@@ -17,7 +17,7 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St
extends RDD[T](sc, Nil) {
@transient
- val splits_ = (0 until blockIds.size).map(i => {
+ var splits_ : Array[Split] = (0 until blockIds.size).map(i => {
new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split]
}).toArray
@@ -43,5 +43,10 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St
override def preferredLocations(split: Split) =
locations_(split.asInstanceOf[BlockRDDSplit].blockId)
+
+ override def changeDependencies(newRDD: RDD[_]) {
+ dependencies_ = List(new OneToOneDependency(newRDD.asInstanceOf[RDD[Any]]))
+ splits_ = newRDD.splits
+ }
}
diff --git a/core/src/main/scala/spark/rdd/CartesianRDD.scala b/core/src/main/scala/spark/rdd/CartesianRDD.scala
index 1d753a5168..9975e79b08 100644
--- a/core/src/main/scala/spark/rdd/CartesianRDD.scala
+++ b/core/src/main/scala/spark/rdd/CartesianRDD.scala
@@ -1,10 +1,27 @@
package spark.rdd
import spark._
+import java.io.{ObjectOutputStream, IOException}
private[spark]
-class CartesianSplit(idx: Int, val s1: Split, val s2: Split) extends Split with Serializable {
+class CartesianSplit(
+ idx: Int,
+ @transient rdd1: RDD[_],
+ @transient rdd2: RDD[_],
+ s1Index: Int,
+ s2Index: Int
+ ) extends Split {
+ var s1 = rdd1.splits(s1Index)
+ var s2 = rdd2.splits(s2Index)
override val index: Int = idx
+
+ @throws(classOf[IOException])
+ private def writeObject(oos: ObjectOutputStream) {
+ // Update the reference to parent split at the time of task serialization
+ s1 = rdd1.splits(s1Index)
+ s2 = rdd2.splits(s2Index)
+ oos.defaultWriteObject()
+ }
}
private[spark]
@@ -23,7 +40,7 @@ class CartesianRDD[T: ClassManifest, U:ClassManifest](
val array = new Array[Split](rdd1.splits.size * rdd2.splits.size)
for (s1 <- rdd1.splits; s2 <- rdd2.splits) {
val idx = s1.index * numSplitsInRdd2 + s2.index
- array(idx) = new CartesianSplit(idx, s1, s2)
+ array(idx) = new CartesianSplit(idx, rdd1, rdd2, s1.index, s2.index)
}
array
}
diff --git a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
index 57d472666b..e4e70b13ba 100644
--- a/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoGroupedRDD.scala
@@ -20,11 +20,9 @@ private[spark] case class NarrowCoGroupSplitDep(rdd: RDD[_], splitIndex: Int, va
@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream) {
- rdd.synchronized {
- // Update the reference to parent split at the time of task serialization
- split = rdd.splits(splitIndex)
- oos.defaultWriteObject()
- }
+ // Update the reference to parent split at the time of task serialization
+ split = rdd.splits(splitIndex)
+ oos.defaultWriteObject()
}
}
private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
@@ -42,7 +40,8 @@ private[spark] class CoGroupAggregator
{ (b1, b2) => b1 ++ b2 })
with Serializable
-class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
+class
+CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context, Nil) with Logging {
val aggr = new CoGroupAggregator
@@ -63,7 +62,9 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
deps.toList
}
- override def dependencies = deps_
+ // Pre-checkpoint dependencies deps_ should be transient (deps_)
+ // but post-checkpoint dependencies must not be transient (dependencies_)
+ override def dependencies = if (isCheckpointed) dependencies_ else deps_
@transient
var splits_ : Array[Split] = {
@@ -114,7 +115,8 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
}
override def changeDependencies(newRDD: RDD[_]) {
- deps_ = List(new OneToOneDependency(newRDD.asInstanceOf[RDD[Any]]))
+ deps_ = null
+ dependencies_ = List(new OneToOneDependency(newRDD.asInstanceOf[RDD[Any]]))
splits_ = newRDD.splits
rdds = null
}
diff --git a/core/src/main/scala/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
index 0b4499e2eb..088958942e 100644
--- a/core/src/main/scala/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/spark/rdd/CoalescedRDD.scala
@@ -12,11 +12,9 @@ private[spark] case class CoalescedRDDSplit(
@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream) {
- rdd.synchronized {
- // Update the reference to parent split at the time of task serialization
- parents = parentsIndices.map(rdd.splits(_))
- oos.defaultWriteObject()
- }
+ // Update the reference to parent split at the time of task serialization
+ parents = parentsIndices.map(rdd.splits(_))
+ oos.defaultWriteObject()
}
}
diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala
index a12531ea89..af54f23ebc 100644
--- a/core/src/main/scala/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala
@@ -115,4 +115,8 @@ class HadoopRDD[K, V](
val hadoopSplit = split.asInstanceOf[HadoopSplit]
hadoopSplit.inputSplit.value.getLocations.filter(_ != "localhost")
}
+
+ override def checkpoint() {
+ // Do nothing. Hadoop RDD cannot be checkpointed.
+ }
}
diff --git a/core/src/main/scala/spark/rdd/UnionRDD.scala b/core/src/main/scala/spark/rdd/UnionRDD.scala
index a5948dd1f1..808729f18d 100644
--- a/core/src/main/scala/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/spark/rdd/UnionRDD.scala
@@ -19,11 +19,9 @@ private[spark] class UnionSplit[T: ClassManifest](
@throws(classOf[IOException])
private def writeObject(oos: ObjectOutputStream) {
- rdd.synchronized {
- // Update the reference to parent split at the time of task serialization
- split = rdd.splits(splitIndex)
- oos.defaultWriteObject()
- }
+ // Update the reference to parent split at the time of task serialization
+ split = rdd.splits(splitIndex)
+ oos.defaultWriteObject()
}
}
@@ -55,7 +53,9 @@ class UnionRDD[T: ClassManifest](
deps.toList
}
- override def dependencies = deps_
+ // Pre-checkpoint dependencies deps_ should be transient (deps_)
+ // but post-checkpoint dependencies must not be transient (dependencies_)
+ override def dependencies = if (isCheckpointed) dependencies_ else deps_
override def compute(s: Split): Iterator[T] = s.asInstanceOf[UnionSplit[T]].iterator()
@@ -63,7 +63,8 @@ class UnionRDD[T: ClassManifest](
s.asInstanceOf[UnionSplit[T]].preferredLocations()
override def changeDependencies(newRDD: RDD[_]) {
- deps_ = List(new OneToOneDependency(newRDD))
+ deps_ = null
+ dependencies_ = List(new OneToOneDependency(newRDD))
splits_ = newRDD.splits
rdds = null
}
diff --git a/core/src/main/scala/spark/scheduler/ResultTask.scala b/core/src/main/scala/spark/scheduler/ResultTask.scala
index 2ebd4075a2..bcb9e4956b 100644
--- a/core/src/main/scala/spark/scheduler/ResultTask.scala
+++ b/core/src/main/scala/spark/scheduler/ResultTask.scala
@@ -1,17 +1,73 @@
package spark.scheduler
import spark._
+import java.io._
+import util.{MetadataCleaner, TimeStampedHashMap}
+import java.util.zip.{GZIPInputStream, GZIPOutputStream}
+
+private[spark] object ResultTask {
+
+ // A simple map between the stage id to the serialized byte array of a task.
+ // Served as a cache for task serialization because serialization can be
+ // expensive on the master node if it needs to launch thousands of tasks.
+ val serializedInfoCache = new TimeStampedHashMap[Int, Array[Byte]]
+
+ val metadataCleaner = new MetadataCleaner("ResultTask", serializedInfoCache.cleanup)
+
+ def serializeInfo(stageId: Int, rdd: RDD[_], func: (TaskContext, Iterator[_]) => _): Array[Byte] = {
+ synchronized {
+ val old = serializedInfoCache.get(stageId).orNull
+ if (old != null) {
+ return old
+ } else {
+ val out = new ByteArrayOutputStream
+ val ser = SparkEnv.get.closureSerializer.newInstance
+ val objOut = ser.serializeStream(new GZIPOutputStream(out))
+ objOut.writeObject(rdd)
+ objOut.writeObject(func)
+ objOut.close()
+ val bytes = out.toByteArray
+ serializedInfoCache.put(stageId, bytes)
+ return bytes
+ }
+ }
+ }
+
+ def deserializeInfo(stageId: Int, bytes: Array[Byte]): (RDD[_], (TaskContext, Iterator[_]) => _) = {
+ synchronized {
+ val loader = Thread.currentThread.getContextClassLoader
+ val in = new GZIPInputStream(new ByteArrayInputStream(bytes))
+ val ser = SparkEnv.get.closureSerializer.newInstance
+ val objIn = ser.deserializeStream(in)
+ val rdd = objIn.readObject().asInstanceOf[RDD[_]]
+ val func = objIn.readObject().asInstanceOf[(TaskContext, Iterator[_]) => _]
+ return (rdd, func)
+ }
+ }
+
+ def clearCache() {
+ synchronized {
+ serializedInfoCache.clear()
+ }
+ }
+}
+
private[spark] class ResultTask[T, U](
stageId: Int,
- rdd: RDD[T],
- func: (TaskContext, Iterator[T]) => U,
- val partition: Int,
+ var rdd: RDD[T],
+ var func: (TaskContext, Iterator[T]) => U,
+ var partition: Int,
@transient locs: Seq[String],
val outputId: Int)
- extends Task[U](stageId) {
-
- val split = rdd.splits(partition)
+ extends Task[U](stageId) with Externalizable {
+
+ def this() = this(0, null, null, 0, null, 0)
+ var split = if (rdd == null) {
+ null
+ } else {
+ rdd.splits(partition)
+ }
override def run(attemptId: Long): U = {
val context = new TaskContext(stageId, partition, attemptId)
@@ -21,4 +77,31 @@ private[spark] class ResultTask[T, U](
override def preferredLocations: Seq[String] = locs
override def toString = "ResultTask(" + stageId + ", " + partition + ")"
+
+ override def writeExternal(out: ObjectOutput) {
+ RDDCheckpointData.synchronized {
+ split = rdd.splits(partition)
+ out.writeInt(stageId)
+ val bytes = ResultTask.serializeInfo(
+ stageId, rdd, func.asInstanceOf[(TaskContext, Iterator[_]) => _])
+ out.writeInt(bytes.length)
+ out.write(bytes)
+ out.writeInt(partition)
+ out.writeInt(outputId)
+ out.writeObject(split)
+ }
+ }
+
+ override def readExternal(in: ObjectInput) {
+ val stageId = in.readInt()
+ val numBytes = in.readInt()
+ val bytes = new Array[Byte](numBytes)
+ in.readFully(bytes)
+ val (rdd_, func_) = ResultTask.deserializeInfo(stageId, bytes)
+ rdd = rdd_.asInstanceOf[RDD[T]]
+ func = func_.asInstanceOf[(TaskContext, Iterator[T]) => U]
+ partition = in.readInt()
+ val outputId = in.readInt()
+ split = in.readObject().asInstanceOf[Split]
+ }
}
diff --git a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
index 683f5ebec3..5d28c40778 100644
--- a/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/spark/scheduler/ShuffleMapTask.scala
@@ -84,19 +84,22 @@ private[spark] class ShuffleMapTask(
def this() = this(0, null, null, 0, null)
var split = if (rdd == null) {
- null
- } else {
+ null
+ } else {
rdd.splits(partition)
}
override def writeExternal(out: ObjectOutput) {
- out.writeInt(stageId)
- val bytes = ShuffleMapTask.serializeInfo(stageId, rdd, dep)
- out.writeInt(bytes.length)
- out.write(bytes)
- out.writeInt(partition)
- out.writeLong(generation)
- out.writeObject(split)
+ RDDCheckpointData.synchronized {
+ split = rdd.splits(partition)
+ out.writeInt(stageId)
+ val bytes = ShuffleMapTask.serializeInfo(stageId, rdd, dep)
+ out.writeInt(bytes.length)
+ out.write(bytes)
+ out.writeInt(partition)
+ out.writeLong(generation)
+ out.writeObject(split)
+ }
}
override def readExternal(in: ObjectInput) {
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index 51bd59e2b1..7b323e089c 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -34,13 +34,30 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
}
}
+ test("RDDs with one-to-one dependencies") {
+ testCheckpointing(_.map(x => x.toString))
+ testCheckpointing(_.flatMap(x => 1 to x))
+ testCheckpointing(_.filter(_ % 2 == 0))
+ testCheckpointing(_.sample(false, 0.5, 0))
+ testCheckpointing(_.glom())
+ testCheckpointing(_.mapPartitions(_.map(_.toString)))
+ testCheckpointing(r => new MapPartitionsWithSplitRDD(r,
+ (i: Int, iter: Iterator[Int]) => iter.map(_.toString) ))
+ testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).mapValues(_.toString))
+ testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).flatMapValues(x => 1 to x))
+ testCheckpointing(_.pipe(Seq("cat")))
+ }
+
test("ParallelCollection") {
- val parCollection = sc.makeRDD(1 to 4)
+ val parCollection = sc.makeRDD(1 to 4, 2)
+ val numSplits = parCollection.splits.size
parCollection.checkpoint()
assert(parCollection.dependencies === Nil)
val result = parCollection.collect()
assert(sc.objectFile[Int](parCollection.getCheckpointFile.get).collect() === result)
assert(parCollection.dependencies != Nil)
+ assert(parCollection.splits.length === numSplits)
+ assert(parCollection.splits.toList === parCollection.checkpointData.cpRDDSplits.toList)
assert(parCollection.collect() === result)
}
@@ -49,44 +66,58 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
val blockManager = SparkEnv.get.blockManager
blockManager.putSingle(blockId, "test", StorageLevel.MEMORY_ONLY)
val blockRDD = new BlockRDD[String](sc, Array(blockId))
+ val numSplits = blockRDD.splits.size
blockRDD.checkpoint()
val result = blockRDD.collect()
assert(sc.objectFile[String](blockRDD.getCheckpointFile.get).collect() === result)
assert(blockRDD.dependencies != Nil)
+ assert(blockRDD.splits.length === numSplits)
+ assert(blockRDD.splits.toList === blockRDD.checkpointData.cpRDDSplits.toList)
assert(blockRDD.collect() === result)
}
- test("RDDs with one-to-one dependencies") {
- testCheckpointing(_.map(x => x.toString))
- testCheckpointing(_.flatMap(x => 1 to x))
- testCheckpointing(_.filter(_ % 2 == 0))
- testCheckpointing(_.sample(false, 0.5, 0))
- testCheckpointing(_.glom())
- testCheckpointing(_.mapPartitions(_.map(_.toString)))
- testCheckpointing(r => new MapPartitionsWithSplitRDD(r,
- (i: Int, iter: Iterator[Int]) => iter.map(_.toString) ))
- testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).mapValues(_.toString))
- testCheckpointing(_.map(x => (x % 2, 1)).reduceByKey(_ + _).flatMapValues(x => 1 to x))
- testCheckpointing(_.pipe(Seq("cat")))
- }
-
test("ShuffledRDD") {
- // Creating ShuffledRDD directly as PairRDDFunctions.combineByKey produces a MapPartitionedRDD
testCheckpointing(rdd => {
+ // Creating ShuffledRDD directly as PairRDDFunctions.combineByKey produces a MapPartitionedRDD
new ShuffledRDD(rdd.map(x => (x % 2, 1)), partitioner)
})
}
test("UnionRDD") {
def otherRDD = sc.makeRDD(1 to 10, 4)
+
+ // Test whether the size of UnionRDDSplits reduce in size after parent RDD is checkpointed.
+ // Current implementation of UnionRDD has transient reference to parent RDDs,
+ // so only the splits will reduce in serialized size, not the RDD.
testCheckpointing(_.union(otherRDD), false, true)
testParentCheckpointing(_.union(otherRDD), false, true)
}
test("CartesianRDD") {
- def otherRDD = sc.makeRDD(1 to 10, 4)
- testCheckpointing(_.cartesian(otherRDD))
- testParentCheckpointing(_.cartesian(otherRDD), true, false)
+ def otherRDD = sc.makeRDD(1 to 10, 1)
+ testCheckpointing(new CartesianRDD(sc, _, otherRDD))
+
+ // Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed
+ // Current implementation of CoalescedRDDSplit has transient reference to parent RDD,
+ // so only the RDD will reduce in serialized size, not the splits.
+ testParentCheckpointing(new CartesianRDD(sc, _, otherRDD), true, false)
+
+ // Test that the CartesianRDD updates parent splits (CartesianRDD.s1/s2) after
+ // the parent RDD has been checkpointed and parent splits have been changed to HadoopSplits.
+ // Note that this test is very specific to the current implementation of CartesianRDD.
+ val ones = sc.makeRDD(1 to 100, 10).map(x => x)
+ ones.checkpoint // checkpoint that MappedRDD
+ val cartesian = new CartesianRDD(sc, ones, ones)
+ val splitBeforeCheckpoint =
+ serializeDeserialize(cartesian.splits.head.asInstanceOf[CartesianSplit])
+ cartesian.count() // do the checkpointing
+ val splitAfterCheckpoint =
+ serializeDeserialize(cartesian.splits.head.asInstanceOf[CartesianSplit])
+ assert(
+ (splitAfterCheckpoint.s1 != splitBeforeCheckpoint.s1) &&
+ (splitAfterCheckpoint.s2 != splitBeforeCheckpoint.s2),
+ "CartesianRDD.parents not updated after parent RDD checkpointed"
+ )
}
test("CoalescedRDD") {
@@ -94,7 +125,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
// Test whether size of CoalescedRDD reduce in size after parent RDD is checkpointed
// Current implementation of CoalescedRDDSplit has transient reference to parent RDD,
- // so does not serialize the RDD (not need to check its size).
+ // so only the RDD will reduce in serialized size, not the splits.
testParentCheckpointing(new CoalescedRDD(_, 2), true, false)
// Test that the CoalescedRDDSplit updates parent splits (CoalescedRDDSplit.parents) after
@@ -145,13 +176,14 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
val operatedRDD = op(baseRDD)
val parentRDD = operatedRDD.dependencies.headOption.orNull
val rddType = operatedRDD.getClass.getSimpleName
+ val numSplits = operatedRDD.splits.length
// Find serialized sizes before and after the checkpoint
val (rddSizeBeforeCheckpoint, splitSizeBeforeCheckpoint) = getSerializedSizes(operatedRDD)
operatedRDD.checkpoint()
val result = operatedRDD.collect()
val (rddSizeAfterCheckpoint, splitSizeAfterCheckpoint) = getSerializedSizes(operatedRDD)
-
+
// Test whether the checkpoint file has been created
assert(sc.objectFile[U](operatedRDD.getCheckpointFile.get).collect() === result)
@@ -160,6 +192,9 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
// Test whether the splits have been changed to the new Hadoop splits
assert(operatedRDD.splits.toList === operatedRDD.checkpointData.cpRDDSplits.toList)
+
+ // Test whether the number of splits is same as before
+ assert(operatedRDD.splits.length === numSplits)
// Test whether the data in the checkpointed RDD is same as original
assert(operatedRDD.collect() === result)
@@ -168,7 +203,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
// does not have any dependency to another RDD (e.g., ParallelCollection,
// ShuffleRDD with ShuffleDependency), it may not reduce in size after checkpointing.
if (testRDDSize) {
- println("Size of " + rddType +
+ logInfo("Size of " + rddType +
"[" + rddSizeBeforeCheckpoint + " --> " + rddSizeAfterCheckpoint + "]")
assert(
rddSizeAfterCheckpoint < rddSizeBeforeCheckpoint,
@@ -184,7 +219,7 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
// must be forgotten after checkpointing (to remove all reference to parent RDDs) and
// replaced with the HadoopSplits of the checkpointed RDD.
if (testRDDSplitSize) {
- println("Size of " + rddType + " splits "
+ logInfo("Size of " + rddType + " splits "
+ "[" + splitSizeBeforeCheckpoint + " --> " + splitSizeAfterCheckpoint + "]")
assert(
splitSizeAfterCheckpoint < splitSizeBeforeCheckpoint,
@@ -294,14 +329,118 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
val bytes = Utils.serialize(obj)
Utils.deserialize[T](bytes)
}
+ /*
+ test("Consistency check for ResultTask") {
+ // Time ----------------------->
+ // Core 1: |<- count in thread 1, task 1 ->| |<-- checkpoint, task 1 ---->| |<- count in thread 2, task 2 ->|
+ // Core 2: |<- count in thread 1, task 2 ->| |<--- checkpoint, task 2 ---------->| |<- count in thread 2, task 1 ->|
+ // |
+ // checkpoint completed
+ sc.stop(); sc = null
+ System.clearProperty("spark.master.port")
+
+ val dir = File.createTempFile("temp_", "")
+ dir.delete()
+ val ctxt = new SparkContext("local[2]", "ResultTask")
+ ctxt.setCheckpointDir(dir.toString)
+
+ try {
+ val rdd = ctxt.makeRDD(1 to 2, 2).map(x => {
+ val state = CheckpointSuite.incrementState()
+ println("State = " + state)
+ if (state <= 3) {
+ // If executing the two tasks for the job comouting rdd.count
+ // of thread 1, or the first task for the recomputation due
+ // to checkpointing (saveing to HDFS), then do nothing
+ } else if (state == 4) {
+ // If executing the second task for the recomputation due to
+ // checkpointing. then prolong this task, to allow rdd.count
+ // of thread 2 to start before checkpoint of this RDD is completed
+
+ Thread.sleep(1000)
+ println("State = " + state + " wake up")
+ } else {
+ // Else executing the tasks from thread 2
+ Thread.sleep(1000)
+ println("State = " + state + " wake up")
+ }
+
+ (x, 1)
+ })
+ rdd.checkpoint()
+ val env = SparkEnv.get
+
+ val thread1 = new Thread() {
+ override def run() {
+ try {
+ SparkEnv.set(env)
+ rdd.count()
+ } catch {
+ case e: Exception => CheckpointSuite.failed("Exception in thread 1", e)
+ }
+ }
+ }
+ thread1.start()
+
+ val thread2 = new Thread() {
+ override def run() {
+ try {
+ SparkEnv.set(env)
+ CheckpointSuite.waitTillState(3)
+ println("\n\n\n\n")
+ rdd.count()
+ } catch {
+ case e: Exception => CheckpointSuite.failed("Exception in thread 2", e)
+ }
+ }
+ }
+ thread2.start()
+
+ thread1.join()
+ thread2.join()
+ } finally {
+ dir.delete()
+ }
+
+ assert(!CheckpointSuite.failed, CheckpointSuite.failureMessage)
+
+ ctxt.stop()
+
+ }
+ */
}
object CheckpointSuite {
+ /*
+ var state = 0
+ var failed = false
+ var failureMessage = ""
+
+ def incrementState(): Int = {
+ this.synchronized { state += 1; this.notifyAll(); state }
+ }
+
+ def getState(): Int = {
+ this.synchronized( state )
+ }
+
+ def waitTillState(s: Int) {
+ while(state < s) {
+ this.synchronized { this.wait() }
+ }
+ }
+
+ def failed(msg: String, ex: Exception) {
+ failed = true
+ failureMessage += msg + "\n" + ex + "\n\n"
+ }
+ */
+
// This is a custom cogroup function that does not use mapValues like
// the PairRDDFunctions.cogroup()
def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner) = {
- println("First = " + first + ", second = " + second)
+ //println("First = " + first + ", second = " + second)
new CoGroupedRDD[K](
Seq(first.asInstanceOf[RDD[(_, _)]], second.asInstanceOf[RDD[(_, _)]]),
part