aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Matei Zaharia <matei@eecs.berkeley.edu> 2013-04-24 18:53:12 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu> 2013-04-25 00:42:37 -0700
commit 6e6b5204ea015fc7cc2c3e16e0032be3074413be (patch)
tree fa01a8bf6ddde4e7e921e7cdb0269bf5b8cc2bf3
parent eef9ea1993270d5f07e52e807e8d149e54079aad (diff)
download spark-6e6b5204ea015fc7cc2c3e16e0032be3074413be.tar.gz
spark-6e6b5204ea015fc7cc2c3e16e0032be3074413be.tar.bz2
spark-6e6b5204ea015fc7cc2c3e16e0032be3074413be.zip
Create an empty directory when checkpointing a 0-partition RDD (fixes a
test failure on Hadoop 2.0)
-rw-r--r-- core/src/main/scala/spark/RDDCheckpointData.scala 15
1 files changed, 11 insertions, 4 deletions
diff --git a/core/src/main/scala/spark/RDDCheckpointData.scala b/core/src/main/scala/spark/RDDCheckpointData.scala
index d00092e984..57e0405fb4 100644
--- a/core/src/main/scala/spark/RDDCheckpointData.scala
+++ b/core/src/main/scala/spark/RDDCheckpointData.scala
@@ -1,6 +1,7 @@
package spark
import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
import rdd.{CheckpointRDD, CoalescedRDD}
import scheduler.{ResultTask, ShuffleMapTask}
@@ -62,14 +63,20 @@ private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
}
}
+ // Create the output path for the checkpoint
+ val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id)
+ val fs = path.getFileSystem(new Configuration())
+ if (!fs.mkdirs(path)) {
+ throw new SparkException("Failed to create checkpoint path " + path)
+ }
+
// Save to file, and reload it as an RDD
- val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id).toString
- rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path) _)
- val newRDD = new CheckpointRDD[T](rdd.context, path)
+ rdd.context.runJob(rdd, CheckpointRDD.writeToFile(path.toString) _)
+ val newRDD = new CheckpointRDD[T](rdd.context, path.toString)
// Change the dependencies and partitions of the RDD
RDDCheckpointData.synchronized {
- cpFile = Some(path)
+ cpFile = Some(path.toString)
cpRDD = Some(newRDD)
rdd.markCheckpointed(newRDD) // Update the RDD's dependencies and partitions
cpState = Checkpointed