Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala              |  24
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java |  19
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                                    |   2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala                         | 291
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala                   |  13
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala      |   6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala      |   2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala         |  34
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala |  77
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala                              |  11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala                            |   9
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala |   3
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala |  10
13 files changed, 267 insertions, 234 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala
index a0490e1351..28b6b2adf8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects.scala
@@ -524,22 +524,26 @@ case class CreateExternalRow(children: Seq[Expression], schema: StructType)
override def genCode(ctx: CodegenContext, ev: ExprCode): String = {
val rowClass = classOf[GenericRowWithSchema].getName
val values = ctx.freshName("values")
- val schemaField = ctx.addReferenceObj("schema", schema)
- s"""
- boolean ${ev.isNull} = false;
- final Object[] $values = new Object[${children.size}];
- """ +
- children.zipWithIndex.map { case (e, i) =>
- val eval = e.gen(ctx)
- eval.code + s"""
+ ctx.addMutableState("Object[]", values, "")
+
+ val childrenCodes = children.zipWithIndex.map { case (e, i) =>
+ val eval = e.gen(ctx)
+ eval.code + s"""
if (${eval.isNull}) {
$values[$i] = null;
} else {
$values[$i] = ${eval.value};
}
"""
- }.mkString("\n") +
- s"final ${classOf[Row].getName} ${ev.value} = new $rowClass($values, this.$schemaField);"
+ }
+ val childrenCode = ctx.splitExpressions(ctx.INPUT_ROW, childrenCodes)
+ val schemaField = ctx.addReferenceObj("schema", schema)
+ s"""
+ boolean ${ev.isNull} = false;
+ $values = new Object[${children.size}];
+ $childrenCode
+ final ${classOf[Row].getName} ${ev.value} = new $rowClass($values, this.$schemaField);
+ """
}
}
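
Editor's note: the hunk above replaces one monolithic block of per-child assignments with a mutable `values` array plus `ctx.splitExpressions`. A minimal, self-contained Scala sketch of the underlying idea follows; the helper names are illustrative, not the real CodegenContext API. Splitting matters because the generated assignments for a wide row can otherwise exceed the JVM's 64KB-per-method bytecode limit.

// Illustrative only: group generated snippets into helper methods and emit calls,
// instead of inlining every assignment into one oversized method.
object SplitExpressionsSketch {
  def split(snippets: Seq[String], perMethod: Int): (Seq[String], String) = {
    val helpers = snippets.grouped(perMethod).zipWithIndex.map { case (group, i) =>
      s"private void apply_$i(InternalRow row) {\n  ${group.mkString("\n  ")}\n}"
    }.toSeq
    val calls = helpers.indices.map(i => s"apply_$i(row);").mkString("\n")
    (helpers, calls)
  }

  def main(args: Array[String]): Unit = {
    val snippets = (0 until 6).map(i => s"values[$i] = child_$i.eval(row);")
    val (helperMethods, body) = split(snippets, perMethod = 2)
    helperMethods.foreach(println)
    println(body)
  }
}
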
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index a0b6276ef5..51bdf0f0f2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -31,7 +31,8 @@ import org.apache.spark.memory.MemoryMode;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
-import org.apache.spark.sql.types.*;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
/**
* A specialized RecordReader that reads into InternalRows or ColumnarBatches directly using the
@@ -100,20 +101,6 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
/**
- * Tries to initialize the reader for this split. Returns true if this reader supports reading
- * this split and false otherwise.
- */
- public boolean tryInitialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
- throws IOException, InterruptedException {
- try {
- initialize(inputSplit, taskAttemptContext);
- return true;
- } catch (UnsupportedOperationException e) {
- return false;
- }
- }
-
- /**
* Implementation of RecordReader API.
*/
@Override
@@ -222,7 +209,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
return columnarBatch;
}
- /**
+ /*
* Can be called before any rows are returned to enable returning columnar batches directly.
*/
public void enableReturningBatches() {
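
Editor's note: with tryInitialize gone, the reader no longer probes a split and falls back at runtime; whether the vectorized path applies is decided up front from the schema (see DefaultSource.supportBatch later in this patch). A small stand-alone sketch of that shape of check, using made-up type names rather than Spark's:

// Sketch only: the vectorized reader is eligible when every column is a flat,
// atomic type; anything nested is routed to the parquet-mr reader at planning time.
object VectorizedEligibilitySketch {
  sealed trait ColType
  case object AtomicCol extends ColType
  case object NestedCol extends ColType // struct, array, map, UDT

  def vectorizedReaderSupports(schema: Seq[ColType]): Boolean =
    schema.forall(_ == AtomicCol)

  def main(args: Array[String]): Unit = {
    println(vectorizedReaderSupports(Seq(AtomicCol, AtomicCol))) // true
    println(vectorizedReaderSupports(Seq(AtomicCol, NestedCol))) // false -> parquet-mr
  }
}
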
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 1c9cb79ba4..9259ff4062 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -120,7 +120,7 @@ class SQLContext private[sql](
*/
@transient
protected[sql] lazy val sessionState: SessionState = new SessionState(self)
- protected[sql] def conf: SQLConf = sessionState.conf
+ protected[spark] def conf: SQLConf = sessionState.conf
/**
* An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index ab575e90c9..392c48fb7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -24,13 +24,13 @@ import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
-import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.toCommentSafeString
import org.apache.spark.sql.execution.datasources.parquet.{DefaultSource => ParquetSource}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
-import org.apache.spark.sql.types.DataType
+import org.apache.spark.sql.types.{AtomicType, DataType}
object RDDConversions {
def productToRowRdd[A <: Product](data: RDD[A], outputTypes: Seq[DataType]): RDD[InternalRow] = {
@@ -123,28 +123,30 @@ private[sql] case class PhysicalRDD(
}
}
-/** Physical plan node for scanning data from a relation. */
-private[sql] case class DataSourceScan(
- output: Seq[Attribute],
- rdd: RDD[InternalRow],
- @transient relation: BaseRelation,
- override val metadata: Map[String, String] = Map.empty)
- extends LeafNode with CodegenSupport {
+private[sql] trait DataSourceScan extends LeafNode {
+ val rdd: RDD[InternalRow]
+ val relation: BaseRelation
override val nodeName: String = relation.toString
// Ignore rdd when checking results
- override def sameResult(plan: SparkPlan ): Boolean = plan match {
+ override def sameResult(plan: SparkPlan): Boolean = plan match {
case other: DataSourceScan => relation == other.relation && metadata == other.metadata
case _ => false
}
+}
- private[sql] override lazy val metrics = if (canProcessBatches()) {
- Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
- "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
- } else {
+/** Physical plan node for scanning data from a relation. */
+private[sql] case class RowDataSourceScan(
+ output: Seq[Attribute],
+ rdd: RDD[InternalRow],
+ @transient relation: BaseRelation,
+ override val outputPartitioning: Partitioning,
+ override val metadata: Map[String, String] = Map.empty)
+ extends DataSourceScan with CodegenSupport {
+
+ private[sql] override lazy val metrics =
Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))
- }
val outputUnsafeRows = relation match {
case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] =>
@@ -153,38 +155,6 @@ private[sql] case class DataSourceScan(
case _ => false
}
- override val outputPartitioning = {
- val bucketSpec = relation match {
- // TODO: this should be closer to bucket planning.
- case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled => r.bucketSpec
- case _ => None
- }
-
- def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
- throw new AnalysisException(s"bucket column $colName not found in existing columns " +
- s"(${output.map(_.name).mkString(", ")})")
- }
-
- bucketSpec.map { spec =>
- val numBuckets = spec.numBuckets
- val bucketColumns = spec.bucketColumnNames.map(toAttribute)
- HashPartitioning(bucketColumns, numBuckets)
- }.getOrElse {
- UnknownPartitioning(0)
- }
- }
-
- private def canProcessBatches(): Boolean = {
- relation match {
- case r: HadoopFsRelation if r.fileFormat.isInstanceOf[ParquetSource] &&
- SQLContext.getActive().get.conf.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED) &&
- SQLContext.getActive().get.conf.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED) =>
- true
- case _ =>
- false
- }
- }
-
protected override def doExecute(): RDD[InternalRow] = {
val unsafeRow = if (outputUnsafeRows) {
rdd
@@ -211,6 +181,57 @@ private[sql] case class DataSourceScan(
rdd :: Nil
}
+ override protected def doProduce(ctx: CodegenContext): String = {
+ val numOutputRows = metricTerm(ctx, "numOutputRows")
+ // PhysicalRDD always just has one input
+ val input = ctx.freshName("input")
+ ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
+ val exprRows = output.zipWithIndex.map{ case (a, i) =>
+ new BoundReference(i, a.dataType, a.nullable)
+ }
+ val row = ctx.freshName("row")
+ ctx.INPUT_ROW = row
+ ctx.currentVars = null
+ val columnsRowInput = exprRows.map(_.gen(ctx))
+ val inputRow = if (outputUnsafeRows) row else null
+ s"""
+ |while ($input.hasNext()) {
+ | InternalRow $row = (InternalRow) $input.next();
+ | $numOutputRows.add(1);
+ | ${consume(ctx, columnsRowInput, inputRow).trim}
+ | if (shouldStop()) return;
+ |}
+ """.stripMargin
+ }
+}
+
+/** Physical plan node for scanning data from a batched relation. */
+private[sql] case class BatchedDataSourceScan(
+ output: Seq[Attribute],
+ rdd: RDD[InternalRow],
+ @transient relation: BaseRelation,
+ override val outputPartitioning: Partitioning,
+ override val metadata: Map[String, String] = Map.empty)
+ extends DataSourceScan with CodegenSupport {
+
+ private[sql] override lazy val metrics =
+ Map("numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"),
+ "scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
+
+ protected override def doExecute(): RDD[InternalRow] = {
+ throw new UnsupportedOperationException
+ }
+
+ override def simpleString: String = {
+ val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
+ val metadataStr = metadataEntries.mkString(" ", ", ", "")
+ s"BatchedScan $nodeName${output.mkString("[", ",", "]")}$metadataStr"
+ }
+
+ override def upstreams(): Seq[RDD[InternalRow]] = {
+ rdd :: Nil
+ }
+
private def genCodeColumnVector(ctx: CodegenContext, columnVar: String, ordinal: String,
dataType: DataType, nullable: Boolean): ExprCode = {
val javaType = ctx.javaType(dataType)
@@ -232,113 +253,65 @@ private[sql] case class DataSourceScan(
// Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen
// never requires UnsafeRow as input.
override protected def doProduce(ctx: CodegenContext): String = {
- val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
- val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
val input = ctx.freshName("input")
- val idx = ctx.freshName("batchIdx")
- val rowidx = ctx.freshName("rowIdx")
- val batch = ctx.freshName("batch")
// PhysicalRDD always just has one input
ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];")
+
+ // metrics
+ val numOutputRows = metricTerm(ctx, "numOutputRows")
+ val scanTimeMetric = metricTerm(ctx, "scanTime")
+ val scanTimeTotalNs = ctx.freshName("scanTime")
+ ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
+
+ val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch"
+ val batch = ctx.freshName("batch")
ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;")
+
+ val columnVectorClz = "org.apache.spark.sql.execution.vectorized.ColumnVector"
+ val idx = ctx.freshName("batchIdx")
ctx.addMutableState("int", idx, s"$idx = 0;")
val colVars = output.indices.map(i => ctx.freshName("colInstance" + i))
val columnAssigns = colVars.zipWithIndex.map { case (name, i) =>
ctx.addMutableState(columnVectorClz, name, s"$name = null;")
- s"$name = ${batch}.column($i);" }
-
- val row = ctx.freshName("row")
- val numOutputRows = metricTerm(ctx, "numOutputRows")
+ s"$name = $batch.column($i);"
+ }
- // The input RDD can either return (all) ColumnarBatches or InternalRows. We determine this
- // by looking at the first value of the RDD and then calling the function which will process
- // the remaining. It is faster to return batches.
- // TODO: The abstractions between this class and SqlNewHadoopRDD makes it difficult to know
- // here which path to use. Fix this.
+ val nextBatch = ctx.freshName("nextBatch")
+ ctx.addNewFunction(nextBatch,
+ s"""
+ |private void $nextBatch() throws java.io.IOException {
+ | long getBatchStart = System.nanoTime();
+ | if ($input.hasNext()) {
+ | $batch = ($columnarBatchClz)$input.next();
+ | $numOutputRows.add($batch.numRows());
+ | $idx = 0;
+ | ${columnAssigns.mkString("", "\n", "\n")}
+ | }
+ | $scanTimeTotalNs += System.nanoTime() - getBatchStart;
+ |}""".stripMargin)
- val exprRows =
- output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, x._1.nullable))
- ctx.INPUT_ROW = row
ctx.currentVars = null
- val columnsRowInput = exprRows.map(_.gen(ctx))
- val inputRow = if (outputUnsafeRows) row else null
- val scanRows = ctx.freshName("processRows")
- ctx.addNewFunction(scanRows,
- s"""
- | private void $scanRows(InternalRow $row) throws java.io.IOException {
- | boolean firstRow = true;
- | while (!shouldStop() && (firstRow || $input.hasNext())) {
- | if (firstRow) {
- | firstRow = false;
- | } else {
- | $row = (InternalRow) $input.next();
- | }
- | $numOutputRows.add(1);
- | ${consume(ctx, columnsRowInput, inputRow).trim}
- | }
- | }""".stripMargin)
-
- // Timers for how long we spent inside the scan. We can only maintain this when using batches,
- // otherwise the overhead is too high.
- if (canProcessBatches()) {
- val scanTimeMetric = metricTerm(ctx, "scanTime")
- val getBatchStart = ctx.freshName("scanStart")
- val scanTimeTotalNs = ctx.freshName("scanTime")
- ctx.currentVars = null
- val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
- genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable) }
- val scanBatches = ctx.freshName("processBatches")
- ctx.addMutableState("long", scanTimeTotalNs, s"$scanTimeTotalNs = 0;")
-
- ctx.addNewFunction(scanBatches,
- s"""
- | private void $scanBatches() throws java.io.IOException {
- | while (true) {
- | int numRows = $batch.numRows();
- | if ($idx == 0) {
- | ${columnAssigns.mkString("", "\n", "\n")}
- | $numOutputRows.add(numRows);
- | }
- |
- | while (!shouldStop() && $idx < numRows) {
- | int $rowidx = $idx++;
- | ${consume(ctx, columnsBatchInput).trim}
- | }
- | if (shouldStop()) return;
- |
- | long $getBatchStart = System.nanoTime();
- | if (!$input.hasNext()) {
- | $batch = null;
- | $scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
- | break;
- | }
- | $batch = ($columnarBatchClz)$input.next();
- | $scanTimeTotalNs += System.nanoTime() - $getBatchStart;
- | $idx = 0;
- | }
- | }""".stripMargin)
-
- val value = ctx.freshName("value")
- s"""
- | if ($batch != null) {
- | $scanBatches();
- | } else if ($input.hasNext()) {
- | Object $value = $input.next();
- | if ($value instanceof $columnarBatchClz) {
- | $batch = ($columnarBatchClz)$value;
- | $scanBatches();
- | } else {
- | $scanRows((InternalRow) $value);
- | }
- | }
- """.stripMargin
- } else {
- s"""
- |if ($input.hasNext()) {
- | $scanRows((InternalRow) $input.next());
- |}
- """.stripMargin
+ val rowidx = ctx.freshName("rowIdx")
+ val columnsBatchInput = (output zip colVars).map { case (attr, colVar) =>
+ genCodeColumnVector(ctx, colVar, rowidx, attr.dataType, attr.nullable)
}
+ s"""
+ |if ($batch == null) {
+ | $nextBatch();
+ |}
+ |while ($batch != null) {
+ | int numRows = $batch.numRows();
+ | while ($idx < numRows) {
+ | int $rowidx = $idx++;
+ | ${consume(ctx, columnsBatchInput).trim}
+ | if (shouldStop()) return;
+ | }
+ | $batch = null;
+ | $nextBatch();
+ |}
+ |$scanTimeMetric.add($scanTimeTotalNs / (1000 * 1000));
+ |$scanTimeTotalNs = 0;
+ """.stripMargin
}
}
@@ -346,4 +319,38 @@ private[sql] object DataSourceScan {
// Metadata keys
val INPUT_PATHS = "InputPaths"
val PUSHED_FILTERS = "PushedFilters"
+
+ def create(
+ output: Seq[Attribute],
+ rdd: RDD[InternalRow],
+ relation: BaseRelation,
+ metadata: Map[String, String] = Map.empty): DataSourceScan = {
+ val outputPartitioning = {
+ val bucketSpec = relation match {
+ // TODO: this should be closer to bucket planning.
+ case r: HadoopFsRelation if r.sqlContext.conf.bucketingEnabled => r.bucketSpec
+ case _ => None
+ }
+
+ def toAttribute(colName: String): Attribute = output.find(_.name == colName).getOrElse {
+ throw new AnalysisException(s"bucket column $colName not found in existing columns " +
+ s"(${output.map(_.name).mkString(", ")})")
+ }
+
+ bucketSpec.map { spec =>
+ val numBuckets = spec.numBuckets
+ val bucketColumns = spec.bucketColumnNames.map(toAttribute)
+ HashPartitioning(bucketColumns, numBuckets)
+ }.getOrElse {
+ UnknownPartitioning(0)
+ }
+ }
+
+ relation match {
+ case r: HadoopFsRelation if r.fileFormat.supportBatch(r.sqlContext, relation.schema) =>
+ BatchedDataSourceScan(output, rdd, relation, outputPartitioning, metadata)
+ case _ =>
+ RowDataSourceScan(output, rdd, relation, outputPartitioning, metadata)
+ }
+ }
}
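
Editor's note: the batched node's doProduce now emits a two-level loop: fetch a ColumnarBatch, walk its rows by index reading values straight from the column vectors, fetch the next batch, and finally report the accumulated scan time. A plain-Scala analogue of that control flow, purely illustrative (the real code is generated Java reading typed ColumnVectors):

// Illustrative analogue of the generated batch loop; Batch is a stand-in type.
object BatchLoopSketch {
  final case class Batch(numRows: Int, columns: Array[Array[Int]])

  def scan(batches: Iterator[Batch])(consume: Seq[Int] => Unit): Unit = {
    while (batches.hasNext) {
      val batch = batches.next()         // nextBatch()
      var rowIdx = 0
      while (rowIdx < batch.numRows) {   // inner per-row loop
        consume(batch.columns.map(col => col(rowIdx)).toSeq)
        rowIdx += 1
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val batches = Iterator(
      Batch(2, Array(Array(1, 2), Array(10, 20))),
      Batch(1, Array(Array(3), Array(30))))
    scan(batches)(row => println(row.mkString(", ")))
  }
}
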
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index 4e75a3a794..98129d6c52 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.aggregate.TungstenAggregate
import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, SortMergeJoin}
import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetrics}
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
/**
* An interface for those physical operators that support codegen.
@@ -433,12 +434,20 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
case _ => true
}
+ private def numOfNestedFields(dataType: DataType): Int = dataType match {
+ case dt: StructType => dt.fields.map(f => numOfNestedFields(f.dataType)).sum
+ case m: MapType => numOfNestedFields(m.keyType) + numOfNestedFields(m.valueType)
+ case a: ArrayType => numOfNestedFields(a.elementType)
+ case u: UserDefinedType[_] => numOfNestedFields(u.sqlType)
+ case _ => 1
+ }
+
private def supportCodegen(plan: SparkPlan): Boolean = plan match {
case plan: CodegenSupport if plan.supportCodegen =>
val willFallback = plan.expressions.exists(_.find(e => !supportCodegen(e)).isDefined)
// the generated code will be huge if there are too many columns
- val haveManyColumns = plan.output.length > 200
- !willFallback && !haveManyColumns
+ val haveTooManyFields = numOfNestedFields(plan.schema) > conf.wholeStageMaxNumFields
+ !willFallback && !haveTooManyFields
case _ => false
}
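
Editor's note: the hard-coded limit of 200 top-level columns becomes a configurable count of leaf fields, so deeply nested schemas are treated the same as wide flat ones. A self-contained sketch of the recursion over a simplified type ADT (stand-ins, not Spark's DataType hierarchy):

// Count leaf fields recursively: one nested struct column with 300 leaves trips
// the limit exactly like 300 flat columns would.
object NestedFieldCountSketch {
  sealed trait SketchType
  case object LeafType extends SketchType
  final case class StructSketch(fields: Seq[SketchType]) extends SketchType
  final case class ArraySketch(element: SketchType) extends SketchType
  final case class MapSketch(key: SketchType, value: SketchType) extends SketchType

  def numOfNestedFields(dt: SketchType): Int = dt match {
    case StructSketch(fields) => fields.map(numOfNestedFields).sum
    case ArraySketch(element) => numOfNestedFields(element)
    case MapSketch(k, v)      => numOfNestedFields(k) + numOfNestedFields(v)
    case _                    => 1
  }

  def main(args: Array[String]): Unit = {
    val wideStruct = StructSketch(Seq.fill(300)(LeafType))
    println(numOfNestedFields(StructSketch(Seq(wideStruct)))) // 300 > default of 200
  }
}
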
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 52c8f3ef0b..8c183317f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -238,7 +238,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
}
case l @ LogicalRelation(baseRelation: TableScan, _, _) =>
- execution.DataSourceScan(
+ execution.DataSourceScan.create(
l.output, toCatalystRDD(l, baseRelation.buildScan()), baseRelation) :: Nil
case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _),
@@ -610,7 +610,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// Don't request columns that are only referenced by pushed filters.
.filterNot(handledSet.contains)
- val scan = execution.DataSourceScan(
+ val scan = execution.DataSourceScan.create(
projects.map(_.toAttribute),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation, metadata)
@@ -620,7 +620,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
val requestedColumns =
(projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq
- val scan = execution.DataSourceScan(
+ val scan = execution.DataSourceScan.create(
requestedColumns,
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation, metadata)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index 618d5a522b..aa1f76450c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -181,7 +181,7 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
}
val scan =
- DataSourceScan(
+ DataSourceScan.create(
readDataColumns ++ partitionColumns,
new FileScanRDD(
files.sqlContext,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
index 159fdc99dd..6ddb218a22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SqlNewHadoopRDD.scala
@@ -97,13 +97,6 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
@transient protected val jobId = new JobID(jobTrackerId, id)
- // If true, enable using the custom RecordReader for parquet. This only works for
- // a subset of the types (no complex types).
- protected val enableVectorizedParquetReader: Boolean =
- sqlContext.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key).toBoolean
- protected val enableWholestageCodegen: Boolean =
- sqlContext.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key).toBoolean
-
override def getPartitions: Array[SparkPartition] = {
val conf = getConf(isDriverSide = true)
val inputFormat = inputFormatClass.newInstance
@@ -165,32 +158,9 @@ private[spark] class SqlNewHadoopRDD[V: ClassTag](
}
val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
- private[this] var reader: RecordReader[Void, V] = null
-
- /**
- * If the format is ParquetInputFormat, try to create the optimized RecordReader. If this
- * fails (for example, unsupported schema), try with the normal reader.
- * TODO: plumb this through a different way?
- */
- if (enableVectorizedParquetReader &&
- format.getClass.getName == "org.apache.parquet.hadoop.ParquetInputFormat") {
- val parquetReader: VectorizedParquetRecordReader = new VectorizedParquetRecordReader()
- if (!parquetReader.tryInitialize(
- split.serializableHadoopSplit.value, hadoopAttemptContext)) {
- parquetReader.close()
- } else {
- reader = parquetReader.asInstanceOf[RecordReader[Void, V]]
- parquetReader.resultBatch()
- // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
- if (enableWholestageCodegen) parquetReader.enableReturningBatches()
- }
- }
-
- if (reader == null) {
- reader = format.createRecordReader(
+ private[this] var reader: RecordReader[Void, V] = format.createRecordReader(
split.serializableHadoopSplit.value, hadoopAttemptContext)
- reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
- }
+ reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
// Register an on-task-completion callback to close the input stream.
context.addTaskCompletionListener(context => close())
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 5b58fa1fc5..a2fd8da782 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -24,7 +24,6 @@ import java.util.logging.{Logger => JLogger}
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.util.{Failure, Try}
-import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
@@ -53,7 +52,7 @@ import org.apache.spark.sql.catalyst.parser.LegacyTypeStringParser
import org.apache.spark.sql.execution.datasources._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
-import org.apache.spark.sql.types.{DataType, StructType}
+import org.apache.spark.sql.types.{AtomicType, DataType, StructType}
import org.apache.spark.util.{SerializableConfiguration, Utils}
import org.apache.spark.util.collection.BitSet
@@ -276,6 +275,16 @@ private[sql] class DefaultSource
file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
}
+ /**
+ * Returns whether the reader will return the rows as batch or not.
+ */
+ override def supportBatch(sqlContext: SQLContext, schema: StructType): Boolean = {
+ val conf = SQLContext.getActive().get.conf
+ conf.useFileScan && conf.parquetVectorizedReaderEnabled &&
+ conf.wholeStageEnabled && schema.length <= conf.wholeStageMaxNumFields &&
+ schema.forall(_.dataType.isInstanceOf[AtomicType])
+ }
+
override def buildReader(
sqlContext: SQLContext,
dataSchema: StructType,
@@ -306,6 +315,10 @@ private[sql] class DefaultSource
SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
sqlContext.conf.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
+ // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+ val returningBatch =
+ supportBatch(sqlContext, StructType(partitionSchema.fields ++ dataSchema.fields))
+
// Try to push down filters when filter push-down is enabled.
val pushed = if (sqlContext.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
filters
@@ -324,10 +337,8 @@ private[sql] class DefaultSource
// TODO: if you move this into the closure it reverts to the default values.
// If true, enable using the custom RecordReader for parquet. This only works for
// a subset of the types (no complex types).
- val enableVectorizedParquetReader: Boolean =
- sqlContext.getConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key).toBoolean
- val enableWholestageCodegen: Boolean =
- sqlContext.getConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key).toBoolean
+ val enableVectorizedParquetReader: Boolean = sqlContext.conf.parquetVectorizedReaderEnabled &&
+ dataSchema.forall(_.dataType.isInstanceOf[AtomicType])
(file: PartitionedFile) => {
assert(file.partitionValues.numFields == partitionSchema.size)
@@ -347,32 +358,27 @@ private[sql] class DefaultSource
val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedConf.value.value, attemptId)
- val parquetReader = try {
- if (!enableVectorizedParquetReader) sys.error("Vectorized reader turned off.")
+ val parquetReader = if (enableVectorizedParquetReader) {
val vectorizedReader = new VectorizedParquetRecordReader()
vectorizedReader.initialize(split, hadoopAttemptContext)
logDebug(s"Appending $partitionSchema ${file.partitionValues}")
vectorizedReader.initBatch(partitionSchema, file.partitionValues)
- // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
- // TODO: fix column appending
- if (enableWholestageCodegen) {
- logDebug(s"Enabling batch returning")
+ if (returningBatch) {
vectorizedReader.enableReturningBatches()
}
vectorizedReader
- } catch {
- case NonFatal(e) =>
- logDebug(s"Falling back to parquet-mr: $e", e)
- val reader = pushed match {
- case Some(filter) =>
- new ParquetRecordReader[InternalRow](
- new CatalystReadSupport,
- FilterCompat.get(filter, null))
- case _ =>
- new ParquetRecordReader[InternalRow](new CatalystReadSupport)
- }
- reader.initialize(split, hadoopAttemptContext)
- reader
+ } else {
+ logDebug(s"Falling back to parquet-mr")
+ val reader = pushed match {
+ case Some(filter) =>
+ new ParquetRecordReader[InternalRow](
+ new CatalystReadSupport,
+ FilterCompat.get(filter, null))
+ case _ =>
+ new ParquetRecordReader[InternalRow](new CatalystReadSupport)
+ }
+ reader.initialize(split, hadoopAttemptContext)
+ reader
}
val iter = new RecordReaderIterator(parquetReader)
@@ -432,13 +438,21 @@ private[sql] class DefaultSource
val setInputPaths =
ParquetRelation.initializeDriverSideJobFunc(inputFiles, parquetBlockSize) _
+ val allPrimitiveTypes = dataSchema.forall(_.dataType.isInstanceOf[AtomicType])
+ val inputFormatCls = if (sqlContext.conf.parquetVectorizedReaderEnabled
+ && allPrimitiveTypes) {
+ classOf[VectorizedParquetInputFormat]
+ } else {
+ classOf[ParquetInputFormat[InternalRow]]
+ }
+
Utils.withDummyCallSite(sqlContext.sparkContext) {
new SqlNewHadoopRDD(
sqlContext = sqlContext,
broadcastedConf = broadcastedConf,
initDriverSideJobFuncOpt = Some(setInputPaths),
initLocalJobFuncOpt = Some(initLocalJobFuncOpt),
- inputFormatClass = classOf[ParquetInputFormat[InternalRow]],
+ inputFormatClass = inputFormatCls,
valueClass = classOf[InternalRow]) {
val cacheMetadata = useMetadataCache
@@ -481,6 +495,17 @@ private[sql] class DefaultSource
}
}
+/**
+ * The ParquetInputFormat that creates a VectorizedParquetRecordReader.
+ */
+final class VectorizedParquetInputFormat extends ParquetInputFormat[InternalRow] {
+ override def createRecordReader(
+ inputSplit: InputSplit,
+ taskAttemptContext: TaskAttemptContext): ParquetRecordReader[InternalRow] = {
+ new VectorizedParquetRecordReader().asInstanceOf[ParquetRecordReader[InternalRow]]
+ }
+}
+
// NOTE: This class is instantiated and used on executor side only, no need to be serializable.
private[sql] class ParquetOutputWriter(
path: String,
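
Editor's note: taken together, the buildReader changes pick among three read paths at planning time instead of catching failures at runtime: the vectorized reader returning whole ColumnarBatches (when supportBatch holds), the vectorized reader handing out rows one by one, or the parquet-mr fallback for nested schemas. A hedged, self-contained summary of that decision; the parameter names paraphrase the conditions in the hunks above and are not real configuration keys:

object ParquetReaderChoiceSketch {
  sealed trait Choice
  case object VectorizedReturningBatches extends Choice // whole-stage codegen consumes ColumnarBatch
  case object VectorizedRowByRow         extends Choice // vectorized decode, rows handed out one by one
  case object ParquetMr                  extends Choice // fallback reader, handles nested types

  def choose(allColumnsAtomic: Boolean,
             vectorizedEnabled: Boolean,
             supportBatch: Boolean): Choice =
    if (vectorizedEnabled && allColumnsAtomic) {
      if (supportBatch) VectorizedReturningBatches else VectorizedRowByRow
    } else {
      ParquetMr
    }

  def main(args: Array[String]): Unit = {
    println(choose(allColumnsAtomic = true,  vectorizedEnabled = true, supportBatch = true))
    println(choose(allColumnsAtomic = false, vectorizedEnabled = true, supportBatch = false))
  }
}
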
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 927af89949..dc6ba1bcfb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -396,6 +396,13 @@ object SQLConf {
.booleanConf
.createWithDefault(true)
+ val WHOLESTAGE_MAX_NUM_FIELDS = SQLConfigBuilder("spark.sql.codegen.maxFields")
+ .internal()
+ .doc("The maximum number of fields (including nested fields) that will be supported before" +
+ " deactivating whole-stage codegen.")
+ .intConf
+ .createWithDefault(200)
+
val FILES_MAX_PARTITION_BYTES = SQLConfigBuilder("spark.sql.files.maxPartitionBytes")
.doc("The maximum number of bytes to pack into a single partition when reading files.")
.longConf
@@ -480,6 +487,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def parquetCacheMetadata: Boolean = getConf(PARQUET_CACHE_METADATA)
+ def parquetVectorizedReaderEnabled: Boolean = getConf(PARQUET_VECTORIZED_READER_ENABLED)
+
def columnBatchSize: Int = getConf(COLUMN_BATCH_SIZE)
def numShufflePartitions: Int = getConf(SHUFFLE_PARTITIONS)
@@ -504,6 +513,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def wholeStageEnabled: Boolean = getConf(WHOLESTAGE_CODEGEN_ENABLED)
+ def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
+
def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
def canonicalView: Boolean = getConf(CANONICAL_NATIVE_VIEW)
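
Editor's note: a short usage sketch for the new knob, assuming an existing sqlContext in scope; the key is the one added above and 400 is only an example value. Raising it lets wider or more deeply nested schemas stay on the whole-stage codegen path; the default of 200 matches the previously hard-coded column limit.

// Example only: widen the field budget for whole-stage codegen.
sqlContext.setConf("spark.sql.codegen.maxFields", "400")
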
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 14e14710f6..6acb41dd1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -469,6 +469,15 @@ trait FileFormat {
options: Map[String, String]): RDD[InternalRow]
/**
+ * Returns whether this format supports returning columnar batches or not.
+ *
+ * TODO: we should just have different traits for the different formats.
+ */
+ def supportBatch(sqlContext: SQLContext, dataSchema: StructType): Boolean = {
+ false
+ }
+
+ /**
* Returns a function that can be used to read a single file in as an Iterator of InternalRow.
*
* @param dataSchema The global data schema. It can be either specified by the user, or
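
Editor's note: the new hook defaults to false so existing sources keep their row-based scans untouched; only formats that can decode straight into columnar batches (Parquet, above) override it. A minimal stand-in for that pattern, not the real FileFormat trait:

trait FileFormatSketch {
  // Formats opt in; everything else silently stays on the row path.
  def supportBatch(schemaIsFlatAndAtomic: Boolean): Boolean = false
}

object ParquetLikeFormat extends FileFormatSketch {
  override def supportBatch(schemaIsFlatAndAtomic: Boolean): Boolean = schemaIsFlatAndAtomic
}
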
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 4446a6881c..41f536fc37 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -279,7 +279,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
/** Plans the query and calls the provided validation function with the planned partitioning. */
def checkScan(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
val fileScan = df.queryExecution.executedPlan.collect {
- case DataSourceScan(_, scan: FileScanRDD, _, _) => scan
+ case scan: DataSourceScan if scan.rdd.isInstanceOf[FileScanRDD] =>
+ scan.rdd.asInstanceOf[FileScanRDD]
}.headOption.getOrElse {
fail(s"No FileScan in query\n${df.queryExecution}")
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 2f806ebba6..7d206e7bc4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -579,6 +579,16 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
assert(CatalystReadSupport.expandUDT(schema) === expected)
}
+
+ test("read/write wide table") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ val df = sqlContext.range(1000).select(Seq.tabulate(1000) {i => ('id + i).as(s"c$i")} : _*)
+ df.write.mode(SaveMode.Overwrite).parquet(path)
+ checkAnswer(sqlContext.read.parquet(path), df)
+ }
+ }
}
object TestingUDT {