author     jerryshao <saisai.shao@intel.com>            2014-04-24 18:56:57 -0700
committer  Tathagata Das <tathagata.das1565@gmail.com>  2014-04-24 18:56:57 -0700
commit     80429f3e2ab786d103297652922c3d8da3cf5a01
tree       da65981b7ee2f62109d1e39e4789b0384b88db67
parent     44da5ab2dea6dcf1e13d624784741141883870bb
[SPARK-1510] Spark Streaming metrics source for metrics system
This pulls in changes made by @jerryshao in https://github.com/apache/spark/pull/424 and merges with the master.

Author: jerryshao <saisai.shao@intel.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #545 from tdas/streaming-metrics and squashes the following commits:

034b443 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-metrics
fb3b0a5 [jerryshao] Modify according master update
21939f5 [jerryshao] Style changes according to style check error
976116b [jerryshao] Add StreamSource in StreamingContext for better monitoring through metrics system
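The gauges introduced here surface through Spark's existing metrics system, so they become visible in whatever sinks are configured in conf/metrics.properties. As an illustration only (this configuration is not part of the patch), a console sink can be enabled to print every registered source, including the new <appName>.StreamingMetrics source:

    # conf/metrics.properties (example only, not part of this change)
    # Dump all registered sources, including <appName>.StreamingMetrics, every 10 seconds
    *.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
    *.sink.console.period=10
    *.sink.console.unit=seconds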
Diffstat (limited to 'streaming')
 streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala                 |  4
 streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala (new)            | 73
 streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala  |  3
3 files changed, 79 insertions(+), 1 deletion(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 1c89543058..e0677b795c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -154,6 +154,10 @@ class StreamingContext private[streaming] (
private[streaming] val uiTab = new StreamingTab(this)
+ /** Register streaming source to metrics system */
+ private val streamingSource = new StreamingSource(this)
+ SparkEnv.get.metricsSystem.registerSource(streamingSource)
+
/** Enumeration to identify current state of the StreamingContext */
private[streaming] object StreamingContextState extends Enumeration {
type CheckpointState = Value
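For context (not part of the diff): with this hunk, constructing a StreamingContext is enough to register the source; application code does not change. A minimal sketch, assuming a hypothetical local app that reads from a socket on localhost:9999:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StreamingMetricsDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("StreamingMetricsDemo").setMaster("local[2]")
        // Constructing the context registers StreamingSource with SparkEnv's metrics system,
        // so its gauges are published under the "StreamingMetricsDemo.StreamingMetrics" source.
        val ssc = new StreamingContext(conf, Seconds(1))
        ssc.socketTextStream("localhost", 9999).count().print()
        ssc.start()
        ssc.awaitTermination()
      }
    }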
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
new file mode 100644
index 0000000000..774adc3c23
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingSource.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
+
+private[streaming] class StreamingSource(ssc: StreamingContext) extends Source {
+ val metricRegistry = new MetricRegistry
+ val sourceName = "%s.StreamingMetrics".format(ssc.sparkContext.appName)
+
+ val streamingListener = ssc.uiTab.listener
+
+ private def registerGauge[T](name: String, f: StreamingJobProgressListener => T,
+ defaultValue: T) {
+ metricRegistry.register(MetricRegistry.name("streaming", name), new Gauge[T] {
+ override def getValue: T = Option(f(streamingListener)).getOrElse(defaultValue)
+ })
+ }
+
+ // Gauge for number of network receivers
+ registerGauge("receivers", _.numReceivers, 0)
+
+ // Gauge for number of total completed batches
+ registerGauge("totalCompletedBatches", _.numTotalCompletedBatches, 0L)
+
+ // Gauge for number of unprocessed batches
+ registerGauge("unprocessedBatches", _.numUnprocessedBatches, 0L)
+
+ // Gauge for number of waiting batches
+ registerGauge("waitingBatches", _.waitingBatches.size, 0L)
+
+ // Gauge for number of running batches
+ registerGauge("runningBatches", _.runningBatches.size, 0L)
+
+ // Gauge for number of retained completed batches
+ registerGauge("retainedCompletedBatches", _.retainedCompletedBatches.size, 0L)
+
+ // Gauges for the last completed batch, useful for monitoring the streaming job's running
+ // status; they report -1 for any abnormal condition.
+ registerGauge("lastCompletedBatch_submissionTime",
+ _.lastCompletedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
+ registerGauge("lastCompletedBatch_processStartTime",
+ _.lastCompletedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
+ registerGauge("lastCompletedBatch_processEndTime",
+ _.lastCompletedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
+
+ // Gauges for the last received batch, useful for monitoring the streaming job's running
+ // status; they report -1 for any abnormal condition.
+ registerGauge("lastReceivedBatch_submissionTime",
+ _.lastReceivedBatch.map(_.submissionTime).getOrElse(-1L), -1L)
+ registerGauge("lastReceivedBatch_processStartTime",
+ _.lastReceivedBatch.flatMap(_.processingStartTime).getOrElse(-1L), -1L)
+ registerGauge("lastReceivedBatch_processEndTime",
+ _.lastReceivedBatch.flatMap(_.processingEndTime).getOrElse(-1L), -1L)
+}
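A note on the pattern used by registerGauge above: a Codahale Gauge is a zero-argument callback that a reporter or sink polls on demand, so each listener field is re-read every time the metrics system samples it. A standalone sketch of that mechanism (hypothetical metric name, illustration only):

    import com.codahale.metrics.{Gauge, MetricRegistry}

    object GaugeSketch {
      def main(args: Array[String]): Unit = {
        val registry = new MetricRegistry
        var unprocessed = 0L

        // The gauge's value is computed each time it is read, not when it is registered.
        registry.register(MetricRegistry.name("streaming", "unprocessedBatches"),
          new Gauge[Long] { override def getValue: Long = unprocessed })

        unprocessed = 3L
        // Read it back the way a reporter would:
        val gauge = registry.getGauges.get("streaming.unprocessedBatches")
        println("streaming.unprocessedBatches = " + gauge.getValue)  // prints 3
      }
    }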
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index bf637c1446..14c33c728b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -28,7 +28,8 @@ import org.apache.spark.streaming.scheduler.StreamingListenerBatchSubmitted
import org.apache.spark.util.Distribution
-private[ui] class StreamingJobProgressListener(ssc: StreamingContext) extends StreamingListener {
+private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
+ extends StreamingListener {
private val waitingBatchInfos = new HashMap[Time, BatchInfo]
private val runningBatchInfos = new HashMap[Time, BatchInfo]