diff options
author | Ilya Ganelin <ilya.ganelin@capitalone.com> | 2015-04-17 18:28:42 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-04-17 18:28:42 -0700 |
commit | c5ed510135aee3a1a0402057b3b5229892aa6f3a (patch) | |
tree | b1daa6fe6fb42c924059c9442e10b8c4595d148b /core | |
parent | a452c59210cf2c8ff8601cdb11401eea6dc14973 (diff) | |
download | spark-c5ed510135aee3a1a0402057b3b5229892aa6f3a.tar.gz spark-c5ed510135aee3a1a0402057b3b5229892aa6f3a.tar.bz2 spark-c5ed510135aee3a1a0402057b3b5229892aa6f3a.zip |
[SPARK-6703][Core] Provide a way to discover existing SparkContext's
I've added a static getOrCreate method to the static SparkContext object that allows one to either retrieve a previously created SparkContext or to instantiate a new one with the provided config. The method accepts an optional SparkConf to make usage intuitive.
Still working on a test for this, basically want to create a new context from scratch, then ensure that subsequent calls don't overwrite that.
Author: Ilya Ganelin <ilya.ganelin@capitalone.com>
Closes #5501 from ilganeli/SPARK-6703 and squashes the following commits:
db9a963 [Ilya Ganelin] Closing second spark context
1dc0444 [Ilya Ganelin] Added ref equality check
8c884fa [Ilya Ganelin] Made getOrCreate synchronized
cb0c6b7 [Ilya Ganelin] Doc updates and code cleanup
270cfe3 [Ilya Ganelin] [SPARK-6703] Documentation fixes
15e8dea [Ilya Ganelin] Updated comments and added MiMa Exclude
0e1567c [Ilya Ganelin] Got rid of unecessary option for AtomicReference
dfec4da [Ilya Ganelin] Changed activeContext to AtomicReference
733ec9f [Ilya Ganelin] Fixed some bugs in test code
8be2f83 [Ilya Ganelin] Replaced match with if
e92caf7 [Ilya Ganelin] [SPARK-6703] Added test to ensure that getOrCreate both allows creation, retrieval, and a second context if desired
a99032f [Ilya Ganelin] Spacing fix
d7a06b8 [Ilya Ganelin] Updated SparkConf class to add getOrCreate method. Started test suite implementation
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkContext.scala | 49 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/SparkContextSuite.scala | 20 |
2 files changed, 62 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index e106c5c4be..86269eac52 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -23,7 +23,7 @@ import java.io._ import java.lang.reflect.Constructor import java.net.URI import java.util.{Arrays, Properties, UUID} -import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger} +import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean, AtomicInteger} import java.util.UUID.randomUUID import scala.collection.{Map, Set} @@ -1887,11 +1887,12 @@ object SparkContext extends Logging { private val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object() /** - * The active, fully-constructed SparkContext. If no SparkContext is active, then this is `None`. + * The active, fully-constructed SparkContext. If no SparkContext is active, then this is `null`. * - * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK + * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK. */ - private var activeContext: Option[SparkContext] = None + private val activeContext: AtomicReference[SparkContext] = + new AtomicReference[SparkContext](null) /** * Points to a partially-constructed SparkContext if some thread is in the SparkContext @@ -1926,7 +1927,8 @@ object SparkContext extends Logging { logWarning(warnMsg) } - activeContext.foreach { ctx => + if (activeContext.get() != null) { + val ctx = activeContext.get() val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." + " To ignore this error, set spark.driver.allowMultipleContexts = true. " + s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}" @@ -1942,6 +1944,39 @@ object SparkContext extends Logging { } /** + * This function may be used to get or instantiate a SparkContext and register it as a + * singleton object. Because we can only have one active SparkContext per JVM, + * this is useful when applications may wish to share a SparkContext. + * + * Note: This function cannot be used to create multiple SparkContext instances + * even if multiple contexts are allowed. + */ + def getOrCreate(config: SparkConf): SparkContext = { + // Synchronize to ensure that multiple create requests don't trigger an exception + // from assertNoOtherContextIsRunning within setActiveContext + SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { + if (activeContext.get() == null) { + setActiveContext(new SparkContext(config), allowMultipleContexts = false) + } + activeContext.get() + } + } + + /** + * This function may be used to get or instantiate a SparkContext and register it as a + * singleton object. Because we can only have one active SparkContext per JVM, + * this is useful when applications may wish to share a SparkContext. + * + * This method allows not passing a SparkConf (useful if just retrieving). + * + * Note: This function cannot be used to create multiple SparkContext instances + * even if multiple contexts are allowed. + */ + def getOrCreate(): SparkContext = { + getOrCreate(new SparkConf()) + } + + /** * Called at the beginning of the SparkContext constructor to ensure that no SparkContext is * running. Throws an exception if a running context is detected and logs a warning if another * thread is constructing a SparkContext. This warning is necessary because the current locking @@ -1967,7 +2002,7 @@ object SparkContext extends Logging { SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { assertNoOtherContextIsRunning(sc, allowMultipleContexts) contextBeingConstructed = None - activeContext = Some(sc) + activeContext.set(sc) } } @@ -1978,7 +2013,7 @@ object SparkContext extends Logging { */ private[spark] def clearActiveContext(): Unit = { SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized { - activeContext = None + activeContext.set(null) } } diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 94be1c6d63..728558a424 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -67,6 +67,26 @@ class SparkContextSuite extends FunSuite with LocalSparkContext { } } + test("Test getOrCreate") { + var sc2: SparkContext = null + SparkContext.clearActiveContext() + val conf = new SparkConf().setAppName("test").setMaster("local") + + sc = SparkContext.getOrCreate(conf) + + assert(sc.getConf.get("spark.app.name").equals("test")) + sc2 = SparkContext.getOrCreate(new SparkConf().setAppName("test2").setMaster("local")) + assert(sc2.getConf.get("spark.app.name").equals("test")) + assert(sc === sc2) + assert(sc eq sc2) + + // Try creating second context to confirm that it's still possible, if desired + sc2 = new SparkContext(new SparkConf().setAppName("test3").setMaster("local") + .set("spark.driver.allowMultipleContexts", "true")) + + sc2.stop() + } + test("BytesWritable implicit conversion is correct") { // Regression test for SPARK-3121 val bytesWritable = new BytesWritable() |