diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2015-05-21 14:08:20 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-05-21 14:08:20 -0700 |
commit | 3d0cccc85850ca9c79f3e5ff7395bd04d212b063 (patch) | |
tree | 7fb84fd00e51202c23ea989f47bd30a28b786da0 /sql | |
parent | 30f3f556f7161a49baf145c0cbba8c088b512a6a (diff) | |
download | spark-3d0cccc85850ca9c79f3e5ff7395bd04d212b063.tar.gz spark-3d0cccc85850ca9c79f3e5ff7395bd04d212b063.tar.bz2 spark-3d0cccc85850ca9c79f3e5ff7395bd04d212b063.zip |
[SPARK-7478] [SQL] Added SQLContext.getOrCreate
Having a SQLContext singleton would make it easier for applications to use a lazily instantiated single shared instance of SQLContext when needed. It would avoid problems like
1. In REPL/notebook environment, rerunning the line {{val sqlContext = new SQLContext}} multiple times created different contexts while overriding the reference to previous context, leading to issues like registered temp tables going missing.
2. In Streaming, creating SQLContext directly leads to serialization/deserialization issues when attempting to recover from DStream checkpoints. See [SPARK-6770]. Also to get around this problem I had to suggest creating a singleton instance - https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
This can be solved by {{SQLContext.getOrCreate}} which get or creates a new singleton instance of SQLContext using either a given SparkContext or a given SparkConf.
rxin marmbrus
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #6006 from tdas/SPARK-7478 and squashes the following commits:
25f4da9 [Tathagata Das] Addressed comments.
79fe069 [Tathagata Das] Added comments.
c66ca76 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7478
48adb14 [Tathagata Das] Removed HiveContext.getOrCreate
bf8cf50 [Tathagata Das] Fix more bug
dec5594 [Tathagata Das] Fixed bug
b4e9721 [Tathagata Das] Remove unnecessary import
4ef513b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7478
d3ea8e4 [Tathagata Das] Added HiveContext
83bc950 [Tathagata Das] Updated tests
f82ae81 [Tathagata Das] Fixed test
bc72868 [Tathagata Das] Added SQLContext.getOrCreate
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 47 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala | 49 |
2 files changed, 95 insertions, 1 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 304e958192..1ea596dddf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql import java.beans.Introspector import java.util.Properties +import java.util.concurrent.atomic.AtomicReference import scala.collection.JavaConversions._ import scala.collection.immutable @@ -1270,9 +1271,53 @@ class SQLContext(@transient val sparkContext: SparkContext) //////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////// - // End of eeprecated methods + // End of deprecated methods //////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////// + + + // Register a succesfully instantiatd context to the singleton. This should be at the end of + // the class definition so that the singleton is updated only if there is no exception in the + // construction of the instance. + SQLContext.setLastInstantiatedContext(self) } +/** + * This SQLContext object contains utility functions to create a singleton SQLContext instance, + * or to get the last created SQLContext instance. + */ +object SQLContext { + + private val INSTANTIATION_LOCK = new Object() + + /** + * Reference to the last created SQLContext. + */ + @transient private val lastInstantiatedContext = new AtomicReference[SQLContext]() + + /** + * Get the singleton SQLContext if it exists or create a new one using the given SparkContext. + * This function can be used to create a singleton SQLContext object that can be shared across + * the JVM. + */ + def getOrCreate(sparkContext: SparkContext): SQLContext = { + INSTANTIATION_LOCK.synchronized { + if (lastInstantiatedContext.get() == null) { + new SQLContext(sparkContext) + } + } + lastInstantiatedContext.get() + } + + private[sql] def clearLastInstantiatedContext(): Unit = { + INSTANTIATION_LOCK.synchronized { + lastInstantiatedContext.set(null) + } + } + private[sql] def setLastInstantiatedContext(sqlContext: SQLContext): Unit = { + INSTANTIATION_LOCK.synchronized { + lastInstantiatedContext.set(sqlContext) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala new file mode 100644 index 0000000000..f186bc1c18 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala @@ -0,0 +1,49 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.spark.sql + +import org.scalatest.{BeforeAndAfterAll, FunSuite} + +import org.apache.spark.sql.test.TestSQLContext + +class SQLContextSuite extends FunSuite with BeforeAndAfterAll { + + private val testSqlContext = TestSQLContext + private val testSparkContext = TestSQLContext.sparkContext + + override def afterAll(): Unit = { + SQLContext.setLastInstantiatedContext(testSqlContext) + } + + test("getOrCreate instantiates SQLContext") { + SQLContext.clearLastInstantiatedContext() + val sqlContext = SQLContext.getOrCreate(testSparkContext) + assert(sqlContext != null, "SQLContext.getOrCreate returned null") + assert(SQLContext.getOrCreate(testSparkContext).eq(sqlContext), + "SQLContext created by SQLContext.getOrCreate not returned by SQLContext.getOrCreate") + } + + test("getOrCreate gets last explicitly instantiated SQLContext") { + SQLContext.clearLastInstantiatedContext() + val sqlContext = new SQLContext(testSparkContext) + assert(SQLContext.getOrCreate(testSparkContext) != null, + "SQLContext.getOrCreate after explicitly created SQLContext returned null") + assert(SQLContext.getOrCreate(testSparkContext).eq(sqlContext), + "SQLContext.getOrCreate after explicitly created SQLContext did not return the context") + } +} |