From ea017b5574cd26b90f586a3f5bf8ccab3fe02e4b Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Wed, 27 Apr 2016 09:30:57 -0700
Subject: [SPARK-14949][SQL] Remove HiveConf dependency from InsertIntoHiveTable

## What changes were proposed in this pull request?
This patch removes the use of HiveConf from InsertIntoHiveTable. I think this is the last major use of HiveConf and after this we can try to remove the execution HiveConf.

## How was this patch tested?
Internal refactoring and should be covered by existing tests.

Author: Reynold Xin

Closes #12728 from rxin/SPARK-14949.
---
 .../sql/hive/execution/InsertIntoHiveTable.scala  | 30 ++++++++++------------
 1 file changed, 14 insertions(+), 16 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 3cb6081687..cba10caf98 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -21,8 +21,6 @@ import java.util
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
 import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
 
@@ -35,7 +33,7 @@ import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.SparkException
 import org.apache.spark.util.SerializableJobConf
 
-private[hive]
+
 case class InsertIntoHiveTable(
     table: MetastoreRelation,
     partition: Map[String, Option[String]],
@@ -45,8 +43,6 @@ case class InsertIntoHiveTable(
 
   @transient private val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
   @transient private val client = sessionState.metadataHive
-  @transient private val hiveconf = sessionState.hiveconf
-  @transient private lazy val hiveContext = new Context(hiveconf)
 
   def output: Seq[Attribute] = Seq.empty
 
@@ -70,7 +66,6 @@ case class InsertIntoHiveTable(
     writerContainer.driverSideSetup()
     sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _)
     writerContainer.commitJob()
-
   }
 
   /**
@@ -85,19 +80,20 @@ case class InsertIntoHiveTable(
     // instances within the closure, since Serializer is not serializable while TableDesc is.
     val tableDesc = table.tableDesc
     val tableLocation = table.hiveQlTable.getDataLocation
-    val tmpLocation = hiveContext.getExternalTmpPath(tableLocation)
+    val hadoopConf = sessionState.newHadoopConf()
+    val tmpLocation = new Context(hadoopConf).getExternalTmpPath(tableLocation)
     val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
-    val isCompressed = hiveconf.getBoolean(
-      ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
+    val isCompressed =
+      sessionState.conf.getConfString("hive.exec.compress.output", "false").toBoolean
 
     if (isCompressed) {
       // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
       // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
       // to store compression information.
-      hiveconf.set("mapred.output.compress", "true")
+      hadoopConf.set("mapred.output.compress", "true")
       fileSinkConf.setCompressed(true)
-      fileSinkConf.setCompressCodec(hiveconf.get("mapred.output.compression.codec"))
-      fileSinkConf.setCompressType(hiveconf.get("mapred.output.compression.type"))
+      fileSinkConf.setCompressCodec(hadoopConf.get("mapred.output.compression.codec"))
+      fileSinkConf.setCompressType(hadoopConf.get("mapred.output.compression.type"))
     }
 
     val numDynamicPartitions = partition.values.count(_.isEmpty)
@@ -114,13 +110,15 @@ case class InsertIntoHiveTable(
     // Validate partition spec if there exist any dynamic partitions
     if (numDynamicPartitions > 0) {
       // Report error if dynamic partitioning is not enabled
-      if (!hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
+      if (!sessionState.conf.getConfString("hive.exec.dynamic.partition", "true").toBoolean) {
         throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
       }
 
       // Report error if dynamic partition strict mode is on but no static partition is found
-      if (numStaticPartitions == 0 && hiveconf.getVar(
-        HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
+      if (numStaticPartitions == 0 &&
+        sessionState.conf.getConfString(
+          "hive.exec.dynamic.partition.mode", "strict").equalsIgnoreCase("strict"))
+      {
         throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
       }
 
@@ -131,7 +129,7 @@ case class InsertIntoHiveTable(
       }
     }
 
-    val jobConf = new JobConf(hiveconf)
+    val jobConf = new JobConf(hadoopConf)
     val jobConfSer = new SerializableJobConf(jobConf)
 
     // When speculation is on and output committer class name contains "Direct", we should warn
-- 
cgit v1.2.3
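
The core of the change is that Hive settings are now looked up by their plain string keys with explicit defaults (via the session's SQLConf and a freshly built Hadoop Configuration) rather than through the HiveConf/ConfVars classes. The snippet below is a minimal, self-contained sketch of that lookup pattern only; it uses a bare Hadoop Configuration as a stand-in for both the SQLConf and Hadoop-level lookups in the patch, and the object name and defaults shown are illustrative assumptions, not the exact internal code paths above.

```scala
import org.apache.hadoop.conf.Configuration

// Sketch: string-keyed config lookups with explicit defaults,
// instead of HiveConf.ConfVars constants. Illustration only.
object HiveKeyLookupSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the conf the patch builds via sessionState.newHadoopConf().
    val hadoopConf = new Configuration()

    // "hive.exec.compress.output" plays the role of ConfVars.COMPRESSRESULT.
    val isCompressed =
      hadoopConf.get("hive.exec.compress.output", "false").toBoolean

    // "hive.exec.dynamic.partition.mode" plays the role of ConfVars.DYNAMICPARTITIONINGMODE.
    val partitionMode =
      hadoopConf.get("hive.exec.dynamic.partition.mode", "strict")

    if (isCompressed) {
      // Same Hadoop-level key the patch sets when compressed output is requested.
      hadoopConf.set("mapred.output.compress", "true")
    }

    println(s"compress output: $isCompressed, dynamic partition mode: $partitionMode")
  }
}
```

Because every key is an ordinary string with an inline default, the lookup no longer needs any Hive class at all, which is what lets the HiveConf imports be dropped from the file.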