author     zsxwing <zsxwing@gmail.com>    2015-04-10 01:51:42 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>    2015-04-10 01:51:42 -0700
commit     18ca089bed41ce3e87deeb14206317863518c12c (patch)
tree       86809c8b45fe71a5062a57e91885c13e6c40ff68 /streaming
parent     3290d2d13bb4bd875aec14425c8e3766f9cc644b (diff)
[SPARK-6766][Streaming] Fix issues with StreamingListenerBatchSubmitted and StreamingListenerBatchStarted
This PR includes:

1. Send `StreamingListenerBatchSubmitted` when `JobSet` is submitted
2. Fix `StreamingListenerBatchStarted.batchInfo.processingStartTime`
3. Fix a typo: `completedaBatchInfos` -> `completedBatchInfos`

Author: zsxwing <zsxwing@gmail.com>

Closes #5414 from zsxwing/SPARK-6766 and squashes the following commits:

2f85060 [zsxwing] Update tests
ca0955b [zsxwing] Combine unit tests
79b4fed [zsxwing] Add StreamingJobProgressListenerSuite to test StreamingJobProgressListener
fc3a2a1 [zsxwing] Add unit tests for SPARK-6766
74aed99 [zsxwing] Refactor as per TD's suggestion
493f978 [zsxwing] Send StreamingListenerBatchSubmitted when JobSet is submitted; fix StreamingListenerBatchStarted.batchInfo.processingStartTime; fix a typo
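For context, these events reach any StreamingListener registered on a StreamingContext. A minimal sketch of user code observing the corrected lifecycle (the class name BatchLifecycleLogger and the println output are illustrative, not part of this patch):

import org.apache.spark.streaming.scheduler._

// Illustrative listener: after this patch, onBatchSubmitted fires as soon as
// the JobSet is queued (delays still None), and onBatchStarted fires only
// once processingStartTime has been recorded.
class BatchLifecycleLogger extends StreamingListener {
  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) {
    println("submitted: " + batchSubmitted.batchInfo.batchTime)
  }
  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) {
    // schedulingDelay is now computable because processingStartTime is set
    println("started: " + batchStarted.batchInfo.batchTime +
      ", schedulingDelay=" + batchStarted.batchInfo.schedulingDelay)
  }
}
// Registered via: ssc.addStreamingListener(new BatchLifecycleLogger)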
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala                |   8
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala       |  16
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala                |  55
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala  | 119
4 files changed, 180 insertions(+), 18 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index d6a93acbe7..95f1857b4c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -105,6 +105,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     if (jobSet.jobs.isEmpty) {
       logInfo("No jobs added for time " + jobSet.time)
     } else {
+      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
       jobSets.put(jobSet.time, jobSet)
       jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
       logInfo("Added jobs for time " + jobSet.time)
@@ -134,10 +135,13 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   private def handleJobStart(job: Job) {
     val jobSet = jobSets.get(job.time)
-    if (!jobSet.hasStarted) {
+    val isFirstJobOfJobSet = !jobSet.hasStarted
+    jobSet.handleJobStart(job)
+    if (isFirstJobOfJobSet) {
+      // "StreamingListenerBatchStarted" should be posted after calling "handleJobStart" to get the
+      // correct "jobSet.processingStartTime".
       listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
     }
-    jobSet.handleJobStart(job)
     logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
   }
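Note on the second hunk: posting StreamingListenerBatchStarted before jobSet.handleJobStart was the bug, because handleJobStart is what records processingStartTime. A simplified sketch of the JobSet bookkeeping involved (field and method names inferred from the diff; this is not the actual org.apache.spark.streaming.scheduler.JobSet implementation):

// Simplified sketch of the bookkeeping the hunk above relies on.
private[streaming] class JobSetSketch {
  private var processingStartTime = -1L
  def hasStarted: Boolean = processingStartTime > 0
  def handleJobStart(job: AnyRef) {
    // the first job of the set marks the batch's processing start
    if (processingStartTime < 0) processingStartTime = System.currentTimeMillis()
  }
  // toBatchInfo would expose this as an Option[Long]: it is None until
  // handleJobStart has run, which is why the listener event must be
  // posted only after that call.
  def processingStartTimeOption: Option[Long] =
    if (processingStartTime >= 0) Some(processingStartTime) else None
}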
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index e4bd067cac..84f80e638f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -33,7 +33,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   private val waitingBatchInfos = new HashMap[Time, BatchInfo]
   private val runningBatchInfos = new HashMap[Time, BatchInfo]
-  private val completedaBatchInfos = new Queue[BatchInfo]
+  private val completedBatchInfos = new Queue[BatchInfo]
   private val batchInfoLimit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
   private var totalCompletedBatches = 0L
   private var totalReceivedRecords = 0L
@@ -62,7 +62,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted): Unit = {
     synchronized {
-      runningBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
+      waitingBatchInfos(batchSubmitted.batchInfo.batchTime) = batchSubmitted.batchInfo
     }
   }
@@ -79,8 +79,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
     synchronized {
       waitingBatchInfos.remove(batchCompleted.batchInfo.batchTime)
       runningBatchInfos.remove(batchCompleted.batchInfo.batchTime)
-      completedaBatchInfos.enqueue(batchCompleted.batchInfo)
-      if (completedaBatchInfos.size > batchInfoLimit) completedaBatchInfos.dequeue()
+      completedBatchInfos.enqueue(batchCompleted.batchInfo)
+      if (completedBatchInfos.size > batchInfoLimit) completedBatchInfos.dequeue()
       totalCompletedBatches += 1L
       batchCompleted.batchInfo.receivedBlockInfo.foreach { case (_, infos) =>
@@ -118,7 +118,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   }
   def retainedCompletedBatches: Seq[BatchInfo] = synchronized {
-    completedaBatchInfos.toSeq
+    completedBatchInfos.toSeq
   }
   def processingDelayDistribution: Option[Distribution] = synchronized {
@@ -165,7 +165,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   }
   def lastCompletedBatch: Option[BatchInfo] = {
-    completedaBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
+    completedBatchInfos.sortBy(_.batchTime)(Time.ordering).lastOption
   }
   def lastReceivedBatch: Option[BatchInfo] = {
@@ -174,10 +174,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
   private def retainedBatches: Seq[BatchInfo] = synchronized {
     (waitingBatchInfos.values.toSeq ++
-      runningBatchInfos.values.toSeq ++ completedaBatchInfos).sortBy(_.batchTime)(Time.ordering)
+      runningBatchInfos.values.toSeq ++ completedBatchInfos).sortBy(_.batchTime)(Time.ordering)
   }
   private def extractDistribution(getMetric: BatchInfo => Option[Long]): Option[Distribution] = {
-    Distribution(completedaBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
+    Distribution(completedBatchInfos.flatMap(getMetric(_)).map(_.toDouble))
   }
 }
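With both fixes, the UI listener tracks a clean per-batch state machine: submitted batches sit in waitingBatchInfos, started batches move to runningBatchInfos, and completed batches enter a queue capped at spark.streaming.ui.retainedBatches. A standalone sketch of that flow (types simplified; only the collection names mirror the diff, everything else is illustrative):

import scala.collection.mutable.{HashMap, Queue}

// Standalone sketch of the lifecycle the listener maintains.
class BatchTrackerSketch(batchInfoLimit: Int) {
  private val waitingBatchInfos = new HashMap[Long, String]
  private val runningBatchInfos = new HashMap[Long, String]
  private val completedBatchInfos = new Queue[String]

  def onSubmitted(time: Long, info: String): Unit = synchronized {
    waitingBatchInfos(time) = info  // the fix: a submitted batch waits, it is not yet running
  }
  def onStarted(time: Long, info: String): Unit = synchronized {
    waitingBatchInfos.remove(time)
    runningBatchInfos(time) = info
  }
  def onCompleted(time: Long, info: String): Unit = synchronized {
    waitingBatchInfos.remove(time)
    runningBatchInfos.remove(time)
    completedBatchInfos.enqueue(info)
    // enforce the retention limit by dropping the oldest completed batch
    if (completedBatchInfos.size > batchInfoLimit) completedBatchInfos.dequeue()
  }
}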
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 852e8bb71d..7210439509 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -46,10 +46,38 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
     val collector = new BatchInfoCollector
     ssc.addStreamingListener(collector)
     runStreams(ssc, input.size, input.size)
-    val batchInfos = collector.batchInfos
-    batchInfos should have size 4
-    batchInfos.foreach(info => {
+    // SPARK-6766: batch info should be submitted
+    val batchInfosSubmitted = collector.batchInfosSubmitted
+    batchInfosSubmitted should have size 4
+
+    batchInfosSubmitted.foreach(info => {
+      info.schedulingDelay should be (None)
+      info.processingDelay should be (None)
+      info.totalDelay should be (None)
+    })
+
+    isInIncreasingOrder(batchInfosSubmitted.map(_.submissionTime)) should be (true)
+
+    // SPARK-6766: processingStartTime of batch info should not be None when starting
+    val batchInfosStarted = collector.batchInfosStarted
+    batchInfosStarted should have size 4
+
+    batchInfosStarted.foreach(info => {
+      info.schedulingDelay should not be None
+      info.schedulingDelay.get should be >= 0L
+      info.processingDelay should be (None)
+      info.totalDelay should be (None)
+    })
+
+    isInIncreasingOrder(batchInfosStarted.map(_.submissionTime)) should be (true)
+    isInIncreasingOrder(batchInfosStarted.map(_.processingStartTime.get)) should be (true)
+
+    // test onBatchCompleted
+    val batchInfosCompleted = collector.batchInfosCompleted
+    batchInfosCompleted should have size 4
+
+    batchInfosCompleted.foreach(info => {
       info.schedulingDelay should not be None
       info.processingDelay should not be None
       info.totalDelay should not be None
@@ -58,9 +86,9 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
       info.totalDelay.get should be >= 0L
     })
 
-    isInIncreasingOrder(batchInfos.map(_.submissionTime)) should be (true)
-    isInIncreasingOrder(batchInfos.map(_.processingStartTime.get)) should be (true)
-    isInIncreasingOrder(batchInfos.map(_.processingEndTime.get)) should be (true)
+    isInIncreasingOrder(batchInfosCompleted.map(_.submissionTime)) should be (true)
+    isInIncreasingOrder(batchInfosCompleted.map(_.processingStartTime.get)) should be (true)
+    isInIncreasingOrder(batchInfosCompleted.map(_.processingEndTime.get)) should be (true)
   }
 
   test("receiver info reporting") {
@@ -99,9 +127,20 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
 /** Listener that collects information on processed batches */
 class BatchInfoCollector extends StreamingListener {
-  val batchInfos = new ArrayBuffer[BatchInfo]
+  val batchInfosCompleted = new ArrayBuffer[BatchInfo]
+  val batchInfosStarted = new ArrayBuffer[BatchInfo]
+  val batchInfosSubmitted = new ArrayBuffer[BatchInfo]
+
+  override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) {
+    batchInfosSubmitted += batchSubmitted.batchInfo
+  }
+
+  override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) {
+    batchInfosStarted += batchStarted.batchInfo
+  }
+
   override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
-    batchInfos += batchCompleted.batchInfo
+    batchInfosCompleted += batchCompleted.batchInfo
   }
 }
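The ordering assertions above rely on an isInIncreasingOrder helper defined elsewhere in StreamingListenerSuite (not shown in this hunk). A minimal version consistent with how it is used here:

/** Check if a sequence of numbers is in (non-strictly) increasing order. */
def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
  for (i <- 1 until seq.size) {
    if (seq(i - 1) > seq(i)) return false
  }
  true
}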
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
new file mode 100644
index 0000000000..2b9d164500
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.ui
+
+import org.scalatest.Matchers
+
+import org.apache.spark.streaming.dstream.DStream
+import org.apache.spark.streaming.scheduler._
+import org.apache.spark.streaming.{Time, Milliseconds, TestSuiteBase}
+
+class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
+
+  val input = (1 to 4).map(Seq(_)).toSeq
+  val operation = (d: DStream[Int]) => d.map(x => x)
+
+  override def batchDuration = Milliseconds(100)
+
+  test("onBatchSubmitted, onBatchStarted, onBatchCompleted, " +
+    "onReceiverStarted, onReceiverError, onReceiverStopped") {
+    val ssc = setupStreams(input, operation)
+    val listener = new StreamingJobProgressListener(ssc)
+
+    val receivedBlockInfo = Map(
+      0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)),
+      1 -> Array(ReceivedBlockInfo(1, 300, null))
+    )
+
+    // onBatchSubmitted
+    val batchInfoSubmitted = BatchInfo(Time(1000), receivedBlockInfo, 1000, None, None)
+    listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))
+    listener.waitingBatches should be (List(batchInfoSubmitted))
+    listener.runningBatches should be (Nil)
+    listener.retainedCompletedBatches should be (Nil)
+    listener.lastCompletedBatch should be (None)
+    listener.numUnprocessedBatches should be (1)
+    listener.numTotalCompletedBatches should be (0)
+    listener.numTotalProcessedRecords should be (0)
+    listener.numTotalReceivedRecords should be (0)
+
+    // onBatchStarted
+    val batchInfoStarted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
+    listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
+    listener.waitingBatches should be (Nil)
+    listener.runningBatches should be (List(batchInfoStarted))
+    listener.retainedCompletedBatches should be (Nil)
+    listener.lastCompletedBatch should be (None)
+    listener.numUnprocessedBatches should be (1)
+    listener.numTotalCompletedBatches should be (0)
+    listener.numTotalProcessedRecords should be (0)
+    listener.numTotalReceivedRecords should be (600)
+
+    // onBatchCompleted
+    val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
+    listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
+    listener.waitingBatches should be (Nil)
+    listener.runningBatches should be (Nil)
+    listener.retainedCompletedBatches should be (List(batchInfoCompleted))
+    listener.lastCompletedBatch should be (Some(batchInfoCompleted))
+    listener.numUnprocessedBatches should be (0)
+    listener.numTotalCompletedBatches should be (1)
+    listener.numTotalProcessedRecords should be (600)
+    listener.numTotalReceivedRecords should be (600)
+
+    // onReceiverStarted
+    val receiverInfoStarted = ReceiverInfo(0, "test", null, true, "localhost")
+    listener.onReceiverStarted(StreamingListenerReceiverStarted(receiverInfoStarted))
+    listener.receiverInfo(0) should be (Some(receiverInfoStarted))
+    listener.receiverInfo(1) should be (None)
+
+    // onReceiverError
+    val receiverInfoError = ReceiverInfo(1, "test", null, true, "localhost")
+    listener.onReceiverError(StreamingListenerReceiverError(receiverInfoError))
+    listener.receiverInfo(0) should be (Some(receiverInfoStarted))
+    listener.receiverInfo(1) should be (Some(receiverInfoError))
+    listener.receiverInfo(2) should be (None)
+
+    // onReceiverStopped
+    val receiverInfoStopped = ReceiverInfo(2, "test", null, true, "localhost")
+    listener.onReceiverStopped(StreamingListenerReceiverStopped(receiverInfoStopped))
+    listener.receiverInfo(0) should be (Some(receiverInfoStarted))
+    listener.receiverInfo(1) should be (Some(receiverInfoError))
+    listener.receiverInfo(2) should be (Some(receiverInfoStopped))
+    listener.receiverInfo(3) should be (None)
+  }
+
+  test("Remove the old completed batches when exceeding the limit") {
+    val ssc = setupStreams(input, operation)
+    val limit = ssc.conf.getInt("spark.streaming.ui.retainedBatches", 100)
+    val listener = new StreamingJobProgressListener(ssc)
+
+    val receivedBlockInfo = Map(
+      0 -> Array(ReceivedBlockInfo(0, 100, null), ReceivedBlockInfo(0, 200, null)),
+      1 -> Array(ReceivedBlockInfo(1, 300, null))
+    )
+    val batchInfoCompleted = BatchInfo(Time(1000), receivedBlockInfo, 1000, Some(2000), None)
+
+    for (_ <- 0 until (limit + 10)) {
+      listener.onBatchCompleted(StreamingListenerBatchCompleted(batchInfoCompleted))
+    }
+
+    listener.retainedCompletedBatches.size should be (limit)
+    listener.numTotalCompletedBatches should be (limit + 10)
+  }
+}