author    Andrew Or <andrew@databricks.com>    2016-04-25 20:54:31 -0700
committer Reynold Xin <rxin@databricks.com>    2016-04-25 20:54:31 -0700
commit    18c2c92580bdc27aa5129d9e7abda418a3633ea6 (patch)
tree      5d4222dd42ea584ad8259fbe7e22f1eb9b5bdb3c /sql/hive
parent    fa3c06987e6148975dd54b629bd9094224358175 (diff)
[SPARK-14861][SQL] Replace internal usages of SQLContext with SparkSession
## What changes were proposed in this pull request?

In Spark 2.0, `SparkSession` is the new entry point. Internally we should stop using `SQLContext` everywhere, since it is no longer meant to be the main user-facing API. This patch takes care not to break any public APIs. The one suspect place is `o.a.s.ml.source.libsvm.DefaultSource`, but according to mengxr it is not supposed to be public, so it is OK to change the underlying `FileFormat` trait.

**Reviewers**: This is a big patch that may be difficult to review, but the changes are actually quite straightforward. If you prefer, I can break it up into a few smaller patches, but that would delay the progress of this issue a little.

## How was this patch tested?

No change in functionality intended.

Author: Andrew Or <andrew@databricks.com>

Closes #12625 from andrewor14/spark-session-refactor.
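The recurring pattern in the diff below is the same in every file: an internal class that previously took a `SQLContext` now takes a `SparkSession` and reaches configuration, the session catalog, and the `SparkContext` through it directly. A minimal sketch of the resulting shape (the helper class and member names here are hypothetical, not code from this patch):

```scala
package org.apache.spark.sql.hive

import org.apache.spark.sql.SparkSession

// Hypothetical helper (not part of the patch) illustrating the refactoring
// pattern: the constructor takes a SparkSession instead of a SQLContext, and
// session-scoped state is reached through sparkSession.sessionState and
// sparkSession.sparkContext rather than through the SQLContext wrapper.
private[hive] class ExampleCatalogHelper(sparkSession: SparkSession) {

  // was: sqlContext.conf
  private val conf = sparkSession.sessionState.conf

  // was: sqlContext.conf.defaultDataSourceName
  def defaultDataSourceName: String = conf.defaultDataSourceName

  // was: sqlContext.sessionState.catalog.getCurrentDatabase
  def currentDatabase: String =
    sparkSession.sessionState.catalog.getCurrentDatabase

  // was: sqlContext.sparkContext.defaultMinPartitions
  def defaultMinPartitions: Int =
    sparkSession.sparkContext.defaultMinPartitions
}
```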
Diffstat (limited to 'sql/hive')
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala            | 47
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala              | 6
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala                | 19
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala                  | 4
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala               | 12
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala                     | 18
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala   | 12
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala     | 8
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala                 | 29
-rw-r--r-- sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala                   | 12
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/catalyst/SQLBuilderTest.scala              | 2
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala       | 6
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala | 2
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala      | 4
-rw-r--r-- sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala           | 12
15 files changed, 102 insertions, 91 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index edb87b94ea..01b7cfbd2e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -24,7 +24,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext}
+import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession, SQLContext}
import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
@@ -45,16 +45,15 @@ import org.apache.spark.sql.types._
* This is still used for things like creating data source tables, but in the future will be
* cleaned up to integrate more nicely with [[HiveExternalCatalog]].
*/
-private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
- private val conf = hive.conf
- private val sessionState = hive.sessionState.asInstanceOf[HiveSessionState]
- private val client = hive.sharedState.asInstanceOf[HiveSharedState].metadataHive
- private val hiveconf = sessionState.hiveconf
+private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
+ private val conf = sparkSession.conf
+ private val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
+ private val client = sparkSession.sharedState.asInstanceOf[HiveSharedState].metadataHive
/** A fully qualified identifier for a table (i.e., database.tableName) */
case class QualifiedTableName(database: String, name: String)
- private def getCurrentDatabase: String = hive.sessionState.catalog.getCurrentDatabase
+ private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase
def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
QualifiedTableName(
@@ -124,7 +123,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
val options = table.storage.serdeProperties
val dataSource =
DataSource(
- hive,
+ sparkSession,
userSpecifiedSchema = userSpecifiedSchema,
partitionColumns = partitionColumns,
bucketSpec = bucketSpec,
@@ -179,12 +178,12 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
alias match {
// because hive use things like `_c0` to build the expanded text
// currently we cannot support view from "create view v1(c1) as ..."
- case None => SubqueryAlias(table.identifier.table, hive.parseSql(viewText))
- case Some(aliasText) => SubqueryAlias(aliasText, hive.parseSql(viewText))
+ case None => SubqueryAlias(table.identifier.table, sparkSession.parseSql(viewText))
+ case Some(aliasText) => SubqueryAlias(aliasText, sparkSession.parseSql(viewText))
}
} else {
MetastoreRelation(
- qualifiedTableName.database, qualifiedTableName.name, alias)(table, client, hive)
+ qualifiedTableName.database, qualifiedTableName.name, alias)(table, client, sparkSession)
}
}
@@ -275,19 +274,20 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
val hadoopFsRelation = cached.getOrElse {
val paths = new Path(metastoreRelation.catalogTable.storage.locationUri.get) :: Nil
- val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec)
+ val fileCatalog = new MetaStoreFileCatalog(sparkSession, paths, partitionSpec)
val inferredSchema = if (fileType.equals("parquet")) {
- val inferredSchema = defaultSource.inferSchema(hive, options, fileCatalog.allFiles())
+ val inferredSchema =
+ defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles())
inferredSchema.map { inferred =>
ParquetRelation.mergeMetastoreParquetSchema(metastoreSchema, inferred)
}.getOrElse(metastoreSchema)
} else {
- defaultSource.inferSchema(hive, options, fileCatalog.allFiles()).get
+ defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get
}
val relation = HadoopFsRelation(
- sqlContext = hive,
+ sparkSession = sparkSession,
location = fileCatalog,
partitionSchema = partitionSchema,
dataSchema = inferredSchema,
@@ -314,7 +314,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
val created =
LogicalRelation(
DataSource(
- sqlContext = hive,
+ sparkSession = sparkSession,
paths = paths,
userSpecifiedSchema = Some(metastoreRelation.schema),
bucketSpec = bucketSpec,
@@ -436,7 +436,8 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
case p: LogicalPlan if !p.childrenResolved => p
case p: LogicalPlan if p.resolved => p
- case CreateViewCommand(table, child, allowExisting, replace, sql) if !conf.nativeView =>
+ case CreateViewCommand(table, child, allowExisting, replace, sql)
+ if !sessionState.conf.nativeView =>
HiveNativeCommand(sql)
case p @ CreateTableAsSelectLogicalPlan(table, child, allowExisting) =>
@@ -462,7 +463,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
CreateTableUsingAsSelect(
TableIdentifier(desc.identifier.table),
- conf.defaultDataSourceName,
+ sessionState.conf.defaultDataSourceName,
temporary = false,
Array.empty[String],
bucketSpec = None,
@@ -538,13 +539,17 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
* the information from the metastore.
*/
private[hive] class MetaStoreFileCatalog(
- ctx: SQLContext,
+ sparkSession: SparkSession,
paths: Seq[Path],
partitionSpecFromHive: PartitionSpec)
- extends HDFSFileCatalog(ctx, Map.empty, paths, Some(partitionSpecFromHive.partitionColumns)) {
+ extends HDFSFileCatalog(
+ sparkSession,
+ Map.empty,
+ paths,
+ Some(partitionSpecFromHive.partitionColumns)) {
override def getStatus(path: Path): Array[FileStatus] = {
- val fs = path.getFileSystem(ctx.sessionState.hadoopConf)
+ val fs = path.getFileSystem(sparkSession.sessionState.hadoopConf)
fs.listStatus(path)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 9e527073d4..f70131ec86 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
-import org.apache.spark.sql.{AnalysisException, SQLContext}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
@@ -43,7 +43,7 @@ import org.apache.spark.util.Utils
private[sql] class HiveSessionCatalog(
externalCatalog: HiveExternalCatalog,
client: HiveClient,
- context: SQLContext,
+ sparkSession: SparkSession,
functionResourceLoader: FunctionResourceLoader,
functionRegistry: FunctionRegistry,
conf: SQLConf,
@@ -82,7 +82,7 @@ private[sql] class HiveSessionCatalog(
// essentially a cache for metastore tables. However, it relies on a lot of session-specific
// things so it would be a lot of work to split its functionality between HiveSessionCatalog
// and HiveCatalog. We should still do it at some point...
- private val metastoreCatalog = new HiveMetastoreCatalog(context)
+ private val metastoreCatalog = new HiveMetastoreCatalog(sparkSession)
val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index bf0288c9f7..4a8978e553 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -33,11 +33,14 @@ import org.apache.spark.sql.internal.SessionState
/**
* A class that holds all session-specific state in a given [[SparkSession]] backed by Hive.
*/
-private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx) {
+private[hive] class HiveSessionState(sparkSession: SparkSession)
+ extends SessionState(sparkSession) {
self =>
- private lazy val sharedState: HiveSharedState = ctx.sharedState.asInstanceOf[HiveSharedState]
+ private lazy val sharedState: HiveSharedState = {
+ sparkSession.sharedState.asInstanceOf[HiveSharedState]
+ }
/**
* A Hive client used for execution.
@@ -72,8 +75,8 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
new HiveSessionCatalog(
sharedState.externalCatalog,
metadataHive,
- ctx,
- ctx.sessionState.functionResourceLoader,
+ sparkSession,
+ functionResourceLoader,
functionRegistry,
conf,
hiveconf)
@@ -91,7 +94,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
catalog.PreInsertionCasts ::
PreInsertCastAndRename ::
DataSourceAnalysis ::
- (if (conf.runSQLonFile) new ResolveDataSource(ctx) :: Nil else Nil)
+ (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)
override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
}
@@ -101,9 +104,9 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
* Planner that takes into account Hive-specific strategies.
*/
override def planner: SparkPlanner = {
- new SparkPlanner(ctx.sparkContext, conf, experimentalMethods.extraStrategies)
+ new SparkPlanner(sparkSession.sparkContext, conf, experimentalMethods.extraStrategies)
with HiveStrategies {
- override val context: SQLContext = ctx
+ override val sparkSession: SparkSession = self.sparkSession
override val hiveconf: HiveConf = self.hiveconf
override def strategies: Seq[Strategy] = {
@@ -225,7 +228,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
// TODO: why do we get this from SparkConf but not SQLConf?
def hiveThriftServerSingleSession: Boolean = {
- ctx.sparkContext.conf.getBoolean(
+ sparkSession.sparkContext.conf.getBoolean(
"spark.sql.hive.thriftServer.singleSession", defaultValue = false)
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 2bea32b144..7d1f87f390 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -33,7 +33,7 @@ private[hive] trait HiveStrategies {
// Possibly being too clever with types here... or not clever enough.
self: SparkPlanner =>
- val context: SQLContext
+ val sparkSession: SparkSession
val hiveconf: HiveConf
object Scripts extends Strategy {
@@ -78,7 +78,7 @@ private[hive] trait HiveStrategies {
projectList,
otherPredicates,
identity[Seq[Expression]],
- HiveTableScanExec(_, relation, pruningPredicates)(context, hiveconf)) :: Nil
+ HiveTableScanExec(_, relation, pruningPredicates)(sparkSession, hiveconf)) :: Nil
case _ =>
Nil
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index cd45706841..0520e75306 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema
import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.TableDesc
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
@@ -42,7 +42,7 @@ private[hive] case class MetastoreRelation(
alias: Option[String])
(val catalogTable: CatalogTable,
@transient private val client: HiveClient,
- @transient private val sqlContext: SQLContext)
+ @transient private val sparkSession: SparkSession)
extends LeafNode with MultiInstanceRelation with FileRelation with CatalogRelation {
override def equals(other: Any): Boolean = other match {
@@ -58,7 +58,7 @@ private[hive] case class MetastoreRelation(
Objects.hashCode(databaseName, tableName, alias, output)
}
- override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sqlContext :: Nil
+ override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
private def toHiveColumn(c: CatalogColumn): FieldSchema = {
new FieldSchema(c.name, c.dataType, c.comment.orNull)
@@ -124,7 +124,7 @@ private[hive] case class MetastoreRelation(
// if the size is still less than zero, we use default size
Option(totalSize).map(_.toLong).filter(_ > 0)
.getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
- .getOrElse(sqlContext.conf.defaultSizeInBytes)))
+ .getOrElse(sparkSession.sessionState.conf.defaultSizeInBytes)))
}
)
@@ -133,7 +133,7 @@ private[hive] case class MetastoreRelation(
private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(catalogTable)
def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
- val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
+ val rawPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning) {
client.getPartitionsByFilter(catalogTable, predicates)
} else {
allPartitions
@@ -226,6 +226,6 @@ private[hive] case class MetastoreRelation(
}
override def newInstance(): MetastoreRelation = {
- MetastoreRelation(databaseName, tableName, alias)(catalogTable, client, sqlContext)
+ MetastoreRelation(databaseName, tableName, alias)(catalogTable, client, sparkSession)
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index e95069e830..af0317f7a1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -36,7 +36,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -61,7 +61,7 @@ private[hive]
class HadoopTableReader(
@transient private val attributes: Seq[Attribute],
@transient private val relation: MetastoreRelation,
- @transient private val sc: SQLContext,
+ @transient private val sparkSession: SparkSession,
hiveconf: HiveConf)
extends TableReader with Logging {
@@ -69,15 +69,15 @@ class HadoopTableReader(
// https://hadoop.apache.org/docs/r1.0.4/mapred-default.html
//
// In order keep consistency with Hive, we will let it be 0 in local mode also.
- private val _minSplitsPerRDD = if (sc.sparkContext.isLocal) {
+ private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) {
0 // will splitted based on block by default.
} else {
- math.max(hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinPartitions)
+ math.max(hiveconf.getInt("mapred.map.tasks", 1), sparkSession.sparkContext.defaultMinPartitions)
}
- SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(sc.sparkContext.conf, hiveconf)
+ SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(sparkSession.sparkContext.conf, hiveconf)
private val _broadcastedHiveConf =
- sc.sparkContext.broadcast(new SerializableConfiguration(hiveconf))
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(hiveconf))
override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
makeRDDForTable(
@@ -153,7 +153,7 @@ class HadoopTableReader(
def verifyPartitionPath(
partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
Map[HivePartition, Class[_ <: Deserializer]] = {
- if (!sc.conf.verifyPartitionPath) {
+ if (!sparkSession.sessionState.conf.verifyPartitionPath) {
partitionToDeserializer
} else {
var existPathSet = collection.mutable.Set[String]()
@@ -246,7 +246,7 @@ class HadoopTableReader(
// Even if we don't use any partitions, we still need an empty RDD
if (hivePartitionRDDs.size == 0) {
- new EmptyRDD[InternalRow](sc.sparkContext)
+ new EmptyRDD[InternalRow](sparkSession.sparkContext)
} else {
new UnionRDD(hivePartitionRDDs(0).context, hivePartitionRDDs)
}
@@ -278,7 +278,7 @@ class HadoopTableReader(
val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _
val rdd = new HadoopRDD(
- sc.sparkContext,
+ sparkSession.sparkContext,
_broadcastedHiveConf.asInstanceOf[Broadcast[SerializableConfiguration]],
Some(initializeJobConfFunc),
inputFormatClass,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 9240f9c7d2..08d4b99d30 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -17,7 +17,7 @@
package org.apache.spark.sql.hive.execution
-import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
import org.apache.spark.sql.execution.command.RunnableCommand
@@ -42,7 +42,7 @@ case class CreateTableAsSelect(
override def children: Seq[LogicalPlan] = Seq(query)
- override def run(sqlContext: SQLContext): Seq[Row] = {
+ override def run(sparkSession: SparkSession): Seq[Row] = {
lazy val metastoreRelation: MetastoreRelation = {
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
@@ -68,24 +68,24 @@ case class CreateTableAsSelect(
withFormat
}
- sqlContext.sessionState.catalog.createTable(withSchema, ignoreIfExists = false)
+ sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = false)
// Get the Metastore Relation
- sqlContext.sessionState.catalog.lookupRelation(tableIdentifier) match {
+ sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
case r: MetastoreRelation => r
}
}
// TODO ideally, we should get the output data ready first and then
// add the relation into catalog, just in case of failure occurs while data
// processing.
- if (sqlContext.sessionState.catalog.tableExists(tableIdentifier)) {
+ if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
if (allowExisting) {
// table already exists, will do nothing, to keep consistent with Hive
} else {
throw new AnalysisException(s"$tableIdentifier already exists.")
}
} else {
- sqlContext.executePlan(InsertIntoTable(
+ sparkSession.executePlan(InsertIntoTable(
metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 0f72091096..cc5bbf59db 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.Object
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution._
@@ -48,8 +48,8 @@ case class HiveTableScanExec(
requestedAttributes: Seq[Attribute],
relation: MetastoreRelation,
partitionPruningPred: Seq[Expression])(
- @transient val context: SQLContext,
- @transient val hiveconf: HiveConf)
+ @transient private val sparkSession: SparkSession,
+ @transient private val hiveconf: HiveConf)
extends LeafExecNode {
require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
@@ -84,7 +84,7 @@ case class HiveTableScanExec(
@transient
private[this] val hadoopReader =
- new HadoopTableReader(attributes, relation, context, hiveExtraConf)
+ new HadoopTableReader(attributes, relation, sparkSession, hiveExtraConf)
private[this] def castFromString(value: String, dataType: DataType) = {
Cast(Literal(value), dataType).eval(null)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 1095f5fd95..cb49fc910b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.{HadoopRDD, RDD}
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
@@ -52,17 +52,17 @@ private[sql] class DefaultSource
override def toString: String = "ORC"
override def inferSchema(
- sqlContext: SQLContext,
+ sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
OrcFileOperator.readSchema(
files.map(_.getPath.toUri.toString),
- Some(new Configuration(sqlContext.sessionState.hadoopConf))
+ Some(new Configuration(sparkSession.sessionState.hadoopConf))
)
}
override def prepareWrite(
- sqlContext: SQLContext,
+ sparkSession: SparkSession,
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = {
@@ -109,15 +109,15 @@ private[sql] class DefaultSource
}
override def buildReader(
- sqlContext: SQLContext,
+ sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
- val orcConf = new Configuration(sqlContext.sessionState.hadoopConf)
+ val orcConf = new Configuration(sparkSession.sessionState.hadoopConf)
- if (sqlContext.conf.orcFilterPushDown) {
+ if (sparkSession.sessionState.conf.orcFilterPushDown) {
// Sets pushed predicates
OrcFilters.createFilter(filters.toArray).foreach { f =>
orcConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
@@ -125,7 +125,8 @@ private[sql] class DefaultSource
}
}
- val broadcastedConf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(orcConf))
+ val broadcastedConf =
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(orcConf))
(file: PartitionedFile) => {
val conf = broadcastedConf.value.value
@@ -270,7 +271,7 @@ private[orc] class OrcOutputWriter(
}
private[orc] case class OrcTableScan(
- @transient sqlContext: SQLContext,
+ @transient sparkSession: SparkSession,
attributes: Seq[Attribute],
filters: Array[Filter],
@transient inputPaths: Seq[FileStatus])
@@ -278,11 +279,11 @@ private[orc] case class OrcTableScan(
with HiveInspectors {
def execute(): RDD[InternalRow] = {
- val job = Job.getInstance(new Configuration(sqlContext.sessionState.hadoopConf))
+ val job = Job.getInstance(new Configuration(sparkSession.sessionState.hadoopConf))
val conf = job.getConfiguration
// Tries to push down filters if ORC filter push-down is enabled
- if (sqlContext.conf.orcFilterPushDown) {
+ if (sparkSession.sessionState.conf.orcFilterPushDown) {
OrcFilters.createFilter(filters).foreach { f =>
conf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
conf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
@@ -294,14 +295,14 @@ private[orc] case class OrcTableScan(
val orcFormat = new DefaultSource
val dataSchema =
orcFormat
- .inferSchema(sqlContext, Map.empty, inputPaths)
+ .inferSchema(sparkSession, Map.empty, inputPaths)
.getOrElse(sys.error("Failed to read schema from target ORC files."))
// Sets requested columns
OrcRelation.setRequiredColumns(conf, dataSchema, StructType.fromAttributes(attributes))
if (inputPaths.isEmpty) {
// the input path probably be pruned, return an empty RDD.
- return sqlContext.sparkContext.emptyRDD[InternalRow]
+ return sparkSession.sparkContext.emptyRDD[InternalRow]
}
FileInputFormat.setInputPaths(job, inputPaths.map(_.getPath): _*)
@@ -309,7 +310,7 @@ private[orc] case class OrcTableScan(
classOf[OrcInputFormat]
.asInstanceOf[Class[_ <: MapRedInputFormat[NullWritable, Writable]]]
- val rdd = sqlContext.sparkContext.hadoopRDD(
+ val rdd = sparkSession.sparkContext.hadoopRDD(
conf.asInstanceOf[JobConf],
inputFormatClass,
classOf[NullWritable],
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 04b2494043..f74e5cd6f5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -71,7 +71,9 @@ object TestHive
* hive metastore seems to lead to weird non-deterministic failures. Therefore, the execution of
* test cases that rely on TestHive must be serialized.
*/
-class TestHiveContext(@transient val sparkSession: TestHiveSparkSession, isRootContext: Boolean)
+class TestHiveContext(
+ @transient override val sparkSession: TestHiveSparkSession,
+ isRootContext: Boolean)
extends SQLContext(sparkSession, isRootContext) {
def this(sc: SparkContext) {
@@ -106,11 +108,11 @@ class TestHiveContext(@transient val sparkSession: TestHiveSparkSession, isRootC
private[hive] class TestHiveSparkSession(
- sc: SparkContext,
+ @transient private val sc: SparkContext,
val warehousePath: File,
scratchDirPath: File,
metastoreTemporaryConf: Map[String, String],
- existingSharedState: Option[TestHiveSharedState])
+ @transient private val existingSharedState: Option[TestHiveSharedState])
extends SparkSession(sc) with Logging { self =>
def this(sc: SparkContext) {
@@ -463,7 +465,7 @@ private[hive] class TestHiveSparkSession(
private[hive] class TestHiveQueryExecution(
sparkSession: TestHiveSparkSession,
logicalPlan: LogicalPlan)
- extends QueryExecution(new SQLContext(sparkSession), logicalPlan) with Logging {
+ extends QueryExecution(sparkSession, logicalPlan) with Logging {
def this(sparkSession: TestHiveSparkSession, sql: String) {
this(sparkSession, sparkSession.sessionState.sqlParser.parsePlan(sql))
@@ -525,7 +527,7 @@ private[hive] class TestHiveSharedState(
private[hive] class TestHiveSessionState(sparkSession: TestHiveSparkSession)
- extends HiveSessionState(new SQLContext(sparkSession)) {
+ extends HiveSessionState(sparkSession) {
override lazy val conf: SQLConf = {
new SQLConf {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/SQLBuilderTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/SQLBuilderTest.scala
index b121600dae..27c9e992de 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/SQLBuilderTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/SQLBuilderTest.scala
@@ -64,7 +64,7 @@ abstract class SQLBuilderTest extends QueryTest with TestHiveSingleton {
""".stripMargin)
}
- checkAnswer(sqlContext.sql(generatedSQL), Dataset.ofRows(sqlContext, plan))
+ checkAnswer(sqlContext.sql(generatedSQL), Dataset.ofRows(sqlContext.sparkSession, plan))
}
protected def checkSQL(df: DataFrame, expectedSQL: String): Unit = {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 5965cdc81c..7cd01c9104 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -701,7 +701,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
// Manually create a metastore data source table.
CreateDataSourceTableUtils.createDataSourceTable(
- sqlContext = sqlContext,
+ sparkSession = sqlContext.sparkSession,
tableIdent = TableIdentifier("wide_schema"),
userSpecifiedSchema = Some(schema),
partitionColumns = Array.empty[String],
@@ -910,7 +910,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType)))
CreateDataSourceTableUtils.createDataSourceTable(
- sqlContext = sqlContext,
+ sparkSession = sqlContext.sparkSession,
tableIdent = TableIdentifier("not_skip_hive_metadata"),
userSpecifiedSchema = Some(schema),
partitionColumns = Array.empty[String],
@@ -925,7 +925,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
.forall(column => DataTypeParser.parse(column.dataType) == StringType))
CreateDataSourceTableUtils.createDataSourceTable(
- sqlContext = sqlContext,
+ sparkSession = sqlContext.sparkSession,
tableIdent = TableIdentifier("skip_hive_metadata"),
userSpecifiedSchema = Some(schema),
partitionColumns = Array.empty[String],
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index bc87d3ef38..b16c9c133b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -975,7 +975,7 @@ class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQue
// Create a new df to make sure its physical operator picks up
// spark.sql.TungstenAggregate.testFallbackStartsAt.
// todo: remove it?
- val newActual = Dataset.ofRows(sqlContext, actual.logicalPlan)
+ val newActual = Dataset.ofRows(sqlContext.sparkSession, actual.logicalPlan)
QueryTest.checkAnswer(newActual, expectedAnswer) match {
case Some(errorMessage) =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
index 4a2d190353..5a8a7f0ab5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.sources
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
import org.apache.spark.TaskContext
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
import org.apache.spark.sql.types.StructType
@@ -33,7 +33,7 @@ class CommitFailureTestSource extends SimpleTextSource {
* by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
*/
override def prepareWrite(
- sqlContext: SQLContext,
+ sparkSession: SparkSession,
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory =
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index eced8ed57f..e4bd1f93c5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.io.{NullWritable, Text}
import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
-import org.apache.spark.sql.{sources, Row, SQLContext}
+import org.apache.spark.sql.{sources, Row, SparkSession}
import org.apache.spark.sql.catalyst.{expressions, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, GenericInternalRow, InterpretedPredicate, InterpretedProjection, JoinedRow, Literal}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
@@ -37,14 +37,14 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
override def shortName(): String = "test"
override def inferSchema(
- sqlContext: SQLContext,
+ sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = {
Some(DataType.fromJson(options("dataSchema")).asInstanceOf[StructType])
}
override def prepareWrite(
- sqlContext: SQLContext,
+ sparkSession: SparkSession,
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory = new OutputWriterFactory {
@@ -58,7 +58,7 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
}
override def buildReader(
- sqlContext: SQLContext,
+ sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
@@ -74,9 +74,9 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
inputAttributes.find(_.name == field.name)
}
- val conf = new Configuration(sqlContext.sessionState.hadoopConf)
+ val conf = new Configuration(sparkSession.sessionState.hadoopConf)
val broadcastedConf =
- sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
+ sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
(file: PartitionedFile) => {
val predicate = {