author    zsxwing <zsxwing@gmail.com>    2015-10-05 19:23:41 -0700
committer Tathagata Das <tathagata.das1565@gmail.com>    2015-10-05 19:23:41 -0700
commit    be7c5ff1ad02ce1c03113c98656a4e0c0c3cee83 (patch)
tree      cd8bd1cd45582b54f71d04ae06684d0bf6a65ddc /streaming
parent    a609eb20d964a7b92f1066300443415f6db181e3 (diff)
[SPARK-10900] [STREAMING] Add output operation events to StreamingListener
Add output operation events to StreamingListener so as to implement the following UI features:

1. Progress bar of a batch in the batch list.
2. Be able to display output operation `description` and `duration` when there is no spark job in a Streaming job.

Author: zsxwing <zsxwing@gmail.com>

Closes #8958 from zsxwing/output-operation-events.
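For context, a minimal sketch (not part of this patch) of how application code could consume the new events once this change lands; `OutputOpLogger` and `ssc` are hypothetical names used only for illustration:

import org.apache.spark.streaming.scheduler._

// Hypothetical listener that logs the new output-operation events.
class OutputOpLogger extends StreamingListener {
  override def onOutputOperationStarted(
      started: StreamingListenerOutputOperationStarted): Unit = {
    val info = started.outputOperationInfo
    println(s"Output op ${info.id} of batch ${info.batchTime} started: ${info.description}")
  }

  override def onOutputOperationCompleted(
      completed: StreamingListenerOutputOperationCompleted): Unit = {
    val info = completed.outputOperationInfo
    println(s"Output op ${info.id} of batch ${info.batchTime} completed")
  }
}

// Registered like any other StreamingListener (ssc is an assumed StreamingContext):
// ssc.addStreamingListener(new OutputOpLogger)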
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala                  |  6
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala                 |  7
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala        | 20
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala | 44
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala   | 16
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala|  4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala        | 37
7 files changed, 125 insertions, 9 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index ebbcb6b778..de79c9ef1a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -111,7 +111,11 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
def generateJobs(time: Time): Seq[Job] = {
logDebug("Generating jobs for time " + time)
val jobs = this.synchronized {
- outputStreams.flatMap(outputStream => outputStream.generateJob(time))
+ outputStreams.flatMap { outputStream =>
+ val jobOption = outputStream.generateJob(time)
+ jobOption.foreach(_.setCallSite(outputStream.creationSite.longForm))
+ jobOption
+ }
}
logDebug("Generated " + jobs.length + " jobs for time " + time)
jobs
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
index 3c481bf349..1373053f06 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -29,6 +29,7 @@ class Job(val time: Time, func: () => _) {
private var _outputOpId: Int = _
private var isSet = false
private var _result: Try[_] = null
+ private var _callSite: String = "Unknown"
def run() {
_result = Try(func())
@@ -70,5 +71,11 @@ class Job(val time: Time, func: () => _) {
_outputOpId = outputOpId
}
+ def setCallSite(callSite: String): Unit = {
+ _callSite = callSite
+ }
+
+ def callSite: String = _callSite
+
override def toString: String = id
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 66afbf1b11..0a4a396a0f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -30,8 +30,8 @@ import org.apache.spark.util.{EventLoop, ThreadUtils}
private[scheduler] sealed trait JobSchedulerEvent
-private[scheduler] case class JobStarted(job: Job) extends JobSchedulerEvent
-private[scheduler] case class JobCompleted(job: Job) extends JobSchedulerEvent
+private[scheduler] case class JobStarted(job: Job, startTime: Long) extends JobSchedulerEvent
+private[scheduler] case class JobCompleted(job: Job, completedTime: Long) extends JobSchedulerEvent
private[scheduler] case class ErrorReported(msg: String, e: Throwable) extends JobSchedulerEvent
/**
@@ -143,8 +143,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
private def processEvent(event: JobSchedulerEvent) {
try {
event match {
- case JobStarted(job) => handleJobStart(job)
- case JobCompleted(job) => handleJobCompletion(job)
+ case JobStarted(job, startTime) => handleJobStart(job, startTime)
+ case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
case ErrorReported(m, e) => handleError(m, e)
}
} catch {
@@ -153,7 +153,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
}
- private def handleJobStart(job: Job) {
+ private def handleJobStart(job: Job, startTime: Long) {
val jobSet = jobSets.get(job.time)
val isFirstJobOfJobSet = !jobSet.hasStarted
jobSet.handleJobStart(job)
@@ -162,12 +162,16 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
// correct "jobSet.processingStartTime".
listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
}
+ listenerBus.post(StreamingListenerOutputOperationStarted(
+ OutputOperationInfo(job.time, job.outputOpId, job.callSite, Some(startTime), None)))
logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
}
- private def handleJobCompletion(job: Job) {
+ private def handleJobCompletion(job: Job, completedTime: Long) {
val jobSet = jobSets.get(job.time)
jobSet.handleJobCompletion(job)
+ listenerBus.post(StreamingListenerOutputOperationCompleted(
+ OutputOperationInfo(job.time, job.outputOpId, job.callSite, None, Some(completedTime))))
logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
if (jobSet.hasCompleted) {
jobSets.remove(jobSet.time)
@@ -210,7 +214,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
// it's possible that when `post` is called, `eventLoop` happens to be null.
var _eventLoop = eventLoop
if (_eventLoop != null) {
- _eventLoop.post(JobStarted(job))
+ _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// recovery; see SPARK-4835 for more details.
@@ -219,7 +223,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
}
_eventLoop = eventLoop
if (_eventLoop != null) {
- _eventLoop.post(JobCompleted(job))
+ _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
}
} else {
// JobScheduler has been stopped.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala
new file mode 100644
index 0000000000..d5614b3439
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.streaming.Time
+
+/**
+ * :: DeveloperApi ::
+ * Class having information on output operations.
+ * @param batchTime Time of the batch
+ * @param id Id of this output operation. Different output operations have different ids in a batch.
+ * @param description The description of this output operation.
+ * @param startTime Clock time of when the output operation started processing
+ * @param endTime Clock time of when the output operation ended processing
+ */
+@DeveloperApi
+case class OutputOperationInfo(
+ batchTime: Time,
+ id: Int,
+ description: String,
+ startTime: Option[Long],
+ endTime: Option[Long]) {
+
+ /**
+ * Return the duration of this output operation.
+ */
+ def duration: Option[Long] = for (s <- startTime; e <- endTime) yield e - s
+}
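Note that the scheduler posts the started and completed events with only one of startTime/endTime set, so `duration` is None on either event alone; a consumer that wants durations must pair the two events by (batchTime, id). A minimal sketch of such a helper (hypothetical, not part of this patch):

import scala.collection.mutable
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler.OutputOperationInfo

// Hypothetical helper: pairs started/completed OutputOperationInfo events
// to recover a duration, since each event carries only one timestamp.
class OutputOpDurations {
  private val starts = mutable.Map[(Time, Int), Long]()

  def onStart(info: OutputOperationInfo): Unit =
    info.startTime.foreach(s => starts((info.batchTime, info.id)) = s)

  def onComplete(info: OutputOperationInfo): Option[Long] =
    for {
      s <- starts.remove((info.batchTime, info.id))
      e <- info.endTime
    } yield e - s  // milliseconds, mirroring OutputOperationInfo.duration
}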
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 74dbba453f..d19bdbb443 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -39,6 +39,14 @@ case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends Streami
case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
@DeveloperApi
+case class StreamingListenerOutputOperationStarted(outputOperationInfo: OutputOperationInfo)
+ extends StreamingListenerEvent
+
+@DeveloperApi
+case class StreamingListenerOutputOperationCompleted(outputOperationInfo: OutputOperationInfo)
+ extends StreamingListenerEvent
+
+@DeveloperApi
case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo)
extends StreamingListenerEvent
@@ -75,6 +83,14 @@ trait StreamingListener {
/** Called when processing of a batch of jobs has completed. */
def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
+
+ /** Called when processing of a job of a batch has started. */
+ def onOutputOperationStarted(
+ outputOperationStarted: StreamingListenerOutputOperationStarted) { }
+
+ /** Called when processing of a job of a batch has completed. */
+ def onOutputOperationCompleted(
+ outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index b07d6cf347..ca111bb636 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -43,6 +43,10 @@ private[spark] class StreamingListenerBus
listener.onBatchStarted(batchStarted)
case batchCompleted: StreamingListenerBatchCompleted =>
listener.onBatchCompleted(batchCompleted)
+ case outputOperationStarted: StreamingListenerOutputOperationStarted =>
+ listener.onOutputOperationStarted(outputOperationStarted)
+ case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>
+ listener.onOutputOperationCompleted(outputOperationCompleted)
case _ =>
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index d8fd2ced3b..2b43b74670 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -140,6 +140,27 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
}
}
+ test("output operation reporting") {
+ ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+ val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
+ inputStream.foreachRDD(_.count())
+ inputStream.foreachRDD(_.collect())
+ inputStream.foreachRDD(_.count())
+
+ val collector = new OutputOperationInfoCollector
+ ssc.addStreamingListener(collector)
+
+ ssc.start()
+ try {
+ eventually(timeout(30 seconds), interval(20 millis)) {
+ collector.startedOutputOperationIds.take(3) should be (Seq(0, 1, 2))
+ collector.completedOutputOperationIds.take(3) should be (Seq(0, 1, 2))
+ }
+ } finally {
+ ssc.stop()
+ }
+ }
+
test("onBatchCompleted with successful batch") {
ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
@@ -254,6 +275,22 @@ class ReceiverInfoCollector extends StreamingListener {
}
}
+/** Listener that collects information on processed output operations */
+class OutputOperationInfoCollector extends StreamingListener {
+ val startedOutputOperationIds = new ArrayBuffer[Int] with SynchronizedBuffer[Int]
+ val completedOutputOperationIds = new ArrayBuffer[Int] with SynchronizedBuffer[Int]
+
+ override def onOutputOperationStarted(
+ outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {
+ startedOutputOperationIds += outputOperationStarted.outputOperationInfo.id
+ }
+
+ override def onOutputOperationCompleted(
+ outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
+ completedOutputOperationIds += outputOperationCompleted.outputOperationInfo.id
+ }
+}
+
class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_ONLY) with Logging {
def onStart() {
Future {