author     Cheng Hao <hao.cheng@intel.com>             2014-07-28 10:59:53 -0700
committer  Michael Armbrust <michael@databricks.com>   2014-07-28 10:59:53 -0700
commit     2b8d89e30ebfe2272229a1eddd7542d7437c9924 (patch)
tree       631e24b0ed25be8f7b5a5eb3074e1c283e54a1f4
parent     a7d145e98c55fa66a541293930f25d9cdc25f3b4 (diff)
[SPARK-2523] [SQL] Hadoop table scan bug fixing
In HiveTableScan.scala, a single ObjectInspector was created and reused for all of the partition-based records, which can cause a ClassCastException when the object inspectors are not identical across the table and its partitions. This is a follow-up to:
https://github.com/apache/spark/pull/1408
https://github.com/apache/spark/pull/1390

I ran a micro benchmark locally with 15,000,000 records in total and got the results below:

| With This Patch | Partition-Based Table | Non-Partition-Based Table |
| --------------- | --------------------- | ------------------------- |
| No              | 1927 ms               | 1885 ms                   |
| Yes             | 1541 ms               | 1524 ms                   |

The results show that this patch also improves performance.

PS: the benchmark code is attached below. (Thanks, liancheng.)

```scala
package org.apache.spark.sql.hive

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql._

object HiveTableScanPrepare extends App {
  case class Record(key: String, value: String)

  val sparkContext = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName(getClass.getSimpleName.stripSuffix("$")))

  val hiveContext = new LocalHiveContext(sparkContext)

  val rdd = sparkContext.parallelize((1 to 3000000).map(i => Record(s"$i", s"val_$i")))

  import hiveContext._

  hql("SHOW TABLES")
  hql("DROP TABLE if exists part_scan_test")
  hql("DROP TABLE if exists scan_test")
  hql("DROP TABLE if exists records")
  rdd.registerAsTable("records")

  hql("""CREATE TABLE part_scan_test (key STRING, value STRING) PARTITIONED BY (part1 string, part2 STRING)
        | ROW FORMAT SERDE
        | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
        | STORED AS RCFILE
      """.stripMargin)
  hql("""CREATE TABLE scan_test (key STRING, value STRING)
        | ROW FORMAT SERDE
        | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
        | STORED AS RCFILE
      """.stripMargin)

  for (part1 <- 2000 until 2001) {
    for (part2 <- 1 to 5) {
      hql(s"""from records
             | insert into table part_scan_test PARTITION (part1='$part1', part2='2010-01-$part2')
             | select key, value
           """.stripMargin)
      hql(s"""from records
             | insert into table scan_test select key, value
           """.stripMargin)
    }
  }
}

object HiveTableScanTest extends App {
  val sparkContext = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName(getClass.getSimpleName.stripSuffix("$")))

  val hiveContext = new LocalHiveContext(sparkContext)

  import hiveContext._

  hql("SHOW TABLES")

  val part_scan_test = hql("select key, value from part_scan_test")
  val scan_test = hql("select key, value from scan_test")

  val r_part_scan_test = (0 to 5).map(i => benchmark(part_scan_test))
  val r_scan_test = (0 to 5).map(i => benchmark(scan_test))

  println("Scanning Partition-Based Table")
  r_part_scan_test.foreach(printResult)

  println("Scanning Non-Partition-Based Table")
  r_scan_test.foreach(printResult)

  def printResult(result: (Long, Long)) {
    println(s"Duration: ${result._1} ms Result: ${result._2}")
  }

  def benchmark(srdd: SchemaRDD) = {
    val begin = System.currentTimeMillis()
    val result = srdd.count()
    val end = System.currentTimeMillis()
    ((end - begin), result)
  }
}
```

Author: Cheng Hao <hao.cheng@intel.com>

Closes #1439 from chenghao-intel/hadoop_table_scan and squashes the following commits:

888968f [Cheng Hao] Fix issues in code style
27540ba [Cheng Hao] Fix the TableScan Bug while partition serde differs
40a24a7 [Cheng Hao] Add Unit Test
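For readers skimming the page, here is a minimal, dependency-free sketch of the failure mode the patch addresses. It is not part of the patch, and the types below are hypothetical stand-ins for Hive's SerDe row classes and ObjectInspectors: reusing the table-level inspector on a partition whose SerDe produces rows of a different runtime class fails at runtime, while asking each partition's own deserializer for its inspector does not.

```scala
// Simplified model of the bug; these are NOT the actual Hive/Spark classes.
// Each partition may carry its own SerDe, whose ObjectInspector expects a
// specific runtime row representation.
trait Inspector { def fields(raw: Any): Seq[Any] }

class LazyBinaryColumnarRow(val cols: Array[Any])
class ColumnarRow(val cols: Array[Any])

object LazyBinaryInspector extends Inspector {
  def fields(raw: Any): Seq[Any] = raw.asInstanceOf[LazyBinaryColumnarRow].cols.toSeq
}
object ColumnarInspector extends Inspector {
  def fields(raw: Any): Seq[Any] = raw.asInstanceOf[ColumnarRow].cols.toSeq
}

object ObjectInspectorMismatch extends App {
  // Before the patch: one inspector derived from the *table* SerDe is reused everywhere.
  val tableInspector: Inspector = LazyBinaryInspector

  // A partition whose SerDe was altered (cf. the ALTER TABLE ... SET SERDE in the new
  // test suite) produces rows of a different runtime class.
  val partitionRow = new ColumnarRow(Array[Any]("200", "200"))

  // Reusing the table-level inspector on that partition's rows fails.
  try { tableInspector.fields(partitionRow) }
  catch { case e: ClassCastException => println(s"Caught: $e") }

  // After the patch: the inspector comes from each partition's own deserializer.
  val partitionInspector: Inspector = ColumnarInspector
  println(partitionInspector.fields(partitionRow)) // List(200, 200)
}
```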
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala                                                    113
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala                                         90
-rw-r--r--  sql/hive/src/test/resources/golden/partition_based_table_scan_with_different_serde-0-8caed2a6e80250a6d38a59388679c298    2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala                                     48
4 files changed, 138 insertions(+), 115 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index c3942578d6..82c88280d7 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -24,6 +24,8 @@ import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde2.Deserializer
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
+
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
@@ -31,13 +33,16 @@ import org.apache.spark.SerializableWritable
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Row, GenericMutableRow, Literal, Cast}
+import org.apache.spark.sql.catalyst.types.DataType
+
/**
* A trait for subclasses that handle table scans.
*/
private[hive] sealed trait TableReader {
- def makeRDDForTable(hiveTable: HiveTable): RDD[_]
+ def makeRDDForTable(hiveTable: HiveTable): RDD[Row]
- def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[_]
+ def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[Row]
}
@@ -46,7 +51,10 @@ private[hive] sealed trait TableReader {
* data warehouse directory.
*/
private[hive]
-class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveContext)
+class HadoopTableReader(
+ @transient attributes: Seq[Attribute],
+ @transient relation: MetastoreRelation,
+ @transient sc: HiveContext)
extends TableReader {
// Choose the minimum number of splits. If mapred.map.tasks is set, then use that unless
@@ -63,10 +71,10 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
def hiveConf = _broadcastedHiveConf.value.value
- override def makeRDDForTable(hiveTable: HiveTable): RDD[_] =
+ override def makeRDDForTable(hiveTable: HiveTable): RDD[Row] =
makeRDDForTable(
hiveTable,
- _tableDesc.getDeserializerClass.asInstanceOf[Class[Deserializer]],
+ relation.tableDesc.getDeserializerClass.asInstanceOf[Class[Deserializer]],
filterOpt = None)
/**
@@ -81,14 +89,14 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
def makeRDDForTable(
hiveTable: HiveTable,
deserializerClass: Class[_ <: Deserializer],
- filterOpt: Option[PathFilter]): RDD[_] = {
+ filterOpt: Option[PathFilter]): RDD[Row] = {
assert(!hiveTable.isPartitioned, """makeRDDForTable() cannot be called on a partitioned table,
since input formats may differ across partitions. Use makeRDDForTablePartitions() instead.""")
// Create local references to member variables, so that the entire `this` object won't be
// serialized in the closure below.
- val tableDesc = _tableDesc
+ val tableDesc = relation.tableDesc
val broadcastedHiveConf = _broadcastedHiveConf
val tablePath = hiveTable.getPath
@@ -99,23 +107,20 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
.asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
+ val attrsWithIndex = attributes.zipWithIndex
+ val mutableRow = new GenericMutableRow(attrsWithIndex.length)
val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
val hconf = broadcastedHiveConf.value.value
val deserializer = deserializerClass.newInstance()
deserializer.initialize(hconf, tableDesc.getProperties)
- // Deserialize each Writable to get the row value.
- iter.map {
- case v: Writable => deserializer.deserialize(v)
- case value =>
- sys.error(s"Unable to deserialize non-Writable: $value of ${value.getClass.getName}")
- }
+ HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow)
}
deserializedHadoopRDD
}
- override def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[_] = {
+ override def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[Row] = {
val partitionToDeserializer = partitions.map(part =>
(part, part.getDeserializer.getClass.asInstanceOf[Class[Deserializer]])).toMap
makeRDDForPartitionedTable(partitionToDeserializer, filterOpt = None)
@@ -132,9 +137,9 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
* subdirectory of each partition being read. If None, then all files are accepted.
*/
def makeRDDForPartitionedTable(
- partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]],
- filterOpt: Option[PathFilter]): RDD[_] = {
-
+ partitionToDeserializer: Map[HivePartition,
+ Class[_ <: Deserializer]],
+ filterOpt: Option[PathFilter]): RDD[Row] = {
val hivePartitionRDDs = partitionToDeserializer.map { case (partition, partDeserializer) =>
val partDesc = Utilities.getPartitionDesc(partition)
val partPath = partition.getPartitionPath
@@ -156,33 +161,42 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
}
// Create local references so that the outer object isn't serialized.
- val tableDesc = _tableDesc
+ val tableDesc = relation.tableDesc
val broadcastedHiveConf = _broadcastedHiveConf
val localDeserializer = partDeserializer
+ val mutableRow = new GenericMutableRow(attributes.length)
+
+ // split the attributes (output schema) into 2 categories:
+ // (partition keys, ordinal), (normal attributes, ordinal), the ordinal mean the
+ // index of the attribute in the output Row.
+ val (partitionKeys, attrs) = attributes.zipWithIndex.partition(attr => {
+ relation.partitionKeys.indexOf(attr._1) >= 0
+ })
+
+ def fillPartitionKeys(parts: Array[String], row: GenericMutableRow) = {
+ partitionKeys.foreach { case (attr, ordinal) =>
+ // get partition key ordinal for a given attribute
+ val partOridinal = relation.partitionKeys.indexOf(attr)
+ row(ordinal) = Cast(Literal(parts(partOridinal)), attr.dataType).eval(null)
+ }
+ }
+ // fill the partition key for the given MutableRow Object
+ fillPartitionKeys(partValues, mutableRow)
val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
hivePartitionRDD.mapPartitions { iter =>
val hconf = broadcastedHiveConf.value.value
- val rowWithPartArr = new Array[Object](2)
-
- // The update and deserializer initialization are intentionally
- // kept out of the below iter.map loop to save performance.
- rowWithPartArr.update(1, partValues)
val deserializer = localDeserializer.newInstance()
deserializer.initialize(hconf, partProps)
- // Map each tuple to a row object
- iter.map { value =>
- val deserializedRow = deserializer.deserialize(value)
- rowWithPartArr.update(0, deserializedRow)
- rowWithPartArr.asInstanceOf[Object]
- }
+ // fill the non partition key attributes
+ HadoopTableReader.fillObject(iter, deserializer, attrs, mutableRow)
}
}.toSeq
// Even if we don't use any partitions, we still need an empty RDD
if (hivePartitionRDDs.size == 0) {
- new EmptyRDD[Object](sc.sparkContext)
+ new EmptyRDD[Row](sc.sparkContext)
} else {
new UnionRDD(hivePartitionRDDs(0).context, hivePartitionRDDs)
}
@@ -225,10 +239,9 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
// Only take the value (skip the key) because Hive works only with values.
rdd.map(_._2)
}
-
}
-private[hive] object HadoopTableReader {
+private[hive] object HadoopTableReader extends HiveInspectors {
/**
* Curried. After given an argument for 'path', the resulting JobConf => Unit closure is used to
* instantiate a HadoopRDD.
@@ -241,4 +254,40 @@ private[hive] object HadoopTableReader {
val bufferSize = System.getProperty("spark.buffer.size", "65536")
jobConf.set("io.file.buffer.size", bufferSize)
}
+
+ /**
+ * Transform the raw data(Writable object) into the Row object for an iterable input
+ * @param iter Iterable input which represented as Writable object
+ * @param deserializer Deserializer associated with the input writable object
+ * @param attrs Represents the row attribute names and its zero-based position in the MutableRow
+ * @param row reusable MutableRow object
+ *
+ * @return Iterable Row object that transformed from the given iterable input.
+ */
+ def fillObject(
+ iter: Iterator[Writable],
+ deserializer: Deserializer,
+ attrs: Seq[(Attribute, Int)],
+ row: GenericMutableRow): Iterator[Row] = {
+ val soi = deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
+ // get the field references according to the attributes(output of the reader) required
+ val fieldRefs = attrs.map { case (attr, idx) => (soi.getStructFieldRef(attr.name), idx) }
+
+ // Map each tuple to a row object
+ iter.map { value =>
+ val raw = deserializer.deserialize(value)
+ var idx = 0;
+ while (idx < fieldRefs.length) {
+ val fieldRef = fieldRefs(idx)._1
+ val fieldIdx = fieldRefs(idx)._2
+ val fieldValue = soi.getStructFieldData(raw, fieldRef)
+
+ row(fieldIdx) = unwrapData(fieldValue, fieldRef.getFieldObjectInspector())
+
+ idx += 1
+ }
+
+ row: Row
+ }
+ }
}
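As an aside (not part of the diff), the essence of the new `HadoopTableReader.fillObject` above is: resolve the requested field references once per partition, then fill a single reusable mutable row in a while loop on the per-record path. Below is a minimal sketch with hypothetical, dependency-free stand-in types; the real code goes through Hive's Deserializer and StructObjectInspector, as shown in the hunk above.

```scala
// Simplified sketch of the fillObject pattern (hypothetical types, no Hive dependencies):
// compute field lookups once per partition, then reuse one mutable row on the hot path.
object FillObjectSketch extends App {
  type Writable = Map[String, Any]          // stand-in for a deserialized record
  final case class AttributeRef(name: String)

  def fillObject(
      iter: Iterator[Writable],
      attrs: Seq[(AttributeRef, Int)],      // (requested attribute, ordinal in the output row)
      row: Array[Any]): Iterator[Array[Any]] = {
    // Field references are resolved once, outside the per-record loop.
    val fieldRefs = attrs.map { case (attr, ordinal) => (attr.name, ordinal) }.toArray

    iter.map { record =>
      var i = 0
      while (i < fieldRefs.length) {        // while loop instead of FP combinators on the hot path
        val (name, ordinal) = fieldRefs(i)
        row(ordinal) = record(name)
        i += 1
      }
      row                                   // the same row object is reused for every record
    }
  }

  val rows = fillObject(
    Iterator(Map("key" -> "100", "value" -> "val_100"), Map("key" -> "200", "value" -> "val_200")),
    Seq(AttributeRef("key") -> 0, AttributeRef("value") -> 1),
    new Array[Any](2))
  rows.foreach(r => println(r.mkString(", ")))
}
```

Reusing one row object and indexing by precomputed ordinals keeps per-record allocation and field lookup off the hot path, which is presumably where the benchmark improvement quoted in the commit message comes from.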
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
index e7016fa16e..8920e2a76a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala
@@ -34,7 +34,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.{BooleanType, DataType}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive._
-import org.apache.spark.util.MutablePair
/**
* :: DeveloperApi ::
@@ -50,8 +49,7 @@ case class HiveTableScan(
relation: MetastoreRelation,
partitionPruningPred: Option[Expression])(
@transient val context: HiveContext)
- extends LeafNode
- with HiveInspectors {
+ extends LeafNode {
require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
"Partition pruning predicates only supported for partitioned tables.")
@@ -67,42 +65,7 @@ case class HiveTableScan(
}
@transient
- private[this] val hadoopReader = new HadoopTableReader(relation.tableDesc, context)
-
- /**
- * The hive object inspector for this table, which can be used to extract values from the
- * serialized row representation.
- */
- @transient
- private[this] lazy val objectInspector =
- relation.tableDesc.getDeserializer.getObjectInspector.asInstanceOf[StructObjectInspector]
-
- /**
- * Functions that extract the requested attributes from the hive output. Partitioned values are
- * casted from string to its declared data type.
- */
- @transient
- protected lazy val attributeFunctions: Seq[(Any, Array[String]) => Any] = {
- attributes.map { a =>
- val ordinal = relation.partitionKeys.indexOf(a)
- if (ordinal >= 0) {
- val dataType = relation.partitionKeys(ordinal).dataType
- (_: Any, partitionKeys: Array[String]) => {
- castFromString(partitionKeys(ordinal), dataType)
- }
- } else {
- val ref = objectInspector.getAllStructFieldRefs
- .find(_.getFieldName == a.name)
- .getOrElse(sys.error(s"Can't find attribute $a"))
- val fieldObjectInspector = ref.getFieldObjectInspector
-
- (row: Any, _: Array[String]) => {
- val data = objectInspector.getStructFieldData(row, ref)
- unwrapData(data, fieldObjectInspector)
- }
- }
- }
- }
+ private[this] val hadoopReader = new HadoopTableReader(attributes, relation, context)
private[this] def castFromString(value: String, dataType: DataType) = {
Cast(Literal(value), dataType).eval(null)
@@ -114,6 +77,7 @@ case class HiveTableScan(
val columnInternalNames = neededColumnIDs.map(HiveConf.getColumnInternalName(_)).mkString(",")
if (attributes.size == relation.output.size) {
+ // SQLContext#pruneFilterProject guarantees no duplicated value in `attributes`
ColumnProjectionUtils.setFullyReadColumns(hiveConf)
} else {
ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs)
@@ -140,12 +104,6 @@ case class HiveTableScan(
addColumnMetadataToConf(context.hiveconf)
- private def inputRdd = if (!relation.hiveQlTable.isPartitioned) {
- hadoopReader.makeRDDForTable(relation.hiveQlTable)
- } else {
- hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions))
- }
-
/**
* Prunes partitions not involve the query plan.
*
@@ -169,44 +127,10 @@ case class HiveTableScan(
}
}
- override def execute() = {
- inputRdd.mapPartitions { iterator =>
- if (iterator.isEmpty) {
- Iterator.empty
- } else {
- val mutableRow = new GenericMutableRow(attributes.length)
- val mutablePair = new MutablePair[Any, Array[String]]()
- val buffered = iterator.buffered
-
- // NOTE (lian): Critical path of Hive table scan, unnecessary FP style code and pattern
- // matching are avoided intentionally.
- val rowsAndPartitionKeys = buffered.head match {
- // With partition keys
- case _: Array[Any] =>
- buffered.map { case array: Array[Any] =>
- val deserializedRow = array(0)
- val partitionKeys = array(1).asInstanceOf[Array[String]]
- mutablePair.update(deserializedRow, partitionKeys)
- }
-
- // Without partition keys
- case _ =>
- val emptyPartitionKeys = Array.empty[String]
- buffered.map { deserializedRow =>
- mutablePair.update(deserializedRow, emptyPartitionKeys)
- }
- }
-
- rowsAndPartitionKeys.map { pair =>
- var i = 0
- while (i < attributes.length) {
- mutableRow(i) = attributeFunctions(i)(pair._1, pair._2)
- i += 1
- }
- mutableRow: Row
- }
- }
- }
+ override def execute() = if (!relation.hiveQlTable.isPartitioned) {
+ hadoopReader.makeRDDForTable(relation.hiveQlTable)
+ } else {
+ hadoopReader.makeRDDForPartitionedTable(prunePartitions(relation.hiveQlPartitions))
}
override def output = attributes
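With row construction moved into HadoopTableReader, HiveTableScan is left mainly with partition pruning: cast each partition's string-valued keys to their declared types and keep only the partitions for which the optional pruning predicate holds. Below is a rough, dependency-free sketch of that idea; the types are hypothetical and do not use Catalyst's Cast/Literal machinery shown in the hunks above.

```scala
// Simplified sketch of partition pruning (hypothetical types, no Catalyst dependencies).
sealed trait DataType
case object IntType extends DataType
case object StringType extends DataType

final case class PartitionKey(name: String, dataType: DataType)
final case class Partition(values: Map[String, String])

object PrunePartitionsSketch extends App {
  // Partition key values are stored as strings; cast them to their declared types.
  def cast(value: String, dataType: DataType): Any = dataType match {
    case IntType    => value.toInt
    case StringType => value
  }

  def prunePartitions(
      keys: Seq[PartitionKey],
      partitions: Seq[Partition],
      predicate: Option[Map[String, Any] => Boolean]): Seq[Partition] =
    predicate match {
      case None       => partitions        // no pruning predicate: keep every partition
      case Some(pred) =>
        partitions.filter { part =>
          val row = keys.map(k => k.name -> cast(part.values(k.name), k.dataType)).toMap
          pred(row)                        // evaluate the predicate against the typed keys
        }
    }

  val keys = Seq(PartitionKey("ds", StringType), PartitionKey("hr", IntType))
  val parts = Seq(
    Partition(Map("ds" -> "2010-01-01", "hr" -> "11")),
    Partition(Map("ds" -> "2010-01-02", "hr" -> "12")))

  println(prunePartitions(keys, parts, Some(row => row("hr") == 12)))
}
```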
diff --git a/sql/hive/src/test/resources/golden/partition_based_table_scan_with_different_serde-0-8caed2a6e80250a6d38a59388679c298 b/sql/hive/src/test/resources/golden/partition_based_table_scan_with_different_serde-0-8caed2a6e80250a6d38a59388679c298
new file mode 100644
index 0000000000..f369f21e18
--- /dev/null
+++ b/sql/hive/src/test/resources/golden/partition_based_table_scan_with_different_serde-0-8caed2a6e80250a6d38a59388679c298
@@ -0,0 +1,2 @@
+100 100 2010-01-01
+200 200 2010-01-02
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
new file mode 100644
index 0000000000..bcb00f871d
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveTableScanSuite.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.hive.test.TestHive
+
+class HiveTableScanSuite extends HiveComparisonTest {
+ // MINOR HACK: You must run a query before calling reset the first time.
+ TestHive.hql("SHOW TABLES")
+ TestHive.reset()
+
+ TestHive.hql("""CREATE TABLE part_scan_test (key STRING, value STRING) PARTITIONED BY (ds STRING)
+ | ROW FORMAT SERDE
+ | 'org.apache.hadoop.hive.serde2.columnar.LazyBinaryColumnarSerDe'
+ | STORED AS RCFILE
+ """.stripMargin)
+ TestHive.hql("""FROM src
+ | INSERT INTO TABLE part_scan_test PARTITION (ds='2010-01-01')
+ | SELECT 100,100 LIMIT 1
+ """.stripMargin)
+ TestHive.hql("""ALTER TABLE part_scan_test SET SERDE
+ | 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'
+ """.stripMargin)
+ TestHive.hql("""FROM src INSERT INTO TABLE part_scan_test PARTITION (ds='2010-01-02')
+ | SELECT 200,200 LIMIT 1
+ """.stripMargin)
+
+ createQueryTest("partition_based_table_scan_with_different_serde",
+ "SELECT * from part_scan_test", false)
+}