author     Nong Li <nong@databricks.com>        2015-11-18 18:38:45 -0800
committer  Reynold Xin <rxin@databricks.com>    2015-11-18 18:38:45 -0800
commit     6d0848b53bbe6c5acdcf5c033cd396b1ae6e293d (patch)
tree       cf1c2b5184a996d4e931d1837dd7899199c2ba72 /core
parent     e61367b9f9bfc8e123369d55d7ca5925568b98a7 (diff)
[SPARK-11787][SQL] Improve Parquet scan performance when using flat schemas.
This patch adds an alternative to the Parquet RecordReader from the parquet-mr project that is much faster for flat schemas. Instead of using the general converter mechanism from parquet-mr, it directly uses the lower-level APIs from parquet-columnar and a custom RecordReader that assembles records directly into UnsafeRows. The new reader can be disabled via a flag and is only used for supported schemas.

Using the tpcds store sales table and summing an increasing number of columns, the results are:

  1 column:  before 11.3M rows/second, after 18.2M rows/second
  2 columns: before  7.2M rows/second, after 11.2M rows/second
  5 columns: before  2.9M rows/second, after  4.5M rows/second

Author: Nong Li <nong@databricks.com>

Closes #9774 from nongli/parquet.
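The new reader is controlled by the spark.parquet.enableUnsafeRowRecordReader flag added in the diff below (default: true). A minimal sketch of turning it off from application code, assuming a plain SparkContext setup (the app name and context creation here are illustrative, not part of the patch):

  // Sketch only: disable the UnsafeRow Parquet reader introduced by this patch,
  // falling back to parquet-mr's standard RecordReader.
  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf()
    .setAppName("parquet-scan-example") // illustrative app name
    .set("spark.parquet.enableUnsafeRowRecordReader", "false")
  val sc = new SparkContext(conf)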
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala | 41
1 file changed, 34 insertions(+), 7 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
index 264dae7f39..4d176332b6 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SqlNewHadoopRDD.scala
@@ -20,8 +20,6 @@ package org.apache.spark.rdd
import java.text.SimpleDateFormat
import java.util.Date
-import scala.reflect.ClassTag
-
import org.apache.hadoop.conf.{Configurable, Configuration}
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapreduce._
@@ -30,10 +28,12 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.executor.DataReadMethod
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
+import org.apache.spark.storage.StorageLevel
import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.{Utils, SerializableConfiguration, ShutdownHookManager}
import org.apache.spark.{Partition => SparkPartition, _}
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.util.{SerializableConfiguration, ShutdownHookManager, Utils}
+
+import scala.reflect.ClassTag
private[spark] class SqlNewHadoopPartition(
@@ -96,6 +96,11 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
@transient protected val jobId = new JobID(jobTrackerId, id)
+ // If true, enable using the custom RecordReader for parquet. This only works for
+ // a subset of the types (no complex types).
+ protected val enableUnsafeRowParquetReader: Boolean =
+ sc.conf.getBoolean("spark.parquet.enableUnsafeRowRecordReader", true)
+
override def getPartitions: Array[SparkPartition] = {
val conf = getConf(isDriverSide = true)
val inputFormat = inputFormatClass.newInstance
@@ -150,9 +155,31 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
configurable.setConf(conf)
case _ =>
}
- private[this] var reader = format.createRecordReader(
- split.serializableHadoopSplit.value, hadoopAttemptContext)
- reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+ private[this] var reader: RecordReader[Void, V] = null
+
+ /**
+ * If the format is ParquetInputFormat, try to create the optimized RecordReader. If this
+ * fails (for example, unsupported schema), try with the normal reader.
+ * TODO: plumb this through a different way?
+ */
+ if (enableUnsafeRowParquetReader &&
+ format.getClass.getName == "org.apache.parquet.hadoop.ParquetInputFormat") {
+ // TODO: move this class to sql.execution and remove this.
+ reader = Utils.classForName(
+ "org.apache.spark.sql.execution.datasources.parquet.UnsafeRowParquetRecordReader")
+ .newInstance().asInstanceOf[RecordReader[Void, V]]
+ try {
+ reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+ } catch {
+ case e: Exception => reader = null
+ }
+ }
+
+ if (reader == null) {
+ reader = format.createRecordReader(
+ split.serializableHadoopSplit.value, hadoopAttemptContext)
+ reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
+ }
// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener(context => close())