 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala            |  4 ++--
 sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala  | 24 ++++++++++++++--------
 sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                     |  4 ++--
 sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala                  |  2 +-
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala          | 12 ++++--------
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala  | 18 +++---------------
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala |  9 ++-------
 sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala                     | 17 +++++++++++++++++
 sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala                 |  2 +-
 9 files changed, 48 insertions(+), 44 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 79231ee9e3..e74fa6e638 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -156,9 +156,9 @@ case class CatalogTable(
outputFormat: Option[String] = storage.outputFormat,
compressed: Boolean = false,
serde: Option[String] = storage.serde,
- serdeProperties: Map[String, String] = storage.properties): CatalogTable = {
+ properties: Map[String, String] = storage.properties): CatalogTable = {
copy(storage = CatalogStorageFormat(
- locationUri, inputFormat, outputFormat, serde, compressed, serdeProperties))
+ locationUri, inputFormat, outputFormat, serde, compressed, properties))
}
override def toString: String = {
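After this rename, call sites pass the storage options as `properties` rather than `serdeProperties`. A minimal sketch of an updated caller (assuming a `CatalogTable` value named `table` is in scope; the property key is illustrative), with the remaining `withNewStorage` parameters left at their defaults:

    // hypothetical call site, updated for the renamed parameter
    val updated: CatalogTable = table.withNewStorage(
      properties = table.storage.properties + ("field.delim" -> ","))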
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index c7e3279061..b1830e6cf3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -18,7 +18,6 @@
package org.apache.spark.sql.execution.command
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.QueryPlan
@@ -56,12 +55,6 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
}
}
- val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
- table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
- } else {
- table.storage.properties
- }
-
// Create the relation to validate the arguments before writing the metadata to the metastore,
// and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
val dataSource: BaseRelation =
@@ -70,7 +63,16 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
className = table.provider.get,
bucketSpec = table.bucketSpec,
- options = optionsWithPath).resolveRelation(checkPathExist = false)
+ options = table.storage.properties).resolveRelation()
+
+ dataSource match {
+ case fs: HadoopFsRelation =>
+ if (table.tableType == CatalogTableType.EXTERNAL && fs.location.paths.isEmpty) {
+ throw new AnalysisException(
+ "Cannot create a file-based external data source table without path")
+ }
+ case _ =>
+ }
val partitionColumnNames = if (table.schema.nonEmpty) {
table.partitionColumnNames
@@ -83,6 +85,12 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
}
}
+ val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
+ table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
+ } else {
+ table.storage.properties
+ }
+
val newTable = table.copy(
storage = table.storage.copy(properties = optionsWithPath),
schema = dataSource.schema,
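Note the reordering above: the relation is now resolved (and its paths validated) before the "path" option is filled in for managed tables, and a file-based external table with no user-supplied path is rejected outright. A sketch of the new failure mode, mirroring the test added to CatalogSuite below (assuming a `SparkSession` named `spark`):

    import org.apache.spark.sql.types.{IntegerType, StructType}

    // no "path" option is given, so analysis fails instead of
    // silently creating a table over a non-existent location
    spark.catalog.createExternalTable(
      "tbl", "json", new StructType().add("i", IntegerType), Map.empty[String, String])
    // => AnalysisException: Cannot create a file-based external data source
    //    table without path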
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 53fb684eb5..bc1c4f85e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -318,7 +318,7 @@ case class AlterTableSerDePropertiesCommand(
if (partSpec.isEmpty) {
val newTable = table.withNewStorage(
serde = serdeClassName.orElse(table.storage.serde),
- serdeProperties = table.storage.properties ++ serdeProperties.getOrElse(Map()))
+ properties = table.storage.properties ++ serdeProperties.getOrElse(Map()))
catalog.alterTable(newTable)
} else {
val spec = partSpec.get
@@ -669,7 +669,7 @@ case class AlterTableSetLocationCommand(
if (DDLUtils.isDatasourceTable(table)) {
table.withNewStorage(
locationUri = Some(location),
- serdeProperties = table.storage.properties ++ Map("path" -> location))
+ properties = table.storage.properties ++ Map("path" -> location))
} else {
table.withNewStorage(locationUri = Some(location))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 4e6caae85c..027f3588e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -181,7 +181,7 @@ case class AlterTableRenameCommand(
if (DDLUtils.isDatasourceTable(table) && table.tableType == CatalogTableType.MANAGED) {
val newPath = catalog.defaultTablePath(newTblName)
val newTable = table.withNewStorage(
- serdeProperties = table.storage.properties ++ Map("path" -> newPath))
+ properties = table.storage.properties ++ Map("path" -> newPath))
catalog.alterTable(newTable)
}
// Invalidate the table last, otherwise uncaching the table would load the logical plan
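Only the parameter name changes here: renaming a managed data source table still repoints its "path" storage property at the new default location. For example (hypothetical table names):

    // the "path" property of old_tbl is rewritten to the default location of new_tbl
    spark.sql("ALTER TABLE old_tbl RENAME TO new_tbl")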
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 9c99a800cc..71807b771a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -315,12 +315,8 @@ case class DataSource(
/**
* Create a resolved [[BaseRelation]] that can be used to read data from or write data into this
* [[DataSource]]
- *
- * @param checkPathExist A flag to indicate whether to check the existence of path or not.
- * This flag will be set to false when we create an empty table (the
- * path of the table does not exist).
*/
- def resolveRelation(checkPathExist: Boolean = true): BaseRelation = {
+ def resolveRelation(): BaseRelation = {
val caseInsensitiveOptions = new CaseInsensitiveMap(options)
val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
// TODO: Throw when too much is given.
@@ -367,11 +363,11 @@ case class DataSource(
val qualified = hdfsPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
val globPath = SparkHadoopUtil.get.globPathIfNecessary(qualified)
- if (checkPathExist && globPath.isEmpty) {
+ if (globPath.isEmpty) {
throw new AnalysisException(s"Path does not exist: $qualified")
}
// Sufficient to check head of the globPath seq for non-glob scenario
- if (checkPathExist && !fs.exists(globPath.head)) {
+ if (!fs.exists(globPath.head)) {
throw new AnalysisException(s"Path does not exist: ${globPath.head}")
}
globPath
@@ -391,7 +387,7 @@ case class DataSource(
val fileCatalog =
new ListingFileCatalog(
- sparkSession, globbedPaths, options, partitionSchema, !checkPathExist)
+ sparkSession, globbedPaths, options, partitionSchema)
val dataSchema = userSpecifiedSchema.map { schema =>
val equality = sparkSession.sessionState.conf.resolver
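With the `checkPathExist` flag removed, `resolveRelation()` always verifies that every glob-expanded input path exists, so reading a missing location fails eagerly. A sketch (assuming a `SparkSession` named `spark` and that the path genuinely does not exist):

    // both existence checks above now run unconditionally
    spark.read.json("/no/such/path")
    // => AnalysisException: Path does not exist: file:/no/such/path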
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 706ec6b9b3..60742bdbed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -17,10 +17,7 @@
package org.apache.spark.sql.execution.datasources
-import java.io.FileNotFoundException
-
import scala.collection.mutable
-import scala.util.Try
import org.apache.hadoop.fs.{FileStatus, LocatedFileStatus, Path}
import org.apache.hadoop.mapred.{FileInputFormat, JobConf}
@@ -37,16 +34,12 @@ import org.apache.spark.sql.types.StructType
* @param paths a list of paths to scan
* @param partitionSchema an optional partition schema that will be use to provide types for the
* discovered partitions
- * @param ignoreFileNotFound if true, return empty file list when encountering a
- * [[FileNotFoundException]] in file listing. Note that this is a hack
- * for SPARK-16313. We should get rid of this flag in the future.
*/
class ListingFileCatalog(
sparkSession: SparkSession,
override val paths: Seq[Path],
parameters: Map[String, String],
- partitionSchema: Option[StructType],
- ignoreFileNotFound: Boolean = false)
+ partitionSchema: Option[StructType])
extends PartitioningAwareFileCatalog(sparkSession, parameters, partitionSchema) {
@volatile private var cachedLeafFiles: mutable.LinkedHashMap[Path, FileStatus] = _
@@ -88,7 +81,7 @@ class ListingFileCatalog(
*/
def listLeafFiles(paths: Seq[Path]): mutable.LinkedHashSet[FileStatus] = {
if (paths.length >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold) {
- HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession, ignoreFileNotFound)
+ HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession)
} else {
// Right now, the number of paths is less than the value of
// parallelPartitionDiscoveryThreshold. So, we will list file statuses at the driver.
@@ -104,12 +97,7 @@ class ListingFileCatalog(
logTrace(s"Listing $path on driver")
val childStatuses = {
- val stats =
- try {
- fs.listStatus(path)
- } catch {
- case e: FileNotFoundException if ignoreFileNotFound => Array.empty[FileStatus]
- }
+ val stats = fs.listStatus(path)
if (pathFilter != null) stats.filter(f => pathFilter.accept(f.getPath)) else stats
}
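After this cleanup, `ListingFileCatalog` takes exactly four constructor arguments, and a `FileNotFoundException` from `fs.listStatus` propagates instead of being swallowed. A construction sketch (assuming a `SparkSession` named `spark`; the path is illustrative):

    import org.apache.hadoop.fs.Path
    import org.apache.spark.sql.execution.datasources.ListingFileCatalog

    // the ignoreFileNotFound flag is no longer accepted
    val fileCatalog = new ListingFileCatalog(
      spark, Seq(new Path("/data/events")), Map.empty[String, String], None)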
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
index 7e40c35984..5cc5f32e6e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/fileSourceInterfaces.scala
@@ -440,8 +440,7 @@ object HadoopFsRelation extends Logging {
def listLeafFilesInParallel(
paths: Seq[Path],
hadoopConf: Configuration,
- sparkSession: SparkSession,
- ignoreFileNotFound: Boolean): mutable.LinkedHashSet[FileStatus] = {
+ sparkSession: SparkSession): mutable.LinkedHashSet[FileStatus] = {
assert(paths.size >= sparkSession.sessionState.conf.parallelPartitionDiscoveryThreshold)
logInfo(s"Listing leaf files and directories in parallel under: ${paths.mkString(", ")}")
@@ -462,11 +461,7 @@ object HadoopFsRelation extends Logging {
val pathFilter = FileInputFormat.getInputPathFilter(jobConf)
paths.map(new Path(_)).flatMap { path =>
val fs = path.getFileSystem(serializableConfiguration.value)
- try {
- listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
- } catch {
- case e: java.io.FileNotFoundException if ignoreFileNotFound => Array.empty[FileStatus]
- }
+ listLeafFiles(fs, fs.getFileStatus(path), pathFilter)
}
}.map { status =>
val blockLocations = status match {
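The parallel listing path drops the flag as well; a missing root path now surfaces as the underlying `FileNotFoundException` from `fs.getFileStatus`. The updated call shape (a sketch, with `paths`, `hadoopConf`, and `sparkSession` assumed in scope):

    // the trailing Boolean argument is gone
    val leafFiles = HadoopFsRelation.listLeafFilesInParallel(paths, hadoopConf, sparkSession)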
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 8aa81854b2..b221eed7b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.logical.Range
import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.{IntegerType, StructType}
/**
@@ -305,6 +306,22 @@ class CatalogSuite
columnFields.foreach { f => assert(columnString.contains(f.toString)) }
}
+ test("createExternalTable should fail if path is not given for file-based data source") {
+ val e = intercept[AnalysisException] {
+ spark.catalog.createExternalTable("tbl", "json", Map.empty[String, String])
+ }
+ assert(e.message.contains("Unable to infer schema"))
+
+ val e2 = intercept[AnalysisException] {
+ spark.catalog.createExternalTable(
+ "tbl",
+ "json",
+ new StructType().add("i", IntegerType),
+ Map.empty[String, String])
+ }
+ assert(e2.message == "Cannot create a file-based external data source table without path")
+ }
+
// TODO: add tests for the rest of them
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c48d4ed608..8410a2e4a4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -81,7 +81,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
options = table.storage.properties)
LogicalRelation(
- dataSource.resolveRelation(checkPathExist = true),
+ dataSource.resolveRelation(),
catalogTable = Some(table))
}
}