Diffstat (limited to 'sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala')
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala  110
1 file changed, 87 insertions(+), 23 deletions(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 1fa15730bc..dac56d3936 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -22,8 +22,6 @@ import java.io.File
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}
@@ -34,8 +32,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{IntegerType, StructType}
-import org.apache.spark.util.{SerializableConfiguration, Utils}
-import org.apache.spark.util.collection.BitSet
+import org.apache.spark.util.Utils
class FileSourceStrategySuite extends QueryTest with SharedSQLContext with PredicateHelper {
import testImplicits._
@@ -76,7 +73,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
"file2" -> 5,
"file3" -> 5))
- withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10") {
+ withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "11",
+ SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
checkScan(table.select('c1)) { partitions =>
// 5 byte files should be laid out [(5, 5), (5)]
assert(partitions.size == 2, "when checking partitions")
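Note: the two conf changes above work together. With FILES_OPEN_COST_IN_BYTES
set to 1, every file packed into a partition is charged its size plus one byte
of open cost, so three 5-byte files need an 11-byte budget (5 + 1 + 5 = 11) to
pack as [(5, 5), (5)]. A minimal, self-contained sketch of this greedy packing
rule (our own stand-in types, not Spark's internals):

object PackingSketch {
  // (name, sizeInBytes) stands in for Spark's PartitionedFile.
  def pack(
      files: Seq[(String, Long)],
      maxPartitionBytes: Long,
      openCostInBytes: Long): Seq[Seq[(String, Long)]] = {
    val partitions = Seq.newBuilder[Seq[(String, Long)]]
    var current = Vector.empty[(String, Long)]
    var currentSize = 0L
    files.foreach { file =>
      // Close the current partition before adding this file would overflow it.
      if (current.nonEmpty && currentSize + file._2 > maxPartitionBytes) {
        partitions += current
        current = Vector.empty
        currentSize = 0L
      }
      current :+= file
      currentSize += file._2 + openCostInBytes
    }
    if (current.nonEmpty) partitions += current
    partitions.result()
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the test above: two partitions, [(file1, file2), (file3)].
    println(pack(Seq("file1" -> 5L, "file2" -> 5L, "file3" -> 5L), 11L, 1L))
  }
}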
@@ -98,11 +96,12 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
createTable(
files = Seq(
"file1" -> 15,
- "file2" -> 4))
+ "file2" -> 3))
- withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10") {
+ withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "10",
+ SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
checkScan(table.select('c1)) { partitions =>
- // Files should be laid out [(0-5), (5-10, 4)]
+ // Files should be laid out [(0-10), (10-15, 3)]
assert(partitions.size == 2, "when checking partitions")
assert(partitions(0).files.size == 1, "when checking partition 1")
assert(partitions(1).files.size == 2, "when checking partition 2")
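Note: the corrected comment reflects that file1 (15 bytes) is first sliced at
maxPartitionBytes boundaries into (0-10) and (10-15), and the 5-byte tail then
shares a partition with file2. A hypothetical splitter (names ours) showing
the slicing arithmetic:

def splitFile(length: Long, maxPartitionBytes: Long): Seq[(Long, Long)] =
  (0L until length by maxPartitionBytes).map { start =>
    (start, math.min(maxPartitionBytes, length - start))  // (offset, length)
  }

// splitFile(15, 10) yields (0, 10) and (10, 5); the 5-byte tail plus open
// cost plus the 3-byte file2 stays within the 10-byte budget.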
@@ -121,6 +120,53 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
}
}
+ test("Unpartitioned table, many files that get split") {
+ val table =
+ createTable(
+ files = Seq(
+ "file1" -> 2,
+ "file2" -> 2,
+ "file3" -> 1,
+ "file4" -> 1,
+ "file5" -> 1,
+ "file6" -> 1))
+
+ withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "4",
+ SQLConf.FILES_OPEN_COST_IN_BYTES.key -> "1") {
+ checkScan(table.select('c1)) { partitions =>
+ // Files should be laid out [(file1), (file2, file3), (file4, file5), (file6)]
+ assert(partitions.size == 4, "when checking partitions")
+ assert(partitions(0).files.size == 1, "when checking partition 1")
+ assert(partitions(1).files.size == 2, "when checking partition 2")
+ assert(partitions(2).files.size == 2, "when checking partition 3")
+ assert(partitions(3).files.size == 1, "when checking partition 4")
+
+ // First partition reads (file1)
+ assert(partitions(0).files(0).start == 0)
+ assert(partitions(0).files(0).length == 2)
+
+ // Second partition reads (file2, file3)
+ assert(partitions(1).files(0).start == 0)
+ assert(partitions(1).files(0).length == 2)
+ assert(partitions(1).files(1).start == 0)
+ assert(partitions(1).files(1).length == 1)
+
+ // Third partition reads (file4, file5)
+ assert(partitions(2).files(0).start == 0)
+ assert(partitions(2).files(0).length == 1)
+ assert(partitions(2).files(1).start == 0)
+ assert(partitions(2).files(1).length == 1)
+
+ // Final partition reads (file6)
+ assert(partitions(3).files(0).start == 0)
+ assert(partitions(3).files(0).length == 1)
+ }
+
+ checkPartitionSchema(StructType(Nil))
+ checkDataSchema(StructType(Nil).add("c1", IntegerType))
+ }
+ }
+
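Note: feeding this test's inputs to the pack sketch shown earlier reproduces
the asserted layout: file1 charges 2 + 1 (open cost) = 3 bytes of budget, so
file2 (3 + 2 > 4) opens partition 2, file3 still fits there (3 + 1 <= 4), and
the same rhythm continues through file6:

assert(pack(Seq("file1" -> 2L, "file2" -> 2L, "file3" -> 1L,
               "file4" -> 1L, "file5" -> 1L, "file6" -> 1L), 4L, 1L)
  .map(_.map(_._1)) ==
  Seq(Seq("file1"), Seq("file2", "file3"), Seq("file4", "file5"), Seq("file6")))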
test("partitioned table") {
val table =
createTable(
@@ -147,6 +193,34 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
checkDataFilters(Set(IsNotNull("c1"), EqualTo("c1", 1)))
}
+ test("partitioned table - case insensitive") {
+ withSQLConf("spark.sql.caseSensitive" -> "false") {
+ val table =
+ createTable(
+ files = Seq(
+ "p1=1/file1" -> 10,
+ "p1=2/file2" -> 10))
+
+ // Only one file should be read.
+ checkScan(table.where("P1 = 1")) { partitions =>
+ assert(partitions.size == 1, "when checking partitions")
+ assert(partitions.head.files.size == 1, "when files in partition 1")
+ }
+ // We don't need to reevaluate filters that are only on partitions.
+ checkDataFilters(Set.empty)
+
+ // Only one file should be read.
+ checkScan(table.where("P1 = 1 AND C1 = 1 AND (P1 + C1) = 1")) { partitions =>
+ assert(partitions.size == 1, "when checking partitions")
+ assert(partitions.head.files.size == 1, "when checking files in partition 1")
+ assert(partitions.head.files.head.partitionValues.getInt(0) == 1,
+ "when checking partition values")
+ }
+ // Only the filters that do not contain the partition column should be pushed down
+ checkDataFilters(Set(IsNotNull("c1"), EqualTo("c1", 1)))
+ }
+ }
+
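Note: this new test differs from the case-sensitive one above only in that the
predicates spell the columns as P1 and C1; with spark.sql.caseSensitive set to
false, the analyzer resolves them to the partition column p1 and the data
column c1, so pruning and pushdown behave exactly as before. A sketch of the
same toggle outside the suite (assuming a SQLContext and a DataFrame df with
those columns in scope):

sqlContext.setConf("spark.sql.caseSensitive", "false")
df.where("P1 = 1")  // resolves to partition column p1 -> partitions pruned
df.where("C1 = 1")  // resolves to data column c1 -> filter pushed to the scan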
test("partitioned table - after scan filters") {
val table =
createTable(
@@ -230,7 +304,8 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
/** Plans the query and calls the provided validation function with the planned partitioning. */
def checkScan(df: DataFrame)(func: Seq[FilePartition] => Unit): Unit = {
val fileScan = df.queryExecution.executedPlan.collect {
- case DataSourceScan(_, scan: FileScanRDD, _, _) => scan
+ case scan: DataSourceScan if scan.rdd.isInstanceOf[FileScanRDD] =>
+ scan.rdd.asInstanceOf[FileScanRDD]
}.headOption.getOrElse {
fail(s"No FileScan in query\n${df.queryExecution}")
}
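Note: the old extractor pattern, DataSourceScan(_, scan: FileScanRDD, _, _),
bound the RDD by its position in the node's constructor; matching on the node
type and inspecting its rdd field instead keeps this helper compiling when
DataSourceScan grows or reorders its constructor arguments.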
@@ -315,28 +390,17 @@ class TestFileFormat extends FileFormat {
throw new NotImplementedError("JUST FOR TESTING")
}
- override def buildInternalScan(
- sqlContext: SQLContext,
- dataSchema: StructType,
- requiredColumns: Array[String],
- filters: Array[Filter],
- bucketSet: Option[BitSet],
- inputFiles: Seq[FileStatus],
- broadcastedConf: Broadcast[SerializableConfiguration],
- options: Map[String, String]): RDD[InternalRow] = {
- throw new NotImplementedError("JUST FOR TESTING")
- }
-
override def buildReader(
sqlContext: SQLContext,
- partitionSchema: StructType,
dataSchema: StructType,
+ partitionSchema: StructType,
+ requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
// Record the arguments so they can be checked in the test case.
LastArguments.partitionSchema = partitionSchema
- LastArguments.dataSchema = dataSchema
+ LastArguments.dataSchema = requiredSchema
LastArguments.filters = filters
LastArguments.options = options
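Note: the reworked buildReader hook now distinguishes three schemas:
dataSchema (every data column as stored in the files), partitionSchema
(columns derived from directory paths), and requiredSchema (only the columns
the query actually needs, which is why LastArguments.dataSchema now records
it). A trivial reader that satisfies the new signature, for illustration only:

override def buildReader(
    sqlContext: SQLContext,
    dataSchema: StructType,
    partitionSchema: StructType,
    requiredSchema: StructType,
    filters: Seq[Filter],
    options: Map[String, String]): PartitionedFile => Iterator[InternalRow] = {
  // Produce no rows; a real format would open file.filePath, read bytes
  // [file.start, file.start + file.length), and project to requiredSchema.
  (file: PartitionedFile) => Iterator.empty
}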