aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2016-06-01 16:02:27 -0700
committerCheng Lian <lian@databricks.com>2016-06-01 16:02:27 -0700
commit7bb64aae27f670531699f59d3f410e38866609b7 (patch)
tree7c1ca94c9e87d7d7807dda3ba74a0cffcc7660e2
parent9e2643b21d5749f2f5447b0274a6a35496054342 (diff)
downloadspark-7bb64aae27f670531699f59d3f410e38866609b7.tar.gz
spark-7bb64aae27f670531699f59d3f410e38866609b7.tar.bz2
spark-7bb64aae27f670531699f59d3f410e38866609b7.zip
[SPARK-15269][SQL] Removes unexpected empty table directories created while creating external Spark SQL data sourcet tables.
This PR is an alternative to #13120 authored by xwu0226. ## What changes were proposed in this pull request? When creating an external Spark SQL data source table and persisting its metadata to Hive metastore, we don't use the standard Hive `Table.dataLocation` field because Hive only allows directory paths as data locations while Spark SQL also allows file paths. However, if we don't set `Table.dataLocation`, Hive always creates an unexpected empty table directory under database location, but doesn't remove it while dropping the table (because the table is external). This PR works around this issue by explicitly setting `Table.dataLocation` and then manullay removing the created directory after creating the external table. Please refer to [this JIRA comment][1] for more details about why we chose this approach as a workaround. [1]: https://issues.apache.org/jira/browse/SPARK-15269?focusedCommentId=15297408&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15297408 ## How was this patch tested? 1. A new test case is added in `HiveQuerySuite` for this case 2. Updated `ShowCreateTableSuite` to use the same table name in all test cases. (This is how I hit this issue at the first place.) Author: Cheng Lian <lian@databricks.com> Closes #13270 from liancheng/spark-15269-unpleasant-fix.
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala5
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala6
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala4
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala1
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala45
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala4
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala22
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala3
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala13
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala36
11 files changed, 105 insertions, 36 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
index d2003fd689..6911843999 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/AnalysisException.scala
@@ -32,8 +32,9 @@ class AnalysisException protected[sql] (
val message: String,
val line: Option[Int] = None,
val startPosition: Option[Int] = None,
- val plan: Option[LogicalPlan] = None)
- extends Exception with Serializable {
+ val plan: Option[LogicalPlan] = None,
+ val cause: Option[Throwable] = None)
+ extends Exception(message, cause.orNull) with Serializable {
def withPosition(line: Option[Int], startPosition: Option[Int]): AnalysisException = {
val newException = new AnalysisException(message, line, startPosition)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index cf9286e6b9..371c198aa3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -22,7 +22,7 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 52bedf9dbd..7d7fd0399d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -557,9 +557,9 @@ class SparkSession private(
}
- /* ------------------------ *
- | Catalog-related methods |
- * ----------------- ------ */
+ /* ------------------------- *
+ | Catalog-related methods |
+ * ------------------------- */
/**
* Interface through which the user may create, drop, alter or query underlying
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 4b9aab612e..9956c5b092 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -118,8 +118,8 @@ case class CreateDataSourceTableCommand(
/**
* A command used to create a data source table using the result of a query.
*
- * Note: This is different from [[CreateTableAsSelect]]. Please check the syntax for difference.
- * This is not intended for temporary tables.
+ * Note: This is different from [[CreateTableAsSelectLogicalPlan]]. Please check the syntax for
+ * difference. This is not intended for temporary tables.
*
* The syntax of using this command in SQL is:
* {{{
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
index 38317d46dd..d554937d8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
@@ -77,4 +77,3 @@ object HiveSerDe {
serdeMap.get(key)
}
}
-
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 5ffd8ef149..b8bc9ab900 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -21,6 +21,8 @@ import java.util
import scala.util.control.NonFatal
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.ql.metadata.HiveException
import org.apache.thrift.TException
@@ -35,7 +37,9 @@ import org.apache.spark.sql.hive.client.HiveClient
* A persistent implementation of the system catalog using Hive.
* All public methods must be synchronized for thread-safety.
*/
-private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCatalog with Logging {
+private[spark] class HiveExternalCatalog(client: HiveClient, hadoopConf: Configuration)
+ extends ExternalCatalog with Logging {
+
import CatalogTypes.TablePartitionSpec
// Exceptions thrown by the hive client that we would like to wrap
@@ -68,7 +72,8 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
body
} catch {
case NonFatal(e) if isClientException(e) =>
- throw new AnalysisException(e.getClass.getCanonicalName + ": " + e.getMessage)
+ throw new AnalysisException(
+ e.getClass.getCanonicalName + ": " + e.getMessage, cause = Some(e))
}
}
@@ -147,7 +152,41 @@ private[spark] class HiveExternalCatalog(client: HiveClient) extends ExternalCat
ignoreIfExists: Boolean): Unit = withClient {
requireDbExists(db)
requireDbMatches(db, tableDefinition)
- client.createTable(tableDefinition, ignoreIfExists)
+
+ if (
+ // If this is an external data source table...
+ tableDefinition.properties.contains("spark.sql.sources.provider") &&
+ tableDefinition.tableType == CatalogTableType.EXTERNAL &&
+ // ... that is not persisted as Hive compatible format (external tables in Hive compatible
+ // format always set `locationUri` to the actual data location and should NOT be hacked as
+ // following.)
+ tableDefinition.storage.locationUri.isEmpty
+ ) {
+ // !! HACK ALERT !!
+ //
+ // Due to a restriction of Hive metastore, here we have to set `locationUri` to a temporary
+ // directory that doesn't exist yet but can definitely be successfully created, and then
+ // delete it right after creating the external data source table. This location will be
+ // persisted to Hive metastore as standard Hive table location URI, but Spark SQL doesn't
+ // really use it. Also, since we only do this workaround for external tables, deleting the
+ // directory after the fact doesn't do any harm.
+ //
+ // Please refer to https://issues.apache.org/jira/browse/SPARK-15269 for more details.
+ val tempPath = {
+ val dbLocation = getDatabase(tableDefinition.database).locationUri
+ new Path(dbLocation, tableDefinition.identifier.table + "-__PLACEHOLDER__")
+ }
+
+ try {
+ client.createTable(
+ tableDefinition.withNewStorage(locationUri = Some(tempPath.toString)),
+ ignoreIfExists)
+ } finally {
+ FileSystem.get(tempPath.toUri, hadoopConf).delete(tempPath, true)
+ }
+ } else {
+ client.createTable(tableDefinition, ignoreIfExists)
+ }
}
override def dropTable(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
index f0d96403e8..a0106ee882 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSharedState.scala
@@ -51,6 +51,6 @@ private[hive] class HiveSharedState(override val sparkContext: SparkContext)
/**
* A catalog that interacts with the Hive metastore.
*/
- override lazy val externalCatalog = new HiveExternalCatalog(metadataHive)
-
+ override lazy val externalCatalog =
+ new HiveExternalCatalog(metadataHive, sparkContext.hadoopConfiguration)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 71d5c9960a..47fa41823c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -43,6 +43,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.QueryExecutionException
+import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.util.{CircularBuffer, Utils}
/**
@@ -323,7 +324,7 @@ private[hive] class HiveClientImpl(
}
override def listDatabases(pattern: String): Seq[String] = withHiveState {
- client.getDatabasesByPattern(pattern).asScala.toSeq
+ client.getDatabasesByPattern(pattern).asScala
}
override def getTableOption(
@@ -351,6 +352,8 @@ private[hive] class HiveClientImpl(
unsupportedFeatures += "bucketing"
}
+ val properties = h.getParameters.asScala.toMap
+
CatalogTable(
identifier = TableIdentifier(h.getTableName, Option(h.getDbName)),
tableType = h.getTableType match {
@@ -368,14 +371,27 @@ private[hive] class HiveClientImpl(
createTime = h.getTTable.getCreateTime.toLong * 1000,
lastAccessTime = h.getLastAccessTime.toLong * 1000,
storage = CatalogStorageFormat(
- locationUri = shim.getDataLocation(h),
+ locationUri = shim.getDataLocation(h).filterNot { _ =>
+ // SPARK-15269: Persisted data source tables always store the location URI as a SerDe
+ // property named "path" instead of standard Hive `dataLocation`, because Hive only
+ // allows directory paths as location URIs while Spark SQL data source tables also
+ // allows file paths. So the standard Hive `dataLocation` is meaningless for Spark SQL
+ // data source tables.
+ DDLUtils.isDatasourceTable(properties) &&
+ h.getTableType == HiveTableType.EXTERNAL_TABLE &&
+ // Spark SQL may also save external data source in Hive compatible format when
+ // possible, so that these tables can be directly accessed by Hive. For these tables,
+ // `dataLocation` is still necessary. Here we also check for input format class
+ // because only these Hive compatible tables set this field.
+ h.getInputFormatClass == null
+ },
inputFormat = Option(h.getInputFormatClass).map(_.getName),
outputFormat = Option(h.getOutputFormatClass).map(_.getName),
serde = Option(h.getSerializationLib),
compressed = h.getTTable.getSd.isCompressed,
serdeProperties = h.getTTable.getSd.getSerdeInfo.getParameters.asScala.toMap
),
- properties = h.getParameters.asScala.toMap,
+ properties = properties,
viewOriginalText = Option(h.getViewOriginalText),
viewText = Option(h.getViewExpandedText),
unsupportedFeatures = unsupportedFeatures)
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
index bf9935ae41..175889b08b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogSuite.scala
@@ -37,7 +37,8 @@ class HiveExternalCatalogSuite extends ExternalCatalogSuite {
protected override val utils: CatalogTestUtils = new CatalogTestUtils {
override val tableInputFormat: String = "org.apache.hadoop.mapred.SequenceFileInputFormat"
override val tableOutputFormat: String = "org.apache.hadoop.mapred.SequenceFileOutputFormat"
- override def newEmptyCatalog(): ExternalCatalog = new HiveExternalCatalog(client)
+ override def newEmptyCatalog(): ExternalCatalog =
+ new HiveExternalCatalog(client, new Configuration())
}
protected override def resetState(): Unit = client.reset()
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 2c50cc88cc..3d8123d3c0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1104,4 +1104,17 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}
}
}
+
+ test("SPARK-15269 external data source table creation") {
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+ spark.range(1).write.json(path)
+
+ withTable("t") {
+ sql(s"CREATE TABLE t USING json OPTIONS (PATH '$path')")
+ sql("DROP TABLE t")
+ sql(s"CREATE TABLE t USING json AS SELECT 1 AS c")
+ }
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
index f789d88d5d..3f3dc12209 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
@@ -28,11 +28,11 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
import testImplicits._
test("data source table with user specified schema") {
- withTable("ddl_test1") {
+ withTable("ddl_test") {
val jsonFilePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
sql(
- s"""CREATE TABLE ddl_test1 (
+ s"""CREATE TABLE ddl_test (
| a STRING,
| b STRING,
| `extra col` ARRAY<INT>,
@@ -45,55 +45,55 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
""".stripMargin
)
- checkCreateTable("ddl_test1")
+ checkCreateTable("ddl_test")
}
}
test("data source table CTAS") {
- withTable("ddl_test2") {
+ withTable("ddl_test") {
sql(
- s"""CREATE TABLE ddl_test2
+ s"""CREATE TABLE ddl_test
|USING json
|AS SELECT 1 AS a, "foo" AS b
""".stripMargin
)
- checkCreateTable("ddl_test2")
+ checkCreateTable("ddl_test")
}
}
test("partitioned data source table") {
- withTable("ddl_test3") {
+ withTable("ddl_test") {
sql(
- s"""CREATE TABLE ddl_test3
+ s"""CREATE TABLE ddl_test
|USING json
|PARTITIONED BY (b)
|AS SELECT 1 AS a, "foo" AS b
""".stripMargin
)
- checkCreateTable("ddl_test3")
+ checkCreateTable("ddl_test")
}
}
test("bucketed data source table") {
- withTable("ddl_test3") {
+ withTable("ddl_test") {
sql(
- s"""CREATE TABLE ddl_test3
+ s"""CREATE TABLE ddl_test
|USING json
|CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
|AS SELECT 1 AS a, "foo" AS b
""".stripMargin
)
- checkCreateTable("ddl_test3")
+ checkCreateTable("ddl_test")
}
}
test("partitioned bucketed data source table") {
- withTable("ddl_test4") {
+ withTable("ddl_test") {
sql(
- s"""CREATE TABLE ddl_test4
+ s"""CREATE TABLE ddl_test
|USING json
|PARTITIONED BY (c)
|CLUSTERED BY (a) SORTED BY (b) INTO 2 BUCKETS
@@ -101,12 +101,12 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
""".stripMargin
)
- checkCreateTable("ddl_test4")
+ checkCreateTable("ddl_test")
}
}
test("data source table using Dataset API") {
- withTable("ddl_test5") {
+ withTable("ddl_test") {
spark
.range(3)
.select('id as 'a, 'id as 'b, 'id as 'c, 'id as 'd, 'id as 'e)
@@ -114,9 +114,9 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
.mode("overwrite")
.partitionBy("a", "b")
.bucketBy(2, "c", "d")
- .saveAsTable("ddl_test5")
+ .saveAsTable("ddl_test")
- checkCreateTable("ddl_test5")
+ checkCreateTable("ddl_test")
}
}