author     Cheng Lian <lian@databricks.com>    2015-02-16 12:52:05 -0800
committer  Michael Armbrust <michael@databricks.com>    2015-02-16 12:52:05 -0800
commit     c51ab37faddf4ede23243058dfb388e74a192552 (patch)
tree       c8043dec2cc78693532354bc9177aec4781c896e /sql
parent     6f54dee66100e5e58f6649158db257eb5009bd6a (diff)
[SPARK-5833] [SQL] Adds REFRESH TABLE command
Lifts `HiveMetastoreCatalog.refreshTable` to `Catalog`. Adds a `RefreshTable` command to refresh (possibly cached) metadata of external data source tables.

Author: Cheng Lian <lian@databricks.com>

Closes #4624 from liancheng/refresh-table and squashes the following commits:

8d1aa4c [Cheng Lian] Adds REFRESH TABLE command
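For context, a minimal usage sketch of what this commit enables. Everything below is illustrative (table name, path, and data are hypothetical); it assumes a HiveContext bound to `sqlContext`, since only `HiveMetastoreCatalog` actually implements the new `refreshTable` method:

    // Create an external data source table whose metadata (schema, file
    // listing) gets cached by HiveMetastoreCatalog:
    sqlContext.sql(
      """CREATE TABLE jsonTable
        |USING org.apache.spark.sql.json
        |OPTIONS (path '/tmp/data.json')
      """.stripMargin)

    // ... an external process rewrites /tmp/data.json ...

    // Invalidate the cached metadata so the next scan re-resolves the table:
    sqlContext.sql("REFRESH TABLE jsonTable")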
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala | 10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala | 52
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 2
4 files changed, 42 insertions, 24 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index bf97215ee6..9e6e2912e0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -46,6 +46,8 @@ trait Catalog {
*/
def getTables(databaseName: Option[String]): Seq[(String, Boolean)]
+ def refreshTable(databaseName: String, tableName: String): Unit
+
def registerTable(tableIdentifier: Seq[String], plan: LogicalPlan): Unit
def unregisterTable(tableIdentifier: Seq[String]): Unit
@@ -119,6 +121,10 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
case (name, _) => (name, true)
}.toSeq
}
+
+ override def refreshTable(databaseName: String, tableName: String): Unit = {
+ throw new UnsupportedOperationException
+ }
}
/**
@@ -224,4 +230,8 @@ object EmptyCatalog extends Catalog {
}
override def unregisterAllTables(): Unit = {}
+
+ override def refreshTable(databaseName: String, tableName: String): Unit = {
+ throw new UnsupportedOperationException
+ }
}
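SimpleCatalog and EmptyCatalog deliberately throw UnsupportedOperationException; only a catalog that actually caches table metadata needs real behavior. As a hedged sketch (the class and its cache are hypothetical, not part of this commit), an implementation backed by a plan cache would override the new method roughly like this:

    import org.apache.spark.sql.catalyst.analysis.SimpleCatalog
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    // Hypothetical catalog that caches resolved plans per qualified name.
    class CachingCatalog extends SimpleCatalog(caseSensitive = false) {
      private val planCache =
        scala.collection.mutable.Map.empty[String, LogicalPlan]

      override def refreshTable(databaseName: String, tableName: String): Unit = {
        // Drop the stale entry; the next lookup re-resolves the table.
        planCache.remove(s"$databaseName.$tableName".toLowerCase)
      }
    }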
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 8cac9c0fdf..1b5e8c280e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.AbstractSparkSQLParser
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.catalyst.expressions.{Row, AttributeReference}
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@@ -66,6 +66,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
protected val EXTENDED = Keyword("EXTENDED")
protected val AS = Keyword("AS")
protected val COMMENT = Keyword("COMMENT")
+ protected val REFRESH = Keyword("REFRESH")
// Data types.
protected val STRING = Keyword("STRING")
@@ -85,7 +86,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
protected val MAP = Keyword("MAP")
protected val STRUCT = Keyword("STRUCT")
- protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable
+ protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable
protected def start: Parser[LogicalPlan] = ddl
@@ -104,9 +105,8 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
* AS SELECT ...
*/
protected lazy val createTable: Parser[LogicalPlan] =
- (
- (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident
- ~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
+ (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~
+ tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ {
case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query =>
if (temp.isDefined && allowExisting.isDefined) {
throw new DDLException(
@@ -145,8 +145,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
allowExisting.isDefined,
managedIfNoPath = false)
}
- }
- )
+ }
protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")"
@@ -166,6 +165,12 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
DescribeCommand(UnresolvedRelation(tblIdentifier, None), e.isDefined)
}
+ protected lazy val refreshTable: Parser[LogicalPlan] =
+ REFRESH ~> TABLE ~> (ident <~ ".").? ~ ident ^^ {
+ case maybeDatabaseName ~ tableName =>
+ RefreshTable(maybeDatabaseName.getOrElse("default"), tableName)
+ }
+
protected lazy val options: Parser[Map[String, String]] =
"(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap }
@@ -177,10 +182,10 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
ident ~ dataType ~ (COMMENT ~> stringLit).? ^^ { case columnName ~ typ ~ cm =>
val meta = cm match {
case Some(comment) =>
- new MetadataBuilder().putString(COMMENT.str.toLowerCase(), comment).build()
+ new MetadataBuilder().putString(COMMENT.str.toLowerCase, comment).build()
case None => Metadata.empty
}
- StructField(columnName, typ, true, meta)
+ StructField(columnName, typ, nullable = true, meta)
}
protected lazy val primitiveType: Parser[DataType] =
@@ -318,24 +323,18 @@ private[sql] case class DescribeCommand(
isExtended: Boolean) extends Command {
override val output = Seq(
// Column names are based on Hive.
- AttributeReference("col_name", StringType, nullable = false,
+ AttributeReference("col_name", StringType, nullable = false,
new MetadataBuilder().putString("comment", "name of the column").build())(),
- AttributeReference("data_type", StringType, nullable = false,
+ AttributeReference("data_type", StringType, nullable = false,
new MetadataBuilder().putString("comment", "data type of the column").build())(),
- AttributeReference("comment", StringType, nullable = false,
+ AttributeReference("comment", StringType, nullable = false,
new MetadataBuilder().putString("comment", "comment of the column").build())())
}
/**
* Used to represent the operation of create table using a data source.
- * @param tableName
- * @param userSpecifiedSchema
- * @param provider
- * @param temporary
- * @param options
* @param allowExisting If it is true, we will do nothing when the table already exists.
- * If it is false, an exception will be thrown
- * @param managedIfNoPath
+ * If it is false, an exception will be thrown
*/
private[sql] case class CreateTableUsing(
tableName: String,
@@ -362,7 +361,7 @@ private[sql] case class CreateTableUsingAsLogicalPlan(
options: Map[String, String],
query: LogicalPlan) extends Command
-private [sql] case class CreateTempTableUsing(
+private[sql] case class CreateTempTableUsing(
tableName: String,
userSpecifiedSchema: Option[StructType],
provider: String,
@@ -376,7 +375,7 @@ private [sql] case class CreateTempTableUsing(
}
}
-private [sql] case class CreateTempTableUsingAsSelect(
+private[sql] case class CreateTempTableUsingAsSelect(
tableName: String,
provider: String,
mode: SaveMode,
@@ -393,6 +392,15 @@ private [sql] case class CreateTempTableUsingAsSelect(
}
}
+private[sql] case class RefreshTable(databaseName: String, tableName: String)
+ extends RunnableCommand {
+
+ override def run(sqlContext: SQLContext): Seq[Row] = {
+ sqlContext.catalog.refreshTable(databaseName, tableName)
+ Seq.empty[Row]
+ }
+}
+
/**
* Builds a map in which keys are case insensitive
*/
@@ -408,7 +416,7 @@ protected[sql] class CaseInsensitiveMap(map: Map[String, String]) extends Map[St
override def iterator: Iterator[(String, String)] = baseMap.iterator
- override def -(key: String): Map[String, String] = baseMap - key.toLowerCase()
+ override def -(key: String): Map[String, String] = baseMap - key.toLowerCase
}
/**
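The new parser arm above accepts both qualified and unqualified table names, defaulting the database to "default". A rough sketch of the mapping (illustrative only; `RefreshTable` is private[sql], so this assumes code compiled inside the org.apache.spark.sql package):

    // REFRESH TABLE mydb.jsonTable  parses to:
    RefreshTable("mydb", "jsonTable")

    // REFRESH TABLE jsonTable  parses to (database defaulted):
    RefreshTable("default", "jsonTable")

    // At execution time, RunnableCommand.run simply delegates to the catalog:
    //   sqlContext.catalog.refreshTable(databaseName, tableName)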
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 12f86a04a3..580c5706dd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -91,7 +91,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader)
}
- def refreshTable(databaseName: String, tableName: String): Unit = {
+ override def refreshTable(databaseName: String, tableName: String): Unit = {
cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase)
}
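The cachedDataSourceTables field refreshed here is a Guava LoadingCache, built a few lines above this hunk with CacheBuilder. A self-contained sketch of the underlying pattern, with the loader and keys made up for illustration:

    import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

    // Hypothetical loader standing in for the metastore lookup.
    val loader = new CacheLoader[String, String] {
      override def load(key: String): String = s"metadata-for-$key"
    }

    val cache: LoadingCache[String, String] =
      CacheBuilder.newBuilder().maximumSize(1000).build(loader)

    cache.get("default.jsontable")     // first access runs the loader
    cache.refresh("default.jsontable") // re-runs the loader, replacing the entry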
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index addf887ab9..375aae5d51 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -177,7 +177,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
sql("SELECT * FROM jsonTable"),
Row("a1", "b1"))
- refreshTable("jsonTable")
+ sql("REFRESH TABLE jsonTable")
// Check that the refresh worked
checkAnswer(
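The test change swaps the Scala-level refreshTable helper for the new SQL statement, so the command is exercised end to end: parser, RefreshTable command, then catalog. The diff is truncated above; a hedged sketch of the surrounding flow, with the post-refresh values hypothetical:

    checkAnswer(sql("SELECT * FROM jsonTable"), Row("a1", "b1"))

    // ... the JSON file backing jsonTable is overwritten with new rows ...

    sql("REFRESH TABLE jsonTable")

    // After the refresh, the next scan must see the rewritten data:
    checkAnswer(sql("SELECT * FROM jsonTable"), Row("a2", "b2"))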