author     Wenchen Fan <wenchen@databricks.com>  2015-10-14 16:05:37 -0700
committer  Davies Liu <davies.liu@gmail.com>  2015-10-14 16:05:37 -0700
commit     56d7da14ab8f89bf4f303b27f51fd22d23967ffb (patch)
tree       d21fc4a4d5a197788a8b8e165d89d3e95bff13be /sql/core
parent     9a430a027faafb083ca569698effb697af26a1db (diff)
[SPARK-10104] [SQL] Consolidate different forms of table identifiers
Right now, we have QualifiedTableName, TableIdentifier, and Seq[String] to represent table identifiers. We should have only one form, and TableIdentifier is the best one because it provides methods to get the table name and the database name, and to return the identifier as an unquoted or quoted string.

Author: Wenchen Fan <wenchen@databricks.com>
Author: Wenchen Fan <cloud0fan@163.com>

Closes #8453 from cloud-fan/table-name.
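As a quick illustration of the consolidated form from the caller's side, here is a minimal sketch, assuming TableIdentifier is a case class taking a table name plus an optional database, and that the accessors mentioned above are named table, database, unquotedString, and quotedString (assumed names, not verified against this revision):

    import org.apache.spark.sql.catalyst.TableIdentifier

    // Unqualified identifier: replaces the old Seq(tableName) form passed to
    // catalog.lookupRelation / registerTable / unregisterTable in this diff.
    val temp = TableIdentifier("t1")

    // Identifier qualified with a database (constructor shape assumed).
    val qualified = TableIdentifier("events", Some("logs"))

    // Accessors described in the commit message (names assumed):
    qualified.table           // "events"
    qualified.database        // Some("logs")
    qualified.unquotedString  // "logs.events"
    qualified.quotedString    // "`logs`.`events`"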
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala | 2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala | 7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala | 4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala | 7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala | 5
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala | 7
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala | 6
10 files changed, 26 insertions(+), 25 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 97a8b6518a..eacdea2c1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation
import org.apache.spark.sql.execution.datasources.{LogicalRelation, ResolvedDataSource}
import org.apache.spark.sql.types.StructType
import org.apache.spark.{Logging, Partition}
+import org.apache.spark.sql.catalyst.{SqlParser, TableIdentifier}
/**
* :: Experimental ::
@@ -287,7 +288,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
* @since 1.4.0
*/
def table(tableName: String): DataFrame = {
- DataFrame(sqlContext, sqlContext.catalog.lookupRelation(Seq(tableName)))
+ DataFrame(sqlContext, sqlContext.catalog.lookupRelation(TableIdentifier(tableName)))
}
///////////////////////////////////////////////////////////////////////////////////////
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 03e973666e..764510ab4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -171,7 +171,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
val overwrite = mode == SaveMode.Overwrite
df.sqlContext.executePlan(
InsertIntoTable(
- UnresolvedRelation(tableIdent.toSeq),
+ UnresolvedRelation(tableIdent),
partitions.getOrElse(Map.empty[String, Option[String]]),
df.logicalPlan,
overwrite,
@@ -201,7 +201,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
}
private def saveAsTable(tableIdent: TableIdentifier): Unit = {
- val tableExists = df.sqlContext.catalog.tableExists(tableIdent.toSeq)
+ val tableExists = df.sqlContext.catalog.tableExists(tableIdent)
(tableExists, mode) match {
case (true, SaveMode.Ignore) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 3d5e35ab31..361eb576c5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -714,7 +714,7 @@ class SQLContext private[sql](
* only during the lifetime of this instance of SQLContext.
*/
private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
- catalog.registerTable(Seq(tableName), df.logicalPlan)
+ catalog.registerTable(TableIdentifier(tableName), df.logicalPlan)
}
/**
@@ -728,7 +728,7 @@ class SQLContext private[sql](
*/
def dropTempTable(tableName: String): Unit = {
cacheManager.tryUncacheQuery(table(tableName))
- catalog.unregisterTable(Seq(tableName))
+ catalog.unregisterTable(TableIdentifier(tableName))
}
/**
@@ -795,7 +795,7 @@ class SQLContext private[sql](
}
private def table(tableIdent: TableIdentifier): DataFrame = {
- DataFrame(this, catalog.lookupRelation(tableIdent.toSeq))
+ DataFrame(this, catalog.lookupRelation(tableIdent))
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
index f7a88b98c0..446739d5b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DDLParser.scala
@@ -140,7 +140,7 @@ class DDLParser(parseQuery: String => LogicalPlan)
protected lazy val describeTable: Parser[LogicalPlan] =
(DESCRIBE ~> opt(EXTENDED)) ~ tableIdentifier ^^ {
case e ~ tableIdent =>
- DescribeCommand(UnresolvedRelation(tableIdent.toSeq, None), e.isDefined)
+ DescribeCommand(UnresolvedRelation(tableIdent, None), e.isDefined)
}
protected lazy val refreshTable: Parser[LogicalPlan] =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 31d6b75e13..e7deeff13d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -71,7 +71,6 @@ case class CreateTableUsing(
* can analyze the logical plan that will be used to populate the table.
* So, [[PreWriteCheck]] can detect cases that are not allowed.
*/
-// TODO: Use TableIdentifier instead of String for tableName (SPARK-10104).
case class CreateTableUsingAsSelect(
tableIdent: TableIdentifier,
provider: String,
@@ -93,7 +92,7 @@ case class CreateTempTableUsing(
val resolved = ResolvedDataSource(
sqlContext, userSpecifiedSchema, Array.empty[String], provider, options)
sqlContext.catalog.registerTable(
- tableIdent.toSeq,
+ tableIdent,
DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan)
Seq.empty[Row]
@@ -112,7 +111,7 @@ case class CreateTempTableUsingAsSelect(
val df = DataFrame(sqlContext, query)
val resolved = ResolvedDataSource(sqlContext, provider, partitionColumns, mode, options, df)
sqlContext.catalog.registerTable(
- tableIdent.toSeq,
+ tableIdent,
DataFrame(sqlContext, LogicalRelation(resolved.relation)).logicalPlan)
Seq.empty[Row]
@@ -128,7 +127,7 @@ case class RefreshTable(tableIdent: TableIdentifier)
// If this table is cached as a InMemoryColumnarRelation, drop the original
// cached version and make the new version cached lazily.
- val logicalPlan = sqlContext.catalog.lookupRelation(tableIdent.toSeq)
+ val logicalPlan = sqlContext.catalog.lookupRelation(tableIdent)
// Use lookupCachedData directly since RefreshTable also takes databaseName.
val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
if (isCached) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 8efc8016f9..b00e5680fe 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -143,9 +143,9 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
case CreateTableUsingAsSelect(tableIdent, _, _, partitionColumns, mode, _, query) =>
// When the SaveMode is Overwrite, we need to check if the table is an input table of
// the query. If so, we will throw an AnalysisException to let users know it is not allowed.
- if (mode == SaveMode.Overwrite && catalog.tableExists(tableIdent.toSeq)) {
+ if (mode == SaveMode.Overwrite && catalog.tableExists(tableIdent)) {
// Need to remove SubQuery operator.
- EliminateSubQueries(catalog.lookupRelation(tableIdent.toSeq)) match {
+ EliminateSubQueries(catalog.lookupRelation(tableIdent)) match {
// Only do the check if the table is a data source table
// (the relation is a BaseRelation).
case l @ LogicalRelation(dest: BaseRelation, _) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 356d4ff3fa..fd566c8276 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.execution.PhysicalRDD
import scala.concurrent.duration._
@@ -287,8 +288,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
testData.select('key).registerTempTable("t1")
sqlContext.table("t1")
sqlContext.dropTempTable("t1")
- assert(
- intercept[RuntimeException](sqlContext.table("t1")).getMessage.startsWith("Table Not Found"))
+ intercept[NoSuchTableException](sqlContext.table("t1"))
}
test("Drops cached temporary table") {
@@ -300,8 +300,7 @@ class CachedTableSuite extends QueryTest with SharedSQLContext {
assert(sqlContext.isCached("t2"))
sqlContext.dropTempTable("t1")
- assert(
- intercept[RuntimeException](sqlContext.table("t1")).getMessage.startsWith("Table Not Found"))
+ intercept[NoSuchTableException](sqlContext.table("t1"))
assert(!sqlContext.isCached("t2"))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 7a027e1308..b1fb068158 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.test.SharedSQLContext
@@ -359,8 +360,8 @@ class JoinSuite extends QueryTest with SharedSQLContext {
upperCaseData.where('N <= 4).registerTempTable("left")
upperCaseData.where('N >= 3).registerTempTable("right")
- val left = UnresolvedRelation(Seq("left"), None)
- val right = UnresolvedRelation(Seq("right"), None)
+ val left = UnresolvedRelation(TableIdentifier("left"), None)
+ val right = UnresolvedRelation(TableIdentifier("right"), None)
checkAnswer(
left.join(right, $"left.N" === $"right.N", "full"),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
index eab0fbb196..5688f46e5e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
@@ -21,6 +21,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType}
+import org.apache.spark.sql.catalyst.TableIdentifier
class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {
import testImplicits._
@@ -32,7 +33,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContext
}
after {
- sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+ sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
}
test("get all tables") {
@@ -44,7 +45,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContext
sql("SHOW tables").filter("tableName = 'ListTablesSuiteTable'"),
Row("ListTablesSuiteTable", true))
- sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+ sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
}
@@ -57,7 +58,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContext
sql("show TABLES in DB").filter("tableName = 'ListTablesSuiteTable'"),
Row("ListTablesSuiteTable", true))
- sqlContext.catalog.unregisterTable(Seq("ListTablesSuiteTable"))
+ sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index cc02ef81c9..baff7f5752 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -22,7 +22,7 @@ import java.io.File
import org.apache.hadoop.fs.Path
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.{TableIdentifier, InternalRow}
import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
import org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStruct, NestedStructUDT}
import org.apache.spark.sql.test.SharedSQLContext
@@ -49,7 +49,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
sql("INSERT INTO TABLE t SELECT * FROM tmp")
checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple))
}
- sqlContext.catalog.unregisterTable(Seq("tmp"))
+ sqlContext.catalog.unregisterTable(TableIdentifier("tmp"))
}
test("overwriting") {
@@ -59,7 +59,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple))
}
- sqlContext.catalog.unregisterTable(Seq("tmp"))
+ sqlContext.catalog.unregisterTable(TableIdentifier("tmp"))
}
test("self-join") {