diff options
Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala | 13 |
1 files changed, 9 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala index 54d736ee51..dce2028b48 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala @@ -31,12 +31,15 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { private val inputStreams = new ArrayBuffer[InputDStream[_]]() private val outputStreams = new ArrayBuffer[DStream[_]]() + @volatile private var inputStreamNameAndID: Seq[(String, Int)] = Nil + var rememberDuration: Duration = null var checkpointInProgress = false var zeroTime: Time = null var startTime: Time = null var batchDuration: Duration = null + @volatile private var numReceivers: Int = 0 def start(time: Time) { this.synchronized { @@ -45,7 +48,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { startTime = time outputStreams.foreach(_.initialize(zeroTime)) outputStreams.foreach(_.remember(rememberDuration)) - outputStreams.foreach(_.validateAtStart) + outputStreams.foreach(_.validateAtStart()) + numReceivers = inputStreams.count(_.isInstanceOf[ReceiverInputDStream[_]]) + inputStreamNameAndID = inputStreams.map(is => (is.name, is.id)) inputStreams.par.foreach(_.start()) } } @@ -106,9 +111,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging { .toArray } - def getInputStreamName(streamId: Int): Option[String] = synchronized { - inputStreams.find(_.id == streamId).map(_.name) - } + def getNumReceivers: Int = numReceivers + + def getInputStreamNameAndID: Seq[(String, Int)] = inputStreamNameAndID def generateJobs(time: Time): Seq[Job] = { logDebug("Generating jobs for time " + time) |