Diffstat (limited to 'sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala | 15 |
1 file changed, 7 insertions, 8 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
index 38c0084952..bbbbc5ebe9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala
@@ -286,10 +286,6 @@ private[sql] class DefaultSource
       SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
       sqlContext.conf.getConf(SQLConf.PARQUET_INT96_AS_TIMESTAMP))
 
-    // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
-    val returningBatch =
-      supportBatch(sqlContext, StructType(partitionSchema.fields ++ dataSchema.fields))
-
     // Try to push down filters when filter push-down is enabled.
     val pushed = if (sqlContext.getConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key).toBoolean) {
       filters
@@ -308,8 +304,11 @@ private[sql] class DefaultSource
     // TODO: if you move this into the closure it reverts to the default values.
     // If true, enable using the custom RecordReader for parquet. This only works for
     // a subset of the types (no complex types).
-    val enableVectorizedParquetReader: Boolean = sqlContext.conf.parquetVectorizedReaderEnabled &&
-      dataSchema.forall(_.dataType.isInstanceOf[AtomicType])
+    val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
+    val enableVectorizedReader: Boolean = sqlContext.conf.parquetVectorizedReaderEnabled &&
+      resultSchema.forall(_.dataType.isInstanceOf[AtomicType])
+    // Whole stage codegen (PhysicalRDD) is able to deal with batches directly
+    val returningBatch = supportBatch(sqlContext, resultSchema)
 
     (file: PartitionedFile) => {
       assert(file.partitionValues.numFields == partitionSchema.size)
@@ -329,7 +328,7 @@ private[sql] class DefaultSource
       val attemptId = new TaskAttemptID(new TaskID(new JobID(), TaskType.MAP, 0), 0)
       val hadoopAttemptContext = new TaskAttemptContextImpl(broadcastedConf.value.value, attemptId)
 
-      val parquetReader = if (enableVectorizedParquetReader) {
+      val parquetReader = if (enableVectorizedReader) {
        val vectorizedReader = new VectorizedParquetRecordReader()
        vectorizedReader.initialize(split, hadoopAttemptContext)
        logDebug(s"Appending $partitionSchema ${file.partitionValues}")
@@ -356,7 +355,7 @@ private[sql] class DefaultSource
 
       // UnsafeRowParquetRecordReader appends the columns internally to avoid another copy.
       if (parquetReader.isInstanceOf[VectorizedParquetRecordReader] &&
-          enableVectorizedParquetReader) {
+          enableVectorizedReader) {
        iter.asInstanceOf[Iterator[InternalRow]]
       } else {
        val fullSchema = requiredSchema.toAttributes ++ partitionSchema.toAttributes
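
For readers skimming the change: the patch now derives the vectorized-reader gate from the combined partition-plus-required schema (resultSchema) rather than from dataSchema, and computes returningBatch from that same schema. The standalone Scala sketch below mirrors only the atomic-type check; the type names (DataType, AtomicType, StructType, etc.) are simplified local stand-ins, not Spark's org.apache.spark.sql.types classes, and the boolean flag stands in for sqlContext.conf.parquetVectorizedReaderEnabled. It does not model the additional conditions supportBatch applies in the real code path.

// Hypothetical, simplified stand-ins for Spark's type hierarchy (sketch only).
sealed trait DataType
sealed trait AtomicType extends DataType
case object LongType extends AtomicType
case object StringType extends AtomicType
case class ArrayType(elementType: DataType) extends DataType // complex, not atomic

case class StructField(name: String, dataType: DataType)
case class StructType(fields: Seq[StructField]) {
  def forall(p: StructField => Boolean): Boolean = fields.forall(p)
}

object VectorizedReaderGate extends App {
  // Stand-in for sqlContext.conf.parquetVectorizedReaderEnabled (assumed true here).
  val parquetVectorizedReaderEnabled = true

  val partitionSchema = StructType(Seq(StructField("date", StringType)))
  val requiredSchema = StructType(Seq(
    StructField("id", LongType),
    StructField("tags", ArrayType(StringType)))) // one complex field disables the fast path

  // Same shape as the patched check: combine partition and required fields, then
  // require every field to be atomic before enabling the vectorized reader.
  val resultSchema = StructType(partitionSchema.fields ++ requiredSchema.fields)
  val enableVectorizedReader =
    parquetVectorizedReaderEnabled && resultSchema.forall(_.dataType.isInstanceOf[AtomicType])

  println(s"enableVectorizedReader = $enableVectorizedReader") // false here because of ArrayType
}

Using the pruned requiredSchema instead of the full dataSchema means a query that selects only atomic columns can still take the vectorized path even when the table contains complex-typed columns it never reads.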