path: root/sql/core
author    Tathagata Das <tathagata.das1565@gmail.com>  2015-05-21 14:08:20 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>  2015-05-21 14:08:20 -0700
commit    3d0cccc85850ca9c79f3e5ff7395bd04d212b063 (patch)
tree      7fb84fd00e51202c23ea989f47bd30a28b786da0 /sql/core
parent    30f3f556f7161a49baf145c0cbba8c088b512a6a (diff)
[SPARK-7478] [SQL] Added SQLContext.getOrCreate
Having a SQLContext singleton would make it easier for applications to use a lazily instantiated, shared instance of SQLContext when needed. It would avoid problems like:

1. In REPL/notebook environments, rerunning the line {{val sqlContext = new SQLContext}} multiple times creates a different context each time while overwriting the reference to the previous one, leading to issues such as registered temp tables going missing.
2. In Streaming, creating a SQLContext directly leads to serialization/deserialization issues when attempting to recover from DStream checkpoints. See [SPARK-6770]. To get around this problem, I had to suggest creating a singleton instance - https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala

This can be solved by {{SQLContext.getOrCreate}}, which gets or creates a singleton instance of SQLContext using a given SparkContext.

rxin marmbrus

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #6006 from tdas/SPARK-7478 and squashes the following commits:

25f4da9 [Tathagata Das] Addressed comments.
79fe069 [Tathagata Das] Added comments.
c66ca76 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7478
48adb14 [Tathagata Das] Removed HiveContext.getOrCreate
bf8cf50 [Tathagata Das] Fix more bug
dec5594 [Tathagata Das] Fixed bug
b4e9721 [Tathagata Das] Remove unnecessary import
4ef513b [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7478
d3ea8e4 [Tathagata Das] Added HiveContext
83bc950 [Tathagata Das] Updated tests
f82ae81 [Tathagata Das] Fixed test
bc72868 [Tathagata Das] Added SQLContext.getOrCreate
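To make the intended usage concrete, here is a minimal sketch (not part of this patch) of the streaming recovery pattern the message refers to, assuming Spark 1.4-era APIs; the names Record and process are hypothetical placeholders:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

case class Record(word: String)

// Body of a hypothetical DStream.foreachRDD handler. Calling getOrCreate
// here, instead of capturing a SQLContext in the closure, avoids the
// checkpoint-recovery serialization issues described above (SPARK-6770).
def process(rdd: RDD[String]): Unit = {
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._
  val df = rdd.map(Record(_)).toDF()
  df.registerTempTable("words")
}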
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala       47
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala  49
2 files changed, 95 insertions, 1 deletion
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 304e958192..1ea596dddf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql
import java.beans.Introspector
import java.util.Properties
+import java.util.concurrent.atomic.AtomicReference
import scala.collection.JavaConversions._
import scala.collection.immutable
@@ -1270,9 +1271,53 @@ class SQLContext(@transient val sparkContext: SparkContext)
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
- // End of eeprecated methods
+ // End of deprecated methods
////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////
+
+
+ // Register a successfully instantiated context to the singleton. This should be at the end of
+ // the class definition so that the singleton is updated only if there is no exception in the
+ // construction of the instance.
+ SQLContext.setLastInstantiatedContext(self)
}
+/**
+ * This SQLContext object contains utility functions to create a singleton SQLContext instance,
+ * or to get the last created SQLContext instance.
+ */
+object SQLContext {
+
+ private val INSTANTIATION_LOCK = new Object()
+
+ /**
+ * Reference to the last created SQLContext.
+ */
+ @transient private val lastInstantiatedContext = new AtomicReference[SQLContext]()
+
+ /**
+ * Get the singleton SQLContext if it exists or create a new one using the given SparkContext.
+ * This function can be used to create a singleton SQLContext object that can be shared across
+ * the JVM.
+ */
+ def getOrCreate(sparkContext: SparkContext): SQLContext = {
+ INSTANTIATION_LOCK.synchronized {
+ if (lastInstantiatedContext.get() == null) {
+ new SQLContext(sparkContext)
+ }
+ }
+ lastInstantiatedContext.get()
+ }
+
+ private[sql] def clearLastInstantiatedContext(): Unit = {
+ INSTANTIATION_LOCK.synchronized {
+ lastInstantiatedContext.set(null)
+ }
+ }
+
+ private[sql] def setLastInstantiatedContext(sqlContext: SQLContext): Unit = {
+ INSTANTIATION_LOCK.synchronized {
+ lastInstantiatedContext.set(sqlContext)
+ }
+ }
+}
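A note on the design: because the class body of SQLContext ends with SQLContext.setLastInstantiatedContext(self), a directly constructed context also becomes the singleton, which is exactly what the second test below verifies. A minimal sketch of the resulting behavior, assuming sc is an already-running SparkContext:

val ctx = new SQLContext(sc)               // constructor registers ctx as the singleton
assert(SQLContext.getOrCreate(sc) eq ctx)  // subsequent getOrCreate returns the same instance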
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
new file mode 100644
index 0000000000..f186bc1c18
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLContextSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.scalatest.{BeforeAndAfterAll, FunSuite}
+
+import org.apache.spark.sql.test.TestSQLContext
+
+class SQLContextSuite extends FunSuite with BeforeAndAfterAll {
+
+ private val testSqlContext = TestSQLContext
+ private val testSparkContext = TestSQLContext.sparkContext
+
+ override def afterAll(): Unit = {
+ SQLContext.setLastInstantiatedContext(testSqlContext)
+ }
+
+ test("getOrCreate instantiates SQLContext") {
+ SQLContext.clearLastInstantiatedContext()
+ val sqlContext = SQLContext.getOrCreate(testSparkContext)
+ assert(sqlContext != null, "SQLContext.getOrCreate returned null")
+ assert(SQLContext.getOrCreate(testSparkContext).eq(sqlContext),
+ "SQLContext created by SQLContext.getOrCreate not returned by SQLContext.getOrCreate")
+ }
+
+ test("getOrCreate gets last explicitly instantiated SQLContext") {
+ SQLContext.clearLastInstantiatedContext()
+ val sqlContext = new SQLContext(testSparkContext)
+ assert(SQLContext.getOrCreate(testSparkContext) != null,
+ "SQLContext.getOrCreate after explicitly created SQLContext returned null")
+ assert(SQLContext.getOrCreate(testSparkContext).eq(sqlContext),
+ "SQLContext.getOrCreate after explicitly created SQLContext did not return the context")
+ }
+}