Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala               | 12
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala | 37
2 files changed, 46 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 60a9d1f020..e6fc9749c7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -21,8 +21,6 @@ import java.util.Properties
import scala.collection.JavaConverters._
-import org.apache.hadoop.fs.Path
-
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Project}
@@ -243,7 +241,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
private def insertInto(tableIdent: TableIdentifier): Unit = {
assertNotBucketed("insertInto")
- val partitions = normalizedParCols.map(_.map(col => col -> (Option.empty[String])).toMap)
+
+ if (partitioningColumns.isDefined) {
+ throw new AnalysisException(
+ "insertInto() can't be used together with partitionBy(). " +
+ "Partition columns are defined by the table into which is being inserted."
+ )
+ }
+
+ val partitions = normalizedParCols.map(_.map(col => col -> Option.empty[String]).toMap)
val overwrite = mode == SaveMode.Overwrite
// A partitioned relation's schema can be different from the input logicalPlan, since
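For context, a minimal sketch of how the new check looks from the caller's side (the table name below is illustrative, not part of this commit; the check itself is the hunk above):

    // Partition columns are taken from the target table's definition, so a plain
    // insertInto() against a partitioned table still works as before
    // (for Hive SerDe tables, hive.exec.dynamic.partition.mode may need to be "nonstrict"):
    Seq((1, 2, 3)).toDF("a", "b", "c").write.insertInto("partitioned_table")

    // Combining partitionBy() with insertInto() now fails fast with an AnalysisException:
    //   insertInto() can't be used together with partitionBy(). ...
    Seq((1, 2, 3)).toDF("a", "b", "c").write.partitionBy("b", "c").insertInto("partitioned_table")
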
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index 3bf45ced75..b890b4bffd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -346,6 +346,43 @@ class InsertIntoHiveTableSuite extends QueryTest with TestHiveSingleton with Bef
}
}
+ private def testPartitionedHiveSerDeTable(testName: String)(f: String => Unit): Unit = {
+ test(s"Hive SerDe table - $testName") {
+ val hiveTable = "hive_table"
+
+ withTable(hiveTable) {
+ withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
+ sql(s"CREATE TABLE $hiveTable (a INT) PARTITIONED BY (b INT, c INT) STORED AS TEXTFILE")
+ f(hiveTable)
+ }
+ }
+ }
+ }
+
+ private def testPartitionedDataSourceTable(testName: String)(f: String => Unit): Unit = {
+ test(s"Data source table - $testName") {
+ val dsTable = "ds_table"
+
+ withTable(dsTable) {
+ sql(s"CREATE TABLE $dsTable (a INT, b INT, c INT) USING PARQUET PARTITIONED BY (b, c)")
+ f(dsTable)
+ }
+ }
+ }
+
+ private def testPartitionedTable(testName: String)(f: String => Unit): Unit = {
+ testPartitionedHiveSerDeTable(testName)(f)
+ testPartitionedDataSourceTable(testName)(f)
+ }
+
+ testPartitionedTable("partitionBy() can't be used together with insertInto()") { tableName =>
+ val cause = intercept[AnalysisException] {
+ Seq((1, 2, 3)).toDF("a", "b", "c").write.partitionBy("b", "c").insertInto(tableName)
+ }
+
+ assert(cause.getMessage.contains("insertInto() can't be used together with partitionBy()."))
+ }
+
test("InsertIntoTable#resolved should include dynamic partitions") {
withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
sql("CREATE TABLE partitioned (id bigint, data string) PARTITIONED BY (part string)")