aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2017-01-10 19:26:51 +0800
committerWenchen Fan <wenchen@databricks.com>2017-01-10 19:26:51 +0800
commitb0319c2ecb51bb97c3228afa4a384572b9ffbce6 (patch)
tree6319858f1cfc02edbe9e87b32381fc4ddfd6756f /sql
parentb0e5840d4b37d7b73e300671795185bba37effb0 (diff)
downloadspark-b0319c2ecb51bb97c3228afa4a384572b9ffbce6.tar.gz
spark-b0319c2ecb51bb97c3228afa4a384572b9ffbce6.tar.bz2
spark-b0319c2ecb51bb97c3228afa4a384572b9ffbce6.zip
[SPARK-19107][SQL] support creating hive table with DataFrameWriter and Catalog
## What changes were proposed in this pull request? After unifying the CREATE TABLE syntax in https://github.com/apache/spark/pull/16296, it's pretty easy to support creating hive table with `DataFrameWriter` and `Catalog` now. This PR basically just removes the hive provider check in `DataFrameWriter.saveAsTable` and `Catalog.createExternalTable`, and add tests. ## How was this patch tested? new tests in `HiveDDLSuite` Author: Wenchen Fan <wenchen@databricks.com> Closes #16487 from cloud-fan/hive-table.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala14
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala11
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala4
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala7
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala20
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala77
6 files changed, 93 insertions, 40 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 365b50dee9..cd83836178 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -28,6 +28,7 @@ import org.apache.spark.annotation.InterfaceStability
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.json.{JacksonParser, JSONOptions}
import org.apache.spark.sql.execution.LogicalRDD
+import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.jdbc._
import org.apache.spark.sql.execution.datasources.json.InferSchema
@@ -143,6 +144,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
*/
@scala.annotation.varargs
def load(paths: String*): DataFrame = {
+ if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
+ throw new AnalysisException("Hive data source can only be used with tables, you can not " +
+ "read files of Hive data source directly.")
+ }
+
sparkSession.baseRelationToDataFrame(
DataSource.apply(
sparkSession,
@@ -160,7 +166,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
*/
def jdbc(url: String, table: String, properties: Properties): DataFrame = {
// properties should override settings in extraOptions.
- this.extraOptions = this.extraOptions ++ properties.asScala
+ this.extraOptions ++= properties.asScala
// explicit url and dbtable should override all
this.extraOptions += (JDBCOptions.JDBC_URL -> url, JDBCOptions.JDBC_TABLE_NAME -> table)
format("jdbc").load()
@@ -469,9 +475,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* @since 1.4.0
*/
def table(tableName: String): DataFrame = {
- Dataset.ofRows(sparkSession,
- sparkSession.sessionState.catalog.lookupRelation(
- sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)))
+ sparkSession.table(tableName)
}
/**
@@ -550,6 +554,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
private var userSpecifiedSchema: Option[StructType] = None
- private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+ private val extraOptions = new scala.collection.mutable.HashMap[String, String]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 3127ebf679..82331fdb9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -205,6 +205,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
* @since 1.4.0
*/
def save(): Unit = {
+ if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
+ throw new AnalysisException("Hive data source can only be used with tables, you can not " +
+ "write files of Hive data source directly.")
+ }
+
assertNotBucketed("save")
val dataSource = DataSource(
df.sparkSession,
@@ -361,10 +366,6 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
}
private def saveAsTable(tableIdent: TableIdentifier): Unit = {
- if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
- throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
- }
-
val catalog = df.sparkSession.sessionState.catalog
val tableExists = catalog.tableExists(tableIdent)
val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase)
@@ -385,6 +386,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
}
EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {
// Only do the check if the table is a data source table (the relation is a BaseRelation).
+ // TODO(cloud-fan): also check hive table relation here when we support overwrite mode
+ // for creating hive tables.
case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
throw new AnalysisException(
s"Cannot overwrite table $tableName that is also being read from")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 41ed9d7180..8244b2152c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -347,10 +347,6 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
source: String,
schema: StructType,
options: Map[String, String]): DataFrame = {
- if (source.toLowerCase == "hive") {
- throw new AnalysisException("Cannot create hive serde table with createExternalTable API.")
- }
-
val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
val tableDesc = CatalogTable(
identifier = tableIdent,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 89ec162c8e..5dd04543ed 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -322,13 +322,6 @@ class CatalogSuite
assert(e2.message == "Cannot create a file-based external data source table without path")
}
- test("createExternalTable should fail if provider is hive") {
- val e = intercept[AnalysisException] {
- spark.catalog.createExternalTable("tbl", "HiVe", Map.empty[String, String])
- }
- assert(e.message.contains("Cannot create hive serde table with createExternalTable API"))
- }
-
test("dropTempView should not un-cache and drop metastore table if a same-name table exists") {
withTable("same_name") {
spark.range(10).write.saveAsTable("same_name")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index aed825e2f3..13ef79e3b7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1169,26 +1169,6 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
}
}
- test("save API - format hive") {
- withTempDir { dir =>
- val path = dir.getCanonicalPath
- val e = intercept[ClassNotFoundException] {
- spark.range(10).write.format("hive").mode(SaveMode.Ignore).save(path)
- }.getMessage
- assert(e.contains("Failed to find data source: hive"))
- }
- }
-
- test("saveAsTable API - format hive") {
- val tableName = "tab1"
- withTable(tableName) {
- val e = intercept[AnalysisException] {
- spark.range(10).write.format("hive").mode(SaveMode.Overwrite).saveAsTable(tableName)
- }.getMessage
- assert(e.contains("Cannot create hive serde table with saveAsTable API"))
- }
- }
-
test("create a temp view using hive") {
val tableName = "tab1"
withTable (tableName) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 3ac07d0933..77285282a6 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
class HiveDDLSuite
extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
@@ -1289,4 +1290,80 @@ class HiveDDLSuite
}
}
}
+
+ test("create hive serde table with Catalog") {
+ withTable("t") {
+ withTempDir { dir =>
+ val df = spark.catalog.createExternalTable(
+ "t",
+ "hive",
+ new StructType().add("i", "int"),
+ Map("path" -> dir.getCanonicalPath, "fileFormat" -> "parquet"))
+ assert(df.collect().isEmpty)
+
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(DDLUtils.isHiveTable(table))
+ assert(table.storage.inputFormat ==
+ Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat"))
+ assert(table.storage.outputFormat ==
+ Some("org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat"))
+ assert(table.storage.serde ==
+ Some("org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe"))
+
+ sql("INSERT INTO t SELECT 1")
+ checkAnswer(spark.table("t"), Row(1))
+ }
+ }
+ }
+
+ test("create hive serde table with DataFrameWriter.saveAsTable") {
+ withTable("t", "t2") {
+ Seq(1 -> "a").toDF("i", "j")
+ .write.format("hive").option("fileFormat", "avro").saveAsTable("t")
+ checkAnswer(spark.table("t"), Row(1, "a"))
+
+ val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
+ assert(DDLUtils.isHiveTable(table))
+ assert(table.storage.inputFormat ==
+ Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat"))
+ assert(table.storage.outputFormat ==
+ Some("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat"))
+ assert(table.storage.serde ==
+ Some("org.apache.hadoop.hive.serde2.avro.AvroSerDe"))
+
+ sql("INSERT INTO t SELECT 2, 'b'")
+ checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
+
+ val e = intercept[AnalysisException] {
+ Seq(1 -> "a").toDF("i", "j").write.format("hive").partitionBy("i").saveAsTable("t2")
+ }
+ assert(e.message.contains("A Create Table As Select (CTAS) statement is not allowed " +
+ "to create a partitioned table using Hive"))
+
+ val e2 = intercept[AnalysisException] {
+ Seq(1 -> "a").toDF("i", "j").write.format("hive").bucketBy(4, "i").saveAsTable("t2")
+ }
+ assert(e2.message.contains("Creating bucketed Hive serde table is not supported yet"))
+
+ val e3 = intercept[AnalysisException] {
+ spark.table("t").write.format("hive").mode("overwrite").saveAsTable("t")
+ }
+ assert(e3.message.contains(
+ "CTAS for hive serde tables does not support append or overwrite semantics"))
+ }
+ }
+
+ test("read/write files with hive data source is not allowed") {
+ withTempDir { dir =>
+ val e = intercept[AnalysisException] {
+ spark.read.format("hive").load(dir.getAbsolutePath)
+ }
+ assert(e.message.contains("Hive data source can only be used with tables"))
+
+ val e2 = intercept[AnalysisException] {
+ Seq(1 -> "a").toDF("i", "j").write.format("hive").save(dir.getAbsolutePath)
+ }
+ assert(e2.message.contains("Hive data source can only be used with tables"))
+ }
+ }
}