| author | Andrew Or <andrew@databricks.com> | 2016-04-20 12:58:48 -0700 |
|---|---|---|
| committer | Reynold Xin <rxin@databricks.com> | 2016-04-20 12:58:48 -0700 |
| commit | 8fc267ab3322e46db81e725a5cb1adb5a71b2b4d (patch) | |
| tree | 5332cbe84256366dbe8fb2c7e10b86c3a41d2f32 /sql/core | |
| parent | cb8ea9e1f34b9af287b3d10e47f24de4307c63ba (diff) | |
[SPARK-14720][SPARK-13643] Move Hive-specific methods into HiveSessionState and Create a SparkSession class
## What changes were proposed in this pull request?
This PR has two main changes.
1. Move Hive-specific methods from `HiveContext` to `HiveSessionState`, which helps pave the way for removing `HiveContext`.
2. Create a `SparkSession` class, which will later become the entry point for Spark SQL users (a usage sketch follows this list).
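
A minimal sketch of how the two pieces fit together after this patch, based on the constructors and `newSession()` plumbing in the diff below (the `SparkContext` value is a placeholder, not part of this change):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

val sc: SparkContext = ???           // placeholder: an already-running SparkContext

// A SQLContext built from a SparkContext now wraps a SparkSession internally
// (the constructor delegates via `this(new SparkSession(sc))`).
val sqlContext = new SQLContext(sc)

// newSession() now delegates to SparkSession.newSession(): the child shares
// SharedState (external catalog, cached data) with its parent, but gets its
// own SessionState (SQLConf, temp tables, registered functions).
val child = sqlContext.newSession()
```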
## How was this patch tested?
Existing tests
This PR also attempts to fix the test failures of https://github.com/apache/spark/pull/12485.
Author: Andrew Or <andrew@databricks.com>
Author: Yin Huai <yhuai@databricks.com>
Closes #12522 from yhuai/spark-session.
Diffstat (limited to 'sql/core')
3 files changed, 179 insertions, 27 deletions
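
The new `SparkSession` class in the diff below instantiates its `SharedState` and `SessionState` via reflection, so that `sql/core` can load the Hive variants (`HiveSharedState`, `HiveSessionState`) without a compile-time dependency when `CATALOG_IMPLEMENTATION` is set to `"hive"`. A standalone sketch of that reflection pattern, with `Class.forName` standing in for Spark's internal `Utils.classForName` to keep it self-contained:

```scala
import scala.reflect.ClassTag
import scala.util.control.NonFatal

// Instantiate `className` via its single-argument constructor, where the
// parameter type is the runtime class of Arg (mirrors SparkSession.reflect).
def reflect[T, Arg <: AnyRef](className: String, ctorArg: Arg)
    (implicit ctorArgTag: ClassTag[Arg]): T = {
  try {
    val clazz = Class.forName(className)
    val ctor = clazz.getDeclaredConstructor(ctorArgTag.runtimeClass)
    ctor.newInstance(ctorArg).asInstanceOf[T]
  } catch {
    case NonFatal(e) =>
      throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
  }
}

// Toy usage with a JDK class that has a single-String constructor:
val tmp = reflect[java.io.File, String]("java.io.File", "/tmp/example")
```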
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 781d699819..f3f84144ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -63,14 +63,18 @@ import org.apache.spark.util.Utils
  * @since 1.0.0
  */
 class SQLContext private[sql](
-    @transient protected[sql] val sharedState: SharedState,
+    @transient private val sparkSession: SparkSession,
     val isRootContext: Boolean)
   extends Logging with Serializable {
 
   self =>
 
+  private[sql] def this(sparkSession: SparkSession) = {
+    this(sparkSession, true)
+  }
+
   def this(sc: SparkContext) = {
-    this(new SharedState(sc), true)
+    this(new SparkSession(sc))
   }
 
   def this(sparkContext: JavaSparkContext) = this(sparkContext.sc)
@@ -97,12 +101,15 @@ class SQLContext private[sql](
     }
   }
 
-  def sparkContext: SparkContext = sharedState.sparkContext
-
+  protected[sql] def sessionState: SessionState = sparkSession.sessionState
+  protected[sql] def sharedState: SharedState = sparkSession.sharedState
+  protected[sql] def conf: SQLConf = sessionState.conf
   protected[sql] def cacheManager: CacheManager = sharedState.cacheManager
   protected[sql] def listener: SQLListener = sharedState.listener
   protected[sql] def externalCatalog: ExternalCatalog = sharedState.externalCatalog
 
+  def sparkContext: SparkContext = sharedState.sparkContext
+
   /**
    * Returns a [[SQLContext]] as new session, with separated SQL configurations, temporary
    * tables, registered functions, but sharing the same [[SparkContext]], cached data and
@@ -110,14 +117,9 @@ class SQLContext private[sql](
    *
    * @since 1.6.0
    */
-  def newSession(): SQLContext = new SQLContext(sharedState, isRootContext = false)
-
-  /**
-   * Per-session state, e.g. configuration, functions, temporary tables etc.
-   */
-  @transient
-  protected[sql] lazy val sessionState: SessionState = new SessionState(self)
-  protected[spark] def conf: SQLConf = sessionState.conf
+  def newSession(): SQLContext = {
+    new SQLContext(sparkSession.newSession(), isRootContext = false)
+  }
 
   /**
    * An interface to register custom [[org.apache.spark.sql.util.QueryExecutionListener]]s
@@ -132,10 +134,14 @@ class SQLContext private[sql](
    * @group config
    * @since 1.0.0
    */
-  def setConf(props: Properties): Unit = conf.setConf(props)
+  def setConf(props: Properties): Unit = sessionState.setConf(props)
 
-  /** Set the given Spark SQL configuration property. */
-  private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = conf.setConf(entry, value)
+  /**
+   * Set the given Spark SQL configuration property.
+   */
+  private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
+    sessionState.setConf(entry, value)
+  }
 
   /**
    * Set the given Spark SQL configuration property.
@@ -143,7 +149,7 @@ class SQLContext private[sql](
    * @group config
    * @since 1.0.0
    */
-  def setConf(key: String, value: String): Unit = conf.setConfString(key, value)
+  def setConf(key: String, value: String): Unit = sessionState.setConf(key, value)
 
   /**
    * Return the value of Spark SQL configuration property for the given key.
@@ -186,23 +192,19 @@ class SQLContext private[sql](
    */
   def getAllConfs: immutable.Map[String, String] = conf.getAllConfs
 
-  // Extract `spark.sql.*` entries and put it in our SQLConf.
-  // Subclasses may additionally set these entries in other confs.
-  SQLContext.getSQLProperties(sparkContext.getConf).asScala.foreach { case (k, v) =>
-    setConf(k, v)
-  }
-
   protected[sql] def parseSql(sql: String): LogicalPlan = sessionState.sqlParser.parsePlan(sql)
 
   protected[sql] def executeSql(sql: String): QueryExecution = executePlan(parseSql(sql))
 
-  protected[sql] def executePlan(plan: LogicalPlan) = new QueryExecution(this, plan)
+  protected[sql] def executePlan(plan: LogicalPlan): QueryExecution = {
+    sessionState.executePlan(plan)
+  }
 
   /**
    * Add a jar to SQLContext
    */
   protected[sql] def addJar(path: String): Unit = {
-    sparkContext.addJar(path)
+    sessionState.addJar(path)
   }
 
   /** A [[FunctionResourceLoader]] that can be used in SessionCatalog. */
@@ -768,7 +770,7 @@ class SQLContext private[sql](
    * as Spark can parse all supported Hive DDLs itself.
    */
   private[sql] def runNativeSql(sqlText: String): Seq[Row] = {
-    throw new UnsupportedOperationException
+    sessionState.runNativeSql(sqlText).map { r => Row(r) }
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
new file mode 100644
index 0000000000..17ba299825
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import scala.reflect.ClassTag
+import scala.util.control.NonFatal
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.internal.config.CATALOG_IMPLEMENTATION
+import org.apache.spark.sql.internal.{SessionState, SharedState}
+import org.apache.spark.util.Utils
+
+
+/**
+ * The entry point to Spark execution.
+ */
+class SparkSession private(
+    sparkContext: SparkContext,
+    existingSharedState: Option[SharedState]) { self =>
+
+  def this(sc: SparkContext) {
+    this(sc, None)
+  }
+
+  /**
+   * Start a new session where configurations, temp tables, temp functions etc. are isolated.
+   */
+  def newSession(): SparkSession = {
+    // Note: materialize the shared state here to ensure the parent and child sessions are
+    // initialized with the same shared state.
+    new SparkSession(sparkContext, Some(sharedState))
+  }
+
+  @transient
+  protected[sql] lazy val sharedState: SharedState = {
+    existingSharedState.getOrElse(
+      SparkSession.reflect[SharedState, SparkContext](
+        SparkSession.sharedStateClassName(sparkContext.conf),
+        sparkContext))
+  }
+
+  @transient
+  protected[sql] lazy val sessionState: SessionState = {
+    SparkSession.reflect[SessionState, SQLContext](
+      SparkSession.sessionStateClassName(sparkContext.conf),
+      new SQLContext(self, isRootContext = false))
+  }
+
+}
+
+
+private object SparkSession {
+
+  private def sharedStateClassName(conf: SparkConf): String = {
+    conf.get(CATALOG_IMPLEMENTATION) match {
+      case "hive" => "org.apache.spark.sql.hive.HiveSharedState"
+      case "in-memory" => classOf[SharedState].getCanonicalName
+    }
+  }
+
+  private def sessionStateClassName(conf: SparkConf): String = {
+    conf.get(CATALOG_IMPLEMENTATION) match {
+      case "hive" => "org.apache.spark.sql.hive.HiveSessionState"
+      case "in-memory" => classOf[SessionState].getCanonicalName
+    }
+  }
+
+  /**
+   * Helper method to create an instance of [[T]] using a single-arg constructor that
+   * accepts an [[Arg]].
+   */
+  private def reflect[T, Arg <: AnyRef](
+      className: String,
+      ctorArg: Arg)(implicit ctorArgTag: ClassTag[Arg]): T = {
+    try {
+      val clazz = Utils.classForName(className)
+      val ctor = clazz.getDeclaredConstructor(ctorArgTag.runtimeClass)
+      ctor.newInstance(ctorArg).asInstanceOf[T]
+    } catch {
+      case NonFatal(e) =>
+        throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
+    }
+  }
+
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index d404a7c0ae..42915d5887 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -17,15 +17,22 @@
 
 package org.apache.spark.sql.internal
 
+import java.util.Properties
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.sql.{ContinuousQueryManager, ExperimentalMethods, SQLContext, UDFRegistration}
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, PreInsertCastAndRename, ResolveDataSource}
 import org.apache.spark.sql.util.ExecutionListenerManager
 
+
 /**
  * A class that holds all session-specific state in a given [[SQLContext]].
  */
@@ -37,7 +44,10 @@ private[sql] class SessionState(ctx: SQLContext) {
 
   /**
    * SQL-specific key-value configurations.
    */
-  lazy val conf = new SQLConf
+  lazy val conf: SQLConf = new SQLConf
+
+  // Automatically extract `spark.sql.*` entries and put it in our SQLConf
+  setConf(SQLContext.getSQLProperties(ctx.sparkContext.getConf))
 
   lazy val experimentalMethods = new ExperimentalMethods
 
@@ -101,5 +111,45 @@ private[sql] class SessionState(ctx: SQLContext) {
    * Interface to start and stop [[org.apache.spark.sql.ContinuousQuery]]s.
    */
   lazy val continuousQueryManager: ContinuousQueryManager = new ContinuousQueryManager(ctx)
 
-}
+
+  // ------------------------------------------------------
+  //  Helper methods, partially leftover from pre-2.0 days
+  // ------------------------------------------------------
+
+  def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(ctx, plan)
+
+  def refreshTable(tableName: String): Unit = {
+    catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
+  }
+
+  def invalidateTable(tableName: String): Unit = {
+    catalog.invalidateTable(sqlParser.parseTableIdentifier(tableName))
+  }
+
+  final def setConf(properties: Properties): Unit = {
+    properties.asScala.foreach { case (k, v) => setConf(k, v) }
+  }
+
+  final def setConf[T](entry: ConfigEntry[T], value: T): Unit = {
+    conf.setConf(entry, value)
+    setConf(entry.key, entry.stringConverter(value))
+  }
+
+  def setConf(key: String, value: String): Unit = {
+    conf.setConfString(key, value)
+  }
+
+  def addJar(path: String): Unit = {
+    ctx.sparkContext.addJar(path)
+  }
+
+  def analyze(tableName: String): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  def runNativeSql(sql: String): Seq[String] = {
+    throw new UnsupportedOperationException
+  }
+
+}
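
To make the isolation semantics concrete, a hypothetical usage sketch (again with a placeholder `SparkContext`; `setConf`, `getConf`, and `newSession` are the `SQLContext` methods touched in this patch):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

val sc: SparkContext = ???          // placeholder: an existing SparkContext
val root = new SQLContext(sc)

// Session-local configuration lives in SessionState.conf, so sibling
// sessions can diverge without affecting each other...
root.setConf("spark.sql.shuffle.partitions", "4")
val child = root.newSession()
child.setConf("spark.sql.shuffle.partitions", "16")
assert(root.getConf("spark.sql.shuffle.partitions") == "4")

// ...while SharedState (cache manager, SQLListener, external catalog) is
// materialized once in the root SparkSession and shared with every child.
```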