aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlisurprise <zhichao.li@intel.com>2015-03-16 13:10:32 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-03-16 13:10:32 -0700
commitf149b8b5e542af44650923d0156f037121b45a20 (patch)
tree80fdef3c31ad323a5d0ff3d6e9827c78045413b4
parentd19efeddc0cb710c9496af11e447d39e1ad61b31 (diff)
downloadspark-f149b8b5e542af44650923d0156f037121b45a20.tar.gz
spark-f149b8b5e542af44650923d0156f037121b45a20.tar.bz2
spark-f149b8b5e542af44650923d0156f037121b45a20.zip
[SPARK-6077] Remove streaming tab while stopping StreamingContext
Currently we would create a new streaming tab for each streamingContext even if there's already one on the same sparkContext, which would cause duplicate StreamingTabs to be created, none of which takes effect. snapshot: https://www.dropbox.com/s/t4gd6hqyqo0nivz/bad%20multiple%20streamings.png?dl=0 How to reproduce: 1) import org.apache.spark.SparkConf import org.apache.spark.streaming. {Seconds, StreamingContext} import org.apache.spark.storage.StorageLevel val ssc = new StreamingContext(sc, Seconds(1)) val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ..... 2) ssc.stop(false) val ssc = new StreamingContext(sc, Seconds(1)) val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() Author: lisurprise <zhichao.li@intel.com> Closes #4828 from zhichao-li/master and squashes the following commits: c329806 [lisurprise] add test for attaching/detaching streaming tab 51e6c7f [lisurprise] move detach method into StreamingTab 31a44fa [lisurprise] add unit test for attaching and detaching new tab db25ed2 [lisurprise] clean code 8281bcb [lisurprise] clean code 193c542 [lisurprise] remove streaming tab while closing streaming context
-rw-r--r--core/src/main/scala/org/apache/spark/ui/WebUI.scala28
-rw-r--r--core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala50
-rw-r--r--core/src/test/scala/org/apache/spark/ui/UISuite.scala38
-rw-r--r--streaming/pom.xml5
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala1
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala4
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala95
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala55
9 files changed, 179 insertions, 101 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index ec68837a15..ea548f2312 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -20,14 +20,15 @@ package org.apache.spark.ui
import javax.servlet.http.HttpServletRequest
import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
import scala.xml.Node
import org.eclipse.jetty.servlet.ServletContextHandler
import org.json4s.JsonAST.{JNothing, JValue}
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
/**
* The top level component of the UI hierarchy that contains the server.
@@ -45,6 +46,7 @@ private[spark] abstract class WebUI(
protected val tabs = ArrayBuffer[WebUITab]()
protected val handlers = ArrayBuffer[ServletContextHandler]()
+ protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]]
protected var serverInfo: Option[ServerInfo] = None
protected val localHostName = Utils.localHostName()
protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
@@ -60,14 +62,30 @@ private[spark] abstract class WebUI(
tab.pages.foreach(attachPage)
tabs += tab
}
+
+ def detachTab(tab: WebUITab) {
+ tab.pages.foreach(detachPage)
+ tabs -= tab
+ }
+
+ def detachPage(page: WebUIPage) {
+ pageToHandlers.remove(page).foreach(_.foreach(detachHandler))
+ }
/** Attach a page to this UI. */
def attachPage(page: WebUIPage) {
val pagePath = "/" + page.prefix
- attachHandler(createServletHandler(pagePath,
- (request: HttpServletRequest) => page.render(request), securityManager, basePath))
- attachHandler(createServletHandler(pagePath.stripSuffix("/") + "/json",
- (request: HttpServletRequest) => page.renderJson(request), securityManager, basePath))
+ val renderHandler = createServletHandler(pagePath,
+ (request: HttpServletRequest) => page.render(request), securityManager, basePath)
+ val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json",
+ (request: HttpServletRequest) => page.renderJson(request), securityManager, basePath)
+ attachHandler(renderHandler)
+ attachHandler(renderJsonHandler)
+ pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
+ .append(renderHandler)
+ pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
+ .append(renderJsonHandler)
+
}
/** Attach a handler to this UI. */
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index 6a972381fa..0d155982a8 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -17,20 +17,24 @@
package org.apache.spark.ui
+import javax.servlet.http.HttpServletRequest
+
import scala.collection.JavaConversions._
+import scala.xml.Node
-import org.openqa.selenium.{By, WebDriver}
import org.openqa.selenium.htmlunit.HtmlUnitDriver
+import org.openqa.selenium.{By, WebDriver}
import org.scalatest._
import org.scalatest.concurrent.Eventually._
import org.scalatest.selenium.WebBrowser
import org.scalatest.time.SpanSugar._
-import org.apache.spark._
import org.apache.spark.LocalSparkContext._
+import org.apache.spark._
import org.apache.spark.api.java.StorageLevels
import org.apache.spark.shuffle.FetchFailedException
+
/**
* Selenium tests for the Spark Web UI.
*/
@@ -310,4 +314,46 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
}
}
}
+
+ test("attaching and detaching a new tab") {
+ withSpark(newSparkContext()) { sc =>
+ val sparkUI = sc.ui.get
+
+ val newTab = new WebUITab(sparkUI, "foo") {
+ attachPage(new WebUIPage("") {
+ def render(request: HttpServletRequest): Seq[Node] = {
+ <b>"html magic"</b>
+ }
+ })
+ }
+ sparkUI.attachTab(newTab)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ go to (sc.ui.get.appUIAddress.stripSuffix("/"))
+ find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None)
+ find(cssSelector("""ul li a[href*="stages"]""")) should not be(None)
+ find(cssSelector("""ul li a[href*="storage"]""")) should not be(None)
+ find(cssSelector("""ul li a[href*="environment"]""")) should not be(None)
+ find(cssSelector("""ul li a[href*="foo"]""")) should not be(None)
+ }
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ // check whether new page exists
+ go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/foo")
+ find(cssSelector("b")).get.text should include ("html magic")
+ }
+ sparkUI.detachTab(newTab)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ go to (sc.ui.get.appUIAddress.stripSuffix("/"))
+ find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None)
+ find(cssSelector("""ul li a[href*="stages"]""")) should not be(None)
+ find(cssSelector("""ul li a[href*="storage"]""")) should not be(None)
+ find(cssSelector("""ul li a[href*="environment"]""")) should not be(None)
+ find(cssSelector("""ul li a[href*="foo"]""")) should be(None)
+ }
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ // check that the new page no longer exists
+ go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/foo")
+ find(cssSelector("b")) should be(None)
+ }
+ }
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 92a21f82f3..77a038dc17 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -18,7 +18,6 @@
package org.apache.spark.ui
import java.net.ServerSocket
-import javax.servlet.http.HttpServletRequest
import scala.io.Source
import scala.util.{Failure, Success, Try}
@@ -28,9 +27,8 @@ import org.scalatest.FunSuite
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
-import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.LocalSparkContext._
-import scala.xml.Node
+import org.apache.spark.{SparkConf, SparkContext}
class UISuite extends FunSuite {
@@ -72,40 +70,6 @@ class UISuite extends FunSuite {
}
}
- ignore("attaching a new tab") {
- withSpark(newSparkContext()) { sc =>
- val sparkUI = sc.ui.get
-
- val newTab = new WebUITab(sparkUI, "foo") {
- attachPage(new WebUIPage("") {
- def render(request: HttpServletRequest): Seq[Node] = {
- <b>"html magic"</b>
- }
- })
- }
- sparkUI.attachTab(newTab)
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
- val html = Source.fromURL(sparkUI.appUIAddress).mkString
- assert(!html.contains("random data that should not be present"))
-
- // check whether new page exists
- assert(html.toLowerCase.contains("foo"))
-
- // check whether other pages still exist
- assert(html.toLowerCase.contains("stages"))
- assert(html.toLowerCase.contains("storage"))
- assert(html.toLowerCase.contains("environment"))
- assert(html.toLowerCase.contains("executors"))
- }
-
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
- val html = Source.fromURL(sparkUI.appUIAddress.stripSuffix("/") + "/foo").mkString
- // check whether new page exists
- assert(html.contains("magic"))
- }
- }
- }
-
test("jetty selects different port under contention") {
val server = new ServerSocket(0)
val startPort = server.getLocalPort
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 0370b0e9e1..96508d83f4 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -83,6 +83,11 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.seleniumhq.selenium</groupId>
+ <artifactId>selenium-java</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>com.novocode</groupId>
<artifactId>junit-interface</artifactId>
<scope>test</scope>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index ba3f23434f..b5b6770a8a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -578,6 +578,7 @@ class StreamingContext private[streaming] (
// Even if we have already stopped, we still need to attempt to stop the SparkContext because
// a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).
if (stopSparkContext) sc.stop()
+ uiTab.foreach(_.detach())
// The state should always be Stopped after calling `stop()`, even if we haven't started yet:
state = Stopped
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 98e9a2e639..bfe8086fcf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -32,7 +32,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
extends WebUIPage("") with Logging {
private val listener = parent.listener
- private val startTime = Calendar.getInstance().getTime()
+ private val startTime = System.currentTimeMillis()
private val emptyCell = "-"
/** Render the page */
@@ -47,7 +47,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
/** Generate basic stats of the streaming program */
private def generateBasicStats(): Seq[Node] = {
- val timeSinceStart = System.currentTimeMillis() - startTime.getTime
+ val timeSinceStart = System.currentTimeMillis() - startTime
<ul class ="unstyled">
<li>
<strong>Started at: </strong> {startTime.toString}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
index d9d04cd706..9a860ea4a6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
@@ -36,6 +36,10 @@ private[spark] class StreamingTab(ssc: StreamingContext)
ssc.addStreamingListener(listener)
attachPage(new StreamingPage(this))
parent.attachTab(this)
+
+ def detach() {
+ getSparkUI(ssc).detachTab(this)
+ }
}
private object StreamingTab {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
new file mode 100644
index 0000000000..87a0395efb
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming
+
+import org.openqa.selenium.WebDriver
+import org.openqa.selenium.htmlunit.HtmlUnitDriver
+import org.scalatest._
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.selenium.WebBrowser
+import org.scalatest.time.SpanSugar._
+
+import org.apache.spark._
+
+
+
+
+/**
+ * Selenium tests for the Spark Web UI.
+ */
+class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with BeforeAndAfterAll with TestSuiteBase {
+
+ implicit var webDriver: WebDriver = _
+
+ override def beforeAll(): Unit = {
+ webDriver = new HtmlUnitDriver
+ }
+
+ override def afterAll(): Unit = {
+ if (webDriver != null) {
+ webDriver.quit()
+ }
+ }
+
+ /**
+ * Create a test SparkStreamingContext with the SparkUI enabled.
+ */
+ private def newSparkStreamingContext(): StreamingContext = {
+ val conf = new SparkConf()
+ .setMaster("local")
+ .setAppName("test")
+ .set("spark.ui.enabled", "true")
+ val ssc = new StreamingContext(conf, Seconds(1))
+ assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
+ ssc
+ }
+
+ test("attaching and detaching a Streaming tab") {
+ withStreamingContext(newSparkStreamingContext()) { ssc =>
+ val sparkUI = ssc.sparkContext.ui.get
+
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ go to (sparkUI.appUIAddress.stripSuffix("/"))
+ find(cssSelector( """ul li a[href*="streaming"]""")) should not be (None)
+ }
+
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ // check whether streaming page exists
+ go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming")
+ val statisticText = findAll(cssSelector("li strong")).map(_.text).toSeq
+ statisticText should contain("Network receivers:")
+ statisticText should contain("Batch interval:")
+ }
+
+ ssc.stop(false)
+
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ go to (sparkUI.appUIAddress.stripSuffix("/"))
+ find(cssSelector( """ul li a[href*="streaming"]""")) should be(None)
+ }
+
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ go to (sparkUI.appUIAddress.stripSuffix("/") + "/streaming")
+ val statisticText = findAll(cssSelector("li strong")).map(_.text).toSeq
+ statisticText should not contain ("Network receivers:")
+ statisticText should not contain ("Batch interval:")
+ }
+ }
+ }
+}
+
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
deleted file mode 100644
index 8e30118266..0000000000
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISuite.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-import scala.io.Source
-
-import org.scalatest.FunSuite
-import org.scalatest.concurrent.Eventually._
-import org.scalatest.time.SpanSugar._
-
-import org.apache.spark.SparkConf
-
-class UISuite extends FunSuite {
-
- // Ignored: See SPARK-1530
- ignore("streaming tab in spark UI") {
- val conf = new SparkConf()
- .setMaster("local")
- .setAppName("test")
- .set("spark.ui.enabled", "true")
- val ssc = new StreamingContext(conf, Seconds(1))
- assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
- val ui = ssc.sc.ui.get
-
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
- val html = Source.fromURL(ui.appUIAddress).mkString
- assert(!html.contains("random data that should not be present"))
- // test if streaming tab exist
- assert(html.toLowerCase.contains("streaming"))
- // test if other Spark tabs still exist
- assert(html.toLowerCase.contains("stages"))
- }
-
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
- val html = Source.fromURL(ui.appUIAddress.stripSuffix("/") + "/streaming").mkString
- assert(html.toLowerCase.contains("batch"))
- assert(html.toLowerCase.contains("network"))
- }
- }
-}