author     Reynold Xin <rxin@databricks.com>    2016-04-26 23:42:42 -0700
committer  Yin Huai <yhuai@databricks.com>      2016-04-26 23:42:42 -0700
commit     d73d67f623dd65b90d0edbd7ba62e9a6ce7ebd1e (patch)
tree       e7db7878de1d7856b90b02f39df14e86f1460e84
parent     b2a45606481a6da6e1f68d14d1095a8dcf2a0e57 (diff)
[SPARK-14944][SPARK-14943][SQL] Remove HiveConf from HiveTableScanExec, HiveTableReader, and ScriptTransformation
## What changes were proposed in this pull request?

This patch removes HiveConf from HiveTableScanExec, HiveTableReader, and ScriptTransformation, and instead just uses our own configuration system. I'm splitting the large change of removing HiveConf into multiple independent pull requests because it is very difficult to debug test failures when they are all combined in one giant one.

## How was this patch tested?

Should be covered by existing tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #12727 from rxin/SPARK-14944.
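For readers skimming the diff below, here is a minimal sketch of the pattern the patch applies: an operator stops receiving a HiveConf constructor argument and instead derives a plain Hadoop Configuration from the session state, mutates its own local copy, and wraps it in a serializable holder before it is used inside task closures. The names `SessionLike`, `SerializableConf`, and `ScanLike` are hypothetical stand-ins for illustration only, not the Spark classes touched in this diff.

```scala
import org.apache.hadoop.conf.Configuration

// Hypothetical stand-in for SparkSession.sessionState.newHadoopConf():
// each operator asks the session for a fresh, mutable Hadoop Configuration
// instead of cloning a HiveConf.
trait SessionLike {
  def newHadoopConf(): Configuration
}

// Hadoop's Configuration is not java.io.Serializable, so it needs a wrapper
// before it can be shipped inside a task closure or broadcast. Spark's own
// org.apache.spark.util.SerializableConfiguration follows the same idea.
class SerializableConf(@transient var value: Configuration) extends Serializable {
  private def writeObject(out: java.io.ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    value.write(out)          // Configuration is a Hadoop Writable
  }
  private def readObject(in: java.io.ObjectInputStream): Unit = {
    in.defaultReadObject()
    value = new Configuration(false)
    value.readFields(in)
  }
}

// Before: class ScanLike(...)(hiveconf: HiveConf)
// After:  the operator builds its own Configuration from the session.
class ScanLike(session: SessionLike) {
  private val hadoopConf: Configuration = session.newHadoopConf()

  // Scan-local mutation stays local because the conf is a private copy
  // (key shown only for illustration).
  hadoopConf.set("hive.io.file.read.all.columns", "false")

  // Wrapped so the conf can travel to executors inside a closure.
  private val serializableConf = new SerializableConf(hadoopConf)

  def confForTasks: Configuration = serializableConf.value
}
```

In the actual patch this role is played by `sessionState.newHadoopConf()` and `SerializableConfiguration`, as can be seen in the TableReader, HiveTableScanExec, and ScriptTransformation hunks below.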
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala                      |  1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala                        |  9
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala                           | 32
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala           | 13
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala        | 20
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala   |  8
6 files changed, 37 insertions(+), 46 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 9633f9e15b..f071df7581 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -107,7 +107,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
new SparkPlanner(sparkSession.sparkContext, conf, experimentalMethods.extraStrategies)
with HiveStrategies {
override val sparkSession: SparkSession = self.sparkSession
- override val hiveconf: HiveConf = self.hiveconf
override def strategies: Seq[Strategy] = {
experimentalMethods.extraStrategies ++ Seq(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 7d1f87f390..71b180e55b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,16 +17,12 @@
package org.apache.spark.sql.hive
-import org.apache.hadoop.hive.conf.HiveConf
-
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.execution._
private[hive] trait HiveStrategies {
@@ -34,13 +30,12 @@ private[hive] trait HiveStrategies {
self: SparkPlanner =>
val sparkSession: SparkSession
- val hiveconf: HiveConf
object Scripts extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.ScriptTransformation(input, script, output, child, ioschema) =>
val hiveIoSchema = HiveScriptIOSchema(ioschema)
- ScriptTransformation(input, script, output, planLater(child), hiveIoSchema)(hiveconf) :: Nil
+ ScriptTransformation(input, script, output, planLater(child), hiveIoSchema) :: Nil
case _ => Nil
}
}
@@ -78,7 +73,7 @@ private[hive] trait HiveStrategies {
projectList,
otherPredicates,
identity[Seq[Expression]],
- HiveTableScanExec(_, relation, pruningPredicates)(sparkSession, hiveconf)) :: Nil
+ HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil
case _ =>
Nil
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index af0317f7a1..df6abc258b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -17,17 +17,14 @@
package org.apache.spark.sql.hive
-import java.util
-
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
-import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde2.Deserializer
-import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters,
- StructObjectInspector}
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
@@ -62,7 +59,7 @@ class HadoopTableReader(
@transient private val attributes: Seq[Attribute],
@transient private val relation: MetastoreRelation,
@transient private val sparkSession: SparkSession,
- hiveconf: HiveConf)
+ hadoopConf: Configuration)
extends TableReader with Logging {
// Hadoop honors "mapred.map.tasks" as hint, but will ignore when mapred.job.tracker is "local".
@@ -72,12 +69,15 @@ class HadoopTableReader(
private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) {
0 // will splitted based on block by default.
} else {
- math.max(hiveconf.getInt("mapred.map.tasks", 1), sparkSession.sparkContext.defaultMinPartitions)
+ math.max(hadoopConf.getInt("mapred.map.tasks", 1),
+ sparkSession.sparkContext.defaultMinPartitions)
}
- SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(sparkSession.sparkContext.conf, hiveconf)
- private val _broadcastedHiveConf =
- sparkSession.sparkContext.broadcast(new SerializableConfiguration(hiveconf))
+ SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(
+ sparkSession.sparkContext.conf, hadoopConf)
+
+ private val _broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
makeRDDForTable(
@@ -105,7 +105,7 @@ class HadoopTableReader(
// Create local references to member variables, so that the entire `this` object won't be
// serialized in the closure below.
val tableDesc = relation.tableDesc
- val broadcastedHiveConf = _broadcastedHiveConf
+ val broadcastedHiveConf = _broadcastedHadoopConf
val tablePath = hiveTable.getPath
val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
@@ -162,7 +162,7 @@ class HadoopTableReader(
case (partition, partDeserializer) =>
def updateExistPathSetByPathPattern(pathPatternStr: String) {
val pathPattern = new Path(pathPatternStr)
- val fs = pathPattern.getFileSystem(hiveconf)
+ val fs = pathPattern.getFileSystem(hadoopConf)
val matches = fs.globStatus(pathPattern)
matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString)
}
@@ -209,7 +209,7 @@ class HadoopTableReader(
// Create local references so that the outer object isn't serialized.
val tableDesc = relation.tableDesc
- val broadcastedHiveConf = _broadcastedHiveConf
+ val broadcastedHiveConf = _broadcastedHadoopConf
val localDeserializer = partDeserializer
val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
@@ -259,7 +259,7 @@ class HadoopTableReader(
private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): String = {
filterOpt match {
case Some(filter) =>
- val fs = path.getFileSystem(hiveconf)
+ val fs = path.getFileSystem(hadoopConf)
val filteredFiles = fs.listStatus(path, filter).map(_.getPath.toString)
filteredFiles.mkString(",")
case None => path.toString
@@ -279,7 +279,7 @@ class HadoopTableReader(
val rdd = new HadoopRDD(
sparkSession.sparkContext,
- _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableConfiguration]],
+ _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
Some(initializeJobConfFunc),
inputFormatClass,
classOf[Writable],
@@ -302,7 +302,7 @@ private[hive] object HiveTableUtil {
val storageHandler =
org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(jobConf, property)
if (storageHandler != null) {
- val jobProperties = new util.LinkedHashMap[String, String]
+ val jobProperties = new java.util.LinkedHashMap[String, String]
if (input) {
storageHandler.configureInputJobProperties(tableDesc, jobProperties)
} else {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index cc5bbf59db..007c3384e5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution
import scala.collection.JavaConverters._
-import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.objectinspector._
@@ -48,8 +48,7 @@ case class HiveTableScanExec(
requestedAttributes: Seq[Attribute],
relation: MetastoreRelation,
partitionPruningPred: Seq[Expression])(
- @transient private val sparkSession: SparkSession,
- @transient private val hiveconf: HiveConf)
+ @transient private val sparkSession: SparkSession)
extends LeafExecNode {
require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
@@ -77,20 +76,20 @@ case class HiveTableScanExec(
// Create a local copy of hiveconf,so that scan specific modifications should not impact
// other queries
@transient
- private[this] val hiveExtraConf = new HiveConf(hiveconf)
+ private[this] val hadoopConf = sparkSession.sessionState.newHadoopConf()
// append columns ids and names before broadcast
- addColumnMetadataToConf(hiveExtraConf)
+ addColumnMetadataToConf(hadoopConf)
@transient
private[this] val hadoopReader =
- new HadoopTableReader(attributes, relation, sparkSession, hiveExtraConf)
+ new HadoopTableReader(attributes, relation, sparkSession, hadoopConf)
private[this] def castFromString(value: String, dataType: DataType) = {
Cast(Literal(value), dataType).eval(null)
}
- private def addColumnMetadataToConf(hiveConf: HiveConf) {
+ private def addColumnMetadataToConf(hiveConf: Configuration) {
// Specifies needed column IDs for those non-partitioning columns.
val neededColumnIDs = attributes.flatMap(relation.columnOrdinals.get).map(o => o: Integer)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index f27337eb36..f6e6a75c3e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -26,7 +26,6 @@ import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.exec.{RecordReader, RecordWriter}
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.AbstractSerDe
@@ -58,17 +57,14 @@ case class ScriptTransformation(
script: String,
output: Seq[Attribute],
child: SparkPlan,
- ioschema: HiveScriptIOSchema)(@transient private val hiveconf: HiveConf)
+ ioschema: HiveScriptIOSchema)
extends UnaryExecNode {
- override protected def otherCopyArgs: Seq[HiveConf] = hiveconf :: Nil
-
override def producedAttributes: AttributeSet = outputSet -- inputSet
- private val serializedHiveConf = new SerializableConfiguration(hiveconf)
-
protected override def doExecute(): RDD[InternalRow] = {
- def processIterator(inputIterator: Iterator[InternalRow]): Iterator[InternalRow] = {
+ def processIterator(inputIterator: Iterator[InternalRow], hadoopConf: Configuration)
+ : Iterator[InternalRow] = {
val cmd = List("/bin/bash", "-c", script)
val builder = new ProcessBuilder(cmd.asJava)
@@ -76,7 +72,6 @@ case class ScriptTransformation(
val inputStream = proc.getInputStream
val outputStream = proc.getOutputStream
val errorStream = proc.getErrorStream
- val localHiveConf = serializedHiveConf.value
// In order to avoid deadlocks, we need to consume the error output of the child process.
// To avoid issues caused by large error output, we use a circular buffer to limit the amount
@@ -107,7 +102,7 @@ case class ScriptTransformation(
proc,
stderrBuffer,
TaskContext.get(),
- localHiveConf
+ hadoopConf
)
// This nullability is a performance optimization in order to avoid an Option.foreach() call
@@ -122,7 +117,7 @@ case class ScriptTransformation(
val scriptOutputStream = new DataInputStream(inputStream)
@Nullable val scriptOutputReader =
- ioschema.recordReader(scriptOutputStream, localHiveConf).orNull
+ ioschema.recordReader(scriptOutputStream, hadoopConf).orNull
var scriptOutputWritable: Writable = null
val reusedWritableObject: Writable = if (null != outputSerde) {
@@ -214,10 +209,13 @@ case class ScriptTransformation(
outputIterator
}
+ val broadcastedHadoopConf =
+ new SerializableConfiguration(sqlContext.sessionState.newHadoopConf())
+
child.execute().mapPartitions { iter =>
if (iter.hasNext) {
val proj = UnsafeProjection.create(schema)
- processIterator(iter).map(proj)
+ processIterator(iter, broadcastedHadoopConf.value).map(proj)
} else {
// If the input iterator has no rows then do not launch the external script.
Iterator.empty
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index 1a15fb741a..19e8025d6b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -58,7 +58,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
output = Seq(AttributeReference("a", StringType)()),
child = child,
ioschema = noSerdeIOSchema
- )(hiveContext.sessionState.hiveconf),
+ ),
rowsDf.collect())
}
@@ -72,7 +72,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
output = Seq(AttributeReference("a", StringType)()),
child = child,
ioschema = serdeIOSchema
- )(hiveContext.sessionState.hiveconf),
+ ),
rowsDf.collect())
}
@@ -87,7 +87,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
output = Seq(AttributeReference("a", StringType)()),
child = ExceptionInjectingOperator(child),
ioschema = noSerdeIOSchema
- )(hiveContext.sessionState.hiveconf),
+ ),
rowsDf.collect())
}
assert(e.getMessage().contains("intentional exception"))
@@ -104,7 +104,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
output = Seq(AttributeReference("a", StringType)()),
child = ExceptionInjectingOperator(child),
ioschema = serdeIOSchema
- )(hiveContext.sessionState.hiveconf),
+ ),
rowsDf.collect())
}
assert(e.getMessage().contains("intentional exception"))