Diffstat (limited to 'sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala')
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala | 110
1 file changed, 87 insertions, 23 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 1fa15730bc..dac56d3936 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -22,8 +22,6 @@ import java.io.File
 import org.apache.hadoop.fs.FileStatus
 import org.apache.hadoop.mapreduce.Job
 
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}
@@ -34,8 +32,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{IntegerType, StructType}
-import org.apache.spark.util.{SerializableConfiguration, Utils}
-import org.apache.spark.util.collection.BitSet
+import org.apache.spark.util.Utils
 
 class FileSourceStrategySuite extends QueryTest with SharedSQLContext with PredicateHelper {
   import testImplicits._
@@ -76,7 +73,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
         "file2" -> 5,
         "file3" -> 5))
 
-    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10") {
+    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "11",
+      SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
         // 5 byte files should be laid out [(5, 5), (5)]
         assert(partitions.size == 2, "when checking partitions")
@@ -98,11 +96,12 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
       createTable(
         files = Seq(
           "file1" -> 15,
-          "file2" -> 4))
+          "file2" -> 3))
 
-    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10") {
+    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10",
+      SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
       checkScan(table.select('c1)) { partitions =>
-        // Files should be laid out [(0-5), (5-10, 4)]
+        // Files should be laid out [(0-10), (10-15, 4)]
         assert(partitions.size == 2, "when checking partitions")
         assert(partitions(0).files.size == 1, "when checking partition 1")
         assert(partitions(1).files.size == 2, "when checking partition 2")
@@ -121,6 +120,53 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
     }
   }
 
+  test("Unpartitioned table, many files that get split") {
+    val table =
+      createTable(
+        files = Seq(
+          "file1" -> 2,
+          "file2" -> 2,
+          "file3" -> 1,
+          "file4" -> 1,
+          "file5" -> 1,
+          "file6" -> 1))
+
+    withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "4",
+      SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
+      checkScan(table.select('c1)) { partitions =>
+        // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
+        assert(partitions.size == 4, "when checking partitions")
+        assert(partitions(0).files.size == 1, "when checking partition 1")
+        assert(partitions(1).files.size == 2, "when checking partition 2")
+        assert(partitions(2).files.size == 2, "when checking partition 3")
+        assert(partitions(3).files.size == 1, "when checking partition 4")
+
+        // First partition reads (file1)
+        assert(partitions(0).files(0).start == 0)
+        assert(partitions(0).files(0).length == 2)
+
+        // Second partition reads (file2, file3)
+        assert(partitions(1).files(0).start == 0)
+        assert(partitions(1).files(0).length == 2)
+        assert(partitions(1).files(1).start == 0)
+        assert(partitions(1).files(1).length == 1)
+
+        // Third partition reads (file4, file5)
+        assert(partitions(2).files(0).start == 0)
+        assert(partitions(2).files(0).length == 1)
+        assert(partitions(2).files(1).start == 0)
+        assert(partitions(2).files(1).length == 1)
+
+        // Final partition reads (file6)
+        assert(partitions(3).files(0).start == 0)
+        assert(partitions(3).files(0).length == 1)
+      }
+
+      checkPartitionSchema(StructType(Nil))
+      checkDataSchema(StructType(Nil).add("c1", IntegerType))
+    }
+  }
+
   test("partitioned table") {
     val table =
       createTable(
@@ -147,6 +193,34 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
     checkDataFilters(Set(IsNotNull("c1"), EqualTo("c1", 1)))
   }
 
+  test("partitioned table - case insensitive") {
+    withSQLConf("spark.sql.caseSensitive" -> "false") {
+      val table =
+        createTable(
+          files = Seq(
+            "p1=1/file1" -> 10,
+            "p1=2/file2" -> 10))
+
+      // Only one file should be read.
+      checkScan(table.where("P1 = 1")) { partitions =>
+        assert(partitions.size == 1, "when checking partitions")
+        assert(partitions.head.files.size == 1, "when files in partition 1")
+      }
+      // We don't need to reevaluate filters that are only on partitions.
+      checkDataFilters(Set.empty)
+
+      // Only one file should be read.
+      checkScan(table.where("P1 = 1 AND C1 = 1 AND (P1 + C1) = 1")) { partitions =>
+        assert(partitions.size == 1, "when checking partitions")
+        assert(partitions.head.files.size == 1, "when checking files in partition 1")
+        assert(partitions.head.files.head.partitionValues.getInt(0) == 1,
+          "when checking partition values")
+      }
+      // Only the filters that do not contain the partition column should be pushed down
+      checkDataFilters(Set(IsNotNull("c1"), EqualTo("c1", 1)))
+    }
+  }
+
   test("partitioned table - after scan filters") {
     val table =
       createTable(
@@ -230,7 +304,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
   /** Plans the query and calls the provided validation function with the planned partitioning. */
   def checkScan(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
     val fileScan = df.queryExecution.executedPlan.collect {
-      case DataSourceScan(_, scan: FileScanRDD, _, _) => scan
+      case scan: DataSourceScan if scan.rdd.isInstanceOf[FileScanRDD] =>
+        scan.rdd.asInstanceOf[FileScanRDD]
     }.headOption.getOrElse {
       fail(s"No FileScan in query\n${df.queryExecution}")
     }
@@ -315,28 +390,17 @@ class TestFileFormat extends FileFormat {
     throw new NotImplementedError("JUST FOR TESTING")
   }
 
-  override def buildInternalScan(
-      sqlContext: SQLContext,
-      dataSchema: StructType,
-      requiredColumns: Array[String],
-      filters: Array[Filter],
-      bucketSet: Option[BitSet],
-      inputFiles: Seq[FileStatus],
-      broadcastedConf: Broadcast[SerializableConfiguration],
-      options: Map[String, String]): RDD[InternalRow] = {
-    throw new NotImplementedError("JUST FOR TESTING")
-  }
-
   override def buildReader(
       sqlContext: SQLContext,
-      partitionSchema: StructType,
       dataSchema: StructType,
+      partitionSchema: StructType,
+      requiredSchema: StructType,
       filters: Seq[Filter],
      options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
 
     // Record the arguments so they can be checked in the test case.
     LastArguments.partitionSchema = partitionSchema
-    LastArguments.dataSchema = dataSchema
+    LastArguments.dataSchema = requiredSchema
     LastArguments.filters = filters
     LastArguments.options = options
 
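Note on the split tests above: they exercise the planner's file packing, where each file is charged FILES_OPEN_COST_IN_BYTES on top of its size as it is packed into read partitions, and a file larger than FILES_MAX_PARTITION_BYTES is first sliced into chunks of at most that size. The sketch below reproduces that heuristic outside Spark so the expected layouts in the test comments can be checked by hand; SizedFile, splitFile, and packFiles are illustrative names, not Spark API, and the greedy largest-first packing is an assumption read off the tests rather than the planner's exact code.

object FilePackingSketch extends App {
  // Illustrative stand-in for a slice of a file; not a Spark class.
  case class SizedFile(name: String, start: Long, length: Long)

  // Slice a file into chunks of at most maxPartitionBytes, e.g. a 15-byte
  // file under a 10-byte limit becomes (0-10) and (10-15).
  def splitFile(file: SizedFile, maxPartitionBytes: Long): Seq[SizedFile] =
    (0L until file.length by maxPartitionBytes).map { offset =>
      SizedFile(file.name, offset, math.min(maxPartitionBytes, file.length - offset))
    }

  // Greedily pack chunks (largest first) into partitions, charging each
  // chunk an extra openCost bytes for the overhead of opening a file.
  def packFiles(
      chunks: Seq[SizedFile],
      maxPartitionBytes: Long,
      openCost: Long): Seq[Seq[SizedFile]] = {
    val partitions = Seq.newBuilder[Seq[SizedFile]]
    var current = Vector.empty[SizedFile]
    var currentSize = 0L

    def closePartition(): Unit = {
      if (current.nonEmpty) partitions += current
      current = Vector.empty
      currentSize = 0L
    }

    chunks.sortBy(-_.length).foreach { chunk =>
      if (currentSize + chunk.length > maxPartitionBytes) closePartition()
      current :+= chunk
      currentSize += chunk.length + openCost
    }
    closePartition()
    partitions.result()
  }

  // "Unpartitioned table, many files that get split": sizes 2, 2, 1, 1, 1, 1
  // with maxPartitionBytes = 4 and openCost = 1 pack as
  // [(file1), (file2, file3), (file4, file5), (file6)].
  val many = Seq("file1" -> 2L, "file2" -> 2L, "file3" -> 1L,
    "file4" -> 1L, "file5" -> 1L, "file6" -> 1L).map { case (n, len) =>
    SizedFile(n, 0, len)
  }
  assert(packFiles(many, 4, 1).map(_.map(_.name)) ==
    Seq(Seq("file1"), Seq("file2", "file3"), Seq("file4", "file5"), Seq("file6")))

  // Large-file test: file1 -> 15 splits at the 10-byte limit into (0-10) and
  // (10-15); the 3-byte file2 then rides along with the 5-byte tail chunk.
  val large = splitFile(SizedFile("file1", 0, 15), 10) :+ SizedFile("file2", 0, 3)
  assert(packFiles(large, 10, 1).map(_.map(f => (f.name, f.start, f.length))) ==
    Seq(Seq(("file1", 0L, 10L)), Seq(("file1", 10L, 5L), ("file2", 0L, 3L))))
}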
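Note on the case-insensitive test: it depends on the analyzer resolving the upper-case attributes P1 and C1 against the lower-case table columns when spark.sql.caseSensitive is false, so the partition-only filter can still prune the p1=2 directory and drop out of the data filters. A minimal sketch of that resolution rule under those assumptions (resolve is an illustrative helper, not Spark's resolver):

object CaseInsensitiveResolutionSketch extends App {
  // Resolve a query-side attribute name against the table's columns,
  // honoring the case-sensitivity flag the way spark.sql.caseSensitive does.
  def resolve(name: String, columns: Seq[String], caseSensitive: Boolean): Option[String] =
    if (caseSensitive) columns.find(_ == name)
    else columns.find(_.equalsIgnoreCase(name))

  val columns = Seq("p1", "c1")

  // With caseSensitive = false, "P1" resolves to partition column "p1", so a
  // filter like "P1 = 1" can be evaluated against directory names and prune
  // p1=2/file2 without reading it.
  assert(resolve("P1", columns, caseSensitive = false).contains("p1"))

  // With caseSensitive = true the same reference would fail to resolve.
  assert(resolve("P1", columns, caseSensitive = true).isEmpty)
}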
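Note on the TestFileFormat hunk: the removed buildInternalScan is superseded by buildReader, which now receives the full dataSchema, the partitionSchema, and a separate requiredSchema carrying only the columns the query actually needs; accordingly the test records requiredSchema in LastArguments.dataSchema. A toy, Spark-free sketch of what the pruned schema buys a reader (the map-based rows and the buildReader helper here are illustrative, not Spark types):

object RequiredSchemaSketch extends App {
  // Full on-disk schema versus the columns a query like select('c1) needs.
  val dataSchema = Seq("c1", "c2", "c3")
  val requiredSchema = Seq("c1")

  // Rows as a file reader might decode them, keyed by column name.
  val rowsFromDisk = Seq(
    Map("c1" -> 1, "c2" -> 10, "c3" -> 100),
    Map("c1" -> 2, "c2" -> 20, "c3" -> 200))

  // A reader built against requiredSchema materializes only the pruned
  // columns, which is the point of passing it separately from dataSchema.
  def buildReader(required: Seq[String]): Map[String, Int] => Seq[Int] =
    row => required.map(row)

  val reader = buildReader(requiredSchema)
  assert(rowsFromDisk.map(reader) == Seq(Seq(1), Seq(2)))
}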