author | Tathagata Das <tathagata.das1565@gmail.com> | 2015-05-11 10:58:56 -0700 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2015-05-11 10:58:56 -0700 |
commit | 1b46556999ca126cb593ef052d24afcb75383223 (patch) | |
tree | 9a044f110e130c95bba848b5cacf36f6d2b14109 /streaming | |
parent | 4f8a15519267ac205424270155254382cc2d3690 (diff) | |
download | spark-1b46556999ca126cb593ef052d24afcb75383223.tar.gz spark-1b46556999ca126cb593ef052d24afcb75383223.tar.bz2 spark-1b46556999ca126cb593ef052d24afcb75383223.zip |
[SPARK-7361] [STREAMING] Throw unambiguous exception when attempting to start multiple StreamingContexts in the same JVM
Currently, attempting to start a StreamingContext while another one is already started throws a confusing exception saying that the action name JobScheduler is already registered. It is better to throw a proper exception stating that this is not supported.
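To illustrate the intended user-facing behavior, here is a rough, self-contained sketch of a driver program after this change. The master URL, app name, object and variable names are invented for the example, and the empty queueStream is only there to give each context a registered output operation so that start() gets past validation:

import scala.collection.mutable.Queue

import org.apache.spark.{SparkConf, SparkContext, SparkException}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MultipleContextsDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("demo"))

    // First StreamingContext: register a trivial output operation and start it.
    val ssc1 = new StreamingContext(sc, Seconds(1))
    ssc1.queueStream(new Queue[RDD[Int]]()).foreachRDD(rdd => rdd.count())
    ssc1.start()

    // Second StreamingContext in the same JVM: constructing it is fine, but start()
    // now fails with an explicit SparkException instead of the confusing
    // "JobScheduler is already registered" error.
    val ssc2 = new StreamingContext(sc, Seconds(10))
    ssc2.queueStream(new Queue[RDD[Int]]()).foreachRDD(rdd => rdd.count())
    try {
      ssc2.start()
    } catch {
      case e: SparkException =>
        // The message points at where the already-running context was started.
        println(e.getMessage)
    }

    ssc1.stop()
  }
}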
Author: Tathagata Das <tathagata.das1565@gmail.com>
Closes #5907 from tdas/SPARK-7361 and squashes the following commits:
fb81c4a [Tathagata Das] Fix typo
a9cd5bb [Tathagata Das] Added startSite to StreamingContext
5fdfc0d [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7361
5870e2b [Tathagata Das] Added check for multiple streaming contexts
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala | 48
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala | 18
2 files changed, 58 insertions, 8 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index bbdb4e8af0..5abe136775 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.streaming

 import java.io.InputStream
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

 import scala.collection.Map
 import scala.collection.mutable.Queue
@@ -28,8 +28,9 @@ import akka.actor.{Props, SupervisorStrategy}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+
 import org.apache.spark._
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.input.FixedLengthBinaryInputFormat
@@ -37,8 +38,9 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
-import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
 import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
+import org.apache.spark.util.CallSite

 /**
  * Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -202,6 +204,8 @@ class StreamingContext private[streaming] (
   import StreamingContextState._
   private[streaming] var state = Initialized

+  private val startSite = new AtomicReference[CallSite](null)
+
   /**
    * Return the associated Spark context
    */
@@ -518,6 +522,7 @@ class StreamingContext private[streaming] (
    * @throws SparkException if the context has already been started or stopped.
    */
   def start(): Unit = synchronized {
+    import StreamingContext._
     if (state == Started) {
       throw new SparkException("StreamingContext has already been started")
     }
@@ -525,10 +530,15 @@ class StreamingContext private[streaming] (
       throw new SparkException("StreamingContext has already been stopped")
     }
     validate()
-    sparkContext.setCallSite(DStream.getCreationSite())
-    scheduler.start()
-    uiTab.foreach(_.attach())
-    state = Started
+    startSite.set(DStream.getCreationSite())
+    sparkContext.setCallSite(startSite.get)
+    ACTIVATION_LOCK.synchronized {
+      assertNoOtherContextIsActive()
+      scheduler.start()
+      uiTab.foreach(_.attach())
+      state = Started
+      setActiveContext(this)
+    }
   }

   /**
@@ -603,6 +613,7 @@ class StreamingContext private[streaming] (
     uiTab.foreach(_.detach())
     // The state should always be Stopped after calling `stop()`, even if we haven't started yet:
     state = Stopped
+    StreamingContext.setActiveContext(null)
   }
 }
 /**
@@ -612,8 +623,29 @@
  */

 object StreamingContext extends Logging {
+  /**
+   * Lock that guards access to global variables that track active StreamingContext.
+   */
+  private val ACTIVATION_LOCK = new Object()

-  private[streaming] val DEFAULT_CLEANER_TTL = 3600
+  private val activeContext = new AtomicReference[StreamingContext](null)
+
+  private def assertNoOtherContextIsActive(): Unit = {
+    ACTIVATION_LOCK.synchronized {
+      if (activeContext.get() != null) {
+        throw new SparkException(
+          "Only one StreamingContext may be started in this JVM. " +
+            "Currently running StreamingContext was started at" +
+            activeContext.get.startSite.get.longForm)
+      }
+    }
+  }
+
+  private def setActiveContext(ssc: StreamingContext): Unit = {
+    ACTIVATION_LOCK.synchronized {
+      activeContext.set(ssc)
+    }
+  }

   @deprecated("Replaced by implicit functions in the DStream companion object. This is " +
     "kept here only for backward compatibility.", "1.3.0")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index a589deb1fa..11c7fd835b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -480,6 +480,24 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
     }
   }

+  test("multiple streaming contexts") {
+    sc = new SparkContext(new SparkConf().setMaster(master).setAppName(appName))
+    ssc = new StreamingContext(sc, Seconds(1))
+    val input = addInputStream(ssc)
+    input.foreachRDD { rdd => rdd.count }
+    ssc.start()
+
+    // Creating another streaming context should not create errors
+    val anotherSsc = new StreamingContext(sc, Seconds(10))
+    val anotherInput = addInputStream(anotherSsc)
+    anotherInput.foreachRDD { rdd => rdd.count }
+
+    val exception = intercept[SparkException] {
+      anotherSsc.start()
+    }
+    assert(exception.getMessage.contains("StreamingContext"), "Did not get the right exception")
+  }
+
   test("DStream and generated RDD creation sites") {
     testPackage.test()
   }
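The heart of the patch is the small JVM-wide guard added to the StreamingContext companion object above: an AtomicReference tracking the active context, a lock around the start/stop transitions, a fail-fast check before start, and clearing the reference on stop. A stripped-down sketch of the same pattern outside of Spark (class and member names here are simplified for illustration and are not Spark's):

import java.util.concurrent.atomic.AtomicReference

class Service {
  def start(): Unit = Service.ACTIVATION_LOCK.synchronized {
    // Fail fast if another instance has already been started in this JVM.
    if (Service.activeInstance.get() != null) {
      throw new IllegalStateException("Only one Service may be started in this JVM.")
    }
    // ... real startup work would happen here ...
    Service.activeInstance.set(this)
  }

  def stop(): Unit = Service.ACTIVATION_LOCK.synchronized {
    // ... real shutdown work would happen here ...
    Service.activeInstance.set(null)
  }
}

object Service {
  // Guards the global record of which instance is currently active.
  private val ACTIVATION_LOCK = new Object()
  private val activeInstance = new AtomicReference[Service](null)
}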