author     Wenchen Fan <wenchen@databricks.com>    2016-09-06 14:17:47 +0800
committer  Wenchen Fan <wenchen@databricks.com>    2016-09-06 14:17:47 +0800
commit     c0ae6bc6ea38909730fad36e653d3c7ab0a84b44 (patch)
tree       25c1e98400ccadb9db221cf02cda423fd0bd7eb4 /sql
parent     64e826f91eabb1a22d3d163d71fbb7b6d2185f25 (diff)
download   spark-c0ae6bc6ea38909730fad36e653d3c7ab0a84b44.tar.gz
           spark-c0ae6bc6ea38909730fad36e653d3c7ab0a84b44.tar.bz2
           spark-c0ae6bc6ea38909730fad36e653d3c7ab0a84b44.zip
[SPARK-17361][SQL] file-based external table without path should not be created
## What changes were proposed in this pull request?

Using the public `Catalog` API, users can create a file-based data source table without giving any path options. Currently such a table is created successfully but fails when we read it; ideally it should fail at creation time. The reason is that when we create a data source table, we resolve the data source relation without validating the path: `resolveRelation(checkPathExist = false)`. Looking back at why the `checkPathExist` trick was added: when we call `resolveRelation` for a managed table, we add the path to the data source options even though that path has not been created yet. So why add this not-yet-created path to the data source options at all? This PR fixes the problem by adding the path to the options only after `resolveRelation` has been called. We can then remove the `checkPathExist` parameter from `DataSource.resolveRelation` and do some related cleanups.

## How was this patch tested?

Existing tests and a new test in `CatalogSuite`.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14921 from cloud-fan/check-path.
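The user-facing effect can be seen with the public `Catalog` API. Below is a minimal, self-contained sketch (not part of the patch): the object name, local master, and error handling are illustrative only, while `createExternalTable` and the expected error message come from the new `CatalogSuite` test in this change.

```scala
import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructType}

object CreateExternalTableWithoutPath {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("SPARK-17361 sketch")
      .getOrCreate()

    try {
      // A JSON (file-based) external table with an explicit schema but no "path" option.
      // Before this patch the table was created and only reading it failed;
      // now the creation itself throws.
      spark.catalog.createExternalTable(
        "tbl",
        "json",
        new StructType().add("i", IntegerType),
        Map.empty[String, String])
    } catch {
      case e: AnalysisException =>
        // Expected after this patch:
        // "Cannot create a file-based external data source table without path"
        println(e.getMessage)
    } finally {
      spark.stop()
    }
  }
}
```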
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala             |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala   | 24
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                      |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala                   |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala           | 12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala   | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala |  9
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala                      | 17
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala                  |  2
9 files changed, 48 insertions, 44 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 79231ee9e3..e74fa6e638 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -156,9 +156,9 @@ case class CatalogTable(
outputFormat: Option[String] = storage.outputFormat,
compressed: Boolean = false,
serde: Option[String] = storage.serde,
- serdeProperties: Map[String, String] = storage.properties): CatalogTable = {
+ properties: Map[String, String] = storage.properties): CatalogTable = {
copy(storage = CatalogStorageFormat(
- locationUri, inputFormat, outputFormat, serde, compressed, serdeProperties))
+ locationUri, inputFormat, outputFormat, serde, compressed, properties))
}
override def toString: String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index c7e3279061..b1830e6cf3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.command
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.QueryPlan
@@ -56,12 +55,6 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
}
}
- val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
- table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
- } else {
- table.storage.properties
- }
-
// Create the relation to validate the arguments before writing the metadata to the metastore,
// and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
val dataSource: BaseRelation =
@@ -70,7 +63,16 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
className = table.provider.get,
bucketSpec = table.bucketSpec,
- options = optionsWithPath).resolveRelation(checkPathExist = false)
+ options = table.storage.properties).resolveRelation()
+
+ dataSource match {
+ case fs: HadoopFsRelation =>
+ if (table.tableType == CatalogTableType.EXTERNAL && fs.location.paths.isEmpty) {
+ throw new AnalysisException(
+ "Cannot create a file-based external data source table without path")
+ }
+ case _ =>
+ }
val partitionColumnNames = if (table.schema.nonEmpty) {
table.partitionColumnNames
@@ -83,6 +85,12 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
}
}
+ val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
+ table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
+ } else {
+ table.storage.properties
+ }
+
val newTable = table.copy(
storage = table.storage.copy(properties = optionsWithPath),
schema = dataSource.schema,
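Taken together, the hunks above reorder the creation flow: the relation is resolved from the user-supplied options first, file-based external tables without a path are rejected, and only then is the default path injected for managed tables. A condensed, hedged sketch of that flow (simplified from the diff, not a verbatim excerpt; surrounding schema and partition handling is omitted):

```scala
// Simplified sketch of CreateDataSourceTableCommand.run after this patch.
val dataSource: BaseRelation =
  DataSource(
    sparkSession,
    userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
    className = table.provider.get,
    bucketSpec = table.bucketSpec,
    options = table.storage.properties  // no "path" injected yet
  ).resolveRelation()                   // path validation now always runs

dataSource match {
  case fs: HadoopFsRelation
      if table.tableType == CatalogTableType.EXTERNAL && fs.location.paths.isEmpty =>
    throw new AnalysisException(
      "Cannot create a file-based external data source table without path")
  case _ => // non-file-based sources are unaffected
}

// Only after resolution succeeds is the default path added for managed tables.
val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
  table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
} else {
  table.storage.properties
}
```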
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 53fb684eb5..bc1c4f85e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -318,7 +318,7 @@ case class AlterTableSerDePropertiesCommand(
if (partSpec.isEmpty) {
val newTable = table.withNewStorage(
serde = serdeClassName.orElse(table.storage.serde),
- serdeProperties = table.storage.properties ++ serdeProperties.getOrElse(Map()))
+ properties = table.storage.properties ++ serdeProperties.getOrElse(Map()))
catalog.alterTable(newTable)
} else {
val spec = partSpec.get
@@ -669,7 +669,7 @@ case class AlterTableSetLocationCommand(
if (DDLUtils.isDatasourceTable(table)) {
table.withNewStorage(
locationUri = Some(location),
- serdeProperties = table.storage.properties ++ Map("path" -> location))
+ properties = table.storage.properties ++ Map("path" -> location))
} else {
table.withNewStorage(locationUri = Some(location))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 4e6caae85c..027f3588e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -181,7 +181,7 @@ case class AlterTableRenameCommand(
if (DDLUtils.isDatasourceTable(table) && table.tableType == CatalogTableType.MANAGED) {
val newPath = catalog.defaultTablePath(newTblName)
val newTable = table.withNewStorage(
- serdeProperties = table.storage.properties ++ Map("path" -> newPath))
+ properties = table.storage.properties ++ Map("path" -> newPath))
catalog.alterTable(newTable)
}
// Invalidate the table last, otherwise uncaching the table would load the logical plan
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 9c99a800cc..71807b771a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -315,12 +315,8 @@ case class DataSource(
/**
* Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
* [[DataSource]]
- *
- * @param checkPathExist A flag to indicate whether to check the existence of path or not.
- * This flag will be set to false when we create an empty table (the
- * path of the table does not exist).
*/
- def resolveRelation(checkPathExist: Boolean = true): BaseRelation = {
+ def resolveRelation(): BaseRelation = {
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
// TODO: Throw when too much is given.
@@ -367,11 +363,11 @@ case class DataSource(
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
- if (checkPathExist && globPath.isEmpty) {
+ if (globPath.isEmpty) {
throw new AnalysisException(s"Path does not exist: $qualified")
}
// Sufficient to check head of the globPath seq for non-glob scenario
- if (checkPathExist && !fs.exists(globPath.head)) {
+ if (!fs.exists(globPath.head)) {
throw new AnalysisException(s"Path does not exist: ${globPath.head}")
}
globPath
@@ -391,7 +387,7 @@ case class DataSource(
val fileCatalog =
new ListingFileCatalog(
- sparkSession, globbedPaths, options, partitionSchema, !checkPathExist)
+ sparkSession, globbedPaths, options, partitionSchema)
val dataSchema = userSpecifiedSchema.map { schema =>
val equality = sparkSession.sessionState.conf.resolver
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 706ec6b9b3..60742bdbed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -17,10 +17,7 @@
package org.apache.spark.sql.execution.datasources
-import java.io.FileNotFoundException
-
import scala.collection.mutable
-import scala.util.Try
import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
@@ -37,16 +34,12 @@ import org.apache.spark.sql.types.StructType
* @param paths a list of paths to scan
* @param partitionSchema an optional partition schema that will be use to provide types for the
* discovered partitions
- * @param ignoreFileNotFound if true, return empty file list when encountering a
- * [[FileNotFoundException]] in file listing. Note that this is a hack
- * for SPARK-16313. We should get rid of this flag in the future.
*/
class ListingFileCatalog(
sparkSession: SparkSession,
override val paths: Seq[Path],
parameters: Map[String, String],
- partitionSchema: Option[StructType],
- ignoreFileNotFound: Boolean = false)
+ partitionSchema: Option[StructType])
extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) {
@volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
@@ -88,7 +81,7 @@ class ListingFileCatalog(
*/
def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
- HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession, ignoreFileNotFound)
+ HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
} else {
// Right now, the number of paths is less than the value of
// parallelPartitionDiscoveryThreshold. So, we will list file statues at the driver.
@@ -104,12 +97,7 @@ class ListingFileCatalog(
logTrace(s"Listing $path on driver")
val childStatuses = {
- val stats =
- try {
- fs.listStatus(path)
- } catch {
- case e: FileNotFoundException if ignoreFileNotFound => Array.empty[FileStatus]
- }
+ val stats = fs.listStatus(path)
if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 7e40c35984..5cc5f32e6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -440,8 +440,7 @@ object HadoopFsRelation extends Logging {
def listLeafFilesInParallel(
paths: Seq[Path],
hadoopConf: Configuration,
- sparkSession: SparkSession,
- ignoreFileNotFound: Boolean): mutable.LinkedHashSet[FileStatus] = {
+ sparkSession: SparkSession): mutable.LinkedHashSet[FileStatus] = {
assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
@@ -462,11 +461,7 @@ object HadoopFsRelation extends Logging {
val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
paths.map(new Path(_)).flatMap { path =>
val fs = path.getFileSystem(serializableConfiguration.value)
- try {
- listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
- } catch {
- case e: java.io.FileNotFoundException if ignoreFileNotFound => Array.empty[FileStatus]
- }
+ listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
}
}.map { status =>
val blockLocations = status match {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 8aa81854b2..b221eed7b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.logical.Range
import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructType}
/**
@@ -305,6 +306,22 @@ class CatalogSuite
columnFields.foreach { f => assert(columnString.contains(f.toString)) }
}
+ test("createExternalTable should fail if path is not given for file-based data source") {
+ val e = intercept[AnalysisException] {
+ spark.catalog.createExternalTable("tbl", "json", Map.empty[String, String])
+ }
+ assert(e.message.contains("Unable to infer schema"))
+
+ val e2 = intercept[AnalysisException] {
+ spark.catalog.createExternalTable(
+ "tbl",
+ "json",
+ new StructType().add("i", IntegerType),
+ Map.empty[String, String])
+ }
+ assert(e2.message == "Cannot create a file-based external data source table without path")
+ }
+
// TODO: add tests for the rest of them
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c48d4ed608..8410a2e4a4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -81,7 +81,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
options = table.storage.properties)
LogicalRelation(
- dataSource.resolveRelation(checkPathExist = true),
+ dataSource.resolveRelation(),
catalogTable = Some(table))
}
}