diff options
Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index 95f582106c..ed4c1e484e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -169,7 +169,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext) } def numInactiveReceivers: Int = { - ssc.graph.getReceiverInputStreams().length - numActiveReceivers + ssc.graph.getNumReceivers - numActiveReceivers } def numTotalCompletedBatches: Long = synchronized { @@ -197,17 +197,17 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext) } def retainedCompletedBatches: Seq[BatchUIData] = synchronized { - completedBatchUIData.toSeq + completedBatchUIData.toIndexedSeq } def streamName(streamId: Int): Option[String] = { - ssc.graph.getInputStreamName(streamId) + ssc.graph.getInputStreamNameAndID.find(_._2 == streamId).map(_._1) } /** * Return all InputDStream Ids */ - def streamIds: Seq[Int] = ssc.graph.getInputStreams().map(_.id) + def streamIds: Seq[Int] = ssc.graph.getInputStreamNameAndID.map(_._2) /** * Return all of the record rates for each InputDStream in each batch. The key of the return value |