aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2012-10-31 00:56:40 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2012-10-31 00:56:40 -0700
commit34e569f40e184a6a4f21e9d79b0e8979d8f9541f (patch)
treec869ea21ca2ba0f690732e7af175b82b42351088
parent0dcd770fdc4d558972b635b6770ed0120280ef22 (diff)
downloadspark-34e569f40e184a6a4f21e9d79b0e8979d8f9541f.tar.gz
spark-34e569f40e184a6a4f21e9d79b0e8979d8f9541f.tar.bz2
spark-34e569f40e184a6a4f21e9d79b0e8979d8f9541f.zip
Added 'synchronized' to RDD serialization to ensure checkpoint-related changes are reflected atomically in the task closure. Added to tests to ensure that jobs running on an RDD on which checkpointing is in progress does hurt the result of the job.
-rw-r--r--core/src/main/scala/spark/RDD.scala18
-rw-r--r--core/src/main/scala/spark/rdd/ShuffledRDD.scala7
-rw-r--r--core/src/test/scala/spark/CheckpointSuite.scala71
3 files changed, 92 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index e272a0ede9..7b59a6f09e 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -1,8 +1,7 @@
package spark
-import java.io.EOFException
+import java.io.{ObjectOutputStream, IOException, EOFException, ObjectInputStream}
import java.net.URL
-import java.io.ObjectInputStream
import java.util.concurrent.atomic.AtomicLong
import java.util.Random
import java.util.Date
@@ -589,4 +588,19 @@ abstract class RDD[T: ClassManifest](
private[spark] def collectPartitions(): Array[Array[T]] = {
sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
}
+
+ @throws(classOf[IOException])
+ private def writeObject(oos: ObjectOutputStream) {
+ synchronized {
+ oos.defaultWriteObject()
+ }
+ }
+
+ @throws(classOf[IOException])
+ private def readObject(ois: ObjectInputStream) {
+ synchronized {
+ ois.defaultReadObject()
+ }
+ }
+
}
diff --git a/core/src/main/scala/spark/rdd/ShuffledRDD.scala b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
index b7d843c26d..31774585f4 100644
--- a/core/src/main/scala/spark/rdd/ShuffledRDD.scala
+++ b/core/src/main/scala/spark/rdd/ShuffledRDD.scala
@@ -27,7 +27,7 @@ class ShuffledRDD[K, V](
override val partitioner = Some(part)
@transient
- val splits_ = Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i))
+ var splits_ = Array.tabulate[Split](part.numPartitions)(i => new ShuffledRDDSplit(i))
override def splits = splits_
@@ -35,4 +35,9 @@ class ShuffledRDD[K, V](
val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId
SparkEnv.get.shuffleFetcher.fetch[K, V](shuffledId, split.index)
}
+
+ override def changeDependencies(newRDD: RDD[_]) {
+ dependencies_ = Nil
+ splits_ = newRDD.splits
+ }
}
diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala
index 0e5ca7dc21..57dc43ddac 100644
--- a/core/src/test/scala/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/spark/CheckpointSuite.scala
@@ -5,8 +5,10 @@ import java.io.File
import rdd.{BlockRDD, CoalescedRDD, MapPartitionsWithSplitRDD}
import spark.SparkContext._
import storage.StorageLevel
+import java.util.concurrent.Semaphore
-class CheckpointSuite extends FunSuite with BeforeAndAfter {
+class CheckpointSuite extends FunSuite with BeforeAndAfter with Logging {
+ initLogging()
var sc: SparkContext = _
var checkpointDir: File = _
@@ -92,6 +94,35 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter {
testCheckpointing(rdd1 => rdd1.map(x => (x % 2, x)).join(rdd2))
}
+ /**
+ * This test forces two ResultTasks of the same job to be launched before and after
+ * the checkpointing of job's RDD is completed.
+ */
+ test("Threading - ResultTasks") {
+ val op1 = (parCollection: RDD[Int]) => {
+ parCollection.map(x => { println("1st map running on " + x); Thread.sleep(500); (x % 2, x) })
+ }
+ val op2 = (firstRDD: RDD[(Int, Int)]) => {
+ firstRDD.map(x => { println("2nd map running on " + x); Thread.sleep(500); x })
+ }
+ testThreading(op1, op2)
+ }
+
+ /**
+ * This test forces two ShuffleMapTasks of the same job to be launched before and after
+ * the checkpointing of job's RDD is completed.
+ */
+ test("Threading - ShuffleMapTasks") {
+ val op1 = (parCollection: RDD[Int]) => {
+ parCollection.map(x => { println("1st map running on " + x); Thread.sleep(500); (x % 2, x) })
+ }
+ val op2 = (firstRDD: RDD[(Int, Int)]) => {
+ firstRDD.groupByKey(2).map(x => { println("2nd map running on " + x); Thread.sleep(500); x })
+ }
+ testThreading(op1, op2)
+ }
+
+
def testCheckpointing[U: ClassManifest](op: (RDD[Int]) => RDD[U], sleepTime: Long = 500) {
val parCollection = sc.makeRDD(1 to 4, 4)
val operatedRDD = op(parCollection)
@@ -105,6 +136,44 @@ class CheckpointSuite extends FunSuite with BeforeAndAfter {
assert(operatedRDD.collect() === result)
}
+ def testThreading[U: ClassManifest, V: ClassManifest](op1: (RDD[Int]) => RDD[U], op2: (RDD[U]) => RDD[V]) {
+
+ val parCollection = sc.makeRDD(1 to 2, 2)
+
+ // This is the RDD that is to be checkpointed
+ val firstRDD = op1(parCollection)
+ val parentRDD = firstRDD.dependencies.head.rdd
+ firstRDD.checkpoint()
+
+ // This the RDD that uses firstRDD. This is designed to launch a
+ // ShuffleMapTask that uses firstRDD.
+ val secondRDD = op2(firstRDD)
+
+ // Starting first job, to initiate the checkpointing
+ logInfo("\nLaunching 1st job to initiate checkpointing\n")
+ firstRDD.collect()
+
+ // Checkpointing has started but not completed yet
+ Thread.sleep(100)
+ assert(firstRDD.dependencies.head.rdd === parentRDD)
+
+ // Starting second job; first task of this job will be
+ // launched _before_ firstRDD is marked as checkpointed
+ // and the second task will be launched _after_ firstRDD
+ // is marked as checkpointed
+ logInfo("\nLaunching 2nd job that is designed to launch tasks " +
+ "before and after checkpointing is complete\n")
+ val result = secondRDD.collect()
+
+ // Check whether firstRDD has been successfully checkpointed
+ assert(firstRDD.dependencies.head.rdd != parentRDD)
+
+ logInfo("\nRecomputing 2nd job to verify the results of the previous computation\n")
+ // Check whether the result in the previous job was correct or not
+ val correctResult = secondRDD.collect()
+ assert(result === correctResult)
+ }
+
def sleep(rdd: RDD[_]) {
val startTime = System.currentTimeMillis()
val maxWaitTime = 5000