diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-20 10:26:36 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-20 10:26:36 -0800 |
commit | 334ab9244113e4b792fd51697ef80ab0d3b3de25 (patch) | |
tree | a96b412afa7af5a1831ed380b5d6463e3c56ac39 | |
parent | 1cb725e41768abc37e755f0739aa447d32502975 (diff) | |
download | spark-334ab9244113e4b792fd51697ef80ab0d3b3de25.tar.gz spark-334ab9244113e4b792fd51697ef80ab0d3b3de25.tar.bz2 spark-334ab9244113e4b792fd51697ef80ab0d3b3de25.zip |
Fixed bug in CheckpointSuite
-rw-r--r-- | core/src/main/scala/spark/rdd/CheckpointRDD.scala | 8 | ||||
-rw-r--r-- | core/src/test/scala/spark/CheckpointSuite.scala | 4 |
2 files changed, 6 insertions, 6 deletions
diff --git a/core/src/main/scala/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/spark/rdd/CheckpointRDD.scala index 3328477959..9e37bdf659 100644 --- a/core/src/main/scala/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/spark/rdd/CheckpointRDD.scala @@ -22,10 +22,10 @@ class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: Stri override def getPartitions: Array[Partition] = { val dirContents = fs.listStatus(new Path(checkpointPath)) - val splitFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted - val numPartitions = splitFiles.size - if (numPartitions > 0 && !splitFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) || - !splitFiles(numPartitions-1).endsWith(CheckpointRDD.splitIdToFile(numPartitions-1))) { + val partitionFiles = dirContents.map(_.getPath.toString).filter(_.contains("part-")).sorted + val numPartitions = partitionFiles.size + if (numPartitions > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) || + ! partitionFiles(numPartitions-1).endsWith(CheckpointRDD.splitIdToFile(numPartitions-1)))) { throw new SparkException("Invalid checkpoint directory: " + checkpointPath) } Array.tabulate(numPartitions)(i => new CheckpointRDDPartition(i)) diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala index 1935ac9e49..ca385972fb 100644 --- a/core/src/test/scala/spark/CheckpointSuite.scala +++ b/core/src/test/scala/spark/CheckpointSuite.scala @@ -164,12 +164,12 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { test("CheckpointRDD with zero partitions") { val rdd = new BlockRDD[Int](sc, Array[String]()) - assert(rdd.splits.size === 0) + assert(rdd.partitions.size === 0) assert(rdd.isCheckpointed === false) rdd.checkpoint() assert(rdd.count() === 0) assert(rdd.isCheckpointed === true) - assert(rdd.splits.size === 0) + assert(rdd.partitions.size === 0) } /** |