author     Wenchen Fan <wenchen@databricks.com>    2016-09-01 16:45:22 +0800
committer  Wenchen Fan <wenchen@databricks.com>    2016-09-01 16:45:22 +0800
commit     8e740ae44d55570a3e7b6eae1f0239ac1319b986 (patch)
tree       a4055f80643f9411cfb14d7ab51179ac0b83e047 /sql/core
parent     1f06a5b6a0584d0c9656f58eaf54e54e2383c82b (diff)
[SPARK-17257][SQL] the physical plan of CREATE TABLE or CTAS should take CatalogTable
## What changes were proposed in this pull request?

This is kind of a follow-up of https://github.com/apache/spark/pull/14482 . As we put `CatalogTable` in the logical plan directly, it makes sense to let physical plans take `CatalogTable` directly, instead of extracting some fields of `CatalogTable` in the planner and then constructing a new `CatalogTable` in the physical plan.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #14823 from cloud-fan/create-table.
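To make the shape of the refactoring concrete before reading the diff, here is a minimal, self-contained Scala sketch. The types below (`MiniCatalogTable`, `OldCreateTableCommand`, `NewCreateTableCommand`) are simplified stand-ins invented for illustration, not Spark classes; they only mirror the before/after constructor signatures visible in the patch.

```scala
// Hypothetical, simplified stand-ins that mirror the refactoring; Spark's real
// CatalogTable and command classes carry many more fields than shown here.
case class MiniCatalogTable(
    identifier: String,
    provider: String,
    options: Map[String, String],
    partitionColumnNames: Seq[String],
    managed: Boolean)

// Before: the planner re-extracted individual fields and the physical command later
// had to reassemble a table description from them.
case class OldCreateTableCommand(
    tableIdent: String,
    provider: String,
    options: Map[String, String],
    partitionColumns: Array[String],
    ignoreIfExists: Boolean,
    managedIfNoPath: Boolean)

// After: the command keeps the whole description, so adding a field to the table
// description no longer requires touching every command constructor.
case class NewCreateTableCommand(table: MiniCatalogTable, ignoreIfExists: Boolean)

object RefactoringSketch extends App {
  val desc = MiniCatalogTable(
    identifier = "t",
    provider = "parquet",
    options = Map("path" -> "/tmp/t"),
    partitionColumnNames = Seq("dt"),
    managed = false)

  // The planner now simply forwards the description to the physical command.
  println(NewCreateTableCommand(desc, ignoreIfExists = false))
}
```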
Diffstat (limited to 'sql/core')
 sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala                          |  10
 sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala                 |  15
 sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala                |  16
 sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala | 135
 4 files changed, 75 insertions(+), 101 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index a9049a60f2..c05c7a6551 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, HadoopFsRelation}
+import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, CreateTable, DataSource, HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils}
import org.apache.spark.sql.types.StructType
@@ -368,9 +368,15 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
throw new AnalysisException(s"Table $tableIdent already exists.")
case _ =>
+ val tableType = if (new CaseInsensitiveMap(extraOptions.toMap).contains("path")) {
+ CatalogTableType.EXTERNAL
+ } else {
+ CatalogTableType.MANAGED
+ }
+
val tableDesc = CatalogTable(
identifier = tableIdent,
- tableType = CatalogTableType.EXTERNAL,
+ tableType = tableType,
storage = CatalogStorageFormat.empty.copy(properties = extraOptions.toMap),
schema = new StructType,
provider = Some(source),
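For context, this is how the `DataFrameWriter` change is exercised from user code. A usage sketch only, assuming a local Spark build that includes this patch; the table names and the `/tmp` path are illustrative.

```scala
import org.apache.spark.sql.SparkSession

object SaveAsTableSketch extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("save-as-table-sketch")
    .getOrCreate()
  import spark.implicits._

  val df = Seq((1, "a"), (2, "b")).toDF("id", "name")

  // No "path" option: the case-insensitive lookup in DataFrameWriter finds nothing,
  // so the CatalogTable is created as MANAGED and the default table path is filled
  // in later by CreateDataSourceTableCommand.
  df.write.format("parquet").saveAsTable("managed_tbl")

  // A "path" option (any case, e.g. "PATH"): the table is recorded as EXTERNAL and
  // the data stays at the user-supplied location.
  df.write.format("parquet").option("path", "/tmp/external_tbl").saveAsTable("external_tbl")

  spark.stop()
}
```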
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 656494d97d..8fc1a8595a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -325,14 +325,17 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
.getOrElse(Array.empty[String])
val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
+ // TODO: this may be wrong for non-file-based data sources like JDBC, which should be external
+ // even if there is no `path` in the options. We should consider allowing the EXTERNAL keyword.
+ val tableType = if (new CaseInsensitiveMap(options).contains("path")) {
+ CatalogTableType.EXTERNAL
+ } else {
+ CatalogTableType.MANAGED
+ }
+
val tableDesc = CatalogTable(
identifier = table,
- // TODO: actually the table type may be EXTERNAL if we have `path` in options. However, the
- // physical plan `CreateDataSourceTableCommand` doesn't take table type as parameter, but a
- // boolean flag called `managedIfNoPath`. We set the table type to MANAGED here to simulate
- // setting the `managedIfNoPath` flag. In the future we should refactor the physical plan and
- // make it take `CatalogTable` directly.
- tableType = CatalogTableType.MANAGED,
+ tableType = tableType,
storage = CatalogStorageFormat.empty.copy(properties = options),
schema = schema.getOrElse(new StructType),
provider = Some(provider),
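The parser change applies the same rule to SQL DDL. A sketch, again assuming a local build with this patch; table names and the path are placeholders.

```scala
import org.apache.spark.sql.SparkSession

object CreateTableSqlSketch extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("ddl-sketch")
    .getOrCreate()

  // No `path` in OPTIONS: the parser now builds a MANAGED CatalogTable.
  spark.sql("CREATE TABLE managed_src (id INT, name STRING) USING parquet")

  // `path` present in OPTIONS (the lookup is case-insensitive): an EXTERNAL CatalogTable.
  spark.sql(
    """CREATE TABLE external_src (id INT, name STRING)
      |USING parquet
      |OPTIONS (path '/tmp/external_src')""".stripMargin)

  spark.stop()
}
```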
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 4aaf454285..b4899ad688 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -424,15 +424,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case CreateTable(tableDesc, mode, None) =>
val cmd =
- CreateDataSourceTableCommand(
- tableDesc.identifier,
- if (tableDesc.schema.nonEmpty) Some(tableDesc.schema) else None,
- tableDesc.provider.get,
- tableDesc.storage.properties,
- tableDesc.partitionColumnNames.toArray,
- tableDesc.bucketSpec,
- ignoreIfExists = mode == SaveMode.Ignore,
- managedIfNoPath = tableDesc.tableType == CatalogTableType.MANAGED)
+ CreateDataSourceTableCommand(tableDesc, ignoreIfExists = mode == SaveMode.Ignore)
ExecutedCommandExec(cmd) :: Nil
// CREATE TABLE ... AS SELECT ... for hive serde table is handled in hive module, by rule
@@ -441,12 +433,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get != "hive" =>
val cmd =
CreateDataSourceTableAsSelectCommand(
- tableDesc.identifier,
- tableDesc.provider.get,
- tableDesc.partitionColumnNames.toArray,
- tableDesc.bucketSpec,
+ tableDesc,
mode,
- tableDesc.storage.properties,
query)
ExecutedCommandExec(cmd) :: Nil
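The strategy now just forwards `tableDesc` wholesale. The following stand-alone sketch mimics that dispatch with invented mini types (`MiniTable`, `MiniCreateTable`, and friends) rather than Spark's planner API; it only shows the pattern of matching on the optional query and passing the whole description through.

```scala
// Stand-in types for illustration only; not Spark's planner classes.
case class MiniTable(name: String, provider: String, managed: Boolean)

sealed trait MiniCommand
case class MiniCreateTableCmd(table: MiniTable, ignoreIfExists: Boolean) extends MiniCommand
case class MiniCtasCmd(table: MiniTable, mode: String, query: String) extends MiniCommand

// Logical-plan stand-in: a table description, a save mode, and an optional query.
case class MiniCreateTable(table: MiniTable, mode: String, query: Option[String])

object PlannerSketch extends App {
  def plan(node: MiniCreateTable): MiniCommand = node match {
    // Plain CREATE TABLE: forward the description, derive ignoreIfExists from the mode.
    case MiniCreateTable(t, mode, None) =>
      MiniCreateTableCmd(t, ignoreIfExists = mode == "Ignore")
    // CTAS: same description plus the query to run.
    case MiniCreateTable(t, mode, Some(q)) =>
      MiniCtasCmd(t, mode, q)
  }

  println(plan(MiniCreateTable(MiniTable("t1", "parquet", managed = true), "ErrorIfExists", None)))
  println(plan(MiniCreateTable(MiniTable("t2", "parquet", managed = true), "Ignore", Some("SELECT 1"))))
}
```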
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 7400a0e7bb..da3f6c600a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -40,71 +40,56 @@ import org.apache.spark.sql.types._
* USING format OPTIONS ([option1_name "option1_value", option2_name "option2_value", ...])
* }}}
*/
-case class CreateDataSourceTableCommand(
- tableIdent: TableIdentifier,
- userSpecifiedSchema: Option[StructType],
- provider: String,
- options: Map[String, String],
- userSpecifiedPartitionColumns: Array[String],
- bucketSpec: Option[BucketSpec],
- ignoreIfExists: Boolean,
- managedIfNoPath: Boolean)
+case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boolean)
extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
- val tableName = tableIdent.unquotedString
- val sessionState = sparkSession.sessionState
+ assert(table.tableType != CatalogTableType.VIEW)
+ assert(table.provider.isDefined)
- if (sessionState.catalog.tableExists(tableIdent)) {
+ val sessionState = sparkSession.sessionState
+ if (sessionState.catalog.tableExists(table.identifier)) {
if (ignoreIfExists) {
return Seq.empty[Row]
} else {
- throw new AnalysisException(s"Table $tableName already exists.")
+ throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
}
}
- var isExternal = true
- val optionsWithPath =
- if (!new CaseInsensitiveMap(options).contains("path") && managedIfNoPath) {
- isExternal = false
- options + ("path" -> sessionState.catalog.defaultTablePath(tableIdent))
- } else {
- options
- }
+ val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
+ table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
+ } else {
+ table.storage.properties
+ }
- // Create the relation to validate the arguments before writing the metadata to the metastore.
+ // Create the relation to validate the arguments before writing the metadata to the metastore,
+ // and infer the table schema and partition if users didn't specify schema in CREATE TABLE.
val dataSource: BaseRelation =
DataSource(
sparkSession = sparkSession,
- userSpecifiedSchema = userSpecifiedSchema,
- className = provider,
- bucketSpec = None,
+ userSpecifiedSchema = if (table.schema.isEmpty) None else Some(table.schema),
+ className = table.provider.get,
+ bucketSpec = table.bucketSpec,
options = optionsWithPath).resolveRelation(checkPathExist = false)
- val partitionColumns = if (userSpecifiedSchema.nonEmpty) {
- userSpecifiedPartitionColumns
+ val partitionColumnNames = if (table.schema.nonEmpty) {
+ table.partitionColumnNames
} else {
// This is guaranteed in `PreprocessDDL`.
- assert(userSpecifiedPartitionColumns.isEmpty)
+ assert(table.partitionColumnNames.isEmpty)
dataSource match {
- case r: HadoopFsRelation => r.partitionSchema.fieldNames
- case _ => Array.empty[String]
+ case r: HadoopFsRelation => r.partitionSchema.fieldNames.toSeq
+ case _ => Nil
}
}
- val table = CatalogTable(
- identifier = tableIdent,
- tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
- storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
+ val newTable = table.copy(
+ storage = table.storage.copy(properties = optionsWithPath),
schema = dataSource.schema,
- provider = Some(provider),
- partitionColumnNames = partitionColumns,
- bucketSpec = bucketSpec
- )
-
+ partitionColumnNames = partitionColumnNames)
// We will return Nil or throw exception at the beginning if the table already exists, so when
// we reach here, the table should not exist and we should set `ignoreIfExists` to false.
- sessionState.catalog.createTable(table, ignoreIfExists = false)
+ sessionState.catalog.createTable(newTable, ignoreIfExists = false)
Seq.empty[Row]
}
}
@@ -112,7 +97,7 @@ case class CreateDataSourceTableCommand(
/**
* A command used to create a data source table using the result of a query.
*
- * Note: This is different from [[CreateTableAsSelectLogicalPlan]]. Please check the syntax for
+ * Note: This is different from `CreateHiveTableAsSelectCommand`. Please check the syntax for
* difference. This is not intended for temporary tables.
*
* The syntax of using this command in SQL is:
@@ -123,32 +108,31 @@ case class CreateDataSourceTableCommand(
* }}}
*/
case class CreateDataSourceTableAsSelectCommand(
- tableIdent: TableIdentifier,
- provider: String,
- partitionColumns: Array[String],
- bucketSpec: Option[BucketSpec],
+ table: CatalogTable,
mode: SaveMode,
- options: Map[String, String],
query: LogicalPlan)
extends RunnableCommand {
override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
override def run(sparkSession: SparkSession): Seq[Row] = {
- val tableName = tableIdent.unquotedString
+ assert(table.tableType != CatalogTableType.VIEW)
+ assert(table.provider.isDefined)
+ assert(table.schema.isEmpty)
+
+ val tableName = table.identifier.unquotedString
+ val provider = table.provider.get
val sessionState = sparkSession.sessionState
- var createMetastoreTable = false
- var isExternal = true
- val optionsWithPath =
- if (!new CaseInsensitiveMap(options).contains("path")) {
- isExternal = false
- options + ("path" -> sessionState.catalog.defaultTablePath(tableIdent))
- } else {
- options
- }
+ val optionsWithPath = if (table.tableType == CatalogTableType.MANAGED) {
+ table.storage.properties + ("path" -> sessionState.catalog.defaultTablePath(table.identifier))
+ } else {
+ table.storage.properties
+ }
+
+ var createMetastoreTable = false
var existingSchema = Option.empty[StructType]
- if (sparkSession.sessionState.catalog.tableExists(tableIdent)) {
+ if (sparkSession.sessionState.catalog.tableExists(table.identifier)) {
// Check if we need to throw an exception or just return.
mode match {
case SaveMode.ErrorIfExists =>
@@ -165,21 +149,21 @@ case class CreateDataSourceTableAsSelectCommand(
val dataSource = DataSource(
sparkSession = sparkSession,
userSpecifiedSchema = Some(query.schema.asNullable),
- partitionColumns = partitionColumns,
- bucketSpec = bucketSpec,
+ partitionColumns = table.partitionColumnNames,
+ bucketSpec = table.bucketSpec,
className = provider,
options = optionsWithPath)
// TODO: Check that options from the resolved relation match the relation that we are
// inserting into (i.e. using the same compression).
EliminateSubqueryAliases(
- sessionState.catalog.lookupRelation(tableIdent)) match {
+ sessionState.catalog.lookupRelation(table.identifier)) match {
case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
// check if the file formats match
l.relation match {
case r: HadoopFsRelation if r.fileFormat.getClass != dataSource.providingClass =>
throw new AnalysisException(
- s"The file format of the existing table $tableIdent is " +
+ s"The file format of the existing table $tableName is " +
s"`${r.fileFormat.getClass.getName}`. It doesn't match the specified " +
s"format `$provider`")
case _ =>
@@ -216,36 +200,29 @@ case class CreateDataSourceTableAsSelectCommand(
val dataSource = DataSource(
sparkSession,
className = provider,
- partitionColumns = partitionColumns,
- bucketSpec = bucketSpec,
+ partitionColumns = table.partitionColumnNames,
+ bucketSpec = table.bucketSpec,
options = optionsWithPath)
val result = try {
dataSource.write(mode, df)
} catch {
case ex: AnalysisException =>
- logError(s"Failed to write to table ${tableIdent.identifier} in $mode mode", ex)
+ logError(s"Failed to write to table $tableName in $mode mode", ex)
throw ex
}
if (createMetastoreTable) {
- // We will use the schema of resolved.relation as the schema of the table (instead of
- // the schema of df). It is important since the nullability may be changed by the relation
- // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
- val schema = result.schema
- val table = CatalogTable(
- identifier = tableIdent,
- tableType = if (isExternal) CatalogTableType.EXTERNAL else CatalogTableType.MANAGED,
- storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
- schema = schema,
- provider = Some(provider),
- partitionColumnNames = partitionColumns,
- bucketSpec = bucketSpec
- )
- sessionState.catalog.createTable(table, ignoreIfExists = false)
+ val newTable = table.copy(
+ storage = table.storage.copy(properties = optionsWithPath),
+ // We will use the schema of resolved.relation as the schema of the table (instead of
+ // the schema of df). It is important since the nullability may be changed by the relation
+ // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
+ schema = result.schema)
+ sessionState.catalog.createTable(newTable, ignoreIfExists = false)
}
// Refresh the cache of the table in the catalog.
- sessionState.catalog.refreshTable(tableIdent)
+ sessionState.catalog.refreshTable(table.identifier)
Seq.empty[Row]
}
}
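Finally, a usage sketch that exercises the CTAS path handled by `CreateDataSourceTableAsSelectCommand`, assuming a local build with this patch; the temp view, table names, and the path are illustrative.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object CtasSketch extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("ctas-sketch")
    .getOrCreate()
  import spark.implicits._

  Seq((1, "a"), (2, "b")).toDF("id", "name").createOrReplaceTempView("src")

  // SQL CTAS with a non-hive provider is planned as CreateDataSourceTableAsSelectCommand.
  // No `path` option, so the table is MANAGED and the command adds the catalog's
  // default table path to the storage properties before writing.
  spark.sql("CREATE TABLE managed_ctas USING parquet AS SELECT * FROM src")

  // DataFrameWriter equivalent with an explicit path: the table stays EXTERNAL.
  spark.table("src")
    .write
    .mode(SaveMode.ErrorIfExists)
    .format("parquet")
    .option("path", "/tmp/external_ctas")
    .saveAsTable("external_ctas")

  spark.stop()
}
```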