author     Tathagata Das <tathagata.das1565@gmail.com>  2015-05-11 10:58:56 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2015-05-11 10:58:56 -0700
commit     1b46556999ca126cb593ef052d24afcb75383223 (patch)
tree       9a044f110e130c95bba848b5cacf36f6d2b14109 /streaming
parent     4f8a15519267ac205424270155254382cc2d3690 (diff)
[SPARK-7361] [STREAMING] Throw unambiguous exception when attempting to start multiple StreamingContexts in the same JVM
Currently, attempting to start a StreamingContext while another one is running throws a confusing exception saying that the actor name JobScheduler is already registered. It is better to throw an unambiguous exception, since running multiple StreamingContexts in the same JVM is not supported.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5907 from tdas/SPARK-7361 and squashes the following commits:

fb81c4a [Tathagata Das] Fix typo
a9cd5bb [Tathagata Das] Added startSite to StreamingContext
5fdfc0d [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7361
5870e2b [Tathagata Das] Added check for multiple streaming contexts
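The patch boils down to a JVM-wide guard: a lock object plus an AtomicReference that tracks the single active context, consulted inside start() and cleared in stop(). A minimal standalone sketch of the same pattern follows; the Service class and its method names are hypothetical stand-ins, not Spark's actual API:

import java.util.concurrent.atomic.AtomicReference

// Hypothetical Service stands in for StreamingContext; only the guard
// pattern is taken from the patch, everything else is illustrative.
class Service(val name: String) {
  def start(): Unit = Service.ACTIVATION_LOCK.synchronized {
    Service.assertNoOtherInstanceIsActive()
    // ... actual startup work would happen here ...
    Service.setActiveInstance(this)
  }

  def stop(): Unit = Service.ACTIVATION_LOCK.synchronized {
    // ... actual shutdown work would happen here ...
    Service.setActiveInstance(null)
  }
}

object Service {
  // Lock that guards the global variable tracking the active instance.
  private val ACTIVATION_LOCK = new Object()
  private val activeInstance = new AtomicReference[Service](null)

  private def assertNoOtherInstanceIsActive(): Unit = {
    if (activeInstance.get() != null) {
      throw new IllegalStateException(
        "Only one Service may be started in this JVM. Currently running: " +
          activeInstance.get().name)
    }
  }

  private def setActiveInstance(s: Service): Unit = activeInstance.set(s)
}

Because start() and stop() both take the same lock before reading or writing the reference, two threads racing to start can never both observe a null active instance.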
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala       48
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala  18
2 files changed, 58 insertions(+), 8 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index bbdb4e8af0..5abe136775 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -18,7 +18,7 @@
package org.apache.spark.streaming
import java.io.InputStream
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
import scala.collection.Map
import scala.collection.mutable.Queue
@@ -28,8 +28,9 @@ import akka.actor.{Props, SupervisorStrategy}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{BytesWritable, LongWritable, Text}
-import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+
import org.apache.spark._
import org.apache.spark.annotation.Experimental
import org.apache.spark.input.FixedLengthBinaryInputFormat
@@ -37,8 +38,9 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.{ActorReceiver, ActorSupervisorStrategy, Receiver}
-import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.scheduler.{JobScheduler, StreamingListener}
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
+import org.apache.spark.util.CallSite
/**
* Main entry point for Spark Streaming functionality. It provides methods used to create
@@ -202,6 +204,8 @@ class StreamingContext private[streaming] (
import StreamingContextState._
private[streaming] var state = Initialized
+ private val startSite = new AtomicReference[CallSite](null)
+
/**
* Return the associated Spark context
*/
@@ -518,6 +522,7 @@ class StreamingContext private[streaming] (
* @throws SparkException if the context has already been started or stopped.
*/
def start(): Unit = synchronized {
+ import StreamingContext._
if (state == Started) {
throw new SparkException("StreamingContext has already been started")
}
@@ -525,10 +530,15 @@ class StreamingContext private[streaming] (
throw new SparkException("StreamingContext has already been stopped")
}
validate()
- sparkContext.setCallSite(DStream.getCreationSite())
- scheduler.start()
- uiTab.foreach(_.attach())
- state = Started
+ startSite.set(DStream.getCreationSite())
+ sparkContext.setCallSite(startSite.get)
+ ACTIVATION_LOCK.synchronized {
+ assertNoOtherContextIsActive()
+ scheduler.start()
+ uiTab.foreach(_.attach())
+ state = Started
+ setActiveContext(this)
+ }
}
/**
@@ -603,6 +613,7 @@ class StreamingContext private[streaming] (
uiTab.foreach(_.detach())
// The state should always be Stopped after calling `stop()`, even if we haven't started yet:
state = Stopped
+ StreamingContext.setActiveContext(null)
}
}
@@ -612,8 +623,29 @@ class StreamingContext private[streaming] (
*/
object StreamingContext extends Logging {
+ /**
+ * Lock that guards access to global variables that track active StreamingContext.
+ */
+ private val ACTIVATION_LOCK = new Object()
- private[streaming] val DEFAULT_CLEANER_TTL = 3600
+ private val activeContext = new AtomicReference[StreamingContext](null)
+
+ private def assertNoOtherContextIsActive(): Unit = {
+ ACTIVATION_LOCK.synchronized {
+ if (activeContext.get() != null) {
+ throw new SparkException(
+ "Only one StreamingContext may be started in this JVM. " +
+ "Currently running StreamingContext was started at" +
+ activeContext.get.startSite.get.longForm)
+ }
+ }
+ }
+
+ private def setActiveContext(ssc: StreamingContext): Unit = {
+ ACTIVATION_LOCK.synchronized {
+ activeContext.set(ssc)
+ }
+ }
@deprecated("Replaced by implicit functions in the DStream companion object. This is " +
"kept here only for backward compatibility.", "1.3.0")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index a589deb1fa..11c7fd835b 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -480,6 +480,24 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts with Logging {
}
}
+ test("multiple streaming contexts") {
+ sc = new SparkContext(new SparkConf().setMaster(master).setAppName(appName))
+ ssc = new StreamingContext(sc, Seconds(1))
+ val input = addInputStream(ssc)
+ input.foreachRDD { rdd => rdd.count }
+ ssc.start()
+
+ // Creating another streaming context should not create errors
+ val anotherSsc = new StreamingContext(sc, Seconds(10))
+ val anotherInput = addInputStream(anotherSsc)
+ anotherInput.foreachRDD { rdd => rdd.count }
+
+ val exception = intercept[SparkException] {
+ anotherSsc.start()
+ }
+ assert(exception.getMessage.contains("StreamingContext"), "Did not get the right exception")
+ }
+
test("DStream and generated RDD creation sites") {
testPackage.test()
}
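With the check in place, an application that wants to replace a running context must stop it first. A sketch of that pattern, assuming a local master and a socket input stream (the host, port, and app name are illustrative):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ReplaceContextExample {
  def main(args: Array[String]): Unit = {
    // local[2]: the receiver needs one core, processing needs another.
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("replace-context"))

    val first = new StreamingContext(sc, Seconds(1))
    first.socketTextStream("localhost", 9999).foreachRDD(rdd => rdd.count())
    first.start()

    // Starting a second context here would now throw the new SparkException.
    // Stop the first one (keeping the shared SparkContext) before starting
    // a replacement.
    first.stop(stopSparkContext = false)

    val second = new StreamingContext(sc, Seconds(10))
    second.socketTextStream("localhost", 9999).foreachRDD(rdd => rdd.count())
    second.start()
    second.awaitTermination()
  }
}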