-rw-r--r--  docs/sql-programming-guide.md                                                            6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala                               6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala            48
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala                    4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala 79
5 files changed, 119 insertions(+), 24 deletions(-)
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index cde5830c73..40e33f757d 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1102,7 +1102,11 @@ root
{% endhighlight %}
Notice that the data types of the partitioning columns are automatically inferred. Currently,
-numeric data types and string type are supported.
+numeric data types and string type are supported. Sometimes users may not want to automatically
+infer the data types of the partitioning columns. For these use cases, the automatic type inference
+can be configured by `spark.sql.sources.partitionColumnTypeInference.enabled`, which defaults to
+`true`. When type inference is disabled, string type will be used for the partitioning columns.
+
### Schema merging
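A hedged usage sketch of the new option described above, in Scala (assumes an existing SQLContext named `sqlContext` and a hypothetical partitioned layout such as `/data/events/year=2015/month=06/`):

    // Turn automatic partition column type inference off; partition columns discovered
    // from directory names such as year=2015 will then come back as string columns.
    sqlContext.setConf("spark.sql.sources.partitionColumnTypeInference.enabled", "false")

    // Hypothetical layout: /data/events/year=2015/month=06/part-*.parquet
    val df = sqlContext.read.parquet("/data/events")
    df.printSchema()  // year and month appear as string instead of inferred integers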
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 77c6af27d1..c778889045 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -71,6 +71,9 @@ private[spark] object SQLConf {
// Whether to perform partition discovery when loading external data sources. Default to true.
val PARTITION_DISCOVERY_ENABLED = "spark.sql.sources.partitionDiscovery.enabled"
+ // Whether to perform partition column type inference. Default to true.
+ val PARTITION_COLUMN_TYPE_INFERENCE = "spark.sql.sources.partitionColumnTypeInference.enabled"
+
// The output committer class used by FSBasedRelation. The specified class needs to be a
// subclass of org.apache.hadoop.mapreduce.OutputCommitter.
val OUTPUT_COMMITTER_CLASS = "spark.sql.sources.outputCommitterClass"
@@ -250,6 +253,9 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
private[spark] def partitionDiscoveryEnabled() =
getConf(SQLConf.PARTITION_DISCOVERY_ENABLED, "true").toBoolean
+ private[spark] def partitionColumnTypeInferenceEnabled() =
+ getConf(SQLConf.PARTITION_COLUMN_TYPE_INFERENCE, "true").toBoolean
+
// Do not use a value larger than 4000 as the default value of this property.
// See the comments of SCHEMA_STRING_LENGTH_THRESHOLD above for more information.
private[spark] def schemaStringLengthThreshold: Int =
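For context, the new key follows the existing SQLConf pattern of string-keyed settings with string defaults. A minimal sketch of reading it through the public API rather than the private[spark] accessor above (assumes an existing `sqlContext`; "true" mirrors the default used in getConf above):

    // Sketch only: consult the flag via SQLContext.getConf with an explicit default value.
    val inferTypes = sqlContext
      .getConf("spark.sql.sources.partitionColumnTypeInference.enabled", "true")
      .toBoolean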
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
index c4c99de5a3..9f6ec2ed8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/PartitioningUtils.scala
@@ -72,10 +72,11 @@ private[sql] object PartitioningUtils {
*/
private[sql] def parsePartitions(
paths: Seq[Path],
- defaultPartitionName: String): PartitionSpec = {
+ defaultPartitionName: String,
+ typeInference: Boolean): PartitionSpec = {
// First, we need to parse every partition's path and see if we can find partition values.
val pathsWithPartitionValues = paths.flatMap { path =>
- parsePartition(path, defaultPartitionName).map(path -> _)
+ parsePartition(path, defaultPartitionName, typeInference).map(path -> _)
}
if (pathsWithPartitionValues.isEmpty) {
@@ -124,7 +125,8 @@ private[sql] object PartitioningUtils {
*/
private[sql] def parsePartition(
path: Path,
- defaultPartitionName: String): Option[PartitionValues] = {
+ defaultPartitionName: String,
+ typeInference: Boolean): Option[PartitionValues] = {
val columns = ArrayBuffer.empty[(String, Literal)]
// Old Hadoop versions don't have `Path.isRoot`
var finished = path.getParent == null
@@ -137,7 +139,7 @@ private[sql] object PartitioningUtils {
return None
}
- val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName)
+ val maybeColumn = parsePartitionColumn(chopped.getName, defaultPartitionName, typeInference)
maybeColumn.foreach(columns += _)
chopped = chopped.getParent
finished = maybeColumn.isEmpty || chopped.getParent == null
@@ -153,7 +155,8 @@ private[sql] object PartitioningUtils {
private def parsePartitionColumn(
columnSpec: String,
- defaultPartitionName: String): Option[(String, Literal)] = {
+ defaultPartitionName: String,
+ typeInference: Boolean): Option[(String, Literal)] = {
val equalSignIndex = columnSpec.indexOf('=')
if (equalSignIndex == -1) {
None
@@ -164,7 +167,7 @@ private[sql] object PartitioningUtils {
val rawColumnValue = columnSpec.drop(equalSignIndex + 1)
assert(rawColumnValue.nonEmpty, s"Empty partition column value in '$columnSpec'")
- val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName)
+ val literal = inferPartitionColumnValue(rawColumnValue, defaultPartitionName, typeInference)
Some(columnName -> literal)
}
}
@@ -211,19 +214,28 @@ private[sql] object PartitioningUtils {
*/
private[sql] def inferPartitionColumnValue(
raw: String,
- defaultPartitionName: String): Literal = {
- // First tries integral types
- Try(Literal.create(Integer.parseInt(raw), IntegerType))
- .orElse(Try(Literal.create(JLong.parseLong(raw), LongType)))
- // Then falls back to fractional types
- .orElse(Try(Literal.create(JFloat.parseFloat(raw), FloatType)))
- .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
- .orElse(Try(Literal.create(new JBigDecimal(raw), DecimalType.Unlimited)))
- // Then falls back to string
- .getOrElse {
- if (raw == defaultPartitionName) Literal.create(null, NullType)
- else Literal.create(unescapePathName(raw), StringType)
+ defaultPartitionName: String,
+ typeInference: Boolean): Literal = {
+ if (typeInference) {
+ // First tries integral types
+ Try(Literal.create(Integer.parseInt(raw), IntegerType))
+ .orElse(Try(Literal.create(JLong.parseLong(raw), LongType)))
+ // Then falls back to fractional types
+ .orElse(Try(Literal.create(JFloat.parseFloat(raw), FloatType)))
+ .orElse(Try(Literal.create(JDouble.parseDouble(raw), DoubleType)))
+ .orElse(Try(Literal.create(new JBigDecimal(raw), DecimalType.Unlimited)))
+ // Then falls back to string
+ .getOrElse {
+ if (raw == defaultPartitionName) Literal.create(null, NullType)
+ else Literal.create(unescapePathName(raw), StringType)
+ }
+ } else {
+ if (raw == defaultPartitionName) {
+ Literal.create(null, NullType)
+ } else {
+ Literal.create(unescapePathName(raw), StringType)
}
+ }
}
private val upCastingOrder: Seq[DataType] =
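A simplified, self-contained sketch of the cascade above, using plain Scala values instead of Spark `Literal`s (the real code also tries `BigDecimal` and unescapes path names):

    import scala.util.Try

    // Stand-in for inferPartitionColumnValue: with typeInference = false the raw value
    // is only checked against the default partition name and otherwise kept as a string;
    // with it enabled, integral and fractional parses are attempted before falling back.
    def inferValue(raw: String, defaultPartitionName: String, typeInference: Boolean): Any =
      if (typeInference) {
        Try(raw.toInt)
          .orElse(Try(raw.toLong))
          .orElse(Try(raw.toDouble))
          .getOrElse(if (raw == defaultPartitionName) null else raw)
      } else {
        if (raw == defaultPartitionName) null else raw
      }

    // inferValue("10", defaultPartitionName, typeInference = true)   // Int 10
    // inferValue("10", defaultPartitionName, typeInference = false)  // String "10"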
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 25887ba9a1..d1547fb1e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -491,9 +491,11 @@ abstract class HadoopFsRelation private[sql](maybePartitionSpec: Option[Partitio
}
private def discoverPartitions(): PartitionSpec = {
+ val typeInference = sqlContext.conf.partitionColumnTypeInferenceEnabled()
// We use leaf dirs containing data files to discover the schema.
val leafDirs = fileStatusCache.leafDirToChildrenFiles.keys.toSeq
- PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME)
+ PartitioningUtils.parsePartitions(leafDirs, PartitioningUtils.DEFAULT_PARTITION_NAME,
+ typeInference)
}
/**
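The observable effect of the flag consumed by `discoverPartitions` is the data type of the discovered partition columns. A hedged end-to-end sketch (hypothetical paths, assuming an existing `sqlContext` and a layout like `/tmp/t/part=1/`, `/tmp/t/part=2/`):

    // With the default setting the discovered column is typed by inference.
    sqlContext.read.parquet("/tmp/t").schema("part").dataType   // IntegerType

    // After disabling inference, the same column is discovered as a string.
    sqlContext.setConf("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
    sqlContext.read.parquet("/tmp/t").schema("part").dataType   // StringType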
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index d9a010a981..c2f1cc8ffd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -48,7 +48,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
test("column type inference") {
def check(raw: String, literal: Literal): Unit = {
- assert(inferPartitionColumnValue(raw, defaultPartitionName) === literal)
+ assert(inferPartitionColumnValue(raw, defaultPartitionName, true) === literal)
}
check("10", Literal.create(10, IntegerType))
@@ -60,12 +60,12 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
test("parse partition") {
def check(path: String, expected: Option[PartitionValues]): Unit = {
- assert(expected === parsePartition(new Path(path), defaultPartitionName))
+ assert(expected === parsePartition(new Path(path), defaultPartitionName, true))
}
def checkThrows[T <: Throwable: Manifest](path: String, expected: String): Unit = {
val message = intercept[T] {
- parsePartition(new Path(path), defaultPartitionName).get
+ parsePartition(new Path(path), defaultPartitionName, true).get
}.getMessage
assert(message.contains(expected))
@@ -105,7 +105,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
test("parse partitions") {
def check(paths: Seq[String], spec: PartitionSpec): Unit = {
- assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName) === spec)
+ assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, true) === spec)
}
check(Seq(
@@ -174,6 +174,77 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
PartitionSpec.emptySpec)
}
+ test("parse partitions with type inference disabled") {
+ def check(paths: Seq[String], spec: PartitionSpec): Unit = {
+ assert(parsePartitions(paths.map(new Path(_)), defaultPartitionName, false) === spec)
+ }
+
+ check(Seq(
+ "hdfs://host:9000/path/a=10/b=hello"),
+ PartitionSpec(
+ StructType(Seq(
+ StructField("a", StringType),
+ StructField("b", StringType))),
+ Seq(Partition(Row("10", "hello"), "hdfs://host:9000/path/a=10/b=hello"))))
+
+ check(Seq(
+ "hdfs://host:9000/path/a=10/b=20",
+ "hdfs://host:9000/path/a=10.5/b=hello"),
+ PartitionSpec(
+ StructType(Seq(
+ StructField("a", StringType),
+ StructField("b", StringType))),
+ Seq(
+ Partition(Row("10", "20"), "hdfs://host:9000/path/a=10/b=20"),
+ Partition(Row("10.5", "hello"), "hdfs://host:9000/path/a=10.5/b=hello"))))
+
+ check(Seq(
+ "hdfs://host:9000/path/_temporary",
+ "hdfs://host:9000/path/a=10/b=20",
+ "hdfs://host:9000/path/a=10.5/b=hello",
+ "hdfs://host:9000/path/a=10.5/_temporary",
+ "hdfs://host:9000/path/a=10.5/_TeMpOrArY",
+ "hdfs://host:9000/path/a=10.5/b=hello/_temporary",
+ "hdfs://host:9000/path/a=10.5/b=hello/_TEMPORARY",
+ "hdfs://host:9000/path/_temporary/path",
+ "hdfs://host:9000/path/a=11/_temporary/path",
+ "hdfs://host:9000/path/a=10.5/b=world/_temporary/path"),
+ PartitionSpec(
+ StructType(Seq(
+ StructField("a", StringType),
+ StructField("b", StringType))),
+ Seq(
+ Partition(Row("10", "20"), "hdfs://host:9000/path/a=10/b=20"),
+ Partition(Row("10.5", "hello"), "hdfs://host:9000/path/a=10.5/b=hello"))))
+
+ check(Seq(
+ s"hdfs://host:9000/path/a=10/b=20",
+ s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello"),
+ PartitionSpec(
+ StructType(Seq(
+ StructField("a", StringType),
+ StructField("b", StringType))),
+ Seq(
+ Partition(Row("10", "20"), s"hdfs://host:9000/path/a=10/b=20"),
+ Partition(Row(null, "hello"), s"hdfs://host:9000/path/a=$defaultPartitionName/b=hello"))))
+
+ check(Seq(
+ s"hdfs://host:9000/path/a=10/b=$defaultPartitionName",
+ s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"),
+ PartitionSpec(
+ StructType(Seq(
+ StructField("a", StringType),
+ StructField("b", StringType))),
+ Seq(
+ Partition(Row("10", null), s"hdfs://host:9000/path/a=10/b=$defaultPartitionName"),
+ Partition(Row("10.5", null), s"hdfs://host:9000/path/a=10.5/b=$defaultPartitionName"))))
+
+ check(Seq(
+ s"hdfs://host:9000/path1",
+ s"hdfs://host:9000/path2"),
+ PartitionSpec.emptySpec)
+ }
+
test("read partitioned table - normal case") {
withTempDir { base =>
for {