author     Reynold Xin <rxin@databricks.com>   2016-03-15 10:12:32 -0700
committer  Reynold Xin <rxin@databricks.com>   2016-03-15 10:12:32 -0700
commit     5e6f2f45639f727cba43967858eb95b865ed81fa (patch)
tree       07d6c86cbe877110adb5f88500e43fb64218519a /sql
parent     48978abfa4d8f2cf79a4b053cc8bc7254cc2d61b (diff)
[SPARK-13893][SQL] Remove SQLContext.catalog/analyzer (internal method)
## What changes were proposed in this pull request?

Our internal code can go through SessionState.catalog and SessionState.analyzer instead. This brings two small benefits:

1. It reduces internal dependencies on SQLContext.
2. It removes two methods that are effectively public from Java's point of view (Java does not obey Scala's package-private visibility).

More importantly, according to the design in SPARK-13485, we need to reserve the `catalog` name on SQLContext for the user-facing public API rather than for an internal field.

## How was this patch tested?

Existing unit/integration tests.

Author: Reynold Xin <rxin@databricks.com>

Closes #11716 from rxin/SPARK-13893.
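To make the refactoring pattern concrete, here is a minimal, self-contained Scala sketch of the call-site change the patch applies throughout the codebase. The `Catalog`, `Analyzer`, `SessionState`, and `SQLContext` classes below are simplified stand-ins invented for illustration, not the real Spark implementations; the point is only that callers reach the catalog and analyzer through `sessionState` rather than through forwarder methods on `SQLContext`.

```scala
// Simplified stand-ins for illustration only -- not the real Spark classes.
class Catalog {
  private val tables = scala.collection.mutable.Set.empty[String]
  def registerTable(name: String): Unit = { tables += name }
  def tableExists(name: String): Boolean = tables.contains(name)
}

class Analyzer {
  // Case-insensitive name resolver, standing in for analyzer.resolver.
  val resolver: (String, String) => Boolean = _ equalsIgnoreCase _
}

class SessionState {
  val catalog: Catalog = new Catalog
  val analyzer: Analyzer = new Analyzer
}

class SQLContext {
  // The per-session state that internal code now depends on directly.
  lazy val sessionState: SessionState = new SessionState

  // The two forwarders removed by this patch looked roughly like:
  //   protected[sql] def catalog: Catalog = sessionState.catalog
  //   protected[sql] def analyzer: Analyzer = sessionState.analyzer
}

object AccessPatternExample extends App {
  val sqlContext = new SQLContext

  // Old call sites (removed): sqlContext.catalog.tableExists(...), sqlContext.analyzer.resolver(...)
  // New call sites reach the same components through sessionState:
  sqlContext.sessionState.catalog.registerTable("t")
  println(sqlContext.sessionState.catalog.tableExists("t"))          // true
  println(sqlContext.sessionState.analyzer.resolver("colA", "cola")) // true
}
```

There is no behavior change in the actual patch; only the access path at each internal call site moves from the SQLContext forwarders to SessionState, as the diff below shows.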
Diffstat (limited to 'sql')
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala | 8
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala | 2
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala | 4
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 22
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 13
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala | 4
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala | 4
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala | 3
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala | 8
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala | 6
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala | 4
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 10
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala | 6
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala | 6
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala | 2
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala | 19
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala | 8
-rw-r--r-- sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java | 2
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala | 6
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala | 4
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala | 24
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala | 5
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 5
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala | 5
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala | 4
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala | 18
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala | 2
27 files changed, 105 insertions, 99 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
index f7be5f6b37..33588ef72f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala
@@ -155,7 +155,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
* @since 1.3.1
*/
def fill(value: Double, cols: Seq[String]): DataFrame = {
- val columnEquals = df.sqlContext.analyzer.resolver
+ val columnEquals = df.sqlContext.sessionState.analyzer.resolver
val projections = df.schema.fields.map { f =>
// Only fill if the column is part of the cols list.
if (f.dataType.isInstanceOf[NumericType] && cols.exists(col => columnEquals(f.name, col))) {
@@ -182,7 +182,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
* @since 1.3.1
*/
def fill(value: String, cols: Seq[String]): DataFrame = {
- val columnEquals = df.sqlContext.analyzer.resolver
+ val columnEquals = df.sqlContext.sessionState.analyzer.resolver
val projections = df.schema.fields.map { f =>
// Only fill if the column is part of the cols list.
if (f.dataType.isInstanceOf[StringType] && cols.exists(col => columnEquals(f.name, col))) {
@@ -353,7 +353,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
case _: String => StringType
}
- val columnEquals = df.sqlContext.analyzer.resolver
+ val columnEquals = df.sqlContext.sessionState.analyzer.resolver
val projections = df.schema.fields.map { f =>
val shouldReplace = cols.exists(colName => columnEquals(colName, f.name))
if (f.dataType.isInstanceOf[NumericType] && targetColumnType == DoubleType && shouldReplace) {
@@ -382,7 +382,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) {
}
}
- val columnEquals = df.sqlContext.analyzer.resolver
+ val columnEquals = df.sqlContext.sessionState.analyzer.resolver
val projections = df.schema.fields.map { f =>
values.find { case (k, _) => columnEquals(k, f.name) }.map { case (_, v) =>
v match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 76b8d71ac9..57c978bec8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -394,7 +394,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
*/
def table(tableName: String): DataFrame = {
Dataset.newDataFrame(sqlContext,
- sqlContext.catalog.lookupRelation(
+ sqlContext.sessionState.catalog.lookupRelation(
sqlContext.sessionState.sqlParser.parseTableIdentifier(tableName)))
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index de87f4d7c2..9951f0fabf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -323,7 +323,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
*/
private def normalize(columnName: String, columnType: String): String = {
val validColumnNames = df.logicalPlan.output.map(_.name)
- validColumnNames.find(df.sqlContext.analyzer.resolver(_, columnName))
+ validColumnNames.find(df.sqlContext.sessionState.analyzer.resolver(_, columnName))
.getOrElse(throw new AnalysisException(s"$columnType column $columnName not found in " +
s"existing columns (${validColumnNames.mkString(", ")})"))
}
@@ -358,7 +358,7 @@ final class DataFrameWriter private[sql](df: DataFrame) {
}
private def saveAsTable(tableIdent: TableIdentifier): Unit = {
- val tableExists = df.sqlContext.catalog.tableExists(tableIdent)
+ val tableExists = df.sqlContext.sessionState.catalog.tableExists(tableIdent)
(tableExists, mode) match {
case (true, SaveMode.Ignore) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index ef239a1e2f..f7ef0de21c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -166,15 +166,16 @@ class Dataset[T] private[sql](
private implicit def classTag = unresolvedTEncoder.clsTag
protected[sql] def resolve(colName: String): NamedExpression = {
- queryExecution.analyzed.resolveQuoted(colName, sqlContext.analyzer.resolver).getOrElse {
- throw new AnalysisException(
- s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
- }
+ queryExecution.analyzed.resolveQuoted(colName, sqlContext.sessionState.analyzer.resolver)
+ .getOrElse {
+ throw new AnalysisException(
+ s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
+ }
}
protected[sql] def numericColumns: Seq[Expression] = {
schema.fields.filter(_.dataType.isInstanceOf[NumericType]).map { n =>
- queryExecution.analyzed.resolveQuoted(n.name, sqlContext.analyzer.resolver).get
+ queryExecution.analyzed.resolveQuoted(n.name, sqlContext.sessionState.analyzer.resolver).get
}
}
@@ -1400,7 +1401,7 @@ class Dataset[T] private[sql](
* @since 1.3.0
*/
def withColumn(colName: String, col: Column): DataFrame = {
- val resolver = sqlContext.analyzer.resolver
+ val resolver = sqlContext.sessionState.analyzer.resolver
val output = queryExecution.analyzed.output
val shouldReplace = output.exists(f => resolver(f.name, colName))
if (shouldReplace) {
@@ -1421,7 +1422,7 @@ class Dataset[T] private[sql](
* Returns a new [[DataFrame]] by adding a column with metadata.
*/
private[spark] def withColumn(colName: String, col: Column, metadata: Metadata): DataFrame = {
- val resolver = sqlContext.analyzer.resolver
+ val resolver = sqlContext.sessionState.analyzer.resolver
val output = queryExecution.analyzed.output
val shouldReplace = output.exists(f => resolver(f.name, colName))
if (shouldReplace) {
@@ -1445,7 +1446,7 @@ class Dataset[T] private[sql](
* @since 1.3.0
*/
def withColumnRenamed(existingName: String, newName: String): DataFrame = {
- val resolver = sqlContext.analyzer.resolver
+ val resolver = sqlContext.sessionState.analyzer.resolver
val output = queryExecution.analyzed.output
val shouldRename = output.exists(f => resolver(f.name, existingName))
if (shouldRename) {
@@ -1480,7 +1481,7 @@ class Dataset[T] private[sql](
*/
@scala.annotation.varargs
def drop(colNames: String*): DataFrame = {
- val resolver = sqlContext.analyzer.resolver
+ val resolver = sqlContext.sessionState.analyzer.resolver
val remainingCols =
schema.filter(f => colNames.forall(n => !resolver(f.name, n))).map(f => Column(f.name))
if (remainingCols.size == this.schema.size) {
@@ -1501,7 +1502,8 @@ class Dataset[T] private[sql](
def drop(col: Column): DataFrame = {
val expression = col match {
case Column(u: UnresolvedAttribute) =>
- queryExecution.analyzed.resolveQuoted(u.name, sqlContext.analyzer.resolver).getOrElse(u)
+ queryExecution.analyzed.resolveQuoted(
+ u.name, sqlContext.sessionState.analyzer.resolver).getOrElse(u)
case Column(expr: Expression) => expr
}
val attrs = this.logicalPlan.output
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 0f5d1c8cab..177d78c4c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -120,8 +120,6 @@ class SQLContext private[sql](
@transient
protected[sql] lazy val sessionState: SessionState = new SessionState(self)
protected[sql] def conf: SQLConf = sessionState.conf
- protected[sql] def catalog: Catalog = sessionState.catalog
- protected[sql] def analyzer: Analyzer = sessionState.analyzer
/**
* An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
@@ -699,7 +697,8 @@ class SQLContext private[sql](
* only during the lifetime of this instance of SQLContext.
*/
private[sql] def registerDataFrameAsTable(df: DataFrame, tableName: String): Unit = {
- catalog.registerTable(sessionState.sqlParser.parseTableIdentifier(tableName), df.logicalPlan)
+ sessionState.catalog.registerTable(
+ sessionState.sqlParser.parseTableIdentifier(tableName), df.logicalPlan)
}
/**
@@ -712,7 +711,7 @@ class SQLContext private[sql](
*/
def dropTempTable(tableName: String): Unit = {
cacheManager.tryUncacheQuery(table(tableName))
- catalog.unregisterTable(TableIdentifier(tableName))
+ sessionState.catalog.unregisterTable(TableIdentifier(tableName))
}
/**
@@ -797,7 +796,7 @@ class SQLContext private[sql](
}
private def table(tableIdent: TableIdentifier): DataFrame = {
- Dataset.newDataFrame(this, catalog.lookupRelation(tableIdent))
+ Dataset.newDataFrame(this, sessionState.catalog.lookupRelation(tableIdent))
}
/**
@@ -839,7 +838,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def tableNames(): Array[String] = {
- catalog.getTables(None).map {
+ sessionState.catalog.getTables(None).map {
case (tableName, _) => tableName
}.toArray
}
@@ -851,7 +850,7 @@ class SQLContext private[sql](
* @since 1.3.0
*/
def tableNames(databaseName: String): Array[String] = {
- catalog.getTables(Some(databaseName)).map {
+ sessionState.catalog.getTables(Some(databaseName)).map {
case (tableName, _) => tableName
}.toArray
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 5b4254f741..912b84abc1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -31,14 +31,14 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
*/
class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
- def assertAnalyzed(): Unit = try sqlContext.analyzer.checkAnalysis(analyzed) catch {
+ def assertAnalyzed(): Unit = try sqlContext.sessionState.analyzer.checkAnalysis(analyzed) catch {
case e: AnalysisException =>
val ae = new AnalysisException(e.message, e.line, e.startPosition, Some(analyzed))
ae.setStackTrace(e.getStackTrace)
throw ae
}
- lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical)
+ lazy val analyzed: LogicalPlan = sqlContext.sessionState.analyzer.execute(logical)
lazy val withCachedData: LogicalPlan = {
assertAnalyzed()
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
index e711797c1b..44b07e4613 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/commands.scala
@@ -330,7 +330,7 @@ case class ShowTablesCommand(databaseName: Option[String]) extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
// Since we need to return a Seq of rows, we will call getTables directly
// instead of calling tables in sqlContext.
- val rows = sqlContext.catalog.getTables(databaseName).map {
+ val rows = sqlContext.sessionState.catalog.getTables(databaseName).map {
case (tableName, isTemporary) => Row(tableName, isTemporary)
}
@@ -417,7 +417,7 @@ case class DescribeFunction(
case class SetDatabaseCommand(databaseName: String) extends RunnableCommand {
override def run(sqlContext: SQLContext): Seq[Row] = {
- sqlContext.catalog.setCurrentDatabase(databaseName)
+ sqlContext.sessionState.catalog.setCurrentDatabase(databaseName)
Seq.empty[Row]
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
index ef95d5d289..84e98c0f9e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala
@@ -67,7 +67,8 @@ private[sql] object FileSourceStrategy extends Strategy with Logging {
val filterSet = ExpressionSet(filters)
val partitionColumns =
- AttributeSet(l.resolve(files.partitionSchema, files.sqlContext.analyzer.resolver))
+ AttributeSet(
+ l.resolve(files.partitionSchema, files.sqlContext.sessionState.analyzer.resolver))
val partitionKeyFilters =
ExpressionSet(filters.filter(_.references.subsetOf(partitionColumns)))
logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 04e51735c4..7ca0e8859a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -99,7 +99,7 @@ case class CreateTempTableUsing(
userSpecifiedSchema = userSpecifiedSchema,
className = provider,
options = options)
- sqlContext.catalog.registerTable(
+ sqlContext.sessionState.catalog.registerTable(
tableIdent,
Dataset.newDataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation())).logicalPlan)
@@ -124,7 +124,7 @@ case class CreateTempTableUsingAsSelect(
bucketSpec = None,
options = options)
val result = dataSource.write(mode, df)
- sqlContext.catalog.registerTable(
+ sqlContext.sessionState.catalog.registerTable(
tableIdent,
Dataset.newDataFrame(sqlContext, LogicalRelation(result)).logicalPlan)
@@ -137,11 +137,11 @@ case class RefreshTable(tableIdent: TableIdentifier)
override def run(sqlContext: SQLContext): Seq[Row] = {
// Refresh the given table's metadata first.
- sqlContext.catalog.refreshTable(tableIdent)
+ sqlContext.sessionState.catalog.refreshTable(tableIdent)
// If this table is cached as a InMemoryColumnarRelation, drop the original
// cached version and make the new version cached lazily.
- val logicalPlan = sqlContext.catalog.lookupRelation(tableIdent)
+ val logicalPlan = sqlContext.sessionState.catalog.lookupRelation(tableIdent)
// Use lookupCachedData directly since RefreshTable also takes databaseName.
val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
if (isCached) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
index 3d7c576965..2820e4fa23 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ListTablesSuite.scala
@@ -33,7 +33,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {
}
after {
- sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
+ sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
}
test("get all tables") {
@@ -45,7 +45,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {
sql("SHOW tables").filter("tableName = 'ListTablesSuiteTable'"),
Row("ListTablesSuiteTable", true))
- sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
+ sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
}
@@ -58,7 +58,7 @@ class ListTablesSuite extends QueryTest with BeforeAndAfter with SharedSQLContext {
sql("show TABLES in DB").filter("tableName = 'ListTablesSuiteTable'"),
Row("ListTablesSuiteTable", true))
- sqlContext.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
+ sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
assert(sqlContext.tables().filter("tableName = 'ListTablesSuiteTable'").count() === 0)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index acfc1a518a..fb99b0c7e2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -51,7 +51,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
sql("INSERT INTO TABLE t SELECT * FROM tmp")
checkAnswer(sqlContext.table("t"), (data ++ data).map(Row.fromTuple))
}
- sqlContext.catalog.unregisterTable(TableIdentifier("tmp"))
+ sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
}
test("overwriting") {
@@ -61,7 +61,7 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
checkAnswer(sqlContext.table("t"), data.map(Row.fromTuple))
}
- sqlContext.catalog.unregisterTable(TableIdentifier("tmp"))
+ sqlContext.sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
}
test("self-join") {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index a78b7b0cc4..05fc569588 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -113,8 +113,6 @@ class HiveContext private[hive](
@transient
protected[sql] override lazy val sessionState = new HiveSessionState(self)
- protected[sql] override def catalog = sessionState.catalog
-
// The Hive UDF current_database() is foldable, will be evaluated by optimizer,
// but the optimizer can't access the SessionState of metadataHive.
sessionState.functionRegistry.registerFunction(
@@ -349,12 +347,12 @@ class HiveContext private[hive](
*/
def refreshTable(tableName: String): Unit = {
val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
- catalog.refreshTable(tableIdent)
+ sessionState.catalog.refreshTable(tableIdent)
}
protected[hive] def invalidateTable(tableName: String): Unit = {
val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
- catalog.invalidateTable(tableIdent)
+ sessionState.catalog.invalidateTable(tableIdent)
}
/**
@@ -368,7 +366,7 @@ class HiveContext private[hive](
*/
def analyze(tableName: String) {
val tableIdent = sessionState.sqlParser.parseTableIdentifier(tableName)
- val relation = EliminateSubqueryAliases(catalog.lookupRelation(tableIdent))
+ val relation = EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdent))
relation match {
case relation: MetastoreRelation =>
@@ -429,7 +427,7 @@ class HiveContext private[hive](
// recorded in the Hive metastore.
// This logic is based on org.apache.hadoop.hive.ql.exec.StatsTask.aggregateStats().
if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
- catalog.client.alterTable(
+ sessionState.catalog.client.alterTable(
relation.table.copy(
properties = relation.table.properties +
(StatsSetupConst.TOTAL_SIZE -> newTotalSize.toString)))
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 44f579fbb7..91425d1435 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -69,17 +69,17 @@ case class CreateTableAsSelect(
withFormat
}
- hiveContext.catalog.client.createTable(withSchema, ignoreIfExists = false)
+ hiveContext.sessionState.catalog.client.createTable(withSchema, ignoreIfExists = false)
// Get the Metastore Relation
- hiveContext.catalog.lookupRelation(tableIdentifier, None) match {
+ hiveContext.sessionState.catalog.lookupRelation(tableIdentifier, None) match {
case r: MetastoreRelation => r
}
}
// TODO ideally, we should get the output data ready first and then
// add the relation into catalog, just in case of failure occurs while data
// processing.
- if (hiveContext.catalog.tableExists(tableIdentifier)) {
+ if (hiveContext.sessionState.catalog.tableExists(tableIdentifier)) {
if (allowExisting) {
// table already exists, will do nothing, to keep consistent with Hive
} else {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
index 83d057f7e4..6c2b88eb8c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala
@@ -49,14 +49,14 @@ private[hive] case class CreateViewAsSelect(
override def run(sqlContext: SQLContext): Seq[Row] = {
val hiveContext = sqlContext.asInstanceOf[HiveContext]
- hiveContext.catalog.tableExists(tableIdentifier) match {
+ hiveContext.sessionState.catalog.tableExists(tableIdentifier) match {
case true if allowExisting =>
// Handles `CREATE VIEW IF NOT EXISTS v0 AS SELECT ...`. Does nothing when the target view
// already exists.
case true if orReplace =>
// Handles `CREATE OR REPLACE VIEW v0 AS SELECT ...`
- hiveContext.catalog.client.alertView(prepareTable(sqlContext))
+ hiveContext.sessionState.catalog.client.alertView(prepareTable(sqlContext))
case true =>
// Handles `CREATE VIEW v0 AS SELECT ...`. Throws exception when the target view already
@@ -66,7 +66,7 @@ private[hive] case class CreateViewAsSelect(
"CREATE OR REPLACE VIEW AS")
case false =>
- hiveContext.catalog.client.createView(prepareTable(sqlContext))
+ hiveContext.sessionState.catalog.client.createView(prepareTable(sqlContext))
}
Seq.empty[Row]
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index b3d38dfdb4..4ffd868242 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -45,7 +45,7 @@ case class InsertIntoHiveTable(
@transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
@transient private lazy val hiveContext = new Context(sc.hiveconf)
- @transient private lazy val catalog = sc.catalog
+ @transient private lazy val catalog = sc.sessionState.catalog
def output: Seq[Attribute] = Seq.empty
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index c4723fcb82..ff66573620 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -71,7 +71,7 @@ case class DropTable(
}
hiveContext.invalidateTable(tableName)
hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
- hiveContext.catalog.unregisterTable(TableIdentifier(tableName))
+ hiveContext.sessionState.catalog.unregisterTable(TableIdentifier(tableName))
Seq.empty[Row]
}
}
@@ -130,7 +130,7 @@ case class CreateMetastoreDataSource(
val tableName = tableIdent.unquotedString
val hiveContext = sqlContext.asInstanceOf[HiveContext]
- if (hiveContext.catalog.tableExists(tableIdent)) {
+ if (hiveContext.sessionState.catalog.tableExists(tableIdent)) {
if (allowExisting) {
return Seq.empty[Row]
} else {
@@ -142,7 +142,7 @@ case class CreateMetastoreDataSource(
val optionsWithPath =
if (!options.contains("path") && managedIfNoPath) {
isExternal = false
- options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableIdent))
+ options + ("path" -> hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
} else {
options
}
@@ -155,7 +155,7 @@ case class CreateMetastoreDataSource(
bucketSpec = None,
options = optionsWithPath).resolveRelation()
- hiveContext.catalog.createDataSourceTable(
+ hiveContext.sessionState.catalog.createDataSourceTable(
tableIdent,
userSpecifiedSchema,
Array.empty[String],
@@ -200,13 +200,13 @@ case class CreateMetastoreDataSourceAsSelect(
val optionsWithPath =
if (!options.contains("path")) {
isExternal = false
- options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableIdent))
+ options + ("path" -> hiveContext.sessionState.catalog.hiveDefaultTableFilePath(tableIdent))
} else {
options
}
var existingSchema = None: Option[StructType]
- if (sqlContext.catalog.tableExists(tableIdent)) {
+ if (sqlContext.sessionState.catalog.tableExists(tableIdent)) {
// Check if we need to throw an exception or just return.
mode match {
case SaveMode.ErrorIfExists =>
@@ -230,7 +230,8 @@ case class CreateMetastoreDataSourceAsSelect(
// TODO: Check that options from the resolved relation match the relation that we are
// inserting into (i.e. using the same compression).
- EliminateSubqueryAliases(sqlContext.catalog.lookupRelation(tableIdent)) match {
+ EliminateSubqueryAliases(
+ sqlContext.sessionState.catalog.lookupRelation(tableIdent)) match {
case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
existingSchema = Some(l.schema)
case o =>
@@ -267,7 +268,7 @@ case class CreateMetastoreDataSourceAsSelect(
// We will use the schema of resolved.relation as the schema of the table (instead of
// the schema of df). It is important since the nullability may be changed by the relation
// provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
- hiveContext.catalog.createDataSourceTable(
+ hiveContext.sessionState.catalog.createDataSourceTable(
tableIdent,
Some(result.schema),
partitionColumns,
@@ -278,7 +279,7 @@ case class CreateMetastoreDataSourceAsSelect(
}
// Refresh the cache of the table in the catalog.
- hiveContext.catalog.refreshTable(tableIdent)
+ hiveContext.sessionState.catalog.refreshTable(tableIdent)
Seq.empty[Row]
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 5887f69e13..19c05f9cb0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -205,7 +205,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
logDebug(s"Query references test tables: ${referencedTestTables.mkString(", ")}")
referencedTestTables.foreach(loadTestTable)
// Proceed with analysis.
- analyzer.execute(logical)
+ sessionState.analyzer.execute(logical)
}
}
@@ -427,9 +427,9 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
cacheManager.clearCache()
loadedTables.clear()
- catalog.cachedDataSourceTables.invalidateAll()
- catalog.client.reset()
- catalog.unregisterAllTables()
+ sessionState.catalog.cachedDataSourceTables.invalidateAll()
+ sessionState.catalog.client.reset()
+ sessionState.catalog.unregisterAllTables()
FunctionRegistry.getFunctionNames.asScala.filterNot(originalUDFs.contains(_)).
foreach { udfName => FunctionRegistry.unregisterTemporaryUDF(udfName) }
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index e9356541c2..bd14a243ea 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -70,7 +70,7 @@ public class JavaMetastoreDataSourcesSuite {
if (path.exists()) {
path.delete();
}
- hiveManagedPath = new Path(sqlContext.catalog().hiveDefaultTableFilePath(
+ hiveManagedPath = new Path(sqlContext.sessionState().catalog().hiveDefaultTableFilePath(
new TableIdentifier("javaSavedTable")));
fs = hiveManagedPath.getFileSystem(sc.hadoopConfiguration());
if (fs.exists(hiveManagedPath)){
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
index 90d65d9e9b..ce7b08ab72 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveMetastoreCatalogSuite.scala
@@ -83,7 +83,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
.saveAsTable("t")
}
- val hiveTable = catalog.client.getTable("default", "t")
+ val hiveTable = sessionState.catalog.client.getTable("default", "t")
assert(hiveTable.storage.inputFormat === Some(inputFormat))
assert(hiveTable.storage.outputFormat === Some(outputFormat))
assert(hiveTable.storage.serde === Some(serde))
@@ -114,7 +114,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
.saveAsTable("t")
}
- val hiveTable = catalog.client.getTable("default", "t")
+ val hiveTable = sessionState.catalog.client.getTable("default", "t")
assert(hiveTable.storage.inputFormat === Some(inputFormat))
assert(hiveTable.storage.outputFormat === Some(outputFormat))
assert(hiveTable.storage.serde === Some(serde))
@@ -144,7 +144,7 @@ class DataSourceWithHiveMetastoreCatalogSuite
|AS SELECT 1 AS d1, "val_1" AS d2
""".stripMargin)
- val hiveTable = catalog.client.getTable("default", "t")
+ val hiveTable = sessionState.catalog.client.getTable("default", "t")
assert(hiveTable.storage.inputFormat === Some(inputFormat))
assert(hiveTable.storage.outputFormat === Some(outputFormat))
assert(hiveTable.storage.serde === Some(serde))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
index a94f7053c3..0a31ac64a2 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ListTablesSuite.scala
@@ -32,14 +32,14 @@ class ListTablesSuite extends QueryTest with TestHiveSingleton with BeforeAndAfterAll {
override def beforeAll(): Unit = {
// The catalog in HiveContext is a case insensitive one.
- catalog.registerTable(TableIdentifier("ListTablesSuiteTable"), df.logicalPlan)
+ sessionState.catalog.registerTable(TableIdentifier("ListTablesSuiteTable"), df.logicalPlan)
sql("CREATE TABLE HiveListTablesSuiteTable (key int, value string)")
sql("CREATE DATABASE IF NOT EXISTS ListTablesSuiteDB")
sql("CREATE TABLE ListTablesSuiteDB.HiveInDBListTablesSuiteTable (key int, value string)")
}
override def afterAll(): Unit = {
- catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
+ sessionState.catalog.unregisterTable(TableIdentifier("ListTablesSuiteTable"))
sql("DROP TABLE IF EXISTS HiveListTablesSuiteTable")
sql("DROP TABLE IF EXISTS ListTablesSuiteDB.HiveInDBListTablesSuiteTable")
sql("DROP DATABASE IF EXISTS ListTablesSuiteDB")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index aaebad79f6..d7974f1ee3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -369,7 +369,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
|)
""".stripMargin)
- val expectedPath = catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
+ val expectedPath =
+ sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("ctasJsonTable"))
val filesystemPath = new Path(expectedPath)
val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
@@ -460,7 +461,8 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
// Drop table will also delete the data.
sql("DROP TABLE savedJsonTable")
intercept[AnalysisException] {
- read.json(catalog.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable")))
+ read.json(
+ sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier("savedJsonTable")))
}
}
@@ -695,7 +697,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
val schema = StructType((1 to 5000).map(i => StructField(s"c_$i", StringType, true)))
// Manually create a metastore data source table.
- catalog.createDataSourceTable(
+ sessionState.catalog.createDataSourceTable(
tableIdent = TableIdentifier("wide_schema"),
userSpecifiedSchema = Some(schema),
partitionColumns = Array.empty[String],
@@ -727,14 +729,14 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
outputFormat = None,
serde = None,
serdeProperties = Map(
- "path" -> catalog.hiveDefaultTableFilePath(TableIdentifier(tableName)))
+ "path" -> sessionState.catalog.hiveDefaultTableFilePath(TableIdentifier(tableName)))
),
properties = Map(
"spark.sql.sources.provider" -> "json",
"spark.sql.sources.schema" -> schema.json,
"EXTERNAL" -> "FALSE"))
- catalog.client.createTable(hiveTable, ignoreIfExists = false)
+ sessionState.catalog.client.createTable(hiveTable, ignoreIfExists = false)
invalidateTable(tableName)
val actualSchema = table(tableName).schema
@@ -749,7 +751,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
withTable(tableName) {
df.write.format("parquet").partitionBy("d", "b").saveAsTable(tableName)
invalidateTable(tableName)
- val metastoreTable = catalog.client.getTable("default", tableName)
+ val metastoreTable = sessionState.catalog.client.getTable("default", tableName)
val expectedPartitionColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
val numPartCols = metastoreTable.properties("spark.sql.sources.schema.numPartCols").toInt
@@ -784,7 +786,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
.sortBy("c")
.saveAsTable(tableName)
invalidateTable(tableName)
- val metastoreTable = catalog.client.getTable("default", tableName)
+ val metastoreTable = sessionState.catalog.client.getTable("default", tableName)
val expectedBucketByColumns = StructType(df.schema("d") :: df.schema("b") :: Nil)
val expectedSortByColumns = StructType(df.schema("c") :: Nil)
@@ -901,7 +903,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
test("skip hive metadata on table creation") {
val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType)))
- catalog.createDataSourceTable(
+ sessionState.catalog.createDataSourceTable(
tableIdent = TableIdentifier("not_skip_hive_metadata"),
userSpecifiedSchema = Some(schema),
partitionColumns = Array.empty[String],
@@ -912,10 +914,10 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
// As a proxy for verifying that the table was stored in Hive compatible format, we verify that
// each column of the table is of native type StringType.
- assert(catalog.client.getTable("default", "not_skip_hive_metadata").schema
+ assert(sessionState.catalog.client.getTable("default", "not_skip_hive_metadata").schema
.forall(column => HiveMetastoreTypes.toDataType(column.dataType) == StringType))
- catalog.createDataSourceTable(
+ sessionState.catalog.createDataSourceTable(
tableIdent = TableIdentifier("skip_hive_metadata"),
userSpecifiedSchema = Some(schema),
partitionColumns = Array.empty[String],
@@ -926,7 +928,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
// As a proxy for verifying that the table was stored in SparkSQL format, we verify that
// the table has a column type as array of StringType.
- assert(catalog.client.getTable("default", "skip_hive_metadata").schema
+ assert(sessionState.catalog.client.getTable("default", "skip_hive_metadata").schema
.forall(column => HiveMetastoreTypes.toDataType(column.dataType) == ArrayType(StringType)))
}
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index 488f298981..e2effef0b9 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -25,8 +25,9 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
private lazy val df = sqlContext.range(10).coalesce(1)
private def checkTablePath(dbName: String, tableName: String): Unit = {
- val metastoreTable = hiveContext.catalog.client.getTable(dbName, tableName)
- val expectedPath = hiveContext.catalog.client.getDatabase(dbName).locationUri + "/" + tableName
+ val metastoreTable = hiveContext.sessionState.catalog.client.getTable(dbName, tableName)
+ val expectedPath =
+ hiveContext.sessionState.catalog.client.getDatabase(dbName).locationUri + "/" + tableName
assert(metastoreTable.storage.serdeProperties("path") === expectedPath)
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 3952e716d3..1d8c293d43 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -73,7 +73,8 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
test("analyze MetastoreRelations") {
def queryTotalSize(tableName: String): BigInt =
- hiveContext.catalog.lookupRelation(TableIdentifier(tableName)).statistics.sizeInBytes
+ hiveContext.sessionState.catalog.lookupRelation(
+ TableIdentifier(tableName)).statistics.sizeInBytes
// Non-partitioned table
sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
@@ -120,7 +121,7 @@ class StatisticsSuite extends QueryTest with TestHiveSingleton {
intercept[UnsupportedOperationException] {
hiveContext.analyze("tempTable")
}
- hiveContext.catalog.unregisterTable(TableIdentifier("tempTable"))
+ hiveContext.sessionState.catalog.unregisterTable(TableIdentifier("tempTable"))
}
test("estimates the size of a test MetastoreRelation") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index b42f00e90f..21dfb82876 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -292,7 +292,8 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
test("CTAS without serde") {
def checkRelation(tableName: String, isDataSourceParquet: Boolean): Unit = {
- val relation = EliminateSubqueryAliases(catalog.lookupRelation(TableIdentifier(tableName)))
+ val relation = EliminateSubqueryAliases(
+ sessionState.catalog.lookupRelation(TableIdentifier(tableName)))
relation match {
case LogicalRelation(r: HadoopFsRelation, _, _) =>
if (!isDataSourceParquet) {
@@ -720,7 +721,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
(1 to 100).par.map { i =>
val tableName = s"SPARK_6618_table_$i"
sql(s"CREATE TABLE $tableName (col1 string)")
- catalog.lookupRelation(TableIdentifier(tableName))
+ sessionState.catalog.lookupRelation(TableIdentifier(tableName))
table(tableName)
tables()
sql(s"DROP TABLE $tableName")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
index 8cfb32f00a..57c4ad4248 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/OrcQuerySuite.scala
@@ -222,7 +222,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
sql("INSERT INTO TABLE t SELECT * FROM tmp")
checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
}
- catalog.unregisterTable(TableIdentifier("tmp"))
+ sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
}
test("overwriting") {
@@ -232,7 +232,7 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
sql("INSERT OVERWRITE TABLE t SELECT * FROM tmp")
checkAnswer(table("t"), data.map(Row.fromTuple))
}
- catalog.unregisterTable(TableIdentifier("tmp"))
+ sessionState.catalog.unregisterTable(TableIdentifier("tmp"))
}
test("self-join") {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 8fdbbd94c8..bb53179c3c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -425,10 +425,10 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
}
test("Caching converted data source Parquet Relations") {
- val _catalog = catalog
+ val _catalog = sessionState.catalog
def checkCached(tableIdentifier: _catalog.QualifiedTableName): Unit = {
// Converted test_parquet should be cached.
- catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+ sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => fail("Converted test_parquet should be cached in the cache.")
case logical @ LogicalRelation(parquetRelation: HadoopFsRelation, _, _) => // OK
case other =>
@@ -456,14 +456,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
var tableIdentifier = _catalog.QualifiedTableName("default", "test_insert_parquet")
// First, make sure the converted test_parquet is not cached.
- assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+ assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
// Table lookup will make the table cached.
table("test_insert_parquet")
checkCached(tableIdentifier)
// For insert into non-partitioned table, we will do the conversion,
// so the converted test_insert_parquet should be cached.
invalidateTable("test_insert_parquet")
- assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+ assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
sql(
"""
|INSERT INTO TABLE test_insert_parquet
@@ -476,7 +476,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
sql("select a, b from jt").collect())
// Invalidate the cache.
invalidateTable("test_insert_parquet")
- assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+ assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
// Create a partitioned table.
sql(
@@ -494,7 +494,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
""".stripMargin)
tableIdentifier = _catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
- assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+ assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
sql(
"""
|INSERT INTO TABLE test_parquet_partitioned_cache_test
@@ -503,14 +503,14 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
""".stripMargin)
// Right now, insert into a partitioned Parquet is not supported in data source Parquet.
// So, we expect it is not cached.
- assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+ assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
sql(
"""
|INSERT INTO TABLE test_parquet_partitioned_cache_test
|PARTITION (`date`='2015-04-02')
|select a, b from jt
""".stripMargin)
- assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+ assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
// Make sure we can cache the partitioned table.
table("test_parquet_partitioned_cache_test")
@@ -526,7 +526,7 @@ class ParquetMetastoreSuite extends ParquetPartitioningTest {
""".stripMargin).collect())
invalidateTable("test_parquet_partitioned_cache_test")
- assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
+ assert(sessionState.catalog.cachedDataSourceTables.getIfPresent(tableIdentifier) === null)
dropTables("test_insert_parquet", "test_parquet_partitioned_cache_test")
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
index 33c1bb059e..a3e7737a7c 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/BucketedWriteSuite.scala
@@ -70,7 +70,7 @@ class BucketedWriteSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {
def tableDir: File = {
val identifier = hiveContext.sessionState.sqlParser.parseTableIdentifier("bucketed_table")
- new File(URI.create(hiveContext.catalog.hiveDefaultTableFilePath(identifier)))
+ new File(URI.create(hiveContext.sessionState.catalog.hiveDefaultTableFilePath(identifier)))
}
/**