aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-04-11 23:33:49 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-04-11 23:33:49 -0700
commit6aa08c39cf30fa5c4ed97f4fff16371b9030a2e6 (patch)
tree66542ff62c23f52366ea2e25e346467bff14c97b /streaming/src/test
parent165e06a74c3d75e6b7341c120943add8b035b96a (diff)
downloadspark-6aa08c39cf30fa5c4ed97f4fff16371b9030a2e6.tar.gz
spark-6aa08c39cf30fa5c4ed97f4fff16371b9030a2e6.tar.bz2
spark-6aa08c39cf30fa5c4ed97f4fff16371b9030a2e6.zip
[SPARK-1386] Web UI for Spark Streaming
When debugging Spark Streaming applications it is necessary to monitor certain metrics that are not shown in the Spark application UI. For example, what is average processing time of batches? What is the scheduling delay? Is the system able to process as fast as it is receiving data? How many records I am receiving through my receivers? While the StreamingListener interface introduced in the 0.9 provided some of this information, it could only be accessed programmatically. A UI that shows information specific to the streaming applications is necessary for easier debugging. This PR introduces such a UI. It shows various statistics related to the streaming application. Here is a screenshot of the UI running on my local machine. http://i.imgur.com/1ooDGhm.png This UI is integrated into the Spark UI running at 4040. Author: Tathagata Das <tathagata.das1565@gmail.com> Author: Andrew Or <andrewor14@gmail.com> Closes #290 from tdas/streaming-web-ui and squashes the following commits: fc73ca5 [Tathagata Das] Merge pull request #9 from andrewor14/ui-refactor 642dd88 [Andrew Or] Merge SparkUISuite.scala into UISuite.scala eb30517 [Andrew Or] Merge github.com:apache/spark into ui-refactor f4f4cbe [Tathagata Das] More minor fixes. 34bb364 [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui 252c566 [Tathagata Das] Merge pull request #8 from andrewor14/ui-refactor e038b4b [Tathagata Das] Addressed Patrick's comments. 125a054 [Andrew Or] Disable serving static resources with gzip 90feb8d [Andrew Or] Address Patrick's comments 89dae36 [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui 72fe256 [Tathagata Das] Merge pull request #6 from andrewor14/ui-refactor 2fc09c8 [Tathagata Das] Added binary check exclusions aa396d4 [Andrew Or] Rename tabs and pages (No more IndexPage.scala) f8e1053 [Tathagata Das] Added Spark and Streaming UI unit tests. caa5e05 [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui 585cd65 [Tathagata Das] Merge pull request #5 from andrewor14/ui-refactor 914b8ff [Tathagata Das] Moved utils functions to UIUtils. 548c98c [Andrew Or] Wide refactoring of WebUI, UITab, and UIPage (see commit message) 6de06b0 [Tathagata Das] Merge remote-tracking branch 'apache/master' into streaming-web-ui ee6543f [Tathagata Das] Minor changes based on Andrew's comments. fa760fe [Tathagata Das] Fixed long line. 1c0bcef [Tathagata Das] Refactored streaming UI into two files. 1af239b [Tathagata Das] Changed streaming UI to attach itself as a tab with the Spark UI. 827e81a [Tathagata Das] Merge branch 'streaming-web-ui' of github.com:tdas/spark into streaming-web-ui 168fe86 [Tathagata Das] Merge pull request #2 from andrewor14/ui-refactor 3e986f8 [Tathagata Das] Merge remote-tracking branch 'apache/master' into streaming-web-ui c78c92d [Andrew Or] Remove outdated comment 8f7323b [Andrew Or] End of file new lines, indentation, and imports (minor) 0d61ee8 [Andrew Or] Merge branch 'streaming-web-ui' of github.com:tdas/spark into ui-refactor 9a48fa1 [Andrew Or] Allow adding tabs to SparkUI dynamically + add example 61358e3 [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into streaming-web-ui 53be2c5 [Tathagata Das] Minor style updates. ed25dfc [Andrew Or] Generalize SparkUI header to display tabs dynamically a37ad4f [Andrew Or] Comments, imports and formatting (minor) cd000b0 [Andrew Or] Merge github.com:apache/spark into ui-refactor 7d57444 [Andrew Or] Refactoring the UI interface to add flexibility aef4dd5 [Tathagata Das] Added Apache licenses. db27bad [Tathagata Das] Added last batch processing time to StreamingUI. 4d86e98 [Tathagata Das] Added basic stats to the StreamingUI and refactored the UI to a Page to make it easier to transition to using SparkUI later. 93f1c69 [Tathagata Das] Added network receiver information to the Streaming UI. 56cc7fb [Tathagata Das] First cut implementation of Streaming UI.
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala6
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala1
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala46
3 files changed, 49 insertions, 4 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 389b23d4d5..952511d411 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -239,11 +239,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
/** This is a server to test the network input stream */
-class TestServer() extends Logging {
+class TestServer(portToBind: Int = 0) extends Logging {
val queue = new ArrayBlockingQueue[String](100)
- val serverSocket = new ServerSocket(0)
+ val serverSocket = new ServerSocket(portToBind)
val servingThread = new Thread() {
override def run() {
@@ -282,7 +282,7 @@ class TestServer() extends Logging {
def start() { servingThread.start() }
- def send(msg: String) { queue.add(msg) }
+ def send(msg: String) { queue.put(msg) }
def stop() { servingThread.interrupt() }
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
index 9cc27ef7f0..efd0d22ecb 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala
@@ -161,7 +161,6 @@ class StreamingContextSuite extends FunSuite with BeforeAndAfter with Timeouts w
}
}
-
test("stop only streaming context") {
ssc = new StreamingContext(master, appName, batchDuration)
sc = ssc.sparkContext
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
new file mode 100644
index 0000000000..35538ec188
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import scala.io.Source
+
+import org.scalatest.FunSuite
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.time.SpanSugar._
+
+class UISuite extends FunSuite {
+
+ test("streaming tab in spark UI") {
+ val ssc = new StreamingContext("local", "test", Seconds(1))
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ val html = Source.fromURL(ssc.sparkContext.ui.appUIAddress).mkString
+ assert(!html.contains("random data that should not be present"))
+ // test if streaming tab exist
+ assert(html.toLowerCase.contains("streaming"))
+ // test if other Spark tabs still exist
+ assert(html.toLowerCase.contains("stages"))
+ }
+
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ val html = Source.fromURL(
+ ssc.sparkContext.ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
+ assert(html.toLowerCase.contains("batch"))
+ assert(html.toLowerCase.contains("network"))
+ }
+ }
+}