author    windpiger <songjun@outlook.com>    2017-01-23 19:06:04 +0800
committer Wenchen Fan <wenchen@databricks.com>    2017-01-23 19:06:04 +0800
commit    0ef1421a645be79857ef96a90464e0e669190dcf (patch)
tree      79d68689ca3539d25977714e15cea35307092877 /sql
parent    c99492141b1ddddb8edb6841a6e83748e5ba9bba (diff)
[SPARK-19284][SQL] append to a partitioned datasource table should not use a custom partition location
## What changes were proposed in this pull request?

When we append data to an existing partitioned datasource table, `InsertIntoHadoopFsRelationCommand.getCustomPartitionLocations` currently returns the same locations as the Hive default partition paths; it should return None for those partitions instead.

## How was this patch tested?

A unit test is added in `PartitionedWriteSuite`.

Author: windpiger <songjun@outlook.com>

Closes #16642 from windpiger/appendSchema.
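For context, the scenario can be exercised with a short spark-shell style sketch that mirrors the new test in `PartitionedWriteSuite` below (the session setup, table name `t`, and columns `a`/`b` are illustrative assumptions, not part of the commit):

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: a local session for demonstration purposes.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("SPARK-19284 sketch")
  .getOrCreate()
import spark.implicits._

// Create a partitioned datasource table. Its partition directories sit under
// the table's default location; no custom per-partition paths are declared.
Seq((1, 2)).toDF("a", "b").write.partitionBy("b").saveAsTable("t")

// Append to the existing table. With this patch, those default partition
// directories are no longer reported as custom partition locations, so no
// custom partition path is sent to the write tasks.
Seq((3, 2)).toDF("a", "b").write.mode("append").partitionBy("b").saveAsTable("t")
```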
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala |  6
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala | 32
3 files changed, 36 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 052efe5edf..5abd579476 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -122,7 +122,6 @@ case class CreateDataSourceTableAsSelectCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
assert(table.tableType != CatalogTableType.VIEW)
assert(table.provider.isDefined)
- assert(table.schema.isEmpty)
val sessionState = sparkSession.sessionState
val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
@@ -144,6 +143,8 @@ case class CreateDataSourceTableAsSelectCommand(
saveDataIntoTable(
sparkSession, table, table.storage.locationUri, query, mode, tableExists = true)
} else {
+ assert(table.schema.isEmpty)
+
val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
Some(sessionState.catalog.defaultTablePath(table.identifier))
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index beacb08994..f8c7fca028 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -95,7 +95,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
case c @ CreateTable(tableDesc, SaveMode.Append, Some(query))
if sparkSession.sessionState.catalog.tableExists(tableDesc.identifier) =>
// This is guaranteed by the parser and `DataFrameWriter`
- assert(tableDesc.schema.isEmpty && tableDesc.provider.isDefined)
+ assert(tableDesc.provider.isDefined)
// Analyze the query in CTAS and then we can do the normalization and checking.
val qe = sparkSession.sessionState.executePlan(query)
@@ -186,9 +186,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
}
c.copy(
- // trust everything from the existing table, except schema as we assume it's empty in a lot
- // of places, when we do CTAS.
- tableDesc = existingTable.copy(schema = new StructType()),
+ tableDesc = existingTable,
query = Some(newQuery))
// Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
index 953604e4ac..bf7fabe332 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/PartitionedWriteSuite.scala
@@ -19,11 +19,31 @@ package org.apache.spark.sql.sources
import java.io.File
+import org.apache.hadoop.mapreduce.TaskAttemptContext
+
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol
import org.apache.spark.sql.functions._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils
+private class OnlyDetectCustomPathFileCommitProtocol(jobId: String, path: String, isAppend: Boolean)
+ extends SQLHadoopMapReduceCommitProtocol(jobId, path, isAppend)
+ with Serializable with Logging {
+
+ override def newTaskTempFileAbsPath(
+ taskContext: TaskAttemptContext, absoluteDir: String, ext: String): String = {
+ if (isAppend) {
+ throw new Exception("append data to an existed partitioned table, " +
+ "there should be no custom partition path sent to Task")
+ }
+
+ super.newTaskTempFileAbsPath(taskContext, absoluteDir, ext)
+ }
+}
+
class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
import testImplicits._
@@ -92,6 +112,18 @@ class PartitionedWriteSuite extends QueryTest with SharedSQLContext {
}
}
+ test("append data to an existed partitioned table without custom partition path") {
+ withTable("t") {
+ withSQLConf(SQLConf.FILE_COMMIT_PROTOCOL_CLASS.key ->
+ classOf[OnlyDetectCustomPathFileCommitProtocol].getName) {
+ Seq((1, 2)).toDF("a", "b").write.partitionBy("b").saveAsTable("t")
+ // if custom partition path is detected by the task, it will throw an Exception
+ // from OnlyDetectCustomPathFileCommitProtocol above.
+ Seq((3, 2)).toDF("a", "b").write.mode("append").partitionBy("b").saveAsTable("t")
+ }
+ }
+ }
+
/** Lists files recursively. */
private def recursiveList(f: File): Array[File] = {
require(f.isDirectory)