aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorsaturday_s <shi.indetail@gmail.com>2016-12-22 12:51:37 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2016-12-22 12:51:37 -0800
commitce99f51d2e8cbcb1565c9e7a729183bd74a0c2bc (patch)
tree95c16fa2220409a658202776d9d70a2ed849d636 /streaming
parent31da755c80aed8219c368fd18c72b42e50be46fc (diff)
downloadspark-ce99f51d2e8cbcb1565c9e7a729183bd74a0c2bc.tar.gz
spark-ce99f51d2e8cbcb1565c9e7a729183bd74a0c2bc.tar.bz2
spark-ce99f51d2e8cbcb1565c9e7a729183bd74a0c2bc.zip
[SPARK-18537][WEB UI] Add a REST api to serve spark streaming information
## What changes were proposed in this pull request? This PR is a continuation of #16000 and completes #15904. **Description** - Augment the `org.apache.spark.status.api.v1` package for serving streaming information. - Retrieve the streaming information through StreamingJobProgressListener. > this api should cover exactly the same amount of information as you can get from the web interface > the implementation is based on the current REST implementation of spark-core > and will be available for running applications only > > https://issues.apache.org/jira/browse/SPARK-18537 ## How was this patch tested? Local test. Author: saturday_s <shi.indetail@gmail.com> Author: Chan Chor Pang <ChorPang.Chan@access-company.com> Author: peterCPChan <universknight@gmail.com> Closes #16253 from saturday-shi/SPARK-18537.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/java/org/apache/spark/status/api/v1/streaming/BatchStatus.java30
-rw-r--r--streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllBatchesResource.scala78
-rw-r--r--streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllOutputOperationsResource.scala66
-rw-r--r--streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllReceiversResource.scala76
-rw-r--r--streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala42
-rw-r--r--streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala74
-rw-r--r--streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneBatchResource.scala35
-rw-r--r--streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneOutputOperationResource.scala39
-rw-r--r--streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneReceiverResource.scala35
-rw-r--r--streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/StreamingStatisticsResource.scala64
-rw-r--r--streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/api.scala75
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala5
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala14
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala5
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala6
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala2
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala12
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala3
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala1
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java5
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala9
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala4
22 files changed, 676 insertions, 4 deletions
diff --git a/streaming/src/main/java/org/apache/spark/status/api/v1/streaming/BatchStatus.java b/streaming/src/main/java/org/apache/spark/status/api/v1/streaming/BatchStatus.java
new file mode 100644
index 0000000000..1bbca5a225
--- /dev/null
+++ b/streaming/src/main/java/org/apache/spark/status/api/v1/streaming/BatchStatus.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.streaming;
+
+import org.apache.spark.util.EnumUtil;
+
+public enum BatchStatus {
+ COMPLETED,
+ QUEUED,
+ PROCESSING;
+
+ public static BatchStatus fromString(String str) {
+ return EnumUtil.parseIgnoreCase(BatchStatus.class, str);
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllBatchesResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllBatchesResource.scala
new file mode 100644
index 0000000000..3a51ae6093
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllBatchesResource.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.streaming
+
+import java.util.{ArrayList => JArrayList, Arrays => JArrays, Date, List => JList}
+import javax.ws.rs.{GET, Produces, QueryParam}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.status.api.v1.streaming.AllBatchesResource._
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class AllBatchesResource(listener: StreamingJobProgressListener) {
+
+ @GET
+ def batchesList(@QueryParam("status") statusParams: JList[BatchStatus]): Seq[BatchInfo] = {
+ batchInfoList(listener, statusParams).sortBy(- _.batchId)
+ }
+}
+
+private[v1] object AllBatchesResource {
+
+ def batchInfoList(
+ listener: StreamingJobProgressListener,
+ statusParams: JList[BatchStatus] = new JArrayList[BatchStatus]()): Seq[BatchInfo] = {
+
+ listener.synchronized {
+ val statuses =
+ if (statusParams.isEmpty) JArrays.asList(BatchStatus.values(): _*) else statusParams
+ val statusToBatches = Seq(
+ BatchStatus.COMPLETED -> listener.retainedCompletedBatches,
+ BatchStatus.QUEUED -> listener.waitingBatches,
+ BatchStatus.PROCESSING -> listener.runningBatches
+ )
+
+ val batchInfos = for {
+ (status, batches) <- statusToBatches
+ batch <- batches if statuses.contains(status)
+ } yield {
+ val batchId = batch.batchTime.milliseconds
+ val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption
+
+ new BatchInfo(
+ batchId = batchId,
+ batchTime = new Date(batchId),
+ status = status.toString,
+ batchDuration = listener.batchDuration,
+ inputSize = batch.numRecords,
+ schedulingDelay = batch.schedulingDelay,
+ processingTime = batch.processingDelay,
+ totalDelay = batch.totalDelay,
+ numActiveOutputOps = batch.numActiveOutputOp,
+ numCompletedOutputOps = batch.numCompletedOutputOp,
+ numFailedOutputOps = batch.numFailedOutputOp,
+ numTotalOutputOps = batch.outputOperations.size,
+ firstFailureReason = firstFailureReason
+ )
+ }
+
+ batchInfos
+ }
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllOutputOperationsResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllOutputOperationsResource.scala
new file mode 100644
index 0000000000..0eb649f0e1
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllOutputOperationsResource.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.streaming
+
+import java.util.Date
+import javax.ws.rs.{GET, PathParam, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.status.api.v1.NotFoundException
+import org.apache.spark.status.api.v1.streaming.AllOutputOperationsResource._
+import org.apache.spark.streaming.Time
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class AllOutputOperationsResource(listener: StreamingJobProgressListener) {
+
+ @GET
+ def operationsList(@PathParam("batchId") batchId: Long): Seq[OutputOperationInfo] = {
+ outputOperationInfoList(listener, batchId).sortBy(_.outputOpId)
+ }
+}
+
+private[v1] object AllOutputOperationsResource {
+
+ def outputOperationInfoList(
+ listener: StreamingJobProgressListener,
+ batchId: Long): Seq[OutputOperationInfo] = {
+
+ listener.synchronized {
+ listener.getBatchUIData(Time(batchId)) match {
+ case Some(batch) =>
+ for ((opId, op) <- batch.outputOperations) yield {
+ val jobIds = batch.outputOpIdSparkJobIdPairs
+ .filter(_.outputOpId == opId).map(_.sparkJobId).toSeq.sorted
+
+ new OutputOperationInfo(
+ outputOpId = opId,
+ name = op.name,
+ description = op.description,
+ startTime = op.startTime.map(new Date(_)),
+ endTime = op.endTime.map(new Date(_)),
+ duration = op.duration,
+ failureReason = op.failureReason,
+ jobIds = jobIds
+ )
+ }
+ case None => throw new NotFoundException("unknown batch: " + batchId)
+ }
+ }.toSeq
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllReceiversResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllReceiversResource.scala
new file mode 100644
index 0000000000..5a276a9236
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/AllReceiversResource.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.streaming
+
+import java.util.Date
+import javax.ws.rs.{GET, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.status.api.v1.streaming.AllReceiversResource._
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class AllReceiversResource(listener: StreamingJobProgressListener) {
+
+ @GET
+ def receiversList(): Seq[ReceiverInfo] = {
+ receiverInfoList(listener).sortBy(_.streamId)
+ }
+}
+
+private[v1] object AllReceiversResource {
+
+ def receiverInfoList(listener: StreamingJobProgressListener): Seq[ReceiverInfo] = {
+ listener.synchronized {
+ listener.receivedRecordRateWithBatchTime.map { case (streamId, eventRates) =>
+
+ val receiverInfo = listener.receiverInfo(streamId)
+ val streamName = receiverInfo.map(_.name)
+ .orElse(listener.streamName(streamId)).getOrElse(s"Stream-$streamId")
+ val avgEventRate =
+ if (eventRates.isEmpty) None else Some(eventRates.map(_._2).sum / eventRates.size)
+
+ val (errorTime, errorMessage, error) = receiverInfo match {
+ case None => (None, None, None)
+ case Some(info) =>
+ val someTime =
+ if (info.lastErrorTime >= 0) Some(new Date(info.lastErrorTime)) else None
+ val someMessage =
+ if (info.lastErrorMessage.length > 0) Some(info.lastErrorMessage) else None
+ val someError =
+ if (info.lastError.length > 0) Some(info.lastError) else None
+
+ (someTime, someMessage, someError)
+ }
+
+ new ReceiverInfo(
+ streamId = streamId,
+ streamName = streamName,
+ isActive = receiverInfo.map(_.active),
+ executorId = receiverInfo.map(_.executorId),
+ executorHost = receiverInfo.map(_.location),
+ lastErrorTime = errorTime,
+ lastErrorMessage = errorMessage,
+ lastError = error,
+ avgEventRate = avgEventRate,
+ eventRates = eventRates
+ )
+ }.toSeq
+ }
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala
new file mode 100644
index 0000000000..e64830a945
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingApp.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.streaming
+
+import javax.ws.rs.{Path, PathParam}
+
+import org.apache.spark.status.api.v1.UIRootFromServletContext
+
+@Path("/v1")
+private[v1] class ApiStreamingApp extends UIRootFromServletContext {
+
+ @Path("applications/{appId}/streaming")
+ def getStreamingRoot(@PathParam("appId") appId: String): ApiStreamingRootResource = {
+ uiRoot.withSparkUI(appId, None) { ui =>
+ new ApiStreamingRootResource(ui)
+ }
+ }
+
+ @Path("applications/{appId}/{attemptId}/streaming")
+ def getStreamingRoot(
+ @PathParam("appId") appId: String,
+ @PathParam("attemptId") attemptId: String): ApiStreamingRootResource = {
+ uiRoot.withSparkUI(appId, Some(attemptId)) { ui =>
+ new ApiStreamingRootResource(ui)
+ }
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala
new file mode 100644
index 0000000000..1ccd586c84
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/ApiStreamingRootResource.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.streaming
+
+import javax.ws.rs.Path
+
+import org.apache.spark.status.api.v1.NotFoundException
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
+import org.apache.spark.ui.SparkUI
+
+private[v1] class ApiStreamingRootResource(ui: SparkUI) {
+
+ import org.apache.spark.status.api.v1.streaming.ApiStreamingRootResource._
+
+ @Path("statistics")
+ def getStreamingStatistics(): StreamingStatisticsResource = {
+ new StreamingStatisticsResource(getListener(ui))
+ }
+
+ @Path("receivers")
+ def getReceivers(): AllReceiversResource = {
+ new AllReceiversResource(getListener(ui))
+ }
+
+ @Path("receivers/{streamId: \\d+}")
+ def getReceiver(): OneReceiverResource = {
+ new OneReceiverResource(getListener(ui))
+ }
+
+ @Path("batches")
+ def getBatches(): AllBatchesResource = {
+ new AllBatchesResource(getListener(ui))
+ }
+
+ @Path("batches/{batchId: \\d+}")
+ def getBatch(): OneBatchResource = {
+ new OneBatchResource(getListener(ui))
+ }
+
+ @Path("batches/{batchId: \\d+}/operations")
+ def getOutputOperations(): AllOutputOperationsResource = {
+ new AllOutputOperationsResource(getListener(ui))
+ }
+
+ @Path("batches/{batchId: \\d+}/operations/{outputOpId: \\d+}")
+ def getOutputOperation(): OneOutputOperationResource = {
+ new OneOutputOperationResource(getListener(ui))
+ }
+
+}
+
+private[v1] object ApiStreamingRootResource {
+ def getListener(ui: SparkUI): StreamingJobProgressListener = {
+ ui.getStreamingJobProgressListener match {
+ case Some(listener) => listener.asInstanceOf[StreamingJobProgressListener]
+ case None => throw new NotFoundException("no streaming listener attached to " + ui.getAppName)
+ }
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneBatchResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneBatchResource.scala
new file mode 100644
index 0000000000..d3c689c790
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneBatchResource.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.streaming
+
+import javax.ws.rs.{GET, PathParam, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.status.api.v1.NotFoundException
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class OneBatchResource(listener: StreamingJobProgressListener) {
+
+ @GET
+ def oneBatch(@PathParam("batchId") batchId: Long): BatchInfo = {
+ val someBatch = AllBatchesResource.batchInfoList(listener)
+ .find { _.batchId == batchId }
+ someBatch.getOrElse(throw new NotFoundException("unknown batch: " + batchId))
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneOutputOperationResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneOutputOperationResource.scala
new file mode 100644
index 0000000000..aabcdb29b0
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneOutputOperationResource.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.streaming
+
+import javax.ws.rs.{GET, PathParam, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.status.api.v1.NotFoundException
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
+import org.apache.spark.streaming.ui.StreamingJobProgressListener._
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class OneOutputOperationResource(listener: StreamingJobProgressListener) {
+
+ @GET
+ def oneOperation(
+ @PathParam("batchId") batchId: Long,
+ @PathParam("outputOpId") opId: OutputOpId): OutputOperationInfo = {
+
+ val someOutputOp = AllOutputOperationsResource.outputOperationInfoList(listener, batchId)
+ .find { _.outputOpId == opId }
+ someOutputOp.getOrElse(throw new NotFoundException("unknown output operation: " + opId))
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneReceiverResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneReceiverResource.scala
new file mode 100644
index 0000000000..c0cc99da3a
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/OneReceiverResource.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.streaming
+
+import javax.ws.rs.{GET, PathParam, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.status.api.v1.NotFoundException
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class OneReceiverResource(listener: StreamingJobProgressListener) {
+
+ @GET
+ def oneReceiver(@PathParam("streamId") streamId: Int): ReceiverInfo = {
+ val someReceiver = AllReceiversResource.receiverInfoList(listener)
+ .find { _.streamId == streamId }
+ someReceiver.getOrElse(throw new NotFoundException("unknown receiver: " + streamId))
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/StreamingStatisticsResource.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/StreamingStatisticsResource.scala
new file mode 100644
index 0000000000..6cff87be59
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/StreamingStatisticsResource.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.streaming
+
+import java.util.Date
+import javax.ws.rs.{GET, Produces}
+import javax.ws.rs.core.MediaType
+
+import org.apache.spark.streaming.ui.StreamingJobProgressListener
+
+@Produces(Array(MediaType.APPLICATION_JSON))
+private[v1] class StreamingStatisticsResource(listener: StreamingJobProgressListener) {
+
+ @GET
+ def streamingStatistics(): StreamingStatistics = {
+ listener.synchronized {
+ val batches = listener.retainedBatches
+ val avgInputRate = avgRate(batches.map(_.numRecords * 1000.0 / listener.batchDuration))
+ val avgSchedulingDelay = avgTime(batches.flatMap(_.schedulingDelay))
+ val avgProcessingTime = avgTime(batches.flatMap(_.processingDelay))
+ val avgTotalDelay = avgTime(batches.flatMap(_.totalDelay))
+
+ new StreamingStatistics(
+ startTime = new Date(listener.startTime),
+ batchDuration = listener.batchDuration,
+ numReceivers = listener.numReceivers,
+ numActiveReceivers = listener.numActiveReceivers,
+ numInactiveReceivers = listener.numInactiveReceivers,
+ numTotalCompletedBatches = listener.numTotalCompletedBatches,
+ numRetainedCompletedBatches = listener.retainedCompletedBatches.size,
+ numActiveBatches = listener.numUnprocessedBatches,
+ numProcessedRecords = listener.numTotalProcessedRecords,
+ numReceivedRecords = listener.numTotalReceivedRecords,
+ avgInputRate = avgInputRate,
+ avgSchedulingDelay = avgSchedulingDelay,
+ avgProcessingTime = avgProcessingTime,
+ avgTotalDelay = avgTotalDelay
+ )
+ }
+ }
+
+ private def avgRate(data: Seq[Double]): Option[Double] = {
+ if (data.isEmpty) None else Some(data.sum / data.size)
+ }
+
+ private def avgTime(data: Seq[Long]): Option[Long] = {
+ if (data.isEmpty) None else Some(data.sum / data.size)
+ }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/api.scala b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/api.scala
new file mode 100644
index 0000000000..403b0eb0b5
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/status/api/v1/streaming/api.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.api.v1.streaming
+
+import java.util.Date
+
+import org.apache.spark.streaming.ui.StreamingJobProgressListener._
+
+class StreamingStatistics private[spark](
+ val startTime: Date,
+ val batchDuration: Long,
+ val numReceivers: Int,
+ val numActiveReceivers: Int,
+ val numInactiveReceivers: Int,
+ val numTotalCompletedBatches: Long,
+ val numRetainedCompletedBatches: Long,
+ val numActiveBatches: Long,
+ val numProcessedRecords: Long,
+ val numReceivedRecords: Long,
+ val avgInputRate: Option[Double],
+ val avgSchedulingDelay: Option[Long],
+ val avgProcessingTime: Option[Long],
+ val avgTotalDelay: Option[Long])
+
+class ReceiverInfo private[spark](
+ val streamId: Int,
+ val streamName: String,
+ val isActive: Option[Boolean],
+ val executorId: Option[String],
+ val executorHost: Option[String],
+ val lastErrorTime: Option[Date],
+ val lastErrorMessage: Option[String],
+ val lastError: Option[String],
+ val avgEventRate: Option[Double],
+ val eventRates: Seq[(Long, Double)])
+
+class BatchInfo private[spark](
+ val batchId: Long,
+ val batchTime: Date,
+ val status: String,
+ val batchDuration: Long,
+ val inputSize: Long,
+ val schedulingDelay: Option[Long],
+ val processingTime: Option[Long],
+ val totalDelay: Option[Long],
+ val numActiveOutputOps: Int,
+ val numCompletedOutputOps: Int,
+ val numFailedOutputOps: Int,
+ val numTotalOutputOps: Int,
+ val firstFailureReason: Option[String])
+
+class OutputOperationInfo private[spark](
+ val outputOpId: OutputOpId,
+ val name: String,
+ val description: String,
+ val startTime: Option[Date],
+ val endTime: Option[Date],
+ val duration: Option[Long],
+ val failureReason: Option[String],
+ val jobIds: Seq[SparkJobId])
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 444261da8d..0a4c141e5b 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -45,7 +45,8 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContextState._
import org.apache.spark.streaming.dstream._
import org.apache.spark.streaming.receiver.Receiver
-import org.apache.spark.streaming.scheduler.{ExecutorAllocationManager, JobScheduler, StreamingListener}
+import org.apache.spark.streaming.scheduler.
+ {ExecutorAllocationManager, JobScheduler, StreamingListener, StreamingListenerStreamingStarted}
import org.apache.spark.streaming.ui.{StreamingJobProgressListener, StreamingTab}
import org.apache.spark.util.{CallSite, ShutdownHookManager, ThreadUtils, Utils}
@@ -583,6 +584,8 @@ class StreamingContext private[streaming] (
scheduler.start()
}
state = StreamingContextState.ACTIVE
+ scheduler.listenerBus.post(
+ StreamingListenerStreamingStarted(System.currentTimeMillis()))
} catch {
case NonFatal(e) =>
logError("Error starting the context, marking it as stopped", e)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
index db0bae9958..28cb86c9f3 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListener.scala
@@ -21,6 +21,9 @@ import org.apache.spark.streaming.Time
private[streaming] trait PythonStreamingListener{
+ /** Invoked once the streaming context has been started. */
+ def onStreamingStarted(streamingStarted: JavaStreamingListenerStreamingStarted): Unit = { }
+
/** Called when a receiver has been started */
def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted) { }
@@ -51,6 +54,11 @@ private[streaming] trait PythonStreamingListener{
private[streaming] class PythonStreamingListenerWrapper(listener: PythonStreamingListener)
extends JavaStreamingListener {
+ /** Forwards the streaming-started event unchanged to the wrapped Python listener. */
+ override def onStreamingStarted(
+ streamingStarted: JavaStreamingListenerStreamingStarted): Unit =
+ listener.onStreamingStarted(streamingStarted)
+
/** Called when a receiver has been started */
override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = {
listener.onReceiverStarted(receiverStarted)
@@ -99,6 +107,9 @@ private[streaming] class PythonStreamingListenerWrapper(listener: PythonStreamin
*/
private[streaming] class JavaStreamingListener {
+ /** Invoked once the streaming context has been started; does nothing unless overridden. */
+ def onStreamingStarted(streamingStarted: JavaStreamingListenerStreamingStarted): Unit = ()
+
/** Called when a receiver has been started */
def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = { }
@@ -131,6 +142,9 @@ private[streaming] class JavaStreamingListener {
*/
private[streaming] sealed trait JavaStreamingListenerEvent
+private[streaming] class JavaStreamingListenerStreamingStarted(val time: Long)
+ extends JavaStreamingListenerEvent // Java-facing counterpart of StreamingListenerStreamingStarted
+
private[streaming] class JavaStreamingListenerBatchSubmitted(val batchInfo: JavaBatchInfo)
extends JavaStreamingListenerEvent
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
index b109b9f1cb..ee8370d262 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
@@ -77,6 +77,11 @@ private[streaming] class JavaStreamingListenerWrapper(javaStreamingListener: Jav
)
}
+ override def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = {
+ val javaEvent = new JavaStreamingListenerStreamingStarted(streamingStarted.time)
+ javaStreamingListener.onStreamingStarted(javaEvent) // deliver the Java-friendly copy
+ }
+
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted): Unit = {
javaStreamingListener.onReceiverStarted(
new JavaStreamingListenerReceiverStarted(toJavaReceiverInfo(receiverStarted.receiverInfo)))
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
index 58fc78d552..b57f9b772f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListener.scala
@@ -30,6 +30,9 @@ import org.apache.spark.util.Distribution
sealed trait StreamingListenerEvent
@DeveloperApi
+case class StreamingListenerStreamingStarted(time: Long) extends StreamingListenerEvent // StreamingContext posts this with time = System.currentTimeMillis()
+
+@DeveloperApi
case class StreamingListenerBatchSubmitted(batchInfo: BatchInfo) extends StreamingListenerEvent
@DeveloperApi
@@ -66,6 +69,9 @@ case class StreamingListenerReceiverStopped(receiverInfo: ReceiverInfo)
@DeveloperApi
trait StreamingListener {
+ /** Invoked once the streaming context has been started. */
+ def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = { }
+
/** Called when a receiver has been started */
def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) { }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
index 39f6e711a6..5fb0bd057d 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/StreamingListenerBus.scala
@@ -65,6 +65,8 @@ private[streaming] class StreamingListenerBus(sparkListenerBus: LiveListenerBus)
listener.onOutputOperationStarted(outputOperationStarted)
case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>
listener.onOutputOperationCompleted(outputOperationCompleted)
+ case streamingStarted: StreamingListenerStreamingStarted =>
+ listener.onStreamingStarted(streamingStarted)
case _ =>
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 61f852a0d3..95f582106c 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -27,7 +27,7 @@ import org.apache.spark.scheduler._
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.scheduler._
-private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
+private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
extends SparkListener with StreamingListener {
private val waitingBatchUIData = new HashMap[Time, BatchUIData]
@@ -39,6 +39,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
private var totalProcessedRecords = 0L
private val receiverInfos = new HashMap[Int, ReceiverInfo]
+ @volatile private var _startTime = -1L // -1 until the started event arrives; volatile because startTime may be read from another thread
+
// Because onJobStart and onBatchXXX messages are processed in different threads,
// we may not be able to get the corresponding BatchUIData when receiving onJobStart. So here we
// cannot use a map of (Time, BatchUIData).
@@ -66,6 +68,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
val batchDuration = ssc.graph.batchDuration.milliseconds
+ override def onStreamingStarted(streamingStarted: StreamingListenerStreamingStarted): Unit = {
+ _startTime = streamingStarted.time // later exposed through startTime
+ }
+
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
synchronized {
receiverInfos(receiverStarted.receiverInfo.streamId) = receiverStarted.receiverInfo
@@ -152,6 +158,8 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
}
+ def startTime: Long = _startTime // time of the last StreamingListenerStreamingStarted event, or -1 if none was received
+
def numReceivers: Int = synchronized {
receiverInfos.size
}
@@ -267,7 +275,7 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
}
-private[streaming] object StreamingJobProgressListener {
+private[spark] object StreamingJobProgressListener { // companion holding Int aliases for job and output-operation ids
type SparkJobId = Int
type OutputOpId = Int
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 46cd3092e9..7abafd6ba7 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -143,7 +143,8 @@ private[ui] class StreamingPage(parent: StreamingTab)
import StreamingPage._
private val listener = parent.listener
- private val startTime = System.currentTimeMillis()
+
+ private def startTime: Long = listener.startTime // read from the listener on each call rather than cached in the page
/** Render the page */
def render(request: HttpServletRequest): Seq[Node] = {
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
index c5f8aada3f..9d1b82a634 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -38,6 +38,7 @@ private[spark] class StreamingTab(val ssc: StreamingContext)
ssc.addStreamingListener(listener)
ssc.sc.addSparkListener(listener)
+ parent.setStreamingJobProgressListener(listener)
attachPage(new StreamingPage(this))
attachPage(new BatchPage(this))
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
index ff0be820e0..63fd6c4422 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaStreamingListenerAPISuite.java
@@ -23,6 +23,11 @@ import org.apache.spark.streaming.api.java.*;
public class JavaStreamingListenerAPISuite extends JavaStreamingListener {
@Override
+ public void onStreamingStarted(JavaStreamingListenerStreamingStarted streamingStarted) {
+ super.onStreamingStarted(streamingStarted); // only delegates to the base implementation
+ }
+
+ @Override
public void onReceiverStarted(JavaStreamingListenerReceiverStarted receiverStarted) {
JavaReceiverInfo receiverInfo = receiverStarted.receiverInfo();
receiverInfo.streamId();
diff --git a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
index 0295e059f7..cfd4323531 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapperSuite.scala
@@ -29,6 +29,10 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
val listener = new TestJavaStreamingListener()
val listenerWrapper = new JavaStreamingListenerWrapper(listener)
+ val streamingStarted = StreamingListenerStreamingStarted(1000L)
+ listenerWrapper.onStreamingStarted(streamingStarted)
+ assert(listener.streamingStarted.time === streamingStarted.time)
+
val receiverStarted = StreamingListenerReceiverStarted(ReceiverInfo(
streamId = 2,
name = "test",
@@ -249,6 +253,7 @@ class JavaStreamingListenerWrapperSuite extends SparkFunSuite {
class TestJavaStreamingListener extends JavaStreamingListener {
+ var streamingStarted: JavaStreamingListenerStreamingStarted = null
var receiverStarted: JavaStreamingListenerReceiverStarted = null
var receiverError: JavaStreamingListenerReceiverError = null
var receiverStopped: JavaStreamingListenerReceiverStopped = null
@@ -258,6 +263,10 @@ class TestJavaStreamingListener extends JavaStreamingListener {
var outputOperationStarted: JavaStreamingListenerOutputOperationStarted = null
var outputOperationCompleted: JavaStreamingListenerOutputOperationCompleted = null
+ override def onStreamingStarted(
+ streamingStarted: JavaStreamingListenerStreamingStarted): Unit =
+ this.streamingStarted = streamingStarted // stored so the suite can inspect it
+
override def onReceiverStarted(receiverStarted: JavaStreamingListenerReceiverStarted): Unit = {
this.receiverStarted = receiverStarted
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index 46ab3ac8de..56b400850f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -62,6 +62,10 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
0 -> StreamInputInfo(0, 300L),
1 -> StreamInputInfo(1, 300L, Map(StreamInputInfo.METADATA_KEY_DESCRIPTION -> "test")))
+ // onStreamingStarted
+ listener.onStreamingStarted(StreamingListenerStreamingStarted(100L))
+ listener.startTime should be (100)
+
// onBatchSubmitted
val batchInfoSubmitted = BatchInfo(Time(1000), streamIdToInputInfo, 1000, None, None, Map.empty)
listener.onBatchSubmitted(StreamingListenerBatchSubmitted(batchInfoSubmitted))