Diffstat (limited to 'sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala')
 -rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala | 25
 1 file changed, 25 insertions, 0 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 7d206e7bc4..ed20c45d5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
+import org.apache.spark.sql.execution.BatchedDataSourceScanExec
 import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -589,6 +590,30 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       checkAnswer(sqlContext.read.parquet(path), df)
     }
   }
+
+  test("returning batch for wide table") {
+    withSQLConf("spark.sql.codegen.maxFields" -> "100") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        val df = sqlContext.range(100).select(Seq.tabulate(110) { i => ('id + i).as(s"c$i") } : _*)
+        df.write.mode(SaveMode.Overwrite).parquet(path)
+
+        // Do not return batch: whole-stage codegen is disabled for wide tables (more than spark.sql.codegen.maxFields = 100 columns).
+        val df2 = sqlContext.read.parquet(path)
+        assert(df2.queryExecution.sparkPlan.find(_.isInstanceOf[BatchedDataSourceScanExec]).isEmpty,
+          "Should not return batch")
+        checkAnswer(df2, df)
+
+        // Return batch: projecting only 90 columns stays under the limit.
+        val columns = Seq.tabulate(90) { i => s"c$i" }
+        val df3 = df2.selectExpr(columns : _*)
+        assert(
+          df3.queryExecution.sparkPlan.find(_.isInstanceOf[BatchedDataSourceScanExec]).isDefined,
+          "Should return batch")
+        checkAnswer(df3, df.selectExpr(columns : _*))
+      }
+    }
+  }
 }

 object TestingUDT {
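
The test above exercises the wide-table threshold: with spark.sql.codegen.maxFields set to 100, a 110-column Parquet scan cannot participate in whole-stage codegen and therefore does not return columnar batches, while a 90-column projection of the same data does. Below is a minimal standalone sketch of the same behaviour, not part of the patch; it assumes a Spark 2.0-era spark-shell where sqlContext is predefined and BatchedDataSourceScanExec is available, and uses a hypothetical scratch path.

// Minimal sketch (not part of the patch): shows how spark.sql.codegen.maxFields
// decides whether the Parquet scan returns columnar batches.
// Assumes a Spark 2.0-era spark-shell with `sqlContext` predefined and
// BatchedDataSourceScanExec on the classpath; the path below is hypothetical.
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.execution.BatchedDataSourceScanExec
import org.apache.spark.sql.functions.col

val path = "/tmp/wide_table_parquet"  // hypothetical scratch location
sqlContext.setConf("spark.sql.codegen.maxFields", "100")

// 110 columns exceed maxFields, so whole-stage codegen is disabled and the
// scan falls back to row-at-a-time output (no BatchedDataSourceScanExec).
val wide = sqlContext.range(100)
  .select(Seq.tabulate(110)(i => (col("id") + i).as(s"c$i")): _*)
wide.write.mode(SaveMode.Overwrite).parquet(path)

val scanned = sqlContext.read.parquet(path)
val wideBatched = scanned.queryExecution.sparkPlan
  .find(_.isInstanceOf[BatchedDataSourceScanExec]).isDefined
println(s"110-column scan returns batches: $wideBatched")  // expected: false

// Projecting 90 of the columns brings the field count back under the limit,
// so the scan can return columnar batches again.
val narrow = scanned.selectExpr(Seq.tabulate(90)(i => s"c$i"): _*)
val narrowBatched = narrow.queryExecution.sparkPlan
  .find(_.isInstanceOf[BatchedDataSourceScanExec]).isDefined
println(s"90-column scan returns batches: $narrowBatched")  // expected: true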