author     Wenchen Fan <wenchen@databricks.com>   2015-10-14 16:05:37 -0700
committer  Davies Liu <davies.liu@gmail.com>      2015-10-14 16:05:37 -0700
commit     56d7da14ab8f89bf4f303b27f51fd22d23967ffb
tree       d21fc4a4d5a197788a8b8e165d89d3e95bff13be /sql/core
parent     9a430a027faafb083ca569698effb697af26a1db
[SPARK-10104] [SQL] Consolidate different forms of table identifiers
Right now we have QualifiedTableName, TableIdentifier, and Seq[String] all representing table identifiers. We should keep only one form, and TableIdentifier is the best choice: it provides methods to get the table name and the database name, and to render the identifier as an unquoted or quoted string.
Author: Wenchen Fan <wenchen@databricks.com>
Author: Wenchen Fan <cloud0fan@163.com>
Closes #8453 from cloud-fan/table-name.
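For reference, a minimal Scala sketch of the consolidated identifier type described above. The real class lives in org.apache.spark.sql.catalyst; the method bodies here are an illustrative approximation of the behavior the commit message describes, not the committed source:

case class TableIdentifier(table: String, database: Option[String] = None) {
  // All name parts, database first when present.
  def toSeq: Seq[String] = database.toSeq :+ table
  // "db.table", or just "table" when no database is set.
  def unquotedString: String = toSeq.mkString(".")
  // Backtick-quoted form, e.g. `db`.`table`, safe for reserved words.
  def quotedString: String = toSeq.map("`" + _ + "`").mkString(".")
}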
Diffstat (limited to 'sql/core')
10 files changed, 26 insertions(+), 25 deletions(-)
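Assuming the sketch above, the two string forms behave like this (names and values are illustrative):

val ident = TableIdentifier("sales", Some("warehouse"))
ident.unquotedString  // warehouse.sales
ident.quotedString    // `warehouse`.`sales`
ident.toSeq           // Seq("warehouse", "sales")

The change repeated throughout the diff below is then mechanical: catalog call sites that passed a bare Seq[String] (or called tableIdent.toSeq) now pass the TableIdentifier itself.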
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 97a8b6518a..eacdea2c1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
 import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.{Logging, Partition}
+import org.apache.spark.sql.catalyst.{SqlParser, TableIdentifier}
 
 /**
  * :: Experimental ::
@@ -287,7 +288,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
    * @since 1.4.0
    */
   def table(tableName: String): DataFrame = {
-    DataFrame(sqlContext, sqlContext.catalog.lookupRelation(Seq(tableName)))
+    DataFrame(sqlContext, sqlContext.catalog.lookupRelation(TableIdentifier(tableName)))
   }
 
   ///////////////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 03e973666e..764510ab4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -171,7 +171,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
     val overwrite = mode == SaveMode.Overwrite
     df.sqlContext.executePlan(
       InsertIntoTable(
-        UnresolvedRelation(tableIdent.toSeq),
+        UnresolvedRelation(tableIdent),
         partitions.getOrElse(Map.empty[String, Option[String]]),
         df.logicalPlan,
         overwrite,
@@ -201,7 +201,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }
 
   private def saveAsTable(tableIdent: TableIdentifier): Unit = {
-    val tableExists = df.sqlContext.catalog.tableExists(tableIdent.toSeq)
+    val tableExists = df.sqlContext.catalog.tableExists(tableIdent)
 
     (tableExists, mode) match {
       case (true, SaveMode.Ignore) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 3d5e35ab31..361eb576c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -714,7 +714,7 @@ class SQLContext private[sql](
    * only during the lifetime of this instance of SQLContext.
    */
   private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
-    catalog.registerTable(Seq(tableName), df.logicalPlan)
+    catalog.registerTable(TableIdentifier(tableName), df.logicalPlan)
   }
 
   /**
@@ -728,7 +728,7 @@ class SQLContext private[sql](
    */
   def dropTempTable(tableName: String): Unit = {
     cacheManager.tryUncacheQuery(table(tableName))
-    catalog.unregisterTable(Seq(tableName))
+    catalog.unregisterTable(TableIdentifier(tableName))
   }
 
   /**
@@ -795,7 +795,7 @@ class SQLContext private[sql](
   }
 
   private def table(tableIdent: TableIdentifier): DataFrame = {
-    DataFrame(this, catalog.lookupRelation(tableIdent.toSeq))
+    DataFrame(this, catalog.lookupRelation(tableIdent))
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
index f7a88b98c0..446739d5b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
@@ -140,7 +140,7 @@ class DDLParser(parseQuery: String => LogicalPlan)
   protected lazy val describeTable: Parser[LogicalPlan] =
     (DESCRIBE ~> opt(EXTENDED)) ~ tableIdentifier ^^ {
       case e ~ tableIdent =>
-        DescribeCommand(UnresolvedRelation(tableIdent.toSeq, None), e.isDefined)
+        DescribeCommand(UnresolvedRelation(tableIdent, None), e.isDefined)
     }
 
   protected lazy val refreshTable: Parser[LogicalPlan] =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 31d6b75e13..e7deeff13d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -71,7 +71,6 @@ case class CreateTableUsing(
  * can analyze the logical plan that will be used to populate the table.
  * So, [[PreWriteCheck]] can detect cases that are not allowed.
  */
-// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104).
 case class CreateTableUsingAsSelect(
     tableIdent: TableIdentifier,
     provider: String,
@@ -93,7 +92,7 @@ case class CreateTempTableUsing(
     val resolved = ResolvedDataSource(
       sqlContext, userSpecifiedSchema, Array.empty[String], provider, options)
     sqlContext.catalog.registerTable(
-      tableIdent.toSeq,
+      tableIdent,
       DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan)
 
     Seq.empty[Row]
@@ -112,7 +111,7 @@ case class CreateTempTableUsingAsSelect(
     val df = DataFrame(sqlContext, query)
     val resolved = ResolvedDataSource(sqlContext, provider, partitionColumns, mode, options, df)
     sqlContext.catalog.registerTable(
-      tableIdent.toSeq,
+      tableIdent,
       DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan)
 
     Seq.empty[Row]
@@ -128,7 +127,7 @@ case class RefreshTable(tableIdent: TableIdentifier)
     // If this table is cached as a InMemoryColumnarRelation, drop the original
     // cached version and make the new version cached lazily.
-    val logicalPlan = sqlContext.catalog.lookupRelation(tableIdent.toSeq)
+    val logicalPlan = sqlContext.catalog.lookupRelation(tableIdent)
     // Use lookupCachedData directly since RefreshTable also takes databaseName.
     val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
     if (isCached) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 8efc8016f9..b00e5680fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -143,9 +143,9 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
       case CreateTableUsingAsSelect(tableIdent, _, _, partitionColumns, mode, _, query) =>
         // When the SaveMode is Overwrite, we need to check if the table is an input table of
         // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
-        if (mode == SaveMode.Overwrite && catalog.tableExists(tableIdent.toSeq)) {
+        if (mode == SaveMode.Overwrite && catalog.tableExists(tableIdent)) {
           // Need to remove SubQuery operator.
-          EliminateSubQueries(catalog.lookupRelation(tableIdent.toSeq)) match {
+          EliminateSubQueries(catalog.lookupRelation(tableIdent)) match {
             // Only do the check if the table is a data source table
             // (the relation is a BaseRelation).
             case l @ LogicalRelation(dest: BaseRelation, _) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 356d4ff3fa..fd566c8276 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.PhysicalRDD
 
 import scala.concurrent.duration._
@@ -287,8 +288,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
     testData.select('key).registerTempTable("t1")
     sqlContext.table("t1")
     sqlContext.dropTempTable("t1")
-    assert(
-      intercept[RuntimeException](sqlContext.table("t1")).getMessage.startsWith("Table Not Found"))
+    intercept[NoSuchTableException](sqlContext.table("t1"))
   }
 
   test("Drops cached temporary table") {
@@ -300,8 +300,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
     assert(sqlContext.isCached("t2"))
 
     sqlContext.dropTempTable("t1")
-    assert(
-      intercept[RuntimeException](sqlContext.table("t1")).getMessage.startsWith("Table Not Found"))
+    intercept[NoSuchTableException](sqlContext.table("t1"))
     assert(!sqlContext.isCached("t2"))
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 7a027e1308..b1fb068158 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.test.SharedSQLContext
 
@@ -359,8 +360,8 @@ class JoinSuite extends QueryTest with SharedSQLContext {
     upperCaseData.where('N <= 4).registerTempTable("left")
     upperCaseData.where('N >= 3).registerTempTable("right")
 
-    val left = UnresolvedRelation(Seq("left"), None)
-    val right = UnresolvedRelation(Seq("right"), None)
+    val left = UnresolvedRelation(TableIdentifier("left"), None)
+    val right = UnresolvedRelation(TableIdentifier("right"), None)
 
     checkAnswer(
       left.join(right, $"left.N" === $"right.N", "full"),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
index eab0fbb196..5688f46e5e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
@@ -21,6 +21,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
+import org.apache.spark.sql.catalyst.TableIdentifier
 
 class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {
   import testImplicits._
@@ -32,7 +33,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {
   }
 
   after {
-    sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+    sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
   }
 
   test("get all tables") {
@@ -44,7 +45,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {
       sql("SHOW tables").filter("tableName = 'ListTablesSuiteTable'"),
       Row("ListTablesSuiteTable", true))
 
-    sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+    sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
     assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
   }
 
@@ -57,7 +58,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {
       sql("show TABLES in DB").filter("tableName = 'ListTablesSuiteTable'"),
       Row("ListTablesSuiteTable", true))
 
-    sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+    sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
     assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index cc02ef81c9..baff7f5752 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -22,7 +22,7 @@ import java.io.File
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{TableIdentifier, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
 import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT}
 import org.apache.spark.sql.test.SharedSQLContext
@@ -49,7 +49,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext {
       sql("INSERT INTO TABLE t SELECT * FROM tmp")
       checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple))
     }
-    sqlContext.catalog.unregisterTable(Seq("tmp"))
+    sqlContext.catalog.unregisterTable(TableIdentifier("tmp"))
   }
 
   test("overwriting") {
@@ -59,7 +59,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext {
       sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
       checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple))
     }
-    sqlContext.catalog.unregisterTable(Seq("tmp"))
+    sqlContext.catalog.unregisterTable(TableIdentifier("tmp"))
   }
 
   test("self-join") {