aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-06-24 16:25:05 -0700
committerPatrick Wendell <pwendell@gmail.com>2013-06-24 16:25:05 -0700
commit42157027f247035e9ae41efe899e27c0942f5cd8 (patch)
tree4fbb231c0f28af57c04ae9cf3784d8090fc4c4c1 /core/src
parenta4248138b4c4b374e61f474886463c58f5bb3915 (diff)
downloadspark-42157027f247035e9ae41efe899e27c0942f5cd8.tar.gz
spark-42157027f247035e9ae41efe899e27c0942f5cd8.tar.bz2
spark-42157027f247035e9ae41efe899e27c0942f5cd8.zip
A few bug fixes and a unit test
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/scala/spark/SparkContext.scala1
-rw-r--r--core/src/main/scala/spark/deploy/master/Master.scala9
-rw-r--r--core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala10
-rw-r--r--core/src/main/scala/spark/deploy/worker/Worker.scala9
-rw-r--r--core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala10
-rw-r--r--core/src/main/scala/spark/ui/JettyUtils.scala7
-rw-r--r--core/src/main/scala/spark/ui/SparkUI.scala13
-rw-r--r--core/src/test/resources/log4j.properties1
-rw-r--r--core/src/test/scala/spark/DistributedSuite.scala2
-rw-r--r--core/src/test/scala/spark/ui/UISuite.scala15
10 files changed, 58 insertions, 19 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 1f420d73f7..500d25efdd 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -578,6 +578,7 @@ class SparkContext(
/** Shut down the SparkContext. */
def stop() {
+ ui.stop()
// Do this only if not stopped already - best case effort.
// prevent NPE if stopped more than once.
val dagSchedulerCopy = dagScheduler
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 3eca522c7e..2a49dfb486 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -36,6 +36,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
var firstApp: Option[ApplicationInfo] = None
+ val webUi = new MasterWebUI(self)
+
Utils.checkHost(host, "Expected hostname")
val masterPublicAddress = {
@@ -52,13 +54,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
logInfo("Starting Spark master at spark://" + host + ":" + port)
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- startWebUi()
+ webUi.start()
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
}
- def startWebUi() {
- val webUi = new MasterWebUI(self)
- webUi.start()
+ override def postStop() {
+ webUi.stop()
}
override def receive = {
diff --git a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
index 5fd17f10c6..065b62a4b2 100644
--- a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala
@@ -5,7 +5,7 @@ import akka.util.Duration
import javax.servlet.http.HttpServletRequest
-import org.eclipse.jetty.server.Handler
+import org.eclipse.jetty.server.{Handler, Server}
import spark.{Logging, Utils}
import spark.ui.JettyUtils
@@ -21,13 +21,15 @@ class MasterWebUI(val master: ActorRef) extends Logging {
val host = Utils.localHostName()
val port = Option(System.getProperty("master.ui.port"))
.getOrElse(MasterWebUI.DEFAULT_PORT).toInt
+ var server: Option[Server] = None
val applicationPage = new ApplicationPage(this)
val indexPage = new IndexPage(this)
def start() {
try {
- val (server, boundPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
+ val (srv, boundPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
+ server = Some(srv)
logInfo("Started Master web UI at http://%s:%d".format(host, boundPort))
} catch {
case e: Exception =>
@@ -42,6 +44,10 @@ class MasterWebUI(val master: ActorRef) extends Logging {
("/app", (request: HttpServletRequest) => applicationPage.render(request)),
("*", (request: HttpServletRequest) => indexPage.render(request))
)
+
+ def stop() {
+ server.foreach(_.stop())
+ }
}
object MasterWebUI {
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 690bdfe128..28553b6c02 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -45,6 +45,7 @@ private[spark] class Worker(
val envVar = System.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else host
}
+ val webUi = new WorkerWebUI(self, workDir)
var coresUsed = 0
var memoryUsed = 0
@@ -77,7 +78,7 @@ private[spark] class Worker(
logInfo("Spark home: " + sparkHome)
createWorkDir()
connectToMaster()
- startWebUi()
+ webUi.start()
}
def connectToMaster() {
@@ -88,11 +89,6 @@ private[spark] class Worker(
context.watch(master) // Doesn't work with remote actors, but useful for testing
}
- def startWebUi() {
- val webUi = new WorkerWebUI(self, workDir)
- webUi.start()
- }
-
override def receive = {
case RegisteredWorker(url) =>
masterWebUiUrl = url
@@ -163,6 +159,7 @@ private[spark] class Worker(
override def postStop() {
executors.values.foreach(_.kill())
+ webUi.stop()
}
}
diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
index abfc847527..ee3889192d 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -7,7 +7,7 @@ import java.io.File
import javax.servlet.http.HttpServletRequest
-import org.eclipse.jetty.server.Handler
+import org.eclipse.jetty.server.{Handler, Server}
import scala.io.Source
@@ -25,6 +25,7 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File) extends Logging {
val host = Utils.localHostName()
val port = Option(System.getProperty("wroker.ui.port"))
.getOrElse(WorkerWebUI.DEFAULT_PORT).toInt
+ var server: Option[Server] = None
val indexPage = new IndexPage(this)
@@ -37,7 +38,8 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File) extends Logging {
def start() {
try {
- val (server, boundPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
+ val (srv, boundPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
+ server = Some(srv)
logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort))
} catch {
case e: Exception =>
@@ -56,6 +58,10 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File) extends Logging {
source.close()
lines
}
+
+ def stop() {
+ server.foreach(_.stop())
+ }
}
object WorkerWebUI {
diff --git a/core/src/main/scala/spark/ui/JettyUtils.scala b/core/src/main/scala/spark/ui/JettyUtils.scala
index b5270e6062..85d6a7e867 100644
--- a/core/src/main/scala/spark/ui/JettyUtils.scala
+++ b/core/src/main/scala/spark/ui/JettyUtils.scala
@@ -12,7 +12,8 @@ import org.eclipse.jetty.server.handler.{ResourceHandler, HandlerList, ContextHa
import scala.util.{Try, Success, Failure}
import scala.xml.Node
-import spark.{Utils, SparkContext, Logging}
+import spark.{SparkContext, Logging}
+import org.eclipse.jetty.util.log.Log
/** Utilities for launching a web server using Jetty's HTTP Server class */
private[spark] object JettyUtils extends Logging {
@@ -91,12 +92,14 @@ private[spark] object JettyUtils extends Logging {
@tailrec
def connect(currentPort: Int): (Server, Int) = {
- val server = new Server(port)
+ val server = new Server(currentPort)
server.setHandler(handlerList)
Try { server.start() } match {
case s: Success[_] => (server, currentPort)
case f: Failure[_] =>
+ server.stop()
logInfo("Failed to create UI at port, %s. Trying again.".format(currentPort))
+ logInfo("Error was: " + f.toString)
connect((currentPort + 1) % 65536)
}
}
diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala
index 2d5a328015..487f005615 100644
--- a/core/src/main/scala/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/spark/ui/SparkUI.scala
@@ -2,7 +2,7 @@ package spark.ui
import javax.servlet.http.HttpServletRequest
-import org.eclipse.jetty.server.Handler
+import org.eclipse.jetty.server.{Handler, Server}
import spark.{Logging, SparkContext, Utils}
import spark.ui.storage.BlockManagerUI
@@ -17,6 +17,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
val host = Utils.localHostName()
val port = Option(System.getProperty("spark.ui.port")).getOrElse(SparkUI.DEFAULT_PORT).toInt
var boundPort: Option[Int] = None
+ var server: Option[Server] = None
val handlers = Seq[(String, Handler)](
("/static", createStaticHandler(SparkUI.STATIC_RESOURCE_DIR)),
@@ -26,11 +27,12 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
val jobs = new JobProgressUI(sc)
val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ handlers
+ /** Bind the HTTP server which backs this web interface */
def bind() {
- /** Start an HTTP server to run the Web interface */
try {
- val (server, usedPort) = JettyUtils.startJettyServer("0.0.0.0", port, allHandlers)
+ val (srv, usedPort) = JettyUtils.startJettyServer("0.0.0.0", port, allHandlers)
logInfo("Started Spark Web UI at http://%s:%d".format(host, usedPort))
+ server = Some(srv)
boundPort = Some(usedPort)
} catch {
case e: Exception =>
@@ -38,6 +40,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
System.exit(1)
}
}
+
/** Initialize all components of the server */
def start() {
// NOTE: This is decoupled from bind() because of the following dependency cycle:
@@ -47,6 +50,10 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
jobs.start()
}
+ def stop() {
+ server.foreach(_.stop())
+ }
+
private[spark] def appUIAddress = "http://" + host + ":" + boundPort.getOrElse("-1")
}
diff --git a/core/src/test/resources/log4j.properties b/core/src/test/resources/log4j.properties
index 6ec89c0184..d05cf3dec1 100644
--- a/core/src/test/resources/log4j.properties
+++ b/core/src/test/resources/log4j.properties
@@ -8,3 +8,4 @@ log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %p %c{1}:
# Ignore messages below warning level from Jetty, because it's a bit verbose
log4j.logger.org.eclipse.jetty=WARN
+org.eclipse.jetty.LEVEL=WARN
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 0866fb47b3..0024ede828 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -10,6 +10,7 @@ import org.scalatest.time.{Span, Millis}
import org.scalacheck.Arbitrary._
import org.scalacheck.Gen
import org.scalacheck.Prop._
+import org.eclipse.jetty.server.{Server, Request, Handler}
import com.google.common.io.Files
@@ -17,6 +18,7 @@ import scala.collection.mutable.ArrayBuffer
import SparkContext._
import storage.{GetBlock, BlockManagerWorker, StorageLevel}
+import ui.JettyUtils
class NotSerializableClass
diff --git a/core/src/test/scala/spark/ui/UISuite.scala b/core/src/test/scala/spark/ui/UISuite.scala
new file mode 100644
index 0000000000..6766b158f6
--- /dev/null
+++ b/core/src/test/scala/spark/ui/UISuite.scala
@@ -0,0 +1,15 @@
+package spark.ui
+
+import org.scalatest.FunSuite
+import org.eclipse.jetty.server.Server
+
+class UISuite extends FunSuite {
+ test("jetty port increases under contention") {
+ val startPort = 33333
+ val server = new Server(startPort)
+ server.start()
+ val (jettyServer, boundPort) = JettyUtils.startJettyServer("localhost", startPort, Seq())
+ assert(boundPort === startPort + 1)
+ }
+
+}