diff options
| field | value | detail |
|---|---|---|
| author | Liang-Chi Hsieh <simonh@tw.ibm.com> | 2016-05-27 21:24:08 -0700 |
| committer | Reynold Xin <rxin@databricks.com> | 2016-05-27 21:24:08 -0700 |
| commit | f1b220eeeed1d4d12121fe0b3b175da44488da68 (patch) | |
| tree | 31014aacf7a6359c3b075cc168f2c81a3f5d59cc | /sql/core |
| parent | 73178c75565e20f53e6ee1478f3d976732c64438 (diff) | |
| download | spark-f1b220eeeed1d4d12121fe0b3b175da44488da68.tar.gz spark-f1b220eeeed1d4d12121fe0b3b175da44488da68.tar.bz2 spark-f1b220eeeed1d4d12121fe0b3b175da44488da68.zip | |
[SPARK-15553][SQL] Dataset.createTempView should use CreateViewCommand
## What changes were proposed in this pull request?
Let `Dataset.createTempView` and `Dataset.createOrReplaceTempView` use `CreateViewCommand`, rather than calling `SparkSession.createTempView`. Besides, this patch also removes `SparkSession.createTempView`.
## How was this patch tested?
Existing tests.
Author: Liang-Chi Hsieh <simonh@tw.ibm.com>
Closes #13327 from viirya/dataset-createtempview.
Diffstat (limited to 'sql/core'): 6 files changed, 34 insertions(+), 31 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index abd16f2149..7aeec20c49 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -35,6 +35,7 @@ import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.analysis._ +import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.encoders._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ @@ -44,7 +45,7 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.util.usePrettyExpression import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution} -import org.apache.spark.sql.execution.command.ExplainCommand +import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand} import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation} import org.apache.spark.sql.execution.datasources.json.JacksonGenerator import org.apache.spark.sql.execution.python.EvaluatePython @@ -2329,8 +2330,14 @@ class Dataset[T] private[sql]( * @since 2.0.0 */ @throws[AnalysisException] - def createTempView(viewName: String): Unit = { - sparkSession.createTempView(viewName, toDF(), replaceIfExists = false) + def createTempView(viewName: String): Unit = withPlan { + val tableDesc = CatalogTable( + identifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName), + tableType = CatalogTableType.VIEW, + schema = Seq.empty[CatalogColumn], + storage = CatalogStorageFormat.EmptyStorageFormat) + CreateViewCommand(tableDesc, logicalPlan, allowExisting = false, replace = false, + isTemporary = true, sql = "") } /** @@ -2340,8 +2347,14 @@ 
class Dataset[T] private[sql]( * @group basic * @since 2.0.0 */ - def createOrReplaceTempView(viewName: String): Unit = { - sparkSession.createTempView(viewName, toDF(), replaceIfExists = true) + def createOrReplaceTempView(viewName: String): Unit = withPlan { + val tableDesc = CatalogTable( + identifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName), + tableType = CatalogTableType.VIEW, + schema = Seq.empty[CatalogColumn], + storage = CatalogStorageFormat.EmptyStorageFormat) + CreateViewCommand(tableDesc, logicalPlan, allowExisting = false, replace = true, + isTemporary = true, sql = "") } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 66d9aa2c85..af419fcf95 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -552,7 +552,7 @@ class SQLContext private[sql](val sparkSession: SparkSession) * only during the lifetime of this instance of SQLContext. */ private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = { - sparkSession.createTempView(tableName, df, replaceIfExists = true) + df.createOrReplaceTempView(tableName) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index c9276cf140..20e22baa35 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -583,17 +583,6 @@ class SparkSession private( Dataset.ofRows(self, sessionState.catalog.lookupRelation(tableIdent)) } - /** - * Creates a temporary view with a DataFrame. The lifetime of this temporary view is tied to - * this [[SparkSession]]. 
- */ - private[sql] def createTempView( - viewName: String, df: DataFrame, replaceIfExists: Boolean) = { - sessionState.catalog.createTempView( - sessionState.sqlParser.parseTableIdentifier(viewName).table, - df.logicalPlan, replaceIfExists) - } - /* ----------------- * | Everything else | * ----------------- */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala index cfebfc6a5c..48fb95b519 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala @@ -902,8 +902,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx) val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat) - .getOrElse(EmptyStorageFormat) - val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat) + .getOrElse(CatalogStorageFormat.EmptyStorageFormat) + val rowStorage = Option(ctx.rowFormat).map(visitRowFormat) + .getOrElse(CatalogStorageFormat.EmptyStorageFormat) val location = Option(ctx.locationSpec).map(visitLocationSpec) // If we are creating an EXTERNAL table, then the LOCATION field is required if (external && location.isEmpty) { @@ -976,15 +977,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { } } - /** Empty storage format for default values and copies. */ - private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, false, Map.empty) - /** * Create a [[CatalogStorageFormat]]. 
*/ override def visitTableFileFormat( ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) { - EmptyStorageFormat.copy( + CatalogStorageFormat.EmptyStorageFormat.copy( inputFormat = Option(string(ctx.inFmt)), outputFormat = Option(string(ctx.outFmt))) } @@ -997,7 +995,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { val source = ctx.identifier.getText HiveSerDe.sourceToSerDe(source, conf) match { case Some(s) => - EmptyStorageFormat.copy( + CatalogStorageFormat.EmptyStorageFormat.copy( inputFormat = s.inputFormat, outputFormat = s.outputFormat, serde = s.serde) @@ -1037,7 +1035,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { override def visitRowFormatSerde( ctx: RowFormatSerdeContext): CatalogStorageFormat = withOrigin(ctx) { import ctx._ - EmptyStorageFormat.copy( + CatalogStorageFormat.EmptyStorageFormat.copy( serde = Option(string(name)), serdeProperties = Option(tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)) } @@ -1067,7 +1065,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { ctx) "line.delim" -> value } - EmptyStorageFormat.copy(serdeProperties = entries.toMap) + CatalogStorageFormat.EmptyStorageFormat.copy(serdeProperties = entries.toMap) } /** @@ -1181,7 +1179,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder { identifier = visitTableIdentifier(name), tableType = CatalogTableType.VIEW, schema = schema, - storage = EmptyStorageFormat, + storage = CatalogStorageFormat.EmptyStorageFormat, properties = properties, viewOriginalText = sql, viewText = sql, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala index 31dc016a01..b1290a4759 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala @@ -30,8 +30,7 @@ case class CacheTableCommand( 
override def run(sparkSession: SparkSession): Seq[Row] = { plan.foreach { logicalPlan => - sparkSession.createTempView( - tableName, Dataset.ofRows(sparkSession, logicalPlan), replaceIfExists = true) + Dataset.ofRows(sparkSession, logicalPlan).createOrReplaceTempView(tableName) } sparkSession.catalog.cacheTable(tableName) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala index 84990119c9..6468916cdc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala @@ -57,8 +57,12 @@ case class CreateViewCommand( override def output: Seq[Attribute] = Seq.empty[Attribute] - require(tableDesc.tableType == CatalogTableType.VIEW) - require(tableDesc.viewText.isDefined) + require(tableDesc.tableType == CatalogTableType.VIEW, + "The type of the table to created with CREATE VIEW must be 'CatalogTableType.VIEW'.") + if (!isTemporary) { + require(tableDesc.viewText.isDefined, + "The table to created with CREATE VIEW must have 'viewText'.") + } if (allowExisting && replace) { throw new AnalysisException("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.") |