diff options
author | Nong Li <nong@databricks.com> | 2015-11-18 18:38:45 -0800 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-11-18 18:38:45 -0800 |
commit | 6d0848b53bbe6c5acdcf5c033cd396b1ae6e293d (patch) | |
tree | cf1c2b5184a996d4e931d1837dd7899199c2ba72 /core | |
parent | e61367b9f9bfc8e123369d55d7ca5925568b98a7 (diff) | |
download | spark-6d0848b53bbe6c5acdcf5c033cd396b1ae6e293d.tar.gz spark-6d0848b53bbe6c5acdcf5c033cd396b1ae6e293d.tar.bz2 spark-6d0848b53bbe6c5acdcf5c033cd396b1ae6e293d.zip |
[SPARK-11787][SQL] Improve Parquet scan performance when using flat schemas.
This patch adds an alternate to the Parquet RecordReader from the parquet-mr project
that is much faster for flat schemas. Instead of using the general converter mechanism
from parquet-mr, this directly uses the lower level APIs from parquet-columnar and a
customer RecordReader that directly assembles into UnsafeRows.
This is optionally disabled and only used for supported schemas.
Using the tpcds store sales table and doing a sum of increasingly more columns, the results
are:
For 1 Column:
Before: 11.3M rows/second
After: 18.2M rows/second
For 2 Columns:
Before: 7.2M rows/second
After: 11.2M rows/second
For 5 Columns:
Before: 2.9M rows/second
After: 4.5M rows/second
Author: Nong Li <nong@databricks.com>
Closes #9774 from nongli/parquet.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala | 41 |
1 files changed, 34 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala index 264dae7f39..4d176332b6 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala @@ -20,8 +20,6 @@ package org.apache.spark.rdd import java.text.SimpleDateFormat import java.util.Date -import scala.reflect.ClassTag - import org.apache.hadoop.conf.{Configurable, Configuration} import org.apache.hadoop.io.Writable import org.apache.hadoop.mapreduce._ @@ -30,10 +28,12 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.executor.DataReadMethod import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil +import org.apache.spark.storage.StorageLevel import org.apache.spark.unsafe.types.UTF8String +import org.apache.spark.util.{Utils, SerializableConfiguration, ShutdownHookManager} import org.apache.spark.{Partition => SparkPartition, _} -import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, Utils} + +import scala.reflect.ClassTag private[spark] class SqlNewHadoopPartition( @@ -96,6 +96,11 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag]( @transient protected val jobId = new JobID(jobTrackerId, id) + // If true, enable using the custom RecordReader for parquet. This only works for + // a subset of the types (no complex types). + protected val enableUnsafeRowParquetReader: Boolean = + sc.conf.getBoolean("spark.parquet.enableUnsafeRowRecordReader", true) + override def getPartitions: Array[SparkPartition] = { val conf = getConf(isDriverSide = true) val inputFormat = inputFormatClass.newInstance @@ -150,9 +155,31 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag]( configurable.setConf(conf) case _ => } - private[this] var reader = format.createRecordReader( - split.serializableHadoopSplit.value, hadoopAttemptContext) - reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) + private[this] var reader: RecordReader[Void, V] = null + + /** + * If the format is ParquetInputFormat, try to create the optimized RecordReader. If this + * fails (for example, unsupported schema), try with the normal reader. + * TODO: plumb this through a different way? + */ + if (enableUnsafeRowParquetReader && + format.getClass.getName == "org.apache.parquet.hadoop.ParquetInputFormat") { + // TODO: move this class to sql.execution and remove this. + reader = Utils.classForName( + "org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader") + .newInstance().asInstanceOf[RecordReader[Void, V]] + try { + reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) + } catch { + case e: Exception => reader = null + } + } + + if (reader == null) { + reader = format.createRecordReader( + split.serializableHadoopSplit.value, hadoopAttemptContext) + reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) + } // Register an on-task-completion callback to close the input stream. context.addTaskCompletionListener(context => close()) |