author     Tathagata Das <tathagata.das1565@gmail.com>  2015-12-01 14:08:36 -0800
committer  Andrew Or <andrew@databricks.com>            2015-12-01 14:08:36 -0800
commit     60b541ee1b97c9e5e84aa2af2ce856f316ad22b3 (patch)
tree       bfa408b8238ed10fc0a6bd248efa9f81957598e9 /core/src/test/scala/org/apache
parent     2cef1cdfbb5393270ae83179b6a4e50c3cbf9e93 (diff)
[SPARK-12004] Preserve the RDD partitioner through RDD checkpointing
The solution is to save the RDD partitioner in a separate file in the RDD checkpoint directory, i.e. `<checkpoint dir>/_partitioner`. In most cases, whether or not the RDD partitioner is recovered does not affect correctness; losing it only reduces performance. So this solution makes a best-effort attempt to save and recover the partitioner: if either step fails, the checkpointing itself is not affected. This makes the patch safe and backward compatible.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #9983 from tdas/SPARK-12004.
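The code that actually writes and reads `<checkpoint dir>/_partitioner` lives outside the path this page is limited to, so it does not appear in the diff below. As a minimal sketch of the best-effort behavior described above: the object and method names, the serializer plumbing, and the error handling here are illustrative assumptions, not the patch's own code.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.Partitioner
import org.apache.spark.serializer.SerializerInstance

import scala.util.control.NonFatal

object PartitionerCheckpointSketch {

  // Best-effort write of the partitioner to <checkpoint dir>/_partitioner.
  // Any failure is swallowed so checkpointing of the data itself is unaffected.
  def writePartitioner(
      checkpointDir: Path,
      partitioner: Partitioner,
      hadoopConf: Configuration,
      serializer: SerializerInstance): Unit = {
    try {
      val partitionerFile = new Path(checkpointDir, "_partitioner")
      val fs = partitionerFile.getFileSystem(hadoopConf)
      val stream = serializer.serializeStream(fs.create(partitionerFile, false))
      try {
        stream.writeObject(partitioner)
      } finally {
        stream.close()
      }
    } catch {
      case NonFatal(_) => // ignore: recovery simply proceeds without a partitioner
    }
  }

  // Best-effort read; returns None if the file is missing or corrupted, so
  // recovering the RDD never fails because of the partitioner file.
  def readPartitioner(
      checkpointDir: Path,
      hadoopConf: Configuration,
      serializer: SerializerInstance): Option[Partitioner] = {
    try {
      val partitionerFile = new Path(checkpointDir, "_partitioner")
      val fs = partitionerFile.getFileSystem(hadoopConf)
      val stream = serializer.deserializeStream(fs.open(partitionerFile))
      try {
        Some(stream.readObject[Partitioner]())
      } finally {
        stream.close()
      }
    } catch {
      case NonFatal(_) => None
    }
  }
}
```

This mirrors the behavior the new test below exercises: a corrupted or missing `_partitioner` file yields `None`, while the RDD data itself is still recovered.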
Diffstat (limited to 'core/src/test/scala/org/apache')
-rw-r--r--  core/src/test/scala/org/apache/spark/CheckpointSuite.scala | 61
1 file changed, 56 insertions(+), 5 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
index ab23326c6c..553d46285a 100644
--- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -21,7 +21,8 @@ import java.io.File
import scala.reflect.ClassTag
-import org.apache.spark.CheckpointSuite._
+import org.apache.hadoop.fs.Path
+
import org.apache.spark.rdd._
import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
import org.apache.spark.util.Utils
@@ -74,8 +75,10 @@ trait RDDCheckpointTester { self: SparkFunSuite =>
// Test whether the checkpoint file has been created
if (reliableCheckpoint) {
- assert(
- collectFunc(sparkContext.checkpointFile[U](operatedRDD.getCheckpointFile.get)) === result)
+ assert(operatedRDD.getCheckpointFile.nonEmpty)
+ val recoveredRDD = sparkContext.checkpointFile[U](operatedRDD.getCheckpointFile.get)
+ assert(collectFunc(recoveredRDD) === result)
+ assert(recoveredRDD.partitioner === operatedRDD.partitioner)
}
// Test whether dependencies have been changed from its earlier parent RDD
@@ -211,9 +214,14 @@ trait RDDCheckpointTester { self: SparkFunSuite =>
}
/** Run a test twice, once for local checkpointing and once for reliable checkpointing. */
- protected def runTest(name: String)(body: Boolean => Unit): Unit = {
+ protected def runTest(
+ name: String,
+ skipLocalCheckpoint: Boolean = false
+ )(body: Boolean => Unit): Unit = {
test(name + " [reliable checkpoint]")(body(true))
- test(name + " [local checkpoint]")(body(false))
+ if (!skipLocalCheckpoint) {
+ test(name + " [local checkpoint]")(body(false))
+ }
}
/**
@@ -264,6 +272,49 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalSparkContext
assert(flatMappedRDD.collect() === result)
}
+ runTest("checkpointing partitioners", skipLocalCheckpoint = true) { _: Boolean =>
+
+ def testPartitionerCheckpointing(
+ partitioner: Partitioner,
+ corruptPartitionerFile: Boolean = false
+ ): Unit = {
+ val rddWithPartitioner = sc.makeRDD(1 to 4).map { _ -> 1 }.partitionBy(partitioner)
+ rddWithPartitioner.checkpoint()
+ rddWithPartitioner.count()
+ assert(rddWithPartitioner.getCheckpointFile.get.nonEmpty,
+ "checkpointing was not successful")
+
+ if (corruptPartitionerFile) {
+ // Overwrite the partitioner file with garbage data
+ val checkpointDir = new Path(rddWithPartitioner.getCheckpointFile.get)
+ val fs = checkpointDir.getFileSystem(sc.hadoopConfiguration)
+ val partitionerFile = fs.listStatus(checkpointDir)
+ .find(_.getPath.getName.contains("partitioner"))
+ .map(_.getPath)
+ require(partitionerFile.nonEmpty, "could not find the partitioner file for testing")
+ val output = fs.create(partitionerFile.get, true)
+ output.write(100)
+ output.close()
+ }
+
+ val newRDD = sc.checkpointFile[(Int, Int)](rddWithPartitioner.getCheckpointFile.get)
+ assert(newRDD.collect().toSet === rddWithPartitioner.collect().toSet, "RDD not recovered")
+
+ if (!corruptPartitionerFile) {
+ assert(newRDD.partitioner != None, "partitioner not recovered")
+ assert(newRDD.partitioner === rddWithPartitioner.partitioner,
+ "recovered partitioner does not match")
+ } else {
+ assert(newRDD.partitioner == None, "partitioner unexpectedly recovered")
+ }
+ }
+
+ testPartitionerCheckpointing(partitioner = new HashPartitioner(2))
+
+ // Test that corrupted partitioner file does not prevent recovery of RDD
+ testPartitionerCheckpointing(partitioner = new HashPartitioner(2), corruptPartitionerFile = true)
+ }
+
runTest("RDDs with one-to-one dependencies") { reliableCheckpoint: Boolean =>
testRDD(_.map(x => x.toString), reliableCheckpoint)
testRDD(_.flatMap(x => 1 to x), reliableCheckpoint)
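For readers skimming the diff, here is a compact sketch of the end-to-end behavior the new test locks in. The object name and driver setup are hypothetical; it is written under `org.apache.spark` because `SparkContext.checkpointFile` is `protected[spark]`, which is also why the suite above can call it.

```scala
package org.apache.spark

// Sketch of the round trip the "checkpointing partitioners" test verifies:
// checkpoint a partitioned RDD, recover it purely from the checkpoint files,
// and observe that the partitioner comes back with it.
object PartitionerRecoverySketch {
  def roundTrip(sc: SparkContext): Unit = {
    sc.setCheckpointDir("/tmp/spark-checkpoint-sketch") // assumed scratch directory

    val rdd = sc.makeRDD(1 to 100).map(_ -> 1).partitionBy(new HashPartitioner(4))
    rdd.checkpoint()
    rdd.count() // materializes the reliable checkpoint

    // Recover the RDD from its checkpoint files, as a restarted driver would.
    val recovered = sc.checkpointFile[(Int, Int)](rdd.getCheckpointFile.get)

    // With SPARK-12004 the partitioner is recovered too (best effort);
    // before the patch the recovered RDD always reported None.
    assert(recovered.partitioner == Some(new HashPartitioner(4)))
    assert(recovered.collect().toSet == rdd.collect().toSet)
  }
}
```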