author     Cheng Lian <lian.cs.zju@gmail.com>         2014-09-13 15:08:30 -0700
committer  Michael Armbrust <michael@databricks.com>  2014-09-13 15:08:30 -0700
commit     74049249abb952ad061c0e221c22ff894a9e9c8d (patch)
tree       d5a8a9880c2565b3d5477e3b9054fc8018613db2 /sql/hive
parent     184cd51c4207c23726da97f907f2d912a5a44845 (diff)
[SPARK-3294][SQL] Eliminates boxing costs from in-memory columnar storage
This is a major refactoring of the in-memory columnar storage implementation, aims to eliminate boxing costs from critical paths (building/accessing column buffers) as much as possible. The basic idea is to refactor all major interfaces into a row-based form and use them together with `SpecificMutableRow`. The difficult part is how to adapt all compression schemes, esp. `RunLengthEncoding` and `DictionaryEncoding`, to this design. Since in-memory compression is disabled by default for now, and this PR should be strictly better than before no matter in-memory compression is enabled or not, maybe I'll finish that part in another PR. **UPDATE** This PR also took the chance to optimize `HiveTableScan` by 1. leveraging `SpecificMutableRow` to avoid boxing cost, and 1. building specific `Writable` unwrapper functions a head of time to avoid per row pattern matching and branching costs. TODO - [x] Benchmark - [ ] ~~Eliminate boxing costs in `RunLengthEncoding`~~ (left to future PRs) - [ ] ~~Eliminate boxing costs in `DictionaryEncoding` (seems not easy to do without specializing `DictionaryEncoding` for every supported column type)~~ (left to future PRs) ## Micro benchmark The benchmark uses a 10 million line CSV table consists of bytes, shorts, integers, longs, floats and doubles, measures the time to build the in-memory version of this table, and the time to scan the whole in-memory table. Benchmark code can be found [here](https://gist.github.com/liancheng/fe70a148de82e77bd2c8#file-hivetablescanbenchmark-scala). Script used to generate the input table can be found [here](https://gist.github.com/liancheng/fe70a148de82e77bd2c8#file-tablegen-scala). Speedup: - Hive table scanning + column buffer building: **18.74%** The original benchmark uses 1K as in-memory batch size, when increased to 10K, it can be 28.32% faster. - In-memory table scanning: **7.95%** Before: | Building | Scanning ------- | -------- | -------- 1 | 16472 | 525 2 | 16168 | 530 3 | 16386 | 529 4 | 16184 | 538 5 | 16209 | 521 Average | 16283.8 | 528.6 After: | Building | Scanning ------- | -------- | -------- 1 | 13124 | 458 2 | 13260 | 529 3 | 12981 | 463 4 | 13214 | 483 5 | 13583 | 500 Average | 13232.4 | 486.6 Author: Cheng Lian <lian.cs.zju@gmail.com> Closes #2327 from liancheng/prevent-boxing/unboxing and squashes the following commits: 4419fe4 [Cheng Lian] Addressing comments e5d2cf2 [Cheng Lian] Bug fix: should call setNullAt when field value is null to avoid NPE 8b8552b [Cheng Lian] Only checks for partition batch pruning flag once 489f97b [Cheng Lian] Bug fix: TableReader.fillObject uses wrong ordinals 97bbc4e [Cheng Lian] Optimizes hive.TableReader by by providing specific Writable unwrappers a head of time 3dc1f94 [Cheng Lian] Minor changes to eliminate row object creation 5b39cb9 [Cheng Lian] Lowers log level of compression scheme details f2a7890 [Cheng Lian] Use SpecificMutableRow in InMemoryColumnarTableScan to avoid boxing 9cf30b0 [Cheng Lian] Added row based ColumnType.append/extract 456c366 [Cheng Lian] Made compression decoder row based edac3cd [Cheng Lian] Makes ColumnAccessor.extractSingle row based 8216936 [Cheng Lian] Removes boxing cost in IntDelta and LongDelta by providing specialized implementations b70d519 [Cheng Lian] Made some in-memory columnar storage interfaces row-based
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala              119
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala   18
2 files changed, 84 insertions, 53 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 329f80cad4..84fafcde63 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -25,16 +25,14 @@ import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table =>
import org.apache.hadoop.hive.ql.plan.{PlanUtils, TableDesc}
import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector
-
+import org.apache.hadoop.hive.serde2.objectinspector.primitive._
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
import org.apache.spark.SerializableWritable
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
-
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Row, GenericMutableRow, Literal, Cast}
-import org.apache.spark.sql.catalyst.types.DataType
+import org.apache.spark.sql.catalyst.expressions._
/**
* A trait for subclasses that handle table scans.
@@ -108,12 +106,12 @@ class HadoopTableReader(
val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
val attrsWithIndex = attributes.zipWithIndex
- val mutableRow = new GenericMutableRow(attrsWithIndex.length)
+ val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
+
val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
val hconf = broadcastedHiveConf.value.value
val deserializer = deserializerClass.newInstance()
deserializer.initialize(hconf, tableDesc.getProperties)
-
HadoopTableReader.fillObject(iter, deserializer, attrsWithIndex, mutableRow)
}
@@ -164,33 +162,32 @@ class HadoopTableReader(
val tableDesc = relation.tableDesc
val broadcastedHiveConf = _broadcastedHiveConf
val localDeserializer = partDeserializer
- val mutableRow = new GenericMutableRow(attributes.length)
-
- // split the attributes (output schema) into 2 categories:
- // (partition keys, ordinal), (normal attributes, ordinal), the ordinal mean the
- // index of the attribute in the output Row.
- val (partitionKeys, attrs) = attributes.zipWithIndex.partition(attr => {
- relation.partitionKeys.indexOf(attr._1) >= 0
- })
-
- def fillPartitionKeys(parts: Array[String], row: GenericMutableRow) = {
- partitionKeys.foreach { case (attr, ordinal) =>
- // get partition key ordinal for a given attribute
- val partOridinal = relation.partitionKeys.indexOf(attr)
- row(ordinal) = Cast(Literal(parts(partOridinal)), attr.dataType).eval(null)
+ val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
+
+ // Splits all attributes into two groups, partition key attributes and those that are not.
+ // Attached indices indicate the position of each attribute in the output schema.
+ val (partitionKeyAttrs, nonPartitionKeyAttrs) =
+ attributes.zipWithIndex.partition { case (attr, _) =>
+ relation.partitionKeys.contains(attr)
+ }
+
+ def fillPartitionKeys(rawPartValues: Array[String], row: MutableRow) = {
+ partitionKeyAttrs.foreach { case (attr, ordinal) =>
+ val partOrdinal = relation.partitionKeys.indexOf(attr)
+ row(ordinal) = Cast(Literal(rawPartValues(partOrdinal)), attr.dataType).eval(null)
}
}
- // fill the partition key for the given MutableRow Object
+
+ // Fill all partition keys to the given MutableRow object
fillPartitionKeys(partValues, mutableRow)
- val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
- hivePartitionRDD.mapPartitions { iter =>
+ createHadoopRdd(tableDesc, inputPathStr, ifc).mapPartitions { iter =>
val hconf = broadcastedHiveConf.value.value
val deserializer = localDeserializer.newInstance()
deserializer.initialize(hconf, partProps)
- // fill the non partition key attributes
- HadoopTableReader.fillObject(iter, deserializer, attrs, mutableRow)
+ // fill the non partition key attributes
+ HadoopTableReader.fillObject(iter, deserializer, nonPartitionKeyAttrs, mutableRow)
}
}.toSeq
@@ -257,38 +254,64 @@ private[hive] object HadoopTableReader extends HiveInspectors {
}
/**
- * Transform the raw data(Writable object) into the Row object for an iterable input
- * @param iter Iterable input which represented as Writable object
- * @param deserializer Deserializer associated with the input writable object
- * @param attrs Represents the row attribute names and its zero-based position in the MutableRow
- * @param row reusable MutableRow object
- *
- * @return Iterable Row object that transformed from the given iterable input.
+ * Transform all given raw `Writable`s into `Row`s.
+ *
+ * @param iterator Iterator of all `Writable`s to be transformed
+ * @param deserializer The `Deserializer` associated with the input `Writable`
+ * @param nonPartitionKeyAttrs Attributes that should be filled together with their corresponding
+ * positions in the output schema
+ * @param mutableRow A reusable `MutableRow` that should be filled
+ * @return An `Iterator[Row]` transformed from `iterator`
*/
def fillObject(
- iter: Iterator[Writable],
+ iterator: Iterator[Writable],
deserializer: Deserializer,
- attrs: Seq[(Attribute, Int)],
- row: GenericMutableRow): Iterator[Row] = {
+ nonPartitionKeyAttrs: Seq[(Attribute, Int)],
+ mutableRow: MutableRow): Iterator[Row] = {
+
val soi = deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
- // get the field references according to the attributes(output of the reader) required
- val fieldRefs = attrs.map { case (attr, idx) => (soi.getStructFieldRef(attr.name), idx) }
+ val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, ordinal) =>
+ soi.getStructFieldRef(attr.name) -> ordinal
+ }.unzip
+
+ // Builds specific unwrappers ahead of time according to object inspector types to avoid pattern
+ // matching and branching costs per row.
+ val unwrappers: Seq[(Any, MutableRow, Int) => Unit] = fieldRefs.map {
+ _.getFieldObjectInspector match {
+ case oi: BooleanObjectInspector =>
+ (value: Any, row: MutableRow, ordinal: Int) => row.setBoolean(ordinal, oi.get(value))
+ case oi: ByteObjectInspector =>
+ (value: Any, row: MutableRow, ordinal: Int) => row.setByte(ordinal, oi.get(value))
+ case oi: ShortObjectInspector =>
+ (value: Any, row: MutableRow, ordinal: Int) => row.setShort(ordinal, oi.get(value))
+ case oi: IntObjectInspector =>
+ (value: Any, row: MutableRow, ordinal: Int) => row.setInt(ordinal, oi.get(value))
+ case oi: LongObjectInspector =>
+ (value: Any, row: MutableRow, ordinal: Int) => row.setLong(ordinal, oi.get(value))
+ case oi: FloatObjectInspector =>
+ (value: Any, row: MutableRow, ordinal: Int) => row.setFloat(ordinal, oi.get(value))
+ case oi: DoubleObjectInspector =>
+ (value: Any, row: MutableRow, ordinal: Int) => row.setDouble(ordinal, oi.get(value))
+ case oi =>
+ (value: Any, row: MutableRow, ordinal: Int) => row(ordinal) = unwrapData(value, oi)
+ }
+ }
// Map each tuple to a row object
- iter.map { value =>
+ iterator.map { value =>
val raw = deserializer.deserialize(value)
- var idx = 0;
- while (idx < fieldRefs.length) {
- val fieldRef = fieldRefs(idx)._1
- val fieldIdx = fieldRefs(idx)._2
- val fieldValue = soi.getStructFieldData(raw, fieldRef)
-
- row(fieldIdx) = unwrapData(fieldValue, fieldRef.getFieldObjectInspector())
-
- idx += 1
+ var i = 0
+ while (i < fieldRefs.length) {
+ val fieldValue = soi.getStructFieldData(raw, fieldRefs(i))
+ if (fieldValue == null) {
+ mutableRow.setNullAt(fieldOrdinals(i))
+ } else {
+ unwrappers(i)(fieldValue, mutableRow, fieldOrdinals(i))
+ }
+ i += 1
}
- row: Row
+ mutableRow: Row
}
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 6bf8d18a5c..8c8a8b124a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -295,8 +295,16 @@ class HiveQuerySuite extends HiveComparisonTest {
"SELECT (CASE WHEN key > 2 THEN 3 WHEN 2 > key THEN 2 ELSE 0 END) FROM src WHERE key < 15")
test("implement identity function using case statement") {
- val actual = sql("SELECT (CASE key WHEN key THEN key END) FROM src").collect().toSet
- val expected = sql("SELECT key FROM src").collect().toSet
+ val actual = sql("SELECT (CASE key WHEN key THEN key END) FROM src")
+ .map { case Row(i: Int) => i }
+ .collect()
+ .toSet
+
+ val expected = sql("SELECT key FROM src")
+ .map { case Row(i: Int) => i }
+ .collect()
+ .toSet
+
assert(actual === expected)
}
@@ -559,9 +567,9 @@ class HiveQuerySuite extends HiveComparisonTest {
val testVal = "test.val.0"
val nonexistentKey = "nonexistent"
val KV = "([^=]+)=([^=]*)".r
- def collectResults(rdd: SchemaRDD): Set[(String, String)] =
- rdd.collect().map {
- case Row(key: String, value: String) => key -> value
+ def collectResults(rdd: SchemaRDD): Set[(String, String)] =
+ rdd.collect().map {
+ case Row(key: String, value: String) => key -> value
case Row(KV(key, value)) => key -> value
}.toSet
clear()