diff options
author | Yijie Shen <henry.yijieshen@gmail.com> | 2015-08-14 21:03:14 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-08-14 21:03:14 -0700 |
commit | 6c4fdbec33af287d24cd0995ecbd7191545d05c9 (patch) | |
tree | d44ea98701f1405b97c13d1976c8abb71548d673 /sql/core | |
parent | ec29f2034a3306cc0afdc4c160b42c2eefa0897c (diff) | |
download | spark-6c4fdbec33af287d24cd0995ecbd7191545d05c9.tar.gz spark-6c4fdbec33af287d24cd0995ecbd7191545d05c9.tar.bz2 spark-6c4fdbec33af287d24cd0995ecbd7191545d05c9.zip |
[SPARK-8887] [SQL] Explicit define which data types can be used as dynamic partition columns
This PR enforces dynamic partition column data type requirements by adding analysis rules.
JIRA: https://issues.apache.org/jira/browse/SPARK-8887
Author: Yijie Shen <henry.yijieshen@gmail.com>
Closes #8201 from yjshen/dynamic_partition_columns.
Diffstat (limited to 'sql/core')
4 files changed, 24 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 66dfcc308c..0a2007e158 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -26,6 +26,7 @@ import scala.util.Try import org.apache.hadoop.fs.Path import org.apache.hadoop.util.Shell +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Cast, Literal} import org.apache.spark.sql.types._ @@ -270,6 +271,18 @@ private[sql] object PartitioningUtils { private val upCastingOrder: Seq[DataType] = Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType) + def validatePartitionColumnDataTypes( + schema: StructType, + partitionColumns: Array[String]): Unit = { + + ResolvedDataSource.partitionColumnsSchema(schema, partitionColumns).foreach { field => + field.dataType match { + case _: AtomicType => // OK + case _ => throw new AnalysisException(s"Cannot use ${field.dataType} for partition column") + } + } + } + /** * Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower" * types. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala index 7770bbd712..8fbaf3a305 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala @@ -143,7 +143,7 @@ object ResolvedDataSource extends Logging { new ResolvedDataSource(clazz, relation) } - private def partitionColumnsSchema( + def partitionColumnsSchema( schema: StructType, partitionColumns: Array[String]): StructType = { StructType(partitionColumns.map { col => @@ -179,6 +179,9 @@ object ResolvedDataSource extends Logging { val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) path.makeQualified(fs.getUri, fs.getWorkingDirectory) } + + PartitioningUtils.validatePartitionColumnDataTypes(data.schema, partitionColumns) + val dataSchema = StructType(data.schema.filterNot(f => partitionColumns.contains(f.name))) val r = dataSource.createRelation( sqlContext, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala index 2f11f40422..d36197e50d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala @@ -287,7 +287,7 @@ private[sql] class DynamicPartitionWriterContainer( PartitioningUtils.escapePathName _, StringType, Seq(Cast(c, StringType)), Seq(StringType)) val str = If(IsNull(c), Literal(defaultPartitionName), escaped) val partitionName = Literal(c.name + "=") :: str :: Nil - if (i == 0) partitionName else Literal(Path.SEPARATOR_CHAR.toString) :: partitionName + if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName } 
// Returns the partition path given a partition key. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index 40ca8bf409..9d3d35692f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -116,6 +116,8 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => // OK } + PartitioningUtils.validatePartitionColumnDataTypes(r.schema, part.keySet.toArray) + // Get all input data source relations of the query. val srcRelations = query.collect { case LogicalRelation(src: BaseRelation) => src @@ -138,10 +140,10 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => // OK } - case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) => + case CreateTableUsingAsSelect(tableName, _, _, partitionColumns, mode, _, query) => // When the SaveMode is Overwrite, we need to check if the table is an input table of // the query. If so, we will throw an AnalysisException to let users know it is not allowed. - if (catalog.tableExists(Seq(tableName))) { + if (mode == SaveMode.Overwrite && catalog.tableExists(Seq(tableName))) { // Need to remove SubQuery operator. EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) match { // Only do the check if the table is a data source table @@ -164,6 +166,8 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan => // OK } + PartitioningUtils.validatePartitionColumnDataTypes(query.schema, partitionColumns) + case _ => // OK } } |