13 files changed, 267 insertions, 234 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala index a0490e1351..28b6b2adf8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala @@ -524,22 +524,26 @@ case class CreateExternalRow(children: Seq[Expression], schema: StructType) override def genCode(ctx: CodegenContext, ev: ExprCode): String = { val rowClass = classOf[GenericRowWithSchema].getName val values = ctx.freshName("values") - val schemaField = ctx.addReferenceObj("schema", schema) - s""" - boolean ${ev.isNull} = false; - final Object[] $values = new Object[${children.size}]; - """ + - children.zipWithIndex.map { case (e, i) => - val eval = e.gen(ctx) - eval.code + s""" + ctx.addMutableState("Object[]", values, "") + + val childrenCodes = children.zipWithIndex.map { case (e, i) => + val eval = e.gen(ctx) + eval.code + s""" if (${eval.isNull}) { $values[$i] = null; } else { $values[$i] = ${eval.value}; } """ - }.mkString("\n") + - s"final ${classOf[Row].getName} ${ev.value} = new $rowClass($values, this.$schemaField);" + } + val childrenCode = ctx.splitExpressions(ctx.INPUT_ROW, childrenCodes) + val schemaField = ctx.addReferenceObj("schema", schema) + s""" + boolean ${ev.isNull} = false; + $values = new Object[${children.size}]; + $childrenCode + final ${classOf[Row].getName} ${ev.value} = new $rowClass($values, this.$schemaField); + """ } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java index a0b6276ef5..51bdf0f0f2 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java @@ -31,7 +31,8 @@ import org.apache.spark.memory.MemoryMode; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils; import org.apache.spark.sql.execution.vectorized.ColumnarBatch; -import org.apache.spark.sql.types.*; +import org.apache.spark.sql.types.StructField; +import org.apache.spark.sql.types.StructType; /** * A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the @@ -100,20 +101,6 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP; /** - * Tries to initialize the reader for this split. Returns true if this reader supports reading - * this split and false otherwise. - */ - public boolean tryInitialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) - throws IOException, InterruptedException { - try { - initialize(inputSplit, taskAttemptContext); - return true; - } catch (UnsupportedOperationException e) { - return false; - } - } - - /** * Implementation of RecordReader API. */ @Override @@ -222,7 +209,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa return columnarBatch; } - /** + /* * Can be called before any rows are returned to enable returning columnar batches directly. 
*/ public void enableReturningBatches() { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 1c9cb79ba4..9259ff4062 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -120,7 +120,7 @@ class SQLContext private[sql]( */ @transient protected[sql] lazy val sessionState: SessionState = new SessionState(self) - protected[sql] def conf: SQLConf = sessionState.conf + protected[spark] def conf: SQLConf = sessionState.conf /** * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index ab575e90c9..392c48fb7b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -24,13 +24,13 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics} -import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning} import org.apache.spark.sql.catalyst.util.toCommentSafeString import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetSource} import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation} -import org.apache.spark.sql.types.DataType +import org.apache.spark.sql.types.{AtomicType, DataType} object RDDConversions { def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = { @@ -123,28 +123,30 @@ private[sql] case class PhysicalRDD( } } -/** Physical plan node for scanning data from a relation. */ -private[sql] case class DataSourceScan( - output: Seq[Attribute], - rdd: RDD[InternalRow], - @transient relation: BaseRelation, - override val metadata: Map[String, String] = Map.empty) - extends LeafNode with CodegenSupport { +private[sql] trait DataSourceScan extends LeafNode { + val rdd: RDD[InternalRow] + val relation: BaseRelation override val nodeName: String = relation.toString // Ignore rdd when checking results - override def sameResult(plan: SparkPlan ): Boolean = plan match { + override def sameResult(plan: SparkPlan): Boolean = plan match { case other: DataSourceScan => relation == other.relation && metadata == other.metadata case _ => false } +} - private[sql] override lazy val metrics = if (canProcessBatches()) { - Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"), - "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) - } else { +/** Physical plan node for scanning data from a relation. 
*/ +private[sql] case class RowDataSourceScan( + output: Seq[Attribute], + rdd: RDD[InternalRow], + @transient relation: BaseRelation, + override val outputPartitioning: Partitioning, + override val metadata: Map[String, String] = Map.empty) + extends DataSourceScan with CodegenSupport { + + private[sql] override lazy val metrics = Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows")) - } val outputUnsafeRows = relation match { case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] => @@ -153,38 +155,6 @@ private[sql] case class DataSourceScan( case _ => false } - override val outputPartitioning = { - val bucketSpec = relation match { - // TODO: this should be closer to bucket planning. - case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled => r.bucketSpec - case _ => None - } - - def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse { - throw new AnalysisException(s"bucket column $colName not found in existing columns " + - s"(${output.map(_.name).mkString(", ")})") - } - - bucketSpec.map { spec => - val numBuckets = spec.numBuckets - val bucketColumns = spec.bucketColumnNames.map(toAttribute) - HashPartitioning(bucketColumns, numBuckets) - }.getOrElse { - UnknownPartitioning(0) - } - } - - private def canProcessBatches(): Boolean = { - relation match { - case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] && - SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) && - SQLContext.getActive().get.conf.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED) => - true - case _ => - false - } - } - protected override def doExecute(): RDD[InternalRow] = { val unsafeRow = if (outputUnsafeRows) { rdd @@ -211,6 +181,57 @@ private[sql] case class DataSourceScan( rdd :: Nil } + override protected def doProduce(ctx: CodegenContext): String = { + val numOutputRows = metricTerm(ctx, "numOutputRows") + // PhysicalRDD always just has one input + val input = ctx.freshName("input") + ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") + val exprRows = output.zipWithIndex.map{ case (a, i) => + new BoundReference(i, a.dataType, a.nullable) + } + val row = ctx.freshName("row") + ctx.INPUT_ROW = row + ctx.currentVars = null + val columnsRowInput = exprRows.map(_.gen(ctx)) + val inputRow = if (outputUnsafeRows) row else null + s""" + |while ($input.hasNext()) { + | InternalRow $row = (InternalRow) $input.next(); + | $numOutputRows.add(1); + | ${consume(ctx, columnsRowInput, inputRow).trim} + | if (shouldStop()) return; + |} + """.stripMargin + } +} + +/** Physical plan node for scanning data from a batched relation. 
*/ +private[sql] case class BatchedDataSourceScan( + output: Seq[Attribute], + rdd: RDD[InternalRow], + @transient relation: BaseRelation, + override val outputPartitioning: Partitioning, + override val metadata: Map[String, String] = Map.empty) + extends DataSourceScan with CodegenSupport { + + private[sql] override lazy val metrics = + Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"), + "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time")) + + protected override def doExecute(): RDD[InternalRow] = { + throw new UnsupportedOperationException + } + + override def simpleString: String = { + val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value" + val metadataStr = metadataEntries.mkString(" ", ", ", "") + s"BatchedScan $nodeName${output.mkString("[", ",", "]")}$metadataStr" + } + + override def upstreams(): Seq[RDD[InternalRow]] = { + rdd :: Nil + } + private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String, dataType: DataType, nullable: Boolean): ExprCode = { val javaType = ctx.javaType(dataType) @@ -232,113 +253,65 @@ private[sql] case class DataSourceScan( // Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen // never requires UnsafeRow as input. override protected def doProduce(ctx: CodegenContext): String = { - val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch" - val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector" val input = ctx.freshName("input") - val idx = ctx.freshName("batchIdx") - val rowidx = ctx.freshName("rowIdx") - val batch = ctx.freshName("batch") // PhysicalRDD always just has one input ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") + + // metrics + val numOutputRows = metricTerm(ctx, "numOutputRows") + val scanTimeMetric = metricTerm(ctx, "scanTime") + val scanTimeTotalNs = ctx.freshName("scanTime") + ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;") + + val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch" + val batch = ctx.freshName("batch") ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;") + + val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector" + val idx = ctx.freshName("batchIdx") ctx.addMutableState("int", idx, s"$idx = 0;") val colVars = output.indices.map(i => ctx.freshName("colInstance" + i)) val columnAssigns = colVars.zipWithIndex.map { case (name, i) => ctx.addMutableState(columnVectorClz, name, s"$name = null;") - s"$name = ${batch}.column($i);" } - - val row = ctx.freshName("row") - val numOutputRows = metricTerm(ctx, "numOutputRows") + s"$name = $batch.column($i);" + } - // The input RDD can either return (all) ColumnarBatches or InternalRows. We determine this - // by looking at the first value of the RDD and then calling the function which will process - // the remaining. It is faster to return batches. - // TODO: The abstractions between this class and SqlNewHadoopRDD makes it difficult to know - // here which path to use. Fix this. 
+ val nextBatch = ctx.freshName("nextBatch") + ctx.addNewFunction(nextBatch, + s""" + |private void $nextBatch() throws java.io.IOException { + | long getBatchStart = System.nanoTime(); + | if ($input.hasNext()) { + | $batch = ($columnarBatchClz)$input.next(); + | $numOutputRows.add($batch.numRows()); + | $idx = 0; + | ${columnAssigns.mkString("", "\n", "\n")} + | } + | $scanTimeTotalNs += System.nanoTime() - getBatchStart; + |}""".stripMargin) - val exprRows = - output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, x._1.nullable)) - ctx.INPUT_ROW = row ctx.currentVars = null - val columnsRowInput = exprRows.map(_.gen(ctx)) - val inputRow = if (outputUnsafeRows) row else null - val scanRows = ctx.freshName("processRows") - ctx.addNewFunction(scanRows, - s""" - | private void $scanRows(InternalRow $row) throws java.io.IOException { - | boolean firstRow = true; - | while (!shouldStop() && (firstRow || $input.hasNext())) { - | if (firstRow) { - | firstRow = false; - | } else { - | $row = (InternalRow) $input.next(); - | } - | $numOutputRows.add(1); - | ${consume(ctx, columnsRowInput, inputRow).trim} - | } - | }""".stripMargin) - - // Timers for how long we spent inside the scan. We can only maintain this when using batches, - // otherwise the overhead is too high. - if (canProcessBatches()) { - val scanTimeMetric = metricTerm(ctx, "scanTime") - val getBatchStart = ctx.freshName("scanStart") - val scanTimeTotalNs = ctx.freshName("scanTime") - ctx.currentVars = null - val columnsBatchInput = (output zip colVars).map { case (attr, colVar) => - genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) } - val scanBatches = ctx.freshName("processBatches") - ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;") - - ctx.addNewFunction(scanBatches, - s""" - | private void $scanBatches() throws java.io.IOException { - | while (true) { - | int numRows = $batch.numRows(); - | if ($idx == 0) { - | ${columnAssigns.mkString("", "\n", "\n")} - | $numOutputRows.add(numRows); - | } - | - | while (!shouldStop() && $idx < numRows) { - | int $rowidx = $idx++; - | ${consume(ctx, columnsBatchInput).trim} - | } - | if (shouldStop()) return; - | - | long $getBatchStart = System.nanoTime(); - | if (!$input.hasNext()) { - | $batch = null; - | $scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000)); - | break; - | } - | $batch = ($columnarBatchClz)$input.next(); - | $scanTimeTotalNs += System.nanoTime() - $getBatchStart; - | $idx = 0; - | } - | }""".stripMargin) - - val value = ctx.freshName("value") - s""" - | if ($batch != null) { - | $scanBatches(); - | } else if ($input.hasNext()) { - | Object $value = $input.next(); - | if ($value instanceof $columnarBatchClz) { - | $batch = ($columnarBatchClz)$value; - | $scanBatches(); - | } else { - | $scanRows((InternalRow) $value); - | } - | } - """.stripMargin - } else { - s""" - |if ($input.hasNext()) { - | $scanRows((InternalRow) $input.next()); - |} - """.stripMargin + val rowidx = ctx.freshName("rowIdx") + val columnsBatchInput = (output zip colVars).map { case (attr, colVar) => + genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) } + s""" + |if ($batch == null) { + | $nextBatch(); + |} + |while ($batch != null) { + | int numRows = $batch.numRows(); + | while ($idx < numRows) { + | int $rowidx = $idx++; + | ${consume(ctx, columnsBatchInput).trim} + | if (shouldStop()) return; + | } + | $batch = null; + | $nextBatch(); + |} + |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000)); + |$scanTimeTotalNs = 0; + 
""".stripMargin } } @@ -346,4 +319,38 @@ private[sql] object DataSourceScan { // Metadata keys val INPUT_PATHS = "InputPaths" val PUSHED_FILTERS = "PushedFilters" + + def create( + output: Seq[Attribute], + rdd: RDD[InternalRow], + relation: BaseRelation, + metadata: Map[String, String] = Map.empty): DataSourceScan = { + val outputPartitioning = { + val bucketSpec = relation match { + // TODO: this should be closer to bucket planning. + case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled => r.bucketSpec + case _ => None + } + + def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse { + throw new AnalysisException(s"bucket column $colName not found in existing columns " + + s"(${output.map(_.name).mkString(", ")})") + } + + bucketSpec.map { spec => + val numBuckets = spec.numBuckets + val bucketColumns = spec.bucketColumnNames.map(toAttribute) + HashPartitioning(bucketColumns, numBuckets) + }.getOrElse { + UnknownPartitioning(0) + } + } + + relation match { + case r: HadoopFsRelation if r.fileFormat.supportBatch(r.sqlContext, relation.schema) => + BatchedDataSourceScan(output, rdd, relation, outputPartitioning, metadata) + case _ => + RowDataSourceScan(output, rdd, relation, outputPartitioning, metadata) + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index 4e75a3a794..98129d6c52 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.aggregate.TungstenAggregate import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, SortMergeJoin} import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types._ /** * An interface for those physical operators that support codegen. 
@@ -433,12 +434,20 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] { case _ => true } + private def numOfNestedFields(dataType: DataType): Int = dataType match { + case dt: StructType => dt.fields.map(f => numOfNestedFields(f.dataType)).sum + case m: MapType => numOfNestedFields(m.keyType) + numOfNestedFields(m.valueType) + case a: ArrayType => numOfNestedFields(a.elementType) + case u: UserDefinedType[_] => numOfNestedFields(u.sqlType) + case _ => 1 + } + private def supportCodegen(plan: SparkPlan): Boolean = plan match { case plan: CodegenSupport if plan.supportCodegen => val willFallback = plan.expressions.exists(_.find(e => !supportCodegen(e)).isDefined) // the generated code will be huge if there are too many columns - val haveManyColumns = plan.output.length > 200 - !willFallback && !haveManyColumns + val haveTooManyFields = numOfNestedFields(plan.schema) > conf.wholeStageMaxNumFields + !willFallback && !haveTooManyFields case _ => false } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 52c8f3ef0b..8c183317f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -238,7 +238,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { } case l @ LogicalRelation(baseRelation: TableScan, _, _) => - execution.DataSourceScan( + execution.DataSourceScan.create( l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _), @@ -610,7 +610,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { // Don't request columns that are only referenced by pushed filters. 
.filterNot(handledSet.contains) - val scan = execution.DataSourceScan( + val scan = execution.DataSourceScan.create( projects.map(_.toAttribute), scanBuilder(requestedColumns, candidatePredicates, pushedFilters), relation.relation, metadata) @@ -620,7 +620,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { val requestedColumns = (projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq - val scan = execution.DataSourceScan( + val scan = execution.DataSourceScan.create( requestedColumns, scanBuilder(requestedColumns, candidatePredicates, pushedFilters), relation.relation, metadata) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 618d5a522b..aa1f76450c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -181,7 +181,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging { } val scan = - DataSourceScan( + DataSourceScan.create( readDataColumns ++ partitionColumns, new FileScanRDD( files.sqlContext, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala index 159fdc99dd..6ddb218a22 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala @@ -97,13 +97,6 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag]( @transient protected val jobId = new JobID(jobTrackerId, id) - // If true, enable using the custom RecordReader for parquet. This only works for - // a subset of the types (no complex types). - protected val enableVectorizedParquetReader: Boolean = - sqlContext.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key).toBoolean - protected val enableWholestageCodegen: Boolean = - sqlContext.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key).toBoolean - override def getPartitions: Array[SparkPartition] = { val conf = getConf(isDriverSide = true) val inputFormat = inputFormatClass.newInstance @@ -165,32 +158,9 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag]( } val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0) val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId) - private[this] var reader: RecordReader[Void, V] = null - - /** - * If the format is ParquetInputFormat, try to create the optimized RecordReader. If this - * fails (for example, unsupported schema), try with the normal reader. - * TODO: plumb this through a different way? 
- */ - if (enableVectorizedParquetReader && - format.getClass.getName == "org.apache.parquet.hadoop.ParquetInputFormat") { - val parquetReader: VectorizedParquetRecordReader = new VectorizedParquetRecordReader() - if (!parquetReader.tryInitialize( - split.serializableHadoopSplit.value, hadoopAttemptContext)) { - parquetReader.close() - } else { - reader = parquetReader.asInstanceOf[RecordReader[Void, V]] - parquetReader.resultBatch() - // Whole stage codegen (PhysicalRDD) is able to deal with batches directly - if (enableWholestageCodegen) parquetReader.enableReturningBatches() - } - } - - if (reader == null) { - reader = format.createRecordReader( + private[this] var reader: RecordReader[Void, V] = format.createRecordReader( split.serializableHadoopSplit.value, hadoopAttemptContext) - reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) - } + reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext) // Register an on-task-completion callback to close the input stream. context.addTaskCompletionListener(context => close()) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index 5b58fa1fc5..a2fd8da782 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -24,7 +24,6 @@ import java.util.logging.{Logger => JLogger} import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.{Failure, Try} -import scala.util.control.NonFatal import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, Path} @@ -53,7 +52,7 @@ import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser import org.apache.spark.sql.execution.datasources._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources._ -import org.apache.spark.sql.types.{DataType, StructType} +import org.apache.spark.sql.types.{AtomicType, DataType, StructType} import org.apache.spark.util.{SerializableConfiguration, Utils} import org.apache.spark.util.collection.BitSet @@ -276,6 +275,16 @@ private[sql] class DefaultSource file.getName == ParquetFileWriter.PARQUET_METADATA_FILE } + /** + * Returns whether the reader will the rows as batch or not. + */ + override def supportBatch(sqlContext: SQLContext, schema: StructType): Boolean = { + val conf = SQLContext.getActive().get.conf + conf.useFileScan && conf.parquetVectorizedReaderEnabled && + conf.wholeStageEnabled && schema.length <= conf.wholeStageMaxNumFields && + schema.forall(_.dataType.isInstanceOf[AtomicType]) + } + override def buildReader( sqlContext: SQLContext, dataSchema: StructType, @@ -306,6 +315,10 @@ private[sql] class DefaultSource SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, sqlContext.conf.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP)) + // Whole stage codegen (PhysicalRDD) is able to deal with batches directly + val returningBatch = + supportBatch(sqlContext, StructType(partitionSchema.fields ++ dataSchema.fields)) + // Try to push down filters when filter push-down is enabled. val pushed = if (sqlContext.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) { filters @@ -324,10 +337,8 @@ private[sql] class DefaultSource // TODO: if you move this into the closure it reverts to the default values. 
// If true, enable using the custom RecordReader for parquet. This only works for // a subset of the types (no complex types). - val enableVectorizedParquetReader: Boolean = - sqlContext.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key).toBoolean - val enableWholestageCodegen: Boolean = - sqlContext.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key).toBoolean + val enableVectorizedParquetReader: Boolean = sqlContext.conf.parquetVectorizedReaderEnabled && + dataSchema.forall(_.dataType.isInstanceOf[AtomicType]) (file: PartitionedFile) => { assert(file.partitionValues.numFields == partitionSchema.size) @@ -347,32 +358,27 @@ private[sql] class DefaultSource val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0) val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedConf.value.value, attemptId) - val parquetReader = try { - if (!enableVectorizedParquetReader) sys.error("Vectorized reader turned off.") + val parquetReader = if (enableVectorizedParquetReader) { val vectorizedReader = new VectorizedParquetRecordReader() vectorizedReader.initialize(split, hadoopAttemptContext) logDebug(s"Appending $partitionSchema ${file.partitionValues}") vectorizedReader.initBatch(partitionSchema, file.partitionValues) - // Whole stage codegen (PhysicalRDD) is able to deal with batches directly - // TODO: fix column appending - if (enableWholestageCodegen) { - logDebug(s"Enabling batch returning") + if (returningBatch) { vectorizedReader.enableReturningBatches() } vectorizedReader - } catch { - case NonFatal(e) => - logDebug(s"Falling back to parquet-mr: $e", e) - val reader = pushed match { - case Some(filter) => - new ParquetRecordReader[InternalRow]( - new CatalystReadSupport, - FilterCompat.get(filter, null)) - case _ => - new ParquetRecordReader[InternalRow](new CatalystReadSupport) - } - reader.initialize(split, hadoopAttemptContext) - reader + } else { + logDebug(s"Falling back to parquet-mr") + val reader = pushed match { + case Some(filter) => + new ParquetRecordReader[InternalRow]( + new CatalystReadSupport, + FilterCompat.get(filter, null)) + case _ => + new ParquetRecordReader[InternalRow](new CatalystReadSupport) + } + reader.initialize(split, hadoopAttemptContext) + reader } val iter = new RecordReaderIterator(parquetReader) @@ -432,13 +438,21 @@ private[sql] class DefaultSource val setInputPaths = ParquetRelation.initializeDriverSideJobFunc(inputFiles, parquetBlockSize) _ + val allPrimitiveTypes = dataSchema.forall(_.dataType.isInstanceOf[AtomicType]) + val inputFormatCls = if (sqlContext.conf.parquetVectorizedReaderEnabled + && allPrimitiveTypes) { + classOf[VectorizedParquetInputFormat] + } else { + classOf[ParquetInputFormat[InternalRow]] + } + Utils.withDummyCallSite(sqlContext.sparkContext) { new SqlNewHadoopRDD( sqlContext = sqlContext, broadcastedConf = broadcastedConf, initDriverSideJobFuncOpt = Some(setInputPaths), initLocalJobFuncOpt = Some(initLocalJobFuncOpt), - inputFormatClass = classOf[ParquetInputFormat[InternalRow]], + inputFormatClass = inputFormatCls, valueClass = classOf[InternalRow]) { val cacheMetadata = useMetadataCache @@ -481,6 +495,17 @@ private[sql] class DefaultSource } } +/** + * The ParquetInputFormat that create VectorizedParquetRecordReader. 
+ */ +final class VectorizedParquetInputFormat extends ParquetInputFormat[InternalRow] { + override def createRecordReader( + inputSplit: InputSplit, + taskAttemptContext: TaskAttemptContext): ParquetRecordReader[InternalRow] = { + new VectorizedParquetRecordReader().asInstanceOf[ParquetRecordReader[InternalRow]] + } +} + // NOTE: This class is instantiated and used on executor side only, no need to be serializable. private[sql] class ParquetOutputWriter( path: String, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 927af89949..dc6ba1bcfb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -396,6 +396,13 @@ object SQLConf { .booleanConf .createWithDefault(true) + val WHOLESTAGE_MAX_NUM_FIELDS = SQLConfigBuilder("spark.sql.codegen.maxFields") + .internal() + .doc("The maximum number of fields (including nested fields) that will be supported before" + + " deactivating whole-stage codegen.") + .intConf + .createWithDefault(200) + val FILES_MAX_PARTITION_BYTES = SQLConfigBuilder("spark.sql.files.maxPartitionBytes") .doc("The maximum number of bytes to pack into a single partition when reading files.") .longConf @@ -480,6 +487,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA) + def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED) + def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE) def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS) @@ -504,6 +513,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging { def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED) + def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS) + def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED) def canonicalView: Boolean = getConf(CANONICAL_NATIVE_VIEW) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala index 14e14710f6..6acb41dd1f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala @@ -469,6 +469,15 @@ trait FileFormat { options: Map[String, String]): RDD[InternalRow] /** + * Returns whether this format support returning columnar batch or not. + * + * TODO: we should just have different traits for the different formats. + */ + def supportBatch(sqlContext: SQLContext, dataSchema: StructType): Boolean = { + false + } + + /** * Returns a function that can be used to read a single file in as an Iterator of InternalRow. * * @param dataSchema The global data schema. 
It can be either specified by the user, or
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 4446a6881c..41f536fc37 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -279,7 +279,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
   /** Plans the query and calls the provided validation function with the planned partitioning. */
   def checkScan(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
     val fileScan = df.queryExecution.executedPlan.collect {
-      case DataSourceScan(_, scan: FileScanRDD, _, _) => scan
+      case scan: DataSourceScan if scan.rdd.isInstanceOf[FileScanRDD] =>
+        scan.rdd.asInstanceOf[FileScanRDD]
     }.headOption.getOrElse {
       fail(s"No FileScan in query\n${df.queryExecution}")
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 2f806ebba6..7d206e7bc4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -579,6 +579,16 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
 
     assert(CatalystReadSupport.expandUDT(schema) === expected)
   }
+
+  test("read/write wide table") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      val df = sqlContext.range(1000).select(Seq.tabulate(1000) {i => ('id + i).as(s"c$i")} : _*)
+      df.write.mode(SaveMode.Overwrite).parquet(path)
+      checkAnswer(sqlContext.read.parquet(path), df)
+    }
+  }
 }
 
 object TestingUDT {
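
Note on the whole-stage codegen limit: this patch replaces the hard-coded 200-column cutoff in CollapseCodegenStages with a configurable threshold, spark.sql.codegen.maxFields (default 200), and counts nested fields rather than only top-level columns. The snippet below is a hedged, standalone restatement of that counting logic for readability; it assumes it is compiled inside the Spark SQL source tree, where types such as UserDefinedType are visible, and it is not the patched method itself.

    import org.apache.spark.sql.types._

    // Count every leaf field reachable through structs, maps, arrays and UDTs;
    // each atomic type contributes one field to the total.
    def numOfNestedFields(dataType: DataType): Int = dataType match {
      case st: StructType => st.fields.map(f => numOfNestedFields(f.dataType)).sum
      case m: MapType => numOfNestedFields(m.keyType) + numOfNestedFields(m.valueType)
      case a: ArrayType => numOfNestedFields(a.elementType)
      case u: UserDefinedType[_] => numOfNestedFields(u.sqlType)
      case _ => 1
    }

    // A nested struct counts all of its leaves, so this schema has 3 fields, not 2.
    val schema = new StructType()
      .add("id", LongType)
      .add("point", new StructType().add("x", DoubleType).add("y", DoubleType))
    assert(numOfNestedFields(schema) == 3)

Whole-stage codegen is skipped when this count for a plan's schema exceeds spark.sql.codegen.maxFields, so an overly wide schema falls back to the non-codegen path instead of failing.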
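
Note on batched scans: FileFormat.supportBatch (default false) now decides up front whether a source returns ColumnarBatches, and DataSourceScan.create uses it to instantiate either BatchedDataSourceScan or RowDataSourceScan instead of probing the first RDD element at runtime. Below is a sketch of an equivalent predicate; it mirrors the Parquet DefaultSource.supportBatch body in this diff and assumes it is compiled inside the org.apache.spark.sql package tree, where the private[sql] SQLConf is visible.

    import org.apache.spark.sql.internal.SQLConf
    import org.apache.spark.sql.types.{AtomicType, StructType}

    // Batches are only produced when the file scan path, the vectorized Parquet
    // reader and whole-stage codegen are all enabled, the schema is narrow enough
    // for codegen, and every column is an atomic (flat) type.
    def canReturnBatches(conf: SQLConf, schema: StructType): Boolean = {
      conf.useFileScan &&
        conf.parquetVectorizedReaderEnabled &&
        conf.wholeStageEnabled &&
        schema.length <= conf.wholeStageMaxNumFields &&
        schema.forall(_.dataType.isInstanceOf[AtomicType])
    }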
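
Note on output partitioning: the bucket-aware outputPartitioning that used to be computed inside the DataSourceScan case class now lives in DataSourceScan.create and is handed to both concrete nodes as a constructor argument. A condensed sketch of that derivation is shown here for reading; it reuses the diff's logic but substitutes sys.error for the AnalysisException the real code throws, and the BucketSpec import path is assumed for this branch.

    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
    import org.apache.spark.sql.execution.datasources.BucketSpec

    // Map a relation's bucket spec onto a HashPartitioning over the matching output
    // attributes; report UnknownPartitioning when the relation is not bucketed.
    def bucketPartitioning(output: Seq[Attribute], bucketSpec: Option[BucketSpec]): Partitioning = {
      def toAttribute(colName: String): Attribute =
        output.find(_.name == colName).getOrElse(
          sys.error(s"bucket column $colName not found in existing columns " +
            s"(${output.map(_.name).mkString(", ")})"))
      bucketSpec.map { spec =>
        HashPartitioning(spec.bucketColumnNames.map(toAttribute), spec.numBuckets)
      }.getOrElse(UnknownPartitioning(0))
    }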
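
Note on CreateExternalRow.genCode: the values array is now held in a mutable state field and the per-child assignments go through ctx.splitExpressions, so a very wide row no longer produces one giant generated method (which risks hitting the JVM's 64KB bytecode-per-method limit). The toy sketch below illustrates the splitting idea only; it is detached from the real CodegenContext API and every name in it is made up.

    // Chunk a long list of generated statements into helper methods and return the
    // helper definitions plus a short body that calls them in order.
    def splitIntoMethods(statements: Seq[String], maxPerMethod: Int = 100): (Seq[String], String) = {
      val helpers = statements.grouped(maxPerMethod).zipWithIndex.map { case (chunk, idx) =>
        s"""private void apply_$idx(InternalRow row) {
           |  ${chunk.mkString("\n  ")}
           |}""".stripMargin
      }.toSeq
      val callSite = helpers.indices.map(idx => s"apply_$idx(row);").mkString("\n")
      (helpers, callSite)
    }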
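
Note on usage: the new "read/write wide table" test exercises exactly the case the field limit is meant to guard, a 1000-column Parquet table. A minimal usage sketch along the same lines is below; it assumes a SQLContext named sqlContext is in scope, and the output path is made up.

    import org.apache.spark.sql.SaveMode
    import sqlContext.implicits._

    // Optionally raise the codegen field limit for this session (default is 200).
    sqlContext.setConf("spark.sql.codegen.maxFields", "1500")

    val wide = sqlContext.range(100).select(Seq.tabulate(1000)(i => ('id + i).as(s"c$i")): _*)
    wide.write.mode(SaveMode.Overwrite).parquet("/tmp/wide_table")   // made-up path
    sqlContext.read.parquet("/tmp/wide_table").count()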