author     Yijie Shen <henry.yijieshen@gmail.com>  2015-08-14 21:03:14 -0700
committer  Reynold Xin <rxin@databricks.com>       2015-08-14 21:03:14 -0700
commit     6c4fdbec33af287d24cd0995ecbd7191545d05c9 (patch)
tree       d44ea98701f1405b97c13d1976c8abb71548d673
parent     ec29f2034a3306cc0afdc4c160b42c2eefa0897c (diff)
[SPARK-8887] [SQL] Explicitly define which data types can be used as dynamic partition columns

This PR enforces the dynamic partition column data type requirements by adding analysis rules.

JIRA: https://issues.apache.org/jira/browse/SPARK-8887

Author: Yijie Shen <henry.yijieshen@gmail.com>

Closes #8201 from yjshen/dynamic_partition_columns.
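For context, a minimal user-level sketch of the behavior this patch enforces (not part of the patch; it assumes a local SQLContext named sqlContext and illustrative Parquet output paths):

    import org.apache.spark.sql.AnalysisException

    // One atomic column (id), one string column (name), one complex ArrayType column (arr).
    val df = sqlContext.range(0, 3).selectExpr(
      "id", "cast(id as string) AS name", "array(id, id + 1) AS arr")

    // Partitioning by an atomic column is still allowed.
    df.write.format("parquet").partitionBy("id").save("/tmp/by_id")

    // Partitioning by the ArrayType column is now rejected during analysis.
    try {
      df.write.format("parquet").partitionBy("arr").save("/tmp/by_arr")
    } catch {
      case e: AnalysisException =>
        // e.g. "Cannot use ArrayType(LongType,false) for partition column"
        println(e.getMessage)
    }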
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala | 13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala | 5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala | 8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala | 17
5 files changed, 41 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 66dfcc308c..0a2007e158 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -26,6 +26,7 @@ import scala.util.Try
import org.apache.hadoop.fs.Path
import org.apache.hadoop.util.Shell
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.types._
@@ -270,6 +271,18 @@ private[sql] object PartitioningUtils {
private val upCastingOrder: Seq[DataType] =
Seq(NullType, IntegerType, LongType, FloatType, DoubleType, StringType)
+  def validatePartitionColumnDataTypes(
+      schema: StructType,
+      partitionColumns: Array[String]): Unit = {
+
+    ResolvedDataSource.partitionColumnsSchema(schema, partitionColumns).foreach { field =>
+      field.dataType match {
+        case _: AtomicType => // OK
+        case _ => throw new AnalysisException(s"Cannot use ${field.dataType} for partition column")
+      }
+    }
+  }
+
/**
* Given a collection of [[Literal]]s, resolves possible type conflicts by up-casting "lower"
* types.
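For readers outside the org.apache.spark.sql package, a hedged standalone sketch of the same rule (illustrative helper, not the patch's code: it blacklists the complex types instead of whitelisting Spark's internal AtomicType, and throws a plain IllegalArgumentException rather than AnalysisException):

    import org.apache.spark.sql.types._

    def checkPartitionColumns(schema: StructType, partitionColumns: Seq[String]): Unit = {
      partitionColumns.foreach { name =>
        val field = schema(name)  // StructType.apply fails if the column does not exist
        field.dataType match {
          case _: ArrayType | _: MapType | _: StructType =>
            throw new IllegalArgumentException(
              s"Cannot use ${field.dataType} for partition column")
          case _ => // atomic types (integers, strings, dates, timestamps, ...) are fine
        }
      }
    }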
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
index 7770bbd712..8fbaf3a305 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala
@@ -143,7 +143,7 @@ object ResolvedDataSource extends Logging {
new ResolvedDataSource(clazz, relation)
}
- private def partitionColumnsSchema(
+ def partitionColumnsSchema(
schema: StructType,
partitionColumns: Array[String]): StructType = {
StructType(partitionColumns.map { col =>
@@ -179,6 +179,9 @@ object ResolvedDataSource extends Logging {
val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
}
+
+ PartitioningUtils.validatePartitionColumnDataTypes(data.schema, partitionColumns)
+
val dataSchema = StructType(data.schema.filterNot(f => partitionColumns.contains(f.name)))
val r = dataSource.createRelation(
sqlContext,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
index 2f11f40422..d36197e50d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala
@@ -287,7 +287,7 @@ private[sql] class DynamicPartitionWriterContainer(
PartitioningUtils.escapePathName _, StringType, Seq(Cast(c, StringType)), Seq(StringType))
val str = If(IsNull(c), Literal(defaultPartitionName), escaped)
val partitionName = Literal(c.name + "=") :: str :: Nil
- if (i == 0) partitionName else Literal(Path.SEPARATOR_CHAR.toString) :: partitionName
+ if (i == 0) partitionName else Literal(Path.SEPARATOR) :: partitionName
}
// Returns the partition path given a partition key.
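Path.SEPARATOR is the string "/" while SEPARATOR_CHAR is the char '/', so this change only drops the toString conversion. A small sketch of the path layout the writer produces, using a hypothetical helper in plain Scala rather than Spark's expression API:

    import org.apache.hadoop.fs.Path

    // Builds a Hive-style partition path such as "a=1/b=x".
    def partitionPath(cols: Seq[(String, String)]): String =
      cols.map { case (name, value) => s"$name=$value" }.mkString(Path.SEPARATOR)

    partitionPath(Seq("a" -> "1", "b" -> "x"))  // "a=1/b=x"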
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 40ca8bf409..9d3d35692f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -116,6 +116,8 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
// OK
}
+ PartitioningUtils.validatePartitionColumnDataTypes(r.schema, part.keySet.toArray)
+
// Get all input data source relations of the query.
val srcRelations = query.collect {
case LogicalRelation(src: BaseRelation) => src
@@ -138,10 +140,10 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
// OK
}
- case CreateTableUsingAsSelect(tableName, _, _, _, SaveMode.Overwrite, _, query) =>
+ case CreateTableUsingAsSelect(tableName, _, _, partitionColumns, mode, _, query) =>
// When the SaveMode is Overwrite, we need to check if the table is an input table of
// the query. If so, we will throw an AnalysisException to let users know it is not allowed.
- if (catalog.tableExists(Seq(tableName))) {
+ if (mode == SaveMode.Overwrite && catalog.tableExists(Seq(tableName))) {
// Need to remove SubQuery operator.
EliminateSubQueries(catalog.lookupRelation(Seq(tableName))) match {
// Only do the check if the table is a data source table
@@ -164,6 +166,8 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
// OK
}
+ PartitioningUtils.validatePartitionColumnDataTypes(query.schema, partitionColumns)
+
case _ => // OK
}
}
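The CreateTableUsingAsSelect case now binds partitionColumns and mode instead of matching SaveMode.Overwrite in the pattern, so the overwrite self-read check keeps its old scope while the partition column validation runs for every save mode. A hedged user-level sketch of that CTAS path (table name, format, and data are illustrative; assumes a HiveContext-backed sqlContext, as in the sql/hive test added below):

    import sqlContext.implicits._

    // Partitioning a saveAsTable() write by a MapType column is rejected during
    // analysis, before any data is written, regardless of the SaveMode.
    val events = Seq((1L, Map("k" -> 1)), (2L, Map("k" -> 2))).toDF("id", "attrs")
    events.write.format("parquet").partitionBy("attrs").saveAsTable("events_by_attrs")
    // => org.apache.spark.sql.AnalysisException: Cannot use MapType(...) for partition column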
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index af445626fb..8d0d9218dd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.sources
+import java.sql.Date
+
import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration
@@ -553,6 +555,21 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue))
}
}
+
+  test("SPARK-8887: Explicitly define which data types can be used as dynamic partition columns") {
+    val df = Seq(
+      (1, "v1", Array(1, 2, 3), Map("k1" -> "v1"), Tuple2(1, "4")),
+      (2, "v2", Array(4, 5, 6), Map("k2" -> "v2"), Tuple2(2, "5")),
+      (3, "v3", Array(7, 8, 9), Map("k3" -> "v3"), Tuple2(3, "6"))).toDF("a", "b", "c", "d", "e")
+    withTempDir { file =>
+      intercept[AnalysisException] {
+        df.write.format(dataSourceName).partitionBy("c", "d", "e").save(file.getCanonicalPath)
+      }
+    }
+    intercept[AnalysisException] {
+      df.write.format(dataSourceName).partitionBy("c", "d", "e").saveAsTable("t")
+    }
+  }
}
// This class is used to test SPARK-8578. We should not use any custom output committer when