aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala13
1 files changed, 9 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 54d736ee51..dce2028b48 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -31,12 +31,15 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
private val inputStreams = new ArrayBuffer[InputDStream[_]]()
private val outputStreams = new ArrayBuffer[DStream[_]]()
+ @volatile private var inputStreamNameAndID: Seq[(String, Int)] = Nil
+
var rememberDuration: Duration = null
var checkpointInProgress = false
var zeroTime: Time = null
var startTime: Time = null
var batchDuration: Duration = null
+ @volatile private var numReceivers: Int = 0
def start(time: Time) {
this.synchronized {
@@ -45,7 +48,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
startTime = time
outputStreams.foreach(_.initialize(zeroTime))
outputStreams.foreach(_.remember(rememberDuration))
- outputStreams.foreach(_.validateAtStart)
+ outputStreams.foreach(_.validateAtStart())
+ numReceivers = inputStreams.count(_.isInstanceOf[ReceiverInputDStream[_]])
+ inputStreamNameAndID = inputStreams.map(is => (is.name, is.id))
inputStreams.par.foreach(_.start())
}
}
@@ -106,9 +111,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
.toArray
}
- def getInputStreamName(streamId: Int): Option[String] = synchronized {
- inputStreams.find(_.id == streamId).map(_.name)
- }
+ def getNumReceivers: Int = numReceivers
+
+ def getInputStreamNameAndID: Seq[(String, Int)] = inputStreamNameAndID
def generateJobs(time: Time): Seq[Job] = {
logDebug("Generating jobs for time " + time)