From 18ee55dd5de0597d7fb69e8e16ac3744356a6918 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 17 Jan 2017 12:54:50 +0800 Subject: [SPARK-19148][SQL] do not expose the external table concept in Catalog ## What changes were proposed in this pull request? In https://github.com/apache/spark/pull/16296 , we reached a consensus that we should hide the external/managed table concept to users and only expose custom table path. This PR renames `Catalog.createExternalTable` to `createTable`(still keep the old versions for backward compatibility), and only set the table type to EXTERNAL if `path` is specified in options. ## How was this patch tested? new tests in `CatalogSuite` Author: Wenchen Fan Closes #16528 from cloud-fan/create-table. --- .../org/apache/spark/sql/catalog/Catalog.scala | 129 +++++++++++++++++---- .../execution/command/createDataSourceTables.scala | 9 -- .../apache/spark/sql/internal/CatalogImpl.scala | 78 ++++--------- .../apache/spark/sql/internal/CatalogSuite.scala | 66 ++++++++--- 4 files changed, 183 insertions(+), 99 deletions(-) (limited to 'sql/core') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala index 6b061f8ab2..41e781ed18 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalog +import scala.collection.JavaConverters._ + import org.apache.spark.annotation.{Experimental, InterfaceStability} import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset} import org.apache.spark.sql.types.StructType @@ -187,82 +189,169 @@ abstract class Catalog { def functionExists(dbName: String, functionName: String): Boolean /** - * :: Experimental :: - * Creates an external table from the given path and returns the corresponding DataFrame. + * Creates a table from the given path and returns the corresponding DataFrame. * It will use the default data source configured by spark.sql.sources.default. * * @since 2.0.0 */ + @deprecated("use createTable instead.", "2.2.0") + def createExternalTable(tableName: String, path: String): DataFrame = { + createTable(tableName, path) + } + + /** + * :: Experimental :: + * Creates a table from the given path and returns the corresponding DataFrame. + * It will use the default data source configured by spark.sql.sources.default. + * + * @since 2.2.0 + */ @Experimental @InterfaceStability.Evolving - def createExternalTable(tableName: String, path: String): DataFrame + def createTable(tableName: String, path: String): DataFrame /** - * :: Experimental :: - * Creates an external table from the given path based on a data source - * and returns the corresponding DataFrame. + * Creates a table from the given path based on a data source and returns the corresponding + * DataFrame. * * @since 2.0.0 */ + @deprecated("use createTable instead.", "2.2.0") + def createExternalTable(tableName: String, path: String, source: String): DataFrame = { + createTable(tableName, path, source) + } + + /** + * :: Experimental :: + * Creates a table from the given path based on a data source and returns the corresponding + * DataFrame. + * + * @since 2.2.0 + */ @Experimental @InterfaceStability.Evolving - def createExternalTable(tableName: String, path: String, source: String): DataFrame + def createTable(tableName: String, path: String, source: String): DataFrame /** - * :: Experimental :: - * Creates an external table from the given path based on a data source and a set of options. + * Creates a table from the given path based on a data source and a set of options. * Then, returns the corresponding DataFrame. * * @since 2.0.0 */ + @deprecated("use createTable instead.", "2.2.0") + def createExternalTable( + tableName: String, + source: String, + options: java.util.Map[String, String]): DataFrame = { + createTable(tableName, source, options) + } + + /** + * :: Experimental :: + * Creates a table from the given path based on a data source and a set of options. + * Then, returns the corresponding DataFrame. + * + * @since 2.2.0 + */ @Experimental @InterfaceStability.Evolving + def createTable( + tableName: String, + source: String, + options: java.util.Map[String, String]): DataFrame = { + createTable(tableName, source, options.asScala.toMap) + } + + /** + * (Scala-specific) + * Creates a table from the given path based on a data source and a set of options. + * Then, returns the corresponding DataFrame. + * + * @since 2.0.0 + */ + @deprecated("use createTable instead.", "2.2.0") def createExternalTable( tableName: String, source: String, - options: java.util.Map[String, String]): DataFrame + options: Map[String, String]): DataFrame = { + createTable(tableName, source, options) + } /** * :: Experimental :: * (Scala-specific) - * Creates an external table from the given path based on a data source and a set of options. + * Creates a table from the given path based on a data source and a set of options. * Then, returns the corresponding DataFrame. * - * @since 2.0.0 + * @since 2.2.0 */ @Experimental @InterfaceStability.Evolving - def createExternalTable( + def createTable( tableName: String, source: String, options: Map[String, String]): DataFrame /** * :: Experimental :: - * Create an external table from the given path based on a data source, a schema and - * a set of options. Then, returns the corresponding DataFrame. + * Create a table from the given path based on a data source, a schema and a set of options. + * Then, returns the corresponding DataFrame. * * @since 2.0.0 */ + @deprecated("use createTable instead.", "2.2.0") + def createExternalTable( + tableName: String, + source: String, + schema: StructType, + options: java.util.Map[String, String]): DataFrame = { + createTable(tableName, source, schema, options) + } + + /** + * :: Experimental :: + * Create a table from the given path based on a data source, a schema and a set of options. + * Then, returns the corresponding DataFrame. + * + * @since 2.2.0 + */ @Experimental @InterfaceStability.Evolving + def createTable( + tableName: String, + source: String, + schema: StructType, + options: java.util.Map[String, String]): DataFrame = { + createTable(tableName, source, schema, options.asScala.toMap) + } + + /** + * (Scala-specific) + * Create a table from the given path based on a data source, a schema and a set of options. + * Then, returns the corresponding DataFrame. + * + * @since 2.0.0 + */ + @deprecated("use createTable instead.", "2.2.0") def createExternalTable( tableName: String, source: String, schema: StructType, - options: java.util.Map[String, String]): DataFrame + options: Map[String, String]): DataFrame = { + createTable(tableName, source, schema, options) + } /** * :: Experimental :: * (Scala-specific) - * Create an external table from the given path based on a data source, a schema and - * a set of options. Then, returns the corresponding DataFrame. + * Create a table from the given path based on a data source, a schema and a set of options. + * Then, returns the corresponding DataFrame. * - * @since 2.0.0 + * @since 2.2.0 */ @Experimental @InterfaceStability.Evolving - def createExternalTable( + def createTable( tableName: String, source: String, schema: StructType, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala index 90aeebd932..beeba05554 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala @@ -71,15 +71,6 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo options = table.storage.properties ++ pathOption, catalogTable = Some(tableWithDefaultOptions)).resolveRelation() - dataSource match { - case fs: HadoopFsRelation => - if (table.tableType == CatalogTableType.EXTERNAL && fs.location.rootPaths.isEmpty) { - throw new AnalysisException( - "Cannot create a file-based external data source table without path") - } - case _ => - } - val partitionColumnNames = if (table.schema.nonEmpty) { table.partitionColumnNames } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 8244b2152c..9136a83bc2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.internal -import scala.collection.JavaConverters._ import scala.reflect.runtime.universe.TypeTag import org.apache.spark.annotation.Experimental @@ -257,101 +256,74 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { /** * :: Experimental :: - * Creates an external table from the given path and returns the corresponding DataFrame. + * Creates a table from the given path and returns the corresponding DataFrame. * It will use the default data source configured by spark.sql.sources.default. * * @group ddl_ops - * @since 2.0.0 + * @since 2.2.0 */ @Experimental - override def createExternalTable(tableName: String, path: String): DataFrame = { + override def createTable(tableName: String, path: String): DataFrame = { val dataSourceName = sparkSession.sessionState.conf.defaultDataSourceName - createExternalTable(tableName, path, dataSourceName) + createTable(tableName, path, dataSourceName) } /** * :: Experimental :: - * Creates an external table from the given path based on a data source - * and returns the corresponding DataFrame. + * Creates a table from the given path based on a data source and returns the corresponding + * DataFrame. * * @group ddl_ops - * @since 2.0.0 + * @since 2.2.0 */ @Experimental - override def createExternalTable(tableName: String, path: String, source: String): DataFrame = { - createExternalTable(tableName, source, Map("path" -> path)) - } - - /** - * :: Experimental :: - * Creates an external table from the given path based on a data source and a set of options. - * Then, returns the corresponding DataFrame. - * - * @group ddl_ops - * @since 2.0.0 - */ - @Experimental - override def createExternalTable( - tableName: String, - source: String, - options: java.util.Map[String, String]): DataFrame = { - createExternalTable(tableName, source, options.asScala.toMap) + override def createTable(tableName: String, path: String, source: String): DataFrame = { + createTable(tableName, source, Map("path" -> path)) } /** * :: Experimental :: * (Scala-specific) - * Creates an external table from the given path based on a data source and a set of options. + * Creates a table from the given path based on a data source and a set of options. * Then, returns the corresponding DataFrame. * * @group ddl_ops - * @since 2.0.0 + * @since 2.2.0 */ @Experimental - override def createExternalTable( + override def createTable( tableName: String, source: String, options: Map[String, String]): DataFrame = { - createExternalTable(tableName, source, new StructType, options) - } - - /** - * :: Experimental :: - * Create an external table from the given path based on a data source, a schema and - * a set of options. Then, returns the corresponding DataFrame. - * - * @group ddl_ops - * @since 2.0.0 - */ - @Experimental - override def createExternalTable( - tableName: String, - source: String, - schema: StructType, - options: java.util.Map[String, String]): DataFrame = { - createExternalTable(tableName, source, schema, options.asScala.toMap) + createTable(tableName, source, new StructType, options) } /** * :: Experimental :: * (Scala-specific) - * Create an external table from the given path based on a data source, a schema and - * a set of options. Then, returns the corresponding DataFrame. + * Create a table from the given path based on a data source, a schema and a set of options. + * Then, returns the corresponding DataFrame. * * @group ddl_ops - * @since 2.0.0 + * @since 2.2.0 */ @Experimental - override def createExternalTable( + override def createTable( tableName: String, source: String, schema: StructType, options: Map[String, String]): DataFrame = { val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) + val storage = DataSource.buildStorageFormatFromOptions(options) + val tableType = if (storage.locationUri.isDefined) { + CatalogTableType.EXTERNAL + } else { + CatalogTableType.MANAGED + } val tableDesc = CatalogTable( identifier = tableIdent, - tableType = CatalogTableType.EXTERNAL, - storage = DataSource.buildStorageFormatFromOptions(options), + tableType = tableType, + storage = storage, schema = schema, provider = Some(source) ) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index 5dd04543ed..801912f441 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -17,6 +17,9 @@ package org.apache.spark.sql.internal +import java.io.File +import java.net.URI + import org.scalatest.BeforeAndAfterEach import org.apache.spark.SparkFunSuite @@ -27,7 +30,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo} import org.apache.spark.sql.catalyst.plans.logical.Range import org.apache.spark.sql.test.SharedSQLContext -import org.apache.spark.sql.types.{IntegerType, StructType} +import org.apache.spark.sql.types.StructType /** @@ -37,6 +40,7 @@ class CatalogSuite extends SparkFunSuite with BeforeAndAfterEach with SharedSQLContext { + import testImplicits._ private def sessionCatalog: SessionCatalog = spark.sessionState.catalog @@ -306,22 +310,6 @@ class CatalogSuite columnFields.foreach { f => assert(columnString.contains(f.toString)) } } - test("createExternalTable should fail if path is not given for file-based data source") { - val e = intercept[AnalysisException] { - spark.catalog.createExternalTable("tbl", "json", Map.empty[String, String]) - } - assert(e.message.contains("Unable to infer schema")) - - val e2 = intercept[AnalysisException] { - spark.catalog.createExternalTable( - "tbl", - "json", - new StructType().add("i", IntegerType), - Map.empty[String, String]) - } - assert(e2.message == "Cannot create a file-based external data source table without path") - } - test("dropTempView should not un-cache and drop metastore table if a same-name table exists") { withTable("same_name") { spark.range(10).write.saveAsTable("same_name") @@ -460,6 +448,50 @@ class CatalogSuite } } + test("createTable with 'path' in options") { + withTable("t") { + withTempDir { dir => + spark.catalog.createTable( + tableName = "t", + source = "json", + schema = new StructType().add("i", "int"), + options = Map("path" -> dir.getAbsolutePath)) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(table.tableType == CatalogTableType.EXTERNAL) + assert(table.storage.locationUri.get == dir.getAbsolutePath) + + Seq((1)).toDF("i").write.insertInto("t") + assert(dir.exists() && dir.listFiles().nonEmpty) + + sql("DROP TABLE t") + // the table path and data files are still there after DROP TABLE, if custom table path is + // specified. + assert(dir.exists() && dir.listFiles().nonEmpty) + } + } + } + + test("createTable without 'path' in options") { + withTable("t") { + spark.catalog.createTable( + tableName = "t", + source = "json", + schema = new StructType().add("i", "int"), + options = Map.empty[String, String]) + val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t")) + assert(table.tableType == CatalogTableType.MANAGED) + val tablePath = new File(new URI(table.storage.locationUri.get)) + assert(tablePath.exists() && tablePath.listFiles().isEmpty) + + Seq((1)).toDF("i").write.insertInto("t") + assert(tablePath.listFiles().nonEmpty) + + sql("DROP TABLE t") + // the table path is removed after DROP TABLE, if custom table path is not specified. + assert(!tablePath.exists()) + } + } + // TODO: add tests for the rest of them } -- cgit v1.2.3