author     Wenchen Fan <wenchen@databricks.com>    2016-01-19 10:44:51 -0800
committer  Yin Huai <yhuai@databricks.com>    2016-01-19 10:44:51 -0800
commit     e14817b528ccab4b4685b45a95e2325630b5fc53 (patch)
tree       5347fe30325c3c8f4f7154d15c1625a94f78a839 /sql/core
parent     0ddba6d88ff093a96b4931f71bd0a599afbbca78 (diff)
[SPARK-12870][SQL] better format bucket id in file name
For a normal parquet file without bucketing, the file name ends with a job UUID, which may be all numbers and thus mistakenly regarded as a bucket id. This PR improves the format of the bucket id in file names by using a different separator, `_`, so that the regex is more robust.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #10799 from cloud-fan/fix-bucket.
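To make the failure concrete, here is a minimal standalone sketch (not Spark code; it only borrows the before/after regexes from the diff below) of how the old `-` separator misreads a job UUID that happens to end in digits, while the new `_` separator does not:

    // Standalone sketch of the separator ambiguity; the two patterns are
    // copied from the bucket.scala diff below.
    object SeparatorAmbiguitySketch {
      private val oldPattern = """.*-(\d+)(?:\..*)?$""".r // pre-patch
      private val newPattern = """.*_(\d+)(?:\..*)?$""".r // post-patch

      def main(args: Array[String]): Unit = {
        // A non-bucketed parquet file whose job UUID ends in digits.
        val plain = "part-r-00000-2dd664f9-d2c4-4ffe-878f-431234567891.gz.parquet"
        plain match {
          case oldPattern(id) => println(s"old regex wrongly sees bucket id $id") // 431234567891
          case _              => println("old regex sees no bucket id")
        }
        plain match {
          case newPattern(id) => println(s"new regex sees bucket id $id")
          case _              => println("new regex correctly sees no bucket id") // taken: no `_`
        }
      }
    }

Note the sketch prints the raw capture rather than calling `.toInt`: a capture like `431234567891` does not even fit in an `Int`, so the old pattern could also make `getBucketId` throw.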
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala | 14
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala | 2
3 files changed, 12 insertions, 6 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
index c7ecd6125d..3e0d484b74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/bucket.scala
@@ -57,15 +57,21 @@ private[sql] abstract class BucketedOutputWriterFactory extends OutputWriterFact
 
 private[sql] object BucketingUtils {
   // The file name of bucketed data should have 3 parts:
-  //   1. some other information in the head of file name, ends with `-`
-  //   2. bucket id part, some numbers
+  //   1. some other information in the head of file name
+  //   2. bucket id part, some numbers, starts with "_"
+  //   * The other-information part may use `-` as separator and may have numbers at the end,
+  //     e.g. a normal parquet file without bucketing may have name:
+  //     part-r-00000-2dd664f9-d2c4-4ffe-878f-431234567891.gz.parquet, and we will mistakenly
+  //     treat `431234567891` as bucket id. So here we pick `_` as separator.
   //   3. optional file extension part, in the tail of file name, starts with `.`
   // An example of bucketed parquet file name with bucket id 3:
-  //   part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb-00003.gz.parquet
-  private val bucketedFileName = """.*-(\d+)(?:\..*)?$""".r
+  //   part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
+  private val bucketedFileName = """.*_(\d+)(?:\..*)?$""".r
 
   def getBucketId(fileName: String): Option[Int] = fileName match {
     case bucketedFileName(bucketId) => Some(bucketId.toInt)
     case other => None
   }
+
+  def bucketIdToString(id: Int): String = f"_$id%05d"
 }
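The two new helpers form a write/read round trip: `bucketIdToString` renders the `_`-prefixed, zero-padded suffix, and `getBucketId` parses it back out of a file name. Here is a self-contained sketch (the bodies are copied from the patched `BucketingUtils` above, which itself is `private[sql]` and so not callable from outside Spark):

    object BucketingUtilsSketch {
      // Copied from the patched BucketingUtils above.
      private val bucketedFileName = """.*_(\d+)(?:\..*)?$""".r

      def getBucketId(fileName: String): Option[Int] = fileName match {
        case bucketedFileName(bucketId) => Some(bucketId.toInt)
        case other => None
      }

      def bucketIdToString(id: Int): String = f"_$id%05d"

      def main(args: Array[String]): Unit = {
        // Writing side: bucket id 3 becomes the suffix "_00003".
        println(bucketIdToString(3)) // _00003
        // Reading side: the suffix parses back to Some(3).
        println(getBucketId(
          "part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet")) // Some(3)
        // A non-bucketed name has no `_` part and yields None.
        println(getBucketId(
          "part-r-00000-2dd664f9-d2c4-4ffe-878f-431234567891.gz.parquet")) // None
      }
    }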
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
index 20c60b9c43..31c5620c9a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JSONRelation.scala
@@ -193,7 +193,7 @@ private[json] class JsonOutputWriter(
         val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
-        val bucketString = bucketId.map(id => f"-$id%05d").getOrElse("")
+        val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
         new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
       }
     }.getRecordWriter(context)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 30ddec686c..b460ec1d26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -90,7 +90,7 @@ private[sql] class ParquetOutputWriter(
         val uniqueWriteJobId = configuration.get("spark.sql.sources.writeJobUUID")
         val taskAttemptId = context.getTaskAttemptID
         val split = taskAttemptId.getTaskID.getId
-        val bucketString = bucketId.map(id => f"-$id%05d").getOrElse("")
+        val bucketString = bucketId.map(BucketingUtils.bucketIdToString).getOrElse("")
         new Path(path, f"part-r-$split%05d-$uniqueWriteJobId$bucketString$extension")
       }
     }
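Both the JSON and parquet writers build the output path with the same interpolation, so the change lands in one place per format. A minimal sketch of the resulting names (with hypothetical stand-in values for the split id, job UUID, and extension, which the real writers obtain from the Hadoop task attempt context):

    object OutputNameSketch {
      // As in the patched BucketingUtils.
      def bucketIdToString(id: Int): String = f"_$id%05d"

      // Mirrors the interpolation in JsonOutputWriter/ParquetOutputWriter above.
      def fileName(split: Int, jobUUID: String, bucketId: Option[Int], ext: String): String = {
        val bucketString = bucketId.map(bucketIdToString).getOrElse("")
        f"part-r-$split%05d-$jobUUID$bucketString$ext"
      }

      def main(args: Array[String]): Unit = {
        val jobUUID = "2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb" // stand-in UUID
        // Bucketed: the `_00003` suffix is unambiguous.
        println(fileName(0, jobUUID, Some(3), ".gz.parquet"))
        // part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb_00003.gz.parquet
        // Non-bucketed: no `_` suffix at all, so getBucketId returns None.
        println(fileName(0, jobUUID, None, ".gz.parquet"))
        // part-r-00000-2dd664f9-d2c4-4ffe-878f-c6c70c1fb0cb.gz.parquet
      }
    }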