aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala8
1 files changed, 4 insertions, 4 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 95f582106c..ed4c1e484e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -169,7 +169,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
}
def numInactiveReceivers: Int = {
- ssc.graph.getReceiverInputStreams().length - numActiveReceivers
+ ssc.graph.getNumReceivers - numActiveReceivers
}
def numTotalCompletedBatches: Long = synchronized {
@@ -197,17 +197,17 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
}
def retainedCompletedBatches: Seq[BatchUIData] = synchronized {
- completedBatchUIData.toSeq
+ completedBatchUIData.toIndexedSeq
}
def streamName(streamId: Int): Option[String] = {
- ssc.graph.getInputStreamName(streamId)
+ ssc.graph.getInputStreamNameAndID.find(_._2 == streamId).map(_._1)
}
/**
* Return all InputDStream Ids
*/
- def streamIds: Seq[Int] = ssc.graph.getInputStreams().map(_.id)
+ def streamIds: Seq[Int] = ssc.graph.getInputStreamNameAndID.map(_._2)
/**
* Return all of the record rates for each InputDStream in each batch. The key of the return value