aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2016-04-06 15:33:39 -0700
committerDavies Liu <davies.liu@gmail.com>2016-04-06 15:33:39 -0700
commit5a4b11a901703464b9261dea0642d80cf8d4856c (patch)
tree8807e70f4508b8e6253ebc116b9ea2b1475997b6 /sql
parenta4ead6d3881f071a2ae53ff1c961c6ac388cac1d (diff)
downloadspark-5a4b11a901703464b9261dea0642d80cf8d4856c.tar.gz
spark-5a4b11a901703464b9261dea0642d80cf8d4856c.tar.bz2
spark-5a4b11a901703464b9261dea0642d80cf8d4856c.zip
[SPARK-14224] [SPARK-14223] [SPARK-14310] [SQL] fix RowEncoder and parquet reader for wide table
## What changes were proposed in this pull request? 1) fix the RowEncoder for wide table (many columns) by splitting the generate code into multiple functions. 2) Separate DataSourceScan as RowDataSourceScan and BatchedDataSourceScan 3) Disable the returning columnar batch in parquet reader if there are many columns. 4) Added a internal config for maximum number of fields (nested) columns supported by whole stage codegen. Closes #12098 ## How was this patch tested? Add a tests for table with 1000 columns. Author: Davies Liu <davies@databricks.com> Closes #12047 from davies/many_columns.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala24
-rw-r--r--sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java19
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala291
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala13
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala34
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala77
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala11
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala9
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala3
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala10
13 files changed, 267 insertions, 234 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala
index a0490e1351..28b6b2adf8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala
@@ -524,22 +524,26 @@ case class CreateExternalRow(children: Seq[Expression], schema: StructType)
override def genCode(ctx: CodegenContext, ev: ExprCode): String = {
val rowClass = classOf[GenericRowWithSchema].getName
val values = ctx.freshName("values")
- val schemaField = ctx.addReferenceObj("schema", schema)
- s"""
- boolean ${ev.isNull} = false;
- final Object[] $values = new Object[${children.size}];
- """ +
- children.zipWithIndex.map { case (e, i) =>
- val eval = e.gen(ctx)
- eval.code + s"""
+ ctx.addMutableState("Object[]", values, "")
+
+ val childrenCodes = children.zipWithIndex.map { case (e, i) =>
+ val eval = e.gen(ctx)
+ eval.code + s"""
if (${eval.isNull}) {
$values[$i] = null;
} else {
$values[$i] = ${eval.value};
}
"""
- }.mkString("\n") +
- s"final ${classOf[Row].getName} ${ev.value} = new $rowClass($values, this.$schemaField);"
+ }
+ val childrenCode = ctx.splitExpressions(ctx.INPUT_ROW, childrenCodes)
+ val schemaField = ctx.addReferenceObj("schema", schema)
+ s"""
+ boolean ${ev.isNull} = false;
+ $values = new Object[${children.size}];
+ $childrenCode
+ final ${classOf[Row].getName} ${ev.value} = new $rowClass($values, this.$schemaField);
+ """
}
}
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index a0b6276ef5..51bdf0f0f2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -31,7 +31,8 @@ import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.*;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
/**
* A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
@@ -100,20 +101,6 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
/**
- * Tries to initialize the reader for this split. Returns true if this reader supports reading
- * this split and false otherwise.
- */
- public boolean tryInitialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
- throws IOException, InterruptedException {
- try {
- initialize(inputSplit, taskAttemptContext);
- return true;
- } catch (UnsupportedOperationException e) {
- return false;
- }
- }
-
- /**
* Implementation of RecordReader API.
*/
@Override
@@ -222,7 +209,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
return columnarBatch;
}
- /**
+ /*
* Can be called before any rows are returned to enable returning columnar batches directly.
*/
public void enableReturningBatches() {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 1c9cb79ba4..9259ff4062 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -120,7 +120,7 @@ class SQLContext private[sql](
*/
@transient
protected[sql] lazy val sessionState: SessionState = new SessionState(self)
- protected[sql] def conf: SQLConf = sessionState.conf
+ protected[spark] def conf: SQLConf = sessionState.conf
/**
* An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index ab575e90c9..392c48fb7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -24,13 +24,13 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.toCommentSafeString
import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetSource}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{AtomicType, DataType}
object RDDConversions {
def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
@@ -123,28 +123,30 @@ private[sql] case class PhysicalRDD(
}
}
-/** Physical plan node for scanning data from a relation. */
-private[sql] case class DataSourceScan(
- output: Seq[Attribute],
- rdd: RDD[InternalRow],
- @transient relation: BaseRelation,
- override val metadata: Map[String, String] = Map.empty)
- extends LeafNode with CodegenSupport {
+private[sql] trait DataSourceScan extends LeafNode {
+ val rdd: RDD[InternalRow]
+ val relation: BaseRelation
override val nodeName: String = relation.toString
// Ignore rdd when checking results
- override def sameResult(plan: SparkPlan ): Boolean = plan match {
+ override def sameResult(plan: SparkPlan): Boolean = plan match {
case other: DataSourceScan => relation == other.relation && metadata == other.metadata
case _ => false
}
+}
- private[sql] override lazy val metrics = if (canProcessBatches()) {
- Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
- "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
- } else {
+/** Physical plan node for scanning data from a relation. */
+private[sql] case class RowDataSourceScan(
+ output: Seq[Attribute],
+ rdd: RDD[InternalRow],
+ @transient relation: BaseRelation,
+ override val outputPartitioning: Partitioning,
+ override val metadata: Map[String, String] = Map.empty)
+ extends DataSourceScan with CodegenSupport {
+
+ private[sql] override lazy val metrics =
Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
- }
val outputUnsafeRows = relation match {
case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
@@ -153,38 +155,6 @@ private[sql] case class DataSourceScan(
case _ => false
}
- override val outputPartitioning = {
- val bucketSpec = relation match {
- // TODO: this should be closer to bucket planning.
- case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled => r.bucketSpec
- case _ => None
- }
-
- def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
- throw new AnalysisException(s"bucket column $colName not found in existing columns " +
- s"(${output.map(_.name).mkString(", ")})")
- }
-
- bucketSpec.map { spec =>
- val numBuckets = spec.numBuckets
- val bucketColumns = spec.bucketColumnNames.map(toAttribute)
- HashPartitioning(bucketColumns, numBuckets)
- }.getOrElse {
- UnknownPartitioning(0)
- }
- }
-
- private def canProcessBatches(): Boolean = {
- relation match {
- case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] &&
- SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) &&
- SQLContext.getActive().get.conf.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED) =>
- true
- case _ =>
- false
- }
- }
-
protected override def doExecute(): RDD[InternalRow] = {
val unsafeRow = if (outputUnsafeRows) {
rdd
@@ -211,6 +181,57 @@ private[sql] case class DataSourceScan(
rdd :: Nil
}
+ override protected def doProduce(ctx: CodegenContext): String = {
+ val numOutputRows = metricTerm(ctx, "numOutputRows")
+ // PhysicalRDD always just has one input
+ val input = ctx.freshName("input")
+ ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
+ val exprRows = output.zipWithIndex.map{ case (a, i) =>
+ new BoundReference(i, a.dataType, a.nullable)
+ }
+ val row = ctx.freshName("row")
+ ctx.INPUT_ROW = row
+ ctx.currentVars = null
+ val columnsRowInput = exprRows.map(_.gen(ctx))
+ val inputRow = if (outputUnsafeRows) row else null
+ s"""
+ |while ($input.hasNext()) {
+ | InternalRow $row = (InternalRow) $input.next();
+ | $numOutputRows.add(1);
+ | ${consume(ctx, columnsRowInput, inputRow).trim}
+ | if (shouldStop()) return;
+ |}
+ """.stripMargin
+ }
+}
+
+/** Physical plan node for scanning data from a batched relation. */
+private[sql] case class BatchedDataSourceScan(
+ output: Seq[Attribute],
+ rdd: RDD[InternalRow],
+ @transient relation: BaseRelation,
+ override val outputPartitioning: Partitioning,
+ override val metadata: Map[String, String] = Map.empty)
+ extends DataSourceScan with CodegenSupport {
+
+ private[sql] override lazy val metrics =
+ Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
+ "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
+
+ protected override def doExecute(): RDD[InternalRow] = {
+ throw new UnsupportedOperationException
+ }
+
+ override def simpleString: String = {
+ val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
+ val metadataStr = metadataEntries.mkString(" ", ", ", "")
+ s"BatchedScan $nodeName${output.mkString("[", ",", "]")}$metadataStr"
+ }
+
+ override def upstreams(): Seq[RDD[InternalRow]] = {
+ rdd :: Nil
+ }
+
private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
dataType: DataType, nullable: Boolean): ExprCode = {
val javaType = ctx.javaType(dataType)
@@ -232,113 +253,65 @@ private[sql] case class DataSourceScan(
// Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
// never requires UnsafeRow as input.
override protected def doProduce(ctx: CodegenContext): String = {
- val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
- val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
val input = ctx.freshName("input")
- val idx = ctx.freshName("batchIdx")
- val rowidx = ctx.freshName("rowIdx")
- val batch = ctx.freshName("batch")
// PhysicalRDD always just has one input
ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
+
+ // metrics
+ val numOutputRows = metricTerm(ctx, "numOutputRows")
+ val scanTimeMetric = metricTerm(ctx, "scanTime")
+ val scanTimeTotalNs = ctx.freshName("scanTime")
+ ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
+
+ val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
+ val batch = ctx.freshName("batch")
ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
+
+ val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
+ val idx = ctx.freshName("batchIdx")
ctx.addMutableState("int", idx, s"$idx = 0;")
val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
ctx.addMutableState(columnVectorClz, name, s"$name = null;")
- s"$name = ${batch}.column($i);" }
-
- val row = ctx.freshName("row")
- val numOutputRows = metricTerm(ctx, "numOutputRows")
+ s"$name = $batch.column($i);"
+ }
- // The input RDD can either return (all) ColumnarBatches or InternalRows. We determine this
- // by looking at the first value of the RDD and then calling the function which will process
- // the remaining. It is faster to return batches.
- // TODO: The abstractions between this class and SqlNewHadoopRDD makes it difficult to know
- // here which path to use. Fix this.
+ val nextBatch = ctx.freshName("nextBatch")
+ ctx.addNewFunction(nextBatch,
+ s"""
+ |private void $nextBatch() throws java.io.IOException {
+ | long getBatchStart = System.nanoTime();
+ | if ($input.hasNext()) {
+ | $batch = ($columnarBatchClz)$input.next();
+ | $numOutputRows.add($batch.numRows());
+ | $idx = 0;
+ | ${columnAssigns.mkString("", "\n", "\n")}
+ | }
+ | $scanTimeTotalNs += System.nanoTime() - getBatchStart;
+ |}""".stripMargin)
- val exprRows =
- output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, x._1.nullable))
- ctx.INPUT_ROW = row
ctx.currentVars = null
- val columnsRowInput = exprRows.map(_.gen(ctx))
- val inputRow = if (outputUnsafeRows) row else null
- val scanRows = ctx.freshName("processRows")
- ctx.addNewFunction(scanRows,
- s"""
- | private void $scanRows(InternalRow $row) throws java.io.IOException {
- | boolean firstRow = true;
- | while (!shouldStop() && (firstRow || $input.hasNext())) {
- | if (firstRow) {
- | firstRow = false;
- | } else {
- | $row = (InternalRow) $input.next();
- | }
- | $numOutputRows.add(1);
- | ${consume(ctx, columnsRowInput, inputRow).trim}
- | }
- | }""".stripMargin)
-
- // Timers for how long we spent inside the scan. We can only maintain this when using batches,
- // otherwise the overhead is too high.
- if (canProcessBatches()) {
- val scanTimeMetric = metricTerm(ctx, "scanTime")
- val getBatchStart = ctx.freshName("scanStart")
- val scanTimeTotalNs = ctx.freshName("scanTime")
- ctx.currentVars = null
- val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
- genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) }
- val scanBatches = ctx.freshName("processBatches")
- ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
-
- ctx.addNewFunction(scanBatches,
- s"""
- | private void $scanBatches() throws java.io.IOException {
- | while (true) {
- | int numRows = $batch.numRows();
- | if ($idx == 0) {
- | ${columnAssigns.mkString("", "\n", "\n")}
- | $numOutputRows.add(numRows);
- | }
- |
- | while (!shouldStop() && $idx < numRows) {
- | int $rowidx = $idx++;
- | ${consume(ctx, columnsBatchInput).trim}
- | }
- | if (shouldStop()) return;
- |
- | long $getBatchStart = System.nanoTime();
- | if (!$input.hasNext()) {
- | $batch = null;
- | $scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
- | break;
- | }
- | $batch = ($columnarBatchClz)$input.next();
- | $scanTimeTotalNs += System.nanoTime() - $getBatchStart;
- | $idx = 0;
- | }
- | }""".stripMargin)
-
- val value = ctx.freshName("value")
- s"""
- | if ($batch != null) {
- | $scanBatches();
- | } else if ($input.hasNext()) {
- | Object $value = $input.next();
- | if ($value instanceof $columnarBatchClz) {
- | $batch = ($columnarBatchClz)$value;
- | $scanBatches();
- | } else {
- | $scanRows((InternalRow) $value);
- | }
- | }
- """.stripMargin
- } else {
- s"""
- |if ($input.hasNext()) {
- | $scanRows((InternalRow) $input.next());
- |}
- """.stripMargin
+ val rowidx = ctx.freshName("rowIdx")
+ val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
+ genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
}
+ s"""
+ |if ($batch == null) {
+ | $nextBatch();
+ |}
+ |while ($batch != null) {
+ | int numRows = $batch.numRows();
+ | while ($idx < numRows) {
+ | int $rowidx = $idx++;
+ | ${consume(ctx, columnsBatchInput).trim}
+ | if (shouldStop()) return;
+ | }
+ | $batch = null;
+ | $nextBatch();
+ |}
+ |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
+ |$scanTimeTotalNs = 0;
+ """.stripMargin
}
}
@@ -346,4 +319,38 @@ private[sql] object DataSourceScan {
// Metadata keys
val INPUT_PATHS = "InputPaths"
val PUSHED_FILTERS = "PushedFilters"
+
+ def create(
+ output: Seq[Attribute],
+ rdd: RDD[InternalRow],
+ relation: BaseRelation,
+ metadata: Map[String, String] = Map.empty): DataSourceScan = {
+ val outputPartitioning = {
+ val bucketSpec = relation match {
+ // TODO: this should be closer to bucket planning.
+ case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled => r.bucketSpec
+ case _ => None
+ }
+
+ def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
+ throw new AnalysisException(s"bucket column $colName not found in existing columns " +
+ s"(${output.map(_.name).mkString(", ")})")
+ }
+
+ bucketSpec.map { spec =>
+ val numBuckets = spec.numBuckets
+ val bucketColumns = spec.bucketColumnNames.map(toAttribute)
+ HashPartitioning(bucketColumns, numBuckets)
+ }.getOrElse {
+ UnknownPartitioning(0)
+ }
+ }
+
+ relation match {
+ case r: HadoopFsRelation if r.fileFormat.supportBatch(r.sqlContext, relation.schema) =>
+ BatchedDataSourceScan(output, rdd, relation, outputPartitioning, metadata)
+ case _ =>
+ RowDataSourceScan(output, rdd, relation, outputPartitioning, metadata)
+ }
+ }
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 4e75a3a794..98129d6c52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.aggregate.TungstenAggregate
import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, SortMergeJoin}
import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
/**
* An interface for those physical operators that support codegen.
@@ -433,12 +434,20 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
case _ => true
}
+ private def numOfNestedFields(dataType: DataType): Int = dataType match {
+ case dt: StructType => dt.fields.map(f => numOfNestedFields(f.dataType)).sum
+ case m: MapType => numOfNestedFields(m.keyType) + numOfNestedFields(m.valueType)
+ case a: ArrayType => numOfNestedFields(a.elementType)
+ case u: UserDefinedType[_] => numOfNestedFields(u.sqlType)
+ case _ => 1
+ }
+
private def supportCodegen(plan: SparkPlan): Boolean = plan match {
case plan: CodegenSupport if plan.supportCodegen =>
val willFallback = plan.expressions.exists(_.find(e => !supportCodegen(e)).isDefined)
// the generated code will be huge if there are too many columns
- val haveManyColumns = plan.output.length > 200
- !willFallback && !haveManyColumns
+ val haveTooManyFields = numOfNestedFields(plan.schema) > conf.wholeStageMaxNumFields
+ !willFallback && !haveTooManyFields
case _ => false
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 52c8f3ef0b..8c183317f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -238,7 +238,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
}
case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
- execution.DataSourceScan(
+ execution.DataSourceScan.create(
l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil
case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _),
@@ -610,7 +610,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// Don't request columns that are only referenced by pushed filters.
.filterNot(handledSet.contains)
- val scan = execution.DataSourceScan(
+ val scan = execution.DataSourceScan.create(
projects.map(_.toAttribute),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation, metadata)
@@ -620,7 +620,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val requestedColumns =
(projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
- val scan = execution.DataSourceScan(
+ val scan = execution.DataSourceScan.create(
requestedColumns,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation, metadata)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 618d5a522b..aa1f76450c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -181,7 +181,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
}
val scan =
- DataSourceScan(
+ DataSourceScan.create(
readDataColumns ++ partitionColumns,
new FileScanRDD(
files.sqlContext,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
index 159fdc99dd..6ddb218a22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
@@ -97,13 +97,6 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
@transient protected val jobId = new JobID(jobTrackerId, id)
- // If true, enable using the custom RecordReader for parquet. This only works for
- // a subset of the types (no complex types).
- protected val enableVectorizedParquetReader: Boolean =
- sqlContext.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key).toBoolean
- protected val enableWholestageCodegen: Boolean =
- sqlContext.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key).toBoolean
-
override def getPartitions: Array[SparkPartition] = {
val conf = getConf(isDriverSide = true)
val inputFormat = inputFormatClass.newInstance
@@ -165,32 +158,9 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
}
val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
- private[this] var reader: RecordReader[Void, V] = null
-
- /**
- * If the format is ParquetInputFormat, try to create the optimized RecordReader. If this
- * fails (for example, unsupported schema), try with the normal reader.
- * TODO: plumb this through a different way?
- */
- if (enableVectorizedParquetReader &&
- format.getClass.getName == "org.apache.parquet.hadoop.ParquetInputFormat") {
- val parquetReader: VectorizedParquetRecordReader = new VectorizedParquetRecordReader()
- if (!parquetReader.tryInitialize(
- split.serializableHadoopSplit.value, hadoopAttemptContext)) {
- parquetReader.close()
- } else {
- reader = parquetReader.asInstanceOf[RecordReader[Void, V]]
- parquetReader.resultBatch()
- // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
- if (enableWholestageCodegen) parquetReader.enableReturningBatches()
- }
- }
-
- if (reader == null) {
- reader = format.createRecordReader(
+ private[this] var reader: RecordReader[Void, V] = format.createRecordReader(
split.serializableHadoopSplit.value, hadoopAttemptContext)
- reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
- }
+ reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener(context => close())
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 5b58fa1fc5..a2fd8da782 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -24,7 +24,6 @@ import java.util.logging.{Logger => JLogger}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.{Failure, Try}
-import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
@@ -53,7 +52,7 @@ import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{AtomicType, DataType, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.util.collection.BitSet
@@ -276,6 +275,16 @@ private[sql] class DefaultSource
file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
}
+ /**
+ * Returns whether the reader will the rows as batch or not.
+ */
+ override def supportBatch(sqlContext: SQLContext, schema: StructType): Boolean = {
+ val conf = SQLContext.getActive().get.conf
+ conf.useFileScan && conf.parquetVectorizedReaderEnabled &&
+ conf.wholeStageEnabled && schema.length <= conf.wholeStageMaxNumFields &&
+ schema.forall(_.dataType.isInstanceOf[AtomicType])
+ }
+
override def buildReader(
sqlContext: SQLContext,
dataSchema: StructType,
@@ -306,6 +315,10 @@ private[sql] class DefaultSource
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sqlContext.conf.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
+ // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+ val returningBatch =
+ supportBatch(sqlContext, StructType(partitionSchema.fields ++ dataSchema.fields))
+
// Try to push down filters when filter push-down is enabled.
val pushed = if (sqlContext.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
filters
@@ -324,10 +337,8 @@ private[sql] class DefaultSource
// TODO: if you move this into the closure it reverts to the default values.
// If true, enable using the custom RecordReader for parquet. This only works for
// a subset of the types (no complex types).
- val enableVectorizedParquetReader: Boolean =
- sqlContext.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key).toBoolean
- val enableWholestageCodegen: Boolean =
- sqlContext.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key).toBoolean
+ val enableVectorizedParquetReader: Boolean = sqlContext.conf.parquetVectorizedReaderEnabled &&
+ dataSchema.forall(_.dataType.isInstanceOf[AtomicType])
(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
@@ -347,32 +358,27 @@ private[sql] class DefaultSource
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedConf.value.value, attemptId)
- val parquetReader = try {
- if (!enableVectorizedParquetReader) sys.error("Vectorized reader turned off.")
+ val parquetReader = if (enableVectorizedParquetReader) {
val vectorizedReader = new VectorizedParquetRecordReader()
vectorizedReader.initialize(split, hadoopAttemptContext)
logDebug(s"Appending $partitionSchema ${file.partitionValues}")
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
- // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
- // TODO: fix column appending
- if (enableWholestageCodegen) {
- logDebug(s"Enabling batch returning")
+ if (returningBatch) {
vectorizedReader.enableReturningBatches()
}
vectorizedReader
- } catch {
- case NonFatal(e) =>
- logDebug(s"Falling back to parquet-mr: $e", e)
- val reader = pushed match {
- case Some(filter) =>
- new ParquetRecordReader[InternalRow](
- new CatalystReadSupport,
- FilterCompat.get(filter, null))
- case _ =>
- new ParquetRecordReader[InternalRow](new CatalystReadSupport)
- }
- reader.initialize(split, hadoopAttemptContext)
- reader
+ } else {
+ logDebug(s"Falling back to parquet-mr")
+ val reader = pushed match {
+ case Some(filter) =>
+ new ParquetRecordReader[InternalRow](
+ new CatalystReadSupport,
+ FilterCompat.get(filter, null))
+ case _ =>
+ new ParquetRecordReader[InternalRow](new CatalystReadSupport)
+ }
+ reader.initialize(split, hadoopAttemptContext)
+ reader
}
val iter = new RecordReaderIterator(parquetReader)
@@ -432,13 +438,21 @@ private[sql] class DefaultSource
val setInputPaths =
ParquetRelation.initializeDriverSideJobFunc(inputFiles, parquetBlockSize) _
+ val allPrimitiveTypes = dataSchema.forall(_.dataType.isInstanceOf[AtomicType])
+ val inputFormatCls = if (sqlContext.conf.parquetVectorizedReaderEnabled
+ && allPrimitiveTypes) {
+ classOf[VectorizedParquetInputFormat]
+ } else {
+ classOf[ParquetInputFormat[InternalRow]]
+ }
+
Utils.withDummyCallSite(sqlContext.sparkContext) {
new SqlNewHadoopRDD(
sqlContext = sqlContext,
broadcastedConf = broadcastedConf,
initDriverSideJobFuncOpt = Some(setInputPaths),
initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
- inputFormatClass = classOf[ParquetInputFormat[InternalRow]],
+ inputFormatClass = inputFormatCls,
valueClass = classOf[InternalRow]) {
val cacheMetadata = useMetadataCache
@@ -481,6 +495,17 @@ private[sql] class DefaultSource
}
}
+/**
+ * The ParquetInputFormat that create VectorizedParquetRecordReader.
+ */
+final class VectorizedParquetInputFormat extends ParquetInputFormat[InternalRow] {
+ override def createRecordReader(
+ inputSplit: InputSplit,
+ taskAttemptContext: TaskAttemptContext): ParquetRecordReader[InternalRow] = {
+ new VectorizedParquetRecordReader().asInstanceOf[ParquetRecordReader[InternalRow]]
+ }
+}
+
// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
private[sql] class ParquetOutputWriter(
path: String,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 927af89949..dc6ba1bcfb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -396,6 +396,13 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val WHOLESTAGE_MAX_NUM_FIELDS = SQLConfigBuilder("spark.sql.codegen.maxFields")
+ .internal()
+ .doc("The maximum number of fields (including nested fields) that will be supported before" +
+ " deactivating whole-stage codegen.")
+ .intConf
+ .createWithDefault(200)
+
val FILES_MAX_PARTITION_BYTES = SQLConfigBuilder("spark.sql.files.maxPartitionBytes")
.doc("The maximum number of bytes to pack into a single partition when reading files.")
.longConf
@@ -480,6 +487,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA)
+ def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED)
+
def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
@@ -504,6 +513,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)
+ def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
+
def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
def canonicalView: Boolean = getConf(CANONICAL_NATIVE_VIEW)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 14e14710f6..6acb41dd1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -469,6 +469,15 @@ trait FileFormat {
options: Map[String, String]): RDD[InternalRow]
/**
+ * Returns whether this format support returning columnar batch or not.
+ *
+ * TODO: we should just have different traits for the different formats.
+ */
+ def supportBatch(sqlContext: SQLContext, dataSchema: StructType): Boolean = {
+ false
+ }
+
+ /**
* Returns a function that can be used to read a single file in as an Iterator of InternalRow.
*
* @param dataSchema The global data schema. It can be either specified by the user, or
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 4446a6881c..41f536fc37 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -279,7 +279,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
/** Plans the query and calls the provided validation function with the planned partitioning. */
def checkScan(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
val fileScan = df.queryExecution.executedPlan.collect {
- case DataSourceScan(_, scan: FileScanRDD, _, _) => scan
+ case scan: DataSourceScan if scan.rdd.isInstanceOf[FileScanRDD] =>
+ scan.rdd.asInstanceOf[FileScanRDD]
}.headOption.getOrElse {
fail(s"No FileScan in query\n${df.queryExecution}")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 2f806ebba6..7d206e7bc4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -579,6 +579,16 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
assert(CatalystReadSupport.expandUDT(schema) === expected)
}
+
+ test("read/write wide table") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ val df = sqlContext.range(1000).select(Seq.tabulate(1000) {i => ('id + i).as(s"c$i")} : _*)
+ df.write.mode(SaveMode.Overwrite).parquet(path)
+ checkAnswer(sqlContext.read.parquet(path), df)
+ }
+ }
}
object TestingUDT {