aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorlisurprise <zhichao.li@intel.com>2015-03-16 13:10:32 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-03-16 13:10:32 -0700
commitf149b8b5e542af44650923d0156f037121b45a20 (patch)
tree80fdef3c31ad323a5d0ff3d6e9827c78045413b4 /streaming
parentd19efeddc0cb710c9496af11e447d39e1ad61b31 (diff)
downloadspark-f149b8b5e542af44650923d0156f037121b45a20.tar.gz
spark-f149b8b5e542af44650923d0156f037121b45a20.tar.bz2
spark-f149b8b5e542af44650923d0156f037121b45a20.zip
[SPARK-6077] Remove streaming tab while stopping StreamingContext
Currently we would create a new streaming tab for each streamingContext even if there's already one on the same sparkContext which would cause duplicate StreamingTab created and none of them is taking effect. snapshot: https://www.dropbox.com/s/t4gd6hqyqo0nivz/bad%20multiple%20streamings.png?dl=0 How to reproduce: 1) import org.apache.spark.SparkConf import org.apache.spark.streaming. {Seconds, StreamingContext} import org.apache.spark.storage.StorageLevel val ssc = new StreamingContext(sc, Seconds(1)) val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ..... 2) ssc.stop(false) val ssc = new StreamingContext(sc, Seconds(1)) val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() Author: lisurprise <zhichao.li@intel.com> Closes #4828 from zhichao-li/master and squashes the following commits: c329806 [lisurprise] add test for attaching/detaching streaming tab 51e6c7f [lisurprise] move detach method into StreamingTab 31a44fa [lisurprise] add unit test for attaching and detaching new tab db25ed2 [lisurprise] clean code 8281bcb [lisurprise] clean code 193c542 [lisurprise] remove streaming tab while closing streaming context
Diffstat (limited to 'streaming')
-rw-r--r--streaming/pom.xml5
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala1
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala4
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala95
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala55
6 files changed, 107 insertions, 57 deletions
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 0370b0e9e1..96508d83f4 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -83,6 +83,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.seleniumhq.selenium</groupId>
+ <artifactId>selenium-java</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>com.novocode</groupId>
<artifactId>junit-interface</artifactId>
<scope>test</scope>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ba3f23434f..b5b6770a8a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -578,6 +578,7 @@ class StreamingContext private[streaming] (
// Even if we have already stopped, we still need to attempt to stop the SparkContext because
// a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
if (stopSparkContext) sc.stop()
+ uiTab.foreach(_.detach())
// The state should always be Stopped after calling `stop()`, even if we haven't started yet:
state = Stopped
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 98e9a2e639..bfe8086fcf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -32,7 +32,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
extends WebUIPage("") with Logging {
private val listener = parent.listener
- private val startTime = Calendar.getInstance().getTime()
+ private val startTime = System.currentTimeMillis()
private val emptyCell = "-"
/** Render the page */
@@ -47,7 +47,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
/** Generate basic stats of the streaming program */
private def generateBasicStats(): Seq[Node] = {
- val timeSinceStart = System.currentTimeMillis() - startTime.getTime
+ val timeSinceStart = System.currentTimeMillis() - startTime
<ul class ="unstyled">
<li>
<strong>Started at: </strong> {startTime.toString}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
index d9d04cd706..9a860ea4a6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -36,6 +36,10 @@ private[spark] class StreamingTab(ssc: StreamingContext)
ssc.addStreamingListener(listener)
attachPage(new StreamingPage(this))
parent.attachTab(this)
+
+ def detach() {
+ getSparkUI(ssc).detachTab(this)
+ }
}
private object StreamingTab {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
new file mode 100644
index 0000000000..87a0395efb
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import org.openqa.selenium.WebDriver
+import org.openqa.selenium.htmlunit.HtmlUnitDriver
+import org.scalatest._
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.selenium.WebBrowser
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark._
+
+
+
+
+/**
+ * Selenium tests for the Spark Web UI.
+ */
+class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with BeforeAndAfterAll with TestSuiteBase {
+
+ implicit var webDriver: WebDriver = _
+
+ override def beforeAll(): Unit = {
+ webDriver = new HtmlUnitDriver
+ }
+
+ override def afterAll(): Unit = {
+ if (webDriver != null) {
+ webDriver.quit()
+ }
+ }
+
+ /**
+ * Create a test SparkStreamingContext with the SparkUI enabled.
+ */
+ private def newSparkStreamingContext(): StreamingContext = {
+ val conf = new SparkConf()
+ .setMaster("local")
+ .setAppName("test")
+ .set("spark.ui.enabled", "true")
+ val ssc = new StreamingContext(conf, Seconds(1))
+ assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
+ ssc
+ }
+
+ test("attaching and detaching a Streaming tab") {
+ withStreamingContext(newSparkStreamingContext()) { ssc =>
+ val sparkUI = ssc.sparkContext.ui.get
+
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ go to (sparkUI.appUIAddress.stripSuffix("/"))
+ find(cssSelector( """ul li a[href*="streaming"]""")) should not be (None)
+ }
+
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ // check whether streaming page exists
+ go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming")
+ val statisticText = findAll(cssSelector("li strong")).map(_.text).toSeq
+ statisticText should contain("Network receivers:")
+ statisticText should contain("Batch interval:")
+ }
+
+ ssc.stop(false)
+
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ go to (sparkUI.appUIAddress.stripSuffix("/"))
+ find(cssSelector( """ul li a[href*="streaming"]""")) should be(None)
+ }
+
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming")
+ val statisticText = findAll(cssSelector("li strong")).map(_.text).toSeq
+ statisticText should not contain ("Network receivers:")
+ statisticText should not contain ("Batch interval:")
+ }
+ }
+ }
+}
+
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
deleted file mode 100644
index 8e30118266..0000000000
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-import scala.io.Source
-
-import org.scalatest.FunSuite
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.SpanSugar._
-
-import org.apache.spark.SparkConf
-
-class UISuite extends FunSuite {
-
- // Ignored: See SPARK-1530
- ignore("streaming tab in spark UI") {
- val conf = new SparkConf()
- .setMaster("local")
- .setAppName("test")
- .set("spark.ui.enabled", "true")
- val ssc = new StreamingContext(conf, Seconds(1))
- assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
- val ui = ssc.sc.ui.get
-
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
- val html = Source.fromURL(ui.appUIAddress).mkString
- assert(!html.contains("random data that should not be present"))
- // test if streaming tab exist
- assert(html.toLowerCase.contains("streaming"))
- // test if other Spark tabs still exist
- assert(html.toLowerCase.contains("stages"))
- }
-
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
- val html = Source.fromURL(ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
- assert(html.toLowerCase.contains("batch"))
- assert(html.toLowerCase.contains("network"))
- }
- }
-}