aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEric Liang <ekl@databricks.com>2016-12-04 20:44:04 +0800
committerWenchen Fan <wenchen@databricks.com>2016-12-04 20:44:04 +0800
commitd9eb4c7215f26dd05527c0b9980af35087ab9d64 (patch)
tree2696b15e871b3d98731f9d841f811e1f52806f67
parente463678b194e08be4a8bc9d1d45461d6c77a15ee (diff)
downloadspark-d9eb4c7215f26dd05527c0b9980af35087ab9d64.tar.gz
spark-d9eb4c7215f26dd05527c0b9980af35087ab9d64.tar.bz2
spark-d9eb4c7215f26dd05527c0b9980af35087ab9d64.zip
[SPARK-18661][SQL] Creating a partitioned datasource table should not scan all files for table
## What changes were proposed in this pull request? Even though in 2.1 creating a partitioned datasource table will not populate the partition data by default (until the user issues MSCK REPAIR TABLE), it seems we still scan the filesystem for no good reason. We should avoid doing this when the user specifies a schema. ## How was this patch tested? Perf stat tests. Author: Eric Liang <ekl@databricks.com> Closes #16090 from ericl/spark-18661.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala10
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala11
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala51
4 files changed, 66 insertions, 8 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 422700c891..193a2a2cdc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -58,13 +58,21 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
// Create the relation to validate the arguments before writing the metadata to the metastore,
// and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
val pathOption = table.storage.locationUri.map("path" -> _)
+ // Fill in some default table options from the session conf
+ val tableWithDefaultOptions = table.copy(
+ identifier = table.identifier.copy(
+ database = Some(
+ table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase))),
+ tracksPartitionsInCatalog = sparkSession.sessionState.conf.manageFilesourcePartitions)
val dataSource: BaseRelation =
DataSource(
sparkSession = sparkSession,
userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
+ partitionColumns = table.partitionColumnNames,
className = table.provider.get,
bucketSpec = table.bucketSpec,
- options = table.storage.properties ++ pathOption).resolveRelation()
+ options = table.storage.properties ++ pathOption,
+ catalogTable = Some(tableWithDefaultOptions)).resolveRelation()
dataSource match {
case fs: HadoopFsRelation =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index ccfc759c8f..f47eb84df0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -132,7 +132,7 @@ case class DataSource(
}.toArray
new InMemoryFileIndex(sparkSession, globbedPaths, options, None)
}
- val partitionSchema = if (partitionColumns.isEmpty && catalogTable.isEmpty) {
+ val partitionSchema = if (partitionColumns.isEmpty) {
// Try to infer partitioning, because no DataSource in the read path provides the partitioning
// columns properly unless it is a Hive DataSource
val resolved = tempFileIndex.partitionSchema.map { partitionField =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 2a004ba2f1..e61beb49e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -312,7 +312,13 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
pathToNonPartitionedTable,
userSpecifiedSchema = Option("num int, str string"),
userSpecifiedPartitionCols = partitionCols,
- expectedSchema = new StructType().add("num", IntegerType).add("str", StringType),
+ expectedSchema = if (partitionCols.isDefined) {
+ // we skipped inference, so the partition col is ordered at the end
+ new StructType().add("str", StringType).add("num", IntegerType)
+ } else {
+ // no inferred partitioning, so schema is in original order
+ new StructType().add("num", IntegerType).add("str", StringType)
+ },
expectedPartitionCols = partitionCols.map(Seq(_)).getOrElse(Seq.empty[String]))
}
}
@@ -565,7 +571,8 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
val table = catalog.getTableMetadata(TableIdentifier("tbl"))
assert(table.tableType == CatalogTableType.MANAGED)
assert(table.provider == Some("parquet"))
- assert(table.schema == new StructType().add("a", IntegerType).add("b", IntegerType))
+ // a is ordered last since it is a user-specified partitioning column
+ assert(table.schema == new StructType().add("b", IntegerType).add("a", IntegerType))
assert(table.partitionColumnNames == Seq("a"))
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
index 9838b9a4eb..65c02d473b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionedTablePerfStatsSuite.scala
@@ -60,36 +60,52 @@ class PartitionedTablePerfStatsSuite
setupPartitionedHiveTable(tableName, dir, 5)
}
- private def setupPartitionedHiveTable(tableName: String, dir: File, scale: Int): Unit = {
+ private def setupPartitionedHiveTable(
+ tableName: String, dir: File, scale: Int,
+ clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
.partitionBy("partCol1", "partCol2")
.mode("overwrite")
.parquet(dir.getAbsolutePath)
+ if (clearMetricsBeforeCreate) {
+ HiveCatalogMetrics.reset()
+ }
+
spark.sql(s"""
|create external table $tableName (fieldOne long)
|partitioned by (partCol1 int, partCol2 int)
|stored as parquet
|location "${dir.getAbsolutePath}"""".stripMargin)
- spark.sql(s"msck repair table $tableName")
+ if (repair) {
+ spark.sql(s"msck repair table $tableName")
+ }
}
private def setupPartitionedDatasourceTable(tableName: String, dir: File): Unit = {
setupPartitionedDatasourceTable(tableName, dir, 5)
}
- private def setupPartitionedDatasourceTable(tableName: String, dir: File, scale: Int): Unit = {
+ private def setupPartitionedDatasourceTable(
+ tableName: String, dir: File, scale: Int,
+ clearMetricsBeforeCreate: Boolean = false, repair: Boolean = true): Unit = {
spark.range(scale).selectExpr("id as fieldOne", "id as partCol1", "id as partCol2").write
.partitionBy("partCol1", "partCol2")
.mode("overwrite")
.parquet(dir.getAbsolutePath)
+ if (clearMetricsBeforeCreate) {
+ HiveCatalogMetrics.reset()
+ }
+
spark.sql(s"""
|create table $tableName (fieldOne long, partCol1 int, partCol2 int)
|using parquet
|options (path "${dir.getAbsolutePath}")
|partitioned by (partCol1, partCol2)""".stripMargin)
- spark.sql(s"msck repair table $tableName")
+ if (repair) {
+ spark.sql(s"msck repair table $tableName")
+ }
}
genericTest("partitioned pruned table reports only selected files") { spec =>
@@ -250,6 +266,33 @@ class PartitionedTablePerfStatsSuite
}
}
+ test("datasource table: table setup does not scan filesystem") {
+ withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+ withTable("test") {
+ withTempDir { dir =>
+ setupPartitionedDatasourceTable(
+ "test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+ assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+ }
+ }
+ }
+ }
+
+ test("hive table: table setup does not scan filesystem") {
+ withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+ withTable("test") {
+ withTempDir { dir =>
+ HiveCatalogMetrics.reset()
+ setupPartitionedHiveTable(
+ "test", dir, scale = 10, clearMetricsBeforeCreate = true, repair = false)
+ assert(HiveCatalogMetrics.METRIC_FILES_DISCOVERED.getCount() == 0)
+ assert(HiveCatalogMetrics.METRIC_FILE_CACHE_HITS.getCount() == 0)
+ }
+ }
+ }
+ }
+
test("hive table: num hive client calls does not scale with partition count") {
withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
withTable("test") {