From c51ab37faddf4ede23243058dfb388e74a192552 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 16 Feb 2015 12:52:05 -0800 Subject: [SPARK-5833] [SQL] Adds REFRESH TABLE command Lifts `HiveMetastoreCatalog.refreshTable` to `Catalog`. Adds `RefreshTable` command to refresh (possibly cached) metadata in external data sources tables. [Review on Reviewable](https://reviewable.io/reviews/apache/spark/4624) Author: Cheng Lian Closes #4624 from liancheng/refresh-table and squashes the following commits: 8d1aa4c [Cheng Lian] Adds REFRESH TABLE command --- .../scala/org/apache/spark/sql/sources/ddl.scala | 52 +++++++++++++--------- 1 file changed, 30 insertions(+), 22 deletions(-) (limited to 'sql/core') diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala index 8cac9c0fdf..1b5e8c280e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.AbstractSparkSQLParser import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation -import org.apache.spark.sql.catalyst.expressions.AttributeReference +import org.apache.spark.sql.catalyst.expressions.{Row, AttributeReference} import org.apache.spark.sql.execution.RunnableCommand import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -66,6 +66,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { protected val EXTENDED = Keyword("EXTENDED") protected val AS = Keyword("AS") protected val COMMENT = Keyword("COMMENT") + protected val REFRESH = Keyword("REFRESH") // Data types. protected val STRING = Keyword("STRING") @@ -85,7 +86,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { protected val MAP = Keyword("MAP") protected val STRUCT = Keyword("STRUCT") - protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable + protected lazy val ddl: Parser[LogicalPlan] = createTable | describeTable | refreshTable protected def start: Parser[LogicalPlan] = ddl @@ -104,9 +105,8 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { * AS SELECT ... */ protected lazy val createTable: Parser[LogicalPlan] = - ( - (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident - ~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ { + (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident ~ + tableCols.? ~ (USING ~> className) ~ (OPTIONS ~> options).? ~ (AS ~> restInput).? ^^ { case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query => if (temp.isDefined && allowExisting.isDefined) { throw new DDLException( @@ -145,8 +145,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { allowExisting.isDefined, managedIfNoPath = false) } - } - ) + } protected lazy val tableCols: Parser[Seq[StructField]] = "(" ~> repsep(column, ",") <~ ")" @@ -166,6 +165,12 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { DescribeCommand(UnresolvedRelation(tblIdentifier, None), e.isDefined) } + protected lazy val refreshTable: Parser[LogicalPlan] = + REFRESH ~> TABLE ~> (ident <~ ".").? ~ ident ^^ { + case maybeDatabaseName ~ tableName => + RefreshTable(maybeDatabaseName.getOrElse("default"), tableName) + } + protected lazy val options: Parser[Map[String, String]] = "(" ~> repsep(pair, ",") <~ ")" ^^ { case s: Seq[(String, String)] => s.toMap } @@ -177,10 +182,10 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging { ident ~ dataType ~ (COMMENT ~> stringLit).? ^^ { case columnName ~ typ ~ cm => val meta = cm match { case Some(comment) => - new MetadataBuilder().putString(COMMENT.str.toLowerCase(), comment).build() + new MetadataBuilder().putString(COMMENT.str.toLowerCase, comment).build() case None => Metadata.empty } - StructField(columnName, typ, true, meta) + StructField(columnName, typ, nullable = true, meta) } protected lazy val primitiveType: Parser[DataType] = @@ -318,24 +323,18 @@ private[sql] case class DescribeCommand( isExtended: Boolean) extends Command { override val output = Seq( // Column names are based on Hive. - AttributeReference("col_name", StringType, nullable = false, + AttributeReference("col_name", StringType, nullable = false, new MetadataBuilder().putString("comment", "name of the column").build())(), - AttributeReference("data_type", StringType, nullable = false, + AttributeReference("data_type", StringType, nullable = false, new MetadataBuilder().putString("comment", "data type of the column").build())(), - AttributeReference("comment", StringType, nullable = false, + AttributeReference("comment", StringType, nullable = false, new MetadataBuilder().putString("comment", "comment of the column").build())()) } /** * Used to represent the operation of create table using a data source. - * @param tableName - * @param userSpecifiedSchema - * @param provider - * @param temporary - * @param options * @param allowExisting If it is true, we will do nothing when the table already exists. - * If it is false, an exception will be thrown - * @param managedIfNoPath + * If it is false, an exception will be thrown */ private[sql] case class CreateTableUsing( tableName: String, @@ -362,7 +361,7 @@ private[sql] case class CreateTableUsingAsLogicalPlan( options: Map[String, String], query: LogicalPlan) extends Command -private [sql] case class CreateTempTableUsing( +private[sql] case class CreateTempTableUsing( tableName: String, userSpecifiedSchema: Option[StructType], provider: String, @@ -376,7 +375,7 @@ private [sql] case class CreateTempTableUsing( } } -private [sql] case class CreateTempTableUsingAsSelect( +private[sql] case class CreateTempTableUsingAsSelect( tableName: String, provider: String, mode: SaveMode, @@ -393,6 +392,15 @@ private [sql] case class CreateTempTableUsingAsSelect( } } +private[sql] case class RefreshTable(databaseName: String, tableName: String) + extends RunnableCommand { + + override def run(sqlContext: SQLContext): Seq[Row] = { + sqlContext.catalog.refreshTable(databaseName, tableName) + Seq.empty[Row] + } +} + /** * Builds a map in which keys are case insensitive */ @@ -408,7 +416,7 @@ protected[sql] class CaseInsensitiveMap(map: Map[String, String]) extends Map[St override def iterator: Iterator[(String, String)] = baseMap.iterator - override def -(key: String): Map[String, String] = baseMap - key.toLowerCase() + override def -(key: String): Map[String, String] = baseMap - key.toLowerCase } /** -- cgit v1.2.3