From 4881fc62defee542df56be8f22f237b4dedd77fe Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Sat, 21 Jun 2014 12:04:18 -0700 Subject: [SQL] Break hiveOperators.scala into multiple files. The single file was getting very long (500+ loc). Author: Reynold Xin Closes #1166 from rxin/hiveOperators and squashes the following commits: 5b43068 [Reynold Xin] [SQL] Break hiveOperators.scala into multiple files. (cherry picked from commit ec935abce13b60f353236566da149c0c87bb1002) Signed-off-by: Michael Armbrust --- .../org/apache/spark/sql/hive/HiveStrategies.scala | 9 +- .../hive/execution/DescribeHiveTableCommand.scala | 88 ++++ .../spark/sql/hive/execution/HiveTableScan.scala | 223 +++++++++ .../sql/hive/execution/InsertIntoHiveTable.scala | 248 ++++++++++ .../spark/sql/hive/execution/NativeCommand.scala | 47 ++ .../spark/sql/hive/execution/hiveOperators.scala | 524 --------------------- 6 files changed, 610 insertions(+), 529 deletions(-) create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala create mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala delete mode 100644 sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala (limited to 'sql/hive') diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index af7687b404..4d0fab4140 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -64,7 +64,6 @@ private[hive] trait HiveStrategies { val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet val (pruningPredicates, otherPredicates) = predicates.partition { _.references.map(_.exprId).subsetOf(partitionKeyIds) - } pruneFilterProject( @@ -81,16 +80,16 @@ private[hive] trait HiveStrategies { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.NativeCommand(sql) => NativeCommand(sql, plan.output)(context) :: Nil - case describe: logical.DescribeCommand => { + + case describe: logical.DescribeCommand => val resolvedTable = context.executePlan(describe.table).analyzed resolvedTable match { case t: MetastoreRelation => - Seq(DescribeHiveTableCommand( - t, describe.output, describe.isExtended)(context)) + Seq(DescribeHiveTableCommand(t, describe.output, describe.isExtended)(context)) case o: LogicalPlan => Seq(DescribeCommand(planLater(o), describe.output)(context)) } - } + case _ => Nil } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala new file mode 100644 index 0000000000..a40e89e0d3 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DescribeHiveTableCommand.scala @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import scala.collection.JavaConversions._ + +import org.apache.hadoop.hive.metastore.api.FieldSchema + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow, Row} +import org.apache.spark.sql.execution.{Command, LeafNode} +import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation} + +/** + * Implementation for "describe [extended] table". + * + * :: DeveloperApi :: + */ +@DeveloperApi +case class DescribeHiveTableCommand( + table: MetastoreRelation, + output: Seq[Attribute], + isExtended: Boolean)( + @transient context: HiveContext) + extends LeafNode with Command { + + // Strings with the format like Hive. It is used for result comparison in our unit tests. + lazy val hiveString: Seq[String] = { + val alignment = 20 + val delim = "\t" + + sideEffectResult.map { + case (name, dataType, comment) => + String.format("%-" + alignment + "s", name) + delim + + String.format("%-" + alignment + "s", dataType) + delim + + String.format("%-" + alignment + "s", Option(comment).getOrElse("None")) + } + } + + override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = { + // Trying to mimic the format of Hive's output. But not exactly the same. + var results: Seq[(String, String, String)] = Nil + + val columns: Seq[FieldSchema] = table.hiveQlTable.getCols + val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols + results ++= columns.map(field => (field.getName, field.getType, field.getComment)) + if (!partitionColumns.isEmpty) { + val partColumnInfo = + partitionColumns.map(field => (field.getName, field.getType, field.getComment)) + results ++= + partColumnInfo ++ + Seq(("# Partition Information", "", "")) ++ + Seq((s"# ${output.get(0).name}", output.get(1).name, output.get(2).name)) ++ + partColumnInfo + } + + if (isExtended) { + results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, "")) + } + + results + } + + override def execute(): RDD[Row] = { + val rows = sideEffectResult.map { + case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment)) + } + context.sparkContext.parallelize(rows, 1) + } + + override def otherCopyArgs = context :: Nil +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala new file mode 100644 index 0000000000..ef8bae7453 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala @@ -0,0 +1,223 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import scala.collection.JavaConversions._ + +import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} +import org.apache.hadoop.hive.conf.HiveConf +import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition} +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils +import org.apache.hadoop.hive.serde2.objectinspector._ +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.hive.serde2.objectinspector.primitive._ +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types.{BooleanType, DataType} +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.hive._ +import org.apache.spark.util.MutablePair + +/** + * :: DeveloperApi :: + * The Hive table scan operator. Column and partition pruning are both handled. + * + * @param attributes Attributes to be fetched from the Hive table. + * @param relation The Hive table be be scanned. + * @param partitionPruningPred An optional partition pruning predicate for partitioned table. + */ +@DeveloperApi +case class HiveTableScan( + attributes: Seq[Attribute], + relation: MetastoreRelation, + partitionPruningPred: Option[Expression])( + @transient val context: HiveContext) + extends LeafNode + with HiveInspectors { + + require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned, + "Partition pruning predicates only supported for partitioned tables.") + + // Bind all partition key attribute references in the partition pruning predicate for later + // evaluation. + private[this] val boundPruningPred = partitionPruningPred.map { pred => + require( + pred.dataType == BooleanType, + s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.") + + BindReferences.bindReference(pred, relation.partitionKeys) + } + + @transient + private[this] val hadoopReader = new HadoopTableReader(relation.tableDesc, context) + + /** + * The hive object inspector for this table, which can be used to extract values from the + * serialized row representation. + */ + @transient + private[this] lazy val objectInspector = + relation.tableDesc.getDeserializer.getObjectInspector.asInstanceOf[StructObjectInspector] + + /** + * Functions that extract the requested attributes from the hive output. Partitioned values are + * casted from string to its declared data type. + */ + @transient + protected lazy val attributeFunctions: Seq[(Any, Array[String]) => Any] = { + attributes.map { a => + val ordinal = relation.partitionKeys.indexOf(a) + if (ordinal >= 0) { + val dataType = relation.partitionKeys(ordinal).dataType + (_: Any, partitionKeys: Array[String]) => { + castFromString(partitionKeys(ordinal), dataType) + } + } else { + val ref = objectInspector.getAllStructFieldRefs + .find(_.getFieldName == a.name) + .getOrElse(sys.error(s"Can't find attribute $a")) + val fieldObjectInspector = ref.getFieldObjectInspector + + val unwrapHiveData = fieldObjectInspector match { + case _: HiveVarcharObjectInspector => + (value: Any) => value.asInstanceOf[HiveVarchar].getValue + case _: HiveDecimalObjectInspector => + (value: Any) => BigDecimal(value.asInstanceOf[HiveDecimal].bigDecimalValue()) + case _ => + identity[Any] _ + } + + (row: Any, _: Array[String]) => { + val data = objectInspector.getStructFieldData(row, ref) + val hiveData = unwrapData(data, fieldObjectInspector) + if (hiveData != null) unwrapHiveData(hiveData) else null + } + } + } + } + + private[this] def castFromString(value: String, dataType: DataType) = { + Cast(Literal(value), dataType).eval(null) + } + + private def addColumnMetadataToConf(hiveConf: HiveConf) { + // Specifies IDs and internal names of columns to be scanned. + val neededColumnIDs = attributes.map(a => relation.output.indexWhere(_.name == a.name): Integer) + val columnInternalNames = neededColumnIDs.map(HiveConf.getColumnInternalName(_)).mkString(",") + + if (attributes.size == relation.output.size) { + ColumnProjectionUtils.setFullyReadColumns(hiveConf) + } else { + ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs) + } + + ColumnProjectionUtils.appendReadColumnNames(hiveConf, attributes.map(_.name)) + + // Specifies types and object inspectors of columns to be scanned. + val structOI = ObjectInspectorUtils + .getStandardObjectInspector( + relation.tableDesc.getDeserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val columnTypeNames = structOI + .getAllStructFieldRefs + .map(_.getFieldObjectInspector) + .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName) + .mkString(",") + + hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) + hiveConf.set(serdeConstants.LIST_COLUMNS, columnInternalNames) + } + + addColumnMetadataToConf(context.hiveconf) + + private def inputRdd = if (!relation.hiveQlTable.isPartitioned) { + hadoopReader.makeRDDForTable(relation.hiveQlTable) + } else { + hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions)) + } + + /** + * Prunes partitions not involve the query plan. + * + * @param partitions All partitions of the relation. + * @return Partitions that are involved in the query plan. + */ + private[hive] def prunePartitions(partitions: Seq[HivePartition]) = { + boundPruningPred match { + case None => partitions + case Some(shouldKeep) => partitions.filter { part => + val dataTypes = relation.partitionKeys.map(_.dataType) + val castedValues = for ((value, dataType) <- part.getValues.zip(dataTypes)) yield { + castFromString(value, dataType) + } + + // Only partitioned values are needed here, since the predicate has already been bound to + // partition key attribute references. + val row = new GenericRow(castedValues.toArray) + shouldKeep.eval(row).asInstanceOf[Boolean] + } + } + } + + override def execute() = { + inputRdd.mapPartitions { iterator => + if (iterator.isEmpty) { + Iterator.empty + } else { + val mutableRow = new GenericMutableRow(attributes.length) + val mutablePair = new MutablePair[Any, Array[String]]() + val buffered = iterator.buffered + + // NOTE (lian): Critical path of Hive table scan, unnecessary FP style code and pattern + // matching are avoided intentionally. + val rowsAndPartitionKeys = buffered.head match { + // With partition keys + case _: Array[Any] => + buffered.map { case array: Array[Any] => + val deserializedRow = array(0) + val partitionKeys = array(1).asInstanceOf[Array[String]] + mutablePair.update(deserializedRow, partitionKeys) + } + + // Without partition keys + case _ => + val emptyPartitionKeys = Array.empty[String] + buffered.map { deserializedRow => + mutablePair.update(deserializedRow, emptyPartitionKeys) + } + } + + rowsAndPartitionKeys.map { pair => + var i = 0 + while (i < attributes.length) { + mutableRow(i) = attributeFunctions(i)(pair._1, pair._2) + i += 1 + } + mutableRow: Row + } + } + } + } + + override def output = attributes +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala new file mode 100644 index 0000000000..594a803806 --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import scala.collection.JavaConversions._ + +import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} +import org.apache.hadoop.hive.metastore.MetaStoreUtils +import org.apache.hadoop.hive.ql.Context +import org.apache.hadoop.hive.ql.metadata.Hive +import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc} +import org.apache.hadoop.hive.serde2.Serializer +import org.apache.hadoop.hive.serde2.objectinspector._ +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveDecimalObjectInspector +import org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaHiveVarcharObjectInspector +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf} + +import org.apache.spark.{SparkException, TaskContext} +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.Row +import org.apache.spark.sql.execution.{SparkPlan, UnaryNode} +import org.apache.spark.sql.hive.{HiveContext, MetastoreRelation, SparkHiveHadoopWriter} + +/** + * :: DeveloperApi :: + */ +@DeveloperApi +case class InsertIntoHiveTable( + table: MetastoreRelation, + partition: Map[String, Option[String]], + child: SparkPlan, + overwrite: Boolean) + (@transient sc: HiveContext) + extends UnaryNode { + + val outputClass = newSerializer(table.tableDesc).getSerializedClass + @transient private val hiveContext = new Context(sc.hiveconf) + @transient private val db = Hive.get(sc.hiveconf) + + private def newSerializer(tableDesc: TableDesc): Serializer = { + val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer] + serializer.initialize(null, tableDesc.getProperties) + serializer + } + + override def otherCopyArgs = sc :: Nil + + def output = child.output + + /** + * Wraps with Hive types based on object inspector. + * TODO: Consolidate all hive OI/data interface code. + */ + protected def wrap(a: (Any, ObjectInspector)): Any = a match { + case (s: String, oi: JavaHiveVarcharObjectInspector) => + new HiveVarchar(s, s.size) + + case (bd: BigDecimal, oi: JavaHiveDecimalObjectInspector) => + new HiveDecimal(bd.underlying()) + + case (row: Row, oi: StandardStructObjectInspector) => + val struct = oi.create() + row.zip(oi.getAllStructFieldRefs: Seq[StructField]).foreach { + case (data, field) => + oi.setStructFieldData(struct, field, wrap(data, field.getFieldObjectInspector)) + } + struct + + case (s: Seq[_], oi: ListObjectInspector) => + val wrappedSeq = s.map(wrap(_, oi.getListElementObjectInspector)) + seqAsJavaList(wrappedSeq) + + case (obj, _) => + obj + } + + def saveAsHiveFile( + rdd: RDD[Writable], + valueClass: Class[_], + fileSinkConf: FileSinkDesc, + conf: JobConf, + isCompressed: Boolean) { + if (valueClass == null) { + throw new SparkException("Output value class not set") + } + conf.setOutputValueClass(valueClass) + if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) { + throw new SparkException("Output format class not set") + } + // Doesn't work in Scala 2.9 due to what may be a generics bug + // TODO: Should we uncomment this for Scala 2.10? + // conf.setOutputFormat(outputFormatClass) + conf.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName) + if (isCompressed) { + // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec", + // and "mapred.output.compression.type" have no impact on ORC because it uses table properties + // to store compression information. + conf.set("mapred.output.compress", "true") + fileSinkConf.setCompressed(true) + fileSinkConf.setCompressCodec(conf.get("mapred.output.compression.codec")) + fileSinkConf.setCompressType(conf.get("mapred.output.compression.type")) + } + conf.setOutputCommitter(classOf[FileOutputCommitter]) + FileOutputFormat.setOutputPath( + conf, + SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf)) + + logger.debug("Saving as hadoop file of type " + valueClass.getSimpleName) + + val writer = new SparkHiveHadoopWriter(conf, fileSinkConf) + writer.preSetup() + + def writeToFile(context: TaskContext, iter: Iterator[Writable]) { + // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it + // around by taking a mod. We expect that no task will be attempted 2 billion times. + val attemptNumber = (context.attemptId % Int.MaxValue).toInt + + writer.setup(context.stageId, context.partitionId, attemptNumber) + writer.open() + + var count = 0 + while(iter.hasNext) { + val record = iter.next() + count += 1 + writer.write(record) + } + + writer.close() + writer.commit() + } + + sc.sparkContext.runJob(rdd, writeToFile _) + writer.commitJob() + } + + override def execute() = result + + /** + * Inserts all the rows in the table into Hive. Row objects are properly serialized with the + * `org.apache.hadoop.hive.serde2.SerDe` and the + * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition. + * + * Note: this is run once and then kept to avoid double insertions. + */ + private lazy val result: RDD[Row] = { + val childRdd = child.execute() + assert(childRdd != null) + + // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer + // instances within the closure, since Serializer is not serializable while TableDesc is. + val tableDesc = table.tableDesc + val tableLocation = table.hiveQlTable.getDataLocation + val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation) + val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) + val rdd = childRdd.mapPartitions { iter => + val serializer = newSerializer(fileSinkConf.getTableInfo) + val standardOI = ObjectInspectorUtils + .getStandardObjectInspector( + fileSinkConf.getTableInfo.getDeserializer.getObjectInspector, + ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + + val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray + val outputData = new Array[Any](fieldOIs.length) + iter.map { row => + var i = 0 + while (i < row.length) { + // Casts Strings to HiveVarchars when necessary. + outputData(i) = wrap(row(i), fieldOIs(i)) + i += 1 + } + + serializer.serialize(outputData, standardOI) + } + } + + // ORC stores compression information in table properties. While, there are other formats + // (e.g. RCFile) that rely on hadoop configurations to store compression information. + val jobConf = new JobConf(sc.hiveconf) + saveAsHiveFile( + rdd, + outputClass, + fileSinkConf, + jobConf, + sc.hiveconf.getBoolean("hive.exec.compress.output", false)) + + // TODO: Handle dynamic partitioning. + val outputPath = FileOutputFormat.getOutputPath(jobConf) + // Have to construct the format of dbname.tablename. + val qualifiedTableName = s"${table.databaseName}.${table.tableName}" + // TODO: Correctly set holdDDLTime. + // In most of the time, we should have holdDDLTime = false. + // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint. + val holdDDLTime = false + if (partition.nonEmpty) { + val partitionSpec = partition.map { + case (key, Some(value)) => key -> value + case (key, None) => key -> "" // Should not reach here right now. + } + val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec) + db.validatePartitionNameCharacters(partVals) + // inheritTableSpecs is set to true. It should be set to false for a IMPORT query + // which is currently considered as a Hive native command. + val inheritTableSpecs = true + // TODO: Correctly set isSkewedStoreAsSubdir. + val isSkewedStoreAsSubdir = false + db.loadPartition( + outputPath, + qualifiedTableName, + partitionSpec, + overwrite, + holdDDLTime, + inheritTableSpecs, + isSkewedStoreAsSubdir) + } else { + db.loadTable( + outputPath, + qualifiedTableName, + overwrite, + holdDDLTime) + } + + // It would be nice to just return the childRdd unchanged so insert operations could be chained, + // however for now we return an empty list to simplify compatibility checks with hive, which + // does not return anything for insert operations. + // TODO: implement hive compatibility as rules. + sc.sparkContext.makeRDD(Nil, 1) + } +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala new file mode 100644 index 0000000000..fe6031678f --- /dev/null +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/NativeCommand.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.hive.execution + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRow, Row} +import org.apache.spark.sql.execution.{Command, LeafNode} +import org.apache.spark.sql.hive.HiveContext + +/** + * :: DeveloperApi :: + */ +@DeveloperApi +case class NativeCommand( + sql: String, output: Seq[Attribute])( + @transient context: HiveContext) + extends LeafNode with Command { + + override protected[sql] lazy val sideEffectResult: Seq[String] = context.runSqlHive(sql) + + override def execute(): RDD[Row] = { + if (sideEffectResult.size == 0) { + context.emptyResult + } else { + val rows = sideEffectResult.map(r => new GenericRow(Array[Any](r))) + context.sparkContext.parallelize(rows, 1) + } + } + + override def otherCopyArgs = context :: Nil +} diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala deleted file mode 100644 index 2de2db28a7..0000000000 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/hiveOperators.scala +++ /dev/null @@ -1,524 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive.execution - -import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} -import org.apache.hadoop.hive.conf.HiveConf -import org.apache.hadoop.hive.metastore.MetaStoreUtils -import org.apache.hadoop.hive.metastore.api.FieldSchema -import org.apache.hadoop.hive.ql.Context -import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Hive} -import org.apache.hadoop.hive.ql.metadata.formatting.MetaDataFormatUtils -import org.apache.hadoop.hive.ql.plan.{TableDesc, FileSinkDesc} -import org.apache.hadoop.hive.serde.serdeConstants -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption -import org.apache.hadoop.hive.serde2.objectinspector._ -import org.apache.hadoop.hive.serde2.objectinspector.primitive._ -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils -import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Serializer} -import org.apache.hadoop.io.Writable -import org.apache.hadoop.mapred._ - -import org.apache.spark -import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.rdd.RDD -import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.types.{BooleanType, DataType} -import org.apache.spark.sql.execution._ -import org.apache.spark.sql.hive._ -import org.apache.spark.util.MutablePair -import org.apache.spark.{TaskContext, SparkException} - -/* Implicits */ -import scala.collection.JavaConversions._ - -/** - * :: DeveloperApi :: - * The Hive table scan operator. Column and partition pruning are both handled. - * - * @param attributes Attributes to be fetched from the Hive table. - * @param relation The Hive table be be scanned. - * @param partitionPruningPred An optional partition pruning predicate for partitioned table. - */ -@DeveloperApi -case class HiveTableScan( - attributes: Seq[Attribute], - relation: MetastoreRelation, - partitionPruningPred: Option[Expression])( - @transient val context: HiveContext) - extends LeafNode - with HiveInspectors { - - require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned, - "Partition pruning predicates only supported for partitioned tables.") - - // Bind all partition key attribute references in the partition pruning predicate for later - // evaluation. - private val boundPruningPred = partitionPruningPred.map { pred => - require( - pred.dataType == BooleanType, - s"Data type of predicate $pred must be BooleanType rather than ${pred.dataType}.") - - BindReferences.bindReference(pred, relation.partitionKeys) - } - - @transient - val hadoopReader = new HadoopTableReader(relation.tableDesc, context) - - /** - * The hive object inspector for this table, which can be used to extract values from the - * serialized row representation. - */ - @transient - lazy val objectInspector = - relation.tableDesc.getDeserializer.getObjectInspector.asInstanceOf[StructObjectInspector] - - /** - * Functions that extract the requested attributes from the hive output. Partitioned values are - * casted from string to its declared data type. - */ - @transient - protected lazy val attributeFunctions: Seq[(Any, Array[String]) => Any] = { - attributes.map { a => - val ordinal = relation.partitionKeys.indexOf(a) - if (ordinal >= 0) { - val dataType = relation.partitionKeys(ordinal).dataType - (_: Any, partitionKeys: Array[String]) => { - castFromString(partitionKeys(ordinal), dataType) - } - } else { - val ref = objectInspector.getAllStructFieldRefs - .find(_.getFieldName == a.name) - .getOrElse(sys.error(s"Can't find attribute $a")) - val fieldObjectInspector = ref.getFieldObjectInspector - - val unwrapHiveData = fieldObjectInspector match { - case _: HiveVarcharObjectInspector => - (value: Any) => value.asInstanceOf[HiveVarchar].getValue - case _: HiveDecimalObjectInspector => - (value: Any) => BigDecimal(value.asInstanceOf[HiveDecimal].bigDecimalValue()) - case _ => - identity[Any] _ - } - - (row: Any, _: Array[String]) => { - val data = objectInspector.getStructFieldData(row, ref) - val hiveData = unwrapData(data, fieldObjectInspector) - if (hiveData != null) unwrapHiveData(hiveData) else null - } - } - } - } - - private def castFromString(value: String, dataType: DataType) = { - Cast(Literal(value), dataType).eval(null) - } - - private def addColumnMetadataToConf(hiveConf: HiveConf) { - // Specifies IDs and internal names of columns to be scanned. - val neededColumnIDs = attributes.map(a => relation.output.indexWhere(_.name == a.name): Integer) - val columnInternalNames = neededColumnIDs.map(HiveConf.getColumnInternalName(_)).mkString(",") - - if (attributes.size == relation.output.size) { - ColumnProjectionUtils.setFullyReadColumns(hiveConf) - } else { - ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs) - } - - ColumnProjectionUtils.appendReadColumnNames(hiveConf, attributes.map(_.name)) - - // Specifies types and object inspectors of columns to be scanned. - val structOI = ObjectInspectorUtils - .getStandardObjectInspector( - relation.tableDesc.getDeserializer.getObjectInspector, - ObjectInspectorCopyOption.JAVA) - .asInstanceOf[StructObjectInspector] - - val columnTypeNames = structOI - .getAllStructFieldRefs - .map(_.getFieldObjectInspector) - .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName) - .mkString(",") - - hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) - hiveConf.set(serdeConstants.LIST_COLUMNS, columnInternalNames) - } - - addColumnMetadataToConf(context.hiveconf) - - @transient - def inputRdd = if (!relation.hiveQlTable.isPartitioned) { - hadoopReader.makeRDDForTable(relation.hiveQlTable) - } else { - hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions)) - } - - /** - * Prunes partitions not involve the query plan. - * - * @param partitions All partitions of the relation. - * @return Partitions that are involved in the query plan. - */ - private[hive] def prunePartitions(partitions: Seq[HivePartition]) = { - boundPruningPred match { - case None => partitions - case Some(shouldKeep) => partitions.filter { part => - val dataTypes = relation.partitionKeys.map(_.dataType) - val castedValues = for ((value, dataType) <- part.getValues.zip(dataTypes)) yield { - castFromString(value, dataType) - } - - // Only partitioned values are needed here, since the predicate has already been bound to - // partition key attribute references. - val row = new GenericRow(castedValues.toArray) - shouldKeep.eval(row).asInstanceOf[Boolean] - } - } - } - - def execute() = { - inputRdd.mapPartitions { iterator => - if (iterator.isEmpty) { - Iterator.empty - } else { - val mutableRow = new GenericMutableRow(attributes.length) - val mutablePair = new MutablePair[Any, Array[String]]() - val buffered = iterator.buffered - - // NOTE (lian): Critical path of Hive table scan, unnecessary FP style code and pattern - // matching are avoided intentionally. - val rowsAndPartitionKeys = buffered.head match { - // With partition keys - case _: Array[Any] => - buffered.map { case array: Array[Any] => - val deserializedRow = array(0) - val partitionKeys = array(1).asInstanceOf[Array[String]] - mutablePair.update(deserializedRow, partitionKeys) - } - - // Without partition keys - case _ => - val emptyPartitionKeys = Array.empty[String] - buffered.map { deserializedRow => - mutablePair.update(deserializedRow, emptyPartitionKeys) - } - } - - rowsAndPartitionKeys.map { pair => - var i = 0 - while (i < attributes.length) { - mutableRow(i) = attributeFunctions(i)(pair._1, pair._2) - i += 1 - } - mutableRow: Row - } - } - } - } - - def output = attributes -} - -/** - * :: DeveloperApi :: - */ -@DeveloperApi -case class InsertIntoHiveTable( - table: MetastoreRelation, - partition: Map[String, Option[String]], - child: SparkPlan, - overwrite: Boolean) - (@transient sc: HiveContext) - extends UnaryNode { - - val outputClass = newSerializer(table.tableDesc).getSerializedClass - @transient private val hiveContext = new Context(sc.hiveconf) - @transient private val db = Hive.get(sc.hiveconf) - - private def newSerializer(tableDesc: TableDesc): Serializer = { - val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer] - serializer.initialize(null, tableDesc.getProperties) - serializer - } - - override def otherCopyArgs = sc :: Nil - - def output = child.output - - /** - * Wraps with Hive types based on object inspector. - * TODO: Consolidate all hive OI/data interface code. - */ - protected def wrap(a: (Any, ObjectInspector)): Any = a match { - case (s: String, oi: JavaHiveVarcharObjectInspector) => - new HiveVarchar(s, s.size) - - case (bd: BigDecimal, oi: JavaHiveDecimalObjectInspector) => - new HiveDecimal(bd.underlying()) - - case (row: Row, oi: StandardStructObjectInspector) => - val struct = oi.create() - row.zip(oi.getAllStructFieldRefs: Seq[StructField]).foreach { - case (data, field) => - oi.setStructFieldData(struct, field, wrap(data, field.getFieldObjectInspector)) - } - struct - - case (s: Seq[_], oi: ListObjectInspector) => - val wrappedSeq = s.map(wrap(_, oi.getListElementObjectInspector)) - seqAsJavaList(wrappedSeq) - - case (obj, _) => - obj - } - - def saveAsHiveFile( - rdd: RDD[Writable], - valueClass: Class[_], - fileSinkConf: FileSinkDesc, - conf: JobConf, - isCompressed: Boolean) { - if (valueClass == null) { - throw new SparkException("Output value class not set") - } - conf.setOutputValueClass(valueClass) - if (fileSinkConf.getTableInfo.getOutputFileFormatClassName == null) { - throw new SparkException("Output format class not set") - } - // Doesn't work in Scala 2.9 due to what may be a generics bug - // TODO: Should we uncomment this for Scala 2.10? - // conf.setOutputFormat(outputFormatClass) - conf.set("mapred.output.format.class", fileSinkConf.getTableInfo.getOutputFileFormatClassName) - if (isCompressed) { - // Please note that isCompressed, "mapred.output.compress", "mapred.output.compression.codec", - // and "mapred.output.compression.type" have no impact on ORC because it uses table properties - // to store compression information. - conf.set("mapred.output.compress", "true") - fileSinkConf.setCompressed(true) - fileSinkConf.setCompressCodec(conf.get("mapred.output.compression.codec")) - fileSinkConf.setCompressType(conf.get("mapred.output.compression.type")) - } - conf.setOutputCommitter(classOf[FileOutputCommitter]) - FileOutputFormat.setOutputPath( - conf, - SparkHiveHadoopWriter.createPathFromString(fileSinkConf.getDirName, conf)) - - logger.debug("Saving as hadoop file of type " + valueClass.getSimpleName) - - val writer = new SparkHiveHadoopWriter(conf, fileSinkConf) - writer.preSetup() - - def writeToFile(context: TaskContext, iter: Iterator[Writable]) { - // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it - // around by taking a mod. We expect that no task will be attempted 2 billion times. - val attemptNumber = (context.attemptId % Int.MaxValue).toInt - - writer.setup(context.stageId, context.partitionId, attemptNumber) - writer.open() - - var count = 0 - while(iter.hasNext) { - val record = iter.next() - count += 1 - writer.write(record) - } - - writer.close() - writer.commit() - } - - sc.sparkContext.runJob(rdd, writeToFile _) - writer.commitJob() - } - - override def execute() = result - - /** - * Inserts all the rows in the table into Hive. Row objects are properly serialized with the - * `org.apache.hadoop.hive.serde2.SerDe` and the - * `org.apache.hadoop.mapred.OutputFormat` provided by the table definition. - * - * Note: this is run once and then kept to avoid double insertions. - */ - private lazy val result: RDD[Row] = { - val childRdd = child.execute() - assert(childRdd != null) - - // Have to pass the TableDesc object to RDD.mapPartitions and then instantiate new serializer - // instances within the closure, since Serializer is not serializable while TableDesc is. - val tableDesc = table.tableDesc - val tableLocation = table.hiveQlTable.getDataLocation - val tmpLocation = hiveContext.getExternalTmpFileURI(tableLocation) - val fileSinkConf = new FileSinkDesc(tmpLocation.toString, tableDesc, false) - val rdd = childRdd.mapPartitions { iter => - val serializer = newSerializer(fileSinkConf.getTableInfo) - val standardOI = ObjectInspectorUtils - .getStandardObjectInspector( - fileSinkConf.getTableInfo.getDeserializer.getObjectInspector, - ObjectInspectorCopyOption.JAVA) - .asInstanceOf[StructObjectInspector] - - - val fieldOIs = standardOI.getAllStructFieldRefs.map(_.getFieldObjectInspector).toArray - val outputData = new Array[Any](fieldOIs.length) - iter.map { row => - var i = 0 - while (i < row.length) { - // Casts Strings to HiveVarchars when necessary. - outputData(i) = wrap(row(i), fieldOIs(i)) - i += 1 - } - - serializer.serialize(outputData, standardOI) - } - } - - // ORC stores compression information in table properties. While, there are other formats - // (e.g. RCFile) that rely on hadoop configurations to store compression information. - val jobConf = new JobConf(sc.hiveconf) - saveAsHiveFile( - rdd, - outputClass, - fileSinkConf, - jobConf, - sc.hiveconf.getBoolean("hive.exec.compress.output", false)) - - // TODO: Handle dynamic partitioning. - val outputPath = FileOutputFormat.getOutputPath(jobConf) - // Have to construct the format of dbname.tablename. - val qualifiedTableName = s"${table.databaseName}.${table.tableName}" - // TODO: Correctly set holdDDLTime. - // In most of the time, we should have holdDDLTime = false. - // holdDDLTime will be true when TOK_HOLD_DDLTIME presents in the query as a hint. - val holdDDLTime = false - if (partition.nonEmpty) { - val partitionSpec = partition.map { - case (key, Some(value)) => key -> value - case (key, None) => key -> "" // Should not reach here right now. - } - val partVals = MetaStoreUtils.getPvals(table.hiveQlTable.getPartCols, partitionSpec) - db.validatePartitionNameCharacters(partVals) - // inheritTableSpecs is set to true. It should be set to false for a IMPORT query - // which is currently considered as a Hive native command. - val inheritTableSpecs = true - // TODO: Correctly set isSkewedStoreAsSubdir. - val isSkewedStoreAsSubdir = false - db.loadPartition( - outputPath, - qualifiedTableName, - partitionSpec, - overwrite, - holdDDLTime, - inheritTableSpecs, - isSkewedStoreAsSubdir) - } else { - db.loadTable( - outputPath, - qualifiedTableName, - overwrite, - holdDDLTime) - } - - // It would be nice to just return the childRdd unchanged so insert operations could be chained, - // however for now we return an empty list to simplify compatibility checks with hive, which - // does not return anything for insert operations. - // TODO: implement hive compatibility as rules. - sc.sparkContext.makeRDD(Nil, 1) - } -} - -/** - * :: DeveloperApi :: - */ -@DeveloperApi -case class NativeCommand( - sql: String, output: Seq[Attribute])( - @transient context: HiveContext) - extends LeafNode with Command { - - override protected[sql] lazy val sideEffectResult: Seq[String] = context.runSqlHive(sql) - - override def execute(): RDD[spark.sql.Row] = { - if (sideEffectResult.size == 0) { - context.emptyResult - } else { - val rows = sideEffectResult.map(r => new GenericRow(Array[Any](r))) - context.sparkContext.parallelize(rows, 1) - } - } - - override def otherCopyArgs = context :: Nil -} - -/** - * :: DeveloperApi :: - */ -@DeveloperApi -case class DescribeHiveTableCommand( - table: MetastoreRelation, - output: Seq[Attribute], - isExtended: Boolean)( - @transient context: HiveContext) - extends LeafNode with Command { - - // Strings with the format like Hive. It is used for result comparison in our unit tests. - lazy val hiveString: Seq[String] = { - val alignment = 20 - val delim = "\t" - - sideEffectResult.map { - case (name, dataType, comment) => - String.format("%-" + alignment + "s", name) + delim + - String.format("%-" + alignment + "s", dataType) + delim + - String.format("%-" + alignment + "s", Option(comment).getOrElse("None")) - } - } - - override protected[sql] lazy val sideEffectResult: Seq[(String, String, String)] = { - // Trying to mimic the format of Hive's output. But not exactly the same. - var results: Seq[(String, String, String)] = Nil - - val columns: Seq[FieldSchema] = table.hiveQlTable.getCols - val partitionColumns: Seq[FieldSchema] = table.hiveQlTable.getPartCols - results ++= columns.map(field => (field.getName, field.getType, field.getComment)) - if (!partitionColumns.isEmpty) { - val partColumnInfo = - partitionColumns.map(field => (field.getName, field.getType, field.getComment)) - results ++= - partColumnInfo ++ - Seq(("# Partition Information", "", "")) ++ - Seq((s"# ${output.get(0).name}", output.get(1).name, output.get(2).name)) ++ - partColumnInfo - } - - if (isExtended) { - results ++= Seq(("Detailed Table Information", table.hiveQlTable.getTTable.toString, "")) - } - - results - } - - override def execute(): RDD[Row] = { - val rows = sideEffectResult.map { - case (name, dataType, comment) => new GenericRow(Array[Any](name, dataType, comment)) - } - context.sparkContext.parallelize(rows, 1) - } - - override def otherCopyArgs = context :: Nil -} -- cgit v1.2.3