author     Dhruve Ashar <dhruveashar@gmail.com>  2016-10-10 10:55:57 -0500
committer  Tom Graves <tgraves@yahoo-inc.com>  2016-10-10 10:55:57 -0500
commit     4bafacaa5f50a3e986c14a38bc8df9bae303f3a0 (patch)
tree       2f4afc5397359b163380f8a75ffcfee0eff731ad /core
parent     7e16c94f18ec07e4de63e66e06ad757b9e2550b9 (diff)
[SPARK-17417][CORE] Fix # of partitions for Reliable RDD checkpointing
## What changes were proposed in this pull request?

Currently the number of partition files is limited to 10000 (the %05d format). If there are more than 10000 part files, the logic breaks while recreating the RDD because it sorts the file names as strings. More details can be found in the JIRA description [here](https://issues.apache.org/jira/browse/SPARK-17417).

## How was this patch tested?

I tested this patch by checkpointing an RDD, manually renaming the part files to the old format, and then accessing the RDD; it was successfully recreated from the old-format files. I also verified loading a sample parquet file, saving it in multiple formats (CSV, JSON, Text, Parquet, ORC), and reading them all back successfully from the saved files. I couldn't launch the unit tests from my local box, so I will wait for the Jenkins output.

Author: Dhruve Ashar <dhruveashar@gmail.com>

Closes #15370 from dhruve/bug/SPARK-17417.
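The ordering problem described above can be sketched in a few lines of Scala. The file names below are hypothetical examples: once an index outgrows the zero-padded width, a plain lexicographic sort (the old `.sortBy(_.toString)` behavior) places the longer name before shorter names with a larger leading digit, while sorting by the parsed index (the fix) keeps the files in partition order.

```scala
// Minimal sketch of the sorting bug, using made-up part-file names.
object PartFileSortDemo {
  val names = Seq("part-10001", "part-100000", "part-09999", "part-10000")

  // Old behavior: sort the names as strings.
  // "part-100000" compares character by character and lands before "part-10001".
  val byString = names.sorted

  // Fixed behavior: strip the "part-" prefix and sort by the numeric index.
  val byIndex = names.sortBy(_.stripPrefix("part-").toInt)

  def main(args: Array[String]): Unit = {
    println(byString.mkString(", "))
    println(byIndex.mkString(", "))
  }
}
```

Running the demo shows the string sort interleaving `part-100000` between `part-10000` and `part-10001`, which is why the fail-fast check against `checkpointFileName(i)` then rejects the checkpoint.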
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala  |  4
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
index ab6554fd8a..eac901d100 100644
--- a/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala
@@ -69,10 +69,10 @@ private[spark] class ReliableCheckpointRDD[T: ClassTag](
val inputFiles = fs.listStatus(cpath)
.map(_.getPath)
.filter(_.getName.startsWith("part-"))
- .sortBy(_.toString)
+ .sortBy(_.getName.stripPrefix("part-").toInt)
// Fail fast if input files are invalid
inputFiles.zipWithIndex.foreach { case (path, i) =>
- if (!path.toString.endsWith(ReliableCheckpointRDD.checkpointFileName(i))) {
+ if (path.getName != ReliableCheckpointRDD.checkpointFileName(i)) {
throw new SparkException(s"Invalid checkpoint file: $path")
}
}