author    Sean Zhong <seanzhong@databricks.com>    2016-08-05 11:19:20 +0800
committer Wenchen Fan <wenchen@databricks.com>     2016-08-05 11:19:20 +0800
commit    1fa644497aed0a6d22f5fc7bf8e752508053b75b (patch)
tree      1f8a2fd82497f6511b24d8ed1dc023f7884ed58f
parent    53e766cfe2112265b606b68146a5798ccf7ec682 (diff)
[SPARK-16907][SQL] Fix performance regression for parquet table when vectorized parquet record reader is not being used
## What changes were proposed in this pull request?

For a non-partitioned parquet table, if the vectorized parquet record reader is not used, Spark 2.0 performs an extra, unnecessary memory copy to append partition values to each row. There are several typical cases in which the vectorized parquet record reader is not used:

1. The table schema is not flat, e.g. it contains nested fields.
2. `spark.sql.parquet.enableVectorizedReader = false`

By fixing this bug, we get about a 20% - 30% performance gain in a test case like this:

```
// Generates a parquet table with nested columns
spark.range(100000000).select(struct($"id").as("nc")).write.parquet("/tmp/data4")

def time[R](block: => R): Long = {
  val t0 = System.nanoTime()
  val result = block // call-by-name
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0)/1000000 + "ms")
  (t1 - t0)/1000000
}

val x = ((0 until 20).toList.map(x => time(spark.read.parquet("/tmp/data4").filter($"nc.id" < 100).collect()))).sum/20
```

## How was this patch tested?

After a few warm-up runs, we get a 26% performance improvement.

Before the fix:

```
Average: 4584ms, raw data (10 tries):
4726ms 4509ms 4454ms 4879ms 4586ms 4733ms 4500ms 4361ms 4456ms 4640ms
```

After the fix:

```
Average: 3614ms, raw data (10 tries):
3554ms 3740ms 4019ms 3439ms 3460ms 3664ms 3557ms 3584ms 3612ms 3531ms
```

Test env: Intel(R) Core(TM) i7-6700 CPU 3.40GHz, Intel SSD SC2KW24

Author: Sean Zhong <seanzhong@databricks.com>

Closes #14445 from clockfly/fix_parquet_regression_2.
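To make the cost described above concrete, the following is a minimal, self-contained Scala sketch, not Spark's actual code: `Row` and `appendPartitionValues` are hypothetical stand-ins for `InternalRow`, `JoinedRow`, and the partition-append projection. It shows the shape of the fix: the per-row join and copy is only paid when there are partition columns to append.

```
// Minimal sketch only; Row and appendPartitionValues are hypothetical stand-ins,
// not Spark classes.
object PartitionAppendSketch {
  final case class Row(values: Vector[Any])

  // Mirrors the shape of the fix: only pay the per-row join/copy when there
  // actually are partition columns to append.
  def appendPartitionValues(rows: Iterator[Row], partitionValues: Row): Iterator[Row] =
    if (partitionValues.values.isEmpty) {
      rows // no partition columns: hand back the iterator as-is, no extra copy
    } else {
      rows.map(r => Row(r.values ++ partitionValues.values)) // copy each row once
    }

  def main(args: Array[String]): Unit = {
    val data = Iterator(Row(Vector(1L, "a")), Row(Vector(2L, "b")))
    // Empty partition row: rows pass through untouched.
    appendPartitionValues(data, Row(Vector.empty)).foreach(println)
  }
}
```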
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala | 8
1 file changed, 7 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index c3e75f1934..ea32506c09 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -368,6 +368,7 @@ private[sql] class ParquetFileFormat
         vectorizedReader
       } else {
         logDebug(s"Falling back to parquet-mr")
+        // ParquetRecordReader returns UnsafeRow
         val reader = pushed match {
           case Some(filter) =>
             new ParquetRecordReader[UnsafeRow](
@@ -394,8 +395,13 @@ private[sql] class ParquetFileFormat
       // This is a horrible erasure hack... if we type the iterator above, then it actually check
       // the type in next() and we get a class cast exception. If we make that function return
       // Object, then we can defer the cast until later!
-      iter.asInstanceOf[Iterator[InternalRow]]
+      if (partitionSchema.length == 0) {
+        // There are no partition columns
+        iter.asInstanceOf[Iterator[InternalRow]]
+      } else {
+        iter.asInstanceOf[Iterator[InternalRow]]
           .map(d => appendPartitionColumns(joinedRow(d, file.partitionValues)))
+      }
     }
   }
 }
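For context, the sketch below is one way to exercise the non-vectorized (parquet-mr) read path that this patch optimizes; it is an illustration under assumptions, not part of the patch. The `local[*]` master, the row count, and the `/tmp/data4` path are arbitrary choices; the config key `spark.sql.parquet.enableVectorizedReader` and the nested-column trigger are the ones named in the commit message.

```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.struct

object NonVectorizedParquetRead {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("NonVectorizedParquetRead")
      .master("local[*]")                                          // illustrative
      .config("spark.sql.parquet.enableVectorizedReader", "false") // force the parquet-mr path
      .getOrCreate()
    import spark.implicits._

    // A nested column ("nc") also disables the vectorized reader on its own.
    spark.range(1000000).select(struct($"id").as("nc"))
      .write.mode("overwrite").parquet("/tmp/data4")               // illustrative path

    val matched = spark.read.parquet("/tmp/data4").filter($"nc.id" < 100).count()
    println(s"rows matching filter: $matched")
    spark.stop()
  }
}
```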