author     Wenchen Fan <wenchen@databricks.com>    2017-01-17 23:37:59 -0800
committer  gatorsmile <gatorsmile@gmail.com>       2017-01-17 23:37:59 -0800
commit     4494cd9716d64a6c7cfa548abadb5dd0c4c143a6 (patch)
tree       6c06c19fb977106fc819a883889e9aa2ffefdcb9 /sql/hive
parent     e7f982b20d8a1c0db711e0dcfe26b2f39f98dd64 (diff)
download   spark-4494cd9716d64a6c7cfa548abadb5dd0c4c143a6.tar.gz
           spark-4494cd9716d64a6c7cfa548abadb5dd0c4c143a6.tar.bz2
           spark-4494cd9716d64a6c7cfa548abadb5dd0c4c143a6.zip
[SPARK-18243][SQL] Port Hive writing to use FileFormat interface
## What changes were proposed in this pull request?

Inserting data into Hive tables has its own implementation, distinct from the data source write path: `InsertIntoHiveTable`, `SparkHiveWriterContainer` and `SparkHiveDynamicPartitionWriterContainer`.

One major difference is that data source tables write directly to the final destination without a staging directory, and Spark itself then adds the partitions/tables to the catalog. Hive tables instead write to a staging directory and then call the Hive metastore's loadPartition/loadTable functions to load that data in. So we still need to keep `InsertIntoHiveTable` to hold this special logic. In the future we should consider writing to the Hive table location directly, so that we no longer need to call `loadTable`/`loadPartition` at the end and can remove `InsertIntoHiveTable`.

This PR removes `SparkHiveWriterContainer` and `SparkHiveDynamicPartitionWriterContainer`, and creates a `HiveFileFormat` to implement the write logic. In the future, we should also implement the read logic in `HiveFileFormat`.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #16517 from cloud-fan/insert-hive.
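For quick orientation before the hunks: the heart of the change is that `InsertIntoHiveTable` becomes a `RunnableCommand` that delegates the physical write to the shared `FileFormatWriter` using the new `HiveFileFormat`, instead of the removed writer containers. A condensed sketch drawn from the new `InsertIntoHiveTable.run` in the diff below (identifiers such as `query`, `tmpLocation`, `fileSinkConf`, `hadoopConf` and `partitionAttributes` are set up earlier in that method and are only referenced here):

```scala
// Condensed excerpt, not a standalone program: shows how the write is now delegated.
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.execution.datasources.FileFormatWriter

val committer = FileCommitProtocol.instantiate(
  sparkSession.sessionState.conf.fileCommitProtocolClass,
  jobId = java.util.UUID.randomUUID().toString,
  outputPath = tmpLocation.toString,   // Hive staging directory, not the final table location
  isAppend = false)

FileFormatWriter.write(
  sparkSession = sparkSession,
  queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
  fileFormat = new HiveFileFormat(fileSinkConf),   // the new FileFormat added by this patch
  committer = committer,
  outputSpec = FileFormatWriter.OutputSpec(tmpLocation.toString, Map.empty),
  hadoopConf = hadoopConf,
  partitionColumns = partitionAttributes,
  bucketSpec = None,
  refreshFunction = _ => (),
  options = Map.empty)

// The staged output is then loaded into the metastore via
// loadTable/loadPartition/loadDynamicPartitions, exactly as before.
```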
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala | 77
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala | 4
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala | 149
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 187
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala (renamed from sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala) | 2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala | 356
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala | 4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala | 10
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala | 5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 8
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala | 10
13 files changed, 303 insertions, 513 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 9b4b8b6fcd..4e30d038b1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -66,6 +66,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
PreprocessTableInsertion(conf) ::
DataSourceAnalysis(conf) ::
new DetermineHiveSerde(conf) ::
+ new HiveAnalysis(sparkSession) ::
new ResolveDataSource(sparkSession) :: Nil
override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
@@ -88,7 +89,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
SpecialLimits,
InMemoryScans,
HiveTableScans,
- DataSinks,
Scripts,
Aggregation,
JoinSelection,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index d1f11e78b4..7987a0a84c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -21,14 +21,14 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
-import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, ScriptTransformation}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.command.{DDLUtils, ExecutedCommandExec}
+import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.CreateTable
import org.apache.spark.sql.hive.execution._
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
+import org.apache.spark.sql.types.StructType
/**
@@ -86,6 +86,47 @@ class DetermineHiveSerde(conf: SQLConf) extends Rule[LogicalPlan] {
}
}
+class HiveAnalysis(session: SparkSession) extends Rule[LogicalPlan] {
+ override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+ case InsertIntoTable(table: MetastoreRelation, partSpec, query, overwrite, ifNotExists)
+ if hasBeenPreprocessed(table.output, table.partitionKeys.toStructType, partSpec, query) =>
+ InsertIntoHiveTable(table, partSpec, query, overwrite, ifNotExists)
+
+ case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
+ // Currently `DataFrameWriter.saveAsTable` doesn't support the Append mode of hive serde
+ // tables yet.
+ if (mode == SaveMode.Append) {
+ throw new AnalysisException(
+ "CTAS for hive serde tables does not support append semantics.")
+ }
+
+ val dbName = tableDesc.identifier.database.getOrElse(session.catalog.currentDatabase)
+ CreateHiveTableAsSelectCommand(
+ tableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))),
+ query,
+ mode == SaveMode.Ignore)
+ }
+
+ /**
+ * Returns true if the [[InsertIntoTable]] plan has already been preprocessed by analyzer rule
+ * [[PreprocessTableInsertion]]. It is important that this rule([[HiveAnalysis]]) has to
+ * be run after [[PreprocessTableInsertion]], to normalize the column names in partition spec and
+ * fix the schema mismatch by adding Cast.
+ */
+ private def hasBeenPreprocessed(
+ tableOutput: Seq[Attribute],
+ partSchema: StructType,
+ partSpec: Map[String, Option[String]],
+ query: LogicalPlan): Boolean = {
+ val partColNames = partSchema.map(_.name).toSet
+ query.resolved && partSpec.keys.forall(partColNames.contains) && {
+ val staticPartCols = partSpec.filter(_._2.isDefined).keySet
+ val expectedColumns = tableOutput.filterNot(a => staticPartCols.contains(a.name))
+ expectedColumns.toStructType.sameType(query.schema)
+ }
+ }
+}
+
private[hive] trait HiveStrategies {
// Possibly being too clever with types here... or not clever enough.
self: SparkPlanner =>
@@ -94,35 +135,9 @@ private[hive] trait HiveStrategies {
object Scripts extends Strategy {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case logical.ScriptTransformation(input, script, output, child, ioschema) =>
+ case ScriptTransformation(input, script, output, child, ioschema) =>
val hiveIoSchema = HiveScriptIOSchema(ioschema)
- ScriptTransformation(input, script, output, planLater(child), hiveIoSchema) :: Nil
- case _ => Nil
- }
- }
-
- object DataSinks extends Strategy {
- def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
- case logical.InsertIntoTable(
- table: MetastoreRelation, partition, child, overwrite, ifNotExists) =>
- InsertIntoHiveTable(
- table, partition, planLater(child), overwrite, ifNotExists) :: Nil
-
- case CreateTable(tableDesc, mode, Some(query)) if DDLUtils.isHiveTable(tableDesc) =>
- // Currently `DataFrameWriter.saveAsTable` doesn't support
- // the Append mode of hive serde tables yet.
- if (mode == SaveMode.Append) {
- throw new AnalysisException(
- "CTAS for hive serde tables does not support append semantics.")
- }
-
- val dbName = tableDesc.identifier.database.getOrElse(sparkSession.catalog.currentDatabase)
- val cmd = CreateHiveTableAsSelectCommand(
- tableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))),
- query,
- mode == SaveMode.Ignore)
- ExecutedCommandExec(cmd) :: Nil
-
+ ScriptTransformationExec(input, script, output, planLater(child), hiveIoSchema) :: Nil
case _ => Nil
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index aaf30f41f2..b4b63032ab 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -311,10 +311,10 @@ private[hive] object HiveTableUtil {
// that calls Hive.get() which tries to access metastore, but it's not valid in runtime
// it would be fixed in next version of hive but till then, we should use this instead
def configureJobPropertiesForStorageHandler(
- tableDesc: TableDesc, jobConf: JobConf, input: Boolean) {
+ tableDesc: TableDesc, conf: Configuration, input: Boolean) {
val property = tableDesc.getProperties.getProperty(META_TABLE_STORAGE)
val storageHandler =
- org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(jobConf, property)
+ org.apache.hadoop.hive.ql.metadata.HiveUtils.getStorageHandler(conf, property)
if (storageHandler != null) {
val jobProperties = new java.util.LinkedHashMap[String, String]
if (input) {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
index b1b8439efa..4e2193b6ab 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/package.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hive
/** Support for interacting with different versions of the HiveMetastoreClient */
package object client {
- private[client] abstract class HiveVersion(
+ private[hive] abstract class HiveVersion(
val fullVersion: String,
val extraDeps: Seq[String] = Nil,
val exclusions: Seq[String] = Nil)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
new file mode 100644
index 0000000000..cc2b60bc41
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveFileFormat.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.serde2.Serializer
+import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector}
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{JobConf, Reporter}
+import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriter, OutputWriterFactory}
+import org.apache.spark.sql.hive.{HiveInspectors, HiveTableUtil}
+import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.SerializableJobConf
+
+/**
+ * `FileFormat` for writing Hive tables.
+ *
+ * TODO: implement the read logic.
+ */
+class HiveFileFormat(fileSinkConf: FileSinkDesc) extends FileFormat with Logging {
+ override def inferSchema(
+ sparkSession: SparkSession,
+ options: Map[String, String],
+ files: Seq[FileStatus]): Option[StructType] = {
+ throw new UnsupportedOperationException(s"inferSchema is not supported for hive data source.")
+ }
+
+ override def prepareWrite(
+ sparkSession: SparkSession,
+ job: Job,
+ options: Map[String, String],
+ dataSchema: StructType): OutputWriterFactory = {
+ val conf = job.getConfiguration
+ val tableDesc = fileSinkConf.getTableInfo
+ conf.set("mapred.output.format.class", tableDesc.getOutputFileFormatClassName)
+
+ // When speculation is on and output committer class name contains "Direct", we should warn
+ // users that they may loss data if they are using a direct output committer.
+ val speculationEnabled = sparkSession.sparkContext.conf.getBoolean("spark.speculation", false)
+ val outputCommitterClass = conf.get("mapred.output.committer.class", "")
+ if (speculationEnabled && outputCommitterClass.contains("Direct")) {
+ val warningMessage =
+ s"$outputCommitterClass may be an output committer that writes data directly to " +
+ "the final location. Because speculation is enabled, this output committer may " +
+ "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
+ "committer that does not have this behavior (e.g. FileOutputCommitter)."
+ logWarning(warningMessage)
+ }
+
+ // Add table properties from storage handler to hadoopConf, so any custom storage
+ // handler settings can be set to hadoopConf
+ HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, conf, false)
+ Utilities.copyTableJobPropertiesToConf(tableDesc, conf)
+
+ // Avoid referencing the outer object.
+ val fileSinkConfSer = fileSinkConf
+ new OutputWriterFactory {
+ private val jobConf = new SerializableJobConf(new JobConf(conf))
+ @transient private lazy val outputFormat =
+ jobConf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
+
+ override def getFileExtension(context: TaskAttemptContext): String = {
+ Utilities.getFileExtension(jobConf.value, fileSinkConfSer.getCompressed, outputFormat)
+ }
+
+ override def newInstance(
+ path: String,
+ dataSchema: StructType,
+ context: TaskAttemptContext): OutputWriter = {
+ new HiveOutputWriter(path, fileSinkConfSer, jobConf.value, dataSchema)
+ }
+ }
+ }
+}
+
+class HiveOutputWriter(
+ path: String,
+ fileSinkConf: FileSinkDesc,
+ jobConf: JobConf,
+ dataSchema: StructType) extends OutputWriter with HiveInspectors {
+
+ private def tableDesc = fileSinkConf.getTableInfo
+
+ private val serializer = {
+ val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
+ serializer.initialize(null, tableDesc.getProperties)
+ serializer
+ }
+
+ private val hiveWriter = HiveFileFormatUtils.getHiveRecordWriter(
+ jobConf,
+ tableDesc,
+ serializer.getSerializedClass,
+ fileSinkConf,
+ new Path(path),
+ Reporter.NULL)
+
+ private val standardOI = ObjectInspectorUtils
+ .getStandardObjectInspector(
+ tableDesc.getDeserializer.getObjectInspector,
+ ObjectInspectorCopyOption.JAVA)
+ .asInstanceOf[StructObjectInspector]
+
+ private val fieldOIs =
+ standardOI.getAllStructFieldRefs.asScala.map(_.getFieldObjectInspector).toArray
+ private val dataTypes = dataSchema.map(_.dataType).toArray
+ private val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) }
+ private val outputData = new Array[Any](fieldOIs.length)
+
+ override def write(row: InternalRow): Unit = {
+ var i = 0
+ while (i < fieldOIs.length) {
+ outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i)))
+ i += 1
+ }
+ hiveWriter.write(serializer.serialize(outputData, standardOI))
+ }
+
+ override def close(): Unit = {
+ // Seems the boolean value passed into close does not matter.
+ hiveWriter.close(false)
+ }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index aa858e808e..ce418ae135 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -24,22 +24,22 @@ import java.util.{Date, Locale, Random}
import scala.util.control.NonFatal
+import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.common.FileUtils
import org.apache.hadoop.hive.ql.exec.TaskRunner
import org.apache.hadoop.hive.ql.ErrorMsg
-import org.apache.hadoop.mapred.{FileOutputFormat, JobConf}
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.internal.io.FileCommitProtocol
+import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.physical.Partitioning
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.RunnableCommand
+import org.apache.spark.sql.execution.datasources.FileFormatWriter
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
+import org.apache.spark.sql.hive.client.HiveVersion
import org.apache.spark.SparkException
-import org.apache.spark.util.SerializableJobConf
/**
@@ -69,26 +69,20 @@ import org.apache.spark.util.SerializableJobConf
* {{{
* Map('a' -> Some('1'), 'b' -> None)
* }}}.
- * @param child the logical plan representing data to write to.
+ * @param query the logical plan representing data to write to.
* @param overwrite overwrite existing table or partitions.
* @param ifNotExists If true, only write if the table or partition does not exist.
*/
case class InsertIntoHiveTable(
table: MetastoreRelation,
partition: Map[String, Option[String]],
- child: SparkPlan,
+ query: LogicalPlan,
overwrite: Boolean,
- ifNotExists: Boolean) extends UnaryExecNode {
+ ifNotExists: Boolean) extends RunnableCommand {
- @transient private val sessionState = sqlContext.sessionState.asInstanceOf[HiveSessionState]
- @transient private val externalCatalog = sqlContext.sharedState.externalCatalog
+ override protected def innerChildren: Seq[LogicalPlan] = query :: Nil
- def output: Seq[Attribute] = Seq.empty
-
- val hadoopConf = sessionState.newHadoopConf()
var createdTempDir: Option[Path] = None
- val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
- val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
private def executionId: String = {
val rand: Random = new Random
@@ -96,7 +90,10 @@ case class InsertIntoHiveTable(
"hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
}
- private def getStagingDir(inputPath: Path): Path = {
+ private def getStagingDir(
+ inputPath: Path,
+ hadoopConf: Configuration,
+ stagingDir: String): Path = {
val inputPathUri: URI = inputPath.toUri
val inputPathName: String = inputPathUri.getPath
val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
@@ -121,17 +118,27 @@ case class InsertIntoHiveTable(
throw new RuntimeException(
"Cannot create staging directory '" + dir.toString + "': " + e.getMessage, e)
}
- return dir
+ dir
}
- private def getExternalScratchDir(extURI: URI): Path = {
- getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath))
+ private def getExternalScratchDir(
+ extURI: URI,
+ hadoopConf: Configuration,
+ stagingDir: String): Path = {
+ getStagingDir(
+ new Path(extURI.getScheme, extURI.getAuthority, extURI.getPath),
+ hadoopConf,
+ stagingDir)
}
- def getExternalTmpPath(path: Path): Path = {
+ def getExternalTmpPath(
+ path: Path,
+ hiveVersion: HiveVersion,
+ hadoopConf: Configuration,
+ stagingDir: String,
+ scratchDir: String): Path = {
import org.apache.spark.sql.hive.client.hive._
- val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
// Before Hive 1.1, when inserting into a table, Hive will create the staging directory under
// a common scratch directory. After the writing is finished, Hive will simply empty the table
// directory and move the staging directory to it.
@@ -142,16 +149,19 @@ case class InsertIntoHiveTable(
// staging directory under the table director for Hive prior to 1.1, the staging directory will
// be removed by Hive when Hive is trying to empty the table directory.
if (hiveVersion == v12 || hiveVersion == v13 || hiveVersion == v14 || hiveVersion == v1_0) {
- oldVersionExternalTempPath(path)
+ oldVersionExternalTempPath(path, hadoopConf, scratchDir)
} else if (hiveVersion == v1_1 || hiveVersion == v1_2) {
- newVersionExternalTempPath(path)
+ newVersionExternalTempPath(path, hadoopConf, stagingDir)
} else {
throw new IllegalStateException("Unsupported hive version: " + hiveVersion.fullVersion)
}
}
// Mostly copied from Context.java#getExternalTmpPath of Hive 0.13
- def oldVersionExternalTempPath(path: Path): Path = {
+ def oldVersionExternalTempPath(
+ path: Path,
+ hadoopConf: Configuration,
+ scratchDir: String): Path = {
val extURI: URI = path.toUri
val scratchPath = new Path(scratchDir, executionId)
var dirPath = new Path(
@@ -176,54 +186,44 @@ case class InsertIntoHiveTable(
}
// Mostly copied from Context.java#getExternalTmpPath of Hive 1.2
- def newVersionExternalTempPath(path: Path): Path = {
+ def newVersionExternalTempPath(
+ path: Path,
+ hadoopConf: Configuration,
+ stagingDir: String): Path = {
val extURI: URI = path.toUri
if (extURI.getScheme == "viewfs") {
- getExtTmpPathRelTo(path.getParent)
+ getExtTmpPathRelTo(path.getParent, hadoopConf, stagingDir)
} else {
- new Path(getExternalScratchDir(extURI), "-ext-10000")
+ new Path(getExternalScratchDir(extURI, hadoopConf, stagingDir), "-ext-10000")
}
}
- def getExtTmpPathRelTo(path: Path): Path = {
- new Path(getStagingDir(path), "-ext-10000") // Hive uses 10000
- }
-
- private def saveAsHiveFile(
- rdd: RDD[InternalRow],
- valueClass: Class[_],
- fileSinkConf: FileSinkDesc,
- conf: SerializableJobConf,
- writerContainer: SparkHiveWriterContainer): Unit = {
- assert(valueClass != null, "Output value class not set")
- conf.value.setOutputValueClass(valueClass)
-
- val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName
- assert(outputFileFormatClassName != null, "Output format class not set")
- conf.value.set("mapred.output.format.class", outputFileFormatClassName)
-
- FileOutputFormat.setOutputPath(
- conf.value,
- SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName(), conf.value))
- log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
- writerContainer.driverSideSetup()
- sqlContext.sparkContext.runJob(rdd, writerContainer.writeToFile _)
- writerContainer.commitJob()
+ def getExtTmpPathRelTo(
+ path: Path,
+ hadoopConf: Configuration,
+ stagingDir: String): Path = {
+ new Path(getStagingDir(path, hadoopConf, stagingDir), "-ext-10000") // Hive uses 10000
}
/**
* Inserts all the rows in the table into Hive. Row objects are properly serialized with the
* `org.apache.hadoop.hive.serde2.SerDe` and the
* `org.apache.hadoop.mapred.OutputFormat` provided by the table definition.
- *
- * Note: this is run once and then kept to avoid double insertions.
*/
- protected[sql] lazy val sideEffectResult: Seq[InternalRow] = {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val sessionState = sparkSession.sessionState
+ val externalCatalog = sparkSession.sharedState.externalCatalog
+ val hiveVersion = externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
+ val hadoopConf = sessionState.newHadoopConf()
+ val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+ val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
+
// Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
// instances within the closure, since Serializer is not serializable while TableDesc is.
val tableDesc = table.tableDesc
val tableLocation = table.hiveQlTable.getDataLocation
- val tmpLocation = getExternalTmpPath(tableLocation)
+ val tmpLocation =
+ getExternalTmpPath(tableLocation, hiveVersion, hadoopConf, stagingDir, scratchDir)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
val isCompressed = hadoopConf.get("hive.exec.compress.output", "false").toBoolean
@@ -276,40 +276,31 @@ case class InsertIntoHiveTable(
}
}
- val jobConf = new JobConf(hadoopConf)
- val jobConfSer = new SerializableJobConf(jobConf)
-
- // When speculation is on and output committer class name contains "Direct", we should warn
- // users that they may loss data if they are using a direct output committer.
- val speculationEnabled = sqlContext.sparkContext.conf.getBoolean("spark.speculation", false)
- val outputCommitterClass = jobConf.get("mapred.output.committer.class", "")
- if (speculationEnabled && outputCommitterClass.contains("Direct")) {
- val warningMessage =
- s"$outputCommitterClass may be an output committer that writes data directly to " +
- "the final location. Because speculation is enabled, this output committer may " +
- "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
- "committer that does not have this behavior (e.g. FileOutputCommitter)."
- logWarning(warningMessage)
+ val committer = FileCommitProtocol.instantiate(
+ sparkSession.sessionState.conf.fileCommitProtocolClass,
+ jobId = java.util.UUID.randomUUID().toString,
+ outputPath = tmpLocation.toString,
+ isAppend = false)
+
+ val partitionAttributes = partitionColumnNames.takeRight(numDynamicPartitions).map { name =>
+ query.resolve(name :: Nil, sparkSession.sessionState.analyzer.resolver).getOrElse {
+ throw new AnalysisException(
+ s"Unable to resolve $name given [${query.output.map(_.name).mkString(", ")}]")
+ }.asInstanceOf[Attribute]
}
- val writerContainer = if (numDynamicPartitions > 0) {
- val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
- new SparkHiveDynamicPartitionWriterContainer(
- jobConf,
- fileSinkConf,
- dynamicPartColNames,
- child.output)
- } else {
- new SparkHiveWriterContainer(
- jobConf,
- fileSinkConf,
- child.output)
- }
-
- @transient val outputClass = writerContainer.newSerializer(table.tableDesc).getSerializedClass
- saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)
+ FileFormatWriter.write(
+ sparkSession = sparkSession,
+ queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
+ fileFormat = new HiveFileFormat(fileSinkConf),
+ committer = committer,
+ outputSpec = FileFormatWriter.OutputSpec(tmpLocation.toString, Map.empty),
+ hadoopConf = hadoopConf,
+ partitionColumns = partitionAttributes,
+ bucketSpec = None,
+ refreshFunction = _ => (),
+ options = Map.empty)
- val outputPath = FileOutputFormat.getOutputPath(jobConf)
// TODO: Correctly set holdDDLTime.
// In most of the time, we should have holdDDLTime = false.
// holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
@@ -319,7 +310,7 @@ case class InsertIntoHiveTable(
externalCatalog.loadDynamicPartitions(
db = table.catalogTable.database,
table = table.catalogTable.identifier.table,
- outputPath.toString,
+ tmpLocation.toString,
partitionSpec,
overwrite,
numDynamicPartitions,
@@ -363,7 +354,7 @@ case class InsertIntoHiveTable(
externalCatalog.loadPartition(
table.catalogTable.database,
table.catalogTable.identifier.table,
- outputPath.toString,
+ tmpLocation.toString,
partitionSpec,
isOverwrite = doHiveOverwrite,
holdDDLTime = holdDDLTime,
@@ -375,7 +366,7 @@ case class InsertIntoHiveTable(
externalCatalog.loadTable(
table.catalogTable.database,
table.catalogTable.identifier.table,
- outputPath.toString, // TODO: URI
+ tmpLocation.toString, // TODO: URI
overwrite,
holdDDLTime,
isSrcLocal = false)
@@ -391,21 +382,13 @@ case class InsertIntoHiveTable(
}
// Invalidate the cache.
- sqlContext.sharedState.cacheManager.invalidateCache(table)
- sqlContext.sessionState.catalog.refreshTable(table.catalogTable.identifier)
+ sparkSession.sharedState.cacheManager.invalidateCache(table)
+ sparkSession.sessionState.catalog.refreshTable(table.catalogTable.identifier)
// It would be nice to just return the childRdd unchanged so insert operations could be chained,
// however for now we return an empty list to simplify compatibility checks with hive, which
// does not return anything for insert operations.
// TODO: implement hive compatibility as rules.
- Seq.empty[InternalRow]
- }
-
- override def outputPartitioning: Partitioning = child.outputPartitioning
-
- override def executeCollect(): Array[InternalRow] = sideEffectResult.toArray
-
- protected override def doExecute(): RDD[InternalRow] = {
- sqlContext.sparkContext.parallelize(sideEffectResult.asInstanceOf[Seq[InternalRow]], 1)
+ Seq.empty[Row]
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
index 50855e48bc..e7c165c5f8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
@@ -52,7 +52,7 @@ import org.apache.spark.util.{CircularBuffer, RedirectThread, SerializableConfig
* @param script the command that should be executed.
* @param output the attributes that are produced by the script.
*/
-case class ScriptTransformation(
+case class ScriptTransformationExec(
input: Seq[Expression],
script: String,
output: Seq[Attribute],
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
deleted file mode 100644
index 0c9321068c..0000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ /dev/null
@@ -1,356 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.text.NumberFormat
-import java.util.{Date, Locale}
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.common.FileUtils
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
-import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
-import org.apache.hadoop.hive.ql.plan.TableDesc
-import org.apache.hadoop.hive.serde2.Serializer
-import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector}
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred._
-import org.apache.hadoop.mapreduce.TaskType
-
-import org.apache.spark._
-import org.apache.spark.internal.Logging
-import org.apache.spark.internal.io.SparkHadoopWriterUtils
-import org.apache.spark.mapred.SparkHadoopMapRedUtil
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.execution.UnsafeKVExternalSorter
-import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
-import org.apache.spark.sql.types._
-import org.apache.spark.util.SerializableJobConf
-import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
-
-/**
- * Internal helper class that saves an RDD using a Hive OutputFormat.
- * It is based on `SparkHadoopWriter`.
- */
-private[hive] class SparkHiveWriterContainer(
- @transient private val jobConf: JobConf,
- fileSinkConf: FileSinkDesc,
- inputSchema: Seq[Attribute])
- extends Logging
- with HiveInspectors
- with Serializable {
-
- private val now = new Date()
- private val tableDesc: TableDesc = fileSinkConf.getTableInfo
- // Add table properties from storage handler to jobConf, so any custom storage
- // handler settings can be set to jobConf
- if (tableDesc != null) {
- HiveTableUtil.configureJobPropertiesForStorageHandler(tableDesc, jobConf, false)
- Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf)
- }
- protected val conf = new SerializableJobConf(jobConf)
-
- private var jobID = 0
- private var splitID = 0
- private var attemptID = 0
- private var jID: SerializableWritable[JobID] = null
- private var taID: SerializableWritable[TaskAttemptID] = null
-
- @transient private var writer: FileSinkOperator.RecordWriter = null
- @transient protected lazy val committer = conf.value.getOutputCommitter
- @transient protected lazy val jobContext = new JobContextImpl(conf.value, jID.value)
- @transient private lazy val taskContext = new TaskAttemptContextImpl(conf.value, taID.value)
- @transient private lazy val outputFormat =
- conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef, Writable]]
-
- def driverSideSetup() {
- setIDs(0, 0, 0)
- setConfParams()
- committer.setupJob(jobContext)
- }
-
- def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int) {
- setIDs(jobId, splitId, attemptId)
- setConfParams()
- committer.setupTask(taskContext)
- initWriters()
- }
-
- protected def getOutputName: String = {
- val numberFormat = NumberFormat.getInstance(Locale.US)
- numberFormat.setMinimumIntegerDigits(5)
- numberFormat.setGroupingUsed(false)
- val extension = Utilities.getFileExtension(conf.value, fileSinkConf.getCompressed, outputFormat)
- "part-" + numberFormat.format(splitID) + extension
- }
-
- def close() {
- // Seems the boolean value passed into close does not matter.
- if (writer != null) {
- writer.close(false)
- commit()
- }
- }
-
- def commitJob() {
- committer.commitJob(jobContext)
- }
-
- protected def initWriters() {
- // NOTE this method is executed at the executor side.
- // For Hive tables without partitions or with only static partitions, only 1 writer is needed.
- writer = HiveFileFormatUtils.getHiveRecordWriter(
- conf.value,
- fileSinkConf.getTableInfo,
- conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
- fileSinkConf,
- FileOutputFormat.getTaskOutputPath(conf.value, getOutputName),
- Reporter.NULL)
- }
-
- protected def commit() {
- SparkHadoopMapRedUtil.commitTask(committer, taskContext, jobID, splitID)
- }
-
- def abortTask(): Unit = {
- if (committer != null) {
- committer.abortTask(taskContext)
- }
- logError(s"Task attempt $taskContext aborted.")
- }
-
- private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
- jobID = jobId
- splitID = splitId
- attemptID = attemptId
-
- jID = new SerializableWritable[JobID](SparkHadoopWriterUtils.createJobID(now, jobId))
- taID = new SerializableWritable[TaskAttemptID](
- new TaskAttemptID(new TaskID(jID.value, TaskType.MAP, splitID), attemptID))
- }
-
- private def setConfParams() {
- conf.value.set("mapred.job.id", jID.value.toString)
- conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
- conf.value.set("mapred.task.id", taID.value.toString)
- conf.value.setBoolean("mapred.task.is.map", true)
- conf.value.setInt("mapred.task.partition", splitID)
- }
-
- def newSerializer(tableDesc: TableDesc): Serializer = {
- val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
- serializer.initialize(null, tableDesc.getProperties)
- serializer
- }
-
- protected def prepareForWrite() = {
- val serializer = newSerializer(fileSinkConf.getTableInfo)
- val standardOI = ObjectInspectorUtils
- .getStandardObjectInspector(
- fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
- ObjectInspectorCopyOption.JAVA)
- .asInstanceOf[StructObjectInspector]
-
- val fieldOIs = standardOI.getAllStructFieldRefs.asScala.map(_.getFieldObjectInspector).toArray
- val dataTypes = inputSchema.map(_.dataType)
- val wrappers = fieldOIs.zip(dataTypes).map { case (f, dt) => wrapperFor(f, dt) }
- val outputData = new Array[Any](fieldOIs.length)
- (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData)
- }
-
- // this function is executed on executor side
- def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = {
- val (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData) = prepareForWrite()
- executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)
-
- iterator.foreach { row =>
- var i = 0
- while (i < fieldOIs.length) {
- outputData(i) = if (row.isNullAt(i)) null else wrappers(i)(row.get(i, dataTypes(i)))
- i += 1
- }
- writer.write(serializer.serialize(outputData, standardOI))
- }
-
- close()
- }
-}
-
-private[hive] object SparkHiveWriterContainer {
- def createPathFromString(path: String, conf: JobConf): Path = {
- if (path == null) {
- throw new IllegalArgumentException("Output path is null")
- }
- val outputPath = new Path(path)
- val fs = outputPath.getFileSystem(conf)
- if (outputPath == null || fs == null) {
- throw new IllegalArgumentException("Incorrectly formatted output path")
- }
- outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- }
-}
-
-private[spark] object SparkHiveDynamicPartitionWriterContainer {
- val SUCCESSFUL_JOB_OUTPUT_DIR_MARKER = "mapreduce.fileoutputcommitter.marksuccessfuljobs"
-}
-
-private[spark] class SparkHiveDynamicPartitionWriterContainer(
- jobConf: JobConf,
- fileSinkConf: FileSinkDesc,
- dynamicPartColNames: Array[String],
- inputSchema: Seq[Attribute])
- extends SparkHiveWriterContainer(jobConf, fileSinkConf, inputSchema) {
-
- import SparkHiveDynamicPartitionWriterContainer._
-
- private val defaultPartName = jobConf.get(
- ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultStrVal)
-
- override protected def initWriters(): Unit = {
- // do nothing
- }
-
- override def close(): Unit = {
- // do nothing
- }
-
- override def commitJob(): Unit = {
- // This is a hack to avoid writing _SUCCESS mark file. In lower versions of Hadoop (e.g. 1.0.4),
- // semantics of FileSystem.globStatus() is different from higher versions (e.g. 2.4.1) and will
- // include _SUCCESS file when glob'ing for dynamic partition data files.
- //
- // Better solution is to add a step similar to what Hive FileSinkOperator.jobCloseOp does:
- // calling something like Utilities.mvFileToFinalPath to cleanup the output directory and then
- // load it with loadDynamicPartitions/loadPartition/loadTable.
- val oldMarker = conf.value.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true)
- conf.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, false)
- super.commitJob()
- conf.value.setBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, oldMarker)
- }
-
- // this function is executed on executor side
- override def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = {
- val (serializer, standardOI, fieldOIs, dataTypes, wrappers, outputData) = prepareForWrite()
- executorSideSetup(context.stageId, context.partitionId, context.attemptNumber)
-
- val partitionOutput = inputSchema.takeRight(dynamicPartColNames.length)
- val dataOutput = inputSchema.take(fieldOIs.length)
- // Returns the partition key given an input row
- val getPartitionKey = UnsafeProjection.create(partitionOutput, inputSchema)
- // Returns the data columns to be written given an input row
- val getOutputRow = UnsafeProjection.create(dataOutput, inputSchema)
-
- val fun: AnyRef = (pathString: String) => FileUtils.escapePathName(pathString, defaultPartName)
- // Expressions that given a partition key build a string like: col1=val/col2=val/...
- val partitionStringExpression = partitionOutput.zipWithIndex.flatMap { case (c, i) =>
- val escaped =
- ScalaUDF(fun, StringType, Seq(Cast(c, StringType)), Seq(StringType))
- val str = If(IsNull(c), Literal(defaultPartName), escaped)
- val partitionName = Literal(dynamicPartColNames(i) + "=") :: str :: Nil
- if (i == 0) partitionName else Literal(Path.SEPARATOR_CHAR.toString) :: partitionName
- }
-
- // Returns the partition path given a partition key.
- val getPartitionString =
- UnsafeProjection.create(Concat(partitionStringExpression) :: Nil, partitionOutput)
-
- // If anything below fails, we should abort the task.
- try {
- val sorter: UnsafeKVExternalSorter = new UnsafeKVExternalSorter(
- StructType.fromAttributes(partitionOutput),
- StructType.fromAttributes(dataOutput),
- SparkEnv.get.blockManager,
- SparkEnv.get.serializerManager,
- TaskContext.get().taskMemoryManager().pageSizeBytes,
- SparkEnv.get.conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold",
- UnsafeExternalSorter.DEFAULT_NUM_ELEMENTS_FOR_SPILL_THRESHOLD))
-
- while (iterator.hasNext) {
- val inputRow = iterator.next()
- val currentKey = getPartitionKey(inputRow)
- sorter.insertKV(currentKey, getOutputRow(inputRow))
- }
-
- logInfo(s"Sorting complete. Writing out partition files one at a time.")
- val sortedIterator = sorter.sortedIterator()
- var currentKey: InternalRow = null
- var currentWriter: FileSinkOperator.RecordWriter = null
- try {
- while (sortedIterator.next()) {
- if (currentKey != sortedIterator.getKey) {
- if (currentWriter != null) {
- currentWriter.close(false)
- }
- currentKey = sortedIterator.getKey.copy()
- logDebug(s"Writing partition: $currentKey")
- currentWriter = newOutputWriter(currentKey)
- }
-
- var i = 0
- while (i < fieldOIs.length) {
- outputData(i) = if (sortedIterator.getValue.isNullAt(i)) {
- null
- } else {
- wrappers(i)(sortedIterator.getValue.get(i, dataTypes(i)))
- }
- i += 1
- }
- currentWriter.write(serializer.serialize(outputData, standardOI))
- }
- } finally {
- if (currentWriter != null) {
- currentWriter.close(false)
- }
- }
- commit()
- } catch {
- case cause: Throwable =>
- logError("Aborting task.", cause)
- abortTask()
- throw new SparkException("Task failed while writing rows.", cause)
- }
- /** Open and returns a new OutputWriter given a partition key. */
- def newOutputWriter(key: InternalRow): FileSinkOperator.RecordWriter = {
- val partitionPath = getPartitionString(key).getString(0)
- val newFileSinkDesc = new FileSinkDesc(
- fileSinkConf.getDirName + partitionPath,
- fileSinkConf.getTableInfo,
- fileSinkConf.getCompressed)
- newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec)
- newFileSinkDesc.setCompressType(fileSinkConf.getCompressType)
-
- // use the path like ${hive_tmp}/_temporary/${attemptId}/
- // to avoid write to the same file when `spark.speculation=true`
- val path = FileOutputFormat.getTaskOutputPath(
- conf.value,
- partitionPath.stripPrefix("/") + "/" + getOutputName)
-
- HiveFileFormatUtils.getHiveRecordWriter(
- conf.value,
- fileSinkConf.getTableInfo,
- conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
- newFileSinkDesc,
- path,
- Reporter.NULL)
- }
- }
-}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
index 5cb8519d2a..28b5bfd581 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala
@@ -565,8 +565,8 @@ class VersionsSuite extends SparkFunSuite with SQLTestUtils with TestHiveSinglet
val filePaths = dir.map(_.getName).toList
folders.flatMap(listFiles) ++: filePaths
}
- val expectedFiles = ".part-00000.crc" :: "part-00000" :: Nil
- assert(listFiles(tmpDir).sorted == expectedFiles)
+ // expect 2 files left: `.part-00000-random-uuid.crc` and `part-00000-random-uuid`
+ assert(listFiles(tmpDir).length == 2)
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
index 05a15166f8..4772a264d6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveComparisonTest.scala
@@ -26,6 +26,7 @@ import scala.util.control.NonFatal
import org.scalatest.{BeforeAndAfterAll, GivenWhenThen}
import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.SQLBuilder
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
@@ -441,23 +442,20 @@ abstract class HiveComparisonTest
val executions = queryList.map(new TestHiveQueryExecution(_))
executions.foreach(_.toRdd)
val tablesGenerated = queryList.zip(executions).flatMap {
- // We should take executedPlan instead of sparkPlan, because in following codes we
- // will run the collected plans. As we will do extra processing for sparkPlan such
- // as adding exchange, collapsing codegen stages, etc., collecting sparkPlan here
- // will cause some errors when running these plans later.
- case (q, e) => e.executedPlan.collect {
+ case (q, e) => e.analyzed.collect {
case i: InsertIntoHiveTable if tablesRead contains i.table.tableName =>
(q, e, i)
}
}
tablesGenerated.map { case (hiveql, execution, insert) =>
+ val rdd = Dataset.ofRows(TestHive.sparkSession, insert.query).queryExecution.toRdd
s"""
|=== Generated Table ===
|$hiveql
|$execution
|== Results ==
- |${insert.child.execute().collect().mkString("\n")}
+ |${rdd.collect().mkString("\n")}
""".stripMargin
}.mkString("\n")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index ef62be39cd..882a184124 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
import org.apache.hadoop.fs.Path
import org.scalatest.BeforeAndAfterEach
+import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, TableAlreadyExistsException}
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogTable, CatalogTableType}
@@ -799,7 +800,7 @@ class HiveDDLSuite
test("Create Cataloged Table As Select - Drop Table After Runtime Exception") {
withTable("tab") {
- intercept[RuntimeException] {
+ intercept[SparkException] {
sql(
"""
|CREATE TABLE tab
@@ -1273,7 +1274,7 @@ class HiveDDLSuite
sql("INSERT INTO t SELECT 1")
checkAnswer(spark.table("t"), Row(1))
// Check if this is compressed as ZLIB.
- val maybeOrcFile = path.listFiles().find(_.getName.endsWith("part-00000"))
+ val maybeOrcFile = path.listFiles().find(!_.getName.endsWith(".crc"))
assert(maybeOrcFile.isDefined)
val orcFilePath = maybeOrcFile.get.toPath.toString
val expectedCompressionKind =
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 2ae66d1b2f..75ba92cada 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1043,8 +1043,8 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
assertResult(1, "Duplicated project detected\n" + analyzedPlan) {
analyzedPlan.collect {
- case _: Project => ()
- }.size
+ case i: InsertIntoHiveTable => i.query.collect { case p: Project => () }.size
+ }.sum
}
}
@@ -1062,8 +1062,8 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
assertResult(2, "Duplicated project detected\n" + analyzedPlan) {
analyzedPlan.collect {
- case _: Project => ()
- }.size
+ case i: InsertIntoHiveTable => i.query.collect { case p: Project => () }.size
+ }.sum
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index d3475a79a7..5318b4650b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -55,7 +55,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
checkAnswer(
rowsDf,
- (child: SparkPlan) => new ScriptTransformation(
+ (child: SparkPlan) => new ScriptTransformationExec(
input = Seq(rowsDf.col("a").expr),
script = "cat",
output = Seq(AttributeReference("a", StringType)()),
@@ -71,7 +71,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
checkAnswer(
rowsDf,
- (child: SparkPlan) => new ScriptTransformation(
+ (child: SparkPlan) => new ScriptTransformationExec(
input = Seq(rowsDf.col("a").expr),
script = "cat",
output = Seq(AttributeReference("a", StringType)()),
@@ -88,7 +88,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
val e = intercept[TestFailedException] {
checkAnswer(
rowsDf,
- (child: SparkPlan) => new ScriptTransformation(
+ (child: SparkPlan) => new ScriptTransformationExec(
input = Seq(rowsDf.col("a").expr),
script = "cat",
output = Seq(AttributeReference("a", StringType)()),
@@ -107,7 +107,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
val e = intercept[TestFailedException] {
checkAnswer(
rowsDf,
- (child: SparkPlan) => new ScriptTransformation(
+ (child: SparkPlan) => new ScriptTransformationExec(
input = Seq(rowsDf.col("a").expr),
script = "cat",
output = Seq(AttributeReference("a", StringType)()),
@@ -126,7 +126,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton {
val e = intercept[SparkException] {
val plan =
- new ScriptTransformation(
+ new ScriptTransformationExec(
input = Seq(rowsDf.col("a").expr),
script = "some_non_existent_command",
output = Seq(AttributeReference("a", StringType)()),