author     Cheng Lian <lian@databricks.com>    2015-07-27 17:15:35 +0800
committer  Cheng Lian <lian@databricks.com>    2015-07-27 17:15:35 +0800
commit     72981bc8f0d421e2563e2543a8c16a8cc76ad3aa (patch)
tree       1dda4a0e7d3e4305fce65529cf3b3862c7f52330 /sql/hive
parent     4ffd3a1db5ecff653b02aa325786e734351c8bd2 (diff)
[SPARK-7943] [SPARK-8105] [SPARK-8435] [SPARK-8714] [SPARK-8561] Fixes multi-database support
This PR fixes a set of issues related to multi-database support. A new data structure `TableIdentifier` is introduced to identify a table across multiple databases. We should stop using a single `String` (table name without database name) or a `Seq[String]` (optional database name plus table name) to identify tables internally.

Author: Cheng Lian <lian@databricks.com>

Closes #7623 from liancheng/spark-8131-multi-db and squashes the following commits:

f3bcd4b [Cheng Lian] Addresses PR comments
e0eb76a [Cheng Lian] Fixes styling issues
41e2207 [Cheng Lian] Fixes multi-database support
d4d1ec2 [Cheng Lian] Adds multi-database test cases
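
As a rough illustration of the data structure described above, here is a minimal Scala sketch of a table identifier carrying an optional database name. The `withDatabase` helper mirrors the call that appears in the HiveContext diff below; the class body itself is an assumption for illustration, not the actual Spark source.

// Minimal sketch only; field names follow the diff below, the rest is assumed.
case class TableIdentifier(table: String, database: Option[String] = None) {
  // Attach a database name to an otherwise unqualified identifier.
  def withDatabase(db: String): TableIdentifier = copy(database = Some(db))

  // Render as `db`.`table`, or just `table` when no database is attached.
  def quotedString: String =
    database.map(db => s"`$db`.`$table`").getOrElse(s"`$table`")
}

// Example: qualify a bare table name with the session's current database.
val ident = TableIdentifier("t").withDatabase("my_db")
// ident.quotedString == "`my_db`.`t`"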
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala             5
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala   31
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala    159
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala             7
4 files changed, 191 insertions, 11 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 1b8edefef4..110f51a305 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -40,7 +40,7 @@ import org.apache.spark.annotation.Experimental
import org.apache.spark.sql._
import org.apache.spark.sql.SQLConf.SQLConfEntry
import org.apache.spark.sql.SQLConf.SQLConfEntry._
-import org.apache.spark.sql.catalyst.ParserDialect
+import org.apache.spark.sql.catalyst.{TableIdentifier, ParserDialect}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUDFs, SetCommand}
@@ -267,7 +267,8 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging {
* @since 1.3.0
*/
def refreshTable(tableName: String): Unit = {
- catalog.refreshTable(catalog.client.currentDatabase, tableName)
+ val tableIdent = TableIdentifier(tableName).withDatabase(catalog.client.currentDatabase)
+ catalog.refreshTable(tableIdent)
}
protected[hive] def invalidateTable(tableName: String): Unit = {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 2629235312..9c707a7a2e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -29,13 +29,13 @@ import org.apache.hadoop.hive.ql.metadata._
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.spark.Logging
-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.{Catalog, MultiInstanceRelation, OverrideCatalog}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.catalyst.{InternalRow, SqlParser, TableIdentifier}
import org.apache.spark.sql.execution.datasources
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation, Partition => ParquetPartition, PartitionSpec, ResolvedDataSource}
import org.apache.spark.sql.hive.client._
@@ -43,7 +43,6 @@ import org.apache.spark.sql.parquet.ParquetRelation
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, SQLContext, SaveMode}
-
private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: HiveContext)
extends Catalog with Logging {
@@ -115,7 +114,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader)
}
- override def refreshTable(databaseName: String, tableName: String): Unit = {
+ override def refreshTable(tableIdent: TableIdentifier): Unit = {
// refreshTable does not eagerly reload the cache. It just invalidates the cache.
// The next time we use the table, it will be populated in the cache.
// Since we also cache ParquetRelations converted from Hive Parquet tables and
@@ -124,7 +123,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// it is better to invalidate the cache here to avoid confusing warning logs from the
// cache loader (e.g. cannot find data source provider, which is only defined for
// data source tables).
- invalidateTable(databaseName, tableName)
+ invalidateTable(tableIdent.database.getOrElse(client.currentDatabase), tableIdent.table)
}
def invalidateTable(databaseName: String, tableName: String): Unit = {
@@ -144,7 +143,27 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
provider: String,
options: Map[String, String],
isExternal: Boolean): Unit = {
- val (dbName, tblName) = processDatabaseAndTableName(client.currentDatabase, tableName)
+ createDataSourceTable(
+ new SqlParser().parseTableIdentifier(tableName),
+ userSpecifiedSchema,
+ partitionColumns,
+ provider,
+ options,
+ isExternal)
+ }
+
+ private def createDataSourceTable(
+ tableIdent: TableIdentifier,
+ userSpecifiedSchema: Option[StructType],
+ partitionColumns: Array[String],
+ provider: String,
+ options: Map[String, String],
+ isExternal: Boolean): Unit = {
+ val (dbName, tblName) = {
+ val database = tableIdent.database.getOrElse(client.currentDatabase)
+ processDatabaseAndTableName(database, tableIdent.table)
+ }
+
val tableProperties = new scala.collection.mutable.HashMap[String, String]
tableProperties.put("spark.sql.sources.provider", provider)
@@ -177,7 +196,7 @@ private[hive] class HiveMetastoreCatalog(val client: ClientInterface, hive: Hive
// partitions when we load the table. However, if there are specified partition columns,
// we simply ignore them and provide a warning message.
logWarning(
- s"The schema and partitions of table $tableName will be inferred when it is loaded. " +
+ s"The schema and partitions of table $tableIdent will be inferred when it is loaded. " +
s"Specified partition columns (${partitionColumns.mkString(",")}) will be ignored.")
}
Seq.empty[HiveColumn]
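
The resolution logic added above falls back to the catalog client's current database whenever the identifier carries none. A hedged sketch of that pattern, reusing the TableIdentifier sketch shown earlier, follows; the dot-splitting parser is a deliberate simplification of SqlParser.parseTableIdentifier, not the real grammar.

// Simplified parser: splits "db.table" into an identifier; the real SqlParser
// also handles quoting and rejects malformed names.
def parseTableIdentifier(name: String): TableIdentifier = name.split('.') match {
  case Array(db, table) => TableIdentifier(table, Some(db))
  case Array(table)     => TableIdentifier(table)
  case _                => sys.error(s"Cannot parse table identifier: $name")
}

// Resolve the (database, table) pair the way the hunk above does.
def resolve(ident: TableIdentifier, currentDatabase: String): (String, String) =
  (ident.database.getOrElse(currentDatabase), ident.table)

// resolve(parseTableIdentifier("t"), "sales")      == ("sales", "t")
// resolve(parseTableIdentifier("hr.emp"), "sales") == ("hr", "emp")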
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
new file mode 100644
index 0000000000..73852f13ad
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.{QueryTest, SQLContext, SaveMode}
+
+class MultiDatabaseSuite extends QueryTest with SQLTestUtils {
+ override val sqlContext: SQLContext = TestHive
+
+ import sqlContext.sql
+
+ private val df = sqlContext.range(10).coalesce(1)
+
+ test(s"saveAsTable() to non-default database - with USE - Overwrite") {
+ withTempDatabase { db =>
+ activateDatabase(db) {
+ df.write.mode(SaveMode.Overwrite).saveAsTable("t")
+ assert(sqlContext.tableNames().contains("t"))
+ checkAnswer(sqlContext.table("t"), df)
+ }
+
+ assert(sqlContext.tableNames(db).contains("t"))
+ checkAnswer(sqlContext.table(s"$db.t"), df)
+ }
+ }
+
+ test(s"saveAsTable() to non-default database - without USE - Overwrite") {
+ withTempDatabase { db =>
+ df.write.mode(SaveMode.Overwrite).saveAsTable(s"$db.t")
+ assert(sqlContext.tableNames(db).contains("t"))
+ checkAnswer(sqlContext.table(s"$db.t"), df)
+ }
+ }
+
+ test(s"saveAsTable() to non-default database - with USE - Append") {
+ withTempDatabase { db =>
+ activateDatabase(db) {
+ df.write.mode(SaveMode.Overwrite).saveAsTable("t")
+ df.write.mode(SaveMode.Append).saveAsTable("t")
+ assert(sqlContext.tableNames().contains("t"))
+ checkAnswer(sqlContext.table("t"), df.unionAll(df))
+ }
+
+ assert(sqlContext.tableNames(db).contains("t"))
+ checkAnswer(sqlContext.table(s"$db.t"), df.unionAll(df))
+ }
+ }
+
+ test(s"saveAsTable() to non-default database - without USE - Append") {
+ withTempDatabase { db =>
+ df.write.mode(SaveMode.Overwrite).saveAsTable(s"$db.t")
+ df.write.mode(SaveMode.Append).saveAsTable(s"$db.t")
+ assert(sqlContext.tableNames(db).contains("t"))
+ checkAnswer(sqlContext.table(s"$db.t"), df.unionAll(df))
+ }
+ }
+
+ test(s"insertInto() non-default database - with USE") {
+ withTempDatabase { db =>
+ activateDatabase(db) {
+ df.write.mode(SaveMode.Overwrite).saveAsTable("t")
+ assert(sqlContext.tableNames().contains("t"))
+
+ df.write.insertInto(s"$db.t")
+ checkAnswer(sqlContext.table(s"$db.t"), df.unionAll(df))
+ }
+ }
+ }
+
+ test(s"insertInto() non-default database - without USE") {
+ withTempDatabase { db =>
+ activateDatabase(db) {
+ df.write.mode(SaveMode.Overwrite).saveAsTable("t")
+ assert(sqlContext.tableNames().contains("t"))
+ }
+
+ assert(sqlContext.tableNames(db).contains("t"))
+
+ df.write.insertInto(s"$db.t")
+ checkAnswer(sqlContext.table(s"$db.t"), df.unionAll(df))
+ }
+ }
+
+ test("Looks up tables in non-default database") {
+ withTempDatabase { db =>
+ activateDatabase(db) {
+ sql("CREATE TABLE t (key INT)")
+ checkAnswer(sqlContext.table("t"), sqlContext.emptyDataFrame)
+ }
+
+ checkAnswer(sqlContext.table(s"$db.t"), sqlContext.emptyDataFrame)
+ }
+ }
+
+ test("Drops a table in a non-default database") {
+ withTempDatabase { db =>
+ activateDatabase(db) {
+ sql(s"CREATE TABLE t (key INT)")
+ assert(sqlContext.tableNames().contains("t"))
+ assert(!sqlContext.tableNames("default").contains("t"))
+ }
+
+ assert(!sqlContext.tableNames().contains("t"))
+ assert(sqlContext.tableNames(db).contains("t"))
+
+ activateDatabase(db) {
+ sql(s"DROP TABLE t")
+ assert(!sqlContext.tableNames().contains("t"))
+ assert(!sqlContext.tableNames("default").contains("t"))
+ }
+
+ assert(!sqlContext.tableNames().contains("t"))
+ assert(!sqlContext.tableNames(db).contains("t"))
+ }
+ }
+
+ test("Refreshes a table in a non-default database") {
+ import org.apache.spark.sql.functions.lit
+
+ withTempDatabase { db =>
+ withTempPath { dir =>
+ val path = dir.getCanonicalPath
+
+ activateDatabase(db) {
+ sql(
+ s"""CREATE EXTERNAL TABLE t (id BIGINT)
+ |PARTITIONED BY (p INT)
+ |STORED AS PARQUET
+ |LOCATION '$path'
+ """.stripMargin)
+
+ checkAnswer(sqlContext.table("t"), sqlContext.emptyDataFrame)
+
+ df.write.parquet(s"$path/p=1")
+ sql("ALTER TABLE t ADD PARTITION (p=1)")
+ sql("REFRESH TABLE t")
+ checkAnswer(sqlContext.table("t"), df.withColumn("p", lit(1)))
+ }
+ }
+ }
+ }
+}
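
The new suite leans on `withTempDatabase` and `activateDatabase` from SQLTestUtils, whose implementations are not part of this diff. As a hedged sketch under the assumption that `sqlContext` comes from the enclosing suite, such helpers could look roughly like this: create a throwaway database for the test body, and temporarily switch the current database with USE.

// Sketch only; the real SQLTestUtils helpers may differ in detail.
def withTempDatabase(f: String => Unit): Unit = {
  val db = s"tmp_${java.util.UUID.randomUUID().toString.replace("-", "")}"
  sqlContext.sql(s"CREATE DATABASE $db")
  try f(db) finally sqlContext.sql(s"DROP DATABASE $db CASCADE")
}

def activateDatabase(db: String)(f: => Unit): Unit = {
  sqlContext.sql(s"USE $db")
  try f finally sqlContext.sql("USE default")
}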
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala
index 9d76d6503a..145965388d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcTest.scala
@@ -22,14 +22,15 @@ import java.io.File
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
-import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
+import org.apache.spark.sql.test.SQLTestUtils
-private[sql] trait OrcTest extends SQLTestUtils {
+private[sql] trait OrcTest extends SQLTestUtils { this: SparkFunSuite =>
lazy val sqlContext = org.apache.spark.sql.hive.test.TestHive
- import sqlContext.sparkContext
import sqlContext.implicits._
+ import sqlContext.sparkContext
/**
* Writes `data` to an Orc file, which is then passed to `f` and will be deleted after `f`