diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2012-10-31 00:56:40 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2012-10-31 00:56:40 -0700 |
commit | 34e569f40e184a6a4f21e9d79b0e8979d8f9541f (patch) | |
tree | c869ea21ca2ba0f690732e7af175b82b42351088 | |
parent | 0dcd770fdc4d558972b635b6770ed0120280ef22 (diff) | |
download | spark-34e569f40e184a6a4f21e9d79b0e8979d8f9541f.tar.gz spark-34e569f40e184a6a4f21e9d79b0e8979d8f9541f.tar.bz2 spark-34e569f40e184a6a4f21e9d79b0e8979d8f9541f.zip |
Added 'synchronized' to RDD serialization to ensure checkpoint-related changes are reflected atomically in the task closure. Added to tests to ensure that jobs running on an RDD on which checkpointing is in progress does hurt the result of the job.
-rw-r--r-- | core/src/main/scala/spark/RDD.scala | 18 | ||||
-rw-r--r-- | core/src/main/scala/spark/rdd/ShuffledRDD.scala | 7 | ||||
-rw-r--r-- | core/src/test/scala/spark/CheckpointSuite.scala | 71 |
3 files changed, 92 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala index e272a0ede9..7b59a6f09e 100644 --- a/core/src/main/scala/spark/RDD.scala +++ b/core/src/main/scala/spark/RDD.scala @@ -1,8 +1,7 @@ package spark -import java.io.EOFException +import java.io.{ObjectOutputStream, IOException, EOFException, ObjectInputStream} import java.net.URL -import java.io.ObjectInputStream import java.util.concurrent.atomic.AtomicLong import java.util.Random import java.util.Date @@ -589,4 +588,19 @@ abstract class RDD[T: ClassManifest]( private[spark] def collectPartitions(): Array[Array[T]] = { sc.runJob(this, (iter: Iterator[T]) => iter.toArray) } + + @throws(classOf[IOException]) + private def writeObject(oos: ObjectOutputStream) { + synchronized { + oos.defaultWriteObject() + } + } + + @throws(classOf[IOException]) + private def readObject(ois: ObjectInputStream) { + synchronized { + ois.defaultReadObject() + } + } + } diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala index b7d843c26d..31774585f4 100644 --- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala +++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala @@ -27,7 +27,7 @@ class ShuffledRDD[K, V]( override val partitioner = Some(part) @transient - val splits_ = Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i)) + var splits_ = Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i)) override def splits = splits_ @@ -35,4 +35,9 @@ class ShuffledRDD[K, V]( val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId SparkEnv.get.shuffleFetcher.fetch[K, V](shuffledId, split.index) } + + override def changeDependencies(newRDD: RDD[_]) { + dependencies_ = Nil + splits_ = newRDD.splits + } } diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala index 0e5ca7dc21..57dc43ddac 100644 --- a/core/src/test/scala/spark/CheckpointSuite.scala +++ b/core/src/test/scala/spark/CheckpointSuite.scala @@ -5,8 +5,10 @@ import java.io.File import rdd.{BlockRDD, CoalescedRDD, MapPartitionsWithSplitRDD} import spark.SparkContext._ import storage.StorageLevel +import java.util.concurrent.Semaphore -class CheckpointSuite extends FunSuite with BeforeAndAfter { +class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging { + initLogging() var sc: SparkContext = _ var checkpointDir: File = _ @@ -92,6 +94,35 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter { testCheckpointing(rdd1 => rdd1.map(x => (x % 2, x)).join(rdd2)) } + /** + * This test forces two ResultTasks of the same job to be launched before and after + * the checkpointing of job's RDD is completed. + */ + test("Threading - ResultTasks") { + val op1 = (parCollection: RDD[Int]) => { + parCollection.map(x => { println("1st map running on " + x); Thread.sleep(500); (x % 2, x) }) + } + val op2 = (firstRDD: RDD[(Int, Int)]) => { + firstRDD.map(x => { println("2nd map running on " + x); Thread.sleep(500); x }) + } + testThreading(op1, op2) + } + + /** + * This test forces two ShuffleMapTasks of the same job to be launched before and after + * the checkpointing of job's RDD is completed. + */ + test("Threading - ShuffleMapTasks") { + val op1 = (parCollection: RDD[Int]) => { + parCollection.map(x => { println("1st map running on " + x); Thread.sleep(500); (x % 2, x) }) + } + val op2 = (firstRDD: RDD[(Int, Int)]) => { + firstRDD.groupByKey(2).map(x => { println("2nd map running on " + x); Thread.sleep(500); x }) + } + testThreading(op1, op2) + } + + def testCheckpointing[U: ClassManifest](op: (RDD[Int]) => RDD[U], sleepTime: Long = 500) { val parCollection = sc.makeRDD(1 to 4, 4) val operatedRDD = op(parCollection) @@ -105,6 +136,44 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter { assert(operatedRDD.collect() === result) } + def testThreading[U: ClassManifest, V: ClassManifest](op1: (RDD[Int]) => RDD[U], op2: (RDD[U]) => RDD[V]) { + + val parCollection = sc.makeRDD(1 to 2, 2) + + // This is the RDD that is to be checkpointed + val firstRDD = op1(parCollection) + val parentRDD = firstRDD.dependencies.head.rdd + firstRDD.checkpoint() + + // This the RDD that uses firstRDD. This is designed to launch a + // ShuffleMapTask that uses firstRDD. + val secondRDD = op2(firstRDD) + + // Starting first job, to initiate the checkpointing + logInfo("\nLaunching 1st job to initiate checkpointing\n") + firstRDD.collect() + + // Checkpointing has started but not completed yet + Thread.sleep(100) + assert(firstRDD.dependencies.head.rdd === parentRDD) + + // Starting second job; first task of this job will be + // launched _before_ firstRDD is marked as checkpointed + // and the second task will be launched _after_ firstRDD + // is marked as checkpointed + logInfo("\nLaunching 2nd job that is designed to launch tasks " + + "before and after checkpointing is complete\n") + val result = secondRDD.collect() + + // Check whether firstRDD has been successfully checkpointed + assert(firstRDD.dependencies.head.rdd != parentRDD) + + logInfo("\nRecomputing 2nd job to verify the results of the previous computation\n") + // Check whether the result in the previous job was correct or not + val correctResult = secondRDD.collect() + assert(result === correctResult) + } + def sleep(rdd: RDD[_]) { val startTime = System.currentTimeMillis() val maxWaitTime = 5000 |