 project/MimaExcludes.scala                                                                   |   5
 python/pyspark/sql/catalog.py                                                                |  27
 sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala                          | 129
 sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala |   9
 sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala                     |  78
 sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala                    |  66
 6 files changed, 211 insertions(+), 103 deletions(-)
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 2314d7f45c..e0ee00e682 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -43,7 +43,10 @@ object MimaExcludes {
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.recoverPartitions"),
// [SPARK-18537] Add a REST api to spark streaming
- ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.streaming.scheduler.StreamingListener.onStreamingStarted")
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.streaming.scheduler.StreamingListener.onStreamingStarted"),
+
+ // [SPARK-19148][SQL] do not expose the external table concept in Catalog
+ ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.createTable")
)
// Exclude rules for 2.1.x
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index 30c7a3fe4f..253a750629 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -15,6 +15,7 @@
# limitations under the License.
#
+import warnings
from collections import namedtuple
from pyspark import since
@@ -138,7 +139,27 @@ class Catalog(object):
@since(2.0)
def createExternalTable(self, tableName, path=None, source=None, schema=None, **options):
- """Creates an external table based on the dataset in a data source.
+ """Creates a table based on the dataset in a data source.
+
+ It returns the DataFrame associated with the external table.
+
+ The data source is specified by the ``source`` and a set of ``options``.
+ If ``source`` is not specified, the default data source configured by
+ ``spark.sql.sources.default`` will be used.
+
+ Optionally, a schema can be provided as the schema of the returned :class:`DataFrame` and
+ created external table.
+
+ :return: :class:`DataFrame`
+ """
+ warnings.warn(
+ "createExternalTable is deprecated since Spark 2.2, please use createTable instead.",
+ DeprecationWarning)
+ return self.createTable(tableName, path, source, schema, **options)
+
+ @since(2.2)
+ def createTable(self, tableName, path=None, source=None, schema=None, **options):
+ """Creates a table based on the dataset in a data source.
It returns the DataFrame associated with the external table.
@@ -157,12 +178,12 @@ class Catalog(object):
source = self._sparkSession.conf.get(
"spark.sql.sources.default", "org.apache.spark.sql.parquet")
if schema is None:
- df = self._jcatalog.createExternalTable(tableName, source, options)
+ df = self._jcatalog.createTable(tableName, source, options)
else:
if not isinstance(schema, StructType):
raise TypeError("schema should be StructType")
scala_datatype = self._jsparkSession.parseDataType(schema.json())
- df = self._jcatalog.createExternalTable(tableName, source, scala_datatype, options)
+ df = self._jcatalog.createTable(tableName, source, scala_datatype, options)
return DataFrame(df, self._sparkSession._wrapped)
@since(2.0)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 6b061f8ab2..41e781ed18 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -17,6 +17,8 @@
package org.apache.spark.sql.catalog
+import scala.collection.JavaConverters._
+
import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset}
import org.apache.spark.sql.types.StructType
@@ -187,82 +189,169 @@ abstract class Catalog {
def functionExists(dbName: String, functionName: String): Boolean
/**
- * :: Experimental ::
- * Creates an external table from the given path and returns the corresponding DataFrame.
+ * Creates a table from the given path and returns the corresponding DataFrame.
* It will use the default data source configured by spark.sql.sources.default.
*
* @since 2.0.0
*/
+ @deprecated("use createTable instead.", "2.2.0")
+ def createExternalTable(tableName: String, path: String): DataFrame = {
+ createTable(tableName, path)
+ }
+
+ /**
+ * :: Experimental ::
+ * Creates a table from the given path and returns the corresponding DataFrame.
+ * It will use the default data source configured by spark.sql.sources.default.
+ *
+ * @since 2.2.0
+ */
@Experimental
@InterfaceStability.Evolving
- def createExternalTable(tableName: String, path: String): DataFrame
+ def createTable(tableName: String, path: String): DataFrame
/**
- * :: Experimental ::
- * Creates an external table from the given path based on a data source
- * and returns the corresponding DataFrame.
+ * Creates a table from the given path based on a data source and returns the corresponding
+ * DataFrame.
*
* @since 2.0.0
*/
+ @deprecated("use createTable instead.", "2.2.0")
+ def createExternalTable(tableName: String, path: String, source: String): DataFrame = {
+ createTable(tableName, path, source)
+ }
+
+ /**
+ * :: Experimental ::
+ * Creates a table from the given path based on a data source and returns the corresponding
+ * DataFrame.
+ *
+ * @since 2.2.0
+ */
@Experimental
@InterfaceStability.Evolving
- def createExternalTable(tableName: String, path: String, source: String): DataFrame
+ def createTable(tableName: String, path: String, source: String): DataFrame
/**
- * :: Experimental ::
- * Creates an external table from the given path based on a data source and a set of options.
+ * Creates a table from the given path based on a data source and a set of options.
* Then, returns the corresponding DataFrame.
*
* @since 2.0.0
*/
+ @deprecated("use createTable instead.", "2.2.0")
+ def createExternalTable(
+ tableName: String,
+ source: String,
+ options: java.util.Map[String, String]): DataFrame = {
+ createTable(tableName, source, options)
+ }
+
+ /**
+ * :: Experimental ::
+ * Creates a table from the given path based on a data source and a set of options.
+ * Then, returns the corresponding DataFrame.
+ *
+ * @since 2.2.0
+ */
@Experimental
@InterfaceStability.Evolving
+ def createTable(
+ tableName: String,
+ source: String,
+ options: java.util.Map[String, String]): DataFrame = {
+ createTable(tableName, source, options.asScala.toMap)
+ }
+
+ /**
+ * (Scala-specific)
+ * Creates a table from the given path based on a data source and a set of options.
+ * Then, returns the corresponding DataFrame.
+ *
+ * @since 2.0.0
+ */
+ @deprecated("use createTable instead.", "2.2.0")
def createExternalTable(
tableName: String,
source: String,
- options: java.util.Map[String, String]): DataFrame
+ options: Map[String, String]): DataFrame = {
+ createTable(tableName, source, options)
+ }
/**
* :: Experimental ::
* (Scala-specific)
- * Creates an external table from the given path based on a data source and a set of options.
+ * Creates a table from the given path based on a data source and a set of options.
* Then, returns the corresponding DataFrame.
*
- * @since 2.0.0
+ * @since 2.2.0
*/
@Experimental
@InterfaceStability.Evolving
- def createExternalTable(
+ def createTable(
tableName: String,
source: String,
options: Map[String, String]): DataFrame
/**
* :: Experimental ::
- * Create an external table from the given path based on a data source, a schema and
- * a set of options. Then, returns the corresponding DataFrame.
+ * Create a table from the given path based on a data source, a schema and a set of options.
+ * Then, returns the corresponding DataFrame.
*
* @since 2.0.0
*/
+ @deprecated("use createTable instead.", "2.2.0")
+ def createExternalTable(
+ tableName: String,
+ source: String,
+ schema: StructType,
+ options: java.util.Map[String, String]): DataFrame = {
+ createTable(tableName, source, schema, options)
+ }
+
+ /**
+ * :: Experimental ::
+ * Create a table from the given path based on a data source, a schema and a set of options.
+ * Then, returns the corresponding DataFrame.
+ *
+ * @since 2.2.0
+ */
@Experimental
@InterfaceStability.Evolving
+ def createTable(
+ tableName: String,
+ source: String,
+ schema: StructType,
+ options: java.util.Map[String, String]): DataFrame = {
+ createTable(tableName, source, schema, options.asScala.toMap)
+ }
+
+ /**
+ * (Scala-specific)
+ * Create a table from the given path based on a data source, a schema and a set of options.
+ * Then, returns the corresponding DataFrame.
+ *
+ * @since 2.0.0
+ */
+ @deprecated("use createTable instead.", "2.2.0")
def createExternalTable(
tableName: String,
source: String,
schema: StructType,
- options: java.util.Map[String, String]): DataFrame
+ options: Map[String, String]): DataFrame = {
+ createTable(tableName, source, schema, options)
+ }
/**
* :: Experimental ::
* (Scala-specific)
- * Create an external table from the given path based on a data source, a schema and
- * a set of options. Then, returns the corresponding DataFrame.
+ * Create a table from the given path based on a data source, a schema and a set of options.
+ * Then, returns the corresponding DataFrame.
*
- * @since 2.0.0
+ * @since 2.2.0
*/
@Experimental
@InterfaceStability.Evolving
- def createExternalTable(
+ def createTable(
tableName: String,
source: String,
schema: StructType,
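
Note: a minimal usage sketch of the renamed API (the table name, schema and path below are illustrative, not part of this commit). The old createExternalTable overloads still compile but are deprecated and simply forward to createTable:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.StructType

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Deprecated since 2.2.0; forwards to createTable and triggers a deprecation warning.
    // spark.catalog.createExternalTable("logs", "/tmp/logs")

    // Preferred form: same signatures, new name.
    spark.catalog.createTable(
      "logs",                                     // table name (illustrative)
      "json",                                     // data source
      new StructType().add("msg", "string"),      // schema
      Map("path" -> "/tmp/logs"))                 // options; a "path" makes the table external
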
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 90aeebd932..beeba05554 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -71,15 +71,6 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
options = table.storage.properties ++ pathOption,
catalogTable = Some(tableWithDefaultOptions)).resolveRelation()
- dataSource match {
- case fs: HadoopFsRelation =>
- if (table.tableType == CatalogTableType.EXTERNAL && fs.location.rootPaths.isEmpty) {
- throw new AnalysisException(
- "Cannot create a file-based external data source table without path")
- }
- case _ =>
- }
-
val partitionColumnNames = if (table.schema.nonEmpty) {
table.partitionColumnNames
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 8244b2152c..9136a83bc2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -17,7 +17,6 @@
package org.apache.spark.sql.internal
-import scala.collection.JavaConverters._
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.annotation.Experimental
@@ -257,101 +256,74 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
/**
* :: Experimental ::
- * Creates an external table from the given path and returns the corresponding DataFrame.
+ * Creates a table from the given path and returns the corresponding DataFrame.
* It will use the default data source configured by spark.sql.sources.default.
*
* @group ddl_ops
- * @since 2.0.0
+ * @since 2.2.0
*/
@Experimental
- override def createExternalTable(tableName: String, path: String): DataFrame = {
+ override def createTable(tableName: String, path: String): DataFrame = {
val dataSourceName = sparkSession.sessionState.conf.defaultDataSourceName
- createExternalTable(tableName, path, dataSourceName)
+ createTable(tableName, path, dataSourceName)
}
/**
* :: Experimental ::
- * Creates an external table from the given path based on a data source
- * and returns the corresponding DataFrame.
+ * Creates a table from the given path based on a data source and returns the corresponding
+ * DataFrame.
*
* @group ddl_ops
- * @since 2.0.0
+ * @since 2.2.0
*/
@Experimental
- override def createExternalTable(tableName: String, path: String, source: String): DataFrame = {
- createExternalTable(tableName, source, Map("path" -> path))
- }
-
- /**
- * :: Experimental ::
- * Creates an external table from the given path based on a data source and a set of options.
- * Then, returns the corresponding DataFrame.
- *
- * @group ddl_ops
- * @since 2.0.0
- */
- @Experimental
- override def createExternalTable(
- tableName: String,
- source: String,
- options: java.util.Map[String, String]): DataFrame = {
- createExternalTable(tableName, source, options.asScala.toMap)
+ override def createTable(tableName: String, path: String, source: String): DataFrame = {
+ createTable(tableName, source, Map("path" -> path))
}
/**
* :: Experimental ::
* (Scala-specific)
- * Creates an external table from the given path based on a data source and a set of options.
+ * Creates a table from the given path based on a data source and a set of options.
* Then, returns the corresponding DataFrame.
*
* @group ddl_ops
- * @since 2.0.0
+ * @since 2.2.0
*/
@Experimental
- override def createExternalTable(
+ override def createTable(
tableName: String,
source: String,
options: Map[String, String]): DataFrame = {
- createExternalTable(tableName, source, new StructType, options)
- }
-
- /**
- * :: Experimental ::
- * Create an external table from the given path based on a data source, a schema and
- * a set of options. Then, returns the corresponding DataFrame.
- *
- * @group ddl_ops
- * @since 2.0.0
- */
- @Experimental
- override def createExternalTable(
- tableName: String,
- source: String,
- schema: StructType,
- options: java.util.Map[String, String]): DataFrame = {
- createExternalTable(tableName, source, schema, options.asScala.toMap)
+ createTable(tableName, source, new StructType, options)
}
/**
* :: Experimental ::
* (Scala-specific)
- * Create an external table from the given path based on a data source, a schema and
- * a set of options. Then, returns the corresponding DataFrame.
+ * Create a table from the given path based on a data source, a schema and a set of options.
+ * Then, returns the corresponding DataFrame.
*
* @group ddl_ops
- * @since 2.0.0
+ * @since 2.2.0
*/
@Experimental
- override def createExternalTable(
+ override def createTable(
tableName: String,
source: String,
schema: StructType,
options: Map[String, String]): DataFrame = {
val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+ val storage = DataSource.buildStorageFormatFromOptions(options)
+ val tableType = if (storage.locationUri.isDefined) {
+ CatalogTableType.EXTERNAL
+ } else {
+ CatalogTableType.MANAGED
+ }
val tableDesc = CatalogTable(
identifier = tableIdent,
- tableType = CatalogTableType.EXTERNAL,
- storage = DataSource.buildStorageFormatFromOptions(options),
+ tableType = tableType,
+ storage = storage,
schema = schema,
provider = Some(source)
)
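
Note: a hedged sketch of how the new MANAGED/EXTERNAL decision is visible through the public catalog API (table names and the path are made up for illustration). Supplying a "path" option yields an EXTERNAL table; omitting it yields a MANAGED table whose data lives under the warehouse directory:

    import org.apache.spark.sql.types.StructType

    // Assumes the same `spark` session as in the sketch above.
    spark.catalog.createTable(
      "with_path", "json",
      new StructType().add("i", "int"),
      Map("path" -> "/tmp/with_path"))
    assert(spark.catalog.getTable("with_path").tableType == "EXTERNAL")

    spark.catalog.createTable(
      "no_path", "json",
      new StructType().add("i", "int"),
      Map.empty[String, String])
    assert(spark.catalog.getTable("no_path").tableType == "MANAGED")
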
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 5dd04543ed..801912f441 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.internal
+import java.io.File
+import java.net.URI
+
import org.scalatest.BeforeAndAfterEach
import org.apache.spark.SparkFunSuite
@@ -27,7 +30,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.logical.Range
import org.apache.spark.sql.test.SharedSQLContext
-import org.apache.spark.sql.types.{IntegerType, StructType}
+import org.apache.spark.sql.types.StructType
/**
@@ -37,6 +40,7 @@ class CatalogSuite
extends SparkFunSuite
with BeforeAndAfterEach
with SharedSQLContext {
+ import testImplicits._
private def sessionCatalog: SessionCatalog = spark.sessionState.catalog
@@ -306,22 +310,6 @@ class CatalogSuite
columnFields.foreach { f => assert(columnString.contains(f.toString)) }
}
- test("createExternalTable should fail if path is not given for file-based data source") {
- val e = intercept[AnalysisException] {
- spark.catalog.createExternalTable("tbl", "json", Map.empty[String, String])
- }
- assert(e.message.contains("Unable to infer schema"))
-
- val e2 = intercept[AnalysisException] {
- spark.catalog.createExternalTable(
- "tbl",
- "json",
- new StructType().add("i", IntegerType),
- Map.empty[String, String])
- }
- assert(e2.message == "Cannot create a file-based external data source table without path")
- }
-
test("dropTempView should not un-cache and drop metastore table if a same-name table exists") {
withTable("same_name") {
spark.range(10).write.saveAsTable("same_name")
@@ -460,6 +448,50 @@ class CatalogSuite
}
}
+ test("createTable with 'path' in options") {
+ withTable("t") {
+ withTempDir { dir =>
+ spark.catalog.createTable(
+ tableName = "t",
+ source = "json",
+ schema = new StructType().add("i", "int"),
+ options = Map("path" -> dir.getAbsolutePath))
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(table.tableType == CatalogTableType.EXTERNAL)
+ assert(table.storage.locationUri.get == dir.getAbsolutePath)
+
+ Seq((1)).toDF("i").write.insertInto("t")
+ assert(dir.exists() && dir.listFiles().nonEmpty)
+
+ sql("DROP TABLE t")
+ // the table path and data files are still there after DROP TABLE, if custom table path is
+ // specified.
+ assert(dir.exists() && dir.listFiles().nonEmpty)
+ }
+ }
+ }
+
+ test("createTable without 'path' in options") {
+ withTable("t") {
+ spark.catalog.createTable(
+ tableName = "t",
+ source = "json",
+ schema = new StructType().add("i", "int"),
+ options = Map.empty[String, String])
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(table.tableType == CatalogTableType.MANAGED)
+ val tablePath = new File(new URI(table.storage.locationUri.get))
+ assert(tablePath.exists() && tablePath.listFiles().isEmpty)
+
+ Seq((1)).toDF("i").write.insertInto("t")
+ assert(tablePath.listFiles().nonEmpty)
+
+ sql("DROP TABLE t")
+ // the table path is removed after DROP TABLE, if custom table path is not specified.
+ assert(!tablePath.exists())
+ }
+ }
+
// TODO: add tests for the rest of them
}