aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIlya Ganelin <ilya.ganelin@capitalone.com>2015-04-17 18:28:42 -0700
committerAndrew Or <andrew@databricks.com>2015-04-17 18:28:42 -0700
commitc5ed510135aee3a1a0402057b3b5229892aa6f3a (patch)
treeb1daa6fe6fb42c924059c9442e10b8c4595d148b
parenta452c59210cf2c8ff8601cdb11401eea6dc14973 (diff)
downloadspark-c5ed510135aee3a1a0402057b3b5229892aa6f3a.tar.gz
spark-c5ed510135aee3a1a0402057b3b5229892aa6f3a.tar.bz2
spark-c5ed510135aee3a1a0402057b3b5229892aa6f3a.zip
[SPARK-6703][Core] Provide a way to discover existing SparkContext's
I've added a static getOrCreate method to the static SparkContext object that allows one to either retrieve a previously created SparkContext or to instantiate a new one with the provided config. The method accepts an optional SparkConf to make usage intuitive. Still working on a test for this, basically want to create a new context from scratch, then ensure that subsequent calls don't overwrite that. Author: Ilya Ganelin <ilya.ganelin@capitalone.com> Closes #5501 from ilganeli/SPARK-6703 and squashes the following commits: db9a963 [Ilya Ganelin] Closing second spark context 1dc0444 [Ilya Ganelin] Added ref equality check 8c884fa [Ilya Ganelin] Made getOrCreate synchronized cb0c6b7 [Ilya Ganelin] Doc updates and code cleanup 270cfe3 [Ilya Ganelin] [SPARK-6703] Documentation fixes 15e8dea [Ilya Ganelin] Updated comments and added MiMa Exclude 0e1567c [Ilya Ganelin] Got rid of unnecessary option for AtomicReference dfec4da [Ilya Ganelin] Changed activeContext to AtomicReference 733ec9f [Ilya Ganelin] Fixed some bugs in test code 8be2f83 [Ilya Ganelin] Replaced match with if e92caf7 [Ilya Ganelin] [SPARK-6703] Added test to ensure that getOrCreate both allows creation, retrieval, and a second context if desired a99032f [Ilya Ganelin] Spacing fix d7a06b8 [Ilya Ganelin] Updated SparkConf class to add getOrCreate method. Started test suite implementation
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala49
-rw-r--r--core/src/test/scala/org/apache/spark/SparkContextSuite.scala20
-rw-r--r--project/MimaExcludes.scala4
3 files changed, 66 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e106c5c4be..86269eac52 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -23,7 +23,7 @@ import java.io._
import java.lang.reflect.Constructor
import java.net.URI
import java.util.{Arrays, Properties, UUID}
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean, AtomicInteger}
import java.util.UUID.randomUUID
import scala.collection.{Map, Set}
@@ -1887,11 +1887,12 @@ object SparkContext extends Logging {
private val SPARK_CONTEXT_CONSTRUCTOR_LOCK = new Object()
/**
- * The active, fully-constructed SparkContext. If no SparkContext is active, then this is `None`.
+ * The active, fully-constructed SparkContext. If no SparkContext is active, then this is `null`.
*
- * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK
+ * Access to this field is guarded by SPARK_CONTEXT_CONSTRUCTOR_LOCK.
*/
- private var activeContext: Option[SparkContext] = None
+ private val activeContext: AtomicReference[SparkContext] =
+ new AtomicReference[SparkContext](null)
/**
* Points to a partially-constructed SparkContext if some thread is in the SparkContext
@@ -1926,7 +1927,8 @@ object SparkContext extends Logging {
logWarning(warnMsg)
}
- activeContext.foreach { ctx =>
+ if (activeContext.get() != null) {
+ val ctx = activeContext.get()
val errMsg = "Only one SparkContext may be running in this JVM (see SPARK-2243)." +
" To ignore this error, set spark.driver.allowMultipleContexts = true. " +
s"The currently running SparkContext was created at:\n${ctx.creationSite.longForm}"
@@ -1942,6 +1944,39 @@ object SparkContext extends Logging {
}
/**
+ * This function may be used to get or instantiate a SparkContext and register it as a
+ * singleton object. Because we can only have one active SparkContext per JVM,
+ * this is useful when applications may wish to share a SparkContext.
+ *
+ * Note: This function cannot be used to create multiple SparkContext instances
+ * even if multiple contexts are allowed.
+ */
+ def getOrCreate(config: SparkConf): SparkContext = {
+ // Synchronize to ensure that multiple create requests don't trigger an exception
+ // from assertNoOtherContextIsRunning within setActiveContext
+ SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
+ if (activeContext.get() == null) {
+ setActiveContext(new SparkContext(config), allowMultipleContexts = false)
+ }
+ activeContext.get()
+ }
+ }
+
+ /**
+ * This function may be used to get or instantiate a SparkContext and register it as a
+ * singleton object. Because we can only have one active SparkContext per JVM,
+ * this is useful when applications may wish to share a SparkContext.
+ *
+ * This method allows not passing a SparkConf (useful if just retrieving).
+ *
+ * Note: This function cannot be used to create multiple SparkContext instances
+ * even if multiple contexts are allowed.
+ */
+ def getOrCreate(): SparkContext = {
+ getOrCreate(new SparkConf())
+ }
+
+ /**
* Called at the beginning of the SparkContext constructor to ensure that no SparkContext is
* running. Throws an exception if a running context is detected and logs a warning if another
* thread is constructing a SparkContext. This warning is necessary because the current locking
@@ -1967,7 +2002,7 @@ object SparkContext extends Logging {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
assertNoOtherContextIsRunning(sc, allowMultipleContexts)
contextBeingConstructed = None
- activeContext = Some(sc)
+ activeContext.set(sc)
}
}
@@ -1978,7 +2013,7 @@ object SparkContext extends Logging {
*/
private[spark] def clearActiveContext(): Unit = {
SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
- activeContext = None
+ activeContext.set(null)
}
}
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index 94be1c6d63..728558a424 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -67,6 +67,26 @@ class SparkContextSuite extends FunSuite with LocalSparkContext {
}
}
+ test("Test getOrCreate") {
+ var sc2: SparkContext = null
+ SparkContext.clearActiveContext()
+ val conf = new SparkConf().setAppName("test").setMaster("local")
+
+ sc = SparkContext.getOrCreate(conf)
+
+ assert(sc.getConf.get("spark.app.name").equals("test"))
+ sc2 = SparkContext.getOrCreate(new SparkConf().setAppName("test2").setMaster("local"))
+ assert(sc2.getConf.get("spark.app.name").equals("test"))
+ assert(sc === sc2)
+ assert(sc eq sc2)
+
+ // Try creating second context to confirm that it's still possible, if desired
+ sc2 = new SparkContext(new SparkConf().setAppName("test3").setMaster("local")
+ .set("spark.driver.allowMultipleContexts", "true"))
+
+ sc2.stop()
+ }
+
test("BytesWritable implicit conversion is correct") {
// Regression test for SPARK-3121
val bytesWritable = new BytesWritable()
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 1564babefa..7ef363a2f0 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -68,6 +68,10 @@ object MimaExcludes {
// SPARK-6693 add tostring with max lines and width for matrix
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.mllib.linalg.Matrix.toString")
+ )++ Seq(
+ // SPARK-6703 Add getOrCreate method to SparkContext
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]
+ ("org.apache.spark.SparkContext.org$apache$spark$SparkContext$$activeContext")
)
case v if v.startsWith("1.3") =>