diff options
author | Davies Liu <davies@databricks.com> | 2016-04-06 15:33:39 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-04-06 15:33:39 -0700 |
commit | 5a4b11a901703464b9261dea0642d80cf8d4856c (patch) | |
tree | 8807e70f4508b8e6253ebc116b9ea2b1475997b6 | |
parent | a4ead6d3881f071a2ae53ff1c961c6ac388cac1d (diff) | |
download | spark-5a4b11a901703464b9261dea0642d80cf8d4856c.tar.gz spark-5a4b11a901703464b9261dea0642d80cf8d4856c.tar.bz2 spark-5a4b11a901703464b9261dea0642d80cf8d4856c.zip |
[SPARK-14224] [SPARK-14223] [SPARK-14310] [SQL] fix RowEncoder and parquet reader for wide table
## What changes were proposed in this pull request?
1) Fix the RowEncoder for wide tables (many columns) by splitting the generated code into multiple functions.
2) Split DataSourceScan into RowDataSourceScan and BatchedDataSourceScan.
3) Disable returning columnar batches in the Parquet reader if there are many columns.
4) Add an internal config for the maximum number of fields (including nested fields) supported by whole-stage codegen.
Closes #12098
## How was this patch tested?
Add a test for a table with 1000 columns.
Author: Davies Liu <davies@databricks.com>
Closes #12047 from davies/many_columns.
13 files changed, 267 insertions, 234 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala index a0490e1351..28b6b2adf8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala @@ -524,22 +524,26 @@ case class CreateExternalRow(children: Seq[Expression], schema: StructType) override def genCode(ctx: CodegenContext, ev: ExprCode): String = { val rowClass = classOf[GenericRowWithSchema].getName val values = ctx.freshName("values") - val schemaField = ctx.addReferenceObj("schema", schema) - s""" - boolean ${ev.isNull} = false; - final Object[] $values = new Object[${children.size}]; - """ + - children.zipWithIndex.map { case (e, i) => - val eval = e.gen(ctx) - eval.code + s""" + ctx.addMutableState("Object[]", values, "") + + val childrenCodes = children.zipWithIndex.map { case (e, i) => + val eval = e.gen(ctx) + eval.code + s""" if (${eval.isNull}) { $values[$i] = null; } else { $values[$i] = ${eval.value}; } """ - }.mkString("\n") + - s"final ${classOf[Row].getName} ${ev.value} = new $rowClass($values, this.$schemaField);" + } + val childrenCode = ctx.splitExpressions(ctx.INPUT_ROW, childrenCodes) + val schemaField = ctx.addReferenceObj("schema", schema) + s""" + boolean ${ev.isNull} = false; + $values = new Object[${children.size}]; + $childrenCode + final ${classOf[Row].getName} ${ev.value} = new $rowClass($values, this.$schemaField); + """ } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index a0b6276ef5..51bdf0f0f2 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ 
b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -31,7 +31,8 @@ import org.apache.spark.memory.MemoryMode; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; import org.apache.spark.sql.execution.vectorized.ColumnarBatch; -import org.apache.spark.sql.types.*; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; /** * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the @@ -100,20 +101,6 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP; /** - * Tries to initialize the reader for this split. Returns true if this reader supports reading - * this split and false otherwise. - */ - public boolean tryInitialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) - throws IOException, InterruptedException { - try { - initialize(inputSplit, taskAttemptContext); - return true; - } catch (UnsupportedOperationException e) { - return false; - } - } - - /** * Implementation of RecordReader API. */ @Override @@ -222,7 +209,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa return columnarBatch; } - /** + /* * Can be called before any rows are returned to enable returning columnar batches directly. 
*/ public void enableReturningBatches() { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 1c9cb79ba4..9259ff4062 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -120,7 +120,7 @@ class SQLContext private[sql]( */ @transient protected[sql] lazy val sessionState: SessionState = new SessionState(self) - protected[sql] def conf: SQLConf = sessionState.conf + protected[spark] def conf: SQLConf = sessionState.conf /** * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index ab575e90c9..392c48fb7b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -24,13 +24,13 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} -import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.util.toCommentSafeString import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetSource} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation} -import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.types.{AtomicType, DataType} object RDDConversions 
{ def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = { @@ -123,28 +123,30 @@ private[sql] case class PhysicalRDD( } } -/** Physical plan node for scanning data from a relation. */ -private[sql] case class DataSourceScan( - output: Seq[Attribute], - rdd: RDD[InternalRow], - @transient relation: BaseRelation, - override val metadata: Map[String, String] = Map.empty) - extends LeafNode with CodegenSupport { +private[sql] trait DataSourceScan extends LeafNode { + val rdd: RDD[InternalRow] + val relation: BaseRelation override val nodeName: String = relation.toString // Ignore rdd when checking results - override def sameResult(plan: SparkPlan ): Boolean = plan match { + override def sameResult(plan: SparkPlan): Boolean = plan match { case other: DataSourceScan => relation == other.relation && metadata == other.metadata case _ => false } +} - private[sql] override lazy val metrics = if (canProcessBatches()) { - Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"), - "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) - } else { +/** Physical plan node for scanning data from a relation. */ +private[sql] case class RowDataSourceScan( + output: Seq[Attribute], + rdd: RDD[InternalRow], + @transient relation: BaseRelation, + override val outputPartitioning: Partitioning, + override val metadata: Map[String, String] = Map.empty) + extends DataSourceScan with CodegenSupport { + + private[sql] override lazy val metrics = Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) - } val outputUnsafeRows = relation match { case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] => @@ -153,38 +155,6 @@ private[sql] case class DataSourceScan( case _ => false } - override val outputPartitioning = { - val bucketSpec = relation match { - // TODO: this should be closer to bucket planning. 
- case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled => r.bucketSpec - case _ => None - } - - def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse { - throw new AnalysisException(s"bucket column $colName not found in existing columns " + - s"(${output.map(_.name).mkString(", ")})") - } - - bucketSpec.map { spec => - val numBuckets = spec.numBuckets - val bucketColumns = spec.bucketColumnNames.map(toAttribute) - HashPartitioning(bucketColumns, numBuckets) - }.getOrElse { - UnknownPartitioning(0) - } - } - - private def canProcessBatches(): Boolean = { - relation match { - case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] && - SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) && - SQLContext.getActive().get.conf.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED) => - true - case _ => - false - } - } - protected override def doExecute(): RDD[InternalRow] = { val unsafeRow = if (outputUnsafeRows) { rdd @@ -211,6 +181,57 @@ private[sql] case class DataSourceScan( rdd :: Nil } + override protected def doProduce(ctx: CodegenContext): String = { + val numOutputRows = metricTerm(ctx, "numOutputRows") + // PhysicalRDD always just has one input + val input = ctx.freshName("input") + ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") + val exprRows = output.zipWithIndex.map{ case (a, i) => + new BoundReference(i, a.dataType, a.nullable) + } + val row = ctx.freshName("row") + ctx.INPUT_ROW = row + ctx.currentVars = null + val columnsRowInput = exprRows.map(_.gen(ctx)) + val inputRow = if (outputUnsafeRows) row else null + s""" + |while ($input.hasNext()) { + | InternalRow $row = (InternalRow) $input.next(); + | $numOutputRows.add(1); + | ${consume(ctx, columnsRowInput, inputRow).trim} + | if (shouldStop()) return; + |} + """.stripMargin + } +} + +/** Physical plan node for scanning data from a batched relation. 
*/ +private[sql] case class BatchedDataSourceScan( + output: Seq[Attribute], + rdd: RDD[InternalRow], + @transient relation: BaseRelation, + override val outputPartitioning: Partitioning, + override val metadata: Map[String, String] = Map.empty) + extends DataSourceScan with CodegenSupport { + + private[sql] override lazy val metrics = + Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"), + "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) + + protected override def doExecute(): RDD[InternalRow] = { + throw new UnsupportedOperationException + } + + override def simpleString: String = { + val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value" + val metadataStr = metadataEntries.mkString(" ", ", ", "") + s"BatchedScan $nodeName${output.mkString("[", ",", "]")}$metadataStr" + } + + override def upstreams(): Seq[RDD[InternalRow]] = { + rdd :: Nil + } + private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String, dataType: DataType, nullable: Boolean): ExprCode = { val javaType = ctx.javaType(dataType) @@ -232,113 +253,65 @@ private[sql] case class DataSourceScan( // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen // never requires UnsafeRow as input. 
override protected def doProduce(ctx: CodegenContext): String = { - val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch" - val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector" val input = ctx.freshName("input") - val idx = ctx.freshName("batchIdx") - val rowidx = ctx.freshName("rowIdx") - val batch = ctx.freshName("batch") // PhysicalRDD always just has one input ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") + + // metrics + val numOutputRows = metricTerm(ctx, "numOutputRows") + val scanTimeMetric = metricTerm(ctx, "scanTime") + val scanTimeTotalNs = ctx.freshName("scanTime") + ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;") + + val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch" + val batch = ctx.freshName("batch") ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;") + + val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector" + val idx = ctx.freshName("batchIdx") ctx.addMutableState("int", idx, s"$idx = 0;") val colVars = output.indices.map(i => ctx.freshName("colInstance" + i)) val columnAssigns = colVars.zipWithIndex.map { case (name, i) => ctx.addMutableState(columnVectorClz, name, s"$name = null;") - s"$name = ${batch}.column($i);" } - - val row = ctx.freshName("row") - val numOutputRows = metricTerm(ctx, "numOutputRows") + s"$name = $batch.column($i);" + } - // The input RDD can either return (all) ColumnarBatches or InternalRows. We determine this - // by looking at the first value of the RDD and then calling the function which will process - // the remaining. It is faster to return batches. - // TODO: The abstractions between this class and SqlNewHadoopRDD makes it difficult to know - // here which path to use. Fix this. 
+ val nextBatch = ctx.freshName("nextBatch") + ctx.addNewFunction(nextBatch, + s""" + |private void $nextBatch() throws java.io.IOException { + | long getBatchStart = System.nanoTime(); + | if ($input.hasNext()) { + | $batch = ($columnarBatchClz)$input.next(); + | $numOutputRows.add($batch.numRows()); + | $idx = 0; + | ${columnAssigns.mkString("", "\n", "\n")} + | } + | $scanTimeTotalNs += System.nanoTime() - getBatchStart; + |}""".stripMargin) - val exprRows = - output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, x._1.nullable)) - ctx.INPUT_ROW = row ctx.currentVars = null - val columnsRowInput = exprRows.map(_.gen(ctx)) - val inputRow = if (outputUnsafeRows) row else null - val scanRows = ctx.freshName("processRows") - ctx.addNewFunction(scanRows, - s""" - | private void $scanRows(InternalRow $row) throws java.io.IOException { - | boolean firstRow = true; - | while (!shouldStop() && (firstRow || $input.hasNext())) { - | if (firstRow) { - | firstRow = false; - | } else { - | $row = (InternalRow) $input.next(); - | } - | $numOutputRows.add(1); - | ${consume(ctx, columnsRowInput, inputRow).trim} - | } - | }""".stripMargin) - - // Timers for how long we spent inside the scan. We can only maintain this when using batches, - // otherwise the overhead is too high. 
- if (canProcessBatches()) { - val scanTimeMetric = metricTerm(ctx, "scanTime") - val getBatchStart = ctx.freshName("scanStart") - val scanTimeTotalNs = ctx.freshName("scanTime") - ctx.currentVars = null - val columnsBatchInput = (output zip colVars).map { case (attr, colVar) => - genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) } - val scanBatches = ctx.freshName("processBatches") - ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;") - - ctx.addNewFunction(scanBatches, - s""" - | private void $scanBatches() throws java.io.IOException { - | while (true) { - | int numRows = $batch.numRows(); - | if ($idx == 0) { - | ${columnAssigns.mkString("", "\n", "\n")} - | $numOutputRows.add(numRows); - | } - | - | while (!shouldStop() && $idx < numRows) { - | int $rowidx = $idx++; - | ${consume(ctx, columnsBatchInput).trim} - | } - | if (shouldStop()) return; - | - | long $getBatchStart = System.nanoTime(); - | if (!$input.hasNext()) { - | $batch = null; - | $scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000)); - | break; - | } - | $batch = ($columnarBatchClz)$input.next(); - | $scanTimeTotalNs += System.nanoTime() - $getBatchStart; - | $idx = 0; - | } - | }""".stripMargin) - - val value = ctx.freshName("value") - s""" - | if ($batch != null) { - | $scanBatches(); - | } else if ($input.hasNext()) { - | Object $value = $input.next(); - | if ($value instanceof $columnarBatchClz) { - | $batch = ($columnarBatchClz)$value; - | $scanBatches(); - | } else { - | $scanRows((InternalRow) $value); - | } - | } - """.stripMargin - } else { - s""" - |if ($input.hasNext()) { - | $scanRows((InternalRow) $input.next()); - |} - """.stripMargin + val rowidx = ctx.freshName("rowIdx") + val columnsBatchInput = (output zip colVars).map { case (attr, colVar) => + genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) } + s""" + |if ($batch == null) { + | $nextBatch(); + |} + |while ($batch != null) { + | int numRows = $batch.numRows(); + | 
while ($idx < numRows) { + | int $rowidx = $idx++; + | ${consume(ctx, columnsBatchInput).trim} + | if (shouldStop()) return; + | } + | $batch = null; + | $nextBatch(); + |} + |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000)); + |$scanTimeTotalNs = 0; + """.stripMargin } } @@ -346,4 +319,38 @@ private[sql] object DataSourceScan { // Metadata keys val INPUT_PATHS = "InputPaths" val PUSHED_FILTERS = "PushedFilters" + + def create( + output: Seq[Attribute], + rdd: RDD[InternalRow], + relation: BaseRelation, + metadata: Map[String, String] = Map.empty): DataSourceScan = { + val outputPartitioning = { + val bucketSpec = relation match { + // TODO: this should be closer to bucket planning. + case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled => r.bucketSpec + case _ => None + } + + def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse { + throw new AnalysisException(s"bucket column $colName not found in existing columns " + + s"(${output.map(_.name).mkString(", ")})") + } + + bucketSpec.map { spec => + val numBuckets = spec.numBuckets + val bucketColumns = spec.bucketColumnNames.map(toAttribute) + HashPartitioning(bucketColumns, numBuckets) + }.getOrElse { + UnknownPartitioning(0) + } + } + + relation match { + case r: HadoopFsRelation if r.fileFormat.supportBatch(r.sqlContext, relation.schema) => + BatchedDataSourceScan(output, rdd, relation, outputPartitioning, metadata) + case _ => + RowDataSourceScan(output, rdd, relation, outputPartitioning, metadata) + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index 4e75a3a794..98129d6c52 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.aggregate.TungstenAggregate 
import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, SortMergeJoin} import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ /** * An interface for those physical operators that support codegen. @@ -433,12 +434,20 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] { case _ => true } + private def numOfNestedFields(dataType: DataType): Int = dataType match { + case dt: StructType => dt.fields.map(f => numOfNestedFields(f.dataType)).sum + case m: MapType => numOfNestedFields(m.keyType) + numOfNestedFields(m.valueType) + case a: ArrayType => numOfNestedFields(a.elementType) + case u: UserDefinedType[_] => numOfNestedFields(u.sqlType) + case _ => 1 + } + private def supportCodegen(plan: SparkPlan): Boolean = plan match { case plan: CodegenSupport if plan.supportCodegen => val willFallback = plan.expressions.exists(_.find(e => !supportCodegen(e)).isDefined) // the generated code will be huge if there are too many columns - val haveManyColumns = plan.output.length > 200 - !willFallback && !haveManyColumns + val haveTooManyFields = numOfNestedFields(plan.schema) > conf.wholeStageMaxNumFields + !willFallback && !haveTooManyFields case _ => false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 52c8f3ef0b..8c183317f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -238,7 +238,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { } case l @ LogicalRelation(baseRelation: TableScan, _, _) => - execution.DataSourceScan( + execution.DataSourceScan.create( l.output, toCatalystRDD(l, baseRelation.buildScan()), 
baseRelation) :: Nil case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _), @@ -610,7 +610,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { // Don't request columns that are only referenced by pushed filters. .filterNot(handledSet.contains) - val scan = execution.DataSourceScan( + val scan = execution.DataSourceScan.create( projects.map(_.toAttribute), scanBuilder(requestedColumns, candidatePredicates, pushedFilters), relation.relation, metadata) @@ -620,7 +620,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val requestedColumns = (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq - val scan = execution.DataSourceScan( + val scan = execution.DataSourceScan.create( requestedColumns, scanBuilder(requestedColumns, candidatePredicates, pushedFilters), relation.relation, metadata) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 618d5a522b..aa1f76450c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -181,7 +181,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { } val scan = - DataSourceScan( + DataSourceScan.create( readDataColumns ++ partitionColumns, new FileScanRDD( files.sqlContext, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala index 159fdc99dd..6ddb218a22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala @@ -97,13 +97,6 @@ private[spark] 
class SqlNewHadoopRDD[V: ClassTag]( @transient protected val jobId = new JobID(jobTrackerId, id) - // If true, enable using the custom RecordReader for parquet. This only works for - // a subset of the types (no complex types). - protected val enableVectorizedParquetReader: Boolean = - sqlContext.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key).toBoolean - protected val enableWholestageCodegen: Boolean = - sqlContext.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key).toBoolean - override def getPartitions: Array[SparkPartition] = { val conf = getConf(isDriverSide = true) val inputFormat = inputFormatClass.newInstance @@ -165,32 +158,9 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag]( } val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0) val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) - private[this] var reader: RecordReader[Void, V] = null - - /** - * If the format is ParquetInputFormat, try to create the optimized RecordReader. If this - * fails (for example, unsupported schema), try with the normal reader. - * TODO: plumb this through a different way? 
- */ - if (enableVectorizedParquetReader && - format.getClass.getName == "org.apache.parquet.hadoop.ParquetInputFormat") { - val parquetReader: VectorizedParquetRecordReader = new VectorizedParquetRecordReader() - if (!parquetReader.tryInitialize( - split.serializableHadoopSplit.value, hadoopAttemptContext)) { - parquetReader.close() - } else { - reader = parquetReader.asInstanceOf[RecordReader[Void, V]] - parquetReader.resultBatch() - // Whole stage codegen (PhysicalRDD) is able to deal with batches directly - if (enableWholestageCodegen) parquetReader.enableReturningBatches() - } - } - - if (reader == null) { - reader = format.createRecordReader( + private[this] var reader: RecordReader[Void, V] = format.createRecordReader( split.serializableHadoopSplit.value, hadoopAttemptContext) - reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) - } + reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) // Register an on-task-completion callback to close the input stream. 
context.addTaskCompletionListener(context => close()) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index 5b58fa1fc5..a2fd8da782 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -24,7 +24,6 @@ import java.util.logging.{Logger => JLogger} import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.{Failure, Try} -import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} @@ -53,7 +52,7 @@ import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.types.{AtomicType, DataType, StructType} import org.apache.spark.util.{SerializableConfiguration, Utils} import org.apache.spark.util.collection.BitSet @@ -276,6 +275,16 @@ private[sql] class DefaultSource file.getName == ParquetFileWriter.PARQUET_METADATA_FILE } + /** + * Returns whether the reader will the rows as batch or not. 
+ */ + override def supportBatch(sqlContext: SQLContext, schema: StructType): Boolean = { + val conf = SQLContext.getActive().get.conf + conf.useFileScan && conf.parquetVectorizedReaderEnabled && + conf.wholeStageEnabled && schema.length <= conf.wholeStageMaxNumFields && + schema.forall(_.dataType.isInstanceOf[AtomicType]) + } + override def buildReader( sqlContext: SQLContext, dataSchema: StructType, @@ -306,6 +315,10 @@ private[sql] class DefaultSource SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, sqlContext.conf.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP)) + // Whole stage codegen (PhysicalRDD) is able to deal with batches directly + val returningBatch = + supportBatch(sqlContext, StructType(partitionSchema.fields ++ dataSchema.fields)) + // Try to push down filters when filter push-down is enabled. val pushed = if (sqlContext.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) { filters @@ -324,10 +337,8 @@ private[sql] class DefaultSource // TODO: if you move this into the closure it reverts to the default values. // If true, enable using the custom RecordReader for parquet. This only works for // a subset of the types (no complex types). 
- val enableVectorizedParquetReader: Boolean = - sqlContext.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key).toBoolean - val enableWholestageCodegen: Boolean = - sqlContext.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key).toBoolean + val enableVectorizedParquetReader: Boolean = sqlContext.conf.parquetVectorizedReaderEnabled && + dataSchema.forall(_.dataType.isInstanceOf[AtomicType]) (file: PartitionedFile) => { assert(file.partitionValues.numFields == partitionSchema.size) @@ -347,32 +358,27 @@ private[sql] class DefaultSource val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedConf.value.value, attemptId) - val parquetReader = try { - if (!enableVectorizedParquetReader) sys.error("Vectorized reader turned off.") + val parquetReader = if (enableVectorizedParquetReader) { val vectorizedReader = new VectorizedParquetRecordReader() vectorizedReader.initialize(split, hadoopAttemptContext) logDebug(s"Appending $partitionSchema ${file.partitionValues}") vectorizedReader.initBatch(partitionSchema, file.partitionValues) - // Whole stage codegen (PhysicalRDD) is able to deal with batches directly - // TODO: fix column appending - if (enableWholestageCodegen) { - logDebug(s"Enabling batch returning") + if (returningBatch) { vectorizedReader.enableReturningBatches() } vectorizedReader - } catch { - case NonFatal(e) => - logDebug(s"Falling back to parquet-mr: $e", e) - val reader = pushed match { - case Some(filter) => - new ParquetRecordReader[InternalRow]( - new CatalystReadSupport, - FilterCompat.get(filter, null)) - case _ => - new ParquetRecordReader[InternalRow](new CatalystReadSupport) - } - reader.initialize(split, hadoopAttemptContext) - reader + } else { + logDebug(s"Falling back to parquet-mr") + val reader = pushed match { + case Some(filter) => + new ParquetRecordReader[InternalRow]( + new CatalystReadSupport, + FilterCompat.get(filter, null)) + case _ => + new 
ParquetRecordReader[InternalRow](new CatalystReadSupport) + } + reader.initialize(split, hadoopAttemptContext) + reader } val iter = new RecordReaderIterator(parquetReader) @@ -432,13 +438,21 @@ private[sql] class DefaultSource val setInputPaths = ParquetRelation.initializeDriverSideJobFunc(inputFiles, parquetBlockSize) _ + val allPrimitiveTypes = dataSchema.forall(_.dataType.isInstanceOf[AtomicType]) + val inputFormatCls = if (sqlContext.conf.parquetVectorizedReaderEnabled + && allPrimitiveTypes) { + classOf[VectorizedParquetInputFormat] + } else { + classOf[ParquetInputFormat[InternalRow]] + } + Utils.withDummyCallSite(sqlContext.sparkContext) { new SqlNewHadoopRDD( sqlContext = sqlContext, broadcastedConf = broadcastedConf, initDriverSideJobFuncOpt = Some(setInputPaths), initLocalJobFuncOpt = Some(initLocalJobFuncOpt), - inputFormatClass = classOf[ParquetInputFormat[InternalRow]], + inputFormatClass = inputFormatCls, valueClass = classOf[InternalRow]) { val cacheMetadata = useMetadataCache @@ -481,6 +495,17 @@ private[sql] class DefaultSource } } +/** + * The ParquetInputFormat that create VectorizedParquetRecordReader. + */ +final class VectorizedParquetInputFormat extends ParquetInputFormat[InternalRow] { + override def createRecordReader( + inputSplit: InputSplit, + taskAttemptContext: TaskAttemptContext): ParquetRecordReader[InternalRow] = { + new VectorizedParquetRecordReader().asInstanceOf[ParquetRecordReader[InternalRow]] + } +} + // NOTE: This class is instantiated and used on executor side only, no need to be serializable. 
private[sql] class ParquetOutputWriter( path: String, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 927af89949..dc6ba1bcfb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -396,6 +396,13 @@ object SQLConf { .booleanConf .createWithDefault(true) + val WHOLESTAGE_MAX_NUM_FIELDS = SQLConfigBuilder("spark.sql.codegen.maxFields") + .internal() + .doc("The maximum number of fields (including nested fields) that will be supported before" + + " deactivating whole-stage codegen.") + .intConf + .createWithDefault(200) + val FILES_MAX_PARTITION_BYTES = SQLConfigBuilder("spark.sql.files.maxPartitionBytes") .doc("The maximum number of bytes to pack into a single partition when reading files.") .longConf @@ -480,6 +487,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA) + def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED) + def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE) def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS) @@ -504,6 +513,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED) + def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS) + def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED) def canonicalView: Boolean = getConf(CANONICAL_NATIVE_VIEW) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 14e14710f6..6acb41dd1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -469,6 +469,15 @@ trait FileFormat { options: Map[String, String]): RDD[InternalRow] /** + * Returns whether this format support returning columnar batch or not. + * + * TODO: we should just have different traits for the different formats. + */ + def supportBatch(sqlContext: SQLContext, dataSchema: StructType): Boolean = { + false + } + + /** * Returns a function that can be used to read a single file in as an Iterator of InternalRow. * * @param dataSchema The global data schema. It can be either specified by the user, or diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index 4446a6881c..41f536fc37 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -279,7 +279,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi /** Plans the query and calls the provided validation function with the planned partitioning. 
*/ def checkScan(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = { val fileScan = df.queryExecution.executedPlan.collect { - case DataSourceScan(_, scan: FileScanRDD, _, _) => scan + case scan: DataSourceScan if scan.rdd.isInstanceOf[FileScanRDD] => + scan.rdd.asInstanceOf[FileScanRDD] }.headOption.getOrElse { fail(s"No FileScan in query\n${df.queryExecution}") } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 2f806ebba6..7d206e7bc4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -579,6 +579,16 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext assert(CatalystReadSupport.expandUDT(schema) === expected) } + + test("read/write wide table") { + withTempPath { dir => + val path = dir.getCanonicalPath + + val df = sqlContext.range(1000).select(Seq.tabulate(1000) {i => ('id + i).as(s"c$i")} : _*) + df.write.mode(SaveMode.Overwrite).parquet(path) + checkAnswer(sqlContext.read.parquet(path), df) + } + } } object TestingUDT { |