author    windpiger <songjun@outlook.com>        2017-01-23 19:06:04 +0800
committer Wenchen Fan <wenchen@databricks.com>   2017-01-23 19:06:04 +0800
commit    0ef1421a645be79857ef96a90464e0e669190dcf (patch)
tree      79d68689ca3539d25977714e15cea35307092877 /sql
parent    c99492141b1ddddb8edb6841a6e83748e5ba9bba (diff)
[SPARK-19284][SQL] Append to a partitioned datasource table should not use custom partition locations
## What changes were proposed in this pull request?
When we append data to an existing partitioned datasource table, `InsertIntoHadoopFsRelationCommand.getCustomPartitionLocations` currently returns the same location as the Hive default; it should return None.
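To make the scenario concrete, here is a minimal, hypothetical sketch (assuming a local Spark 2.x session; the table name `t` and the column names mirror the test added below) of the append path this fix targets:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative driver only; not part of the patch.
object AppendWithoutCustomPartitionPath {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("SPARK-19284 append scenario")
      .getOrCreate()
    import spark.implicits._

    // Create a partitioned datasource table; its partitions live under the
    // default table location, so no custom partition locations are involved.
    Seq((1, 2)).toDF("a", "b").write.partitionBy("b").saveAsTable("t")

    // Appending to the existing table should not treat those default
    // partition locations as custom paths handed to the write tasks.
    Seq((3, 2)).toDF("a", "b").write.mode("append").partitionBy("b").saveAsTable("t")

    spark.table("t").show()
    spark.stop()
  }
}
```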
## How was this patch tested?
A unit test is added in PartitionedWriteSuite: it registers a probe commit protocol that throws if a custom partition path is sent to a task during the append.
Author: windpiger <songjun@outlook.com>
Closes #16642 from windpiger/appendSchema.
Diffstat (limited to 'sql')
3 files changed, 36 insertions, 5 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 052efe5edf..5abd579476 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -122,7 +122,6 @@ case class CreateDataSourceTableAsSelectCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     assert(table.tableType != CatalogTableType.VIEW)
     assert(table.provider.isDefined)
-    assert(table.schema.isEmpty)

     val sessionState = sparkSession.sessionState
     val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
@@ -144,6 +143,8 @@
       saveDataIntoTable(
         sparkSession, table, table.storage.locationUri, query, mode, tableExists = true)
     } else {
+      assert(table.schema.isEmpty)
+
       val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
         Some(sessionState.catalog.defaultTablePath(table.identifier))
       } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index beacb08994..f8c7fca028 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -95,7 +95,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
     case c @ CreateTable(tableDesc, SaveMode.Append, Some(query))
         if sparkSession.sessionState.catalog.tableExists(tableDesc.identifier) =>
       // This is guaranteed by the parser and `DataFrameWriter`
-      assert(tableDesc.schema.isEmpty && tableDesc.provider.isDefined)
+      assert(tableDesc.provider.isDefined)

       // Analyze the query in CTAS and then we can do the normalization and checking.
       val qe = sparkSession.sessionState.executePlan(query)
@@ -186,9 +186,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPlan] {
       }

       c.copy(
-        // trust everything from the existing table, except schema as we assume it's empty in a lot
-        // of places, when we do CTAS.
-        tableDesc = existingTable.copy(schema = new StructType()),
+        tableDesc = existingTable,
         query = Some(newQuery))

     // Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index 953604e4ac..bf7fabe332 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -19,11 +19,31 @@ package org.apache.spark.sql.sources

 import java.io.File

+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
 import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils

+private class OnlyDetectCustomPathFileCommitProtocol(jobId: String, path: String, isAppend: Boolean)
+  extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend)
+  with Serializable with Logging {
+
+  override def newTaskTempFileAbsPath(
+      taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+    if (isAppend) {
+      throw new Exception("append data to an existed partitioned table, " +
+        "there should be no custom partition path sent to Task")
+    }
+
+    super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext)
+  }
+}
+
 class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
   import testImplicits._

@@ -92,6 +112,18 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
     }
   }

+  test("append data to an existed partitioned table without custom partition path") {
+    withTable("t") {
+      withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+        classOf[OnlyDetectCustomPathFileCommitProtocol].getName) {
+        Seq((1, 2)).toDF("a", "b").write.partitionBy("b").saveAsTable("t")
+        // if custom partition path is detected by the task, it will throw an Exception
+        // from OnlyDetectCustomPathFileCommitProtocol above.
+        Seq((3, 2)).toDF("a", "b").write.mode("append").partitionBy("b").saveAsTable("t")
+      }
+    }
+  }
+
   /** Lists files recursively. */
   private def recursiveList(f: File): Array[File] = {
     require(f.isDirectory)
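As a usage note on the test above: the probe protocol is wired in through SQLConf.FILE_COMMIT_PROTOCOL_CLASS (the spark.sql.sources.commitProtocolClass setting); withSQLConf scopes the setting to the block and restores it afterwards. A minimal sketch of the same wiring outside the suite, assuming a comparable (non-private) commit-protocol subclass named ProbeCommitProtocol is available on the classpath, might look like:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical standalone wiring of a custom FileCommitProtocol for writes;
// ProbeCommitProtocol is an assumed stand-in for the suite's probe class.
object ProbeCommitProtocolExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("custom commit protocol wiring")
      // Route datasource writes through the probe protocol so that any
      // custom partition path handed to a task can be detected.
      .config("spark.sql.sources.commitProtocolClass",
        "com.example.ProbeCommitProtocol")
      .getOrCreate()
    import spark.implicits._

    // Same write pattern as the test: create, then append; the append should
    // complete without any custom partition path reaching the tasks.
    Seq((1, 2)).toDF("a", "b").write.partitionBy("b").saveAsTable("t")
    Seq((3, 2)).toDF("a", "b").write.mode("append").partitionBy("b").saveAsTable("t")

    spark.stop()
  }
}
```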