aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReynold Xin <rxin@databricks.com>2016-04-27 09:30:57 -0700
committerReynold Xin <rxin@databricks.com>2016-04-27 09:30:57 -0700
commitea017b5574cd26b90f586a3f5bf8ccab3fe02e4b (patch)
tree806fec7f3a888f3f1682150a06ddc33c6066e509
parent08dc89361d4d78cfb5153b708bc563e34390cfc0 (diff)
downloadspark-ea017b5574cd26b90f586a3f5bf8ccab3fe02e4b.tar.gz
spark-ea017b5574cd26b90f586a3f5bf8ccab3fe02e4b.tar.bz2
spark-ea017b5574cd26b90f586a3f5bf8ccab3fe02e4b.zip
[SPARK-14949][SQL] Remove HiveConf dependency from InsertIntoHiveTable
## What changes were proposed in this pull request? This patch removes the use of HiveConf from InsertIntoHiveTable. I think this is the last major use of HiveConf and after this we can try to remove the execution HiveConf. ## How was this patch tested? Internal refactoring and should be covered by existing tests. Author: Reynold Xin <rxin@databricks.com> Closes #12728 from rxin/SPARK-14949.
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala30
1 files changed, 14 insertions, 16 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 3cb6081687..cba10caf98 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -21,8 +21,6 @@ import java.util
import scala.collection.JavaConverters._
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
@@ -35,7 +33,7 @@ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
import org.apache.spark.SparkException
import org.apache.spark.util.SerializableJobConf
-private[hive]
+
case class InsertIntoHiveTable(
table: MetastoreRelation,
partition: Map[String, Option[String]],
@@ -45,8 +43,6 @@ case class InsertIntoHiveTable(
@transient private val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
@transient private val client = sessionState.metadataHive
- @transient private val hiveconf = sessionState.hiveconf
- @transient private lazy val hiveContext = new Context(hiveconf)
def output: Seq[Attribute] = Seq.empty
@@ -70,7 +66,6 @@ case class InsertIntoHiveTable(
writerContainer.driverSideSetup()
sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _)
writerContainer.commitJob()
-
}
/**
@@ -85,19 +80,20 @@ case class InsertIntoHiveTable(
// instances within the closure, since Serializer is not serializable while TableDesc is.
val tableDesc = table.tableDesc
val tableLocation = table.hiveQlTable.getDataLocation
- val tmpLocation = hiveContext.getExternalTmpPath(tableLocation)
+ val hadoopConf = sessionState.newHadoopConf()
+ val tmpLocation = new Context(hadoopConf).getExternalTmpPath(tableLocation)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
- val isCompressed = hiveconf.getBoolean(
- ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
+ val isCompressed =
+ sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean
if (isCompressed) {
// Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
// and "mapred.output.compression.type" have no impact on ORC because it uses table properties
// to store compression information.
- hiveconf.set("mapred.output.compress", "true")
+ hadoopConf.set("mapred.output.compress", "true")
fileSinkConf.setCompressed(true)
- fileSinkConf.setCompressCodec(hiveconf.get("mapred.output.compression.codec"))
- fileSinkConf.setCompressType(hiveconf.get("mapred.output.compression.type"))
+ fileSinkConf.setCompressCodec(hadoopConf.get("mapred.output.compression.codec"))
+ fileSinkConf.setCompressType(hadoopConf.get("mapred.output.compression.type"))
}
val numDynamicPartitions = partition.values.count(_.isEmpty)
@@ -114,13 +110,15 @@ case class InsertIntoHiveTable(
// Validate partition spec if there exist any dynamic partitions
if (numDynamicPartitions > 0) {
// Report error if dynamic partitioning is not enabled
- if (!hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
+ if (!sessionState.conf.getConfString("hive.exec.dynamic.partition", "true").toBoolean) {
throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
}
// Report error if dynamic partition strict mode is on but no static partition is found
- if (numStaticPartitions == 0 && hiveconf.getVar(
- HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
+ if (numStaticPartitions == 0 &&
+ sessionState.conf.getConfString(
+ "hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict"))
+ {
throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
}
@@ -131,7 +129,7 @@ case class InsertIntoHiveTable(
}
}
- val jobConf = new JobConf(hiveconf)
+ val jobConf = new JobConf(hadoopConf)
val jobConfSer = new SerializableJobConf(jobConf)
// When speculation is on and output committer class name contains "Direct", we should warn