From 131be5d62ef6b770de5106eb268a45bca385b599 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 14 Jan 2013 03:28:25 -0800 Subject: Fixed bug in RDD checkpointing. --- core/src/main/scala/spark/rdd/CheckpointRDD.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala index 86c63ca2f4..6f00f6ac73 100644 --- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala @@ -80,12 +80,12 @@ private[spark] object CheckpointRDD extends Logging { val serializer = SparkEnv.get.serializer.newInstance() val serializeStream = serializer.serializeStream(fileOutputStream) serializeStream.writeAll(iterator) - fileOutputStream.close() + serializeStream.close() if (!fs.rename(tempOutputPath, finalOutputPath)) { if (!fs.delete(finalOutputPath, true)) { throw new IOException("Checkpoint failed: failed to delete earlier output of task " - + context.attemptId); + + context.attemptId) } if (!fs.rename(tempOutputPath, finalOutputPath)) { throw new IOException("Checkpoint failed: failed to save output of task: " @@ -119,7 +119,7 @@ private[spark] object CheckpointRDD extends Logging { val rdd = sc.makeRDD(1 to 10, 10).flatMap(x => 1 to 10000) val path = new Path(hdfsPath, "temp") val fs = path.getFileSystem(new Configuration()) - sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 10) _) + sc.runJob(rdd, CheckpointRDD.writeToFile(path.toString, 1024) _) val cpRDD = new CheckpointRDD[Int](sc, path.toString) assert(cpRDD.splits.length == rdd.splits.length, "Number of splits is not the same") assert(cpRDD.collect.toList == rdd.collect.toList, "Data of splits not the same") -- cgit v1.2.3