diff options
author | Davies Liu <davies@databricks.com> | 2015-10-08 17:34:24 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2015-10-08 17:34:24 -0700 |
commit | 3390b400d04e40f767d8a51f1078fcccb4e64abd (patch) | |
tree | d48ed36a14abf0b15467c9ae9c7c04933fdd3a19 /sql/catalyst | |
parent | 84ea287178247c163226e835490c9c70b17d8d3b (diff) | |
download | spark-3390b400d04e40f767d8a51f1078fcccb4e64abd.tar.gz spark-3390b400d04e40f767d8a51f1078fcccb4e64abd.tar.bz2 spark-3390b400d04e40f767d8a51f1078fcccb4e64abd.zip |
[SPARK-10810] [SPARK-10902] [SQL] Improve session management in SQL
This PR improves session management by replacing the thread-local based approach with a one-SQLContext-per-session approach, and introduces separate temporary tables and UDFs/UDAFs for each session.
A new session of SQLContext could be created by:
1) create a new SQLContext
2) call newSession() on existing SQLContext
For HiveContext, in order to reduce the cost for each session, the classloader and Hive client are shared across multiple sessions (created by newSession).
CacheManager is also shared by multiple sessions, so caching a table multiple times in different sessions will not cause multiple copies of the in-memory cache.
Added jars are still shared by all the sessions, because SparkContext does not support sessions.
cc marmbrus yhuai rxin
Author: Davies Liu <davies@databricks.com>
Closes #8909 from davies/sessions.
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala | 28 |
1 files changed, 21 insertions, 7 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index e6122d92b7..ba77b70a37 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -51,23 +51,37 @@ class SimpleFunctionRegistry extends FunctionRegistry { private val functionBuilders = StringKeyHashMap[(ExpressionInfo, FunctionBuilder)](caseSensitive = false) - override def registerFunction(name: String, info: ExpressionInfo, builder: FunctionBuilder) - : Unit = { + override def registerFunction( + name: String, + info: ExpressionInfo, + builder: FunctionBuilder): Unit = synchronized { functionBuilders.put(name, (info, builder)) } override def lookupFunction(name: String, children: Seq[Expression]): Expression = { - val func = functionBuilders.get(name).map(_._2).getOrElse { - throw new AnalysisException(s"undefined function $name") + val func = synchronized { + functionBuilders.get(name).map(_._2).getOrElse { + throw new AnalysisException(s"undefined function $name") + } } func(children) } - override def listFunction(): Seq[String] = functionBuilders.iterator.map(_._1).toList.sorted + override def listFunction(): Seq[String] = synchronized { + functionBuilders.iterator.map(_._1).toList.sorted + } - override def lookupFunction(name: String): Option[ExpressionInfo] = { + override def lookupFunction(name: String): Option[ExpressionInfo] = synchronized { functionBuilders.get(name).map(_._1) } + + def copy(): SimpleFunctionRegistry = synchronized { + val registry = new SimpleFunctionRegistry + functionBuilders.iterator.foreach { case (name, (info, builder)) => + registry.registerFunction(name, info, builder) + } + registry + } } /** @@ -257,7 +271,7 @@ object FunctionRegistry { expression[InputFileName]("input_file_name") ) - val 
builtin: FunctionRegistry = { + val builtin: SimpleFunctionRegistry = { val fr = new SimpleFunctionRegistry expressions.foreach { case (name, (info, builder)) => fr.registerFunction(name, info, builder) } fr |