path: root/sql
author     Patrick Wendell <pwendell@gmail.com>   2014-09-30 09:43:46 -0700
committer  Patrick Wendell <pwendell@gmail.com>   2014-09-30 09:43:46 -0700
commit     b64fcbd2dcec3418397328399c58f98d990a54f1 (patch)
tree       2269127e0ab42c7a82fdb49283e940210bb838a9 /sql
parent     b167a8c7e75d9e816784bd655bce1feb6c447210 (diff)
Revert "[SPARK-3007][SQL]Add Dynamic Partition support to Spark Sql hive"
This reverts commit 0bbe7faeffa17577ae8a33dfcd8c4c783db5c909.
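
For context, the reverted change let Hive INSERT statements supply partition values dynamically from the SELECT clause. The sketch below is adapted from the "dynamic_partition" test deleted later in this diff (the table and column names come from that test); after this revert, HiveQl throws a NotImplementedError for any INSERT whose PARTITION clause leaves a partition column unassigned.

    SET hive.exec.dynamic.partition.mode=nonstrict;
    CREATE TABLE dynamic_part_table(intcol INT) PARTITIONED BY (partcol1 INT, partcol2 INT);
    -- partcol1 and partcol2 take their values from the trailing SELECT columns
    -- rather than the PARTITION clause; this is the behavior removed by the revert.
    INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
    SELECT 1, 1, 1 FROM src WHERE key=150;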
Diffstat (limited to 'sql')
-rw-r--r--  sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala |  17
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala | 195
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala |   5
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 207
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala | 217
-rw-r--r--  sql/hive/src/test/resources/golden/dynamic_partition-0-be33aaa7253c8f248ff3921cd7dae340 |   0
-rw-r--r--  sql/hive/src/test/resources/golden/dynamic_partition-1-640552dd462707563fd255a713f83b41 |   0
-rw-r--r--  sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493 |   1
-rw-r--r--  sql/hive/src/test/resources/golden/dynamic_partition-3-b7f7fa7ebf666f4fee27e149d8c6961f |   0
-rw-r--r--  sql/hive/src/test/resources/golden/dynamic_partition-4-8bdb71ad8cb3cc3026043def2525de3a |   0
-rw-r--r--  sql/hive/src/test/resources/golden/dynamic_partition-5-c630dce438f3792e7fb0f523fbbb3e1e |   0
-rw-r--r--  sql/hive/src/test/resources/golden/dynamic_partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf |   0
-rw-r--r--  sql/hive/src/test/resources/golden/dynamic_partition-7-be33aaa7253c8f248ff3921cd7dae340 |   0
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala | 100
14 files changed, 299 insertions(+), 443 deletions(-)
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 35e9c9939d..556c984ad3 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -220,23 +220,6 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
*/
override def whiteList = Seq(
"add_part_exist",
- "dynamic_partition_skip_default",
- "infer_bucket_sort_dyn_part",
- "load_dyn_part1",
- "load_dyn_part2",
- "load_dyn_part3",
- "load_dyn_part4",
- "load_dyn_part5",
- "load_dyn_part6",
- "load_dyn_part7",
- "load_dyn_part8",
- "load_dyn_part9",
- "load_dyn_part10",
- "load_dyn_part11",
- "load_dyn_part12",
- "load_dyn_part13",
- "load_dyn_part14",
- "load_dyn_part14_win",
"add_part_multiple",
"add_partition_no_whitelist",
"add_partition_with_whitelist",
diff --git a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
new file mode 100644
index 0000000000..ab7862f4f9
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.IOException
+import java.text.NumberFormat
+import java.util.Date
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc
+import org.apache.hadoop.mapred._
+import org.apache.hadoop.io.Writable
+
+import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
+
+/**
+ * Internal helper class that saves an RDD using a Hive OutputFormat.
+ * It is based on [[SparkHadoopWriter]].
+ */
+private[hive] class SparkHiveHadoopWriter(
+ @transient jobConf: JobConf,
+ fileSinkConf: FileSinkDesc)
+ extends Logging
+ with SparkHadoopMapRedUtil
+ with Serializable {
+
+ private val now = new Date()
+ private val conf = new SerializableWritable(jobConf)
+
+ private var jobID = 0
+ private var splitID = 0
+ private var attemptID = 0
+ private var jID: SerializableWritable[JobID] = null
+ private var taID: SerializableWritable[TaskAttemptID] = null
+
+ @transient private var writer: FileSinkOperator.RecordWriter = null
+ @transient private var format: HiveOutputFormat[AnyRef, Writable] = null
+ @transient private var committer: OutputCommitter = null
+ @transient private var jobContext: JobContext = null
+ @transient private var taskContext: TaskAttemptContext = null
+
+ def preSetup() {
+ setIDs(0, 0, 0)
+ setConfParams()
+
+ val jCtxt = getJobContext()
+ getOutputCommitter().setupJob(jCtxt)
+ }
+
+
+ def setup(jobid: Int, splitid: Int, attemptid: Int) {
+ setIDs(jobid, splitid, attemptid)
+ setConfParams()
+ }
+
+ def open() {
+ val numfmt = NumberFormat.getInstance()
+ numfmt.setMinimumIntegerDigits(5)
+ numfmt.setGroupingUsed(false)
+
+ val extension = Utilities.getFileExtension(
+ conf.value,
+ fileSinkConf.getCompressed,
+ getOutputFormat())
+
+ val outputName = "part-" + numfmt.format(splitID) + extension
+ val path = FileOutputFormat.getTaskOutputPath(conf.value, outputName)
+
+ getOutputCommitter().setupTask(getTaskContext())
+ writer = HiveFileFormatUtils.getHiveRecordWriter(
+ conf.value,
+ fileSinkConf.getTableInfo,
+ conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
+ fileSinkConf,
+ path,
+ null)
+ }
+
+ def write(value: Writable) {
+ if (writer != null) {
+ writer.write(value)
+ } else {
+ throw new IOException("Writer is null, open() has not been called")
+ }
+ }
+
+ def close() {
+ // Seems the boolean value passed into close does not matter.
+ writer.close(false)
+ }
+
+ def commit() {
+ val taCtxt = getTaskContext()
+ val cmtr = getOutputCommitter()
+ if (cmtr.needsTaskCommit(taCtxt)) {
+ try {
+ cmtr.commitTask(taCtxt)
+ logInfo (taID + ": Committed")
+ } catch {
+ case e: IOException =>
+ logError("Error committing the output of task: " + taID.value, e)
+ cmtr.abortTask(taCtxt)
+ throw e
+ }
+ } else {
+ logWarning ("No need to commit output of task: " + taID.value)
+ }
+ }
+
+ def commitJob() {
+ // always ? Or if cmtr.needsTaskCommit ?
+ val cmtr = getOutputCommitter()
+ cmtr.commitJob(getJobContext())
+ }
+
+ // ********* Private Functions *********
+
+ private def getOutputFormat(): HiveOutputFormat[AnyRef,Writable] = {
+ if (format == null) {
+ format = conf.value.getOutputFormat()
+ .asInstanceOf[HiveOutputFormat[AnyRef,Writable]]
+ }
+ format
+ }
+
+ private def getOutputCommitter(): OutputCommitter = {
+ if (committer == null) {
+ committer = conf.value.getOutputCommitter
+ }
+ committer
+ }
+
+ private def getJobContext(): JobContext = {
+ if (jobContext == null) {
+ jobContext = newJobContext(conf.value, jID.value)
+ }
+ jobContext
+ }
+
+ private def getTaskContext(): TaskAttemptContext = {
+ if (taskContext == null) {
+ taskContext = newTaskAttemptContext(conf.value, taID.value)
+ }
+ taskContext
+ }
+
+ private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
+ jobID = jobId
+ splitID = splitId
+ attemptID = attemptId
+
+ jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobId))
+ taID = new SerializableWritable[TaskAttemptID](
+ new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
+ }
+
+ private def setConfParams() {
+ conf.value.set("mapred.job.id", jID.value.toString)
+ conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
+ conf.value.set("mapred.task.id", taID.value.toString)
+ conf.value.setBoolean("mapred.task.is.map", true)
+ conf.value.setInt("mapred.task.partition", splitID)
+ }
+}
+
+private[hive] object SparkHiveHadoopWriter {
+ def createPathFromString(path: String, conf: JobConf): Path = {
+ if (path == null) {
+ throw new IllegalArgumentException("Output path is null")
+ }
+ val outputPath = new Path(path)
+ val fs = outputPath.getFileSystem(conf)
+ if (outputPath == null || fs == null) {
+ throw new IllegalArgumentException("Incorrectly formatted output path")
+ }
+ outputPath.makeQualified(fs)
+ }
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 4e30e6e06f..0aa6292c01 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -837,6 +837,11 @@ private[hive] object HiveQl {
cleanIdentifier(key.toLowerCase) -> None
}.toMap).getOrElse(Map.empty)
+ if (partitionKeys.values.exists(p => p.isEmpty)) {
+ throw new NotImplementedError(s"Do not support INSERT INTO/OVERWRITE with" +
+ s"dynamic partitioning.")
+ }
+
InsertIntoTable(UnresolvedRelation(db, tableName, None), partitionKeys, query, overwrite)
case a: ASTNode =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 3d2ee01069..a284a91a91 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -19,25 +19,27 @@ package org.apache.spark.sql.hive.execution
import scala.collection.JavaConversions._
+import java.util.{HashMap => JHashMap}
+
import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.metastore.MetaStoreUtils
+import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
-import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
import org.apache.hadoop.hive.serde2.Serializer
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.{JavaHiveDecimalObjectInspector, JavaHiveVarcharObjectInspector}
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector
+import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
+import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.execution.{Command, SparkPlan, UnaryNode}
-import org.apache.spark.sql.hive._
-import org.apache.spark.{SerializableWritable, SparkException, TaskContext}
+import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
+import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, SparkHiveHadoopWriter}
/**
* :: DeveloperApi ::
@@ -49,7 +51,7 @@ case class InsertIntoHiveTable(
child: SparkPlan,
overwrite: Boolean)
(@transient sc: HiveContext)
- extends UnaryNode with Command {
+ extends UnaryNode {
@transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
@transient private lazy val hiveContext = new Context(sc.hiveconf)
@@ -99,74 +101,66 @@ case class InsertIntoHiveTable(
}
def saveAsHiveFile(
- rdd: RDD[Row],
+ rdd: RDD[Writable],
valueClass: Class[_],
fileSinkConf: FileSinkDesc,
- conf: SerializableWritable[JobConf],
- writerContainer: SparkHiveWriterContainer) {
- assert(valueClass != null, "Output value class not set")
- conf.value.setOutputValueClass(valueClass)
-
- val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName
- assert(outputFileFormatClassName != null, "Output format class not set")
- conf.value.set("mapred.output.format.class", outputFileFormatClassName)
-
- val isCompressed = conf.value.getBoolean(
- ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
-
+ conf: JobConf,
+ isCompressed: Boolean) {
+ if (valueClass == null) {
+ throw new SparkException("Output value class not set")
+ }
+ conf.setOutputValueClass(valueClass)
+ if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
+ throw new SparkException("Output format class not set")
+ }
+ // Doesn't work in Scala 2.9 due to what may be a generics bug
+ // TODO: Should we uncomment this for Scala 2.10?
+ // conf.setOutputFormat(outputFormatClass)
+ conf.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
if (isCompressed) {
// Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
// and "mapred.output.compression.type" have no impact on ORC because it uses table properties
// to store compression information.
- conf.value.set("mapred.output.compress", "true")
+ conf.set("mapred.output.compress", "true")
fileSinkConf.setCompressed(true)
- fileSinkConf.setCompressCodec(conf.value.get("mapred.output.compression.codec"))
- fileSinkConf.setCompressType(conf.value.get("mapred.output.compression.type"))
+ fileSinkConf.setCompressCodec(conf.get("mapred.output.compression.codec"))
+ fileSinkConf.setCompressType(conf.get("mapred.output.compression.type"))
}
- conf.value.setOutputCommitter(classOf[FileOutputCommitter])
-
+ conf.setOutputCommitter(classOf[FileOutputCommitter])
FileOutputFormat.setOutputPath(
- conf.value,
- SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value))
- log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
+ conf,
+ SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf))
- writerContainer.driverSideSetup()
- sc.sparkContext.runJob(rdd, writeToFile _)
- writerContainer.commitJob()
-
- // Note that this function is executed on executor side
- def writeToFile(context: TaskContext, iterator: Iterator[Row]) {
- val serializer = newSerializer(fileSinkConf.getTableInfo)
- val standardOI = ObjectInspectorUtils
- .getStandardObjectInspector(
- fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
- ObjectInspectorCopyOption.JAVA)
- .asInstanceOf[StructObjectInspector]
+ log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
- val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
- val outputData = new Array[Any](fieldOIs.length)
+ val writer = new SparkHiveHadoopWriter(conf, fileSinkConf)
+ writer.preSetup()
+ def writeToFile(context: TaskContext, iter: Iterator[Writable]) {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
- writerContainer.executorSideSetup(context.stageId, context.partitionId, attemptNumber)
- iterator.foreach { row =>
- var i = 0
- while (i < fieldOIs.length) {
- // TODO (lian) avoid per row dynamic dispatching and pattern matching cost in `wrap`
- outputData(i) = wrap(row(i), fieldOIs(i))
- i += 1
- }
+ writer.setup(context.stageId, context.partitionId, attemptNumber)
+ writer.open()
- val writer = writerContainer.getLocalFileWriter(row)
- writer.write(serializer.serialize(outputData, standardOI))
+ var count = 0
+ while(iter.hasNext) {
+ val record = iter.next()
+ count += 1
+ writer.write(record)
}
- writerContainer.close()
+ writer.close()
+ writer.commit()
}
+
+ sc.sparkContext.runJob(rdd, writeToFile _)
+ writer.commitJob()
}
+ override def execute() = result
+
/**
* Inserts all the rows in the table into Hive. Row objects are properly serialized with the
* `org.apache.hadoop.hive.serde2.SerDe` and the
@@ -174,57 +168,50 @@ case class InsertIntoHiveTable(
*
* Note: this is run once and then kept to avoid double insertions.
*/
- override protected[sql] lazy val sideEffectResult: Seq[Row] = {
+ private lazy val result: RDD[Row] = {
+ val childRdd = child.execute()
+ assert(childRdd != null)
+
// Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
// instances within the closure, since Serializer is not serializable while TableDesc is.
val tableDesc = table.tableDesc
val tableLocation = table.hiveQlTable.getDataLocation
val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
+ val rdd = childRdd.mapPartitions { iter =>
+ val serializer = newSerializer(fileSinkConf.getTableInfo)
+ val standardOI = ObjectInspectorUtils
+ .getStandardObjectInspector(
+ fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
+ ObjectInspectorCopyOption.JAVA)
+ .asInstanceOf[StructObjectInspector]
- val numDynamicPartitions = partition.values.count(_.isEmpty)
- val numStaticPartitions = partition.values.count(_.nonEmpty)
- val partitionSpec = partition.map {
- case (key, Some(value)) => key -> value
- case (key, None) => key -> ""
- }
-
- // All partition column names in the format of "<column name 1>/<column name 2>/..."
- val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
- val partitionColumnNames = Option(partitionColumns).map(_.split("/")).orNull
-
- // Validate partition spec if there exist any dynamic partitions
- if (numDynamicPartitions > 0) {
- // Report error if dynamic partitioning is not enabled
- if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
- throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
- }
- // Report error if dynamic partition strict mode is on but no static partition is found
- if (numStaticPartitions == 0 &&
- sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
- throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
- }
+ val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
+ val outputData = new Array[Any](fieldOIs.length)
+ iter.map { row =>
+ var i = 0
+ while (i < row.length) {
+ // Casts Strings to HiveVarchars when necessary.
+ outputData(i) = wrap(row(i), fieldOIs(i))
+ i += 1
+ }
- // Report error if any static partition appears after a dynamic partition
- val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
- isDynamic.init.zip(isDynamic.tail).find(_ == (true, false)).foreach { _ =>
- throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
+ serializer.serialize(outputData, standardOI)
}
}
+ // ORC stores compression information in table properties. While, there are other formats
+ // (e.g. RCFile) that rely on hadoop configurations to store compression information.
val jobConf = new JobConf(sc.hiveconf)
- val jobConfSer = new SerializableWritable(jobConf)
-
- val writerContainer = if (numDynamicPartitions > 0) {
- val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
- new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames)
- } else {
- new SparkHiveWriterContainer(jobConf, fileSinkConf)
- }
-
- saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)
-
+ saveAsHiveFile(
+ rdd,
+ outputClass,
+ fileSinkConf,
+ jobConf,
+ sc.hiveconf.getBoolean("hive.exec.compress.output", false))
+
+ // TODO: Handle dynamic partitioning.
val outputPath = FileOutputFormat.getOutputPath(jobConf)
// Have to construct the format of dbname.tablename.
val qualifiedTableName = s"${table.databaseName}.${table.tableName}"
@@ -233,6 +220,10 @@ case class InsertIntoHiveTable(
// holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
val holdDDLTime = false
if (partition.nonEmpty) {
+ val partitionSpec = partition.map {
+ case (key, Some(value)) => key -> value
+ case (key, None) => key -> "" // Should not reach here right now.
+ }
val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
db.validatePartitionNameCharacters(partVals)
// inheritTableSpecs is set to true. It should be set to false for a IMPORT query
@@ -240,26 +231,14 @@ case class InsertIntoHiveTable(
val inheritTableSpecs = true
// TODO: Correctly set isSkewedStoreAsSubdir.
val isSkewedStoreAsSubdir = false
- if (numDynamicPartitions > 0) {
- db.loadDynamicPartitions(
- outputPath,
- qualifiedTableName,
- partitionSpec,
- overwrite,
- numDynamicPartitions,
- holdDDLTime,
- isSkewedStoreAsSubdir
- )
- } else {
- db.loadPartition(
- outputPath,
- qualifiedTableName,
- partitionSpec,
- overwrite,
- holdDDLTime,
- inheritTableSpecs,
- isSkewedStoreAsSubdir)
- }
+ db.loadPartition(
+ outputPath,
+ qualifiedTableName,
+ partitionSpec,
+ overwrite,
+ holdDDLTime,
+ inheritTableSpecs,
+ isSkewedStoreAsSubdir)
} else {
db.loadTable(
outputPath,
@@ -272,6 +251,6 @@ case class InsertIntoHiveTable(
// however for now we return an empty list to simplify compatibility checks with hive, which
// does not return anything for insert operations.
// TODO: implement hive compatibility as rules.
- Seq.empty[Row]
+ sc.sparkContext.makeRDD(Nil, 1)
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
deleted file mode 100644
index a667188fa5..0000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.io.IOException
-import java.text.NumberFormat
-import java.util.Date
-
-import scala.collection.mutable
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
-import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred._
-
-import org.apache.spark.sql.Row
-import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
-
-/**
- * Internal helper class that saves an RDD using a Hive OutputFormat.
- * It is based on [[SparkHadoopWriter]].
- */
-private[hive] class SparkHiveWriterContainer(
- @transient jobConf: JobConf,
- fileSinkConf: FileSinkDesc)
- extends Logging
- with SparkHadoopMapRedUtil
- with Serializable {
-
- private val now = new Date()
- protected val conf = new SerializableWritable(jobConf)
-
- private var jobID = 0
- private var splitID = 0
- private var attemptID = 0
- private var jID: SerializableWritable[JobID] = null
- private var taID: SerializableWritable[TaskAttemptID] = null
-
- @transient private var writer: FileSinkOperator.RecordWriter = null
- @transient private lazy val committer = conf.value.getOutputCommitter
- @transient private lazy val jobContext = newJobContext(conf.value, jID.value)
- @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
- @transient private lazy val outputFormat =
- conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef,Writable]]
-
- def driverSideSetup() {
- setIDs(0, 0, 0)
- setConfParams()
- committer.setupJob(jobContext)
- }
-
- def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int) {
- setIDs(jobId, splitId, attemptId)
- setConfParams()
- committer.setupTask(taskContext)
- initWriters()
- }
-
- protected def getOutputName: String = {
- val numberFormat = NumberFormat.getInstance()
- numberFormat.setMinimumIntegerDigits(5)
- numberFormat.setGroupingUsed(false)
- val extension = Utilities.getFileExtension(conf.value, fileSinkConf.getCompressed, outputFormat)
- "part-" + numberFormat.format(splitID) + extension
- }
-
- def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = writer
-
- def close() {
- // Seems the boolean value passed into close does not matter.
- writer.close(false)
- commit()
- }
-
- def commitJob() {
- committer.commitJob(jobContext)
- }
-
- protected def initWriters() {
- // NOTE this method is executed at the executor side.
- // For Hive tables without partitions or with only static partitions, only 1 writer is needed.
- writer = HiveFileFormatUtils.getHiveRecordWriter(
- conf.value,
- fileSinkConf.getTableInfo,
- conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
- fileSinkConf,
- FileOutputFormat.getTaskOutputPath(conf.value, getOutputName),
- Reporter.NULL)
- }
-
- protected def commit() {
- if (committer.needsTaskCommit(taskContext)) {
- try {
- committer.commitTask(taskContext)
- logInfo (taID + ": Committed")
- } catch {
- case e: IOException =>
- logError("Error committing the output of task: " + taID.value, e)
- committer.abortTask(taskContext)
- throw e
- }
- } else {
- logInfo("No need to commit output of task: " + taID.value)
- }
- }
-
- // ********* Private Functions *********
-
- private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
- jobID = jobId
- splitID = splitId
- attemptID = attemptId
-
- jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobId))
- taID = new SerializableWritable[TaskAttemptID](
- new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
- }
-
- private def setConfParams() {
- conf.value.set("mapred.job.id", jID.value.toString)
- conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
- conf.value.set("mapred.task.id", taID.value.toString)
- conf.value.setBoolean("mapred.task.is.map", true)
- conf.value.setInt("mapred.task.partition", splitID)
- }
-}
-
-private[hive] object SparkHiveWriterContainer {
- def createPathFromString(path: String, conf: JobConf): Path = {
- if (path == null) {
- throw new IllegalArgumentException("Output path is null")
- }
- val outputPath = new Path(path)
- val fs = outputPath.getFileSystem(conf)
- if (outputPath == null || fs == null) {
- throw new IllegalArgumentException("Incorrectly formatted output path")
- }
- outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
- }
-}
-
-private[spark] class SparkHiveDynamicPartitionWriterContainer(
- @transient jobConf: JobConf,
- fileSinkConf: FileSinkDesc,
- dynamicPartColNames: Array[String])
- extends SparkHiveWriterContainer(jobConf, fileSinkConf) {
-
- private val defaultPartName = jobConf.get(
- ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)
-
- @transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _
-
- override protected def initWriters(): Unit = {
- // NOTE: This method is executed at the executor side.
- // Actual writers are created for each dynamic partition on the fly.
- writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter]
- }
-
- override def close(): Unit = {
- writers.values.foreach(_.close(false))
- commit()
- }
-
- override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = {
- val dynamicPartPath = dynamicPartColNames
- .zip(row.takeRight(dynamicPartColNames.length))
- .map { case (col, rawVal) =>
- val string = String.valueOf(rawVal)
- s"/$col=${if (rawVal == null || string.isEmpty) defaultPartName else string}"
- }
- .mkString
-
- def newWriter = {
- val newFileSinkDesc = new FileSinkDesc(
- fileSinkConf.getDirName + dynamicPartPath,
- fileSinkConf.getTableInfo,
- fileSinkConf.getCompressed)
- newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec)
- newFileSinkDesc.setCompressType(fileSinkConf.getCompressType)
-
- val path = {
- val outputPath = FileOutputFormat.getOutputPath(conf.value)
- assert(outputPath != null, "Undefined job output-path")
- val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
- new Path(workPath, getOutputName)
- }
-
- HiveFileFormatUtils.getHiveRecordWriter(
- conf.value,
- fileSinkConf.getTableInfo,
- conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
- newFileSinkDesc,
- path,
- Reporter.NULL)
- }
-
- writers.getOrElseUpdate(dynamicPartPath, newWriter)
- }
-}
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-0-be33aaa7253c8f248ff3921cd7dae340 b/sql/hive/src/test/resources/golden/dynamic_partition-0-be33aaa7253c8f248ff3921cd7dae340
deleted file mode 100644
index e69de29bb2..0000000000
--- a/sql/hive/src/test/resources/golden/dynamic_partition-0-be33aaa7253c8f248ff3921cd7dae340
+++ /dev/null
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-1-640552dd462707563fd255a713f83b41 b/sql/hive/src/test/resources/golden/dynamic_partition-1-640552dd462707563fd255a713f83b41
deleted file mode 100644
index e69de29bb2..0000000000
--- a/sql/hive/src/test/resources/golden/dynamic_partition-1-640552dd462707563fd255a713f83b41
+++ /dev/null
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493 b/sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493
deleted file mode 100644
index 573541ac97..0000000000
--- a/sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493
+++ /dev/null
@@ -1 +0,0 @@
-0
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-3-b7f7fa7ebf666f4fee27e149d8c6961f b/sql/hive/src/test/resources/golden/dynamic_partition-3-b7f7fa7ebf666f4fee27e149d8c6961f
deleted file mode 100644
index e69de29bb2..0000000000
--- a/sql/hive/src/test/resources/golden/dynamic_partition-3-b7f7fa7ebf666f4fee27e149d8c6961f
+++ /dev/null
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-4-8bdb71ad8cb3cc3026043def2525de3a b/sql/hive/src/test/resources/golden/dynamic_partition-4-8bdb71ad8cb3cc3026043def2525de3a
deleted file mode 100644
index e69de29bb2..0000000000
--- a/sql/hive/src/test/resources/golden/dynamic_partition-4-8bdb71ad8cb3cc3026043def2525de3a
+++ /dev/null
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-5-c630dce438f3792e7fb0f523fbbb3e1e b/sql/hive/src/test/resources/golden/dynamic_partition-5-c630dce438f3792e7fb0f523fbbb3e1e
deleted file mode 100644
index e69de29bb2..0000000000
--- a/sql/hive/src/test/resources/golden/dynamic_partition-5-c630dce438f3792e7fb0f523fbbb3e1e
+++ /dev/null
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf b/sql/hive/src/test/resources/golden/dynamic_partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf
deleted file mode 100644
index e69de29bb2..0000000000
--- a/sql/hive/src/test/resources/golden/dynamic_partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf
+++ /dev/null
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-7-be33aaa7253c8f248ff3921cd7dae340 b/sql/hive/src/test/resources/golden/dynamic_partition-7-be33aaa7253c8f248ff3921cd7dae340
deleted file mode 100644
index e69de29bb2..0000000000
--- a/sql/hive/src/test/resources/golden/dynamic_partition-7-be33aaa7253c8f248ff3921cd7dae340
+++ /dev/null
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 5d743a51b4..2da8a6fac3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -19,9 +19,6 @@ package org.apache.spark.sql.hive.execution
import scala.util.Try
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-
-import org.apache.spark.SparkException
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
@@ -383,7 +380,7 @@ class HiveQuerySuite extends HiveComparisonTest {
def isExplanation(result: SchemaRDD) = {
val explanation = result.select('plan).collect().map { case Row(plan: String) => plan }
- explanation.contains("== Physical Plan ==")
+ explanation.exists(_ == "== Physical Plan ==")
}
test("SPARK-1704: Explain commands as a SchemaRDD") {
@@ -571,91 +568,6 @@ class HiveQuerySuite extends HiveComparisonTest {
case class LogEntry(filename: String, message: String)
case class LogFile(name: String)
- createQueryTest("dynamic_partition",
- """
- |DROP TABLE IF EXISTS dynamic_part_table;
- |CREATE TABLE dynamic_part_table(intcol INT) PARTITIONED BY (partcol1 INT, partcol2 INT);
- |
- |SET hive.exec.dynamic.partition.mode=nonstrict;
- |
- |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
- |SELECT 1, 1, 1 FROM src WHERE key=150;
- |
- |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
- |SELECT 1, NULL, 1 FROM src WHERE key=150;
- |
- |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
- |SELECT 1, 1, NULL FROM src WHERE key=150;
- |
- |INSERT INTO TABLe dynamic_part_table PARTITION(partcol1, partcol2)
- |SELECT 1, NULL, NULL FROM src WHERE key=150;
- |
- |DROP TABLE IF EXISTS dynamic_part_table;
- """.stripMargin)
-
- test("Dynamic partition folder layout") {
- sql("DROP TABLE IF EXISTS dynamic_part_table")
- sql("CREATE TABLE dynamic_part_table(intcol INT) PARTITIONED BY (partcol1 INT, partcol2 INT)")
- sql("SET hive.exec.dynamic.partition.mode=nonstrict")
-
- val data = Map(
- Seq("1", "1") -> 1,
- Seq("1", "NULL") -> 2,
- Seq("NULL", "1") -> 3,
- Seq("NULL", "NULL") -> 4)
-
- data.foreach { case (parts, value) =>
- sql(
- s"""INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
- |SELECT $value, ${parts.mkString(", ")} FROM src WHERE key=150
- """.stripMargin)
-
- val partFolder = Seq("partcol1", "partcol2")
- .zip(parts)
- .map { case (k, v) =>
- if (v == "NULL") {
- s"$k=${ConfVars.DEFAULTPARTITIONNAME.defaultVal}"
- } else {
- s"$k=$v"
- }
- }
- .mkString("/")
-
- // Loads partition data to a temporary table to verify contents
- val path = s"$warehousePath/dynamic_part_table/$partFolder/part-00000"
-
- sql("DROP TABLE IF EXISTS dp_verify")
- sql("CREATE TABLE dp_verify(intcol INT)")
- sql(s"LOAD DATA LOCAL INPATH '$path' INTO TABLE dp_verify")
-
- assert(sql("SELECT * FROM dp_verify").collect() === Array(Row(value)))
- }
- }
-
- test("Partition spec validation") {
- sql("DROP TABLE IF EXISTS dp_test")
- sql("CREATE TABLE dp_test(key INT, value STRING) PARTITIONED BY (dp INT, sp INT)")
- sql("SET hive.exec.dynamic.partition.mode=strict")
-
- // Should throw when using strict dynamic partition mode without any static partition
- intercept[SparkException] {
- sql(
- """INSERT INTO TABLE dp_test PARTITION(dp)
- |SELECT key, value, key % 5 FROM src
- """.stripMargin)
- }
-
- sql("SET hive.exec.dynamic.partition.mode=nonstrict")
-
- // Should throw when a static partition appears after a dynamic partition
- intercept[SparkException] {
- sql(
- """INSERT INTO TABLE dp_test PARTITION(dp, sp = 1)
- |SELECT key, value, key % 5 FROM src
- """.stripMargin)
- }
- }
-
test("SPARK-3414 regression: should store analyzed logical plan when registering a temp table") {
sparkContext.makeRDD(Seq.empty[LogEntry]).registerTempTable("rawLogs")
sparkContext.makeRDD(Seq.empty[LogFile]).registerTempTable("logFiles")
@@ -713,27 +625,27 @@ class HiveQuerySuite extends HiveComparisonTest {
assert(sql("SET").collect().size == 0)
assertResult(Set(testKey -> testVal)) {
- collectResults(sql(s"SET $testKey=$testVal"))
+ collectResults(hql(s"SET $testKey=$testVal"))
}
assert(hiveconf.get(testKey, "") == testVal)
assertResult(Set(testKey -> testVal)) {
- collectResults(sql("SET"))
+ collectResults(hql("SET"))
}
sql(s"SET ${testKey + testKey}=${testVal + testVal}")
assert(hiveconf.get(testKey + testKey, "") == testVal + testVal)
assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) {
- collectResults(sql("SET"))
+ collectResults(hql("SET"))
}
// "set key"
assertResult(Set(testKey -> testVal)) {
- collectResults(sql(s"SET $testKey"))
+ collectResults(hql(s"SET $testKey"))
}
assertResult(Set(nonexistentKey -> "<undefined>")) {
- collectResults(sql(s"SET $nonexistentKey"))
+ collectResults(hql(s"SET $nonexistentKey"))
}
// Assert that sql() should have the same effects as sql() by repeating the above using sql().