aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorlisurprise <zhichao.li@intel.com>2015-03-16 13:10:32 -0700
committerTathagata Das <tathagata.das1565@gmail.com>2015-03-16 13:10:32 -0700
commitf149b8b5e542af44650923d0156f037121b45a20 (patch)
tree80fdef3c31ad323a5d0ff3d6e9827c78045413b4 /core
parentd19efeddc0cb710c9496af11e447d39e1ad61b31 (diff)
downloadspark-f149b8b5e542af44650923d0156f037121b45a20.tar.gz
spark-f149b8b5e542af44650923d0156f037121b45a20.tar.bz2
spark-f149b8b5e542af44650923d0156f037121b45a20.zip
[SPARK-6077] Remove streaming tab while stopping StreamingContext
Currently we would create a new streaming tab for each streamingContext even if there's already one on the same sparkContext which would cause duplicate StreamingTab created and none of them is taking effect. snapshot: https://www.dropbox.com/s/t4gd6hqyqo0nivz/bad%20multiple%20streamings.png?dl=0 How to reproduce: 1) import org.apache.spark.SparkConf import org.apache.spark.streaming. {Seconds, StreamingContext} import org.apache.spark.storage.StorageLevel val ssc = new StreamingContext(sc, Seconds(1)) val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() ..... 2) ssc.stop(false) val ssc = new StreamingContext(sc, Seconds(1)) val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() Author: lisurprise <zhichao.li@intel.com> Closes #4828 from zhichao-li/master and squashes the following commits: c329806 [lisurprise] add test for attaching/detaching streaming tab 51e6c7f [lisurprise] move detach method into StreamingTab 31a44fa [lisurprise] add unit test for attaching and detaching new tab db25ed2 [lisurprise] clean code 8281bcb [lisurprise] clean code 193c542 [lisurprise] remove streaming tab while closing streaming context
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/ui/WebUI.scala28
-rw-r--r--core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala50
-rw-r--r--core/src/test/scala/org/apache/spark/ui/UISuite.scala38
3 files changed, 72 insertions, 44 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index ec68837a15..ea548f2312 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -20,14 +20,15 @@ package org.apache.spark.ui
import javax.servlet.http.HttpServletRequest
import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
import scala.xml.Node
import org.eclipse.jetty.servlet.ServletContextHandler
import org.json4s.JsonAST.{JNothing, JValue}
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
/**
* The top level component of the UI hierarchy that contains the server.
@@ -45,6 +46,7 @@ private[spark] abstract class WebUI(
protected val tabs = ArrayBuffer[WebUITab]()
protected val handlers = ArrayBuffer[ServletContextHandler]()
+ protected val pageToHandlers = new HashMap[WebUIPage, ArrayBuffer[ServletContextHandler]]
protected var serverInfo: Option[ServerInfo] = None
protected val localHostName = Utils.localHostName()
protected val publicHostName = Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHostName)
@@ -60,14 +62,30 @@ private[spark] abstract class WebUI(
tab.pages.foreach(attachPage)
tabs += tab
}
+
+ def detachTab(tab: WebUITab) {
+ tab.pages.foreach(detachPage)
+ tabs -= tab
+ }
+
+ def detachPage(page: WebUIPage) {
+ pageToHandlers.remove(page).foreach(_.foreach(detachHandler))
+ }
/** Attach a page to this UI. */
def attachPage(page: WebUIPage) {
val pagePath = "/" + page.prefix
- attachHandler(createServletHandler(pagePath,
- (request: HttpServletRequest) => page.render(request), securityManager, basePath))
- attachHandler(createServletHandler(pagePath.stripSuffix("/") + "/json",
- (request: HttpServletRequest) => page.renderJson(request), securityManager, basePath))
+ val renderHandler = createServletHandler(pagePath,
+ (request: HttpServletRequest) => page.render(request), securityManager, basePath)
+ val renderJsonHandler = createServletHandler(pagePath.stripSuffix("/") + "/json",
+ (request: HttpServletRequest) => page.renderJson(request), securityManager, basePath)
+ attachHandler(renderHandler)
+ attachHandler(renderJsonHandler)
+ pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
+ .append(renderHandler)
+ pageToHandlers.getOrElseUpdate(page, ArrayBuffer[ServletContextHandler]())
+ .append(renderJsonHandler)
+
}
/** Attach a handler to this UI. */
diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
index 6a972381fa..0d155982a8 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala
@@ -17,20 +17,24 @@
package org.apache.spark.ui
+import javax.servlet.http.HttpServletRequest
+
import scala.collection.JavaConversions._
+import scala.xml.Node
-import org.openqa.selenium.{By, WebDriver}
import org.openqa.selenium.htmlunit.HtmlUnitDriver
+import org.openqa.selenium.{By, WebDriver}
import org.scalatest._
import org.scalatest.concurrent.Eventually._
import org.scalatest.selenium.WebBrowser
import org.scalatest.time.SpanSugar._
-import org.apache.spark._
import org.apache.spark.LocalSparkContext._
+import org.apache.spark._
import org.apache.spark.api.java.StorageLevels
import org.apache.spark.shuffle.FetchFailedException
+
/**
* Selenium tests for the Spark Web UI.
*/
@@ -310,4 +314,46 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before
}
}
}
+
+ test("attaching and detaching a new tab") {
+ withSpark(newSparkContext()) { sc =>
+ val sparkUI = sc.ui.get
+
+ val newTab = new WebUITab(sparkUI, "foo") {
+ attachPage(new WebUIPage("") {
+ def render(request: HttpServletRequest): Seq[Node] = {
+ <b>"html magic"</b>
+ }
+ })
+ }
+ sparkUI.attachTab(newTab)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ go to (sc.ui.get.appUIAddress.stripSuffix("/"))
+ find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None)
+ find(cssSelector("""ul li a[href*="stages"]""")) should not be(None)
+ find(cssSelector("""ul li a[href*="storage"]""")) should not be(None)
+ find(cssSelector("""ul li a[href*="environment"]""")) should not be(None)
+ find(cssSelector("""ul li a[href*="foo"]""")) should not be(None)
+ }
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ // check whether new page exists
+ go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/foo")
+ find(cssSelector("b")).get.text should include ("html magic")
+ }
+ sparkUI.detachTab(newTab)
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ go to (sc.ui.get.appUIAddress.stripSuffix("/"))
+ find(cssSelector("""ul li a[href*="jobs"]""")) should not be(None)
+ find(cssSelector("""ul li a[href*="stages"]""")) should not be(None)
+ find(cssSelector("""ul li a[href*="storage"]""")) should not be(None)
+ find(cssSelector("""ul li a[href*="environment"]""")) should not be(None)
+ find(cssSelector("""ul li a[href*="foo"]""")) should be(None)
+ }
+ eventually(timeout(10 seconds), interval(50 milliseconds)) {
+ // check new page not exist
+ go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/foo")
+ find(cssSelector("b")) should be(None)
+ }
+ }
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/ui/UISuite.scala b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
index 92a21f82f3..77a038dc17 100644
--- a/core/src/test/scala/org/apache/spark/ui/UISuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/UISuite.scala
@@ -18,7 +18,6 @@
package org.apache.spark.ui
import java.net.ServerSocket
-import javax.servlet.http.HttpServletRequest
import scala.io.Source
import scala.util.{Failure, Success, Try}
@@ -28,9 +27,8 @@ import org.scalatest.FunSuite
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
-import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.LocalSparkContext._
-import scala.xml.Node
+import org.apache.spark.{SparkConf, SparkContext}
class UISuite extends FunSuite {
@@ -72,40 +70,6 @@ class UISuite extends FunSuite {
}
}
- ignore("attaching a new tab") {
- withSpark(newSparkContext()) { sc =>
- val sparkUI = sc.ui.get
-
- val newTab = new WebUITab(sparkUI, "foo") {
- attachPage(new WebUIPage("") {
- def render(request: HttpServletRequest): Seq[Node] = {
- <b>"html magic"</b>
- }
- })
- }
- sparkUI.attachTab(newTab)
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
- val html = Source.fromURL(sparkUI.appUIAddress).mkString
- assert(!html.contains("random data that should not be present"))
-
- // check whether new page exists
- assert(html.toLowerCase.contains("foo"))
-
- // check whether other pages still exist
- assert(html.toLowerCase.contains("stages"))
- assert(html.toLowerCase.contains("storage"))
- assert(html.toLowerCase.contains("environment"))
- assert(html.toLowerCase.contains("executors"))
- }
-
- eventually(timeout(10 seconds), interval(50 milliseconds)) {
- val html = Source.fromURL(sparkUI.appUIAddress.stripSuffix("/") + "/foo").mkString
- // check whether new page exists
- assert(html.contains("magic"))
- }
- }
- }
-
test("jetty selects different port under contention") {
val server = new ServerSocket(0)
val startPort = server.getLocalPort