author    Liang-Chi Hsieh <simonh@tw.ibm.com>    2016-05-27 21:24:08 -0700
committer Reynold Xin <rxin@databricks.com>    2016-05-27 21:24:08 -0700
commit    f1b220eeeed1d4d12121fe0b3b175da44488da68
tree      31014aacf7a6359c3b075cc168f2c81a3f5d59cc /sql
parent    73178c75565e20f53e6ee1478f3d976732c64438
[SPARK-15553][SQL] Dataset.createTempView should use CreateViewCommand
## What changes were proposed in this pull request?

Let `Dataset.createTempView` and `Dataset.createOrReplaceTempView` use `CreateViewCommand` rather than calling `SparkSession.createTempView`. In addition, this patch removes `SparkSession.createTempView`.

## How was this patch tested?

Existing tests.

Author: Liang-Chi Hsieh <simonh@tw.ibm.com>

Closes #13327 from viirya/dataset-createtempview.
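The user-facing API is unchanged by this patch; a minimal usage sketch of the two methods whose implementation moves onto `CreateViewCommand` (the view name and data here are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local").appName("example").getOrCreate()
val df = spark.range(10).toDF("id")

// Both calls are now routed through CreateViewCommand internally.
df.createTempView("ids")            // throws AnalysisException if "ids" already exists
df.createOrReplaceTempView("ids")   // replaces any existing temporary view
spark.sql("SELECT count(*) FROM ids").show()
```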
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala |  5
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala                        | 23
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                     |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala                   | 11
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala       | 18
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala        |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala        |  8
7 files changed, 39 insertions, 31 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 4a073d1189..77731b1f26 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -50,6 +50,11 @@ case class CatalogStorageFormat(
compressed: Boolean,
serdeProperties: Map[String, String])
+object CatalogStorageFormat {
+ /** Empty storage format for default values and copies. */
+ val EmptyStorageFormat = CatalogStorageFormat(locationUri = None, inputFormat = None,
+ outputFormat = None, serde = None, compressed = false, serdeProperties = Map.empty)
+}
/**
* A column in a table.
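Hoisting the empty format into the companion object gives every consumer one shared default to `.copy` from. A minimal sketch against the internal catalyst API as it stands at this commit (the serde class and property are illustrative):

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat

// Start from the shared empty default and override only the fields needed,
// mirroring how SparkSqlAstBuilder builds storage formats further below.
val fmt = CatalogStorageFormat.EmptyStorageFormat.copy(
  serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"),
  serdeProperties = Map("field.delim" -> ","))
```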
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index abd16f2149..7aeec20c49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -35,6 +35,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
@@ -44,7 +45,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
-import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand}
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
import org.apache.spark.sql.execution.python.EvaluatePython
@@ -2329,8 +2330,14 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
@throws[AnalysisException]
- def createTempView(viewName: String): Unit = {
- sparkSession.createTempView(viewName, toDF(), replaceIfExists = false)
+ def createTempView(viewName: String): Unit = withPlan {
+ val tableDesc = CatalogTable(
+ identifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName),
+ tableType = CatalogTableType.VIEW,
+ schema = Seq.empty[CatalogColumn],
+ storage = CatalogStorageFormat.EmptyStorageFormat)
+ CreateViewCommand(tableDesc, logicalPlan, allowExisting = false, replace = false,
+ isTemporary = true, sql = "")
}
/**
@@ -2340,8 +2347,14 @@ class Dataset[T] private[sql](
* @group basic
* @since 2.0.0
*/
- def createOrReplaceTempView(viewName: String): Unit = {
- sparkSession.createTempView(viewName, toDF(), replaceIfExists = true)
+ def createOrReplaceTempView(viewName: String): Unit = withPlan {
+ val tableDesc = CatalogTable(
+ identifier = sparkSession.sessionState.sqlParser.parseTableIdentifier(viewName),
+ tableType = CatalogTableType.VIEW,
+ schema = Seq.empty[CatalogColumn],
+ storage = CatalogStorageFormat.EmptyStorageFormat)
+ CreateViewCommand(tableDesc, logicalPlan, allowExisting = false, replace = true,
+ isTemporary = true, sql = "")
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 66d9aa2c85..af419fcf95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -552,7 +552,7 @@ class SQLContext private[sql](val sparkSession: SparkSession)
* only during the lifetime of this instance of SQLContext.
*/
private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
- sparkSession.createTempView(tableName, df, replaceIfExists = true)
+ df.createOrReplaceTempView(tableName)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index c9276cf140..20e22baa35 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -583,17 +583,6 @@ class SparkSession private(
Dataset.ofRows(self, sessionState.catalog.lookupRelation(tableIdent))
}
- /**
- * Creates a temporary view with a DataFrame. The lifetime of this temporary view is tied to
- * this [[SparkSession]].
- */
- private[sql] def createTempView(
- viewName: String, df: DataFrame, replaceIfExists: Boolean) = {
- sessionState.catalog.createTempView(
- sessionState.sqlParser.parseTableIdentifier(viewName).table,
- df.logicalPlan, replaceIfExists)
- }
-
/* ----------------- *
| Everything else |
* ----------------- */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index cfebfc6a5c..48fb95b519 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -902,8 +902,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
validateRowFormatFileFormat(ctx.rowFormat, ctx.createFileFormat, ctx)
val fileStorage = Option(ctx.createFileFormat).map(visitCreateFileFormat)
- .getOrElse(EmptyStorageFormat)
- val rowStorage = Option(ctx.rowFormat).map(visitRowFormat).getOrElse(EmptyStorageFormat)
+ .getOrElse(CatalogStorageFormat.EmptyStorageFormat)
+ val rowStorage = Option(ctx.rowFormat).map(visitRowFormat)
+ .getOrElse(CatalogStorageFormat.EmptyStorageFormat)
val location = Option(ctx.locationSpec).map(visitLocationSpec)
// If we are creating an EXTERNAL table, then the LOCATION field is required
if (external && location.isEmpty) {
@@ -976,15 +977,12 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
}
}
- /** Empty storage format for default values and copies. */
- private val EmptyStorageFormat = CatalogStorageFormat(None, None, None, None, false, Map.empty)
-
/**
* Create a [[CatalogStorageFormat]].
*/
override def visitTableFileFormat(
ctx: TableFileFormatContext): CatalogStorageFormat = withOrigin(ctx) {
- EmptyStorageFormat.copy(
+ CatalogStorageFormat.EmptyStorageFormat.copy(
inputFormat = Option(string(ctx.inFmt)),
outputFormat = Option(string(ctx.outFmt)))
}
@@ -997,7 +995,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
val source = ctx.identifier.getText
HiveSerDe.sourceToSerDe(source, conf) match {
case Some(s) =>
- EmptyStorageFormat.copy(
+ CatalogStorageFormat.EmptyStorageFormat.copy(
inputFormat = s.inputFormat,
outputFormat = s.outputFormat,
serde = s.serde)
@@ -1037,7 +1035,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
override def visitRowFormatSerde(
ctx: RowFormatSerdeContext): CatalogStorageFormat = withOrigin(ctx) {
import ctx._
- EmptyStorageFormat.copy(
+ CatalogStorageFormat.EmptyStorageFormat.copy(
serde = Option(string(name)),
serdeProperties = Option(tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty))
}
@@ -1067,7 +1065,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
ctx)
"line.delim" -> value
}
- EmptyStorageFormat.copy(serdeProperties = entries.toMap)
+ CatalogStorageFormat.EmptyStorageFormat.copy(serdeProperties = entries.toMap)
}
/**
@@ -1181,7 +1179,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
identifier = visitTableIdentifier(name),
tableType = CatalogTableType.VIEW,
schema = schema,
- storage = EmptyStorageFormat,
+ storage = CatalogStorageFormat.EmptyStorageFormat,
properties = properties,
viewOriginalText = sql,
viewText = sql,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
index 31dc016a01..b1290a4759 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/cache.scala
@@ -30,8 +30,7 @@ case class CacheTableCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
plan.foreach { logicalPlan =>
- sparkSession.createTempView(
- tableName, Dataset.ofRows(sparkSession, logicalPlan), replaceIfExists = true)
+ Dataset.ofRows(sparkSession, logicalPlan).createOrReplaceTempView(tableName)
}
sparkSession.catalog.cacheTable(tableName)
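This is the code path exercised by `CACHE TABLE ... AS SELECT`; a small sketch of a statement that reaches it (the table name and query are illustrative):

```scala
// CacheTableCommand.run registers the SELECT's plan as a temp view via
// createOrReplaceTempView, then caches the resulting table.
spark.sql("CACHE TABLE tmp_cached AS SELECT 1 AS id")
spark.sql("SELECT * FROM tmp_cached").show()
```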
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 84990119c9..6468916cdc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -57,8 +57,12 @@ case class CreateViewCommand(
override def output: Seq[Attribute] = Seq.empty[Attribute]
- require(tableDesc.tableType == CatalogTableType.VIEW)
- require(tableDesc.viewText.isDefined)
+ require(tableDesc.tableType == CatalogTableType.VIEW,
+ "The type of the table to created with CREATE VIEW must be 'CatalogTableType.VIEW'.")
+ if (!isTemporary) {
+ require(tableDesc.viewText.isDefined,
+ "The table to created with CREATE VIEW must have 'viewText'.")
+ }
if (allowExisting && replace) {
throw new AnalysisException("CREATE VIEW with both IF NOT EXISTS and REPLACE is not allowed.")