author     Reynold Xin <rxin@databricks.com>       2016-05-26 13:03:07 -0700
committer  Andrew Or <andrew@databricks.com>       2016-05-26 13:03:07 -0700
commit     0f61d6efb45b9ee94fa663f67c4489fbdae2eded (patch)
tree       e54986e0a5f3671d50b7aef91f2f28efc4846c3f /sql
parent     594a1bf200fea8d6bcf25839a49186f66f922bc8 (diff)
[SPARK-15552][SQL] Remove unnecessary private[sql] methods in SparkSession
## What changes were proposed in this pull request?

SparkSession has a list of unnecessary private[sql] methods. These methods cause trouble because private[sql] doesn't apply in Java. In cases where they are easy to remove, we can simply remove them. This patch does that. As part of this pull request, I also replaced a number of protected[sql] modifiers with private[sql] to tighten up visibility.

## How was this patch tested?

Updated test cases to reflect the changes.

Author: Reynold Xin <rxin@databricks.com>

Closes #13319 from rxin/SPARK-15552.
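The "doesn't apply in Java" point comes down to how Scala encodes qualified access. A minimal sketch (illustrative class and method names, not taken from the patch) of why `private[sql]` offers no protection against Java callers:

```scala
package org.apache.spark.sql

// Qualified access such as private[sql] is enforced only by the Scala compiler.
// In the emitted JVM bytecode the member is public, so Java sources in any package
// can still call it. Removing the method, or routing callers through sessionState /
// sharedState as this patch does, is the only way to truly hide it from Java code.
class ExampleSession {
  private[sql] def internalHelper(): String =
    "callable from Java despite private[sql]"
}
```

From Scala, `internalHelper` is visible only inside `org.apache.spark.sql`; from Java, `new ExampleSession().internalHelper()` compiles anywhere on the classpath, which is why the convenience methods below are removed rather than merely narrowed.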
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala  |  4
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala  |  32
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala  |  49
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala  |  54
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala  |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala  |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala  |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala  |  3
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala  |  6
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala  |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala  |  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala  |  24
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala  |  4
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala  |  16
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala  |  2
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala  |  12
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala  |  8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala  |  8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala  |  18
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala  |  16
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala  |  4
-rw-r--r--  sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala  |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala  |  7
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala  |  2
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala  |  2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala  |  2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala  |  4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala  |  2
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala  |  6
29 files changed, 129 insertions, 168 deletions
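Before the hunks, a minimal sketch of the calling convention they establish, assuming a hypothetical internal call site inside the `org.apache.spark.sql` package (object name and query strings are placeholders): the removed SparkSession/SQLContext shortcuts (parseSql, executeSql, executePlan, cacheManager, listener, externalCatalog) are reached through sessionState and sharedState instead, exactly as the diffs below do.

```scala
package org.apache.spark.sql

import org.apache.spark.sql.execution.QueryExecution

object InternalCallSketch {
  def run(spark: SparkSession): Unit = {
    // was: spark.executeSql("SELECT 1")
    val qe: QueryExecution = spark.sessionState.executeSql("SELECT 1")
    qe.toRdd.count()

    // was: spark.executePlan(spark.parseSql("SELECT 2"))
    val plan = spark.sessionState.sqlParser.parsePlan("SELECT 2")
    spark.sessionState.executePlan(plan).toRdd.count()

    // was: spark.cacheManager.clearCache()
    spark.sharedState.cacheManager.clearCache()
  }
}
```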
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 3aacce7d7f..2e85e36767 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -402,7 +402,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
Project(inputDataCols ++ inputPartCols, df.logicalPlan)
}.getOrElse(df.logicalPlan)
- df.sparkSession.executePlan(
+ df.sparkSession.sessionState.executePlan(
InsertIntoTable(
UnresolvedRelation(tableIdent),
partitions.getOrElse(Map.empty[String, Option[String]]),
@@ -524,7 +524,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
mode,
extraOptions.toMap,
df.logicalPlan)
- df.sparkSession.executePlan(cmd).toRdd
+ df.sparkSession.sessionState.executePlan(cmd).toRdd
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index e5140fcf13..961ae32b0b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -58,7 +58,7 @@ private[sql] object Dataset {
}
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
- val qe = sparkSession.executePlan(logicalPlan)
+ val qe = sparkSession.sessionState.executePlan(logicalPlan)
qe.assertAnalyzed()
new Dataset[Row](sparkSession, logicalPlan, RowEncoder(qe.analyzed.schema))
}
@@ -165,14 +165,14 @@ class Dataset[T] private[sql](
// you wrap it with `withNewExecutionId` if this actions doesn't call other action.
def this(sparkSession: SparkSession, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {
- this(sparkSession, sparkSession.executePlan(logicalPlan), encoder)
+ this(sparkSession, sparkSession.sessionState.executePlan(logicalPlan), encoder)
}
def this(sqlContext: SQLContext, logicalPlan: LogicalPlan, encoder: Encoder[T]) = {
this(sqlContext.sparkSession, logicalPlan, encoder)
}
- @transient protected[sql] val logicalPlan: LogicalPlan = {
+ @transient private[sql] val logicalPlan: LogicalPlan = {
def hasSideEffects(plan: LogicalPlan): Boolean = plan match {
case _: Command |
_: InsertIntoTable |
@@ -215,7 +215,7 @@ class Dataset[T] private[sql](
// sqlContext must be val because a stable identifier is expected when you import implicits
@transient lazy val sqlContext: SQLContext = sparkSession.sqlContext
- protected[sql] def resolve(colName: String): NamedExpression = {
+ private[sql] def resolve(colName: String): NamedExpression = {
queryExecution.analyzed.resolveQuoted(colName, sparkSession.sessionState.analyzer.resolver)
.getOrElse {
throw new AnalysisException(
@@ -223,7 +223,7 @@ class Dataset[T] private[sql](
}
}
- protected[sql] def numericColumns: Seq[Expression] = {
+ private[sql] def numericColumns: Seq[Expression] = {
schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n =>
queryExecution.analyzed.resolveQuoted(n.name, sparkSession.sessionState.analyzer.resolver).get
}
@@ -417,7 +417,7 @@ class Dataset[T] private[sql](
*/
def explain(extended: Boolean): Unit = {
val explain = ExplainCommand(queryExecution.logical, extended = extended)
- sparkSession.executePlan(explain).executedPlan.executeCollect().foreach {
+ sparkSession.sessionState.executePlan(explain).executedPlan.executeCollect().foreach {
// scalastyle:off println
r => println(r.getString(0))
// scalastyle:on println
@@ -641,7 +641,7 @@ class Dataset[T] private[sql](
def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame = {
// Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
// by creating a new instance for one of the branch.
- val joined = sparkSession.executePlan(
+ val joined = sparkSession.sessionState.executePlan(
Join(logicalPlan, right.logicalPlan, joinType = JoinType(joinType), None))
.analyzed.asInstanceOf[Join]
@@ -757,7 +757,7 @@ class Dataset[T] private[sql](
val left = this.logicalPlan
val right = other.logicalPlan
- val joined = sparkSession.executePlan(Join(left, right, joinType =
+ val joined = sparkSession.sessionState.executePlan(Join(left, right, joinType =
JoinType(joinType), Some(condition.expr)))
val leftOutput = joined.analyzed.output.take(left.output.length)
val rightOutput = joined.analyzed.output.takeRight(right.output.length)
@@ -1263,7 +1263,7 @@ class Dataset[T] private[sql](
def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T] = {
val inputPlan = logicalPlan
val withGroupingKey = AppendColumns(func, inputPlan)
- val executed = sparkSession.executePlan(withGroupingKey)
+ val executed = sparkSession.sessionState.executePlan(withGroupingKey)
new KeyValueGroupedDataset(
encoderFor[K],
@@ -2238,7 +2238,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def persist(): this.type = {
- sparkSession.cacheManager.cacheQuery(this)
+ sparkSession.sharedState.cacheManager.cacheQuery(this)
this
}
@@ -2260,7 +2260,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def persist(newLevel: StorageLevel): this.type = {
- sparkSession.cacheManager.cacheQuery(this, None, newLevel)
+ sparkSession.sharedState.cacheManager.cacheQuery(this, None, newLevel)
this
}
@@ -2273,7 +2273,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def unpersist(blocking: Boolean): this.type = {
- sparkSession.cacheManager.tryUncacheQuery(this, blocking)
+ sparkSession.sharedState.cacheManager.tryUncacheQuery(this, blocking)
this
}
@@ -2294,7 +2294,7 @@ class Dataset[T] private[sql](
lazy val rdd: RDD[T] = {
val objectType = unresolvedTEncoder.deserializer.dataType
val deserialized = CatalystSerde.deserialize[T](logicalPlan)
- sparkSession.executePlan(deserialized).toRdd.mapPartitions { rows =>
+ sparkSession.sessionState.executePlan(deserialized).toRdd.mapPartitions { rows =>
rows.map(_.get(0, objectType).asInstanceOf[T])
}
}
@@ -2417,19 +2417,19 @@ class Dataset[T] private[sql](
/**
* Converts a JavaRDD to a PythonRDD.
*/
- protected[sql] def javaToPython: JavaRDD[Array[Byte]] = {
+ private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
val structType = schema // capture it for closure
val rdd = queryExecution.toRdd.map(EvaluatePython.toJava(_, structType))
EvaluatePython.javaToPython(rdd)
}
- protected[sql] def collectToPython(): Int = {
+ private[sql] def collectToPython(): Int = {
withNewExecutionId {
PythonRDD.collectAndServe(javaToPython.rdd)
}
}
- protected[sql] def toPythonIterator(): Int = {
+ private[sql] def toPythonIterator(): Int = {
withNewExecutionId {
PythonRDD.toLocalIteratorAndServe(javaToPython.rdd)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 7013e316ea..b17fb8a839 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -83,12 +83,9 @@ class SQLContext private[sql](
// TODO: move this logic into SparkSession
- protected[sql] def sessionState: SessionState = sparkSession.sessionState
- protected[sql] def sharedState: SharedState = sparkSession.sharedState
- protected[sql] def conf: SQLConf = sessionState.conf
- protected[sql] def runtimeConf: RuntimeConfig = sparkSession.conf
- protected[sql] def cacheManager: CacheManager = sparkSession.cacheManager
- protected[sql] def externalCatalog: ExternalCatalog = sparkSession.externalCatalog
+ private[sql] def sessionState: SessionState = sparkSession.sessionState
+ private[sql] def sharedState: SharedState = sparkSession.sharedState
+ private[sql] def conf: SQLConf = sessionState.conf
def sparkContext: SparkContext = sparkSession.sparkContext
@@ -167,14 +164,6 @@ class SQLContext private[sql](
sparkSession.conf.getAll
}
- protected[sql] def parseSql(sql: String): LogicalPlan = sparkSession.parseSql(sql)
-
- protected[sql] def executeSql(sql: String): QueryExecution = sparkSession.executeSql(sql)
-
- protected[sql] def executePlan(plan: LogicalPlan): QueryExecution = {
- sparkSession.executePlan(plan)
- }
-
/**
* :: Experimental ::
* A collection of methods that are considered experimental, but can be used to hook into
@@ -241,15 +230,6 @@ class SQLContext private[sql](
}
/**
- * Returns true if the [[Dataset]] is currently cached in-memory.
- * @group cachemgmt
- * @since 1.3.0
- */
- private[sql] def isCached(qName: Dataset[_]): Boolean = {
- sparkSession.cacheManager.lookupCachedData(qName).nonEmpty
- }
-
- /**
* Caches the specified table in-memory.
* @group cachemgmt
* @since 1.3.0
@@ -718,26 +698,9 @@ class SQLContext private[sql](
* have the same format as the one generated by `toString` in scala.
* It is only used by PySpark.
*/
- protected[sql] def parseDataType(dataTypeString: String): DataType = {
- sparkSession.parseDataType(dataTypeString)
- }
-
- /**
- * Apply a schema defined by the schemaString to an RDD. It is only used by PySpark.
- */
- protected[sql] def applySchemaToPythonRDD(
- rdd: RDD[Array[Any]],
- schemaString: String): DataFrame = {
- sparkSession.applySchemaToPythonRDD(rdd, schemaString)
- }
-
- /**
- * Apply a schema defined by the schema to an RDD. It is only used by PySpark.
- */
- protected[sql] def applySchemaToPythonRDD(
- rdd: RDD[Array[Any]],
- schema: StructType): DataFrame = {
- sparkSession.applySchemaToPythonRDD(rdd, schema)
+ // TODO: Remove this function (would require updating PySpark).
+ private[sql] def parseDataType(dataTypeString: String): DataType = {
+ DataType.fromJson(dataTypeString)
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 86c97b92df..a36368afe2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -34,10 +34,9 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
import org.apache.spark.sql.catalog.Catalog
import org.apache.spark.sql.catalyst._
-import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders._
import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Range}
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.execution.ui.SQLListener
@@ -51,7 +50,14 @@ import org.apache.spark.util.Utils
/**
* The entry point to programming Spark with the Dataset and DataFrame API.
*
- * To create a SparkSession, use the following builder pattern:
+ * In environments that this has been created upfront (e.g. REPL, notebooks), use the builder
+ * to get an existing session:
+ *
+ * {{{
+ * SparkSession.builder().getOrCreate()
+ * }}}
+ *
+ * The builder can also be used to create a new session:
*
* {{{
* SparkSession.builder()
@@ -81,7 +87,7 @@ class SparkSession private(
* and a catalog that interacts with external systems.
*/
@transient
- protected[sql] lazy val sharedState: SharedState = {
+ private[sql] lazy val sharedState: SharedState = {
existingSharedState.getOrElse(
SparkSession.reflect[SharedState, SparkContext](
SparkSession.sharedStateClassName(sparkContext.conf),
@@ -93,7 +99,7 @@ class SparkSession private(
* functions, and everything else that accepts a [[org.apache.spark.sql.internal.SQLConf]].
*/
@transient
- protected[sql] lazy val sessionState: SessionState = {
+ private[sql] lazy val sessionState: SessionState = {
SparkSession.reflect[SessionState, SparkSession](
SparkSession.sessionStateClassName(sparkContext.conf),
self)
@@ -105,10 +111,6 @@ class SparkSession private(
@transient
private[sql] val sqlContext: SQLContext = new SQLContext(this)
- protected[sql] def cacheManager: CacheManager = sharedState.cacheManager
- protected[sql] def listener: SQLListener = sharedState.listener
- protected[sql] def externalCatalog: ExternalCatalog = sharedState.externalCatalog
-
/**
* Runtime configuration interface for Spark.
*
@@ -178,12 +180,14 @@ class SparkSession private(
def udf: UDFRegistration = sessionState.udf
/**
+ * :: Experimental ::
* Returns a [[ContinuousQueryManager]] that allows managing all the
* [[org.apache.spark.sql.ContinuousQuery ContinuousQueries]] active on `this`.
*
* @group basic
* @since 2.0.0
*/
+ @Experimental
def streams: ContinuousQueryManager = sessionState.continuousQueryManager
/**
@@ -208,13 +212,11 @@ class SparkSession private(
* --------------------------------- */
/**
- * :: Experimental ::
* Returns a [[DataFrame]] with no rows or columns.
*
* @group dataframes
* @since 2.0.0
*/
- @Experimental
@transient
lazy val emptyDataFrame: DataFrame = {
createDataFrame(sparkContext.emptyRDD[Row], StructType(Nil))
@@ -449,7 +451,7 @@ class SparkSession private(
* Creates a [[DataFrame]] from an RDD[Row].
* User can specify whether the input rows should be converted to Catalyst rows.
*/
- protected[sql] def internalCreateDataFrame(
+ private[sql] def internalCreateDataFrame(
catalystRows: RDD[InternalRow],
schema: StructType): DataFrame = {
// TODO: use MutableProjection when rowRDD is another DataFrame and the applied
@@ -462,7 +464,7 @@ class SparkSession private(
* Creates a [[DataFrame]] from an RDD[Row].
* User can specify whether the input rows should be converted to Catalyst rows.
*/
- protected[sql] def createDataFrame(
+ private[sql] def createDataFrame(
rowRDD: RDD[Row],
schema: StructType,
needsConversion: Boolean) = {
@@ -502,7 +504,7 @@ class SparkSession private(
table(sessionState.sqlParser.parseTableIdentifier(tableName))
}
- protected[sql] def table(tableIdent: TableIdentifier): DataFrame = {
+ private[sql] def table(tableIdent: TableIdentifier): DataFrame = {
Dataset.ofRows(self, sessionState.catalog.lookupRelation(tableIdent))
}
@@ -510,7 +512,7 @@ class SparkSession private(
* Creates a temporary view with a DataFrame. The lifetime of this temporary view is tied to
* this [[SparkSession]].
*/
- protected[sql] def createTempView(
+ private[sql] def createTempView(
viewName: String, df: DataFrame, replaceIfExists: Boolean) = {
sessionState.catalog.createTempView(
sessionState.sqlParser.parseTableIdentifier(viewName).table,
@@ -529,11 +531,10 @@ class SparkSession private(
* @since 2.0.0
*/
def sql(sqlText: String): DataFrame = {
- Dataset.ofRows(self, parseSql(sqlText))
+ Dataset.ofRows(self, sessionState.sqlParser.parsePlan(sqlText))
}
/**
- * :: Experimental ::
* Returns a [[DataFrameReader]] that can be used to read data and streams in as a [[DataFrame]].
* {{{
* sparkSession.read.parquet("/path/to/file.parquet")
@@ -543,7 +544,6 @@ class SparkSession private(
* @group genericdata
* @since 2.0.0
*/
- @Experimental
def read: DataFrameReader = new DataFrameReader(self)
@@ -577,18 +577,6 @@ class SparkSession private(
sparkContext.stop()
}
- protected[sql] def parseSql(sql: String): LogicalPlan = {
- sessionState.sqlParser.parsePlan(sql)
- }
-
- protected[sql] def executeSql(sql: String): QueryExecution = {
- executePlan(parseSql(sql))
- }
-
- protected[sql] def executePlan(plan: LogicalPlan): QueryExecution = {
- sessionState.executePlan(plan)
- }
-
/**
* Parses the data type in our internal string representation. The data type string should
* have the same format as the one generated by `toString` in scala.
@@ -601,17 +589,17 @@ class SparkSession private(
/**
* Apply a schema defined by the schemaString to an RDD. It is only used by PySpark.
*/
- protected[sql] def applySchemaToPythonRDD(
+ private[sql] def applySchemaToPythonRDD(
rdd: RDD[Array[Any]],
schemaString: String): DataFrame = {
- val schema = parseDataType(schemaString).asInstanceOf[StructType]
+ val schema = DataType.fromJson(schemaString).asInstanceOf[StructType]
applySchemaToPythonRDD(rdd, schema)
}
/**
* Apply a schema defined by the schema to an RDD. It is only used by PySpark.
*/
- protected[sql] def applySchemaToPythonRDD(
+ private[sql] def applySchemaToPythonRDD(
rdd: RDD[Array[Any]],
schema: StructType): DataFrame = {
val rowRdd = rdd.map(r => python.EvaluatePython.fromJava(r, schema).asInstanceOf[InternalRow])
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index f601138a9d..c8bdb0d22c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -95,7 +95,7 @@ private[sql] class CacheManager extends Logging {
sparkSession.sessionState.conf.useCompression,
sparkSession.sessionState.conf.columnBatchSize,
storageLevel,
- sparkSession.executePlan(planToCache).executedPlan,
+ sparkSession.sessionState.executePlan(planToCache).executedPlan,
tableName))
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 34187b9a1a..330459c11e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -67,7 +67,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) {
lazy val withCachedData: LogicalPlan = {
assertAnalyzed()
assertSupported()
- sparkSession.cacheManager.useCachedData(analyzed)
+ sparkSession.sharedState.cacheManager.useCachedData(analyzed)
}
lazy val optimizedPlan: LogicalPlan = sparkSession.sessionState.optimizer.execute(withCachedData)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index d5aaccc4bd..642a95a992 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -98,7 +98,7 @@ case class ExplainCommand(
// Run through the optimizer to generate the physical plan.
override def run(sparkSession: SparkSession): Seq[Row] = try {
- val queryExecution = sparkSession.executePlan(logicalPlan)
+ val queryExecution = sparkSession.sessionState.executePlan(logicalPlan)
val outputString =
if (codegen) {
codegenString(queryExecution.executedPlan)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index ffea628552..7ce7bb903d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -200,7 +200,8 @@ case class DropTableCommand(
case _ =>
})
try {
- sparkSession.cacheManager.tryUncacheQuery(sparkSession.table(tableName.quotedString))
+ sparkSession.sharedState.cacheManager.tryUncacheQuery(
+ sparkSession.table(tableName.quotedString))
} catch {
case NonFatal(e) => log.warn(e.toString, e)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
index 075849afde..84990119c9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/views.scala
@@ -79,7 +79,7 @@ case class CreateViewCommand(
override def run(sparkSession: SparkSession): Seq[Row] = {
// If the plan cannot be analyzed, throw an exception and don't proceed.
- val qe = sparkSession.executePlan(child)
+ val qe = sparkSession.sessionState.executePlan(child)
qe.assertAnalyzed()
val analyzedPlan = qe.analyzed
@@ -132,7 +132,7 @@ case class CreateViewCommand(
val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
case (attr, col) => Alias(attr, col.name)()
}
- sparkSession.executePlan(Project(projectList, analyzedPlan)).analyzed
+ sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
}
}
@@ -153,7 +153,7 @@ case class CreateViewCommand(
val projectList = analyzedPlan.output.zip(tableDesc.schema).map {
case (attr, col) => Alias(attr, col.name)()
}
- sparkSession.executePlan(Project(projectList, analyzedPlan)).analyzed
+ sparkSession.sessionState.executePlan(Project(projectList, analyzedPlan)).analyzed
}
new SQLBuilder(logicalPlan).toSQL
} else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index dfe06478fc..b3beb6c85f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -476,7 +476,7 @@ case class DataSource(
options,
data.logicalPlan,
mode)
- sparkSession.executePlan(plan).toRdd
+ sparkSession.sessionState.executePlan(plan).toRdd
case _ =>
sys.error(s"${providingClass.getCanonicalName} does not allow create table as select.")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index c3e07f7d00..25b901f2db 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -40,7 +40,7 @@ private[sql] case class InsertIntoDataSourceCommand(
relation.insert(df, overwrite)
// Invalidate the cache.
- sparkSession.cacheManager.invalidateCache(logicalRelation)
+ sparkSession.sharedState.cacheManager.invalidateCache(logicalRelation)
Seq.empty[Row]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 1371abe189..f3f36efda5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -230,7 +230,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
bucketSpec = None,
allowExisting = false,
managedIfNoPath = false)
- sparkSession.executePlan(cmd).toRdd
+ sparkSession.sessionState.executePlan(cmd).toRdd
sparkSession.table(tableIdent)
}
@@ -278,7 +278,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
bucketSpec = None,
allowExisting = false,
managedIfNoPath = false)
- sparkSession.executePlan(cmd).toRdd
+ sparkSession.sessionState.executePlan(cmd).toRdd
sparkSession.table(tableIdent)
}
@@ -291,7 +291,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
* @since 2.0.0
*/
override def dropTempView(viewName: String): Unit = {
- sparkSession.cacheManager.tryUncacheQuery(sparkSession.table(viewName))
+ sparkSession.sharedState.cacheManager.tryUncacheQuery(sparkSession.table(viewName))
sessionCatalog.dropTable(TableIdentifier(viewName), ignoreIfNotExists = true)
}
@@ -302,7 +302,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
* @since 2.0.0
*/
override def isCached(tableName: String): Boolean = {
- sparkSession.cacheManager.lookupCachedData(sparkSession.table(tableName)).nonEmpty
+ sparkSession.sharedState.cacheManager.lookupCachedData(sparkSession.table(tableName)).nonEmpty
}
/**
@@ -312,7 +312,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
* @since 2.0.0
*/
override def cacheTable(tableName: String): Unit = {
- sparkSession.cacheManager.cacheQuery(sparkSession.table(tableName), Some(tableName))
+ sparkSession.sharedState.cacheManager.cacheQuery(sparkSession.table(tableName), Some(tableName))
}
/**
@@ -322,7 +322,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
* @since 2.0.0
*/
override def uncacheTable(tableName: String): Unit = {
- sparkSession.cacheManager.uncacheQuery(sparkSession.table(tableName))
+ sparkSession.sharedState.cacheManager.uncacheQuery(sparkSession.table(tableName))
}
/**
@@ -332,7 +332,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
* @since 2.0.0
*/
override def clearCache(): Unit = {
- sparkSession.cacheManager.clearCache()
+ sparkSession.sharedState.cacheManager.clearCache()
}
/**
@@ -342,7 +342,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
* @since 2.0.0
*/
protected[sql] def isCached(qName: Dataset[_]): Boolean = {
- sparkSession.cacheManager.lookupCachedData(qName).nonEmpty
+ sparkSession.sharedState.cacheManager.lookupCachedData(qName).nonEmpty
}
/**
@@ -360,15 +360,15 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
// cached version and make the new version cached lazily.
val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent)
// Use lookupCachedData directly since RefreshTable also takes databaseName.
- val isCached = sparkSession.cacheManager.lookupCachedData(logicalPlan).nonEmpty
+ val isCached = sparkSession.sharedState.cacheManager.lookupCachedData(logicalPlan).nonEmpty
if (isCached) {
// Create a data frame to represent the table.
// TODO: Use uncacheTable once it supports database name.
val df = Dataset.ofRows(sparkSession, logicalPlan)
// Uncache the logicalPlan.
- sparkSession.cacheManager.tryUncacheQuery(df, blocking = true)
+ sparkSession.sharedState.cacheManager.tryUncacheQuery(df, blocking = true)
// Cache it again.
- sparkSession.cacheManager.cacheQuery(df, Some(tableIdent.table))
+ sparkSession.sharedState.cacheManager.cacheQuery(df, Some(tableIdent.table))
}
}
@@ -383,7 +383,7 @@ private[sql] object CatalogImpl {
val enc = ExpressionEncoder[T]()
val encoded = data.map(d => enc.toRow(d).copy())
val plan = new LocalRelation(enc.schema.toAttributes, encoded)
- val queryExecution = sparkSession.executePlan(plan)
+ val queryExecution = sparkSession.sessionState.executePlan(plan)
new Dataset[T](sparkSession, queryExecution, enc)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index c9cc2ba04a..4c7bbf04bc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -92,7 +92,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
* Internal catalog for managing table and database states.
*/
lazy val catalog = new SessionCatalog(
- sparkSession.externalCatalog,
+ sparkSession.sharedState.externalCatalog,
functionResourceLoader,
functionRegistry,
conf,
@@ -161,6 +161,8 @@ private[sql] class SessionState(sparkSession: SparkSession) {
// Helper methods, partially leftover from pre-2.0 days
// ------------------------------------------------------
+ def executeSql(sql: String): QueryExecution = executePlan(sqlParser.parsePlan(sql))
+
def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan)
def invalidateTable(tableName: String): Unit = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 1c96bdc05c..e08a9ab7e6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -79,17 +79,17 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
}
test("unpersist an uncached table will not raise exception") {
- assert(None == spark.cacheManager.lookupCachedData(testData))
+ assert(None == spark.sharedState.cacheManager.lookupCachedData(testData))
testData.unpersist(blocking = true)
- assert(None == spark.cacheManager.lookupCachedData(testData))
+ assert(None == spark.sharedState.cacheManager.lookupCachedData(testData))
testData.unpersist(blocking = false)
- assert(None == spark.cacheManager.lookupCachedData(testData))
+ assert(None == spark.sharedState.cacheManager.lookupCachedData(testData))
testData.persist()
- assert(None != spark.cacheManager.lookupCachedData(testData))
+ assert(None != spark.sharedState.cacheManager.lookupCachedData(testData))
testData.unpersist(blocking = true)
- assert(None == spark.cacheManager.lookupCachedData(testData))
+ assert(None == spark.sharedState.cacheManager.lookupCachedData(testData))
testData.unpersist(blocking = false)
- assert(None == spark.cacheManager.lookupCachedData(testData))
+ assert(None == spark.sharedState.cacheManager.lookupCachedData(testData))
}
test("cache table as select") {
@@ -311,14 +311,14 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
spark.catalog.cacheTable("t1")
spark.catalog.cacheTable("t2")
spark.catalog.clearCache()
- assert(spark.cacheManager.isEmpty)
+ assert(spark.sharedState.cacheManager.isEmpty)
sql("SELECT key FROM testData LIMIT 10").createOrReplaceTempView("t1")
sql("SELECT key FROM testData LIMIT 5").createOrReplaceTempView("t2")
spark.catalog.cacheTable("t1")
spark.catalog.cacheTable("t2")
sql("Clear CACHE")
- assert(spark.cacheManager.isEmpty)
+ assert(spark.sharedState.cacheManager.isEmpty)
}
test("Clear accumulators when uncacheTable to prevent memory leaking") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
index fa8fa06907..d5cb5e1568 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala
@@ -104,7 +104,7 @@ class DataFramePivotSuite extends QueryTest with SharedSQLContext{
// pivot with extra columns to trigger optimization
.pivot("course", Seq("dotNET", "Java") ++ (1 to 10).map(_.toString))
.agg(sum($"earnings"))
- val queryExecution = spark.executePlan(df.queryExecution.logical)
+ val queryExecution = spark.sessionState.executePlan(df.queryExecution.logical)
assert(queryExecution.simpleString.contains("pivotfirst"))
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
index 8c0906b746..ac9f6c2f38 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
@@ -39,7 +39,8 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext {
2, 3, 4)
// Drop the cache.
cached.unpersist()
- assert(spark.cacheManager.lookupCachedData(cached).isEmpty, "The Dataset should not be cached.")
+ assert(spark.sharedState.cacheManager.lookupCachedData(cached).isEmpty,
+ "The Dataset should not be cached.")
}
test("persist and then rebind right encoder when join 2 datasets") {
@@ -56,10 +57,10 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext {
assertCached(joined, 2)
ds1.unpersist()
- assert(spark.cacheManager.lookupCachedData(ds1).isEmpty,
+ assert(spark.sharedState.cacheManager.lookupCachedData(ds1).isEmpty,
"The Dataset ds1 should not be cached.")
ds2.unpersist()
- assert(spark.cacheManager.lookupCachedData(ds2).isEmpty,
+ assert(spark.sharedState.cacheManager.lookupCachedData(ds2).isEmpty,
"The Dataset ds2 should not be cached.")
}
@@ -75,9 +76,10 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext {
assertCached(agged.filter(_._1 == "b"))
ds.unpersist()
- assert(spark.cacheManager.lookupCachedData(ds).isEmpty, "The Dataset ds should not be cached.")
+ assert(spark.sharedState.cacheManager.lookupCachedData(ds).isEmpty,
+ "The Dataset ds should not be cached.")
agged.unpersist()
- assert(spark.cacheManager.lookupCachedData(agged).isEmpty,
+ assert(spark.sharedState.cacheManager.lookupCachedData(agged).isEmpty,
"The Dataset agged should not be cached.")
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 5583673708..cbf4a8a612 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -60,7 +60,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
}
test("join operator selection") {
- spark.cacheManager.clearCache()
+ spark.sharedState.cacheManager.clearCache()
withSQLConf("spark.sql.autoBroadcastJoinThreshold" -> "0",
SQLConf.CROSS_JOINS_ENABLED.key -> "true") {
@@ -113,7 +113,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
// }
test("broadcasted hash join operator selection") {
- spark.cacheManager.clearCache()
+ spark.sharedState.cacheManager.clearCache()
sql("CACHE TABLE testData")
Seq(
("SELECT * FROM testData join testData2 ON key = a",
@@ -127,7 +127,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
}
test("broadcasted hash outer join operator selection") {
- spark.cacheManager.clearCache()
+ spark.sharedState.cacheManager.clearCache()
sql("CACHE TABLE testData")
sql("CACHE TABLE testData2")
Seq(
@@ -450,7 +450,7 @@ class JoinSuite extends QueryTest with SharedSQLContext {
}
test("broadcasted existence join operator selection") {
- spark.cacheManager.clearCache()
+ spark.sharedState.cacheManager.clearCache()
sql("CACHE TABLE testData")
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index e2fb91352d..af3ed14c12 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -33,7 +33,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
setupTestData()
test("simple columnar query") {
- val plan = spark.executePlan(testData.logicalPlan).sparkPlan
+ val plan = spark.sessionState.executePlan(testData.logicalPlan).sparkPlan
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
checkAnswer(scan, testData.collect().toSeq)
@@ -50,7 +50,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
}
test("projection") {
- val plan = spark.executePlan(testData.select('value, 'key).logicalPlan).sparkPlan
+ val plan = spark.sessionState.executePlan(testData.select('value, 'key).logicalPlan).sparkPlan
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
checkAnswer(scan, testData.collect().map {
@@ -59,7 +59,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
}
test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
- val plan = spark.executePlan(testData.logicalPlan).sparkPlan
+ val plan = spark.sessionState.executePlan(testData.logicalPlan).sparkPlan
val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan, None)
checkAnswer(scan, testData.collect().toSeq)
@@ -202,7 +202,7 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
cached.count()
// Make sure, the DataFrame is indeed cached.
- assert(spark.cacheManager.lookupCachedData(cached).nonEmpty)
+ assert(spark.sharedState.cacheManager.lookupCachedData(cached).nonEmpty)
// Check result.
checkAnswer(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 12940c86fe..7e9160febd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -71,21 +71,22 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
df: DataFrame,
expectedNumOfJobs: Int,
expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = {
- val previousExecutionIds = spark.listener.executionIdToData.keySet
+ val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet
withSQLConf("spark.sql.codegen.wholeStage" -> "false") {
df.collect()
}
sparkContext.listenerBus.waitUntilEmpty(10000)
- val executionIds = spark.listener.executionIdToData.keySet.diff(previousExecutionIds)
+ val executionIds =
+ spark.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds)
assert(executionIds.size === 1)
val executionId = executionIds.head
- val jobs = spark.listener.getExecution(executionId).get.jobs
+ val jobs = spark.sharedState.listener.getExecution(executionId).get.jobs
// Use "<=" because there is a race condition that we may miss some jobs
// TODO Change it to "=" once we fix the race condition that missing the JobStarted event.
assert(jobs.size <= expectedNumOfJobs)
if (jobs.size == expectedNumOfJobs) {
// If we can track all jobs, check the metric values
- val metricValues = spark.listener.getExecutionMetrics(executionId)
+ val metricValues = spark.sharedState.listener.getExecutionMetrics(executionId)
val actualMetrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan(
df.queryExecution.executedPlan)).allNodes.filter { node =>
expectedMetrics.contains(node.id)
@@ -283,19 +284,20 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
test("save metrics") {
withTempPath { file =>
- val previousExecutionIds = spark.listener.executionIdToData.keySet
+ val previousExecutionIds = spark.sharedState.listener.executionIdToData.keySet
// Assume the execution plan is
// PhysicalRDD(nodeId = 0)
person.select('name).write.format("json").save(file.getAbsolutePath)
sparkContext.listenerBus.waitUntilEmpty(10000)
- val executionIds = spark.listener.executionIdToData.keySet.diff(previousExecutionIds)
+ val executionIds =
+ spark.sharedState.listener.executionIdToData.keySet.diff(previousExecutionIds)
assert(executionIds.size === 1)
val executionId = executionIds.head
- val jobs = spark.listener.getExecution(executionId).get.jobs
+ val jobs = spark.sharedState.listener.getExecution(executionId).get.jobs
// Use "<=" because there is a race condition that we may miss some jobs
// TODO Change "<=" to "=" once we fix the race condition that missing the JobStarted event.
assert(jobs.size <= 1)
- val metricValues = spark.listener.getExecutionMetrics(executionId)
+ val metricValues = spark.sharedState.listener.getExecutionMetrics(executionId)
// Because "save" will create a new DataFrame internally, we cannot get the real metric id.
// However, we still can check the value.
assert(metricValues.values.toSeq.exists(_ === "2"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
index 2374ffaaa5..cf7e976acc 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLListenerSuite.scala
@@ -340,16 +340,16 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
}
test("SPARK-11126: no memory leak when running non SQL jobs") {
- val previousStageNumber = spark.listener.stageIdToStageMetrics.size
+ val previousStageNumber = spark.sharedState.listener.stageIdToStageMetrics.size
spark.sparkContext.parallelize(1 to 10).foreach(i => ())
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
// listener should ignore the non SQL stage
- assert(spark.listener.stageIdToStageMetrics.size == previousStageNumber)
+ assert(spark.sharedState.listener.stageIdToStageMetrics.size == previousStageNumber)
spark.sparkContext.parallelize(1 to 10).toDF().foreach(i => ())
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
// listener should save the SQL stage
- assert(spark.listener.stageIdToStageMetrics.size == previousStageNumber + 1)
+ assert(spark.sharedState.listener.stageIdToStageMetrics.size == previousStageNumber + 1)
}
test("SPARK-13055: history listener only tracks SQL metrics") {
@@ -418,12 +418,12 @@ class SQLListenerMemoryLeakSuite extends SparkFunSuite {
}
}
sc.listenerBus.waitUntilEmpty(10000)
- assert(spark.listener.getCompletedExecutions.size <= 50)
- assert(spark.listener.getFailedExecutions.size <= 50)
+ assert(spark.sharedState.listener.getCompletedExecutions.size <= 50)
+ assert(spark.sharedState.listener.getFailedExecutions.size <= 50)
// 50 for successful executions and 50 for failed executions
- assert(spark.listener.executionIdToData.size <= 100)
- assert(spark.listener.jobIdToExecutionId.size <= 100)
- assert(spark.listener.stageIdToStageMetrics.size <= 100)
+ assert(spark.sharedState.listener.executionIdToData.size <= 100)
+ assert(spark.sharedState.listener.jobIdToExecutionId.size <= 100)
+ assert(spark.sharedState.listener.stageIdToStageMetrics.size <= 100)
} finally {
sc.stop()
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 9c9abfeb2a..abb7918ae6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -639,7 +639,7 @@ class JDBCSuite extends SparkFunSuite
test("test credentials in the properties are not in plan output") {
val df = sql("SELECT * FROM parts")
val explain = ExplainCommand(df.queryExecution.logical, extended = true)
- spark.executePlan(explain).executedPlan.executeCollect().foreach {
+ spark.sessionState.executePlan(explain).executedPlan.executeCollect().foreach {
r => assert(!List("testPass", "testUser").exists(r.toString.contains))
}
// test the JdbcRelation toString output
@@ -651,7 +651,7 @@ class JDBCSuite extends SparkFunSuite
test("test credentials in the connection url are not in the plan output") {
val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", new Properties)
val explain = ExplainCommand(df.queryExecution.logical, extended = true)
- spark.executePlan(explain).executedPlan.executeCollect().foreach {
+ spark.sessionState.executePlan(explain).executedPlan.executeCollect().foreach {
r => assert(!List("testPass", "testUser").exists(r.toString.contains))
}
}
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
index c24e474d9c..0d5dc7af5f 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala
@@ -59,7 +59,7 @@ private[hive] class SparkSQLDriver(val context: SQLContext = SparkSQLEnv.sqlCont
// TODO unify the error code
try {
context.sparkContext.setJobDescription(command)
- val execution = context.executePlan(context.sql(command).logicalPlan)
+ val execution = context.sessionState.executePlan(context.sql(command).logicalPlan)
hiveResponse = execution.hiveResultString()
tableSchema = getResultSetSchema(execution)
new CommandProcessorResponse(0)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index b377a20e39..ea721e4d9b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -177,8 +177,11 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
alias match {
// because hive use things like `_c0` to build the expanded text
// currently we cannot support view from "create view v1(c1) as ..."
- case None => SubqueryAlias(table.identifier.table, sparkSession.parseSql(viewText))
- case Some(aliasText) => SubqueryAlias(aliasText, sparkSession.parseSql(viewText))
+ case None =>
+ SubqueryAlias(table.identifier.table,
+ sparkSession.sessionState.sqlParser.parsePlan(viewText))
+ case Some(aliasText) =>
+ SubqueryAlias(aliasText, sessionState.sqlParser.parsePlan(viewText))
}
} else {
MetastoreRelation(
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
index 3fc900961e..cfe6149095 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelectCommand.scala
@@ -86,7 +86,7 @@ case class CreateTableAsSelectCommand(
throw new AnalysisException(s"$tableIdentifier already exists.")
}
} else {
- sparkSession.executePlan(InsertIntoTable(
+ sparkSession.sessionState.executePlan(InsertIntoTable(
metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 3805674d39..9e8ff9317c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -296,7 +296,7 @@ case class InsertIntoHiveTable(
}
// Invalidate the cache.
- sqlContext.cacheManager.invalidateCache(table)
+ sqlContext.sharedState.cacheManager.invalidateCache(table)
// It would be nice to just return the childRdd unchanged so insert operations could be chained,
// however for now we return an empty list to simplify compatibility checks with hive, which
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
index 7c74a0308d..dc8f374eb1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ErrorPositionSuite.scala
@@ -130,7 +130,7 @@ class ErrorPositionSuite extends QueryTest with TestHiveSingleton with BeforeAnd
* @param token a unique token in the string that should be indicated by the exception
*/
def positionTest(name: String, query: String, token: String): Unit = {
- def ast = hiveContext.parseSql(query)
+ def ast = hiveContext.sessionState.sqlParser.parsePlan(query)
def parseTree = Try(quietly(ast.treeString)).getOrElse("<failed to parse>")
test(name) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
index dedc8f55f0..f789d88d5d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ShowCreateTableSuite.scala
@@ -279,13 +279,13 @@ class ShowCreateTableSuite extends QueryTest with SQLTestUtils with TestHiveSing
private def checkCreateTableOrView(table: TableIdentifier, checkType: String): Unit = {
val db = table.database.getOrElse("default")
- val expected = spark.externalCatalog.getTable(db, table.table)
+ val expected = spark.sharedState.externalCatalog.getTable(db, table.table)
val shownDDL = sql(s"SHOW CREATE TABLE ${table.quotedString}").head().getString(0)
sql(s"DROP $checkType ${table.quotedString}")
try {
sql(shownDDL)
- val actual = spark.externalCatalog.getTable(db, table.table)
+ val actual = spark.sharedState.externalCatalog.getTable(db, table.table)
checkCatalogTables(expected, actual)
} finally {
sql(s"DROP $checkType IF EXISTS ${table.table}")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index f8e00a35a3..73b1a7850d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -34,7 +34,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton with SQLTestUtils
test("parse analyze commands") {
def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) {
- val parsed = hiveContext.parseSql(analyzeCommand)
+ val parsed = hiveContext.sessionState.sqlParser.parsePlan(analyzeCommand)
val operators = parsed.collect {
case a: AnalyzeTableCommand => a
case o => o
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala
index f5cd73d45e..1583a448ef 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ConcurrentHiveSuite.scala
@@ -30,9 +30,9 @@ class ConcurrentHiveSuite extends SparkFunSuite with BeforeAndAfterAll {
conf.set("spark.ui.enabled", "false")
val ts =
new TestHiveContext(new SparkContext("local", s"TestSQLContext$i", conf))
- ts.executeSql("SHOW TABLES").toRdd.collect()
- ts.executeSql("SELECT * FROM src").toRdd.collect()
- ts.executeSql("SHOW TABLES").toRdd.collect()
+ ts.sessionState.executeSql("SHOW TABLES").toRdd.collect()
+ ts.sessionState.executeSql("SELECT * FROM src").toRdd.collect()
+ ts.sessionState.executeSql("SHOW TABLES").toRdd.collect()
}
}
}