Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala               | 12
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala | 37
2 files changed, 46 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 60a9d1f020..e6fc9749c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -21,8 +21,6 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.fs.Path
-
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
@@ -243,7 +241,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
   private def insertInto(tableIdent: TableIdentifier): Unit = {
     assertNotBucketed("insertInto")
-    val partitions = normalizedParCols.map(_.map(col => col -> (Option.empty[String])).toMap)
+
+    if (partitioningColumns.isDefined) {
+      throw new AnalysisException(
+        "insertInto() can't be used together with partitionBy(). " +
+          "Partition columns are defined by the table into which is being inserted."
+      )
+    }
+
+    val partitions = normalizedParCols.map(_.map(col => col -> Option.empty[String]).toMap)
     val overwrite = mode == SaveMode.Overwrite
 
     // A partitioned relation's schema can be different from the input logicalPlan, since
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 3bf45ced75..b890b4bffd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -346,6 +346,43 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
     }
   }
 
+  private def testPartitionedHiveSerDeTable(testName: String)(f: String => Unit): Unit = {
+    test(s"Hive SerDe table - $testName") {
+      val hiveTable = "hive_table"
+
+      withTable(hiveTable) {
+        withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+          sql(s"CREATE TABLE $hiveTable (a INT) PARTITIONED BY (b INT, c INT) STORED AS TEXTFILE")
+          f(hiveTable)
+        }
+      }
+    }
+  }
+
+  private def testPartitionedDataSourceTable(testName: String)(f: String => Unit): Unit = {
+    test(s"Data source table - $testName") {
+      val dsTable = "ds_table"
+
+      withTable(dsTable) {
+        sql(s"CREATE TABLE $dsTable (a INT, b INT, c INT) USING PARQUET PARTITIONED BY (b, c)")
+        f(dsTable)
+      }
+    }
+  }
+
+  private def testPartitionedTable(testName: String)(f: String => Unit): Unit = {
+    testPartitionedHiveSerDeTable(testName)(f)
+    testPartitionedDataSourceTable(testName)(f)
+  }
+
+  testPartitionedTable("partitionBy() can't be used together with insertInto()") { tableName =>
+    val cause = intercept[AnalysisException] {
+      Seq((1, 2, 3)).toDF("a", "b", "c").write.partitionBy("b", "c").insertInto(tableName)
+    }
+
+    assert(cause.getMessage.contains("insertInto() can't be used together with partitionBy()."))
+  }
+
   test("InsertIntoTable#resolved should include dynamic partitions") {
     withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
       sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")
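
For context, the patch makes the redundant combination fail fast instead of silently accepting a partitionBy() spec on a table that already defines its partition columns. Below is a minimal sketch of the resulting behavior, assuming a local Hive-enabled session; the session settings and the table name "t" are illustrative, not part of the commit:

    import org.apache.spark.sql.{AnalysisException, SparkSession}

    val spark = SparkSession.builder()
      .master("local[*]")          // illustrative local setup
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    // Partition columns (b, c) live in the table definition, mirroring the
    // Hive SerDe fixture in the test suite above.
    spark.sql("CREATE TABLE t (a INT) PARTITIONED BY (b INT, c INT) STORED AS TEXTFILE")
    val df = Seq((1, 2, 3)).toDF("a", "b", "c")

    // After this patch, restating the partition spec throws immediately:
    try {
      df.write.partitionBy("b", "c").insertInto("t")
    } catch {
      case e: AnalysisException =>
        // "insertInto() can't be used together with partitionBy(). ..."
        println(e.getMessage)
    }

    // The supported path: plain insertInto(). The partition columns come from
    // the table definition, and the trailing DataFrame columns (b, c) feed
    // them positionally via dynamic partitioning.
    spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
    df.write.insertInto("t")

Note that insertInto() resolves columns by position, not by name, which is why the test data puts the partition values (b, c) last; partitionBy() remains the right tool for DataFrameWriter.saveAsTable() and path-based writes, where the writer itself defines the layout.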