author    | Reynold Xin <rxin@databricks.com> | 2016-04-26 23:42:42 -0700
committer | Yin Huai <yhuai@databricks.com>   | 2016-04-26 23:42:42 -0700
commit    | d73d67f623dd65b90d0edbd7ba62e9a6ce7ebd1e (patch)
tree      | e7db7878de1d7856b90b02f39df14e86f1460e84 /sql/hive/src/main
parent    | b2a45606481a6da6e1f68d14d1095a8dcf2a0e57 (diff)
download  | spark-d73d67f623dd65b90d0edbd7ba62e9a6ce7ebd1e.tar.gz
          | spark-d73d67f623dd65b90d0edbd7ba62e9a6ce7ebd1e.tar.bz2
          | spark-d73d67f623dd65b90d0edbd7ba62e9a6ce7ebd1e.zip
[SPARK-14944][SPARK-14943][SQL] Remove HiveConf from HiveTableScanExec, HiveTableReader, and ScriptTransformation
## What changes were proposed in this pull request?
This patch removes HiveConf from HiveTableScanExec, HiveTableReader, and ScriptTransformation, and instead uses our own configuration system (a plain Hadoop Configuration obtained from the session state). I'm splitting the larger change of removing HiveConf into multiple independent pull requests, because it is very difficult to debug test failures when they are all combined in one giant pull request.
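At a high level, each operator now takes a plain Hadoop `Configuration` and broadcasts it via `SerializableConfiguration`, rather than carrying a `HiveConf` as an extra constructor argument. A minimal sketch of that pattern (`ExampleReader` is a hypothetical class, not part of this patch; `SerializableConfiguration` is the Spark-internal wrapper from `org.apache.spark.util` used in the diff below):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.SerializableConfiguration

// Hypothetical operator following the pattern in this patch: it accepts a
// plain Hadoop Configuration instead of a HiveConf, and broadcasts it once
// on the driver so tasks can read it without serializing the whole operator.
class ExampleReader(
    @transient private val sparkSession: SparkSession,
    hadoopConf: Configuration) {

  // Broadcast the (wrapped) configuration; executors access it lazily.
  private val broadcastedHadoopConf =
    sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

  // Unwrap twice: Broadcast.value gives the wrapper, its .value the Configuration.
  def confValue(key: String, default: String): String =
    broadcastedHadoopConf.value.value.get(key, default)
}
```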
## How was this patch tested?
Should be covered by existing tests.
Author: Reynold Xin <rxin@databricks.com>
Closes #12727 from rxin/SPARK-14944.
Diffstat (limited to 'sql/hive/src/main')
5 files changed, 33 insertions(+), 42 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 9633f9e15b..f071df7581 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -107,7 +107,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
     new SparkPlanner(sparkSession.sparkContext, conf, experimentalMethods.extraStrategies)
       with HiveStrategies {
       override val sparkSession: SparkSession = self.sparkSession
-      override val hiveconf: HiveConf = self.hiveconf
 
       override def strategies: Seq[Strategy] = {
         experimentalMethods.extraStrategies ++ Seq(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 7d1f87f390..71b180e55b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,16 +17,12 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.hadoop.hive.conf.HiveConf
-
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.execution._
 
 private[hive] trait HiveStrategies {
@@ -34,13 +30,12 @@ private[hive] trait HiveStrategies {
   self: SparkPlanner =>
 
   val sparkSession: SparkSession
-  val hiveconf: HiveConf
 
   object Scripts extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.ScriptTransformation(input, script, output, child, ioschema) =>
         val hiveIoSchema = HiveScriptIOSchema(ioschema)
-        ScriptTransformation(input, script, output, planLater(child), hiveIoSchema)(hiveconf) :: Nil
+        ScriptTransformation(input, script, output, planLater(child), hiveIoSchema) :: Nil
       case _ => Nil
     }
   }
@@ -78,7 +73,7 @@ private[hive] trait HiveStrategies {
           projectList,
           otherPredicates,
           identity[Seq[Expression]],
-          HiveTableScanExec(_, relation, pruningPredicates)(sparkSession, hiveconf)) :: Nil
+          HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil
       case _ =>
         Nil
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index af0317f7a1..df6abc258b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -17,17 +17,14 @@
 
 package org.apache.spark.sql.hive
 
-import java.util
-
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
 import org.apache.hadoop.hive.ql.exec.Utilities
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
 import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.serde2.Deserializer
-import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters,
-  StructObjectInspector}
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
 import org.apache.hadoop.hive.serde2.objectinspector.primitive._
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
@@ -62,7 +59,7 @@ class HadoopTableReader(
     @transient private val attributes: Seq[Attribute],
     @transient private val relation: MetastoreRelation,
     @transient private val sparkSession: SparkSession,
-    hiveconf: HiveConf)
+    hadoopConf: Configuration)
   extends TableReader with Logging {
 
   // Hadoop honors "mapred.map.tasks" as hint, but will ignore when mapred.job.tracker is "local".
@@ -72,12 +69,15 @@ class HadoopTableReader(
   private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) {
     0 // will splitted based on block by default.
   } else {
-    math.max(hiveconf.getInt("mapred.map.tasks", 1), sparkSession.sparkContext.defaultMinPartitions)
+    math.max(hadoopConf.getInt("mapred.map.tasks", 1),
+      sparkSession.sparkContext.defaultMinPartitions)
   }
 
-  SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(sparkSession.sparkContext.conf, hiveconf)
-  private val _broadcastedHiveConf =
-    sparkSession.sparkContext.broadcast(new SerializableConfiguration(hiveconf))
+  SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(
+    sparkSession.sparkContext.conf, hadoopConf)
+
+  private val _broadcastedHadoopConf =
+    sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
 
   override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
     makeRDDForTable(
@@ -105,7 +105,7 @@ class HadoopTableReader(
     // Create local references to member variables, so that the entire `this` object won't be
     // serialized in the closure below.
     val tableDesc = relation.tableDesc
-    val broadcastedHiveConf = _broadcastedHiveConf
+    val broadcastedHiveConf = _broadcastedHadoopConf
 
     val tablePath = hiveTable.getPath
     val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
@@ -162,7 +162,7 @@ class HadoopTableReader(
       case (partition, partDeserializer) =>
         def updateExistPathSetByPathPattern(pathPatternStr: String) {
           val pathPattern = new Path(pathPatternStr)
-          val fs = pathPattern.getFileSystem(hiveconf)
+          val fs = pathPattern.getFileSystem(hadoopConf)
           val matches = fs.globStatus(pathPattern)
           matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString)
         }
@@ -209,7 +209,7 @@ class HadoopTableReader(
 
       // Create local references so that the outer object isn't serialized.
       val tableDesc = relation.tableDesc
-      val broadcastedHiveConf = _broadcastedHiveConf
+      val broadcastedHiveConf = _broadcastedHadoopConf
       val localDeserializer = partDeserializer
       val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
@@ -259,7 +259,7 @@ class HadoopTableReader(
   private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): String = {
     filterOpt match {
       case Some(filter) =>
-        val fs = path.getFileSystem(hiveconf)
+        val fs = path.getFileSystem(hadoopConf)
         val filteredFiles = fs.listStatus(path, filter).map(_.getPath.toString)
         filteredFiles.mkString(",")
       case None => path.toString
@@ -279,7 +279,7 @@ class HadoopTableReader(
 
     val rdd = new HadoopRDD(
       sparkSession.sparkContext,
-      _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableConfiguration]],
+      _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
       Some(initializeJobConfFunc),
       inputFormatClass,
       classOf[Writable],
@@ -302,7 +302,7 @@ private[hive] object HiveTableUtil {
     val storageHandler =
       org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(jobConf, property)
     if (storageHandler != null) {
-      val jobProperties = new util.LinkedHashMap[String, String]
+      val jobProperties = new java.util.LinkedHashMap[String, String]
       if (input) {
         storageHandler.configureInputJobProperties(tableDesc, jobProperties)
       } else {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index cc5bbf59db..007c3384e5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution
 
 import scala.collection.JavaConverters._
 
-import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.objectinspector._
@@ -48,8 +48,7 @@ case class HiveTableScanExec(
     requestedAttributes: Seq[Attribute],
     relation: MetastoreRelation,
     partitionPruningPred: Seq[Expression])(
-    @transient private val sparkSession: SparkSession,
-    @transient private val hiveconf: HiveConf)
+    @transient private val sparkSession: SparkSession)
   extends LeafExecNode {
 
   require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
@@ -77,20 +76,20 @@ case class HiveTableScanExec(
   // Create a local copy of hiveconf,so that scan specific modifications should not impact
   // other queries
   @transient
-  private[this] val hiveExtraConf = new HiveConf(hiveconf)
+  private[this] val hadoopConf = sparkSession.sessionState.newHadoopConf()
 
   // append columns ids and names before broadcast
-  addColumnMetadataToConf(hiveExtraConf)
+  addColumnMetadataToConf(hadoopConf)
 
   @transient
   private[this] val hadoopReader =
-    new HadoopTableReader(attributes, relation, sparkSession, hiveExtraConf)
+    new HadoopTableReader(attributes, relation, sparkSession, hadoopConf)
 
   private[this] def castFromString(value: String, dataType: DataType) = {
     Cast(Literal(value), dataType).eval(null)
   }
 
-  private def addColumnMetadataToConf(hiveConf: HiveConf) {
+  private def addColumnMetadataToConf(hiveConf: Configuration) {
     // Specifies needed column IDs for those non-partitioning columns.
     val neededColumnIDs = attributes.flatMap(relation.columnOrdinals.get).map(o => o: Integer)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index f27337eb36..f6e6a75c3e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -26,7 +26,6 @@ import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.exec.{RecordReader, RecordWriter}
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.AbstractSerDe
@@ -58,17 +57,14 @@ case class ScriptTransformation(
     script: String,
     output: Seq[Attribute],
     child: SparkPlan,
-    ioschema: HiveScriptIOSchema)(@transient private val hiveconf: HiveConf)
+    ioschema: HiveScriptIOSchema)
   extends UnaryExecNode {
 
-  override protected def otherCopyArgs: Seq[HiveConf] = hiveconf :: Nil
-
   override def producedAttributes: AttributeSet = outputSet -- inputSet
 
-  private val serializedHiveConf = new SerializableConfiguration(hiveconf)
-
   protected override def doExecute(): RDD[InternalRow] = {
-    def processIterator(inputIterator: Iterator[InternalRow]): Iterator[InternalRow] = {
+    def processIterator(inputIterator: Iterator[InternalRow], hadoopConf: Configuration)
+      : Iterator[InternalRow] = {
       val cmd = List("/bin/bash", "-c", script)
       val builder = new ProcessBuilder(cmd.asJava)
 
@@ -76,7 +72,6 @@ case class ScriptTransformation(
       val inputStream = proc.getInputStream
       val outputStream = proc.getOutputStream
       val errorStream = proc.getErrorStream
-      val localHiveConf = serializedHiveConf.value
 
       // In order to avoid deadlocks, we need to consume the error output of the child process.
       // To avoid issues caused by large error output, we use a circular buffer to limit the amount
@@ -107,7 +102,7 @@ case class ScriptTransformation(
         proc,
         stderrBuffer,
         TaskContext.get(),
-        localHiveConf
+        hadoopConf
       )
 
       // This nullability is a performance optimization in order to avoid an Option.foreach() call
@@ -122,7 +117,7 @@ case class ScriptTransformation(
       val scriptOutputStream = new DataInputStream(inputStream)
 
       @Nullable val scriptOutputReader =
-        ioschema.recordReader(scriptOutputStream, localHiveConf).orNull
+        ioschema.recordReader(scriptOutputStream, hadoopConf).orNull
 
       var scriptOutputWritable: Writable = null
       val reusedWritableObject: Writable = if (null != outputSerde) {
@@ -214,10 +209,13 @@ case class ScriptTransformation(
       outputIterator
     }
 
+    val broadcastedHadoopConf =
+      new SerializableConfiguration(sqlContext.sessionState.newHadoopConf())
+
     child.execute().mapPartitions { iter =>
       if (iter.hasNext) {
         val proj = UnsafeProjection.create(schema)
-        processIterator(iter).map(proj)
+        processIterator(iter, broadcastedHadoopConf.value).map(proj)
       } else {
         // If the input iterator has no rows then do not launch the external script.
         Iterator.empty
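The `ScriptTransformation` hunk above shows the general shape of the fix: the configuration is captured and wrapped on the driver, before `mapPartitions`, so that only the small serializable wrapper is referenced inside the task closure. A standalone sketch of that driver/executor split (`transformPartitions` is a hypothetical helper, not code from this patch):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.rdd.RDD
import org.apache.spark.util.SerializableConfiguration

// Hypothetical helper: wrap the Configuration on the driver, unwrap it
// inside each task. Closing over the wrapper (not the Configuration itself)
// keeps the closure serializable.
def transformPartitions(input: RDD[String], hadoopConf: Configuration): RDD[String] = {
  val serializableConf = new SerializableConfiguration(hadoopConf) // driver side
  input.mapPartitions { iter =>
    val conf: Configuration = serializableConf.value // executor side
    iter.map(line => s"${conf.get("mapreduce.job.name", "unnamed")}: $line")
  }
}
```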