author     Cheng Lian <lian.cs.zju@gmail.com>      2014-10-03 12:26:02 -0700
committer  Patrick Wendell <pwendell@gmail.com>    2014-10-03 12:26:02 -0700
commit     bec0d0eaa33811fde72b84f7d53a6f6031e7b5d3
tree       5a5d3be1918748b19142c2686e96315d57a6a4d8
parent     fbe8e9856b23262193105e7bf86075f516f0db25
[SPARK-3007][SQL] Adds dynamic partitioning support
PR #2226 was reverted because it broke Jenkins builds for an unknown reason. This debugging PR aims to fix the Jenkins build. It also fixes two bugs:

1. Compression configurations in `InsertIntoHiveTable` are disabled by mistake.

   The `FileSinkDesc` object passed to the writer container doesn't carry any compression-related configuration; that configuration was not taken care of until `saveAsHiveFile` is called. This PR moves the compression code forward, right after instantiation of the `FileSinkDesc` object.

2. `PreInsertionCasts` doesn't take table partitions into account.

   In `castChildOutput`, `table.attributes` only contains non-partition columns, so for a partitioned table `childOutputDataTypes` never equals `tableOutputDataTypes`. This results in analyzed plans like the following:

   ```
   == Analyzed Logical Plan ==
   InsertIntoTable Map(partcol1 -> None, partcol2 -> None), false
    MetastoreRelation default, dynamic_part_table, None
    Project [c_0#1164,c_1#1165,c_2#1166]
     Project [c_0#1164,c_1#1165,c_2#1166]
      Project [c_0#1164,c_1#1165,c_2#1166]
      ... (repeats 99 times) ...
        Project [c_0#1164,c_1#1165,c_2#1166]
         Project [c_0#1164,c_1#1165,c_2#1166]
          Project [1 AS c_0#1164,1 AS c_1#1165,1 AS c_2#1166]
           Filter (key#1170 = 150)
            MetastoreRelation default, src, None
   ```

   Awful as this logical plan looks, it is harmless: all of the redundant projections are eliminated by the optimizer, which is probably why this issue wasn't caught before.

Author: Cheng Lian <lian.cs.zju@gmail.com>
Author: baishuo(白硕) <vc_java@hotmail.com>
Author: baishuo <vc_java@hotmail.com>

Closes #2616 from liancheng/dp-fix and squashes the following commits:

21935b6 [Cheng Lian] Adds back deleted trailing space
f471c4b [Cheng Lian] PreInsertionCasts should take table partitions into account
a132c80 [Cheng Lian] Fixes output compression
9c6eb2d [Cheng Lian] Adds tests to verify dynamic partitioning folder layout
0eed349 [Cheng Lian] Addresses @yhuai's comments
26632c3 [Cheng Lian] Adds more tests
9227181 [Cheng Lian] Minor refactoring
c47470e [Cheng Lian] Refactors InsertIntoHiveTable to a Command
6fb16d7 [Cheng Lian] Fixes typo in test name, regenerated golden answer files
d53daa5 [Cheng Lian] Refactors dynamic partitioning support
b821611 [baishuo] pass check style
997c990 [baishuo] use HiveConf.DEFAULTPARTITIONNAME to replace hive.exec.default.partition.name
761ecf2 [baishuo] modify according micheal's advice
207c6ac [baishuo] modify for some bad indentation
caea6fb [baishuo] modify code to pass scala style checks
b660e74 [baishuo] delete a empty else branch
cd822f0 [baishuo] do a little modify
8e7268c [baishuo] update file after test
3f91665 [baishuo(白硕)] Update Cast.scala
8ad173c [baishuo(白硕)] Update InsertIntoHiveTable.scala
051ba91 [baishuo(白硕)] Update Cast.scala
d452eb3 [baishuo(白硕)] Update HiveQuerySuite.scala
37c603b [baishuo(白硕)] Update InsertIntoHiveTable.scala
98cfb1f [baishuo(白硕)] Update HiveCompatibilitySuite.scala
6af73f4 [baishuo(白硕)] Update InsertIntoHiveTable.scala
adf02f1 [baishuo(白硕)] Update InsertIntoHiveTable.scala
1867e23 [baishuo(白硕)] Update SparkHadoopWriter.scala
6bb5880 [baishuo(白硕)] Update HiveQl.scala
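To make the second fix concrete, here is a minimal sketch of the type comparison done in `castChildOutput`. The case classes are simplified stand-ins for Spark's Catalyst/metastore types, not the real API; only the comparison logic mirrors the one-line change to `HiveMetastoreCatalog` shown in the diff below.

```scala
// Illustrative stand-ins for a Catalyst attribute and a metastore relation.
// These are NOT Spark's real classes; they only model the shape needed here.
case class Attribute(name: String, dataType: String)
case class MetastoreTable(attributes: Seq[Attribute], partitionKeys: Seq[Attribute])

object PreInsertionCastsSketch {
  def main(args: Array[String]): Unit = {
    // A partitioned table like the one in the new test:
    // one data column plus two partition columns.
    val table = MetastoreTable(
      attributes = Seq(Attribute("intcol", "int")),
      partitionKeys = Seq(Attribute("partcol1", "int"), Attribute("partcol2", "int")))

    // Output of the child plan for
    // INSERT INTO ... PARTITION(partcol1, partcol2) SELECT 1, 1, 1 FROM src ...
    val childOutput = Seq(
      Attribute("c_0", "int"),
      Attribute("c_1", "int"),
      Attribute("c_2", "int"))

    val childOutputDataTypes = childOutput.map(_.dataType)

    // Before the fix: partition keys were ignored, so the two type sequences
    // could never match for a partitioned table.
    val beforeFix = table.attributes.map(_.dataType)

    // After the fix: partition key types are appended, so they match and no
    // extra cast/projection needs to be inserted.
    val afterFix = table.attributes.map(_.dataType) ++ table.partitionKeys.map(_.dataType)

    println(childOutputDataTypes == beforeFix) // false
    println(childOutputDataTypes == afterFix)  // true
  }
}
```

With the partition key types included, the rule no longer re-wraps the child plan in a `Project` on every analyzer pass, which is what produced the stack of repeated `Project` nodes in the plan shown above.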
-rw-r--r--  sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala |  17
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala | 195
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala |   3
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala |   5
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 218
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala | 217
-rw-r--r--  sql/hive/src/test/resources/golden/dynamic_partition-0-be33aaa7253c8f248ff3921cd7dae340 |   0
-rw-r--r--  sql/hive/src/test/resources/golden/dynamic_partition-1-640552dd462707563fd255a713f83b41 |   0
-rw-r--r--  sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493 |   1
-rw-r--r--  sql/hive/src/test/resources/golden/dynamic_partition-3-b7f7fa7ebf666f4fee27e149d8c6961f |   0
-rw-r--r--  sql/hive/src/test/resources/golden/dynamic_partition-4-8bdb71ad8cb3cc3026043def2525de3a |   0
-rw-r--r--  sql/hive/src/test/resources/golden/dynamic_partition-5-c630dce438f3792e7fb0f523fbbb3e1e |   0
-rw-r--r--  sql/hive/src/test/resources/golden/dynamic_partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf |   0
-rw-r--r--  sql/hive/src/test/resources/golden/dynamic_partition-7-be33aaa7253c8f248ff3921cd7dae340 |   0
15 files changed, 450 insertions(+), 306 deletions(-)
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 556c984ad3..35e9c9939d 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -220,6 +220,23 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
*/
override def whiteList = Seq(
"add_part_exist",
+ "dynamic_partition_skip_default",
+ "infer_bucket_sort_dyn_part",
+ "load_dyn_part1",
+ "load_dyn_part2",
+ "load_dyn_part3",
+ "load_dyn_part4",
+ "load_dyn_part5",
+ "load_dyn_part6",
+ "load_dyn_part7",
+ "load_dyn_part8",
+ "load_dyn_part9",
+ "load_dyn_part10",
+ "load_dyn_part11",
+ "load_dyn_part12",
+ "load_dyn_part13",
+ "load_dyn_part14",
+ "load_dyn_part14_win",
"add_part_multiple",
"add_partition_no_whitelist",
"add_partition_with_whitelist",
diff --git a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
deleted file mode 100644
index ab7862f4f9..0000000000
--- a/sql/hive/src/main/scala/org/apache/spark/SparkHadoopWriter.scala
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive
-
-import java.io.IOException
-import java.text.NumberFormat
-import java.util.Date
-
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
-import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
-import org.apache.hadoop.hive.ql.plan.FileSinkDesc
-import org.apache.hadoop.mapred._
-import org.apache.hadoop.io.Writable
-
-import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
-
-/**
- * Internal helper class that saves an RDD using a Hive OutputFormat.
- * It is based on [[SparkHadoopWriter]].
- */
-private[hive] class SparkHiveHadoopWriter(
- @transient jobConf: JobConf,
- fileSinkConf: FileSinkDesc)
- extends Logging
- with SparkHadoopMapRedUtil
- with Serializable {
-
- private val now = new Date()
- private val conf = new SerializableWritable(jobConf)
-
- private var jobID = 0
- private var splitID = 0
- private var attemptID = 0
- private var jID: SerializableWritable[JobID] = null
- private var taID: SerializableWritable[TaskAttemptID] = null
-
- @transient private var writer: FileSinkOperator.RecordWriter = null
- @transient private var format: HiveOutputFormat[AnyRef, Writable] = null
- @transient private var committer: OutputCommitter = null
- @transient private var jobContext: JobContext = null
- @transient private var taskContext: TaskAttemptContext = null
-
- def preSetup() {
- setIDs(0, 0, 0)
- setConfParams()
-
- val jCtxt = getJobContext()
- getOutputCommitter().setupJob(jCtxt)
- }
-
-
- def setup(jobid: Int, splitid: Int, attemptid: Int) {
- setIDs(jobid, splitid, attemptid)
- setConfParams()
- }
-
- def open() {
- val numfmt = NumberFormat.getInstance()
- numfmt.setMinimumIntegerDigits(5)
- numfmt.setGroupingUsed(false)
-
- val extension = Utilities.getFileExtension(
- conf.value,
- fileSinkConf.getCompressed,
- getOutputFormat())
-
- val outputName = "part-" + numfmt.format(splitID) + extension
- val path = FileOutputFormat.getTaskOutputPath(conf.value, outputName)
-
- getOutputCommitter().setupTask(getTaskContext())
- writer = HiveFileFormatUtils.getHiveRecordWriter(
- conf.value,
- fileSinkConf.getTableInfo,
- conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
- fileSinkConf,
- path,
- null)
- }
-
- def write(value: Writable) {
- if (writer != null) {
- writer.write(value)
- } else {
- throw new IOException("Writer is null, open() has not been called")
- }
- }
-
- def close() {
- // Seems the boolean value passed into close does not matter.
- writer.close(false)
- }
-
- def commit() {
- val taCtxt = getTaskContext()
- val cmtr = getOutputCommitter()
- if (cmtr.needsTaskCommit(taCtxt)) {
- try {
- cmtr.commitTask(taCtxt)
- logInfo (taID + ": Committed")
- } catch {
- case e: IOException =>
- logError("Error committing the output of task: " + taID.value, e)
- cmtr.abortTask(taCtxt)
- throw e
- }
- } else {
- logWarning ("No need to commit output of task: " + taID.value)
- }
- }
-
- def commitJob() {
- // always ? Or if cmtr.needsTaskCommit ?
- val cmtr = getOutputCommitter()
- cmtr.commitJob(getJobContext())
- }
-
- // ********* Private Functions *********
-
- private def getOutputFormat(): HiveOutputFormat[AnyRef,Writable] = {
- if (format == null) {
- format = conf.value.getOutputFormat()
- .asInstanceOf[HiveOutputFormat[AnyRef,Writable]]
- }
- format
- }
-
- private def getOutputCommitter(): OutputCommitter = {
- if (committer == null) {
- committer = conf.value.getOutputCommitter
- }
- committer
- }
-
- private def getJobContext(): JobContext = {
- if (jobContext == null) {
- jobContext = newJobContext(conf.value, jID.value)
- }
- jobContext
- }
-
- private def getTaskContext(): TaskAttemptContext = {
- if (taskContext == null) {
- taskContext = newTaskAttemptContext(conf.value, taID.value)
- }
- taskContext
- }
-
- private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
- jobID = jobId
- splitID = splitId
- attemptID = attemptId
-
- jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobId))
- taID = new SerializableWritable[TaskAttemptID](
- new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
- }
-
- private def setConfParams() {
- conf.value.set("mapred.job.id", jID.value.toString)
- conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
- conf.value.set("mapred.task.id", taID.value.toString)
- conf.value.setBoolean("mapred.task.is.map", true)
- conf.value.setInt("mapred.task.partition", splitID)
- }
-}
-
-private[hive] object SparkHiveHadoopWriter {
- def createPathFromString(path: String, conf: JobConf): Path = {
- if (path == null) {
- throw new IllegalArgumentException("Output path is null")
- }
- val outputPath = new Path(path)
- val fs = outputPath.getFileSystem(conf)
- if (outputPath == null || fs == null) {
- throw new IllegalArgumentException("Incorrectly formatted output path")
- }
- outputPath.makeQualified(fs)
- }
-}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 06b1446ccb..989a9784a4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -144,7 +144,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
val childOutputDataTypes = child.output.map(_.dataType)
// Only check attributes, not partitionKeys since they are always strings.
// TODO: Fully support inserting into partitioned tables.
- val tableOutputDataTypes = table.attributes.map(_.dataType)
+ val tableOutputDataTypes =
+ table.attributes.map(_.dataType) ++ table.partitionKeys.map(_.dataType)
if (childOutputDataTypes == tableOutputDataTypes) {
p
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 6bb42eeb05..32c9175f18 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -806,11 +806,6 @@ private[hive] object HiveQl {
cleanIdentifier(key.toLowerCase) -> None
}.toMap).getOrElse(Map.empty)
- if (partitionKeys.values.exists(p => p.isEmpty)) {
- throw new NotImplementedError(s"Do not support INSERT INTO/OVERWRITE with" +
- s"dynamic partitioning.")
- }
-
InsertIntoTable(UnresolvedRelation(db, tableName, None), partitionKeys, query, overwrite)
case a: ASTNode =>
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index a284a91a91..16a8c782ac 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -19,27 +19,25 @@ package org.apache.spark.sql.hive.execution
import scala.collection.JavaConversions._
-import java.util.{HashMap => JHashMap}
-
import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
import org.apache.hadoop.hive.metastore.MetaStoreUtils
-import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.metadata.Hive
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
+import org.apache.hadoop.hive.ql.{Context, ErrorMsg}
import org.apache.hadoop.hive.serde2.Serializer
-import org.apache.hadoop.hive.serde2.objectinspector._
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector
-import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector
-import org.apache.hadoop.io.Writable
+import org.apache.hadoop.hive.serde2.objectinspector._
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.{JavaHiveDecimalObjectInspector, JavaHiveVarcharObjectInspector}
import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
-import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
-import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, SparkHiveHadoopWriter}
+import org.apache.spark.sql.execution.{Command, SparkPlan, UnaryNode}
+import org.apache.spark.sql.hive._
+import org.apache.spark.{SerializableWritable, SparkException, TaskContext}
/**
* :: DeveloperApi ::
@@ -51,7 +49,7 @@ case class InsertIntoHiveTable(
child: SparkPlan,
overwrite: Boolean)
(@transient sc: HiveContext)
- extends UnaryNode {
+ extends UnaryNode with Command {
@transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
@transient private lazy val hiveContext = new Context(sc.hiveconf)
@@ -101,66 +99,61 @@ case class InsertIntoHiveTable(
}
def saveAsHiveFile(
- rdd: RDD[Writable],
+ rdd: RDD[Row],
valueClass: Class[_],
fileSinkConf: FileSinkDesc,
- conf: JobConf,
- isCompressed: Boolean) {
- if (valueClass == null) {
- throw new SparkException("Output value class not set")
- }
- conf.setOutputValueClass(valueClass)
- if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
- throw new SparkException("Output format class not set")
- }
- // Doesn't work in Scala 2.9 due to what may be a generics bug
- // TODO: Should we uncomment this for Scala 2.10?
- // conf.setOutputFormat(outputFormatClass)
- conf.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
- if (isCompressed) {
- // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
- // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
- // to store compression information.
- conf.set("mapred.output.compress", "true")
- fileSinkConf.setCompressed(true)
- fileSinkConf.setCompressCodec(conf.get("mapred.output.compression.codec"))
- fileSinkConf.setCompressType(conf.get("mapred.output.compression.type"))
- }
- conf.setOutputCommitter(classOf[FileOutputCommitter])
- FileOutputFormat.setOutputPath(
- conf,
- SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf))
+ conf: SerializableWritable[JobConf],
+ writerContainer: SparkHiveWriterContainer) {
+ assert(valueClass != null, "Output value class not set")
+ conf.value.setOutputValueClass(valueClass)
+
+ val outputFileFormatClassName = fileSinkConf.getTableInfo.getOutputFileFormatClassName
+ assert(outputFileFormatClassName != null, "Output format class not set")
+ conf.value.set("mapred.output.format.class", outputFileFormatClassName)
+ conf.value.setOutputCommitter(classOf[FileOutputCommitter])
+ FileOutputFormat.setOutputPath(
+ conf.value,
+ SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value))
log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
- val writer = new SparkHiveHadoopWriter(conf, fileSinkConf)
- writer.preSetup()
+ writerContainer.driverSideSetup()
+ sc.sparkContext.runJob(rdd, writeToFile _)
+ writerContainer.commitJob()
+
+ // Note that this function is executed on executor side
+ def writeToFile(context: TaskContext, iterator: Iterator[Row]) {
+ val serializer = newSerializer(fileSinkConf.getTableInfo)
+ val standardOI = ObjectInspectorUtils
+ .getStandardObjectInspector(
+ fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
+ ObjectInspectorCopyOption.JAVA)
+ .asInstanceOf[StructObjectInspector]
+
+ val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
+ val outputData = new Array[Any](fieldOIs.length)
- def writeToFile(context: TaskContext, iter: Iterator[Writable]) {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+ writerContainer.executorSideSetup(context.stageId, context.partitionId, attemptNumber)
- writer.setup(context.stageId, context.partitionId, attemptNumber)
- writer.open()
+ iterator.foreach { row =>
+ var i = 0
+ while (i < fieldOIs.length) {
+ // TODO (lian) avoid per row dynamic dispatching and pattern matching cost in `wrap`
+ outputData(i) = wrap(row(i), fieldOIs(i))
+ i += 1
+ }
- var count = 0
- while(iter.hasNext) {
- val record = iter.next()
- count += 1
- writer.write(record)
+ val writer = writerContainer.getLocalFileWriter(row)
+ writer.write(serializer.serialize(outputData, standardOI))
}
- writer.close()
- writer.commit()
+ writerContainer.close()
}
-
- sc.sparkContext.runJob(rdd, writeToFile _)
- writer.commitJob()
}
- override def execute() = result
-
/**
* Inserts all the rows in the table into Hive. Row objects are properly serialized with the
* `org.apache.hadoop.hive.serde2.SerDe` and the
@@ -168,50 +161,69 @@ case class InsertIntoHiveTable(
*
* Note: this is run once and then kept to avoid double insertions.
*/
- private lazy val result: RDD[Row] = {
- val childRdd = child.execute()
- assert(childRdd != null)
-
+ override protected[sql] lazy val sideEffectResult: Seq[Row] = {
// Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
// instances within the closure, since Serializer is not serializable while TableDesc is.
val tableDesc = table.tableDesc
val tableLocation = table.hiveQlTable.getDataLocation
val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation)
val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
- val rdd = childRdd.mapPartitions { iter =>
- val serializer = newSerializer(fileSinkConf.getTableInfo)
- val standardOI = ObjectInspectorUtils
- .getStandardObjectInspector(
- fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
- ObjectInspectorCopyOption.JAVA)
- .asInstanceOf[StructObjectInspector]
+ val isCompressed = sc.hiveconf.getBoolean(
+ ConfVars.COMPRESSRESULT.varname, ConfVars.COMPRESSRESULT.defaultBoolVal)
+ if (isCompressed) {
+ // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
+ // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
+ // to store compression information.
+ sc.hiveconf.set("mapred.output.compress", "true")
+ fileSinkConf.setCompressed(true)
+ fileSinkConf.setCompressCodec(sc.hiveconf.get("mapred.output.compression.codec"))
+ fileSinkConf.setCompressType(sc.hiveconf.get("mapred.output.compression.type"))
+ }
- val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
- val outputData = new Array[Any](fieldOIs.length)
- iter.map { row =>
- var i = 0
- while (i < row.length) {
- // Casts Strings to HiveVarchars when necessary.
- outputData(i) = wrap(row(i), fieldOIs(i))
- i += 1
- }
+ val numDynamicPartitions = partition.values.count(_.isEmpty)
+ val numStaticPartitions = partition.values.count(_.nonEmpty)
+ val partitionSpec = partition.map {
+ case (key, Some(value)) => key -> value
+ case (key, None) => key -> ""
+ }
+
+ // All partition column names in the format of "<column name 1>/<column name 2>/..."
+ val partitionColumns = fileSinkConf.getTableInfo.getProperties.getProperty("partition_columns")
+ val partitionColumnNames = Option(partitionColumns).map(_.split("/")).orNull
+
+ // Validate partition spec if there exist any dynamic partitions
+ if (numDynamicPartitions > 0) {
+ // Report error if dynamic partitioning is not enabled
+ if (!sc.hiveconf.getBoolVar(HiveConf.ConfVars.DYNAMICPARTITIONING)) {
+ throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_DISABLED.getMsg)
+ }
- serializer.serialize(outputData, standardOI)
+ // Report error if dynamic partition strict mode is on but no static partition is found
+ if (numStaticPartitions == 0 &&
+ sc.hiveconf.getVar(HiveConf.ConfVars.DYNAMICPARTITIONINGMODE).equalsIgnoreCase("strict")) {
+ throw new SparkException(ErrorMsg.DYNAMIC_PARTITION_STRICT_MODE.getMsg)
+ }
+
+ // Report error if any static partition appears after a dynamic partition
+ val isDynamic = partitionColumnNames.map(partitionSpec(_).isEmpty)
+ isDynamic.init.zip(isDynamic.tail).find(_ == (true, false)).foreach { _ =>
+ throw new SparkException(ErrorMsg.PARTITION_DYN_STA_ORDER.getMsg)
}
}
- // ORC stores compression information in table properties. While, there are other formats
- // (e.g. RCFile) that rely on hadoop configurations to store compression information.
val jobConf = new JobConf(sc.hiveconf)
- saveAsHiveFile(
- rdd,
- outputClass,
- fileSinkConf,
- jobConf,
- sc.hiveconf.getBoolean("hive.exec.compress.output", false))
-
- // TODO: Handle dynamic partitioning.
+ val jobConfSer = new SerializableWritable(jobConf)
+
+ val writerContainer = if (numDynamicPartitions > 0) {
+ val dynamicPartColNames = partitionColumnNames.takeRight(numDynamicPartitions)
+ new SparkHiveDynamicPartitionWriterContainer(jobConf, fileSinkConf, dynamicPartColNames)
+ } else {
+ new SparkHiveWriterContainer(jobConf, fileSinkConf)
+ }
+
+ saveAsHiveFile(child.execute(), outputClass, fileSinkConf, jobConfSer, writerContainer)
+
val outputPath = FileOutputFormat.getOutputPath(jobConf)
// Have to construct the format of dbname.tablename.
val qualifiedTableName = s"${table.databaseName}.${table.tableName}"
@@ -220,10 +232,6 @@ case class InsertIntoHiveTable(
// holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
val holdDDLTime = false
if (partition.nonEmpty) {
- val partitionSpec = partition.map {
- case (key, Some(value)) => key -> value
- case (key, None) => key -> "" // Should not reach here right now.
- }
val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
db.validatePartitionNameCharacters(partVals)
// inheritTableSpecs is set to true. It should be set to false for a IMPORT query
@@ -231,14 +239,26 @@ case class InsertIntoHiveTable(
val inheritTableSpecs = true
// TODO: Correctly set isSkewedStoreAsSubdir.
val isSkewedStoreAsSubdir = false
- db.loadPartition(
- outputPath,
- qualifiedTableName,
- partitionSpec,
- overwrite,
- holdDDLTime,
- inheritTableSpecs,
- isSkewedStoreAsSubdir)
+ if (numDynamicPartitions > 0) {
+ db.loadDynamicPartitions(
+ outputPath,
+ qualifiedTableName,
+ partitionSpec,
+ overwrite,
+ numDynamicPartitions,
+ holdDDLTime,
+ isSkewedStoreAsSubdir
+ )
+ } else {
+ db.loadPartition(
+ outputPath,
+ qualifiedTableName,
+ partitionSpec,
+ overwrite,
+ holdDDLTime,
+ inheritTableSpecs,
+ isSkewedStoreAsSubdir)
+ }
} else {
db.loadTable(
outputPath,
@@ -251,6 +271,6 @@ case class InsertIntoHiveTable(
// however for now we return an empty list to simplify compatibility checks with hive, which
// does not return anything for insert operations.
// TODO: implement hive compatibility as rules.
- sc.sparkContext.makeRDD(Nil, 1)
+ Seq.empty[Row]
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
new file mode 100644
index 0000000000..ac5c7a8220
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveWriterContainers.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.IOException
+import java.text.NumberFormat
+import java.util.Date
+
+import scala.collection.mutable
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+import org.apache.hadoop.hive.ql.exec.{FileSinkOperator, Utilities}
+import org.apache.hadoop.hive.ql.io.{HiveFileFormatUtils, HiveOutputFormat}
+import org.apache.hadoop.hive.ql.plan.FileSinkDesc
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred._
+
+import org.apache.spark.sql.Row
+import org.apache.spark.{Logging, SerializableWritable, SparkHadoopWriter}
+
+/**
+ * Internal helper class that saves an RDD using a Hive OutputFormat.
+ * It is based on [[SparkHadoopWriter]].
+ */
+private[hive] class SparkHiveWriterContainer(
+ @transient jobConf: JobConf,
+ fileSinkConf: FileSinkDesc)
+ extends Logging
+ with SparkHadoopMapRedUtil
+ with Serializable {
+
+ private val now = new Date()
+ protected val conf = new SerializableWritable(jobConf)
+
+ private var jobID = 0
+ private var splitID = 0
+ private var attemptID = 0
+ private var jID: SerializableWritable[JobID] = null
+ private var taID: SerializableWritable[TaskAttemptID] = null
+
+ @transient private var writer: FileSinkOperator.RecordWriter = null
+ @transient private lazy val committer = conf.value.getOutputCommitter
+ @transient private lazy val jobContext = newJobContext(conf.value, jID.value)
+ @transient private lazy val taskContext = newTaskAttemptContext(conf.value, taID.value)
+ @transient private lazy val outputFormat =
+ conf.value.getOutputFormat.asInstanceOf[HiveOutputFormat[AnyRef,Writable]]
+
+ def driverSideSetup() {
+ setIDs(0, 0, 0)
+ setConfParams()
+ committer.setupJob(jobContext)
+ }
+
+ def executorSideSetup(jobId: Int, splitId: Int, attemptId: Int) {
+ setIDs(jobId, splitId, attemptId)
+ setConfParams()
+ committer.setupTask(taskContext)
+ initWriters()
+ }
+
+ protected def getOutputName: String = {
+ val numberFormat = NumberFormat.getInstance()
+ numberFormat.setMinimumIntegerDigits(5)
+ numberFormat.setGroupingUsed(false)
+ val extension = Utilities.getFileExtension(conf.value, fileSinkConf.getCompressed, outputFormat)
+ "part-" + numberFormat.format(splitID) + extension
+ }
+
+ def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = writer
+
+ def close() {
+ // Seems the boolean value passed into close does not matter.
+ writer.close(false)
+ commit()
+ }
+
+ def commitJob() {
+ committer.commitJob(jobContext)
+ }
+
+ protected def initWriters() {
+ // NOTE this method is executed at the executor side.
+ // For Hive tables without partitions or with only static partitions, only 1 writer is needed.
+ writer = HiveFileFormatUtils.getHiveRecordWriter(
+ conf.value,
+ fileSinkConf.getTableInfo,
+ conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
+ fileSinkConf,
+ FileOutputFormat.getTaskOutputPath(conf.value, getOutputName),
+ Reporter.NULL)
+ }
+
+ protected def commit() {
+ if (committer.needsTaskCommit(taskContext)) {
+ try {
+ committer.commitTask(taskContext)
+ logInfo (taID + ": Committed")
+ } catch {
+ case e: IOException =>
+ logError("Error committing the output of task: " + taID.value, e)
+ committer.abortTask(taskContext)
+ throw e
+ }
+ } else {
+ logInfo("No need to commit output of task: " + taID.value)
+ }
+ }
+
+ // ********* Private Functions *********
+
+ private def setIDs(jobId: Int, splitId: Int, attemptId: Int) {
+ jobID = jobId
+ splitID = splitId
+ attemptID = attemptId
+
+ jID = new SerializableWritable[JobID](SparkHadoopWriter.createJobID(now, jobId))
+ taID = new SerializableWritable[TaskAttemptID](
+ new TaskAttemptID(new TaskID(jID.value, true, splitID), attemptID))
+ }
+
+ private def setConfParams() {
+ conf.value.set("mapred.job.id", jID.value.toString)
+ conf.value.set("mapred.tip.id", taID.value.getTaskID.toString)
+ conf.value.set("mapred.task.id", taID.value.toString)
+ conf.value.setBoolean("mapred.task.is.map", true)
+ conf.value.setInt("mapred.task.partition", splitID)
+ }
+}
+
+private[hive] object SparkHiveWriterContainer {
+ def createPathFromString(path: String, conf: JobConf): Path = {
+ if (path == null) {
+ throw new IllegalArgumentException("Output path is null")
+ }
+ val outputPath = new Path(path)
+ val fs = outputPath.getFileSystem(conf)
+ if (outputPath == null || fs == null) {
+ throw new IllegalArgumentException("Incorrectly formatted output path")
+ }
+ outputPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ }
+}
+
+private[spark] class SparkHiveDynamicPartitionWriterContainer(
+ @transient jobConf: JobConf,
+ fileSinkConf: FileSinkDesc,
+ dynamicPartColNames: Array[String])
+ extends SparkHiveWriterContainer(jobConf, fileSinkConf) {
+
+ private val defaultPartName = jobConf.get(
+ ConfVars.DEFAULTPARTITIONNAME.varname, ConfVars.DEFAULTPARTITIONNAME.defaultVal)
+
+ @transient private var writers: mutable.HashMap[String, FileSinkOperator.RecordWriter] = _
+
+ override protected def initWriters(): Unit = {
+ // NOTE: This method is executed at the executor side.
+ // Actual writers are created for each dynamic partition on the fly.
+ writers = mutable.HashMap.empty[String, FileSinkOperator.RecordWriter]
+ }
+
+ override def close(): Unit = {
+ writers.values.foreach(_.close(false))
+ commit()
+ }
+
+ override def getLocalFileWriter(row: Row): FileSinkOperator.RecordWriter = {
+ val dynamicPartPath = dynamicPartColNames
+ .zip(row.takeRight(dynamicPartColNames.length))
+ .map { case (col, rawVal) =>
+ val string = if (rawVal == null) null else String.valueOf(rawVal)
+ s"/$col=${if (string == null || string.isEmpty) defaultPartName else string}"
+ }
+ .mkString
+
+ def newWriter = {
+ val newFileSinkDesc = new FileSinkDesc(
+ fileSinkConf.getDirName + dynamicPartPath,
+ fileSinkConf.getTableInfo,
+ fileSinkConf.getCompressed)
+ newFileSinkDesc.setCompressCodec(fileSinkConf.getCompressCodec)
+ newFileSinkDesc.setCompressType(fileSinkConf.getCompressType)
+
+ val path = {
+ val outputPath = FileOutputFormat.getOutputPath(conf.value)
+ assert(outputPath != null, "Undefined job output-path")
+ val workPath = new Path(outputPath, dynamicPartPath.stripPrefix("/"))
+ new Path(workPath, getOutputName)
+ }
+
+ HiveFileFormatUtils.getHiveRecordWriter(
+ conf.value,
+ fileSinkConf.getTableInfo,
+ conf.value.getOutputValueClass.asInstanceOf[Class[Writable]],
+ newFileSinkDesc,
+ path,
+ Reporter.NULL)
+ }
+
+ writers.getOrElseUpdate(dynamicPartPath, newWriter)
+ }
+}
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-0-be33aaa7253c8f248ff3921cd7dae340 b/sql/hive/src/test/resources/golden/dynamic_partition-0-be33aaa7253c8f248ff3921cd7dae340
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/dynamic_partition-0-be33aaa7253c8f248ff3921cd7dae340
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-1-640552dd462707563fd255a713f83b41 b/sql/hive/src/test/resources/golden/dynamic_partition-1-640552dd462707563fd255a713f83b41
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/dynamic_partition-1-640552dd462707563fd255a713f83b41
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493 b/sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493
new file mode 100644
index 0000000000..573541ac97
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/dynamic_partition-2-36456c9d0d2e3ef72ab5ba9ba48e5493
@@ -0,0 +1 @@
+0
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-3-b7f7fa7ebf666f4fee27e149d8c6961f b/sql/hive/src/test/resources/golden/dynamic_partition-3-b7f7fa7ebf666f4fee27e149d8c6961f
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/dynamic_partition-3-b7f7fa7ebf666f4fee27e149d8c6961f
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-4-8bdb71ad8cb3cc3026043def2525de3a b/sql/hive/src/test/resources/golden/dynamic_partition-4-8bdb71ad8cb3cc3026043def2525de3a
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/dynamic_partition-4-8bdb71ad8cb3cc3026043def2525de3a
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-5-c630dce438f3792e7fb0f523fbbb3e1e b/sql/hive/src/test/resources/golden/dynamic_partition-5-c630dce438f3792e7fb0f523fbbb3e1e
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/dynamic_partition-5-c630dce438f3792e7fb0f523fbbb3e1e
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf b/sql/hive/src/test/resources/golden/dynamic_partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/dynamic_partition-6-7abc9ec8a36cdc5e89e955265a7fd7cf
diff --git a/sql/hive/src/test/resources/golden/dynamic_partition-7-be33aaa7253c8f248ff3921cd7dae340 b/sql/hive/src/test/resources/golden/dynamic_partition-7-be33aaa7253c8f248ff3921cd7dae340
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/dynamic_partition-7-be33aaa7253c8f248ff3921cd7dae340
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index f5868bff22..2e282a9ade 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -19,6 +19,9 @@ package org.apache.spark.sql.hive.execution
import scala.util.Try
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars
+
+import org.apache.spark.SparkException
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
@@ -380,7 +383,7 @@ class HiveQuerySuite extends HiveComparisonTest {
def isExplanation(result: SchemaRDD) = {
val explanation = result.select('plan).collect().map { case Row(plan: String) => plan }
- explanation.exists(_ == "== Physical Plan ==")
+ explanation.contains("== Physical Plan ==")
}
test("SPARK-1704: Explain commands as a SchemaRDD") {
@@ -568,6 +571,91 @@ class HiveQuerySuite extends HiveComparisonTest {
case class LogEntry(filename: String, message: String)
case class LogFile(name: String)
+ createQueryTest("dynamic_partition",
+ """
+ |DROP TABLE IF EXISTS dynamic_part_table;
+ |CREATE TABLE dynamic_part_table(intcol INT) PARTITIONED BY (partcol1 INT, partcol2 INT);
+ |
+ |SET hive.exec.dynamic.partition.mode=nonstrict;
+ |
+ |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
+ |SELECT 1, 1, 1 FROM src WHERE key=150;
+ |
+ |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
+ |SELECT 1, NULL, 1 FROM src WHERE key=150;
+ |
+ |INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
+ |SELECT 1, 1, NULL FROM src WHERE key=150;
+ |
+ |INSERT INTO TABLe dynamic_part_table PARTITION(partcol1, partcol2)
+ |SELECT 1, NULL, NULL FROM src WHERE key=150;
+ |
+ |DROP TABLE IF EXISTS dynamic_part_table;
+ """.stripMargin)
+
+ test("Dynamic partition folder layout") {
+ sql("DROP TABLE IF EXISTS dynamic_part_table")
+ sql("CREATE TABLE dynamic_part_table(intcol INT) PARTITIONED BY (partcol1 INT, partcol2 INT)")
+ sql("SET hive.exec.dynamic.partition.mode=nonstrict")
+
+ val data = Map(
+ Seq("1", "1") -> 1,
+ Seq("1", "NULL") -> 2,
+ Seq("NULL", "1") -> 3,
+ Seq("NULL", "NULL") -> 4)
+
+ data.foreach { case (parts, value) =>
+ sql(
+ s"""INSERT INTO TABLE dynamic_part_table PARTITION(partcol1, partcol2)
+ |SELECT $value, ${parts.mkString(", ")} FROM src WHERE key=150
+ """.stripMargin)
+
+ val partFolder = Seq("partcol1", "partcol2")
+ .zip(parts)
+ .map { case (k, v) =>
+ if (v == "NULL") {
+ s"$k=${ConfVars.DEFAULTPARTITIONNAME.defaultVal}"
+ } else {
+ s"$k=$v"
+ }
+ }
+ .mkString("/")
+
+ // Loads partition data to a temporary table to verify contents
+ val path = s"$warehousePath/dynamic_part_table/$partFolder/part-00000"
+
+ sql("DROP TABLE IF EXISTS dp_verify")
+ sql("CREATE TABLE dp_verify(intcol INT)")
+ sql(s"LOAD DATA LOCAL INPATH '$path' INTO TABLE dp_verify")
+
+ assert(sql("SELECT * FROM dp_verify").collect() === Array(Row(value)))
+ }
+ }
+
+ test("Partition spec validation") {
+ sql("DROP TABLE IF EXISTS dp_test")
+ sql("CREATE TABLE dp_test(key INT, value STRING) PARTITIONED BY (dp INT, sp INT)")
+ sql("SET hive.exec.dynamic.partition.mode=strict")
+
+ // Should throw when using strict dynamic partition mode without any static partition
+ intercept[SparkException] {
+ sql(
+ """INSERT INTO TABLE dp_test PARTITION(dp)
+ |SELECT key, value, key % 5 FROM src
+ """.stripMargin)
+ }
+
+ sql("SET hive.exec.dynamic.partition.mode=nonstrict")
+
+ // Should throw when a static partition appears after a dynamic partition
+ intercept[SparkException] {
+ sql(
+ """INSERT INTO TABLE dp_test PARTITION(dp, sp = 1)
+ |SELECT key, value, key % 5 FROM src
+ """.stripMargin)
+ }
+ }
+
test("SPARK-3414 regression: should store analyzed logical plan when registering a temp table") {
sparkContext.makeRDD(Seq.empty[LogEntry]).registerTempTable("rawLogs")
sparkContext.makeRDD(Seq.empty[LogFile]).registerTempTable("logFiles")
@@ -625,27 +713,27 @@ class HiveQuerySuite extends HiveComparisonTest {
assert(sql("SET").collect().size == 0)
assertResult(Set(testKey -> testVal)) {
- collectResults(hql(s"SET $testKey=$testVal"))
+ collectResults(sql(s"SET $testKey=$testVal"))
}
assert(hiveconf.get(testKey, "") == testVal)
assertResult(Set(testKey -> testVal)) {
- collectResults(hql("SET"))
+ collectResults(sql("SET"))
}
sql(s"SET ${testKey + testKey}=${testVal + testVal}")
assert(hiveconf.get(testKey + testKey, "") == testVal + testVal)
assertResult(Set(testKey -> testVal, (testKey + testKey) -> (testVal + testVal))) {
- collectResults(hql("SET"))
+ collectResults(sql("SET"))
}
// "set key"
assertResult(Set(testKey -> testVal)) {
- collectResults(hql(s"SET $testKey"))
+ collectResults(sql(s"SET $testKey"))
}
assertResult(Set(nonexistentKey -> "<undefined>")) {
- collectResults(hql(s"SET $nonexistentKey"))
+ collectResults(sql(s"SET $nonexistentKey"))
}
// Assert that sql() should have the same effects as sql() by repeating the above using sql().