author     uncleGen <hustyugm@gmail.com>           2017-01-18 10:55:31 -0800
committer  Shixiong Zhu <shixiong@databricks.com>  2017-01-18 10:55:31 -0800
commit     a81e336f1eddc2c6245d807aae2c81ddc60eabf9 (patch)
tree       a8033ee2351c5bcb16da85740cf54acc8e6adfac /streaming
parent     569e50680f97b1ed054337a39fe198769ef52d93 (diff)
[SPARK-19182][DSTREAM] Optimize the lock in StreamingJobProgressListener to not block UI when generating Streaming jobs
## What changes were proposed in this pull request?

When DStreamGraph is generating a job, it holds a lock and blocks other APIs. Because StreamingJobProgressListener (numInactiveReceivers, streamName(streamId: Int), streamIds) needs to call DStreamGraph's methods to access some information, the UI may hang if generating a job is very slow (e.g., talking to a slow Kafka cluster to fetch metadata). It is better to optimize the locks in DStreamGraph and StreamingJobProgressListener so that the UI is not blocked by job generation.

## How was this patch tested?

Existing unit tests.

cc zsxwing

Author: uncleGen <hustyugm@gmail.com>

Closes #16601 from uncleGen/SPARK-19182.
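The change follows a snapshot-and-publish pattern: the values the UI needs are copied into `@volatile` fields while the graph lock is held in `start()`, so later reads never contend with job generation. Below is a minimal sketch of the before/after locking behavior; the `Graph` class and every name in it are hypothetical stand-ins for illustration, not the actual Spark code.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for DStreamGraph; it illustrates the locking pattern only.
class Graph {
  private val inputStreams = new ArrayBuffer[(String, Int)]() // (name, id) pairs

  // Snapshot published once, under the lock, when the graph starts.
  @volatile private var nameAndId: Seq[(String, Int)] = Nil

  def start(): Unit = this.synchronized {
    nameAndId = inputStreams.toList // volatile write publishes an immutable copy
  }

  def generateJobs(): Unit = this.synchronized {
    Thread.sleep(10000) // e.g. a slow Kafka metadata fetch; the lock is held throughout
  }

  // Before the patch: a synchronized read, so a UI thread blocks behind generateJobs().
  def streamNameLocked(id: Int): Option[String] = this.synchronized {
    inputStreams.find(_._2 == id).map(_._1)
  }

  // After the patch: a lock-free read of the snapshot; it never blocks.
  def streamName(id: Int): Option[String] = nameAndId.find(_._2 == id).map(_._1)
}
```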
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala                    | 13
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala |  8
2 files changed, 13 insertions(+), 8 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index 54d736ee51..dce2028b48 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -31,12 +31,15 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   private val inputStreams = new ArrayBuffer[InputDStream[_]]()
   private val outputStreams = new ArrayBuffer[DStream[_]]()
 
+  @volatile private var inputStreamNameAndID: Seq[(String, Int)] = Nil
+
   var rememberDuration: Duration = null
   var checkpointInProgress = false
 
   var zeroTime: Time = null
   var startTime: Time = null
   var batchDuration: Duration = null
+  @volatile private var numReceivers: Int = 0
 
   def start(time: Time) {
     this.synchronized {
@@ -45,7 +48,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
       startTime = time
       outputStreams.foreach(_.initialize(zeroTime))
       outputStreams.foreach(_.remember(rememberDuration))
-      outputStreams.foreach(_.validateAtStart)
+      outputStreams.foreach(_.validateAtStart())
+      numReceivers = inputStreams.count(_.isInstanceOf[ReceiverInputDStream[_]])
+      inputStreamNameAndID = inputStreams.map(is => (is.name, is.id))
       inputStreams.par.foreach(_.start())
     }
   }
@@ -106,9 +111,9 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
       .toArray
   }
 
-  def getInputStreamName(streamId: Int): Option[String] = synchronized {
-    inputStreams.find(_.id == streamId).map(_.name)
-  }
+  def getNumReceivers: Int = numReceivers
+
+  def getInputStreamNameAndID: Seq[(String, Int)] = inputStreamNameAndID
 
   def generateJobs(time: Time): Seq[Job] = {
     logDebug("Generating jobs for time " + time)
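A note on why plain `@volatile` fields suffice here: both fields are written exactly once, inside `this.synchronized` in `start()`, before any jobs are generated, and the published `Seq` is never mutated afterwards. The volatile write/read pair supplies the happens-before edge, so UI threads always observe a fully-built snapshot without taking the graph lock. A minimal sketch of this safe-publication idiom, using hypothetical names:

```scala
// Illustrative safe-publication sketch (not Spark code): write once under a
// lock, then read lock-free; @volatile supplies the happens-before edge.
class Snapshot {
  @volatile private var data: Seq[Int] = Nil // Nil until published

  def publish(values: Seq[Int]): Unit = this.synchronized {
    data = values.toList // volatile write of a fully-built immutable list
  }

  def read: Seq[Int] = data // no lock taken: cannot block behind publish()
}
```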
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 95f582106c..ed4c1e484e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -169,7 +169,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def numInactiveReceivers: Int = {
-    ssc.graph.getReceiverInputStreams().length - numActiveReceivers
+    ssc.graph.getNumReceivers - numActiveReceivers
   }
 
   def numTotalCompletedBatches: Long = synchronized {
@@ -197,17 +197,17 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
   }
 
   def retainedCompletedBatches: Seq[BatchUIData] = synchronized {
-    completedBatchUIData.toSeq
+    completedBatchUIData.toIndexedSeq
   }
 
   def streamName(streamId: Int): Option[String] = {
-    ssc.graph.getInputStreamName(streamId)
+    ssc.graph.getInputStreamNameAndID.find(_._2 == streamId).map(_._1)
   }
 
   /**
    * Return all InputDStream Ids
    */
-  def streamIds: Seq[Int] = ssc.graph.getInputStreams().map(_.id)
+  def streamIds: Seq[Int] = ssc.graph.getInputStreamNameAndID.map(_._2)
 
   /**
    * Return all of the record rates for each InputDStream in each batch. The key of the return value
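With the new accessor in place, the listener's lookups become pure operations over the cached `(name, id)` pairs. (The `toIndexedSeq` change is in the same spirit: `completedBatchUIData` is a mutable queue, and `toIndexedSeq` copies it into an immutable `Vector`, so the returned batches stay stable after the lock is released.) A small standalone illustration of the two derived queries, with hypothetical sample data:

```scala
// Pure lookups over a cached (name, id) snapshot -- no graph lock involved.
object SnapshotQueries {
  val snapshot: Seq[(String, Int)] = Seq(("kafka-stream", 0), ("socket-stream", 1))

  def streamName(streamId: Int): Option[String] =
    snapshot.find(_._2 == streamId).map(_._1)

  def streamIds: Seq[Int] = snapshot.map(_._2)

  def main(args: Array[String]): Unit = {
    println(streamName(0)) // Some(kafka-stream)
    println(streamIds)     // List(0, 1)
  }
}
```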