path: root/sql/core/src/main
author     Wenchen Fan <wenchen@databricks.com>    2016-11-02 18:05:14 -0700
committer  Yin Huai <yhuai@databricks.com>         2016-11-02 18:05:14 -0700
commit     3a1bc6f4780f8384c1211b1335e7394a4a28377e (patch)
tree       5d6e8f5d035d4a8c1078d93348087129d5582750 /sql/core/src/main
parent     fd90541c35af2bccf0155467bec8cea7c8865046 (diff)
[SPARK-17470][SQL] unify path for data source table and locationUri for hive serde table
## What changes were proposed in this pull request?

Due to a limitation of the Hive metastore (a table location must be a directory path, not a file path), we have always stored the `path` of a data source table in its storage properties instead of in the `locationUri` field. However, we should not expose this difference at the `CatalogTable` level; it should remain a workaround inside `HiveExternalCatalog`, just as we store the table schema of data source tables in table properties.

This PR unifies `path` and `locationUri` outside of `HiveExternalCatalog`: both data source tables and Hive serde tables now use the `locationUri` field.

This PR also unifies how the default location of a managed table is determined. Previously, the default location of a Hive serde managed table was set by the external catalog, while that of a data source table was set by the command. After this PR, we follow the Hive approach: the default table location is always set by the external catalog.

For managed non-file-based tables, we assign a default table location and create an empty directory for it; the directory is removed when the table is dropped. This is reasonable because the metastore doesn't care whether a table is file-based or not, and an empty table directory does no harm.

For external non-file-based tables, ideally we could omit the table location, but due to a Hive metastore issue we assign a random location and remove it right after the table is created. See SPARK-15269 for more details. This is acceptable because it is well isolated inside `HiveExternalCatalog`.

To keep the existing behaviour of the `path` option, this PR always adds the `locationUri` back to the storage properties under the key `path` before passing them to `DataSource` as data source options.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15024 from cloud-fan/path.
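To make the recurring pattern in the diffs below concrete, here is a minimal, self-contained sketch. It is an illustration, not the actual Spark code: `StorageFormat` is a simplified stand-in for `CatalogStorageFormat`, and the helper merely mirrors the `pathOption = locationUri.map("path" -> _)` idiom used by the commands in this patch.

    // Minimal sketch with simplified stand-in types (assumptions, not Spark's real classes).
    case class StorageFormat(locationUri: Option[String], properties: Map[String, String])

    object PathOptionSketch {
      // Mirrors the idiom used throughout this patch:
      //   val pathOption = table.storage.locationUri.map("path" -> _)
      //   options = table.storage.properties ++ pathOption
      def toDataSourceOptions(storage: StorageFormat): Map[String, String] =
        storage.properties ++ storage.locationUri.map("path" -> _)

      def main(args: Array[String]): Unit = {
        val managed  = StorageFormat(None, Map("compression" -> "snappy"))
        val external = StorageFormat(Some("/warehouse/db/tbl"), Map("compression" -> "snappy"))

        // A table with a defined location is treated as EXTERNAL, otherwise MANAGED.
        println(if (external.locationUri.isDefined) "EXTERNAL" else "MANAGED") // EXTERNAL
        println(toDataSourceOptions(managed))  // Map(compression -> snappy)
        println(toDataSourceOptions(external)) // Map(compression -> snappy, path -> /warehouse/db/tbl)
      }
    }

The inverse direction, lifting a `path` option into `locationUri`, is handled by the new `DataSource.buildStorageFormatFromOptions` added in this patch (see the DataSource.scala diff below).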
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala                            5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala                  17
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala  37
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala                     23
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala                  50
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala         241
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala   3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala                       4
8 files changed, 170 insertions, 210 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 700f4835ac..f95362e292 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -373,7 +373,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
throw new AnalysisException(s"Table $tableIdent already exists.")
case _ =>
- val tableType = if (new CaseInsensitiveMap(extraOptions.toMap).contains("path")) {
+ val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
+ val tableType = if (storage.locationUri.isDefined) {
CatalogTableType.EXTERNAL
} else {
CatalogTableType.MANAGED
@@ -382,7 +383,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
val tableDesc = CatalogTable(
identifier = tableIdent,
tableType = tableType,
- storage = CatalogStorageFormat.empty.copy(properties = extraOptions.toMap),
+ storage = storage,
schema = new StructType,
provider = Some(source),
partitionColumnNames = partitioningColumns.getOrElse(Nil),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index fe183d0097..634ffde354 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -343,7 +343,8 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
// TODO: this may be wrong for non file-based data source like JDBC, which should be external
// even there is no `path` in options. We should consider allow the EXTERNAL keyword.
- val tableType = if (new CaseInsensitiveMap(options).contains("path")) {
+ val storage = DataSource.buildStorageFormatFromOptions(options)
+ val tableType = if (storage.locationUri.isDefined) {
CatalogTableType.EXTERNAL
} else {
CatalogTableType.MANAGED
@@ -352,7 +353,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
val tableDesc = CatalogTable(
identifier = table,
tableType = tableType,
- storage = CatalogStorageFormat.empty.copy(properties = options),
+ storage = storage,
schema = schema.getOrElse(new StructType),
provider = Some(provider),
partitionColumnNames = partitionColumnNames,
@@ -1062,17 +1063,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
if (conf.convertCTAS && !hasStorageProperties) {
// At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
// are empty Maps.
- val optionsWithPath = if (location.isDefined) {
- Map("path" -> location.get)
- } else {
- Map.empty[String, String]
- }
-
val newTableDesc = tableDesc.copy(
- storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
- provider = Some(conf.defaultDataSourceName)
- )
-
+ storage = CatalogStorageFormat.empty.copy(locationUri = location),
+ provider = Some(conf.defaultDataSourceName))
CreateTable(newTableDesc, mode, Some(q))
} else {
CreateTable(tableDesc, mode, Some(q))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 2a9743130d..d4b28274cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -57,13 +57,14 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
// Create the relation to validate the arguments before writing the metadata to the metastore,
// and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
+ val pathOption = table.storage.locationUri.map("path" -> _)
val dataSource: BaseRelation =
DataSource(
sparkSession = sparkSession,
userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
className = table.provider.get,
bucketSpec = table.bucketSpec,
- options = table.storage.properties).resolveRelation()
+ options = table.storage.properties ++ pathOption).resolveRelation()
dataSource match {
case fs: HadoopFsRelation =>
@@ -85,14 +86,7 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
}
}
- val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
- table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
- } else {
- table.storage.properties
- }
-
val newTable = table.copy(
- storage = table.storage.copy(properties = optionsWithPath),
schema = dataSource.schema,
partitionColumnNames = partitionColumnNames,
// If metastore partition management for file source tables is enabled, we start off with
@@ -140,12 +134,6 @@ case class CreateDataSourceTableAsSelectCommand(
val tableIdentWithDB = table.identifier.copy(database = Some(db))
val tableName = tableIdentWithDB.unquotedString
- val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
- table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
- } else {
- table.storage.properties
- }
-
var createMetastoreTable = false
var existingSchema = Option.empty[StructType]
if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
@@ -162,13 +150,7 @@ case class CreateDataSourceTableAsSelectCommand(
return Seq.empty[Row]
case SaveMode.Append =>
// Check if the specified data source match the data source of the existing table.
- val dataSource = DataSource(
- sparkSession = sparkSession,
- userSpecifiedSchema = Some(query.schema.asNullable),
- partitionColumns = table.partitionColumnNames,
- bucketSpec = table.bucketSpec,
- className = provider,
- options = optionsWithPath)
+ val existingProvider = DataSource.lookupDataSource(provider)
// TODO: Check that options from the resolved relation match the relation that we are
// inserting into (i.e. using the same compression).
@@ -178,7 +160,7 @@ case class CreateDataSourceTableAsSelectCommand(
case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
// check if the file formats match
l.relation match {
- case r: HadoopFsRelation if r.fileFormat.getClass != dataSource.providingClass =>
+ case r: HadoopFsRelation if r.fileFormat.getClass != existingProvider =>
throw new AnalysisException(
s"The file format of the existing table $tableName is " +
s"`${r.fileFormat.getClass.getName}`. It doesn't match the specified " +
@@ -213,13 +195,20 @@ case class CreateDataSourceTableAsSelectCommand(
case None => data
}
+ val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
+ Some(sessionState.catalog.defaultTablePath(table.identifier))
+ } else {
+ table.storage.locationUri
+ }
+
// Create the relation based on the data of df.
+ val pathOption = tableLocation.map("path" -> _)
val dataSource = DataSource(
sparkSession,
className = provider,
partitionColumns = table.partitionColumnNames,
bucketSpec = table.bucketSpec,
- options = optionsWithPath)
+ options = table.storage.properties ++ pathOption)
val result = try {
dataSource.write(mode, df)
@@ -230,7 +219,7 @@ case class CreateDataSourceTableAsSelectCommand(
}
if (createMetastoreTable) {
val newTable = table.copy(
- storage = table.storage.copy(properties = optionsWithPath),
+ storage = table.storage.copy(locationUri = tableLocation),
// We will use the schema of resolved.relation as the schema of the table (instead of
// the schema of df). It is important since the nullability may be changed by the relation
// provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index 61e0550cef..52af915b0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -485,14 +485,6 @@ case class AlterTableRecoverPartitionsCommand(
}
}
- private def getBasePath(table: CatalogTable): Option[String] = {
- if (table.provider == Some("hive")) {
- table.storage.locationUri
- } else {
- new CaseInsensitiveMap(table.storage.properties).get("path")
- }
- }
-
override def run(spark: SparkSession): Seq[Row] = {
val catalog = spark.sessionState.catalog
val table = catalog.getTableMetadata(tableName)
@@ -503,13 +495,12 @@ case class AlterTableRecoverPartitionsCommand(
s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
}
- val tablePath = getBasePath(table)
- if (tablePath.isEmpty) {
+ if (table.storage.locationUri.isEmpty) {
throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " +
s"location provided: $tableIdentWithDB")
}
- val root = new Path(tablePath.get)
+ val root = new Path(table.storage.locationUri.get)
logInfo(s"Recover all the partitions in $root")
val fs = root.getFileSystem(spark.sparkContext.hadoopConfiguration)
@@ -688,15 +679,7 @@ case class AlterTableSetLocationCommand(
catalog.alterPartitions(table.identifier, Seq(newPart))
case None =>
// No partition spec is specified, so we set the location for the table itself
- val newTable =
- if (DDLUtils.isDatasourceTable(table)) {
- table.withNewStorage(
- locationUri = Some(location),
- properties = table.storage.properties ++ Map("path" -> location))
- } else {
- table.withNewStorage(locationUri = Some(location))
- }
- catalog.alterTable(newTable)
+ catalog.alterTable(table.withNewStorage(locationUri = Some(location)))
}
Seq.empty[Row]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 4acfffb628..f32c956f59 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
import org.apache.spark.sql.execution.datasources.PartitioningUtils
+import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -62,25 +63,6 @@ case class CreateTableLikeCommand(
val catalog = sparkSession.sessionState.catalog
val sourceTableDesc = catalog.getTempViewOrPermanentTableMetadata(sourceTable)
- // Storage format
- val newStorage =
- if (sourceTableDesc.tableType == CatalogTableType.VIEW) {
- val newPath = catalog.defaultTablePath(targetTable)
- CatalogStorageFormat.empty.copy(properties = Map("path" -> newPath))
- } else if (DDLUtils.isDatasourceTable(sourceTableDesc)) {
- val newPath = catalog.defaultTablePath(targetTable)
- val newSerdeProp =
- sourceTableDesc.storage.properties.filterKeys(_.toLowerCase != "path") ++
- Map("path" -> newPath)
- sourceTableDesc.storage.copy(
- locationUri = None,
- properties = newSerdeProp)
- } else {
- sourceTableDesc.storage.copy(
- locationUri = None,
- properties = sourceTableDesc.storage.properties)
- }
-
val newProvider = if (sourceTableDesc.tableType == CatalogTableType.VIEW) {
Some(sparkSession.sessionState.conf.defaultDataSourceName)
} else {
@@ -91,7 +73,8 @@ case class CreateTableLikeCommand(
CatalogTable(
identifier = targetTable,
tableType = CatalogTableType.MANAGED,
- storage = newStorage,
+ // We are creating a new managed table, which should not have custom table location.
+ storage = sourceTableDesc.storage.copy(locationUri = None),
schema = sourceTableDesc.schema,
provider = newProvider,
partitionColumnNames = sourceTableDesc.partitionColumnNames,
@@ -170,13 +153,6 @@ case class AlterTableRenameCommand(
case NonFatal(e) => log.warn(e.toString, e)
}
}
- // For datasource tables, we also need to update the "path" serde property
- if (DDLUtils.isDatasourceTable(table) && table.tableType == CatalogTableType.MANAGED) {
- val newPath = catalog.defaultTablePath(newName)
- val newTable = table.withNewStorage(
- properties = table.storage.properties ++ Map("path" -> newPath))
- catalog.alterTable(newTable)
- }
// Invalidate the table last, otherwise uncaching the table would load the logical plan
// back into the hive metastore cache
catalog.refreshTable(oldName)
@@ -367,8 +343,9 @@ case class TruncateTableCommand(
DDLUtils.verifyPartitionProviderIsHive(spark, table, "TRUNCATE TABLE ... PARTITION")
}
val locations =
- if (DDLUtils.isDatasourceTable(table)) {
- Seq(table.storage.properties.get("path"))
+ // TODO: The `InMemoryCatalog` doesn't support listPartition with partial partition spec.
+ if (spark.conf.get(CATALOG_IMPLEMENTATION) == "in-memory") {
+ Seq(table.storage.locationUri)
} else if (table.partitionColumnNames.isEmpty) {
Seq(table.storage.locationUri)
} else {
@@ -916,17 +893,18 @@ case class ShowCreateTableCommand(table: TableIdentifier) extends RunnableComman
}
private def showDataSourceTableOptions(metadata: CatalogTable, builder: StringBuilder): Unit = {
- val props = metadata.properties
-
builder ++= s"USING ${metadata.provider.get}\n"
- val dataSourceOptions = metadata.storage.properties.filterNot {
- case (key, value) =>
+ val dataSourceOptions = metadata.storage.properties.map {
+ case (key, value) => s"${quoteIdentifier(key)} '${escapeSingleQuotedString(value)}'"
+ } ++ metadata.storage.locationUri.flatMap { location =>
+ if (metadata.tableType == MANAGED) {
// If it's a managed table, omit PATH option. Spark SQL always creates external table
// when the table creation DDL contains the PATH option.
- key.toLowerCase == "path" && metadata.tableType == MANAGED
- }.map {
- case (key, value) => s"${quoteIdentifier(key)} '${escapeSingleQuotedString(value)}'"
+ None
+ } else {
+ Some(s"path '${escapeSingleQuotedString(location)}'")
+ }
}
if (dataSourceOptions.nonEmpty) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 3f956c4276..0b50448a7a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
@@ -78,115 +78,9 @@ case class DataSource(
case class SourceInfo(name: String, schema: StructType, partitionColumns: Seq[String])
- lazy val providingClass: Class[_] = lookupDataSource(className)
+ lazy val providingClass: Class[_] = DataSource.lookupDataSource(className)
lazy val sourceInfo = sourceSchema()
- /** A map to maintain backward compatibility in case we move data sources around. */
- private val backwardCompatibilityMap: Map[String, String] = {
- val jdbc = classOf[JdbcRelationProvider].getCanonicalName
- val json = classOf[JsonFileFormat].getCanonicalName
- val parquet = classOf[ParquetFileFormat].getCanonicalName
- val csv = classOf[CSVFileFormat].getCanonicalName
- val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
- val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
-
- Map(
- "org.apache.spark.sql.jdbc" -> jdbc,
- "org.apache.spark.sql.jdbc.DefaultSource" -> jdbc,
- "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc,
- "org.apache.spark.sql.execution.datasources.jdbc" -> jdbc,
- "org.apache.spark.sql.json" -> json,
- "org.apache.spark.sql.json.DefaultSource" -> json,
- "org.apache.spark.sql.execution.datasources.json" -> json,
- "org.apache.spark.sql.execution.datasources.json.DefaultSource" -> json,
- "org.apache.spark.sql.parquet" -> parquet,
- "org.apache.spark.sql.parquet.DefaultSource" -> parquet,
- "org.apache.spark.sql.execution.datasources.parquet" -> parquet,
- "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
- "org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
- "org.apache.spark.sql.hive.orc" -> orc,
- "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
- "org.apache.spark.ml.source.libsvm" -> libsvm,
- "com.databricks.spark.csv" -> csv
- )
- }
-
- /**
- * Class that were removed in Spark 2.0. Used to detect incompatibility libraries for Spark 2.0.
- */
- private val spark2RemovedClasses = Set(
- "org.apache.spark.sql.DataFrame",
- "org.apache.spark.sql.sources.HadoopFsRelationProvider",
- "org.apache.spark.Logging")
-
- /** Given a provider name, look up the data source class definition. */
- private def lookupDataSource(provider0: String): Class[_] = {
- val provider = backwardCompatibilityMap.getOrElse(provider0, provider0)
- val provider2 = s"$provider.DefaultSource"
- val loader = Utils.getContextOrSparkClassLoader
- val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
-
- try {
- serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider)).toList match {
- // the provider format did not match any given registered aliases
- case Nil =>
- try {
- Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match {
- case Success(dataSource) =>
- // Found the data source using fully qualified path
- dataSource
- case Failure(error) =>
- if (provider.toLowerCase == "orc" ||
- provider.startsWith("org.apache.spark.sql.hive.orc")) {
- throw new AnalysisException(
- "The ORC data source must be used with Hive support enabled")
- } else if (provider.toLowerCase == "avro" ||
- provider == "com.databricks.spark.avro") {
- throw new AnalysisException(
- s"Failed to find data source: ${provider.toLowerCase}. Please find an Avro " +
- "package at " +
- "https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects")
- } else {
- throw new ClassNotFoundException(
- s"Failed to find data source: $provider. Please find packages at " +
- "https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects",
- error)
- }
- }
- } catch {
- case e: NoClassDefFoundError => // This one won't be caught by Scala NonFatal
- // NoClassDefFoundError's class name uses "/" rather than "." for packages
- val className = e.getMessage.replaceAll("/", ".")
- if (spark2RemovedClasses.contains(className)) {
- throw new ClassNotFoundException(s"$className was removed in Spark 2.0. " +
- "Please check if your library is compatible with Spark 2.0", e)
- } else {
- throw e
- }
- }
- case head :: Nil =>
- // there is exactly one registered alias
- head.getClass
- case sources =>
- // There are multiple registered aliases for the input
- sys.error(s"Multiple sources found for $provider " +
- s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
- "please specify the fully qualified class name.")
- }
- } catch {
- case e: ServiceConfigurationError if e.getCause.isInstanceOf[NoClassDefFoundError] =>
- // NoClassDefFoundError's class name uses "/" rather than "." for packages
- val className = e.getCause.getMessage.replaceAll("/", ".")
- if (spark2RemovedClasses.contains(className)) {
- throw new ClassNotFoundException(s"Detected an incompatible DataSourceRegister. " +
- "Please remove the incompatible library from classpath or upgrade it. " +
- s"Error: ${e.getMessage}", e)
- } else {
- throw e
- }
- }
- }
-
/**
* Infer the schema of the given FileFormat, returns a pair of schema and partition column names.
*/
@@ -470,13 +364,14 @@ case class DataSource(
// 1. Only one output path can be specified on the write path;
// 2. Output path must be a legal HDFS style file system path;
// 3. It's OK that the output path doesn't exist yet;
- val caseInsensitiveOptions = new CaseInsensitiveMap(options)
- val outputPath = {
- val path = new Path(caseInsensitiveOptions.getOrElse("path", {
- throw new IllegalArgumentException("'path' is not specified")
- }))
+ val allPaths = paths ++ new CaseInsensitiveMap(options).get("path")
+ val outputPath = if (allPaths.length == 1) {
+ val path = new Path(allPaths.head)
val fs = path.getFileSystem(sparkSession.sessionState.newHadoopConf())
path.makeQualified(fs.getUri, fs.getWorkingDirectory)
+ } else {
+ throw new IllegalArgumentException("Expected exactly one path to be specified, but " +
+ s"got: ${allPaths.mkString(", ")}")
}
val caseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
@@ -539,3 +434,123 @@ case class DataSource(
}
}
}
+
+object DataSource {
+
+ /** A map to maintain backward compatibility in case we move data sources around. */
+ private val backwardCompatibilityMap: Map[String, String] = {
+ val jdbc = classOf[JdbcRelationProvider].getCanonicalName
+ val json = classOf[JsonFileFormat].getCanonicalName
+ val parquet = classOf[ParquetFileFormat].getCanonicalName
+ val csv = classOf[CSVFileFormat].getCanonicalName
+ val libsvm = "org.apache.spark.ml.source.libsvm.LibSVMFileFormat"
+ val orc = "org.apache.spark.sql.hive.orc.OrcFileFormat"
+
+ Map(
+ "org.apache.spark.sql.jdbc" -> jdbc,
+ "org.apache.spark.sql.jdbc.DefaultSource" -> jdbc,
+ "org.apache.spark.sql.execution.datasources.jdbc.DefaultSource" -> jdbc,
+ "org.apache.spark.sql.execution.datasources.jdbc" -> jdbc,
+ "org.apache.spark.sql.json" -> json,
+ "org.apache.spark.sql.json.DefaultSource" -> json,
+ "org.apache.spark.sql.execution.datasources.json" -> json,
+ "org.apache.spark.sql.execution.datasources.json.DefaultSource" -> json,
+ "org.apache.spark.sql.parquet" -> parquet,
+ "org.apache.spark.sql.parquet.DefaultSource" -> parquet,
+ "org.apache.spark.sql.execution.datasources.parquet" -> parquet,
+ "org.apache.spark.sql.execution.datasources.parquet.DefaultSource" -> parquet,
+ "org.apache.spark.sql.hive.orc.DefaultSource" -> orc,
+ "org.apache.spark.sql.hive.orc" -> orc,
+ "org.apache.spark.ml.source.libsvm.DefaultSource" -> libsvm,
+ "org.apache.spark.ml.source.libsvm" -> libsvm,
+ "com.databricks.spark.csv" -> csv
+ )
+ }
+
+ /**
+ * Class that were removed in Spark 2.0. Used to detect incompatibility libraries for Spark 2.0.
+ */
+ private val spark2RemovedClasses = Set(
+ "org.apache.spark.sql.DataFrame",
+ "org.apache.spark.sql.sources.HadoopFsRelationProvider",
+ "org.apache.spark.Logging")
+
+ /** Given a provider name, look up the data source class definition. */
+ def lookupDataSource(provider: String): Class[_] = {
+ val provider1 = backwardCompatibilityMap.getOrElse(provider, provider)
+ val provider2 = s"$provider1.DefaultSource"
+ val loader = Utils.getContextOrSparkClassLoader
+ val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
+
+ try {
+ serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList match {
+ // the provider format did not match any given registered aliases
+ case Nil =>
+ try {
+ Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
+ case Success(dataSource) =>
+ // Found the data source using fully qualified path
+ dataSource
+ case Failure(error) =>
+ if (provider1.toLowerCase == "orc" ||
+ provider1.startsWith("org.apache.spark.sql.hive.orc")) {
+ throw new AnalysisException(
+ "The ORC data source must be used with Hive support enabled")
+ } else if (provider1.toLowerCase == "avro" ||
+ provider1 == "com.databricks.spark.avro") {
+ throw new AnalysisException(
+ s"Failed to find data source: ${provider1.toLowerCase}. Please find an Avro " +
+ "package at " +
+ "https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects")
+ } else {
+ throw new ClassNotFoundException(
+ s"Failed to find data source: $provider1. Please find packages at " +
+ "https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects",
+ error)
+ }
+ }
+ } catch {
+ case e: NoClassDefFoundError => // This one won't be caught by Scala NonFatal
+ // NoClassDefFoundError's class name uses "/" rather than "." for packages
+ val className = e.getMessage.replaceAll("/", ".")
+ if (spark2RemovedClasses.contains(className)) {
+ throw new ClassNotFoundException(s"$className was removed in Spark 2.0. " +
+ "Please check if your library is compatible with Spark 2.0", e)
+ } else {
+ throw e
+ }
+ }
+ case head :: Nil =>
+ // there is exactly one registered alias
+ head.getClass
+ case sources =>
+ // There are multiple registered aliases for the input
+ sys.error(s"Multiple sources found for $provider1 " +
+ s"(${sources.map(_.getClass.getName).mkString(", ")}), " +
+ "please specify the fully qualified class name.")
+ }
+ } catch {
+ case e: ServiceConfigurationError if e.getCause.isInstanceOf[NoClassDefFoundError] =>
+ // NoClassDefFoundError's class name uses "/" rather than "." for packages
+ val className = e.getCause.getMessage.replaceAll("/", ".")
+ if (spark2RemovedClasses.contains(className)) {
+ throw new ClassNotFoundException(s"Detected an incompatible DataSourceRegister. " +
+ "Please remove the incompatible library from classpath or upgrade it. " +
+ s"Error: ${e.getMessage}", e)
+ } else {
+ throw e
+ }
+ }
+ }
+
+ /**
+ * When creating a data source table, the `path` option has a special meaning: the table location.
+ * This method extracts the `path` option and treat it as table location to build a
+ * [[CatalogStorageFormat]]. Note that, the `path` option is removed from options after this.
+ */
+ def buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat = {
+ val path = new CaseInsensitiveMap(options).get("path")
+ val optionsWithoutPath = options.filterKeys(_.toLowerCase != "path")
+ CatalogStorageFormat.empty.copy(locationUri = path, properties = optionsWithoutPath)
+ }
+}
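As a quick reference, a REPL-style sketch of how the helper added above behaves. The values are hypothetical, and it assumes this patched version of Spark is on the classpath:

    import org.apache.spark.sql.execution.datasources.DataSource

    val storage = DataSource.buildStorageFormatFromOptions(
      Map("PATH" -> "/warehouse/db/tbl", "compression" -> "snappy"))
    // The case-insensitive `path` option becomes the table location ...
    assert(storage.locationUri == Some("/warehouse/db/tbl"))
    // ... and is removed from the remaining storage properties.
    assert(storage.properties == Map("compression" -> "snappy"))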
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 47c1f9d3fa..e87998fe4a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -237,6 +237,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
sparkSession: SparkSession,
simpleCatalogRelation: SimpleCatalogRelation): LogicalPlan = {
val table = simpleCatalogRelation.catalogTable
+ val pathOption = table.storage.locationUri.map("path" -> _)
val dataSource =
DataSource(
sparkSession,
@@ -244,7 +245,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
partitionColumns = table.partitionColumnNames,
bucketSpec = table.bucketSpec,
className = table.provider.get,
- options = table.storage.properties)
+ options = table.storage.properties ++ pathOption)
LogicalRelation(
dataSource.resolveRelation(),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 44fd38dfb9..d3e323cb12 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdenti
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.execution.datasources.CreateTable
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
import org.apache.spark.sql.types.StructType
@@ -354,7 +354,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
val tableDesc = CatalogTable(
identifier = tableIdent,
tableType = CatalogTableType.EXTERNAL,
- storage = CatalogStorageFormat.empty.copy(properties = options),
+ storage = DataSource.buildStorageFormatFromOptions(options),
schema = schema,
provider = Some(source)
)