diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-04-24 18:53:12 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-04-25 00:42:37 -0700 |
commit | 6e6b5204ea015fc7cc2c3e16e0032be3074413be (patch) | |
tree | fa01a8bf6ddde4e7e921e7cdb0269bf5b8cc2bf3 | |
parent | eef9ea1993270d5f07e52e807e8d149e54079aad (diff) | |
download | spark-6e6b5204ea015fc7cc2c3e16e0032be3074413be.tar.gz spark-6e6b5204ea015fc7cc2c3e16e0032be3074413be.tar.bz2 spark-6e6b5204ea015fc7cc2c3e16e0032be3074413be.zip |
Create an empty directory when checkpointing a 0-partition RDD (fixes a
test failure on Hadoop 2.0)
-rw-r--r-- | core/src/main/scala/spark/RDDCheckpointData.scala | 15 |
1 file changed, 11 insertions, 4 deletions
```diff
diff --git a/core/src/main/scala/spark/RDDCheckpointData.scala b/core/src/main/scala/spark/RDDCheckpointData.scala
index d00092e984..57e0405fb4 100644
--- a/core/src/main/scala/spark/RDDCheckpointData.scala
+++ b/core/src/main/scala/spark/RDDCheckpointData.scala
@@ -1,6 +1,7 @@
 package spark
 
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
 import rdd.{CheckpointRDD, CoalescedRDD}
 import scheduler.{ResultTask, ShuffleMapTask}
 
@@ -62,14 +63,20 @@ private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
       }
     }
 
+    // Create the output path for the checkpoint
+    val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
+    val fs = path.getFileSystem(new Configuration())
+    if (!fs.mkdirs(path)) {
+      throw new SparkException("Failed to create checkpoint path " + path)
+    }
+
     // Save to file, and reload it as an RDD
-    val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id).toString
-    rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path) _)
-    val newRDD = new CheckpointRDD[T](rdd.context, path)
+    rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _)
+    val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
 
     // Change the dependencies and partitions of the RDD
     RDDCheckpointData.synchronized {
-      cpFile = Some(path)
+      cpFile = Some(path.toString)
       cpRDD = Some(newRDD)
       rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and partitions
       cpState = Checkpointed
```