author     Reynold Xin <rxin@apache.org>              2014-06-21 12:04:18 -0700
committer  Michael Armbrust <michael@databricks.com>  2014-06-21 12:04:31 -0700
commit     4881fc62defee542df56be8f22f237b4dedd77fe (patch)
tree       298e8808f661aa1031242d3fd2bc199f27902334 /sql/hive
parent     1829ec4111d80fe4fb177faef33b46a6b80fd88d (diff)
download   spark-4881fc62defee542df56be8f22f237b4dedd77fe.tar.gz
           spark-4881fc62defee542df56be8f22f237b4dedd77fe.tar.bz2
           spark-4881fc62defee542df56be8f22f237b4dedd77fe.zip
[SQL] Break hiveOperators.scala into multiple files.
The single file was getting very long (500+ loc).

Author: Reynold Xin <rxin@apache.org>

Closes #1166 from rxin/hiveOperators and squashes the following commits:

5b43068 [Reynold Xin] [SQL] Break hiveOperators.scala into multiple files.

(cherry picked from commit ec935abce13b60f353236566da149c0c87bb1002)
Signed-off-by: Michael Armbrust <michael@databricks.com>
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala                      |   9
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala  |  88
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala             | 223
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala       | 248
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala             |  47
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala             | 524
6 files changed, 610 insertions(+), 529 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index af7687b404..4d0fab4140 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -64,7 +64,6 @@ private[hive] trait HiveStrategies {
val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet
val (pruningPredicates, otherPredicates) = predicates.partition {
_.references.map(_.exprId).subsetOf(partitionKeyIds)
-
}
pruneFilterProject(
@@ -81,16 +80,16 @@ private[hive] trait HiveStrategies {
def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case logical.NativeCommand(sql) =>
NativeCommand(sql, plan.output)(context) :: Nil
- case describe: logical.DescribeCommand => {
+
+ case describe: logical.DescribeCommand =>
val resolvedTable = context.executePlan(describe.table).analyzed
resolvedTable match {
case t: MetastoreRelation =>
- Seq(DescribeHiveTableCommand(
- t, describe.output, describe.isExtended)(context))
+ Seq(DescribeHiveTableCommand(t, describe.output, describe.isExtended)(context))
case o: LogicalPlan =>
Seq(DescribeCommand(planLater(o), describe.output)(context))
}
- }
+
case _ => Nil
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
new file mode 100644
index 0000000000..a40e89e0d3
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.hive.metastore.api.FieldSchema
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow, Row}
+import org.apache.spark.sql.execution.{Command, LeafNode}
+import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation}
+
+/**
+ * Implementation for "describe [extended] table".
+ *
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class DescribeHiveTableCommand(
+ table: MetastoreRelation,
+ output: Seq[Attribute],
+ isExtended: Boolean)(
+ @transient context: HiveContext)
+ extends LeafNode with Command {
+
+ // Strings formatted in the same way as Hive's output; used for result comparison in our unit tests.
+ lazy val hiveString: Seq[String] = {
+ val alignment = 20
+ val delim = "\t"
+
+ sideEffectResult.map {
+ case (name, dataType, comment) =>
+ String.format("%-" + alignment + "s", name) + delim +
+ String.format("%-" + alignment + "s", dataType) + delim +
+ String.format("%-" + alignment + "s", Option(comment).getOrElse("None"))
+ }
+ }
+
+ override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
+ // Tries to mimic the format of Hive's output, but is not exactly the same.
+ var results: Seq[(String, String, String)] = Nil
+
+ val columns: Seq[FieldSchema] = table.hiveQlTable.getCols
+ val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols
+ results ++= columns.map(field => (field.getName, field.getType, field.getComment))
+ if (!partitionColumns.isEmpty) {
+ val partColumnInfo =
+ partitionColumns.map(field => (field.getName, field.getType, field.getComment))
+ results ++=
+ partColumnInfo ++
+ Seq(("# Partition Information", "", "")) ++
+ Seq((s"# ${output.get(0).name}", output.get(1).name, output.get(2).name)) ++
+ partColumnInfo
+ }
+
+ if (isExtended) {
+ results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, ""))
+ }
+
+ results
+ }
+
+ override def execute(): RDD[Row] = {
+ val rows = sideEffectResult.map {
+ case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
+ }
+ context.sparkContext.parallelize(rows, 1)
+ }
+
+ override def otherCopyArgs = context :: Nil
+}
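
For context, a minimal standalone sketch (assumed, not part of the patch) of the column alignment that hiveString applies above: each (name, dataType, comment) field is left-padded to 20 characters and the fields are joined with tabs. The sample column values below are hypothetical.

// Sketch only: reproduces the "%-20s" + tab formatting that
// DescribeHiveTableCommand.hiveString applies to each description row.
object DescribeFormatSketch {
  private val alignment = 20
  private val delim = "\t"

  def formatRow(name: String, dataType: String, comment: String): String =
    String.format("%-" + alignment + "s", name) + delim +
      String.format("%-" + alignment + "s", dataType) + delim +
      String.format("%-" + alignment + "s", Option(comment).getOrElse("None"))

  def main(args: Array[String]): Unit = {
    // Hypothetical columns, just to show the alignment and the null handling.
    println(formatRow("key", "int", null))
    println(formatRow("value", "string", "payload column"))
  }
}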
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
new file mode 100644
index 0000000000..ef8bae7453
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
+import org.apache.hadoop.hive.conf.HiveConf
+import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition}
+import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils
+import org.apache.hadoop.hive.serde2.objectinspector._
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.hive.serde2.objectinspector.primitive._
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types.{BooleanType, DataType}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.hive._
+import org.apache.spark.util.MutablePair
+
+/**
+ * :: DeveloperApi ::
+ * The Hive table scan operator. Column and partition pruning are both handled.
+ *
+ * @param attributes Attributes to be fetched from the Hive table.
+ * @param relation The Hive table to be scanned.
+ * @param partitionPruningPred An optional partition pruning predicate for partitioned tables.
+ */
+@DeveloperApi
+case class HiveTableScan(
+ attributes: Seq[Attribute],
+ relation: MetastoreRelation,
+ partitionPruningPred: Option[Expression])(
+ @transient val context: HiveContext)
+ extends LeafNode
+ with HiveInspectors {
+
+ require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
+ "Partition pruning predicates only supported for partitioned tables.")
+
+ // Bind all partition key attribute references in the partition pruning predicate for later
+ // evaluation.
+ private[this] val boundPruningPred = partitionPruningPred.map { pred =>
+ require(
+ pred.dataType == BooleanType,
+ s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.")
+
+ BindReferences.bindReference(pred, relation.partitionKeys)
+ }
+
+ @transient
+ private[this] val hadoopReader = new HadoopTableReader(relation.tableDesc, context)
+
+ /**
+ * The hive object inspector for this table, which can be used to extract values from the
+ * serialized row representation.
+ */
+ @transient
+ private[this] lazy val objectInspector =
+ relation.tableDesc.getDeserializer.getObjectInspector.asInstanceOf[StructObjectInspector]
+
+ /**
+ * Functions that extract the requested attributes from the Hive output. Partition values are
+ * cast from strings to their declared data types.
+ */
+ @transient
+ protected lazy val attributeFunctions: Seq[(Any, Array[String]) => Any] = {
+ attributes.map { a =>
+ val ordinal = relation.partitionKeys.indexOf(a)
+ if (ordinal >= 0) {
+ val dataType = relation.partitionKeys(ordinal).dataType
+ (_: Any, partitionKeys: Array[String]) => {
+ castFromString(partitionKeys(ordinal), dataType)
+ }
+ } else {
+ val ref = objectInspector.getAllStructFieldRefs
+ .find(_.getFieldName == a.name)
+ .getOrElse(sys.error(s"Can't find attribute $a"))
+ val fieldObjectInspector = ref.getFieldObjectInspector
+
+ val unwrapHiveData = fieldObjectInspector match {
+ case _: HiveVarcharObjectInspector =>
+ (value: Any) => value.asInstanceOf[HiveVarchar].getValue
+ case _: HiveDecimalObjectInspector =>
+ (value: Any) => BigDecimal(value.asInstanceOf[HiveDecimal].bigDecimalValue())
+ case _ =>
+ identity[Any] _
+ }
+
+ (row: Any, _: Array[String]) => {
+ val data = objectInspector.getStructFieldData(row, ref)
+ val hiveData = unwrapData(data, fieldObjectInspector)
+ if (hiveData != null) unwrapHiveData(hiveData) else null
+ }
+ }
+ }
+ }
+
+ private[this] def castFromString(value: String, dataType: DataType) = {
+ Cast(Literal(value), dataType).eval(null)
+ }
+
+ private def addColumnMetadataToConf(hiveConf: HiveConf) {
+ // Specifies IDs and internal names of columns to be scanned.
+ val neededColumnIDs = attributes.map(a => relation.output.indexWhere(_.name == a.name): Integer)
+ val columnInternalNames = neededColumnIDs.map(HiveConf.getColumnInternalName(_)).mkString(",")
+
+ if (attributes.size == relation.output.size) {
+ ColumnProjectionUtils.setFullyReadColumns(hiveConf)
+ } else {
+ ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs)
+ }
+
+ ColumnProjectionUtils.appendReadColumnNames(hiveConf, attributes.map(_.name))
+
+ // Specifies types and object inspectors of columns to be scanned.
+ val structOI = ObjectInspectorUtils
+ .getStandardObjectInspector(
+ relation.tableDesc.getDeserializer.getObjectInspector,
+ ObjectInspectorCopyOption.JAVA)
+ .asInstanceOf[StructObjectInspector]
+
+ val columnTypeNames = structOI
+ .getAllStructFieldRefs
+ .map(_.getFieldObjectInspector)
+ .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
+ .mkString(",")
+
+ hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
+ hiveConf.set(serdeConstants.LIST_COLUMNS, columnInternalNames)
+ }
+
+ addColumnMetadataToConf(context.hiveconf)
+
+ private def inputRdd = if (!relation.hiveQlTable.isPartitioned) {
+ hadoopReader.makeRDDForTable(relation.hiveQlTable)
+ } else {
+ hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions))
+ }
+
+ /**
+ * Prunes partitions not involved in the query plan.
+ *
+ * @param partitions All partitions of the relation.
+ * @return Partitions that are involved in the query plan.
+ */
+ private[hive] def prunePartitions(partitions: Seq[HivePartition]) = {
+ boundPruningPred match {
+ case None => partitions
+ case Some(shouldKeep) => partitions.filter { part =>
+ val dataTypes = relation.partitionKeys.map(_.dataType)
+ val castedValues = for ((value, dataType) <- part.getValues.zip(dataTypes)) yield {
+ castFromString(value, dataType)
+ }
+
+ // Only partitioned values are needed here, since the predicate has already been bound to
+ // partition key attribute references.
+ val row = new GenericRow(castedValues.toArray)
+ shouldKeep.eval(row).asInstanceOf[Boolean]
+ }
+ }
+ }
+
+ override def execute() = {
+ inputRdd.mapPartitions { iterator =>
+ if (iterator.isEmpty) {
+ Iterator.empty
+ } else {
+ val mutableRow = new GenericMutableRow(attributes.length)
+ val mutablePair = new MutablePair[Any, Array[String]]()
+ val buffered = iterator.buffered
+
+ // NOTE (lian): Critical path of Hive table scan, unnecessary FP style code and pattern
+ // matching are avoided intentionally.
+ val rowsAndPartitionKeys = buffered.head match {
+ // With partition keys
+ case _: Array[Any] =>
+ buffered.map { case array: Array[Any] =>
+ val deserializedRow = array(0)
+ val partitionKeys = array(1).asInstanceOf[Array[String]]
+ mutablePair.update(deserializedRow, partitionKeys)
+ }
+
+ // Without partition keys
+ case _ =>
+ val emptyPartitionKeys = Array.empty[String]
+ buffered.map { deserializedRow =>
+ mutablePair.update(deserializedRow, emptyPartitionKeys)
+ }
+ }
+
+ rowsAndPartitionKeys.map { pair =>
+ var i = 0
+ while (i < attributes.length) {
+ mutableRow(i) = attributeFunctions(i)(pair._1, pair._2)
+ i += 1
+ }
+ mutableRow: Row
+ }
+ }
+ }
+ }
+
+ override def output = attributes
+}
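
To make the pruning path above easier to follow, here is a simplified, self-contained sketch of the same idea with plain Scala stand-ins for the Hive partition and Catalyst predicate types (the type aliases, the castFromString table, and the sample data are hypothetical, not the real classes): each partition's string-valued keys are cast to their declared types, and a partition is scanned only if the bound predicate evaluates to true on them.

// Hypothetical simplification of HiveTableScan.prunePartitions: keep a
// partition only when the pruning predicate holds on its casted key values.
object PartitionPruningSketch {
  type PartitionValues = Seq[String]      // stand-in for HivePartition.getValues
  type Predicate = Seq[Any] => Boolean    // stand-in for a bound Catalyst predicate

  // Stand-in for Cast(Literal(value), dataType).eval(null).
  def castFromString(value: String, dataType: String): Any = dataType match {
    case "int" => value.toInt
    case _     => value
  }

  def prunePartitions(
      partitions: Seq[PartitionValues],
      keyTypes: Seq[String],
      boundPruningPred: Option[Predicate]): Seq[PartitionValues] =
    boundPruningPred match {
      case None => partitions               // no predicate: scan all partitions
      case Some(shouldKeep) =>
        partitions.filter { part =>
          val castedValues = part.zip(keyTypes).map { case (v, t) => castFromString(v, t) }
          shouldKeep(castedValues)
        }
    }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq("2014", "us"), Seq("2015", "eu"))   // made-up partitions
    val recent = prunePartitions(parts, Seq("int", "string"),
      Some(values => values(0).asInstanceOf[Int] >= 2015))
    println(recent)                                         // List(List(2015, eu))
  }
}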
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
new file mode 100644
index 0000000000..594a803806
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
+import org.apache.hadoop.hive.metastore.MetaStoreUtils
+import org.apache.hadoop.hive.ql.Context
+import org.apache.hadoop.hive.ql.metadata.Hive
+import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
+import org.apache.hadoop.hive.serde2.Serializer
+import org.apache.hadoop.hive.serde2.objectinspector._
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector
+import org.apache.hadoop.io.Writable
+import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf}
+
+import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.execution.{SparkPlan, UnaryNode}
+import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, SparkHiveHadoopWriter}
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class InsertIntoHiveTable(
+ table: MetastoreRelation,
+ partition: Map[String, Option[String]],
+ child: SparkPlan,
+ overwrite: Boolean)
+ (@transient sc: HiveContext)
+ extends UnaryNode {
+
+ val outputClass = newSerializer(table.tableDesc).getSerializedClass
+ @transient private val hiveContext = new Context(sc.hiveconf)
+ @transient private val db = Hive.get(sc.hiveconf)
+
+ private def newSerializer(tableDesc: TableDesc): Serializer = {
+ val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
+ serializer.initialize(null, tableDesc.getProperties)
+ serializer
+ }
+
+ override def otherCopyArgs = sc :: Nil
+
+ def output = child.output
+
+ /**
+ * Wraps a value with the appropriate Hive type based on its object inspector.
+ * TODO: Consolidate all hive OI/data interface code.
+ */
+ protected def wrap(a: (Any, ObjectInspector)): Any = a match {
+ case (s: String, oi: JavaHiveVarcharObjectInspector) =>
+ new HiveVarchar(s, s.size)
+
+ case (bd: BigDecimal, oi: JavaHiveDecimalObjectInspector) =>
+ new HiveDecimal(bd.underlying())
+
+ case (row: Row, oi: StandardStructObjectInspector) =>
+ val struct = oi.create()
+ row.zip(oi.getAllStructFieldRefs: Seq[StructField]).foreach {
+ case (data, field) =>
+ oi.setStructFieldData(struct, field, wrap(data, field.getFieldObjectInspector))
+ }
+ struct
+
+ case (s: Seq[_], oi: ListObjectInspector) =>
+ val wrappedSeq = s.map(wrap(_, oi.getListElementObjectInspector))
+ seqAsJavaList(wrappedSeq)
+
+ case (obj, _) =>
+ obj
+ }
+
+ def saveAsHiveFile(
+ rdd: RDD[Writable],
+ valueClass: Class[_],
+ fileSinkConf: FileSinkDesc,
+ conf: JobConf,
+ isCompressed: Boolean) {
+ if (valueClass == null) {
+ throw new SparkException("Output value class not set")
+ }
+ conf.setOutputValueClass(valueClass)
+ if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
+ throw new SparkException("Output format class not set")
+ }
+ // Doesn't work in Scala 2.9 due to what may be a generics bug
+ // TODO: Should we uncomment this for Scala 2.10?
+ // conf.setOutputFormat(outputFormatClass)
+ conf.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
+ if (isCompressed) {
+ // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
+ // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
+ // to store compression information.
+ conf.set("mapred.output.compress", "true")
+ fileSinkConf.setCompressed(true)
+ fileSinkConf.setCompressCodec(conf.get("mapred.output.compression.codec"))
+ fileSinkConf.setCompressType(conf.get("mapred.output.compression.type"))
+ }
+ conf.setOutputCommitter(classOf[FileOutputCommitter])
+ FileOutputFormat.setOutputPath(
+ conf,
+ SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf))
+
+ logger.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
+
+ val writer = new SparkHiveHadoopWriter(conf, fileSinkConf)
+ writer.preSetup()
+
+ def writeToFile(context: TaskContext, iter: Iterator[Writable]) {
+ // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
+ // around by taking a mod. We expect that no task will be attempted 2 billion times.
+ val attemptNumber = (context.attemptId % Int.MaxValue).toInt
+
+ writer.setup(context.stageId, context.partitionId, attemptNumber)
+ writer.open()
+
+ var count = 0
+ while(iter.hasNext) {
+ val record = iter.next()
+ count += 1
+ writer.write(record)
+ }
+
+ writer.close()
+ writer.commit()
+ }
+
+ sc.sparkContext.runJob(rdd, writeToFile _)
+ writer.commitJob()
+ }
+
+ override def execute() = result
+
+ /**
+ * Inserts all the rows in the table into Hive. Row objects are properly serialized with the
+ * `org.apache.hadoop.hive.serde2.SerDe` and the
+ * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition.
+ *
+ * Note: this is run once and then kept to avoid double insertions.
+ */
+ private lazy val result: RDD[Row] = {
+ val childRdd = child.execute()
+ assert(childRdd != null)
+
+ // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
+ // instances within the closure, since Serializer is not serializable while TableDesc is.
+ val tableDesc = table.tableDesc
+ val tableLocation = table.hiveQlTable.getDataLocation
+ val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation)
+ val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
+ val rdd = childRdd.mapPartitions { iter =>
+ val serializer = newSerializer(fileSinkConf.getTableInfo)
+ val standardOI = ObjectInspectorUtils
+ .getStandardObjectInspector(
+ fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
+ ObjectInspectorCopyOption.JAVA)
+ .asInstanceOf[StructObjectInspector]
+
+
+ val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
+ val outputData = new Array[Any](fieldOIs.length)
+ iter.map { row =>
+ var i = 0
+ while (i < row.length) {
+ // Casts Strings to HiveVarchars when necessary.
+ outputData(i) = wrap(row(i), fieldOIs(i))
+ i += 1
+ }
+
+ serializer.serialize(outputData, standardOI)
+ }
+ }
+
+ // ORC stores compression information in table properties, while other formats
+ // (e.g. RCFile) rely on Hadoop configurations to store compression information.
+ val jobConf = new JobConf(sc.hiveconf)
+ saveAsHiveFile(
+ rdd,
+ outputClass,
+ fileSinkConf,
+ jobConf,
+ sc.hiveconf.getBoolean("hive.exec.compress.output", false))
+
+ // TODO: Handle dynamic partitioning.
+ val outputPath = FileOutputFormat.getOutputPath(jobConf)
+ // Have to construct the format of dbname.tablename.
+ val qualifiedTableName = s"${table.databaseName}.${table.tableName}"
+ // TODO: Correctly set holdDDLTime.
+ // Most of the time, holdDDLTime should be false.
+ // holdDDLTime will be true when TOK_HOLD_DDLTIME is present in the query as a hint.
+ val holdDDLTime = false
+ if (partition.nonEmpty) {
+ val partitionSpec = partition.map {
+ case (key, Some(value)) => key -> value
+ case (key, None) => key -> "" // Should not reach here right now.
+ }
+ val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
+ db.validatePartitionNameCharacters(partVals)
+ // inheritTableSpecs is set to true. It should be set to false for an IMPORT query,
+ // which is currently considered a Hive native command.
+ val inheritTableSpecs = true
+ // TODO: Correctly set isSkewedStoreAsSubdir.
+ val isSkewedStoreAsSubdir = false
+ db.loadPartition(
+ outputPath,
+ qualifiedTableName,
+ partitionSpec,
+ overwrite,
+ holdDDLTime,
+ inheritTableSpecs,
+ isSkewedStoreAsSubdir)
+ } else {
+ db.loadTable(
+ outputPath,
+ qualifiedTableName,
+ overwrite,
+ holdDDLTime)
+ }
+
+ // It would be nice to just return the childRdd unchanged so insert operations could be chained,
+ // however for now we return an empty list to simplify compatibility checks with hive, which
+ // does not return anything for insert operations.
+ // TODO: implement hive compatibility as rules.
+ sc.sparkContext.makeRDD(Nil, 1)
+ }
+}
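
The writeToFile helper above folds Spark's long attempt id into the 32-bit attempt number Hadoop expects. A tiny sketch of just that arithmetic (the object and function names here are illustrative, not Spark API):

// Illustrative only: the wrap-around used in InsertIntoHiveTable.writeToFile,
// which keeps the Hadoop task attempt number within Int range.
object AttemptNumberSketch {
  def toHadoopAttemptNumber(attemptId: Long): Int =
    (attemptId % Int.MaxValue).toInt

  def main(args: Array[String]): Unit = {
    println(toHadoopAttemptNumber(7L))                        // 7
    println(toHadoopAttemptNumber(Int.MaxValue.toLong + 3L))  // 3
  }
}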
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala
new file mode 100644
index 0000000000..fe6031678f
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow, Row}
+import org.apache.spark.sql.execution.{Command, LeafNode}
+import org.apache.spark.sql.hive.HiveContext
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class NativeCommand(
+ sql: String, output: Seq[Attribute])(
+ @transient context: HiveContext)
+ extends LeafNode with Command {
+
+ override protected[sql] lazy val sideEffectResult: Seq[String] = context.runSqlHive(sql)
+
+ override def execute(): RDD[Row] = {
+ if (sideEffectResult.size == 0) {
+ context.emptyResult
+ } else {
+ val rows = sideEffectResult.map(r => new GenericRow(Array[Any](r)))
+ context.sparkContext.parallelize(rows, 1)
+ }
+ }
+
+ override def otherCopyArgs = context :: Nil
+}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala
deleted file mode 100644
index 2de2db28a7..0000000000
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala
+++ /dev/null
@@ -1,524 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.hive.execution
-
-import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar}
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.MetaStoreUtils
-import org.apache.hadoop.hive.metastore.api.FieldSchema
-import org.apache.hadoop.hive.ql.Context
-import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Hive}
-import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils
-import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc}
-import org.apache.hadoop.hive.serde.serdeConstants
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption
-import org.apache.hadoop.hive.serde2.objectinspector._
-import org.apache.hadoop.hive.serde2.objectinspector.primitive._
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
-import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Serializer}
-import org.apache.hadoop.io.Writable
-import org.apache.hadoop.mapred._
-
-import org.apache.spark
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.types.{BooleanType, DataType}
-import org.apache.spark.sql.execution._
-import org.apache.spark.sql.hive._
-import org.apache.spark.util.MutablePair
-import org.apache.spark.{TaskContext, SparkException}
-
-/* Implicits */
-import scala.collection.JavaConversions._
-
-/**
- * :: DeveloperApi ::
- * The Hive table scan operator. Column and partition pruning are both handled.
- *
- * @param attributes Attributes to be fetched from the Hive table.
- * @param relation The Hive table be be scanned.
- * @param partitionPruningPred An optional partition pruning predicate for partitioned table.
- */
-@DeveloperApi
-case class HiveTableScan(
- attributes: Seq[Attribute],
- relation: MetastoreRelation,
- partitionPruningPred: Option[Expression])(
- @transient val context: HiveContext)
- extends LeafNode
- with HiveInspectors {
-
- require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
- "Partition pruning predicates only supported for partitioned tables.")
-
- // Bind all partition key attribute references in the partition pruning predicate for later
- // evaluation.
- private val boundPruningPred = partitionPruningPred.map { pred =>
- require(
- pred.dataType == BooleanType,
- s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.")
-
- BindReferences.bindReference(pred, relation.partitionKeys)
- }
-
- @transient
- val hadoopReader = new HadoopTableReader(relation.tableDesc, context)
-
- /**
- * The hive object inspector for this table, which can be used to extract values from the
- * serialized row representation.
- */
- @transient
- lazy val objectInspector =
- relation.tableDesc.getDeserializer.getObjectInspector.asInstanceOf[StructObjectInspector]
-
- /**
- * Functions that extract the requested attributes from the hive output. Partitioned values are
- * casted from string to its declared data type.
- */
- @transient
- protected lazy val attributeFunctions: Seq[(Any, Array[String]) => Any] = {
- attributes.map { a =>
- val ordinal = relation.partitionKeys.indexOf(a)
- if (ordinal >= 0) {
- val dataType = relation.partitionKeys(ordinal).dataType
- (_: Any, partitionKeys: Array[String]) => {
- castFromString(partitionKeys(ordinal), dataType)
- }
- } else {
- val ref = objectInspector.getAllStructFieldRefs
- .find(_.getFieldName == a.name)
- .getOrElse(sys.error(s"Can't find attribute $a"))
- val fieldObjectInspector = ref.getFieldObjectInspector
-
- val unwrapHiveData = fieldObjectInspector match {
- case _: HiveVarcharObjectInspector =>
- (value: Any) => value.asInstanceOf[HiveVarchar].getValue
- case _: HiveDecimalObjectInspector =>
- (value: Any) => BigDecimal(value.asInstanceOf[HiveDecimal].bigDecimalValue())
- case _ =>
- identity[Any] _
- }
-
- (row: Any, _: Array[String]) => {
- val data = objectInspector.getStructFieldData(row, ref)
- val hiveData = unwrapData(data, fieldObjectInspector)
- if (hiveData != null) unwrapHiveData(hiveData) else null
- }
- }
- }
- }
-
- private def castFromString(value: String, dataType: DataType) = {
- Cast(Literal(value), dataType).eval(null)
- }
-
- private def addColumnMetadataToConf(hiveConf: HiveConf) {
- // Specifies IDs and internal names of columns to be scanned.
- val neededColumnIDs = attributes.map(a => relation.output.indexWhere(_.name == a.name): Integer)
- val columnInternalNames = neededColumnIDs.map(HiveConf.getColumnInternalName(_)).mkString(",")
-
- if (attributes.size == relation.output.size) {
- ColumnProjectionUtils.setFullyReadColumns(hiveConf)
- } else {
- ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs)
- }
-
- ColumnProjectionUtils.appendReadColumnNames(hiveConf, attributes.map(_.name))
-
- // Specifies types and object inspectors of columns to be scanned.
- val structOI = ObjectInspectorUtils
- .getStandardObjectInspector(
- relation.tableDesc.getDeserializer.getObjectInspector,
- ObjectInspectorCopyOption.JAVA)
- .asInstanceOf[StructObjectInspector]
-
- val columnTypeNames = structOI
- .getAllStructFieldRefs
- .map(_.getFieldObjectInspector)
- .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName)
- .mkString(",")
-
- hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames)
- hiveConf.set(serdeConstants.LIST_COLUMNS, columnInternalNames)
- }
-
- addColumnMetadataToConf(context.hiveconf)
-
- @transient
- def inputRdd = if (!relation.hiveQlTable.isPartitioned) {
- hadoopReader.makeRDDForTable(relation.hiveQlTable)
- } else {
- hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions))
- }
-
- /**
- * Prunes partitions not involve the query plan.
- *
- * @param partitions All partitions of the relation.
- * @return Partitions that are involved in the query plan.
- */
- private[hive] def prunePartitions(partitions: Seq[HivePartition]) = {
- boundPruningPred match {
- case None => partitions
- case Some(shouldKeep) => partitions.filter { part =>
- val dataTypes = relation.partitionKeys.map(_.dataType)
- val castedValues = for ((value, dataType) <- part.getValues.zip(dataTypes)) yield {
- castFromString(value, dataType)
- }
-
- // Only partitioned values are needed here, since the predicate has already been bound to
- // partition key attribute references.
- val row = new GenericRow(castedValues.toArray)
- shouldKeep.eval(row).asInstanceOf[Boolean]
- }
- }
- }
-
- def execute() = {
- inputRdd.mapPartitions { iterator =>
- if (iterator.isEmpty) {
- Iterator.empty
- } else {
- val mutableRow = new GenericMutableRow(attributes.length)
- val mutablePair = new MutablePair[Any, Array[String]]()
- val buffered = iterator.buffered
-
- // NOTE (lian): Critical path of Hive table scan, unnecessary FP style code and pattern
- // matching are avoided intentionally.
- val rowsAndPartitionKeys = buffered.head match {
- // With partition keys
- case _: Array[Any] =>
- buffered.map { case array: Array[Any] =>
- val deserializedRow = array(0)
- val partitionKeys = array(1).asInstanceOf[Array[String]]
- mutablePair.update(deserializedRow, partitionKeys)
- }
-
- // Without partition keys
- case _ =>
- val emptyPartitionKeys = Array.empty[String]
- buffered.map { deserializedRow =>
- mutablePair.update(deserializedRow, emptyPartitionKeys)
- }
- }
-
- rowsAndPartitionKeys.map { pair =>
- var i = 0
- while (i < attributes.length) {
- mutableRow(i) = attributeFunctions(i)(pair._1, pair._2)
- i += 1
- }
- mutableRow: Row
- }
- }
- }
- }
-
- def output = attributes
-}
-
-/**
- * :: DeveloperApi ::
- */
-@DeveloperApi
-case class InsertIntoHiveTable(
- table: MetastoreRelation,
- partition: Map[String, Option[String]],
- child: SparkPlan,
- overwrite: Boolean)
- (@transient sc: HiveContext)
- extends UnaryNode {
-
- val outputClass = newSerializer(table.tableDesc).getSerializedClass
- @transient private val hiveContext = new Context(sc.hiveconf)
- @transient private val db = Hive.get(sc.hiveconf)
-
- private def newSerializer(tableDesc: TableDesc): Serializer = {
- val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
- serializer.initialize(null, tableDesc.getProperties)
- serializer
- }
-
- override def otherCopyArgs = sc :: Nil
-
- def output = child.output
-
- /**
- * Wraps with Hive types based on object inspector.
- * TODO: Consolidate all hive OI/data interface code.
- */
- protected def wrap(a: (Any, ObjectInspector)): Any = a match {
- case (s: String, oi: JavaHiveVarcharObjectInspector) =>
- new HiveVarchar(s, s.size)
-
- case (bd: BigDecimal, oi: JavaHiveDecimalObjectInspector) =>
- new HiveDecimal(bd.underlying())
-
- case (row: Row, oi: StandardStructObjectInspector) =>
- val struct = oi.create()
- row.zip(oi.getAllStructFieldRefs: Seq[StructField]).foreach {
- case (data, field) =>
- oi.setStructFieldData(struct, field, wrap(data, field.getFieldObjectInspector))
- }
- struct
-
- case (s: Seq[_], oi: ListObjectInspector) =>
- val wrappedSeq = s.map(wrap(_, oi.getListElementObjectInspector))
- seqAsJavaList(wrappedSeq)
-
- case (obj, _) =>
- obj
- }
-
- def saveAsHiveFile(
- rdd: RDD[Writable],
- valueClass: Class[_],
- fileSinkConf: FileSinkDesc,
- conf: JobConf,
- isCompressed: Boolean) {
- if (valueClass == null) {
- throw new SparkException("Output value class not set")
- }
- conf.setOutputValueClass(valueClass)
- if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) {
- throw new SparkException("Output format class not set")
- }
- // Doesn't work in Scala 2.9 due to what may be a generics bug
- // TODO: Should we uncomment this for Scala 2.10?
- // conf.setOutputFormat(outputFormatClass)
- conf.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName)
- if (isCompressed) {
- // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec",
- // and "mapred.output.compression.type" have no impact on ORC because it uses table properties
- // to store compression information.
- conf.set("mapred.output.compress", "true")
- fileSinkConf.setCompressed(true)
- fileSinkConf.setCompressCodec(conf.get("mapred.output.compression.codec"))
- fileSinkConf.setCompressType(conf.get("mapred.output.compression.type"))
- }
- conf.setOutputCommitter(classOf[FileOutputCommitter])
- FileOutputFormat.setOutputPath(
- conf,
- SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf))
-
- logger.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
-
- val writer = new SparkHiveHadoopWriter(conf, fileSinkConf)
- writer.preSetup()
-
- def writeToFile(context: TaskContext, iter: Iterator[Writable]) {
- // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
- // around by taking a mod. We expect that no task will be attempted 2 billion times.
- val attemptNumber = (context.attemptId % Int.MaxValue).toInt
-
- writer.setup(context.stageId, context.partitionId, attemptNumber)
- writer.open()
-
- var count = 0
- while(iter.hasNext) {
- val record = iter.next()
- count += 1
- writer.write(record)
- }
-
- writer.close()
- writer.commit()
- }
-
- sc.sparkContext.runJob(rdd, writeToFile _)
- writer.commitJob()
- }
-
- override def execute() = result
-
- /**
- * Inserts all the rows in the table into Hive. Row objects are properly serialized with the
- * `org.apache.hadoop.hive.serde2.SerDe` and the
- * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition.
- *
- * Note: this is run once and then kept to avoid double insertions.
- */
- private lazy val result: RDD[Row] = {
- val childRdd = child.execute()
- assert(childRdd != null)
-
- // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer
- // instances within the closure, since Serializer is not serializable while TableDesc is.
- val tableDesc = table.tableDesc
- val tableLocation = table.hiveQlTable.getDataLocation
- val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation)
- val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false)
- val rdd = childRdd.mapPartitions { iter =>
- val serializer = newSerializer(fileSinkConf.getTableInfo)
- val standardOI = ObjectInspectorUtils
- .getStandardObjectInspector(
- fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
- ObjectInspectorCopyOption.JAVA)
- .asInstanceOf[StructObjectInspector]
-
-
- val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray
- val outputData = new Array[Any](fieldOIs.length)
- iter.map { row =>
- var i = 0
- while (i < row.length) {
- // Casts Strings to HiveVarchars when necessary.
- outputData(i) = wrap(row(i), fieldOIs(i))
- i += 1
- }
-
- serializer.serialize(outputData, standardOI)
- }
- }
-
- // ORC stores compression information in table properties. While, there are other formats
- // (e.g. RCFile) that rely on hadoop configurations to store compression information.
- val jobConf = new JobConf(sc.hiveconf)
- saveAsHiveFile(
- rdd,
- outputClass,
- fileSinkConf,
- jobConf,
- sc.hiveconf.getBoolean("hive.exec.compress.output", false))
-
- // TODO: Handle dynamic partitioning.
- val outputPath = FileOutputFormat.getOutputPath(jobConf)
- // Have to construct the format of dbname.tablename.
- val qualifiedTableName = s"${table.databaseName}.${table.tableName}"
- // TODO: Correctly set holdDDLTime.
- // In most of the time, we should have holdDDLTime = false.
- // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint.
- val holdDDLTime = false
- if (partition.nonEmpty) {
- val partitionSpec = partition.map {
- case (key, Some(value)) => key -> value
- case (key, None) => key -> "" // Should not reach here right now.
- }
- val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec)
- db.validatePartitionNameCharacters(partVals)
- // inheritTableSpecs is set to true. It should be set to false for a IMPORT query
- // which is currently considered as a Hive native command.
- val inheritTableSpecs = true
- // TODO: Correctly set isSkewedStoreAsSubdir.
- val isSkewedStoreAsSubdir = false
- db.loadPartition(
- outputPath,
- qualifiedTableName,
- partitionSpec,
- overwrite,
- holdDDLTime,
- inheritTableSpecs,
- isSkewedStoreAsSubdir)
- } else {
- db.loadTable(
- outputPath,
- qualifiedTableName,
- overwrite,
- holdDDLTime)
- }
-
- // It would be nice to just return the childRdd unchanged so insert operations could be chained,
- // however for now we return an empty list to simplify compatibility checks with hive, which
- // does not return anything for insert operations.
- // TODO: implement hive compatibility as rules.
- sc.sparkContext.makeRDD(Nil, 1)
- }
-}
-
-/**
- * :: DeveloperApi ::
- */
-@DeveloperApi
-case class NativeCommand(
- sql: String, output: Seq[Attribute])(
- @transient context: HiveContext)
- extends LeafNode with Command {
-
- override protected[sql] lazy val sideEffectResult: Seq[String] = context.runSqlHive(sql)
-
- override def execute(): RDD[spark.sql.Row] = {
- if (sideEffectResult.size == 0) {
- context.emptyResult
- } else {
- val rows = sideEffectResult.map(r => new GenericRow(Array[Any](r)))
- context.sparkContext.parallelize(rows, 1)
- }
- }
-
- override def otherCopyArgs = context :: Nil
-}
-
-/**
- * :: DeveloperApi ::
- */
-@DeveloperApi
-case class DescribeHiveTableCommand(
- table: MetastoreRelation,
- output: Seq[Attribute],
- isExtended: Boolean)(
- @transient context: HiveContext)
- extends LeafNode with Command {
-
- // Strings with the format like Hive. It is used for result comparison in our unit tests.
- lazy val hiveString: Seq[String] = {
- val alignment = 20
- val delim = "\t"
-
- sideEffectResult.map {
- case (name, dataType, comment) =>
- String.format("%-" + alignment + "s", name) + delim +
- String.format("%-" + alignment + "s", dataType) + delim +
- String.format("%-" + alignment + "s", Option(comment).getOrElse("None"))
- }
- }
-
- override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = {
- // Trying to mimic the format of Hive's output. But not exactly the same.
- var results: Seq[(String, String, String)] = Nil
-
- val columns: Seq[FieldSchema] = table.hiveQlTable.getCols
- val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols
- results ++= columns.map(field => (field.getName, field.getType, field.getComment))
- if (!partitionColumns.isEmpty) {
- val partColumnInfo =
- partitionColumns.map(field => (field.getName, field.getType, field.getComment))
- results ++=
- partColumnInfo ++
- Seq(("# Partition Information", "", "")) ++
- Seq((s"# ${output.get(0).name}", output.get(1).name, output.get(2).name)) ++
- partColumnInfo
- }
-
- if (isExtended) {
- results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, ""))
- }
-
- results
- }
-
- override def execute(): RDD[Row] = {
- val rows = sideEffectResult.map {
- case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment))
- }
- context.sparkContext.parallelize(rows, 1)
- }
-
- override def otherCopyArgs = context :: Nil
-}