Diffstat (limited to 'streaming/src')
7 files changed, 125 insertions, 9 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
index ebbcb6b778..de79c9ef1a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/DStreamGraph.scala
@@ -111,7 +111,11 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
   def generateJobs(time: Time): Seq[Job] = {
     logDebug("Generating jobs for time " + time)
     val jobs = this.synchronized {
-      outputStreams.flatMap(outputStream => outputStream.generateJob(time))
+      outputStreams.flatMap { outputStream =>
+        val jobOption = outputStream.generateJob(time)
+        jobOption.foreach(_.setCallSite(outputStream.creationSite.longForm))
+        jobOption
+      }
     }
     logDebug("Generated " + jobs.length + " jobs for time " + time)
     jobs
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
index 3c481bf349..1373053f06 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/Job.scala
@@ -29,6 +29,7 @@ class Job(val time: Time, func: () => _) {
   private var _outputOpId: Int = _
   private var isSet = false
   private var _result: Try[_] = null
+  private var _callSite: String = "Unknown"
 
   def run() {
     _result = Try(func())
@@ -70,5 +71,11 @@ class Job(val time: Time, func: () => _) {
     _outputOpId = outputOpId
   }
 
+  def setCallSite(callSite: String): Unit = {
+    _callSite = callSite
+  }
+
+  def callSite: String = _callSite
+
   override def toString: String = id
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
index 66afbf1b11..0a4a396a0f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala
@@ -30,8 +30,8 @@ import org.apache.spark.util.{EventLoop, ThreadUtils}
 
 
 private[scheduler] sealed trait JobSchedulerEvent
-private[scheduler] case class JobStarted(job: Job) extends JobSchedulerEvent
-private[scheduler] case class JobCompleted(job: Job) extends JobSchedulerEvent
+private[scheduler] case class JobStarted(job: Job, startTime: Long) extends JobSchedulerEvent
+private[scheduler] case class JobCompleted(job: Job, completedTime: Long) extends JobSchedulerEvent
 private[scheduler] case class ErrorReported(msg: String, e: Throwable) extends JobSchedulerEvent
 
 /**
@@ -143,8 +143,8 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   private def processEvent(event: JobSchedulerEvent) {
     try {
       event match {
-        case JobStarted(job) => handleJobStart(job)
-        case JobCompleted(job) => handleJobCompletion(job)
+        case JobStarted(job, startTime) => handleJobStart(job, startTime)
+        case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
         case ErrorReported(m, e) => handleError(m, e)
       }
     } catch {
@@ -153,7 +153,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
     }
   }
 
-  private def handleJobStart(job: Job) {
+  private def handleJobStart(job: Job, startTime: Long) {
     val jobSet = jobSets.get(job.time)
     val isFirstJobOfJobSet = !jobSet.hasStarted
     jobSet.handleJobStart(job)
@@ -162,12 +162,16 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
       // correct "jobSet.processingStartTime".
       listenerBus.post(StreamingListenerBatchStarted(jobSet.toBatchInfo))
     }
+    listenerBus.post(StreamingListenerOutputOperationStarted(
+      OutputOperationInfo(job.time, job.outputOpId, job.callSite, Some(startTime), None)))
     logInfo("Starting job " + job.id + " from job set of time " + jobSet.time)
   }
 
-  private def handleJobCompletion(job: Job) {
+  private def handleJobCompletion(job: Job, completedTime: Long) {
     val jobSet = jobSets.get(job.time)
     jobSet.handleJobCompletion(job)
+    listenerBus.post(StreamingListenerOutputOperationCompleted(
+      OutputOperationInfo(job.time, job.outputOpId, job.callSite, None, Some(completedTime))))
     logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
     if (jobSet.hasCompleted) {
       jobSets.remove(jobSet.time)
@@ -210,7 +214,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
       // it's possible that when `post` is called, `eventLoop` happens to null.
       var _eventLoop = eventLoop
       if (_eventLoop != null) {
-        _eventLoop.post(JobStarted(job))
+        _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
         // Disable checks for existing output directories in jobs launched by the streaming
         // scheduler, since we may need to write output to an existing directory during checkpoint
         // recovery; see SPARK-4835 for more details.
@@ -219,7 +223,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
         }
         _eventLoop = eventLoop
         if (_eventLoop != null) {
-          _eventLoop.post(JobCompleted(job))
+          _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
         }
       } else {
         // JobScheduler has been stopped.
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala
new file mode 100644
index 0000000000..d5614b3439
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/OutputOperationInfo.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.scheduler
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.streaming.Time
+
+/**
+ * :: DeveloperApi ::
+ * Class having information on output operations.
+ * @param batchTime Time of the batch
+ * @param id Id of this output operation. Different output operations have different ids in a batch.
+ * @param description The description of this output operation.
+ * @param startTime Clock time of when the output operation started processing
+ * @param endTime Clock time of when the output operation finished processing
+ */
+@DeveloperApi
+case class OutputOperationInfo(
+    batchTime: Time,
+    id: Int,
+    description: String,
+    startTime: Option[Long],
+    endTime: Option[Long]) {
+
+  /**
+   * Return the duration of this output operation.
+   */
+  def duration: Option[Long] = for (s <- startTime; e <- endTime) yield e - s
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 74dbba453f..d19bdbb443 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -39,6 +39,14 @@ case class StreamingListenerBatchCompleted(batchInfo: BatchInfo) extends Streami
 case class StreamingListenerBatchStarted(batchInfo: BatchInfo) extends StreamingListenerEvent
 
 @DeveloperApi
+case class StreamingListenerOutputOperationStarted(outputOperationInfo: OutputOperationInfo)
+  extends StreamingListenerEvent
+
+@DeveloperApi
+case class StreamingListenerOutputOperationCompleted(outputOperationInfo: OutputOperationInfo)
+  extends StreamingListenerEvent
+
+@DeveloperApi
 case class StreamingListenerReceiverStarted(receiverInfo: ReceiverInfo)
   extends StreamingListenerEvent
@@ -75,6 +83,14 @@ trait StreamingListener {
 
   /** Called when processing of a batch of jobs has completed. */
   def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { }
+
+  /** Called when processing of a job of a batch has started. */
+  def onOutputOperationStarted(
+      outputOperationStarted: StreamingListenerOutputOperationStarted) { }
+
+  /** Called when processing of a job of a batch has completed. */
+  def onOutputOperationCompleted(
+      outputOperationCompleted: StreamingListenerOutputOperationCompleted) { }
 }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index b07d6cf347..ca111bb636 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -43,6 +43,10 @@ private[spark] class StreamingListenerBus
         listener.onBatchStarted(batchStarted)
       case batchCompleted: StreamingListenerBatchCompleted =>
         listener.onBatchCompleted(batchCompleted)
+      case outputOperationStarted: StreamingListenerOutputOperationStarted =>
+        listener.onOutputOperationStarted(outputOperationStarted)
+      case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>
+        listener.onOutputOperationCompleted(outputOperationCompleted)
       case _ =>
     }
   }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index d8fd2ced3b..2b43b74670 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -140,6 +140,27 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
     }
   }
 
+  test("output operation reporting") {
+    ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+    val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
+    inputStream.foreachRDD(_.count())
+    inputStream.foreachRDD(_.collect())
+    inputStream.foreachRDD(_.count())
+
+    val collector = new OutputOperationInfoCollector
+    ssc.addStreamingListener(collector)
+
+    ssc.start()
+    try {
+      eventually(timeout(30 seconds), interval(20 millis)) {
+        collector.startedOutputOperationIds.take(3) should be (Seq(0, 1, 2))
+        collector.completedOutputOperationIds.take(3) should be (Seq(0, 1, 2))
+      }
+    } finally {
+      ssc.stop()
+    }
+  }
+
   test("onBatchCompleted with successful batch") {
     ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
     val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
@@ -254,6 +275,22 @@ class ReceiverInfoCollector extends StreamingListener {
   }
 }
 
+/** Listener that collects information on processed output operations */
+class OutputOperationInfoCollector extends StreamingListener {
+  val startedOutputOperationIds = new ArrayBuffer[Int] with SynchronizedBuffer[Int]
+  val completedOutputOperationIds = new ArrayBuffer[Int] with SynchronizedBuffer[Int]
+
+  override def onOutputOperationStarted(
+      outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {
+    startedOutputOperationIds += outputOperationStarted.outputOperationInfo.id
+  }
+
+  override def onOutputOperationCompleted(
+      outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
+    completedOutputOperationIds += outputOperationCompleted.outputOperationInfo.id
+  }
+}
+
 class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_ONLY) with Logging {
   def onStart() {
     Future {
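
For reference, a minimal sketch of how an application might consume the new listener events introduced in this change. It is not part of the patch; the class name OutputOperationLogger is illustrative, and the sketch relies only on the callbacks and OutputOperationInfo fields added above plus the existing StreamingContext.addStreamingListener API.

import org.apache.spark.streaming.scheduler.{
  StreamingListener,
  StreamingListenerOutputOperationCompleted,
  StreamingListenerOutputOperationStarted
}

// Illustrative listener: prints one line when each output operation of a batch
// starts and completes, using the events posted by handleJobStart/handleJobCompletion.
class OutputOperationLogger extends StreamingListener {

  override def onOutputOperationStarted(
      outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {
    val info = outputOperationStarted.outputOperationInfo
    println(s"Output op ${info.id} of batch ${info.batchTime} started: ${info.description}")
  }

  override def onOutputOperationCompleted(
      outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
    val info = outputOperationCompleted.outputOperationInfo
    // In this patch the completed event carries only endTime (startTime is None),
    // so info.duration is None here; pair the two events (e.g. by (batchTime, id))
    // if an end-to-end duration is needed.
    println(s"Output op ${info.id} of batch ${info.batchTime} completed at ${info.endTime.getOrElse(-1L)} ms")
  }
}

// Usage, given an existing StreamingContext `ssc`:
//   ssc.addStreamingListener(new OutputOperationLogger)

Because the started event is posted with Some(startTime) and no endTime, and the completed event with Some(completedTime) and no startTime, OutputOperationInfo.duration only yields a value when a listener correlates the two events itself, as the test's OutputOperationInfoCollector does by id.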