author     Wenchen Fan <wenchen@databricks.com>    2016-09-20 09:53:28 -0700
committer  Yin Huai <yhuai@databricks.com>         2016-09-20 09:53:28 -0700
commit     eb004c66200da7df36dd0a9a11999fc352197916
tree       62ca0ab65e791c83357d73934b513b7da133da10
parent     d5ec5dbb0dc0358b0394626c80781e422f9af581
[SPARK-17051][SQL] we should use hadoopConf in InsertIntoHiveTable
## What changes were proposed in this pull request?

Hive confs from hive-site.xml are loaded into `hadoopConf`, so `InsertIntoHiveTable` should read them from `hadoopConf` instead of from `SessionState.conf`.

## How was this patch tested?

N/A

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14634 from cloud-fan/bug.
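To make the configuration-lookup difference concrete, here is a minimal, hypothetical sketch (not part of the patch). It assumes a Hive-enabled `SparkSession` and a `hive-site.xml` on the classpath that sets `hive.exec.compress.output=true`; the object name and app name are illustrative only.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical standalone app, only to illustrate where hive-site.xml values surface.
object HiveConfLookupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("hive-conf-lookup-sketch")
      .enableHiveSupport()
      .getOrCreate()

    // hive-site.xml is loaded into the SparkContext's Hadoop configuration,
    // so a value set there is visible through this lookup (expected: "true").
    val fromHadoopConf =
      spark.sparkContext.hadoopConfiguration.get("hive.exec.compress.output", "false")

    // The session's SQL conf only holds keys set through Spark SQL (SET, --conf, ...),
    // so a value that lives only in hive-site.xml falls back to the default here
    // (expected: "false"). This is the lookup path the pre-patch code relied on.
    val fromSqlConf = spark.conf.get("hive.exec.compress.output", "false")

    println(s"Hadoop conf: $fromHadoopConf, SQL conf: $fromSqlConf")
    spark.stop()
  }
}
```

The patch switches the reads in `InsertIntoHiveTable` to the first path: the `hadoopConf` used there is built per statement via `sessionState.newHadoopConf()`, so values set through Spark SQL still take effect on top of the Hadoop configuration.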
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala |  9
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala      | 32
2 files changed, 31 insertions(+), 10 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 7eec9c787c..53bb3b93db 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -147,8 +147,7 @@ case class InsertIntoHiveTable(
     val hadoopConf = sessionState.newHadoopConf()
     val tmpLocation = getExternalTmpPath(tableLocation, hadoopConf)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
-    val isCompressed =
-      sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean
+    val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean

     if (isCompressed) {
       // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
@@ -182,15 +181,13 @@ case class InsertIntoHiveTable(
     // Validate partition spec if there exist any dynamic partitions
     if (numDynamicPartitions > 0) {
       // Report error if dynamic partitioning is not enabled
-      if (!sessionState.conf.getConfString("hive.exec.dynamic.partition", "true").toBoolean) {
+      if (!hadoopConf.get("hive.exec.dynamic.partition", "true").toBoolean) {
         throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
       }

       // Report error if dynamic partition strict mode is on but no static partition is found
       if (numStaticPartitions == 0 &&
-        sessionState.conf.getConfString(
-          "hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict"))
-      {
+        hadoopConf.get("hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict")) {
         throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
       }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 1d1a958d3f..2b945dbbe0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -26,16 +26,17 @@ import scala.util.Try
import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.scalatest.BeforeAndAfter
-import org.apache.spark.{SparkException, SparkFiles}
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
+import org.apache.spark.SparkFiles
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Cast
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.execution.joins.BroadcastNestedLoopJoinExec
import org.apache.spark.sql.hive._
-import org.apache.spark.sql.hive.test.{TestHive, TestHiveContext}
+import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SQLTestUtils
case class TestData(a: Int, b: String)
@@ -43,7 +44,7 @@ case class TestData(a: Int, b: String)
* A set of test cases expressed in Hive QL that are not covered by the tests
* included in the hive distribution.
*/
-class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
+class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAndAfter {
private val originalTimeZone = TimeZone.getDefault
private val originalLocale = Locale.getDefault
@@ -51,6 +52,8 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
private val originalCrossJoinEnabled = TestHive.conf.crossJoinEnabled
+ def spark: SparkSession = sparkSession
+
override def beforeAll() {
super.beforeAll()
TestHive.setCacheTables(true)
@@ -1199,6 +1202,27 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     }
     assertUnsupportedFeature { sql("DROP TEMPORARY MACRO SIGMOID") }
   }
+
+  test("dynamic partitioning is allowed when hive.exec.dynamic.partition.mode is nonstrict") {
+    val modeConfKey = "hive.exec.dynamic.partition.mode"
+    withTable("with_parts") {
+      sql("CREATE TABLE with_parts(key INT) PARTITIONED BY (p INT)")
+
+      withSQLConf(modeConfKey -> "nonstrict") {
+        sql("INSERT OVERWRITE TABLE with_parts partition(p) select 1, 2")
+        assert(spark.table("with_parts").filter($"p" === 2).collect().head == Row(1, 2))
+      }
+
+      val originalValue = spark.sparkContext.hadoopConfiguration.get(modeConfKey, "nonstrict")
+      try {
+        spark.sparkContext.hadoopConfiguration.set(modeConfKey, "nonstrict")
+        sql("INSERT OVERWRITE TABLE with_parts partition(p) select 3, 4")
+        assert(spark.table("with_parts").filter($"p" === 4).collect().head == Row(3, 4))
+      } finally {
+        spark.sparkContext.hadoopConfiguration.set(modeConfKey, originalValue)
+      }
+    }
+  }
 }

 // for SPARK-2180 test