author     Andrew Or <andrew@databricks.com>        2016-04-25 20:54:31 -0700
committer  Reynold Xin <rxin@databricks.com>        2016-04-25 20:54:31 -0700
commit     18c2c92580bdc27aa5129d9e7abda418a3633ea6 (patch)
tree       5d4222dd42ea584ad8259fbe7e22f1eb9b5bdb3c /sql/hive
parent     fa3c06987e6148975dd54b629bd9094224358175 (diff)
[SPARK-14861][SQL] Replace internal usages of SQLContext with SparkSession
## What changes were proposed in this pull request?
In Spark 2.0, `SparkSession` is the new entry point. Internally, we should stop using `SQLContext` everywhere, since it is no longer meant to be the main user-facing API.
In this patch I took care not to break any public APIs. The one suspect place is `o.a.s.ml.source.libsvm.DefaultSource`, but according to mengxr it's not supposed to be public, so it's OK to change the underlying `FileFormat` trait.
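The core of the change is mechanical: internal components that used to be constructed with a `SQLContext` now take a `SparkSession` and reach configuration and session state through it. Below is a condensed, hypothetical stand-in illustrating the pattern (adapted from the `HiveMetastoreCatalog` change in the diff; the class name and members are simplified for illustration):

```scala
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession

// Hypothetical stand-in mirroring the HiveMetastoreCatalog change in the diff below.
// Note: `sessionState` is package-private inside Spark, so a class like this only
// compiles as part of Spark's own source tree (as the real class does).
class ExampleHiveCatalog(sparkSession: SparkSession) extends Logging {
  // Before this patch the constructor took a SQLContext ("hive: SQLContext") and read
  // conf/session state off of it; now everything hangs off the SparkSession directly.
  private val conf = sparkSession.conf
  private val sessionState = sparkSession.sessionState

  private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase
}
```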
**Reviewers**: This is a big patch that may be difficult to review, but the changes are actually quite straightforward. If you prefer, I can break it up into a few smaller patches, though that will delay progress on this issue a little.
## How was this patch tested?
No change in functionality intended.
Author: Andrew Or <andrew@databricks.com>
Closes #12625 from andrewor14/spark-session-refactor.
Diffstat (limited to 'sql/hive')
15 files changed, 102 insertions, 91 deletions
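Since the same `FileFormat` signature change recurs across several of the files below (the ORC `DefaultSource`, `SimpleTextSource`, `CommitFailureTestSource`), here is a condensed, hypothetical sketch of the affected method shapes after this patch. The real trait has more members, and the import locations are assumed from the Spark 2.0 code shown in this diff; several of these types are Spark-internal, so this only compiles against Spark's own source tree:

```scala
import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.mapreduce.Job

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Sketch of the FileFormat methods whose first parameter changes from
// SQLContext to SparkSession in this patch (illustrative only, not the full trait).
trait FileFormatSketch {
  def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType]

  def prepareWrite(
      sparkSession: SparkSession,
      job: Job,
      options: Map[String, String],
      dataSchema: StructType): OutputWriterFactory

  def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,
      filters: Seq[Filter],
      options: Map[String, String]): PartitionedFile => Iterator[InternalRow]
}
```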
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index edb87b94ea..01b7cfbd2e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -24,7 +24,7 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import org.apache.hadoop.fs.{FileStatus, Path}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, SaveMode, SQLContext}
+import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
@@ -45,16 +45,15 @@ import org.apache.spark.sql.types._
  * This is still used for things like creating data source tables, but in the future will be
  * cleaned up to integrate more nicely with [[HiveExternalCatalog]].
  */
-private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
-  private val conf = hive.conf
-  private val sessionState = hive.sessionState.asInstanceOf[HiveSessionState]
-  private val client = hive.sharedState.asInstanceOf[HiveSharedState].metadataHive
-  private val hiveconf = sessionState.hiveconf
+private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Logging {
+  private val conf = sparkSession.conf
+  private val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
+  private val client = sparkSession.sharedState.asInstanceOf[HiveSharedState].metadataHive
 
   /** A fully qualified identifier for a table (i.e., database.tableName) */
   case class QualifiedTableName(database: String, name: String)
 
-  private def getCurrentDatabase: String = hive.sessionState.catalog.getCurrentDatabase
+  private def getCurrentDatabase: String = sessionState.catalog.getCurrentDatabase
 
   def getQualifiedTableName(tableIdent: TableIdentifier): QualifiedTableName = {
     QualifiedTableName(
@@ -124,7 +123,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
     val options = table.storage.serdeProperties
     val dataSource =
       DataSource(
-        hive,
+        sparkSession,
        userSpecifiedSchema = userSpecifiedSchema,
        partitionColumns = partitionColumns,
        bucketSpec = bucketSpec,
@@ -179,12 +178,12 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
         alias match {
           // because hive use things like `_c0` to build the expanded text
           // currently we cannot support view from "create view v1(c1) as ..."
-          case None => SubqueryAlias(table.identifier.table, hive.parseSql(viewText))
-          case Some(aliasText) => SubqueryAlias(aliasText, hive.parseSql(viewText))
+          case None => SubqueryAlias(table.identifier.table, sparkSession.parseSql(viewText))
+          case Some(aliasText) => SubqueryAlias(aliasText, sparkSession.parseSql(viewText))
         }
       } else {
         MetastoreRelation(
-          qualifiedTableName.database, qualifiedTableName.name, alias)(table, client, hive)
+          qualifiedTableName.database, qualifiedTableName.name, alias)(table, client, sparkSession)
       }
     }
@@ -275,19 +274,20 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
       val hadoopFsRelation = cached.getOrElse {
         val paths = new Path(metastoreRelation.catalogTable.storage.locationUri.get) :: Nil
-        val fileCatalog = new MetaStoreFileCatalog(hive, paths, partitionSpec)
+        val fileCatalog = new MetaStoreFileCatalog(sparkSession, paths, partitionSpec)
 
         val inferredSchema = if (fileType.equals("parquet")) {
-          val inferredSchema = defaultSource.inferSchema(hive, options, fileCatalog.allFiles())
+          val inferredSchema =
+            defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles())
           inferredSchema.map { inferred =>
             ParquetRelation.mergeMetastoreParquetSchema(metastoreSchema, inferred)
           }.getOrElse(metastoreSchema)
         } else {
-          defaultSource.inferSchema(hive, options, fileCatalog.allFiles()).get
+          defaultSource.inferSchema(sparkSession, options, fileCatalog.allFiles()).get
         }
 
         val relation = HadoopFsRelation(
-          sqlContext = hive,
+          sparkSession = sparkSession,
           location = fileCatalog,
           partitionSchema = partitionSchema,
           dataSchema = inferredSchema,
@@ -314,7 +314,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
       val created = LogicalRelation(
         DataSource(
-          sqlContext = hive,
+          sparkSession = sparkSession,
          paths = paths,
          userSpecifiedSchema = Some(metastoreRelation.schema),
          bucketSpec = bucketSpec,
@@ -436,7 +436,8 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
       case p: LogicalPlan if !p.childrenResolved => p
       case p: LogicalPlan if p.resolved => p
 
-      case CreateViewCommand(table, child, allowExisting, replace, sql) if !conf.nativeView =>
+      case CreateViewCommand(table, child, allowExisting, replace, sql)
+          if !sessionState.conf.nativeView =>
         HiveNativeCommand(sql)
 
       case p @ CreateTableAsSelectLogicalPlan(table, child, allowExisting) =>
@@ -462,7 +463,7 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
         val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists
         CreateTableUsingAsSelect(
           TableIdentifier(desc.identifier.table),
-          conf.defaultDataSourceName,
+          sessionState.conf.defaultDataSourceName,
           temporary = false,
           Array.empty[String],
           bucketSpec = None,
@@ -538,13 +539,17 @@ private[hive] class HiveMetastoreCatalog(hive: SQLContext) extends Logging {
  * the information from the metastore.
  */
 private[hive] class MetaStoreFileCatalog(
-    ctx: SQLContext,
+    sparkSession: SparkSession,
     paths: Seq[Path],
     partitionSpecFromHive: PartitionSpec)
-  extends HDFSFileCatalog(ctx, Map.empty, paths, Some(partitionSpecFromHive.partitionColumns)) {
+  extends HDFSFileCatalog(
+    sparkSession,
+    Map.empty,
+    paths,
+    Some(partitionSpecFromHive.partitionColumns)) {
 
   override def getStatus(path: Path): Array[FileStatus] = {
-    val fs = path.getFileSystem(ctx.sessionState.hadoopConf)
+    val fs = path.getFileSystem(sparkSession.sessionState.hadoopConf)
     fs.listStatus(path)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index 9e527073d4..f70131ec86 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.ql.exec.{UDAF, UDF}
 import org.apache.hadoop.hive.ql.exec.{FunctionRegistry => HiveFunctionRegistry}
 import org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, GenericUDF, GenericUDTF}
 
-import org.apache.spark.sql.{AnalysisException, SQLContext}
+import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
@@ -43,7 +43,7 @@ import org.apache.spark.util.Utils
 private[sql] class HiveSessionCatalog(
     externalCatalog: HiveExternalCatalog,
     client: HiveClient,
-    context: SQLContext,
+    sparkSession: SparkSession,
     functionResourceLoader: FunctionResourceLoader,
     functionRegistry: FunctionRegistry,
     conf: SQLConf,
@@ -82,7 +82,7 @@ private[sql] class HiveSessionCatalog(
   // essentially a cache for metastore tables. However, it relies on a lot of session-specific
   // things so it would be a lot of work to split its functionality between HiveSessionCatalog
   // and HiveCatalog. We should still do it at some point...
-  private val metastoreCatalog = new HiveMetastoreCatalog(context)
+  private val metastoreCatalog = new HiveMetastoreCatalog(sparkSession)
 
   val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
   val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index bf0288c9f7..4a8978e553 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -33,11 +33,14 @@ import org.apache.spark.sql.internal.SessionState
 /**
  * A class that holds all session-specific state in a given [[SparkSession]] backed by Hive.
  */
-private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx) {
+private[hive] class HiveSessionState(sparkSession: SparkSession)
+  extends SessionState(sparkSession) {
   self =>
 
-  private lazy val sharedState: HiveSharedState = ctx.sharedState.asInstanceOf[HiveSharedState]
+  private lazy val sharedState: HiveSharedState = {
+    sparkSession.sharedState.asInstanceOf[HiveSharedState]
+  }
 
   /**
    * A Hive client used for execution.
@@ -72,8 +75,8 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
     new HiveSessionCatalog(
       sharedState.externalCatalog,
       metadataHive,
-      ctx,
-      ctx.sessionState.functionResourceLoader,
+      sparkSession,
+      functionResourceLoader,
       functionRegistry,
       conf,
       hiveconf)
@@ -91,7 +94,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
         catalog.PreInsertionCasts ::
         PreInsertCastAndRename ::
         DataSourceAnalysis ::
-        (if (conf.runSQLonFile) new ResolveDataSource(ctx) :: Nil else Nil)
+        (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)
 
       override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))
     }
@@ -101,9 +104,9 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
    * Planner that takes into account Hive-specific strategies.
    */
   override def planner: SparkPlanner = {
-    new SparkPlanner(ctx.sparkContext, conf, experimentalMethods.extraStrategies)
+    new SparkPlanner(sparkSession.sparkContext, conf, experimentalMethods.extraStrategies)
       with HiveStrategies {
-      override val context: SQLContext = ctx
+      override val sparkSession: SparkSession = self.sparkSession
       override val hiveconf: HiveConf = self.hiveconf
 
       override def strategies: Seq[Strategy] = {
@@ -225,7 +228,7 @@ private[hive] class HiveSessionState(ctx: SQLContext) extends SessionState(ctx)
 
   // TODO: why do we get this from SparkConf but not SQLConf?
   def hiveThriftServerSingleSession: Boolean = {
-    ctx.sparkContext.conf.getBoolean(
+    sparkSession.sparkContext.conf.getBoolean(
       "spark.sql.hive.thriftServer.singleSession", defaultValue = false)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 2bea32b144..7d1f87f390 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -33,7 +33,7 @@ private[hive] trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
   self: SparkPlanner =>
 
-  val context: SQLContext
+  val sparkSession: SparkSession
   val hiveconf: HiveConf
 
   object Scripts extends Strategy {
@@ -78,7 +78,7 @@ private[hive] trait HiveStrategies {
           projectList,
           otherPredicates,
           identity[Seq[Expression]],
-          HiveTableScanExec(_, relation, pruningPredicates)(context, hiveconf)) :: Nil
+          HiveTableScanExec(_, relation, pruningPredicates)(sparkSession, hiveconf)) :: Nil
       case _ => Nil
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index cd45706841..0520e75306 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.metastore.api.FieldSchema
 import org.apache.hadoop.hive.ql.metadata.{Partition, Table => HiveTable}
 import org.apache.hadoop.hive.ql.plan.TableDesc
 
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference, Expression}
@@ -42,7 +42,7 @@ private[hive] case class MetastoreRelation(
     alias: Option[String])
     (val catalogTable: CatalogTable,
      @transient private val client: HiveClient,
-     @transient private val sqlContext: SQLContext)
+     @transient private val sparkSession: SparkSession)
   extends LeafNode with MultiInstanceRelation with FileRelation with CatalogRelation {
 
   override def equals(other: Any): Boolean = other match {
@@ -58,7 +58,7 @@ private[hive] case class MetastoreRelation(
     Objects.hashCode(databaseName, tableName, alias, output)
   }
 
-  override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sqlContext :: Nil
+  override protected def otherCopyArgs: Seq[AnyRef] = catalogTable :: sparkSession :: Nil
 
   private def toHiveColumn(c: CatalogColumn): FieldSchema = {
     new FieldSchema(c.name, c.dataType, c.comment.orNull)
@@ -124,7 +124,7 @@ private[hive] case class MetastoreRelation(
         // if the size is still less than zero, we use default size
         Option(totalSize).map(_.toLong).filter(_ > 0)
           .getOrElse(Option(rawDataSize).map(_.toLong).filter(_ > 0)
-            .getOrElse(sqlContext.conf.defaultSizeInBytes)))
+            .getOrElse(sparkSession.sessionState.conf.defaultSizeInBytes)))
       }
     )
@@ -133,7 +133,7 @@ private[hive] case class MetastoreRelation(
   private lazy val allPartitions: Seq[CatalogTablePartition] = client.getAllPartitions(catalogTable)
 
   def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
-    val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
+    val rawPartitions = if (sparkSession.sessionState.conf.metastorePartitionPruning) {
       client.getPartitionsByFilter(catalogTable, predicates)
     } else {
       allPartitions
@@ -226,6 +226,6 @@ private[hive] case class MetastoreRelation(
   }
 
   override def newInstance(): MetastoreRelation = {
-    MetastoreRelation(databaseName, tableName, alias)(catalogTable, client, sqlContext)
+    MetastoreRelation(databaseName, tableName, alias)(catalogTable, client, sparkSession)
   }
 }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index e95069e830..af0317f7a1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -36,7 +36,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
@@ -61,7 +61,7 @@ private[hive] class HadoopTableReader(
     @transient private val attributes: Seq[Attribute],
     @transient private val relation: MetastoreRelation,
-    @transient private val sc: SQLContext,
+    @transient private val sparkSession: SparkSession,
     hiveconf: HiveConf)
   extends TableReader with Logging {
@@ -69,15 +69,15 @@ class HadoopTableReader(
   // https://hadoop.apache.org/docs/r1.0.4/mapred-default.html
   //
   // In order keep consistency with Hive, we will let it be 0 in local mode also.
-  private val _minSplitsPerRDD = if (sc.sparkContext.isLocal) {
+  private val _minSplitsPerRDD = if (sparkSession.sparkContext.isLocal) {
     0 // will splitted based on block by default.
   } else {
-    math.max(hiveconf.getInt("mapred.map.tasks", 1), sc.sparkContext.defaultMinPartitions)
+    math.max(hiveconf.getInt("mapred.map.tasks", 1), sparkSession.sparkContext.defaultMinPartitions)
   }
 
-  SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(sc.sparkContext.conf, hiveconf)
+  SparkHadoopUtil.get.appendS3AndSparkHadoopConfigurations(sparkSession.sparkContext.conf, hiveconf)
 
   private val _broadcastedHiveConf =
-    sc.sparkContext.broadcast(new SerializableConfiguration(hiveconf))
+    sparkSession.sparkContext.broadcast(new SerializableConfiguration(hiveconf))
 
   override def makeRDDForTable(hiveTable: HiveTable): RDD[InternalRow] =
     makeRDDForTable(
@@ -153,7 +153,7 @@ class HadoopTableReader(
     def verifyPartitionPath(
         partitionToDeserializer: Map[HivePartition, Class[_ <: Deserializer]]):
         Map[HivePartition, Class[_ <: Deserializer]] = {
-      if (!sc.conf.verifyPartitionPath) {
+      if (!sparkSession.sessionState.conf.verifyPartitionPath) {
         partitionToDeserializer
       } else {
         var existPathSet = collection.mutable.Set[String]()
@@ -246,7 +246,7 @@ class HadoopTableReader(
 
     // Even if we don't use any partitions, we still need an empty RDD
     if (hivePartitionRDDs.size == 0) {
-      new EmptyRDD[InternalRow](sc.sparkContext)
+      new EmptyRDD[InternalRow](sparkSession.sparkContext)
     } else {
       new UnionRDD(hivePartitionRDDs(0).context, hivePartitionRDDs)
     }
@@ -278,7 +278,7 @@ class HadoopTableReader(
     val initializeJobConfFunc = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _
 
     val rdd = new HadoopRDD(
-      sc.sparkContext,
+      sparkSession.sparkContext,
       _broadcastedHiveConf.asInstanceOf[Broadcast[SerializableConfiguration]],
       Some(initializeJobConfFunc),
       inputFormatClass,
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 9240f9c7d2..08d4b99d30 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.execution
 
-import org.apache.spark.sql.{AnalysisException, Row, SQLContext}
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable}
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.command.RunnableCommand
@@ -42,7 +42,7 @@ case class CreateTableAsSelect(
 
   override def children: Seq[LogicalPlan] = Seq(query)
 
-  override def run(sqlContext: SQLContext): Seq[Row] = {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
     lazy val metastoreRelation: MetastoreRelation = {
       import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
       import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
@@ -68,24 +68,24 @@ case class CreateTableAsSelect(
         withFormat
       }
 
-      sqlContext.sessionState.catalog.createTable(withSchema, ignoreIfExists = false)
+      sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = false)
 
       // Get the Metastore Relation
-      sqlContext.sessionState.catalog.lookupRelation(tableIdentifier) match {
+      sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
         case r: MetastoreRelation => r
       }
     }
     // TODO ideally, we should get the output data ready first and then
     // add the relation into catalog, just in case of failure occurs while data
    // processing.
-    if (sqlContext.sessionState.catalog.tableExists(tableIdentifier)) {
+    if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
       if (allowExisting) {
         // table already exists, will do nothing, to keep consistent with Hive
       } else {
         throw new AnalysisException(s"$tableIdentifier already exists.")
       }
     } else {
-      sqlContext.executePlan(InsertIntoTable(
+      sparkSession.executePlan(InsertIntoTable(
         metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
index 0f72091096..cc5bbf59db 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScanExec.scala
@@ -27,7 +27,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.Object
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils
 
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.execution._
@@ -48,8 +48,8 @@ case class HiveTableScanExec(
     requestedAttributes: Seq[Attribute],
     relation: MetastoreRelation,
     partitionPruningPred: Seq[Expression])(
-    @transient val context: SQLContext,
-    @transient val hiveconf: HiveConf)
+    @transient private val sparkSession: SparkSession,
+    @transient private val hiveconf: HiveConf)
   extends LeafExecNode {
 
   require(partitionPruningPred.isEmpty || relation.hiveQlTable.isPartitioned,
@@ -84,7 +84,7 @@ case class HiveTableScanExec(
   @transient
   private[this] val hadoopReader =
-    new HadoopTableReader(attributes, relation, context, hiveExtraConf)
+    new HadoopTableReader(attributes, relation, sparkSession, hiveExtraConf)
 
   private[this] def castFromString(value: String, dataType: DataType) = {
     Cast(Literal(value), dataType).eval(null)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
index 1095f5fd95..cb49fc910b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/orc/OrcRelation.scala
@@ -34,7 +34,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{HadoopRDD, RDD}
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
@@ -52,17 +52,17 @@ private[sql] class DefaultSource
   override def toString: String = "ORC"
 
   override def inferSchema(
-      sqlContext: SQLContext,
+      sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
     OrcFileOperator.readSchema(
       files.map(_.getPath.toUri.toString),
-      Some(new Configuration(sqlContext.sessionState.hadoopConf))
+      Some(new Configuration(sparkSession.sessionState.hadoopConf))
     )
   }
 
   override def prepareWrite(
-      sqlContext: SQLContext,
+      sparkSession: SparkSession,
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = {
@@ -109,15 +109,15 @@ private[sql] class DefaultSource
   }
 
   override def buildReader(
-      sqlContext: SQLContext,
+      sparkSession: SparkSession,
       dataSchema: StructType,
       partitionSchema: StructType,
       requiredSchema: StructType,
       filters: Seq[Filter],
       options: Map[String, String]): (PartitionedFile) => Iterator[InternalRow] = {
-    val orcConf = new Configuration(sqlContext.sessionState.hadoopConf)
+    val orcConf = new Configuration(sparkSession.sessionState.hadoopConf)
 
-    if (sqlContext.conf.orcFilterPushDown) {
+    if (sparkSession.sessionState.conf.orcFilterPushDown) {
       // Sets pushed predicates
       OrcFilters.createFilter(filters.toArray).foreach { f =>
         orcConf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
@@ -125,7 +125,8 @@ private[sql] class DefaultSource
       }
     }
 
-    val broadcastedConf = sqlContext.sparkContext.broadcast(new SerializableConfiguration(orcConf))
+    val broadcastedConf =
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(orcConf))
 
     (file: PartitionedFile) => {
       val conf = broadcastedConf.value.value
@@ -270,7 +271,7 @@ private[orc] class OrcOutputWriter(
 }
 
 private[orc] case class OrcTableScan(
-    @transient sqlContext: SQLContext,
+    @transient sparkSession: SparkSession,
     attributes: Seq[Attribute],
     filters: Array[Filter],
     @transient inputPaths: Seq[FileStatus])
@@ -278,11 +279,11 @@ private[orc] case class OrcTableScan(
   with HiveInspectors {
 
   def execute(): RDD[InternalRow] = {
-    val job = Job.getInstance(new Configuration(sqlContext.sessionState.hadoopConf))
+    val job = Job.getInstance(new Configuration(sparkSession.sessionState.hadoopConf))
     val conf = job.getConfiguration
 
     // Tries to push down filters if ORC filter push-down is enabled
-    if (sqlContext.conf.orcFilterPushDown) {
+    if (sparkSession.sessionState.conf.orcFilterPushDown) {
       OrcFilters.createFilter(filters).foreach { f =>
         conf.set(OrcTableScan.SARG_PUSHDOWN, f.toKryo)
         conf.setBoolean(ConfVars.HIVEOPTINDEXFILTER.varname, true)
@@ -294,14 +295,14 @@ private[orc] case class OrcTableScan(
     val orcFormat = new DefaultSource
     val dataSchema = orcFormat
-      .inferSchema(sqlContext, Map.empty, inputPaths)
+      .inferSchema(sparkSession, Map.empty, inputPaths)
       .getOrElse(sys.error("Failed to read schema from target ORC files."))
 
     // Sets requested columns
     OrcRelation.setRequiredColumns(conf, dataSchema, StructType.fromAttributes(attributes))
 
     if (inputPaths.isEmpty) {
       // the input path probably be pruned, return an empty RDD.
-      return sqlContext.sparkContext.emptyRDD[InternalRow]
+      return sparkSession.sparkContext.emptyRDD[InternalRow]
     }
 
     FileInputFormat.setInputPaths(job, inputPaths.map(_.getPath): _*)
@@ -309,7 +310,7 @@ private[orc] case class OrcTableScan(
     classOf[OrcInputFormat]
       .asInstanceOf[Class[_ <: MapRedInputFormat[NullWritable, Writable]]]
 
-    val rdd = sqlContext.sparkContext.hadoopRDD(
+    val rdd = sparkSession.sparkContext.hadoopRDD(
       conf.asInstanceOf[JobConf],
       inputFormatClass,
       classOf[NullWritable],
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index 04b2494043..f74e5cd6f5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -71,7 +71,9 @@ object TestHive
  * hive metastore seems to lead to weird non-deterministic failures. Therefore, the execution of
  * test cases that rely on TestHive must be serialized.
  */
-class TestHiveContext(@transient val sparkSession: TestHiveSparkSession, isRootContext: Boolean)
+class TestHiveContext(
+    @transient override val sparkSession: TestHiveSparkSession,
+    isRootContext: Boolean)
   extends SQLContext(sparkSession, isRootContext) {
 
   def this(sc: SparkContext) {
@@ -106,11 +108,11 @@ class TestHiveContext(@transient val sparkSession: TestHiveSparkSession, isRootC
 
 private[hive] class TestHiveSparkSession(
-    sc: SparkContext,
+    @transient private val sc: SparkContext,
     val warehousePath: File,
     scratchDirPath: File,
     metastoreTemporaryConf: Map[String, String],
-    existingSharedState: Option[TestHiveSharedState])
+    @transient private val existingSharedState: Option[TestHiveSharedState])
   extends SparkSession(sc) with Logging { self =>
 
   def this(sc: SparkContext) {
@@ -463,7 +465,7 @@ private[hive] class TestHiveSparkSession(
 private[hive] class TestHiveQueryExecution(
     sparkSession: TestHiveSparkSession,
     logicalPlan: LogicalPlan)
-  extends QueryExecution(new SQLContext(sparkSession), logicalPlan) with Logging {
+  extends QueryExecution(sparkSession, logicalPlan) with Logging {
 
   def this(sparkSession: TestHiveSparkSession, sql: String) {
     this(sparkSession, sparkSession.sessionState.sqlParser.parsePlan(sql))
@@ -525,7 +527,7 @@ private[hive] class TestHiveSharedState(
 
 private[hive] class TestHiveSessionState(sparkSession: TestHiveSparkSession)
-  extends HiveSessionState(new SQLContext(sparkSession)) {
+  extends HiveSessionState(sparkSession) {
 
   override lazy val conf: SQLConf = {
     new SQLConf {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/SQLBuilderTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/SQLBuilderTest.scala
index b121600dae..27c9e992de 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/SQLBuilderTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/catalyst/SQLBuilderTest.scala
@@ -64,7 +64,7 @@ abstract class SQLBuilderTest extends QueryTest with TestHiveSingleton {
         """.stripMargin)
     }
 
-    checkAnswer(sqlContext.sql(generatedSQL), Dataset.ofRows(sqlContext, plan))
+    checkAnswer(sqlContext.sql(generatedSQL), Dataset.ofRows(sqlContext.sparkSession, plan))
   }
 
   protected def checkSQL(df: DataFrame, expectedSQL: String): Unit = {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 5965cdc81c..7cd01c9104 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -701,7 +701,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       // Manually create a metastore data source table.
       CreateDataSourceTableUtils.createDataSourceTable(
-        sqlContext = sqlContext,
+        sparkSession = sqlContext.sparkSession,
         tableIdent = TableIdentifier("wide_schema"),
         userSpecifiedSchema = Some(schema),
         partitionColumns = Array.empty[String],
@@ -910,7 +910,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       val schema = StructType((1 to 5).map(i => StructField(s"c_$i", StringType)))
       CreateDataSourceTableUtils.createDataSourceTable(
-        sqlContext = sqlContext,
+        sparkSession = sqlContext.sparkSession,
         tableIdent = TableIdentifier("not_skip_hive_metadata"),
         userSpecifiedSchema = Some(schema),
         partitionColumns = Array.empty[String],
@@ -925,7 +925,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
         .forall(column => DataTypeParser.parse(column.dataType) == StringType))
 
       CreateDataSourceTableUtils.createDataSourceTable(
-        sqlContext = sqlContext,
+        sparkSession = sqlContext.sparkSession,
         tableIdent = TableIdentifier("skip_hive_metadata"),
         userSpecifiedSchema = Some(schema),
         partitionColumns = Array.empty[String],
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index bc87d3ef38..b16c9c133b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -975,7 +975,7 @@ class TungstenAggregationQueryWithControlledFallbackSuite extends AggregationQue
         // Create a new df to make sure its physical operator picks up
         // spark.sql.TungstenAggregate.testFallbackStartsAt.
         // todo: remove it?
-        val newActual = Dataset.ofRows(sqlContext, actual.logicalPlan)
+        val newActual = Dataset.ofRows(sqlContext.sparkSession, actual.logicalPlan)
 
         QueryTest.checkAnswer(newActual, expectedAnswer) match {
           case Some(errorMessage) =>
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
index 4a2d190353..5a8a7f0ab5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/CommitFailureTestSource.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.sources
 import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
 
 import org.apache.spark.TaskContext
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
 import org.apache.spark.sql.types.StructType
 
@@ -33,7 +33,7 @@ class CommitFailureTestSource extends SimpleTextSource {
    * by setting the output committer class in the conf of spark.sql.sources.outputCommitterClass.
    */
   override def prepareWrite(
-      sqlContext: SQLContext,
+      sparkSession: SparkSession,
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory =
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
index eced8ed57f..e4bd1f93c5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/SimpleTextRelation.scala
@@ -25,7 +25,7 @@ import org.apache.hadoop.io.{NullWritable, Text}
 import org.apache.hadoop.mapreduce.{Job, RecordWriter, TaskAttemptContext}
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat, TextOutputFormat}
 
-import org.apache.spark.sql.{sources, Row, SQLContext}
+import org.apache.spark.sql.{sources, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{expressions, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, GenericInternalRow, InterpretedPredicate, InterpretedProjection, JoinedRow, Literal}
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
@@ -37,14 +37,14 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
   override def shortName(): String = "test"
 
   override def inferSchema(
-      sqlContext: SQLContext,
+      sparkSession: SparkSession,
       options: Map[String, String],
       files: Seq[FileStatus]): Option[StructType] = {
     Some(DataType.fromJson(options("dataSchema")).asInstanceOf[StructType])
   }
 
   override def prepareWrite(
-      sqlContext: SQLContext,
+      sparkSession: SparkSession,
       job: Job,
       options: Map[String, String],
       dataSchema: StructType): OutputWriterFactory = new OutputWriterFactory {
@@ -58,7 +58,7 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
   }
 
   override def buildReader(
-      sqlContext: SQLContext,
+      sparkSession: SparkSession,
       dataSchema: StructType,
       partitionSchema: StructType,
       requiredSchema: StructType,
@@ -74,9 +74,9 @@ class SimpleTextSource extends FileFormat with DataSourceRegister {
       inputAttributes.find(_.name == field.name)
     }
 
-    val conf = new Configuration(sqlContext.sessionState.hadoopConf)
+    val conf = new Configuration(sparkSession.sessionState.hadoopConf)
     val broadcastedConf =
-      sqlContext.sparkContext.broadcast(new SerializableConfiguration(conf))
+      sparkSession.sparkContext.broadcast(new SerializableConfiguration(conf))
 
     (file: PartitionedFile) => {
       val predicate = {