-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala                    |  1
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala                      |  9
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala                         | 32
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala         | 13
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala      | 20
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala |  8
6 files changed, 37 insertions(+), 46 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 9633f9e15b..f071df7581 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -107,7 +107,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
new SparkPlanner(sparkSession.sparkContext, conf, experimentalMethods.extraStrategies)
with HiveStrategies {
override val sparkSession: SparkSession = self.sparkSession
- override val hiveconf: HiveConf = self.hiveconf
override def strategies: Seq[Strategy] = {
experimentalMethods.extraStrategies ++ Seq(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 7d1f87f390..71b180e55b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,16 +17,12 @@
package org.apache.spark.sql.hive
-import org.apache.hadoop.hive.conf.HiveConf
-
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.hive.execution._
private[hive] trait HiveStrategies {
@@ -34,13 +30,12 @@ private[hive] trait HiveStrategies {
self: SparkPlanner =>
val sparkSession: SparkSession
- val hiveconf: HiveConf
object Scripts extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.ScriptTransformation(input, script, output, child, ioschema) =>
val hiveIoSchema = HiveScriptIOSchema(ioschema)
- ScriptTransformation(input, script, output, planLater(child), hiveIoSchema)(hiveconf) :: Nil
+ ScriptTransformation(input, script, output, planLater(child), hiveIoSchema) :: Nil
case _ => Nil
}
}
@@ -78,7 +73,7 @@ private[hive] trait HiveStrategies {
projectList,
otherPredicates,
identity[Seq[Expression]],
- HiveTableScanExec(_, relation, pruningPredicates)(sparkSession, hiveconf)) :: Nil
+ HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil
case _ =>
Nil
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index af0317f7a1..df6abc258b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -17,17 +17,14 @@
package org.apache.spark.sql.hive
-import java.util
-
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
-import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde2.Deserializer
-import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters,
- StructObjectInspector}
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
@@ -62,7 +59,7 @@ class HadoopTableReader(
@transient private val attributes: Seq[Attribute],
@transient private val relation: MetastoreRelation,
@transient private val sparkSession: SparkSession,
- hiveconf: HiveConf)
+ hadoopConf: Configuration)
extends TableReader with Logging {
// Hadoop honors "mapred.map.tasks" as hint, but will ignore when mapred.job.tracker is "local".
@@ -72,12 +69,15 @@ class HadoopTableReader(
private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) {
0 // will splitted based on block by default.
} else {
- math.max(hiveconf.getInt("mapred.map.tasks", 1), sparkSession.sparkContext.defaultMinPartitions)
+ math.max(hadoopConf.getInt("mapred.map.tasks", 1),
+ sparkSession.sparkContext.defaultMinPartitions)
}
- SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(sparkSession.sparkContext.conf, hiveconf)
- private val _broadcastedHiveConf =
- sparkSession.sparkContext.broadcast(new SerializableConfiguration(hiveconf))
+ SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(
+ sparkSession.sparkContext.conf, hadoopConf)
+
+ private val _broadcastedHadoopConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))
override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
makeRDDForTable(
@@ -105,7 +105,7 @@ class HadoopTableReader(
// Create local references to member variables, so that the entire `this` object won't be
// serialized in the closure below.
val tableDesc = relation.tableDesc
- val broadcastedHiveConf = _broadcastedHiveConf
+ val broadcastedHiveConf = _broadcastedHadoopConf
val tablePath = hiveTable.getPath
val inputPathStr = applyFilterIfNeeded(tablePath, filterOpt)
@@ -162,7 +162,7 @@ class HadoopTableReader(
case (partition, partDeserializer) =>
def updateExistPathSetByPathPattern(pathPatternStr: String) {
val pathPattern = new Path(pathPatternStr)
- val fs = pathPattern.getFileSystem(hiveconf)
+ val fs = pathPattern.getFileSystem(hadoopConf)
val matches = fs.globStatus(pathPattern)
matches.foreach(fileStatus => existPathSet += fileStatus.getPath.toString)
}
@@ -209,7 +209,7 @@ class HadoopTableReader(
// Create local references so that the outer object isn't serialized.
val tableDesc = relation.tableDesc
- val broadcastedHiveConf = _broadcastedHiveConf
+ val broadcastedHiveConf = _broadcastedHadoopConf
val localDeserializer = partDeserializer
val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
@@ -259,7 +259,7 @@ class HadoopTableReader(
private def applyFilterIfNeeded(path: Path, filterOpt: Option[PathFilter]): String = {
filterOpt match {
case Some(filter) =>
- val fs = path.getFileSystem(hiveconf)
+ val fs = path.getFileSystem(hadoopConf)
val filteredFiles = fs.listStatus(path, filter).map(_.getPath.toString)
filteredFiles.mkString(",")
case None => path.toString
@@ -279,7 +279,7 @@ class HadoopTableReader(
val rdd = new HadoopRDD(
sparkSession.sparkContext,
- _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableConfiguration]],
+ _broadcastedHadoopConf.asInstanceOf[Broadcast[SerializableConfiguration]],
Some(initializeJobConfFunc),
inputFormatClass,
classOf[Writable],
@@ -302,7 +302,7 @@ private[hive] object HiveTableUtil {
val storageHandler =
org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(jobConf, property)
if (storageHandler != null) {
- val jobProperties = new util.LinkedHashMap[String, String]
+ val jobProperties = new java.util.LinkedHashMap[String, String]
if (input) {
storageHandler.configureInputJobProperties(tableDesc, jobProperties)
} else {
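
Note on the TableReader change above: the broadcast payload is now a plain Hadoop Configuration rather than a HiveConf. A minimal standalone sketch of that broadcast pattern follows, assuming a hand-rolled serializable wrapper in place of Spark's internal SerializableConfiguration (which is private to Spark) and using sparkContext.hadoopConfiguration as a stand-in for the session's Hadoop conf; class and app names are illustrative only.

    import java.io.{ObjectInputStream, ObjectOutputStream}

    import org.apache.hadoop.conf.Configuration
    import org.apache.spark.sql.SparkSession

    // Hadoop's Configuration is not java-serializable, so wrap it by hand;
    // Spark's internal SerializableConfiguration does essentially the same thing.
    class SerializableConf(@transient var value: Configuration) extends Serializable {
      private def writeObject(out: ObjectOutputStream): Unit = {
        out.defaultWriteObject()
        value.write(out)            // Configuration implements Writable
      }
      private def readObject(in: ObjectInputStream): Unit = {
        in.defaultReadObject()
        value = new Configuration(false)
        value.readFields(in)
      }
    }

    object BroadcastHadoopConfSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("broadcast-hadoop-conf")
          .master("local[2]")
          .getOrCreate()
        // Driver side: capture the Hadoop conf once and broadcast it to executors.
        val broadcastConf = spark.sparkContext.broadcast(
          new SerializableConf(spark.sparkContext.hadoopConfiguration))
        // Executor side: read a setting from the deserialized copy in each task.
        val defaults = spark.sparkContext.parallelize(1 to 2)
          .map(_ => broadcastConf.value.value.get("fs.defaultFS", "file:///"))
          .collect()
        defaults.foreach(println)
        spark.stop()
      }
    }

Broadcasting the configuration once per reader, as the patched HadoopTableReader does, avoids re-serializing it with every task closure.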
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index cc5bbf59db..007c3384e5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.execution
import scala.collection.JavaConverters._
-import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.objectinspector._
@@ -48,8 +48,7 @@ case class HiveTableScanExec(
requestedAttributes: Seq[Attribute],
relation: MetastoreRelation,
partitionPruningPred: Seq[Expression])(
- @transient private val sparkSession: SparkSession,
- @transient private val hiveconf: HiveConf)
+ @transient private val sparkSession: SparkSession)
extends LeafExecNode {
require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
@@ -77,20 +76,20 @@ case class HiveTableScanExec(
// Create a local copy of hiveconf,so that scan specific modifications should not impact
// other queries
@transient
- private[this] val hiveExtraConf = new HiveConf(hiveconf)
+ private[this] val hadoopConf = sparkSession.sessionState.newHadoopConf()
// append columns ids and names before broadcast
- addColumnMetadataToConf(hiveExtraConf)
+ addColumnMetadataToConf(hadoopConf)
@transient
private[this] val hadoopReader =
- new HadoopTableReader(attributes, relation, sparkSession, hiveExtraConf)
+ new HadoopTableReader(attributes, relation, sparkSession, hadoopConf)
private[this] def castFromString(value: String, dataType: DataType) = {
Cast(Literal(value), dataType).eval(null)
}
- private def addColumnMetadataToConf(hiveConf: HiveConf) {
+ private def addColumnMetadataToConf(hiveConf: Configuration) {
// Specifies needed column IDs for those non-partitioning columns.
val neededColumnIDs = attributes.flatMap(relation.columnOrdinals.get).map(o => o: Integer)
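
In the scan above, sessionState.newHadoopConf() hands each HiveTableScanExec a fresh Configuration seeded from the session, so the appended column IDs and names cannot leak into other queries; the earlier code obtained the same isolation by copying the shared HiveConf. A tiny sketch of that copy-based isolation idea, using a hypothetical property key purely for illustration:

    import org.apache.hadoop.conf.Configuration

    object ScanLocalConfSketch {
      def main(args: Array[String]): Unit = {
        val sessionConf = new Configuration()
        // The copy constructor clones the entries, so per-scan additions stay local.
        val scanConf = new Configuration(sessionConf)
        scanConf.set("example.needed.column.ids", "0,2,5")     // hypothetical key
        println(sessionConf.get("example.needed.column.ids"))  // null: shared conf untouched
        println(scanConf.get("example.needed.column.ids"))     // 0,2,5
      }
    }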
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index f27337eb36..f6e6a75c3e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -26,7 +26,6 @@ import scala.collection.JavaConverters._
import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hive.conf.HiveConf
import org.apache.hadoop.hive.ql.exec.{RecordReader, RecordWriter}
import org.apache.hadoop.hive.serde.serdeConstants
import org.apache.hadoop.hive.serde2.AbstractSerDe
@@ -58,17 +57,14 @@ case class ScriptTransformation(
script: String,
output: Seq[Attribute],
child: SparkPlan,
- ioschema: HiveScriptIOSchema)(@transient private val hiveconf: HiveConf)
+ ioschema: HiveScriptIOSchema)
extends UnaryExecNode {
- override protected def otherCopyArgs: Seq[HiveConf] = hiveconf :: Nil
-
override def producedAttributes: AttributeSet = outputSet -- inputSet
- private val serializedHiveConf = new SerializableConfiguration(hiveconf)
-
protected override def doExecute(): RDD[InternalRow] = {
- def processIterator(inputIterator: Iterator[InternalRow]): Iterator[InternalRow] = {
+ def processIterator(inputIterator: Iterator[InternalRow], hadoopConf: Configuration)
+ : Iterator[InternalRow] = {
val cmd = List("/bin/bash", "-c", script)
val builder = new ProcessBuilder(cmd.asJava)
@@ -76,7 +72,6 @@ case class ScriptTransformation(
val inputStream = proc.getInputStream
val outputStream = proc.getOutputStream
val errorStream = proc.getErrorStream
- val localHiveConf = serializedHiveConf.value
// In order to avoid deadlocks, we need to consume the error output of the child process.
// To avoid issues caused by large error output, we use a circular buffer to limit the amount
@@ -107,7 +102,7 @@ case class ScriptTransformation(
proc,
stderrBuffer,
TaskContext.get(),
- localHiveConf
+ hadoopConf
)
// This nullability is a performance optimization in order to avoid an Option.foreach() call
@@ -122,7 +117,7 @@ case class ScriptTransformation(
val scriptOutputStream = new DataInputStream(inputStream)
@Nullable val scriptOutputReader =
- ioschema.recordReader(scriptOutputStream, localHiveConf).orNull
+ ioschema.recordReader(scriptOutputStream, hadoopConf).orNull
var scriptOutputWritable: Writable = null
val reusedWritableObject: Writable = if (null != outputSerde) {
@@ -214,10 +209,13 @@ case class ScriptTransformation(
outputIterator
}
+ val broadcastedHadoopConf =
+ new SerializableConfiguration(sqlContext.sessionState.newHadoopConf())
+
child.execute().mapPartitions { iter =>
if (iter.hasNext) {
val proj = UnsafeProjection.create(schema)
- processIterator(iter).map(proj)
+ processIterator(iter, broadcastedHadoopConf.value).map(proj)
} else {
// If the input iterator has no rows then do not launch the external script.
Iterator.empty
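
ScriptTransformation now receives its Configuration as an explicit processIterator argument captured on the driver, instead of closing over a serialized HiveConf. For context, here is a rough self-contained sketch of the no-serde flow the operator implements (rows piped line by line through an external command via ProcessBuilder); the helper name and the cat command are illustrative only, not the operator's actual code.

    import java.io.{BufferedReader, InputStreamReader, PrintWriter}

    import scala.collection.JavaConverters._

    object ScriptPipeSketch {
      // Pipe rows through an external command, one line in, one line out (no SerDe).
      def pipe(rows: Iterator[String], script: String): Iterator[String] = {
        val proc = new ProcessBuilder(List("/bin/bash", "-c", script).asJava).start()
        // Feed stdin from a separate thread so the writer and reader cannot deadlock.
        new Thread(new Runnable {
          override def run(): Unit = {
            val writer = new PrintWriter(proc.getOutputStream)
            try rows.foreach(line => writer.println(line)) finally writer.close()
          }
        }).start()
        val reader = new BufferedReader(new InputStreamReader(proc.getInputStream))
        Iterator.continually(reader.readLine()).takeWhile(_ != null)
      }

      def main(args: Array[String]): Unit = {
        // "cat" is the identity transform, mirroring the simplest case in the test suite below.
        pipe(Iterator("a", "b", "c"), "cat").foreach(println)
      }
    }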
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index 1a15fb741a..19e8025d6b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -58,7 +58,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
output = Seq(AttributeReference("a", StringType)()),
child = child,
ioschema = noSerdeIOSchema
- )(hiveContext.sessionState.hiveconf),
+ ),
rowsDf.collect())
}
@@ -72,7 +72,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
output = Seq(AttributeReference("a", StringType)()),
child = child,
ioschema = serdeIOSchema
- )(hiveContext.sessionState.hiveconf),
+ ),
rowsDf.collect())
}
@@ -87,7 +87,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
output = Seq(AttributeReference("a", StringType)()),
child = ExceptionInjectingOperator(child),
ioschema = noSerdeIOSchema
- )(hiveContext.sessionState.hiveconf),
+ ),
rowsDf.collect())
}
assert(e.getMessage().contains("intentional exception"))
@@ -104,7 +104,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
output = Seq(AttributeReference("a", StringType)()),
child = ExceptionInjectingOperator(child),
ioschema = serdeIOSchema
- )(hiveContext.sessionState.hiveconf),
+ ),
rowsDf.collect())
}
assert(e.getMessage().contains("intentional exception"))