Diffstat (limited to 'core/src/main/scala')
42 files changed, 1551 insertions, 395 deletions
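The most significant behavioral change below is to RDD.takeSample, which now rejects negative requests up front and only caps oversized requests when sampling without replacement. As a quick orientation before the hunks themselves, a hedged usage sketch (the rdd value and the numbers are illustrative, not part of the commit):

    val rdd = sc.parallelize(1 to 1000)
    // Negative requests now fail fast with IllegalArgumentException:
    //   rdd.takeSample(withReplacement = false, num = -1, seed = 42)
    // Without replacement, a request larger than the RDD is capped at its size:
    val capped = rdd.takeSample(withReplacement = false, num = 5000, seed = 42)  // at most 1000 elements
    // With replacement, oversampling is now permitted (the old code capped this case as well):
    val over = rdd.takeSample(withReplacement = true, num = 5000, seed = 42)     // 5000 elements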
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index fe812fe530..0095b868a8 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -22,6 +22,7 @@ import org.apache.hadoop.mapred.OutputFormat
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat}
 import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat, RecordWriter => NewRecordWriter, Job => NewAPIHadoopJob, HadoopMapReduceUtil}
+import org.apache.hadoop.security.UserGroupInformation
 import spark.partial.BoundedDouble
 import spark.partial.PartialResult
diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index e88290fdb2..8dde5391d3 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -108,6 +108,9 @@ abstract class RDD[T: ClassTag](
   // Methods and fields available on all RDDs
   // =======================================================================

+  /** The SparkContext that created this RDD. */
+  def sparkContext: SparkContext = sc
+
   /** A unique ID for this RDD (within its SparkContext). */
   val id: Int = sc.newRddId()
@@ -284,31 +287,35 @@ abstract class RDD[T: ClassTag](
   def takeSample(withReplacement: Boolean, num: Int, seed: Int): Array[T] = {
     var fraction = 0.0
     var total = 0
-    val multiplier = 3.0
-    val initialCount = count()
+    var multiplier = 3.0
+    var initialCount = this.count()
     var maxSelected = 0

+    if (num < 0) {
+      throw new IllegalArgumentException("Negative number of elements requested")
+    }
+
     if (initialCount > Integer.MAX_VALUE - 1) {
       maxSelected = Integer.MAX_VALUE - 1
     } else {
       maxSelected = initialCount.toInt
     }

-    if (num > initialCount) {
+    if (num > initialCount && !withReplacement) {
       total = maxSelected
-      fraction = math.min(multiplier * (maxSelected + 1) / initialCount, 1.0)
-    } else if (num < 0) {
-      throw(new IllegalArgumentException("Negative number of elements requested"))
+      fraction = multiplier * (maxSelected + 1) / initialCount
     } else {
-      fraction = math.min(multiplier * (num + 1) / initialCount, 1.0)
+      fraction = multiplier * (num + 1) / initialCount
       total = num
     }

     val rand = new Random(seed)
-    var samples = this.sample(withReplacement, fraction, rand.nextInt).collect()
+    var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()

+    // If the first sample didn't turn out large enough, keep trying to take samples;
+    // this shouldn't happen often because we use a big multiplier for the initial size
     while (samples.length < total) {
-      samples = this.sample(withReplacement, fraction, rand.nextInt).collect()
+      samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
     }

     Utils.randomizeInPlace(samples, rand).take(total)
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index ef6de87193..01d9a70a78 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -39,6 +39,7 @@ import org.apache.hadoop.mapred.TextInputFormat
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.hadoop.mapreduce.{Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
+import org.apache.hadoop.security.UserGroupInformation
 import org.apache.mesos.MesosNativeLibrary
@@ -49,8 +50,9 @@ import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener,
 import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler}
 import spark.scheduler.local.LocalScheduler
 import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
-import spark.storage.{BlockManagerUI, StorageStatus, StorageUtils, RDDInfo}
+import spark.storage.{StorageStatus, StorageUtils, RDDInfo}
 import spark.util.{MetadataCleaner, TimeStampedHashMap}
+import ui.{SparkUI}

 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -96,11 +98,6 @@ class SparkContext(
     isLocal)
   SparkEnv.set(env)

-  // Start the BlockManager UI
-  private[spark] val ui = new BlockManagerUI(
-    env.actorSystem, env.blockManager.master.driverActor, this)
-  ui.start()
-
   // Used to store a URL for each static file/jar together with the file's local timestamp
   private[spark] val addedFiles = HashMap[String, Long]()
   private[spark] val addedJars = HashMap[String, Long]()
@@ -109,6 +106,9 @@ class SparkContext(
   private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]]
   private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup)

+  // Initialize the Spark UI
+  private[spark] val ui = new SparkUI(this)
+  ui.bind()

   // Add each JAR given through the constructor
   if (jars != null) {
@@ -217,6 +217,8 @@ class SparkContext(
   @volatile private var dagScheduler = new DAGScheduler(taskScheduler)
   dagScheduler.start()

+  ui.start()
+
   /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
   val hadoopConfiguration = {
     val conf = SparkHadoopUtil.newConfiguration()
@@ -579,6 +581,7 @@ class SparkContext(

   /** Shut down the SparkContext. */
   def stop() {
+    ui.stop()
     // Do this only if not stopped already - best case effort.
     // prevent NPE if stopped more than once.
     val dagSchedulerCopy = dagScheduler
diff --git a/core/src/main/scala/spark/TaskEndReason.scala b/core/src/main/scala/spark/TaskEndReason.scala
index 8140cba084..bb75ec208c 100644
--- a/core/src/main/scala/spark/TaskEndReason.scala
+++ b/core/src/main/scala/spark/TaskEndReason.scala
@@ -1,5 +1,6 @@
 package spark

+import spark.executor.TaskMetrics
 import spark.storage.BlockManagerId

 /**
@@ -24,7 +25,8 @@ private[spark] case class FetchFailed(
 private[spark] case class ExceptionFailure(
     className: String,
     description: String,
-    stackTrace: Array[StackTraceElement])
+    stackTrace: Array[StackTraceElement],
+    metrics: Option[TaskMetrics])
   extends TaskEndReason

 private[spark] case class OtherFailure(message: String) extends TaskEndReason
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index e02507f83e..f90b2ccaa1 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -490,6 +490,26 @@ private object Utils extends Logging {
   }

   /**
+   * Returns a human-readable string representing a duration such as "35ms"
+   */
+  def msDurationToString(ms: Long): String = {
+    val second = 1000
+    val minute = 60 * second
+    val hour = 60 * minute
+
+    ms match {
+      case t if t < second =>
+        "%d ms".format(t)
+      case t if t < minute =>
+        "%.1f s".format(t.toFloat / second)
+      case t if t < hour =>
+        "%.1f m".format(t.toFloat / minute)
+      case t =>
+        "%.2f h".format(t.toFloat / hour)
+    }
+  }
+
+  /**
   * Convert a memory quantity in megabytes to a human-readable string such as "4.0 MB".
*/ def memoryMegabytesToString(megabytes: Long): String = { @@ -604,18 +624,19 @@ private object Utils extends Logging { "%s at %s:%s".format(callSiteInfo.lastSparkMethod, callSiteInfo.firstUserFile, callSiteInfo.firstUserLine) } - /** - * Try to find a free port to bind to on the local host. This should ideally never be needed, - * except that, unfortunately, some of the networking libraries we currently rely on (e.g. Spray) - * don't let users bind to port 0 and then figure out which free port they actually bound to. - * We work around this by binding a ServerSocket and immediately unbinding it. This is *not* - * necessarily guaranteed to work, but it's the best we can do. - */ - def findFreePort(): Int = { - val socket = new ServerSocket(0) - val portBound = socket.getLocalPort - socket.close() - portBound + + /** Return a string containing the last `n` bytes of a file. */ + def lastNBytes(path: String, n: Int): String = { + val file = new File(path) + val length = file.length() + val buff = new Array[Byte](math.min(n, length.toInt)) + val skip = math.max(0, length - n) + val stream = new FileInputStream(file) + + stream.skip(skip) + stream.read(buff) + stream.close() + Source.fromBytes(buff).mkString } /** diff --git a/core/src/main/scala/spark/deploy/JsonProtocol.scala b/core/src/main/scala/spark/deploy/JsonProtocol.scala index 88b03a007c..607d34d111 100644 --- a/core/src/main/scala/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/spark/deploy/JsonProtocol.scala @@ -1,79 +1,65 @@ package spark.deploy import master.{ApplicationInfo, WorkerInfo} +import net.liftweb.json.JsonDSL._ import worker.ExecutorRunner -import spray.json._ -/** - * spray-json helper class containing implicit conversion to json for marshalling responses - */ -private[spark] object JsonProtocol extends DefaultJsonProtocol { - implicit object WorkerInfoJsonFormat extends RootJsonWriter[WorkerInfo] { - def write(obj: WorkerInfo) = JsObject( - "id" -> JsString(obj.id), - "host" -> JsString(obj.host), - "port" -> JsNumber(obj.port), - "webuiaddress" -> JsString(obj.webUiAddress), - "cores" -> JsNumber(obj.cores), - "coresused" -> JsNumber(obj.coresUsed), - "memory" -> JsNumber(obj.memory), - "memoryused" -> JsNumber(obj.memoryUsed) - ) - } +private[spark] object JsonProtocol { + def writeWorkerInfo(obj: WorkerInfo) = { + ("id" -> obj.id) ~ + ("host" -> obj.host) ~ + ("port" -> obj.port) ~ + ("webuiaddress" -> obj.webUiAddress) ~ + ("cores" -> obj.cores) ~ + ("coresused" -> obj.coresUsed) ~ + ("memory" -> obj.memory) ~ + ("memoryused" -> obj.memoryUsed) + } - implicit object AppInfoJsonFormat extends RootJsonWriter[ApplicationInfo] { - def write(obj: ApplicationInfo) = JsObject( - "starttime" -> JsNumber(obj.startTime), - "id" -> JsString(obj.id), - "name" -> JsString(obj.desc.name), - "cores" -> JsNumber(obj.desc.maxCores), - "user" -> JsString(obj.desc.user), - "memoryperslave" -> JsNumber(obj.desc.memoryPerSlave), - "submitdate" -> JsString(obj.submitDate.toString)) + def writeApplicationInfo(obj: ApplicationInfo) = { + ("starttime" -> obj.startTime) ~ + ("id" -> obj.id) ~ + ("name" -> obj.desc.name) ~ + ("cores" -> obj.desc.maxCores) ~ + ("user" -> obj.desc.user) ~ + ("memoryperslave" -> obj.desc.memoryPerSlave) ~ + ("submitdate" -> obj.submitDate.toString) } - implicit object AppDescriptionJsonFormat extends RootJsonWriter[ApplicationDescription] { - def write(obj: ApplicationDescription) = JsObject( - "name" -> JsString(obj.name), - "cores" -> JsNumber(obj.maxCores), - "memoryperslave" -> 
JsNumber(obj.memoryPerSlave), - "user" -> JsString(obj.user) - ) + def writeApplicationDescription(obj: ApplicationDescription) = { + ("name" -> obj.name) ~ + ("cores" -> obj.maxCores) ~ + ("memoryperslave" -> obj.memoryPerSlave) ~ + ("user" -> obj.user) } - implicit object ExecutorRunnerJsonFormat extends RootJsonWriter[ExecutorRunner] { - def write(obj: ExecutorRunner) = JsObject( - "id" -> JsNumber(obj.execId), - "memory" -> JsNumber(obj.memory), - "appid" -> JsString(obj.appId), - "appdesc" -> obj.appDesc.toJson.asJsObject - ) + def writeExecutorRunner(obj: ExecutorRunner) = { + ("id" -> obj.execId) ~ + ("memory" -> obj.memory) ~ + ("appid" -> obj.appId) ~ + ("appdesc" -> writeApplicationDescription(obj.appDesc)) } - implicit object MasterStateJsonFormat extends RootJsonWriter[MasterState] { - def write(obj: MasterState) = JsObject( - "url" -> JsString("spark://" + obj.uri), - "workers" -> JsArray(obj.workers.toList.map(_.toJson)), - "cores" -> JsNumber(obj.workers.map(_.cores).sum), - "coresused" -> JsNumber(obj.workers.map(_.coresUsed).sum), - "memory" -> JsNumber(obj.workers.map(_.memory).sum), - "memoryused" -> JsNumber(obj.workers.map(_.memoryUsed).sum), - "activeapps" -> JsArray(obj.activeApps.toList.map(_.toJson)), - "completedapps" -> JsArray(obj.completedApps.toList.map(_.toJson)) - ) + def writeMasterState(obj: MasterState) = { + ("url" -> ("spark://" + obj.uri)) ~ + ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~ + ("cores" -> obj.workers.map(_.cores).sum) ~ + ("coresused" -> obj.workers.map(_.coresUsed).sum) ~ + ("memory" -> obj.workers.map(_.memory).sum) ~ + ("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~ + ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~ + ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) } - implicit object WorkerStateJsonFormat extends RootJsonWriter[WorkerState] { - def write(obj: WorkerState) = JsObject( - "id" -> JsString(obj.workerId), - "masterurl" -> JsString(obj.masterUrl), - "masterwebuiurl" -> JsString(obj.masterWebUiUrl), - "cores" -> JsNumber(obj.cores), - "coresused" -> JsNumber(obj.coresUsed), - "memory" -> JsNumber(obj.memory), - "memoryused" -> JsNumber(obj.memoryUsed), - "executors" -> JsArray(obj.executors.toList.map(_.toJson)), - "finishedexecutors" -> JsArray(obj.finishedExecutors.toList.map(_.toJson)) - ) + def writeWorkerState(obj: WorkerState) = { + ("id" -> obj.workerId) ~ + ("masterurl" -> obj.masterUrl) ~ + ("masterwebuiurl" -> obj.masterWebUiUrl) ~ + ("cores" -> obj.cores) ~ + ("coresused" -> obj.coresUsed) ~ + ("memory" -> obj.memory) ~ + ("memoryused" -> obj.memoryUsed) ~ + ("executors" -> obj.executors.toList.map(writeExecutorRunner)) ~ + ("finishedexecutors" -> obj.finishedExecutors.toList.map(writeExecutorRunner)) } } diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala index 2b0b3b10e7..494b1cc6f0 100644 --- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala @@ -17,11 +17,11 @@ import scala.collection.mutable.ArrayBuffer */ private[spark] class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: Int) extends Logging { - + private val localHostname = Utils.localHostName() private val masterActorSystems = ArrayBuffer[ActorSystem]() private val workerActorSystems = ArrayBuffer[ActorSystem]() - + def start(): String = { logInfo("Starting a local Spark cluster with " + numWorkers + " workers.") diff --git 
a/core/src/main/scala/spark/deploy/WebUI.scala b/core/src/main/scala/spark/deploy/WebUI.scala index ad1a1092b2..844c4142c7 100644 --- a/core/src/main/scala/spark/deploy/WebUI.scala +++ b/core/src/main/scala/spark/deploy/WebUI.scala @@ -6,7 +6,7 @@ import java.util.Date /** * Utilities used throughout the web UI. */ -private[spark] object WebUI { +private[spark] object DeployWebUI { val DATE_FORMAT = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") def formatDate(date: Date): String = DATE_FORMAT.format(date) diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index 770cfe9d05..d31e6735b7 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -13,6 +13,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import spark.deploy._ import spark.{Logging, SparkException, Utils} import spark.util.AkkaUtils +import ui.MasterWebUI private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging { @@ -35,6 +36,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act var firstApp: Option[ApplicationInfo] = None + val webUi = new MasterWebUI(self) + Utils.checkHost(host, "Expected hostname") val masterPublicAddress = { @@ -51,20 +54,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act logInfo("Starting Spark master at spark://" + host + ":" + port) // Listen for remote client disconnection events, since they don't go through Akka's watch() context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) - startWebUi() + webUi.start() import context.dispatcher context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers()) } - def startWebUi() { - val webUi = new MasterWebUI(self) - try { - AkkaUtils.startSprayServer(context.system, "0.0.0.0", webUiPort, webUi.handler) - } catch { - case e: Exception => - logError("Failed to create web UI", e) - System.exit(1) - } + override def postStop() { + webUi.stop() } override def receive = { @@ -76,7 +72,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } else { addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) context.watch(sender) // This doesn't work with remote actors but helps for testing - sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUiPort) + sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort) schedule() } } @@ -279,7 +275,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act exec.state = ExecutorState.KILLED } app.markFinished(state) - app.driver ! ApplicationRemoved(state.toString) + if (state != ApplicationState.FINISHED) { + app.driver ! 
ApplicationRemoved(state.toString) + } schedule() } } diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala deleted file mode 100644 index 34cee87853..0000000000 --- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala +++ /dev/null @@ -1,78 +0,0 @@ -package spark.deploy.master - -import akka.actor.{ActorRef, ActorContext, ActorRefFactory} -import scala.concurrent.Await -import akka.pattern.ask - -import akka.util.Timeout -import scala.concurrent.duration._ -import spray.routing.Directives -import spray.routing.directives._ -import spray.httpx.TwirlSupport._ -import spray.httpx.SprayJsonSupport._ -import spray.http.MediaTypes._ - -import spark.deploy._ -import spark.deploy.JsonProtocol._ - -/** - * Web UI server for the standalone master. - */ -private[spark] -class MasterWebUI(master: ActorRef)(implicit val context: ActorContext) extends Directives { - import context.dispatcher - - val actorSystem = context.system - val RESOURCE_DIR = "spark/deploy/master/webui" - val STATIC_RESOURCE_DIR = "spark/deploy/static" - - implicit val timeout = Timeout(Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")) - - val handler = { - get { - (path("") & parameters('format ?)) { - case Some(js) if js.equalsIgnoreCase("json") => - val future = (master ? RequestMasterState).mapTo[MasterState] - respondWithMediaType(`application/json`) { ctx => - ctx.complete(future.mapTo[MasterState]) - } - case _ => - complete { - val future = (master ? RequestMasterState).mapTo[MasterState] - future.map { - masterState => spark.deploy.master.html.index.render(masterState) - } - } - } ~ - path("app") { - parameters("appId", 'format ?) { - case (appId, Some(js)) if (js.equalsIgnoreCase("json")) => - val future = master ? RequestMasterState - val appInfo = for (masterState <- future.mapTo[MasterState]) yield { - masterState.activeApps.find(_.id == appId).getOrElse({ - masterState.completedApps.find(_.id == appId).getOrElse(null) - }) - } - respondWithMediaType(`application/json`) { ctx => - ctx.complete(appInfo.mapTo[ApplicationInfo]) - } - case (appId, _) => - complete { - val future = master ? 
RequestMasterState - future.map { state => - val masterState = state.asInstanceOf[MasterState] - val app = masterState.activeApps.find(_.id == appId).getOrElse({ - masterState.completedApps.find(_.id == appId).getOrElse(null) - }) - spark.deploy.master.html.app_details.render(app) - } - } - } - } ~ - pathPrefix("static") { - getFromResourceDirectory(STATIC_RESOURCE_DIR) - } ~ - getFromResourceDirectory(RESOURCE_DIR) - } - } -} diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala new file mode 100644 index 0000000000..939f8c9587 --- /dev/null +++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala @@ -0,0 +1,101 @@ +package spark.deploy.master.ui + +import akka.pattern.ask + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import javax.servlet.http.HttpServletRequest + +import net.liftweb.json.JsonAST.JValue + +import scala.xml.Node + +import spark.deploy.{RequestMasterState, JsonProtocol, MasterState} +import spark.deploy.master.ExecutorInfo +import spark.ui.UIUtils + +private[spark] class ApplicationPage(parent: MasterWebUI) { + val master = parent.master + implicit val timeout = parent.timeout + + /** Executor details for a particular application */ + def renderJson(request: HttpServletRequest): JValue = { + val appId = request.getParameter("appId") + val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState] + val state = Await.result(stateFuture, 30 seconds) + val app = state.activeApps.find(_.id == appId).getOrElse({ + state.completedApps.find(_.id == appId).getOrElse(null) + }) + JsonProtocol.writeApplicationInfo(app) + } + + /** Executor details for a particular application */ + def render(request: HttpServletRequest): Seq[Node] = { + val appId = request.getParameter("appId") + val stateFuture = (master ? 
RequestMasterState)(timeout).mapTo[MasterState] + val state = Await.result(stateFuture, 30 seconds) + val app = state.activeApps.find(_.id == appId).getOrElse({ + state.completedApps.find(_.id == appId).getOrElse(null) + }) + + val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Logs") + val executors = app.executors.values.toSeq + val executorTable = UIUtils.listingTable(executorHeaders, executorRow, executors) + + val content = + <hr /> + <div class="row"> + <div class="span12"> + <ul class="unstyled"> + <li><strong>ID:</strong> {app.id}</li> + <li><strong>Description:</strong> {app.desc.name}</li> + <li><strong>User:</strong> {app.desc.user}</li> + <li><strong>Cores:</strong> + { + if (app.desc.maxCores == Integer.MAX_VALUE) { + "Unlimited %s granted".format(app.coresGranted) + } else { + "%s (%s granted, %s left)".format( + app.desc.maxCores, app.coresGranted, app.coresLeft) + } + } + </li> + <li><strong>Memory per Slave:</strong> {app.desc.memoryPerSlave}</li> + <li><strong>Submit Date:</strong> {app.submitDate}</li> + <li><strong>State:</strong> {app.state}</li> + <li><strong><a href={app.appUiUrl}>Application Detail UI</a></strong></li> + </ul> + </div> + </div> + + <hr/> + + <div class="row"> <!-- Executors --> + <div class="span12"> + <h3> Executor Summary </h3> + <br/> + {executorTable} + </div> + </div>; + UIUtils.basicSparkPage(content, "Application Info: " + app.desc.name) + } + + def executorRow(executor: ExecutorInfo): Seq[Node] = { + <tr> + <td>{executor.id}</td> + <td> + <a href={executor.worker.webUiAddress}>{executor.worker.id}</a> + </td> + <td>{executor.cores}</td> + <td>{executor.memory}</td> + <td>{executor.state}</td> + <td> + <a href={"%s/log?appId=%s&executorId=%s&logType=stdout" + .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a> + <a href={"%s/log?appId=%s&executorId=%s&logType=stderr" + .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a> + </td> + </tr> + } +} diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala new file mode 100644 index 0000000000..73e378f988 --- /dev/null +++ b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala @@ -0,0 +1,119 @@ +package spark.deploy.master.ui + +import scala.concurrent.Await +import akka.pattern.ask +import scala.concurrent.duration._ + +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import spark.deploy.{RequestMasterState, DeployWebUI, MasterState} +import spark.Utils +import spark.ui.UIUtils +import spark.deploy.master.{ApplicationInfo, WorkerInfo} + +private[spark] class IndexPage(parent: MasterWebUI) { + val master = parent.master + implicit val timeout = parent.timeout + + /** Index view listing applications and executors */ + def render(request: HttpServletRequest): Seq[Node] = { + val stateFuture = (master ? 
RequestMasterState)(timeout).mapTo[MasterState] + val state = Await.result(stateFuture, 30 seconds) + + val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory") + val workers = state.workers.sortBy(_.id) + val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers) + + val appHeaders = Seq("ID", "Description", "Cores", "Memory per Node", "Submit Time", "User", + "State", "Duration") + val activeApps = state.activeApps.sortBy(_.startTime).reverse + val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps) + val completedApps = state.completedApps.sortBy(_.endTime).reverse + val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps) + + val content = + <hr /> + <div class="row"> + <div class="span12"> + <ul class="unstyled"> + <li><strong>URL:</strong>{state.uri}</li> + <li><strong>Workers:</strong>{state.workers.size}</li> + <li><strong>Cores:</strong> {state.workers.map(_.cores).sum} Total, + {state.workers.map(_.coresUsed).sum} Used</li> + <li><strong>Memory:</strong> + {Utils.memoryMegabytesToString(state.workers.map(_.memory).sum)} Total, + {Utils.memoryMegabytesToString(state.workers.map(_.memoryUsed).sum)} Used</li> + <li><strong>Applications:</strong> + {state.activeApps.size} Running, + {state.completedApps.size} Completed </li> + </ul> + </div> + </div> + + <div class="row"> + <div class="span12"> + <h3> Workers </h3> + <br/> + {workerTable} + </div> + </div> + + <hr/> + + <div class="row"> + <div class="span12"> + <h3> Running Applications </h3> + <br/> + {activeAppsTable} + </div> + </div> + + <hr/> + + <div class="row"> + <div class="span12"> + <h3> Completed Applications </h3> + <br/> + {completedAppsTable} + </div> + </div>; + UIUtils.basicSparkPage(content, "Spark Master: " + state.uri) + } + + def workerRow(worker: WorkerInfo): Seq[Node] = { + <tr> + <td> + <a href={worker.webUiAddress}>{worker.id}</a> + </td> + <td>{worker.host}:{worker.port}</td> + <td>{worker.state}</td> + <td>{worker.cores} ({worker.coresUsed} Used)</td> + <td sorttable_customkey={"%s.%s".format(worker.memory, worker.memoryUsed)}> + {Utils.memoryMegabytesToString(worker.memory)} + ({Utils.memoryMegabytesToString(worker.memoryUsed)} Used) + </td> + </tr> + } + + + def appRow(app: ApplicationInfo): Seq[Node] = { + <tr> + <td> + <a href={"app?appId=" + app.id}>{app.id}</a> + </td> + <td>{app.desc.name}</td> + <td> + {app.coresGranted} + </td> + <td sorttable_customkey={app.desc.memoryPerSlave.toString}> + {Utils.memoryMegabytesToString(app.desc.memoryPerSlave)} + </td> + <td>{DeployWebUI.formatDate(app.submitDate)}</td> + <td>{app.desc.user}</td> + <td>{app.state.toString}</td> + <td>{DeployWebUI.formatDuration(app.duration)}</td> + </tr> + } +} diff --git a/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala new file mode 100644 index 0000000000..bcc4d49234 --- /dev/null +++ b/core/src/main/scala/spark/deploy/master/ui/MasterWebUI.scala @@ -0,0 +1,59 @@ +package spark.deploy.master.ui + +import akka.actor.ActorRef +import scala.concurrent.duration._ + +import javax.servlet.http.HttpServletRequest + +import org.eclipse.jetty.server.{Handler, Server} + +import spark.{Logging, Utils} +import spark.ui.JettyUtils +import spark.ui.JettyUtils._ + +/** + * Web UI server for the standalone master. 
+ */ +private[spark] +class MasterWebUI(val master: ActorRef, requestedPort: Option[Int] = None) extends Logging { + implicit val timeout = Duration.create( + System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") + val host = Utils.localHostName() + val port = requestedPort.getOrElse( + System.getProperty("master.ui.port", MasterWebUI.DEFAULT_PORT).toInt) + + var server: Option[Server] = None + var boundPort: Option[Int] = None + + val applicationPage = new ApplicationPage(this) + val indexPage = new IndexPage(this) + + def start() { + try { + val (srv, bPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers) + server = Some(srv) + boundPort = Some(bPort) + logInfo("Started Master web UI at http://%s:%d".format(host, boundPort.get)) + } catch { + case e: Exception => + logError("Failed to create Master JettyUtils", e) + System.exit(1) + } + } + + val handlers = Array[(String, Handler)]( + ("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)), + ("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)), + ("/app", (request: HttpServletRequest) => applicationPage.render(request)), + ("*", (request: HttpServletRequest) => indexPage.render(request)) + ) + + def stop() { + server.foreach(_.stop()) + } +} + +private[spark] object MasterWebUI { + val STATIC_RESOURCE_DIR = "spark/ui/static" + val DEFAULT_PORT = "8080" +} diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index b5dfd16e67..7c1871e047 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -14,7 +14,7 @@ import spark.deploy.LaunchExecutor import spark.deploy.RegisterWorkerFailed import spark.deploy.master.Master import java.io.File - +import ui.WorkerWebUI private[spark] class Worker( host: String, @@ -45,6 +45,7 @@ private[spark] class Worker( val envVar = System.getenv("SPARK_PUBLIC_DNS") if (envVar != null) envVar else host } + var webUi: WorkerWebUI = null var coresUsed = 0 var memoryUsed = 0 @@ -76,35 +77,26 @@ private[spark] class Worker( sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse(".")) logInfo("Spark home: " + sparkHome) createWorkDir() + webUi = new WorkerWebUI(self, workDir, Some(webUiPort)) + webUi.start() connectToMaster() - startWebUi() } def connectToMaster() { logInfo("Connecting to master " + masterUrl) master = context.actorFor(Master.toAkkaUrl(masterUrl)) - master ! RegisterWorker(workerId, host, port, cores, memory, webUiPort, publicAddress) + master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress) context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) context.watch(master) // Doesn't work with remote actors, but useful for testing } - def startWebUi() { - val webUi = new WorkerWebUI(self, workDir) - try { - AkkaUtils.startSprayServer(context.system, "0.0.0.0", webUiPort, webUi.handler) - } catch { - case e: Exception => - logError("Failed to create web UI", e) - System.exit(1) - } - } + import context.dispatcher override def receive = { case RegisteredWorker(url) => masterWebUiUrl = url logInfo("Successfully registered with master") - import context.dispatcher - context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) { + context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) { master ! 
Heartbeat(workerId) } @@ -170,6 +162,7 @@ private[spark] class Worker( override def postStop() { executors.values.foreach(_.kill()) + webUi.stop() } } diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala deleted file mode 100644 index cc2ab6187a..0000000000 --- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala +++ /dev/null @@ -1,61 +0,0 @@ -package spark.deploy.worker - -import akka.actor.{ActorRef, ActorContext} -import scala.concurrent.Await -import akka.pattern.ask - -import akka.util.Timeout -import scala.concurrent.duration._ -import spray.routing.Directives -import spray.httpx.TwirlSupport._ -import spray.httpx.SprayJsonSupport._ -import spray.http.MediaTypes._ - -import spark.deploy.{WorkerState, RequestWorkerState} -import spark.deploy.JsonProtocol._ -import java.io.File - -/** - * Web UI server for the standalone worker. - */ -private[spark] -class WorkerWebUI(worker: ActorRef, workDir: File)(implicit val context: ActorContext) extends Directives { - import context.dispatcher - - val actorSystem = context.system - val RESOURCE_DIR = "spark/deploy/worker/webui" - val STATIC_RESOURCE_DIR = "spark/deploy/static" - - implicit val timeout = Timeout(Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")) - - val handler = { - get { - (path("") & parameters('format ?)) { - case Some(js) if js.equalsIgnoreCase("json") => { - val future = (worker ? RequestWorkerState).mapTo[WorkerState] - respondWithMediaType(`application/json`) { ctx => - ctx.complete(future) - } - } - case _ => - complete { - val future = (worker ? RequestWorkerState).mapTo[WorkerState] - future.map { workerState => - spark.deploy.worker.html.index(workerState) - } - } - } ~ - path("log") { - parameters("appId", "executorId", "logType") { (appId, executorId, logType) => - respondWithMediaType(`text/plain`) { - getFromFile(workDir.getPath() + "/" + appId + "/" + executorId + "/" + logType) - } - } - } ~ - pathPrefix("static") { - getFromResourceDirectory(STATIC_RESOURCE_DIR) - } ~ - getFromResourceDirectory(RESOURCE_DIR) - } - } -} diff --git a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala new file mode 100644 index 0000000000..8cb74632aa --- /dev/null +++ b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala @@ -0,0 +1,100 @@ +package spark.deploy.worker.ui + +import scala.concurrent.duration._ +import scala.concurrent.Await + +import akka.pattern.ask + +import javax.servlet.http.HttpServletRequest + +import net.liftweb.json.JsonAST.JValue + +import scala.xml.Node + +import spark.deploy.{RequestWorkerState, JsonProtocol, WorkerState} +import spark.deploy.worker.ExecutorRunner +import spark.Utils +import spark.ui.UIUtils + +private[spark] class IndexPage(parent: WorkerWebUI) { + val worker = parent.worker + val timeout = parent.timeout + + def renderJson(request: HttpServletRequest): JValue = { + val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState] + val workerState = Await.result(stateFuture, 30 seconds) + JsonProtocol.writeWorkerState(workerState) + } + + def render(request: HttpServletRequest): Seq[Node] = { + val stateFuture = (worker ? 
RequestWorkerState)(timeout).mapTo[WorkerState] + val workerState = Await.result(stateFuture, 30 seconds) + + val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs") + val runningExecutorTable = + UIUtils.listingTable(executorHeaders, executorRow, workerState.executors) + val finishedExecutorTable = + UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors) + + val content = + <hr /> + <div class="row"> <!-- Worker Details --> + <div class="span12"> + <ul class="unstyled"> + <li><strong>ID:</strong> {workerState.workerId}</li> + <li><strong> + Master URL:</strong> {workerState.masterUrl} + </li> + <li><strong>Cores:</strong> {workerState.cores} ({workerState.coresUsed} Used)</li> + <li><strong>Memory:</strong> {Utils.memoryMegabytesToString(workerState.memory)} + ({Utils.memoryMegabytesToString(workerState.memoryUsed)} Used)</li> + </ul> + <p><a href={workerState.masterWebUiUrl}>Back to Master</a></p> + </div> + </div> + <hr/> + + <div class="row"> <!-- Running Executors --> + <div class="span12"> + <h3> Running Executors {workerState.executors.size} </h3> + <br/> + {runningExecutorTable} + </div> + </div> + <hr/> + + <div class="row"> <!-- Finished Executors --> + <div class="span12"> + <h3> Finished Executors </h3> + <br/> + {finishedExecutorTable} + </div> + </div>; + + UIUtils.basicSparkPage(content, "Spark Worker on %s:%s".format(workerState.host, workerState.port)) + } + + def executorRow(executor: ExecutorRunner): Seq[Node] = { + <tr> + <td>{executor.execId}</td> + <td>{executor.cores}</td> + <td sorttable_customkey={executor.memory.toString}> + {Utils.memoryMegabytesToString(executor.memory)} + </td> + <td> + <ul class="unstyled"> + <li><strong>ID:</strong> {executor.appId}</li> + <li><strong>Name:</strong> {executor.appDesc.name}</li> + <li><strong>User:</strong> {executor.appDesc.user}</li> + </ul> + </td> + <td> + <a href={"log?appId=%s&executorId=%s&logType=stdout" + .format(executor.appId, executor.execId)}>stdout</a> + <a href={"log?appId=%s&executorId=%s&logType=stderr" + .format(executor.appId, executor.execId)}>stderr</a> + </td> + </tr> + } + +} diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala new file mode 100644 index 0000000000..b1336dd1af --- /dev/null +++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala @@ -0,0 +1,78 @@ +package spark.deploy.worker.ui + +import akka.actor.ActorRef +import akka.util.Timeout + +import scala.concurrent.duration._ + +import java.io.{FileInputStream, File} + +import javax.servlet.http.HttpServletRequest + +import org.eclipse.jetty.server.{Handler, Server} + +import spark.{Utils, Logging} +import spark.ui.JettyUtils +import spark.ui.JettyUtils._ + +/** + * Web UI server for the standalone worker. 
+ */ +private[spark] +class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option[Int] = None) + extends Logging { + implicit val timeout = Timeout( + Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")) + val host = Utils.localHostName() + val port = requestedPort.getOrElse( + System.getProperty("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt) + + var server: Option[Server] = None + var boundPort: Option[Int] = None + + val indexPage = new IndexPage(this) + + val handlers = Array[(String, Handler)]( + ("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)), + ("/log", (request: HttpServletRequest) => log(request)), + ("/json", (request: HttpServletRequest) => indexPage.renderJson(request)), + ("*", (request: HttpServletRequest) => indexPage.render(request)) + ) + + def start() { + try { + val (srv, bPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers) + server = Some(srv) + boundPort = Some(bPort) + logInfo("Started Worker web UI at http://%s:%d".format(host, bPort)) + } catch { + case e: Exception => + logError("Failed to create Worker JettyUtils", e) + System.exit(1) + } + } + + def log(request: HttpServletRequest): String = { + val appId = request.getParameter("appId") + val executorId = request.getParameter("executorId") + val logType = request.getParameter("logType") + + val maxBytes = 1024 * 1024 // Guard against OOM + val defaultBytes = 100 * 1024 + val numBytes = Option(request.getParameter("numBytes")) + .flatMap(s => Some(s.toInt)).getOrElse(defaultBytes) + + val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType) + val pre = "==== Last %s bytes of %s/%s/%s ====\n".format(numBytes, appId, executorId, logType) + pre + Utils.lastNBytes(path, math.min(numBytes, maxBytes)) + } + + def stop() { + server.foreach(_.stop()) + } +} + +private[spark] object WorkerWebUI { + val STATIC_RESOURCE_DIR = "spark/ui/static" + val DEFAULT_PORT="8081" +} diff --git a/core/src/main/scala/spark/executor/Executor.scala b/core/src/main/scala/spark/executor/Executor.scala index 2bf55ea9a9..8360547a74 100644 --- a/core/src/main/scala/spark/executor/Executor.scala +++ b/core/src/main/scala/spark/executor/Executor.scala @@ -93,15 +93,18 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert val ser = SparkEnv.get.closureSerializer.newInstance() logInfo("Running task ID " + taskId) context.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER) + var attemptedTask: Option[Task[Any]] = None + var taskStart: Long = 0 try { SparkEnv.set(env) Accumulators.clear() val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) updateDependencies(taskFiles, taskJars) val task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader) + attemptedTask = Some(task) logInfo("Its generation is " + task.generation) env.mapOutputTracker.updateGeneration(task.generation) - val taskStart = System.currentTimeMillis() + taskStart = System.currentTimeMillis() val value = task.run(taskId.toInt) val taskFinish = System.currentTimeMillis() task.metrics.foreach{ m => @@ -129,7 +132,10 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert } case t: Throwable => { - val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace) + val serviceTime = (System.currentTimeMillis() - taskStart).toInt + val metrics = attemptedTask.flatMap(t => t.metrics) + metrics.foreach{m => m.executorRunTime = serviceTime} + val 
reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics) context.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) // TODO: Should we exit the whole executor here? On the one hand, the failed task may diff --git a/core/src/main/scala/spark/rdd/HadoopRDD.scala b/core/src/main/scala/spark/rdd/HadoopRDD.scala index cbf5512e24..07c103503c 100644 --- a/core/src/main/scala/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/spark/rdd/HadoopRDD.scala @@ -15,6 +15,7 @@ import org.apache.hadoop.mapred.RecordReader import org.apache.hadoop.mapred.Reporter import org.apache.hadoop.util.ReflectionUtils +import spark.deploy.SparkHadoopUtil import spark.{Dependency, Logging, Partition, RDD, SerializableWritable, SparkContext, TaskContext} import spark.util.NextIterator import org.apache.hadoop.conf.Configurable @@ -50,6 +51,7 @@ class HadoopRDD[K, V]( private val confBroadcast = sc.broadcast(new SerializableWritable(conf)) override def getPartitions: Array[Partition] = { + SparkHadoopUtil.addCredentials(conf); val inputFormat = createInputFormat(conf) if (inputFormat.isInstanceOf[Configurable]) { inputFormat.asInstanceOf[Configurable].setConf(conf) diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 1164c40c43..d4bda150cb 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -257,7 +257,7 @@ class DAGScheduler( eventQueue.put(toSubmit) waiter.awaitResult() match { case JobSucceeded => {} - case JobFailed(exception: Exception) => + case JobFailed(exception: Exception, _) => logInfo("Failed to run " + callSite) throw exception } @@ -313,7 +313,7 @@ class DAGScheduler( handleExecutorLost(execId) case completion: CompletionEvent => - sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion.task, + sparkListeners.foreach(_.onTaskEnd(SparkListenerTaskEnd(completion.task, completion.reason, completion.taskInfo, completion.taskMetrics))) handleTaskCompletion(completion) @@ -325,7 +325,7 @@ class DAGScheduler( for (job <- activeJobs) { val error = new SparkException("Job cancelled because SparkContext was shut down") job.listener.jobFailed(error) - sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error)))) + sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error, None)))) } return true } @@ -504,6 +504,7 @@ class DAGScheduler( case _ => "Unkown" } logInfo("%s (%s) finished in %s s".format(stage, stage.origin, serviceTime)) + stage.completionTime = Some(System.currentTimeMillis) val stageComp = StageCompleted(stageToInfos(stage)) sparkListeners.foreach{_.onStageCompleted(stageComp)} running -= stage @@ -525,6 +526,7 @@ class DAGScheduler( job.numFinished += 1 // If the whole job has finished, remove it if (job.numFinished == job.numPartitions) { + idToActiveJob -= stage.priority activeJobs -= job resultStageToJob -= stage markStageAsFinished(stage) @@ -618,8 +620,11 @@ class DAGScheduler( handleExecutorLost(bmAddress.executorId, Some(task.generation)) } + case ExceptionFailure(className, description, stackTrace, metrics) => + // Do nothing here, left up to the TaskScheduler to decide how to handle user failures + case other => - // Non-fetch failure -- probably a bug in user code; abort all jobs depending on this stage + // Unrecognized failure - abort all jobs depending on this stage abortStage(idToStage(task.stageId), task + " failed: " + other) } } @@ -652,7 +657,7 @@ 
class DAGScheduler( "(generation " + currentGeneration + ")") } } - + private def handleExecutorGained(execId: String, hostPort: String) { // remove from failedGeneration(execId) ? if (failedGeneration.contains(execId)) { @@ -667,11 +672,13 @@ class DAGScheduler( */ private def abortStage(failedStage: Stage, reason: String) { val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq + failedStage.completionTime = Some(System.currentTimeMillis()) for (resultStage <- dependentStages) { val job = resultStageToJob(resultStage) val error = new SparkException("Job failed: " + reason) job.listener.jobFailed(error) - sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error)))) + sparkListeners.foreach(_.onJobEnd(SparkListenerJobEnd(job, JobFailed(error, Some(failedStage))))) + idToActiveJob -= resultStage.priority activeJobs -= job resultStageToJob -= resultStage } @@ -748,6 +755,10 @@ class DAGScheduler( sizeBefore = pendingTasks.size pendingTasks.clearOldValues(cleanupTime) logInfo("pendingTasks " + sizeBefore + " --> " + pendingTasks.size) + + sizeBefore = stageToInfos.size + stageToInfos.clearOldValues(cleanupTime) + logInfo("stageToInfos " + sizeBefore + " --> " + stageToInfos.size) } def stop() { diff --git a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala index 287f731787..17d0ea4f80 100644 --- a/core/src/main/scala/spark/scheduler/InputFormatInfo.scala +++ b/core/src/main/scala/spark/scheduler/InputFormatInfo.scala @@ -3,11 +3,13 @@ package spark.scheduler import spark.Logging import scala.collection.immutable.Set import org.apache.hadoop.mapred.{FileInputFormat, JobConf} +import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.util.ReflectionUtils import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.conf.Configuration import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.collection.JavaConversions._ +import spark.deploy.SparkHadoopUtil /** @@ -70,6 +72,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl // This method does not expect failures, since validate has already passed ... private def prefLocsFromMapreduceInputFormat(): Set[SplitInfo] = { val conf = new JobConf(configuration) + SparkHadoopUtil.addCredentials(conf); FileInputFormat.setInputPaths(conf, path) val instance: org.apache.hadoop.mapreduce.InputFormat[_, _] = @@ -89,6 +92,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl // This method does not expect failures, since validate has already passed ... private def prefLocsFromMapredInputFormat(): Set[SplitInfo] = { val jobConf = new JobConf(configuration) + SparkHadoopUtil.addCredentials(jobConf); FileInputFormat.setInputPaths(jobConf, path) val instance: org.apache.hadoop.mapred.InputFormat[_, _] = diff --git a/core/src/main/scala/spark/scheduler/JobLogger.scala b/core/src/main/scala/spark/scheduler/JobLogger.scala index 178bfaba3d..6a9d52f356 100644 --- a/core/src/main/scala/spark/scheduler/JobLogger.scala +++ b/core/src/main/scala/spark/scheduler/JobLogger.scala @@ -275,7 +275,7 @@ class JobLogger(val logDirName: String) extends SparkListener with Logging { var info = "JOB_ID=" + job.runId
reason match {
case JobSucceeded => info += " STATUS=SUCCESS"
- case JobFailed(exception) =>
+ case JobFailed(exception, _) =>
info += " STATUS=FAILED REASON="
exception.getMessage.split("\\s+").foreach(info += _ + "_")
case _ =>
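JobFailed picks up a second field in the JobResult change that follows, so existing matches like the one above only need an extra wildcard. A minimal hedged sketch of a caller that actually uses the new field (the jobResult value and logInfo are assumed to be in scope; Stage's id field is taken from the existing class):

    jobResult match {
      case JobSucceeded => logInfo("STATUS=SUCCESS")
      case JobFailed(exception, Some(stage)) =>
        logInfo("STATUS=FAILED in stage " + stage.id + ": " + exception.getMessage)
      case JobFailed(exception, None) =>
        logInfo("STATUS=FAILED: " + exception.getMessage)
    }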
diff --git a/core/src/main/scala/spark/scheduler/JobResult.scala b/core/src/main/scala/spark/scheduler/JobResult.scala index 654131ee84..a0fdf391e6 100644 --- a/core/src/main/scala/spark/scheduler/JobResult.scala +++ b/core/src/main/scala/spark/scheduler/JobResult.scala @@ -6,4 +6,4 @@ package spark.scheduler private[spark] sealed trait JobResult private[spark] case object JobSucceeded extends JobResult -private[spark] case class JobFailed(exception: Exception) extends JobResult +private[spark] case class JobFailed(exception: Exception, failedStage: Option[Stage]) extends JobResult diff --git a/core/src/main/scala/spark/scheduler/JobWaiter.scala b/core/src/main/scala/spark/scheduler/JobWaiter.scala index 3cc6a86345..6ff2e29434 100644 --- a/core/src/main/scala/spark/scheduler/JobWaiter.scala +++ b/core/src/main/scala/spark/scheduler/JobWaiter.scala @@ -35,7 +35,7 @@ private[spark] class JobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Un throw new UnsupportedOperationException("jobFailed() called on a finished JobWaiter") } jobFinished = true - jobResult = JobFailed(exception) + jobResult = JobFailed(exception, None) this.notifyAll() } } diff --git a/core/src/main/scala/spark/scheduler/SparkListener.scala b/core/src/main/scala/spark/scheduler/SparkListener.scala index bac984b5c9..8de3aa91a4 100644 --- a/core/src/main/scala/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/spark/scheduler/SparkListener.scala @@ -68,6 +68,7 @@ class StatsReportListener extends SparkListener with Logging { showBytesDistribution("task result size:", (_, metric) => Some(metric.resultSize)) //runtime breakdown + val runtimePcts = stageCompleted.stageInfo.taskInfos.map{ case (info, metrics) => RuntimePercentage(info.duration, metrics) } diff --git a/core/src/main/scala/spark/scheduler/Stage.scala b/core/src/main/scala/spark/scheduler/Stage.scala index 7fc9e13fd9..539cf8233b 100644 --- a/core/src/main/scala/spark/scheduler/Stage.scala +++ b/core/src/main/scala/spark/scheduler/Stage.scala @@ -34,6 +34,7 @@ private[spark] class Stage( /** When first task was submitted to scheduler. 
*/ var submissionTime: Option[Long] = None + var completionTime: Option[Long] = None private var nextAttemptId = 0 diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala index d72b0bfc9f..fe6420a522 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala @@ -571,6 +571,7 @@ private[spark] class ClusterTaskSetManager( return case ef: ExceptionFailure => + sched.listener.taskEnded(tasks(index), ef, null, null, info, ef.metrics.getOrElse(null)) val key = ef.description val now = System.currentTimeMillis val (printFull, dupCount) = { diff --git a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala index 93d4318b29..b000e328e6 100644 --- a/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala +++ b/core/src/main/scala/spark/scheduler/local/LocalScheduler.scala @@ -145,6 +145,9 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: // Set the Spark execution environment for the worker thread SparkEnv.set(env) val ser = SparkEnv.get.closureSerializer.newInstance() + var attemptedTask: Option[Task[_]] = None + val start = System.currentTimeMillis() + var taskStart: Long = 0 try { Accumulators.clear() Thread.currentThread().setContextClassLoader(classLoader) @@ -153,10 +156,11 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: // this adds a bit of unnecessary overhead but matches how the Mesos Executor works. val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(bytes) updateDependencies(taskFiles, taskJars) // Download any files added with addFile - val deserStart = System.currentTimeMillis() val deserializedTask = ser.deserialize[Task[_]]( taskBytes, Thread.currentThread.getContextClassLoader) - val deserTime = System.currentTimeMillis() - deserStart + attemptedTask = Some(deserializedTask) + val deserTime = System.currentTimeMillis() - start + taskStart = System.currentTimeMillis() // Run it val result: Any = deserializedTask.run(taskId) @@ -170,16 +174,19 @@ private[spark] class LocalScheduler(threads: Int, val maxFailures: Int, val sc: val resultToReturn = ser.deserialize[Any](serResult) val accumUpdates = ser.deserialize[collection.mutable.Map[Long, Any]]( ser.serialize(Accumulators.values)) + val serviceTime = System.currentTimeMillis() - taskStart logInfo("Finished " + taskId) - deserializedTask.metrics.get.executorRunTime = deserTime.toInt//info.duration.toInt //close enough + deserializedTask.metrics.get.executorRunTime = serviceTime.toInt deserializedTask.metrics.get.executorDeserializeTime = deserTime.toInt - val taskResult = new TaskResult(result, accumUpdates, deserializedTask.metrics.getOrElse(null)) val serializedResult = ser.serialize(taskResult) localActor ! LocalStatusUpdate(taskId, TaskState.FINISHED, serializedResult) } catch { case t: Throwable => { - val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace) + val serviceTime = System.currentTimeMillis() - taskStart + val metrics = attemptedTask.flatMap(t => t.metrics) + metrics.foreach{m => m.executorRunTime = serviceTime.toInt} + val failure = new ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics) localActor ! 
LocalStatusUpdate(taskId, TaskState.FAILED, ser.serialize(failure)) } } diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala index 70b69bb26f..f12fec41d5 100644 --- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala @@ -152,6 +152,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas info.markFailed() decreaseRunningTasks(1) val reason: ExceptionFailure = ser.deserialize[ExceptionFailure](serializedData, getClass.getClassLoader) + sched.listener.taskEnded(task, reason, null, null, info, reason.metrics.getOrElse(null)) if (!finished(index)) { copiesRunning(index) -= 1 numFailures(index) += 1 diff --git a/core/src/main/scala/spark/storage/BlockManagerUI.scala b/core/src/main/scala/spark/storage/BlockManagerUI.scala deleted file mode 100644 index 631455abcd..0000000000 --- a/core/src/main/scala/spark/storage/BlockManagerUI.scala +++ /dev/null @@ -1,82 +0,0 @@ -package spark.storage - -import akka.actor.{ActorRef, ActorSystem} - -import akka.util.Timeout -import scala.concurrent.duration._ -import spray.httpx.TwirlSupport._ -import spray.routing.Directives - -import spark.{Logging, SparkContext} -import spark.util.AkkaUtils -import spark.Utils - - -/** - * Web UI server for the BlockManager inside each SparkContext. - */ -private[spark] -class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, sc: SparkContext) - extends Directives with Logging { - - implicit val implicitActorSystem = actorSystem - val STATIC_RESOURCE_DIR = "spark/deploy/static" - - implicit val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") - val host = Utils.localHostName() - val port = if (System.getProperty("spark.ui.port") != null) { - System.getProperty("spark.ui.port").toInt - } else { - // TODO: Unfortunately, it's not possible to pass port 0 to spray and figure out which - // random port it bound to, so we have to try to find a local one by creating a socket. - Utils.findFreePort() - } - - /** Start a HTTP server to run the Web interface */ - def start() { - try { - AkkaUtils.startSprayServer(actorSystem, "0.0.0.0", port, handler, "BlockManagerHTTPServer") - logInfo("Started BlockManager web UI at http://%s:%d".format(host, port)) - } catch { - case e: Exception => - logError("Failed to create BlockManager WebUI", e) - System.exit(1) - } - } - - val handler = { - get { - path("") { - complete { - // Request the current storage status from the Master - val storageStatusList = sc.getExecutorStorageStatus - // Calculate macro-level statistics - val maxMem = storageStatusList.map(_.maxMem).reduce(_+_) - val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_) - val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)) - .reduceOption(_+_).getOrElse(0L) - val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc) - spark.storage.html.index. - render(maxMem, remainingMem, diskSpaceUsed, rdds, storageStatusList) - } - } ~ - path("rdd") { - parameter("id") { id => - complete { - val prefix = "rdd_" + id.toString - val storageStatusList = sc.getExecutorStorageStatus - val filteredStorageStatusList = StorageUtils. 
- filterStorageStatusByPrefix(storageStatusList, prefix) - val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head - spark.storage.html.rdd.render(rddInfo, filteredStorageStatusList) - } - } - } ~ - pathPrefix("static") { - getFromResourceDirectory(STATIC_RESOURCE_DIR) - } - } - } - - private[spark] def appUIAddress = "http://" + host + ":" + port -} diff --git a/core/src/main/scala/spark/ui/JettyUtils.scala b/core/src/main/scala/spark/ui/JettyUtils.scala new file mode 100644 index 0000000000..bc6f9c10d5 --- /dev/null +++ b/core/src/main/scala/spark/ui/JettyUtils.scala @@ -0,0 +1,115 @@ +package spark.ui + +import annotation.tailrec + +import javax.servlet.http.{HttpServletResponse, HttpServletRequest} + +import net.liftweb.json.{JValue, pretty, render} + +import org.eclipse.jetty.server.{Server, Request, Handler} +import org.eclipse.jetty.server.handler.{ResourceHandler, HandlerList, ContextHandler, AbstractHandler} +import org.eclipse.jetty.util.thread.QueuedThreadPool + +import scala.util.{Try, Success, Failure} +import scala.xml.Node + +import spark.Logging + +/** Utilities for launching a web server using Jetty's HTTP Server class */ +private[spark] object JettyUtils extends Logging { + // Base type for a function that returns something based on an HTTP request. Allows for + // implicit conversion from many types of functions to jetty Handlers. + type Responder[T] = HttpServletRequest => T + + // Conversions from various types of Responder's to jetty Handlers + implicit def jsonResponderToHandler(responder: Responder[JValue]): Handler = + createHandler(responder, "text/json", (in: JValue) => pretty(render(in))) + + implicit def htmlResponderToHandler(responder: Responder[Seq[Node]]): Handler = + createHandler(responder, "text/html", (in: Seq[Node]) => "<!DOCTYPE html>" + in.toString) + + implicit def textResponderToHandler(responder: Responder[String]): Handler = + createHandler(responder, "text/plain") + + private def createHandler[T <% AnyRef](responder: Responder[T], contentType: String, + extractFn: T => String = (in: Any) => in.toString): Handler = { + new AbstractHandler { + def handle(target: String, + baseRequest: Request, + request: HttpServletRequest, + response: HttpServletResponse) { + response.setContentType("%s;charset=utf-8".format(contentType)) + response.setStatus(HttpServletResponse.SC_OK) + baseRequest.setHandled(true) + val result = responder(request) + response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate") + response.getWriter().println(extractFn(result)) + } + } + } + + /** Creates a handler that always redirects the user to a given path */ + def createRedirectHandler(newPath: String): Handler = { + new AbstractHandler { + def handle(target: String, + baseRequest: Request, + request: HttpServletRequest, + response: HttpServletResponse) { + response.setStatus(302) + response.setHeader("Location", baseRequest.getRootURL + newPath) + baseRequest.setHandled(true) + } + } + } + + /** Creates a handler for serving files from a static directory */ + def createStaticHandler(resourceBase: String): ResourceHandler = { + val staticHandler = new ResourceHandler + Option(getClass.getClassLoader.getResource(resourceBase)) match { + case Some(res) => + staticHandler.setResourceBase(res.toString) + case None => + throw new Exception("Could not find resource path for Web UI: " + resourceBase) + } + staticHandler + } + + /** + * Attempts to start a Jetty server at the supplied ip:port which uses the supplied handlers. 
+   *
+   * If the desired port number is contended, continues incrementing ports until a free port is
+   * found. Returns the chosen port and the jetty Server object.
+   */
+  def startJettyServer(ip: String, port: Int, handlers: Seq[(String, Handler)]): (Server, Int) = {
+    val handlersToRegister = handlers.map { case(path, handler) =>
+      val contextHandler = new ContextHandler(path)
+      contextHandler.setHandler(handler)
+      contextHandler.asInstanceOf[org.eclipse.jetty.server.Handler]
+    }
+
+    val handlerList = new HandlerList
+    handlerList.setHandlers(handlersToRegister.toArray)
+
+    @tailrec
+    def connect(currentPort: Int): (Server, Int) = {
+      val server = new Server(currentPort)
+      val pool = new QueuedThreadPool
+      pool.setDaemon(true)
+      server.setThreadPool(pool)
+      server.setHandler(handlerList)
+
+      Try { server.start() } match {
+        case s: Success[_] =>
+          sys.addShutdownHook(server.stop()) // Be kind, un-bind
+          (server, server.getConnectors.head.getLocalPort)
+        case f: Failure[_] =>
+          server.stop()
+          logInfo("Failed to create UI at port %s. Trying again.".format(currentPort))
+          logInfo("Error was: " + f.toString)
+          connect((currentPort + 1) % 65536)
+      }
+    }
+
+    connect(port)
+  }
+}
diff --git a/core/src/main/scala/spark/ui/Page.scala b/core/src/main/scala/spark/ui/Page.scala
new file mode 100644
index 0000000000..c853b44b76
--- /dev/null
+++ b/core/src/main/scala/spark/ui/Page.scala
@@ -0,0 +1,3 @@
+package spark.ui
+
+private[spark] object Page extends Enumeration { val Storage, Jobs = Value }
\ No newline at end of file
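The JettyUtils object above is easiest to see in action through a short usage sketch. The snippet below is hypothetical (PingServer and the /ping path are invented for illustration, and it sits in the spark.ui package so the private[spark] members resolve): the implicit Responder-to-Handler conversion turns a plain function into a Jetty Handler, and startJettyServer supplies the port-retry behavior.

package spark.ui

import javax.servlet.http.HttpServletRequest

import org.eclipse.jetty.server.Handler

import spark.ui.JettyUtils._

// Hypothetical example, not part of this patch: binds one text endpoint.
object PingServer {
  def main(args: Array[String]) {
    // textResponderToHandler implicitly converts this
    // HttpServletRequest => String function into a Jetty Handler.
    val handlers = Seq[(String, Handler)](
      ("/ping", (request: HttpServletRequest) => "pong: " + request.getRequestURI)
    )
    // If port 3030 is contended, startJettyServer walks upward until a
    // port binds, so always read the port it actually returns.
    val (server, boundPort) = startJettyServer("0.0.0.0", 3030, handlers)
    println("Ping server started at http://localhost:" + boundPort + "/ping")
    server.join() // block until the server stops
  }
}

Since startJettyServer moves to the next port whenever a bind fails, callers must read the port out of the returned tuple rather than assume the requested one; SparkUI below does exactly that with its boundPort field.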
diff --git a/core/src/main/scala/spark/ui/SparkUI.scala b/core/src/main/scala/spark/ui/SparkUI.scala
new file mode 100644
index 0000000000..b3bdc2c490
--- /dev/null
+++ b/core/src/main/scala/spark/ui/SparkUI.scala
@@ -0,0 +1,63 @@
+package spark.ui
+
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.{Handler, Server}
+
+import spark.{Logging, SparkContext, Utils}
+import spark.ui.storage.BlockManagerUI
+import spark.ui.jobs.JobProgressUI
+import spark.ui.UIUtils._
+import spark.ui.JettyUtils._
+
+/** Top level user interface for Spark */
+private[spark] class SparkUI(sc: SparkContext) extends Logging {
+  // TODO(pwendell): It would be nice to add a view that prints out environment information
+
+  val host = Utils.localHostName()
+  val port = Option(System.getProperty("spark.ui.port")).getOrElse(SparkUI.DEFAULT_PORT).toInt
+  var boundPort: Option[Int] = None
+  var server: Option[Server] = None
+
+  val handlers = Seq[(String, Handler)](
+    ("/static", createStaticHandler(SparkUI.STATIC_RESOURCE_DIR)),
+    ("/", createRedirectHandler("/stages"))
+  )
+  val storage = new BlockManagerUI(sc)
+  val jobs = new JobProgressUI(sc)
+  val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ handlers
+
+  /** Bind the HTTP server which backs this web interface */
+  def bind() {
+    try {
+      val (srv, usedPort) = JettyUtils.startJettyServer("0.0.0.0", port, allHandlers)
+      logInfo("Started Spark Web UI at http://%s:%d".format(host, usedPort))
+      server = Some(srv)
+      boundPort = Some(usedPort)
+    } catch {
+      case e: Exception =>
+        logError("Failed to create Spark web UI", e)
+        System.exit(1)
+    }
+  }
+
+  /** Initialize all components of the server */
+  def start() {
+    // NOTE: This is decoupled from bind() because of the following dependency cycle:
+    //   DAGScheduler() requires that the port of this server is known
+    //   This server must register all handlers, including JobProgressUI, before binding
+    //   JobProgressUI registers a listener with SparkContext, which requires sc to initialize
+    jobs.start()
+  }
+
+  def stop() {
+    server.foreach(_.stop())
+  }
+
+  private[spark] def appUIAddress = "http://" + host + ":" + boundPort.getOrElse("-1")
+}
+
+private[spark] object SparkUI {
+  val DEFAULT_PORT = "33000"
+  val STATIC_RESOURCE_DIR = "spark/ui/static"
+}
diff --git a/core/src/main/scala/spark/ui/UIUtils.scala b/core/src/main/scala/spark/ui/UIUtils.scala
new file mode 100644
index 0000000000..7b79290d1b
--- /dev/null
+++ b/core/src/main/scala/spark/ui/UIUtils.scala
@@ -0,0 +1,113 @@
+package spark.ui
+
+import scala.xml.Node
+
+import spark.SparkContext
+
+/** Utility functions for generating XML pages with Spark content.
*/ +private[spark] object UIUtils { + import Page._ + + /** Returns a spark page with correctly formatted headers */ + def headerSparkPage(content: => Seq[Node], sc: SparkContext, title: String, page: Page.Value) + : Seq[Node] = { + val storage = page match { + case Storage => <li class="active"><a href="/storage">Storage</a></li> + case _ => <li><a href="/storage">Storage</a></li> + } + val jobs = page match { + case Jobs => <li class="active"><a href="/stages">Jobs</a></li> + case _ => <li><a href="/stages">Jobs</a></li> + } + + <html> + <head> + <meta http-equiv="Content-type" content="text/html; charset=utf-8" /> + <link rel="stylesheet" href="/static/bootstrap.min.css" type="text/css" /> + <link rel="stylesheet" href="/static/webui.css" type="text/css" /> + <link rel="stylesheet" href="/static/bootstrap-responsive.min.css" type="text/css" /> + <script src="/static/sorttable.js"></script> + <title>{title}</title> + <style type="text/css"> + table.sortable thead {{ cursor: pointer; }} + </style> + </head> + <body> + <div class="container"> + + <div class="row"> + <div class="span12"> + <div class="navbar"> + <div class="navbar-inner"> + <div class="container"> + <div class="brand"><img src="/static/spark-logo-77x50px-hd.png" /></div> + <ul class="nav"> + {storage} + {jobs} + </ul> + <ul id="infolist"> + <li>Application: <strong>{sc.appName}</strong></li> + <li>Master: <strong>{sc.master}</strong></li> + <li>Executors: <strong>{sc.getExecutorStorageStatus.size}</strong></li> + </ul> + </div> + </div> + </div> + </div> + </div> + + <div class="row" style="padding-top: 5px;"> + <div class="span12"> + <h1 style="vertical-align: bottom; display: inline-block;"> + {title} + </h1> + </div> + </div> + <hr/> + {content} + </div> + </body> + </html> + } + + /** Returns a page with the spark css/js and a simple format. Used for scheduler UI. */ + def basicSparkPage(content: => Seq[Node], title: String): Seq[Node] = { + <html> + <head> + <meta http-equiv="Content-type" content="text/html; charset=utf-8" /> + <link rel="stylesheet" href="/static/bootstrap.min.css" type="text/css" /> + <link rel="stylesheet" href="/static/bootstrap-responsive.min.css" type="text/css" /> + <script src="/static/sorttable.js"></script> + <title>{title}</title> + <style type="text/css"> + table.sortable thead {{ cursor: pointer; }} + </style> + </head> + <body> + <div class="container"> + <div class="row"> + <div class="span2"> + <img src="/static/spark_logo.png" /> + </div> + <div class="span10"> + <h1 style="vertical-align: bottom; margin-top: 40px; display: inline-block;"> + {title} + </h1> + </div> + </div> + {content} + </div> + </body> + </html> + } + + /** Returns an HTML table constructed by generating a row for each object in a sequence. */ + def listingTable[T](headers: Seq[String], makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = { + <table class="table table-bordered table-striped table-condensed sortable"> + <thead>{headers.map(h => <th>{h}</th>)}</thead> + <tbody> + {rows.map(r => makeRow(r))} + </tbody> + </table> + } +}
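\ No newline at end of file

The listingTable helper above is the workhorse for the tables in the new UI: callers supply headers, a row-rendering function, and the row data, and the sortable CSS class lets the bundled sorttable.js sort the result client-side. Below is a small hypothetical sketch (ExecutorSummary and its fields are invented for illustration):

package spark.ui

import scala.xml.Node

// Hypothetical row type, invented for this example.
case class ExecutorSummary(id: String, tasksComplete: Int)

object ListingTableExample {
  val headers = Seq("Executor", "Tasks Complete")

  // Each row is produced by a T => Seq[Node] function; an XML literal
  // such as <tr>...</tr> satisfies Seq[Node] since Node extends NodeSeq.
  def row(e: ExecutorSummary): Seq[Node] =
    <tr><td>{e.id}</td><td>{e.tasksComplete}</td></tr>

  def main(args: Array[String]) {
    val data = Seq(ExecutorSummary("exec-1", 42), ExecutorSummary("exec-2", 17))
    // listingTable wraps one generated <tr> per element in a sortable table.
    val table: Seq[Node] = UIUtils.listingTable(headers, row, data)
    println(table.mkString)
  }
}

StagePage and the storage pages later in this patch follow the same pattern, passing taskRow, quantileRow, and rddRow as the row builders.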
diff --git a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
new file mode 100644
index 0000000000..8bbc6ce88e
--- /dev/null
+++ b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
@@ -0,0 +1,71 @@
+package spark.ui
+
+import scala.util.Random
+
+import spark.SparkContext
+import spark.SparkContext._
+
+/**
+ * Continuously generates jobs that expose various features of the WebUI (internal testing tool).
+ *
+ * Usage: ./run spark.ui.UIWorkloadGenerator [master]
+ */
+private[spark] object UIWorkloadGenerator {
+  val NUM_PARTITIONS = 100
+  val INTER_JOB_WAIT_MS = 500
+
+  def main(args: Array[String]) {
+    val master = args(0)
+    val appName = "Spark UI Tester"
+    val sc = new SparkContext(master, appName)
+
+    // NOTE: Right now there is no easy way for us to show spark.job.annotation for a given phase,
+    // but we pass it here anyway since it will be useful once we do.
+    def setName(s: String) = {
+      sc.addLocalProperties("spark.job.annotation", s)
+    }
+    val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS)
+    def nextFloat() = (new Random()).nextFloat()
+
+    val jobs = Seq[(String, () => Long)](
+      ("Count", baseData.count),
+      ("Cache and Count", baseData.map(x => x).cache.count),
+      ("Single Shuffle", baseData.map(x => (x % 10, x)).reduceByKey(_ + _).count),
+      ("Entirely failed phase", baseData.map(x => throw new Exception).count),
+      ("Partially failed phase", {
+        baseData.map{x =>
+          val probFailure = (4.0 / NUM_PARTITIONS)
+          if (nextFloat() < probFailure) {
+            throw new Exception("This is a task failure")
+          }
+          1
+        }.count
+      }),
+      ("Partially failed phase (longer tasks)", {
+        baseData.map{x =>
+          val probFailure = (4.0 / NUM_PARTITIONS)
+          if (nextFloat() < probFailure) {
+            Thread.sleep(100)
+            throw new Exception("This is a task failure")
+          }
+          1
+        }.count
+      }),
+      ("Job with delays", baseData.map(x => Thread.sleep(100)).count)
+    )
+
+    while (true) {
+      for ((desc, job) <- jobs) {
+        try {
+          setName(desc)
+          job()
+          println("Job finished: " + desc)
+        } catch {
+          case e: Exception =>
+            println("Job failed: " + desc)
+        }
+        Thread.sleep(INTER_JOB_WAIT_MS)
+      }
+    }
+  }
+}
diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
new file mode 100644
index 0000000000..1e675ab2cb
--- /dev/null
+++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala
@@ -0,0 +1,112 @@
+package spark.ui.jobs
+
+import java.util.Date
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.Some
+import scala.xml.{NodeSeq, Node}
+
+import spark.scheduler.Stage
+import spark.ui.UIUtils._
+import spark.ui.Page._
+import spark.storage.StorageLevel
+
+/** Page showing list of all ongoing and recently finished stages */
+private[spark] class IndexPage(parent: JobProgressUI) {
+  def listener = parent.listener
+  val dateFmt = parent.dateFmt
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val activeStages = listener.activeStages.toSeq
+    val completedStages = listener.completedStages.reverse.toSeq
+    val failedStages = listener.failedStages.reverse.toSeq
+
+    /** Special table which merges two header cells.
*/ + def stageTable[T](makeRow: T => Seq[Node], rows: Seq[T]): Seq[Node] = { + <table class="table table-bordered table-striped table-condensed sortable"> + <thead> + <th>Stage Id</th> + <th>Origin</th> + <th>Submitted</th> + <td>Duration</td> + <td colspan="2">Tasks: Complete/Total</td> + <td>Shuffle Activity</td> + <td>Stored RDD</td> + </thead> + <tbody> + {rows.map(r => makeRow(r))} + </tbody> + </table> + } + + val activeStageTable: NodeSeq = stageTable(stageRow, activeStages) + val completedStageTable = stageTable(stageRow, completedStages) + val failedStageTable: NodeSeq = stageTable(stageRow, failedStages) + + val content = <h2>Active Stages</h2> ++ activeStageTable ++ + <h2>Completed Stages</h2> ++ completedStageTable ++ + <h2>Failed Stages</h2> ++ failedStageTable + + headerSparkPage(content, parent.sc, "Spark Stages", Jobs) + } + + def getElapsedTime(submitted: Option[Long], completed: Long): String = { + submitted match { + case Some(t) => parent.formatDuration(completed - t) + case _ => "Unknown" + } + } + + def makeProgressBar(completed: Int, total: Int): Seq[Node] = { + val width=130 + val height=15 + val completeWidth = (completed.toDouble / total) * width + + <svg width={width.toString} height={height.toString}> + <rect width={width.toString} height={height.toString} + fill="white" stroke="rgb(51,51,51)" stroke-width="1" /> + <rect width={completeWidth.toString} height={height.toString} + fill="rgb(0,136,204)" stroke="black" stroke-width="1" /> + </svg> + } + + + def stageRow(s: Stage): Seq[Node] = { + val submissionTime = s.submissionTime match { + case Some(t) => dateFmt.format(new Date(t)) + case None => "Unknown" + } + val (read, write) = (listener.hasShuffleRead(s.id), listener.hasShuffleWrite(s.id)) + val shuffleInfo = (read, write) match { + case (true, true) => "Read/Write" + case (true, false) => "Read" + case (false, true) => "Write" + case _ => "" + } + val completedTasks = listener.stageToTasksComplete.getOrElse(s.id, 0) + val totalTasks = s.numPartitions + + <tr> + <td>{s.id}</td> + <td><a href={"/stages/stage?id=%s".format(s.id)}>{s.origin}</a></td> + <td>{submissionTime}</td> + <td>{getElapsedTime(s.submissionTime, + s.completionTime.getOrElse(System.currentTimeMillis()))}</td> + <td class="progress-cell">{makeProgressBar(completedTasks, totalTasks)}</td> + <td style="border-left: 0; text-align: center;">{completedTasks} / {totalTasks} + {listener.stageToTasksFailed.getOrElse(s.id, 0) match { + case f if f > 0 => "(%s failed)".format(f) + case _ => + }} + </td> + <td>{shuffleInfo}</td> + <td>{if (s.rdd.getStorageLevel != StorageLevel.NONE) { + <a href={"/storage/rdd?id=%s".format(s.rdd.id)}> + {Option(s.rdd.name).getOrElse(s.rdd.id)} + </a> + }} + </td> + </tr> + } +} diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala new file mode 100644 index 0000000000..c5d2aa5d60 --- /dev/null +++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala @@ -0,0 +1,127 @@ +package spark.ui.jobs + +import scala.concurrent.duration._ + +import java.text.SimpleDateFormat + +import javax.servlet.http.HttpServletRequest + +import org.eclipse.jetty.server.Handler + +import scala.Seq +import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer} + +import spark.ui.JettyUtils._ +import spark.{ExceptionFailure, SparkContext, Success, Utils} +import spark.scheduler._ +import spark.scheduler.cluster.TaskInfo +import spark.executor.TaskMetrics +import collection.mutable + +/** Web UI showing progress 
status of all jobs in the given SparkContext. */ +private[spark] class JobProgressUI(val sc: SparkContext) { + private var _listener: Option[JobProgressListener] = None + def listener = _listener.get + val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") + + private val indexPage = new IndexPage(this) + private val stagePage = new StagePage(this) + + def start() { + _listener = Some(new JobProgressListener) + sc.addSparkListener(listener) + } + + def formatDuration(ms: Long) = Utils.msDurationToString(ms) + + def getHandlers = Seq[(String, Handler)]( + ("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)), + ("/stages", (request: HttpServletRequest) => indexPage.render(request)) + ) +} + +private[spark] class JobProgressListener extends SparkListener { + // How many stages to remember + val RETAINED_STAGES = System.getProperty("spark.ui.retained_stages", "1000").toInt + + val activeStages = HashSet[Stage]() + val completedStages = ListBuffer[Stage]() + val failedStages = ListBuffer[Stage]() + + val stageToTasksComplete = HashMap[Int, Int]() + val stageToTasksFailed = HashMap[Int, Int]() + val stageToTaskInfos = + HashMap[Int, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]() + + override def onJobStart(jobStart: SparkListenerJobStart) {} + + override def onStageCompleted(stageCompleted: StageCompleted) = { + val stage = stageCompleted.stageInfo.stage + activeStages -= stage + completedStages += stage + trimIfNecessary(completedStages) + } + + /** If stages is too large, remove and garbage collect old stages */ + def trimIfNecessary(stages: ListBuffer[Stage]) { + if (stages.size > RETAINED_STAGES) { + val toRemove = RETAINED_STAGES / 10 + stages.takeRight(toRemove).foreach( s => { + stageToTaskInfos.remove(s.id) + }) + stages.trimEnd(toRemove) + } + } + + override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = + activeStages += stageSubmitted.stage + + override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { + val sid = taskEnd.task.stageId + val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) = + taskEnd.reason match { + case e: ExceptionFailure => + stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1 + (Some(e), e.metrics) + case _ => + stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1 + (None, Some(taskEnd.taskMetrics)) + } + val taskList = stageToTaskInfos.getOrElse( + sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]()) + taskList += ((taskEnd.taskInfo, metrics, failureInfo)) + stageToTaskInfos(sid) = taskList + } + + override def onJobEnd(jobEnd: SparkListenerJobEnd) { + jobEnd match { + case end: SparkListenerJobEnd => + end.jobResult match { + case JobFailed(ex, Some(stage)) => + activeStages -= stage + failedStages += stage + trimIfNecessary(failedStages) + case _ => + } + case _ => + } + } + + /** Is this stage's input from a shuffle read. */ + def hasShuffleRead(stageID: Int): Boolean = { + // This is written in a slightly complicated way to avoid having to scan all tasks + for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) { + if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined + } + return false // No tasks have finished for this stage + } + + /** Is this stage's output to a shuffle write. 
*/ + def hasShuffleWrite(stageID: Int): Boolean = { + // This is written in a slightly complicated way to avoid having to scan all tasks + for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) { + if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined + } + return false // No tasks have finished for this stage + } +} diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala new file mode 100644 index 0000000000..51b82b6a8c --- /dev/null +++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala @@ -0,0 +1,114 @@ +package spark.ui.jobs + +import java.util.Date + +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import spark.ui.UIUtils._ +import spark.ui.Page._ +import spark.util.Distribution +import spark.{ExceptionFailure, Utils} +import spark.scheduler.cluster.TaskInfo +import spark.executor.TaskMetrics + +/** Page showing statistics and task list for a given stage */ +private[spark] class StagePage(parent: JobProgressUI) { + def listener = parent.listener + val dateFmt = parent.dateFmt + + def render(request: HttpServletRequest): Seq[Node] = { + val stageId = request.getParameter("id").toInt + + if (!listener.stageToTaskInfos.contains(stageId)) { + val content = + <div> + <h2>Summary Metrics</h2> No tasks have finished yet + <h2>Tasks</h2> No tasks have finished yet + </div> + return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs) + } + + val tasks = listener.stageToTaskInfos(stageId) + + val shuffleRead = listener.hasShuffleRead(stageId) + val shuffleWrite = listener.hasShuffleWrite(stageId) + + val taskHeaders: Seq[String] = + Seq("Task ID", "Duration", "Locality Level", "Worker", "Launch Time") ++ + {if (shuffleRead) Seq("Shuffle Read") else Nil} ++ + {if (shuffleWrite) Seq("Shuffle Write") else Nil} ++ + Seq("Details") + + val taskTable = listingTable(taskHeaders, taskRow, tasks) + + // Excludes tasks which failed and have incomplete metrics + val validTasks = tasks.filter(t => Option(t._2).isDefined) + + val summaryTable: Option[Seq[Node]] = + if (validTasks.size == 0) { + None + } + else { + val serviceTimes = validTasks.map{case (info, metrics, exception) => + metrics.get.executorRunTime.toDouble} + val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map( + ms => parent.formatDuration(ms.toLong)) + + def getQuantileCols(data: Seq[Double]) = + Distribution(data).get.getQuantiles().map(d => Utils.memoryBytesToString(d.toLong)) + + val shuffleReadSizes = validTasks.map { + case(info, metrics, exception) => + metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble + } + val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes) + + val shuffleWriteSizes = validTasks.map { + case(info, metrics, exception) => + metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble + } + val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes) + + val listings: Seq[Seq[String]] = Seq(serviceQuantiles, + if (shuffleRead) shuffleReadQuantiles else Nil, + if (shuffleWrite) shuffleWriteQuantiles else Nil) + + val quantileHeaders = Seq("Metric", "Min", "25%", "50%", "75%", "Max") + def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr> + Some(listingTable(quantileHeaders, quantileRow, listings)) + } + + val content = + <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++ <h2>Tasks</h2> ++ taskTable; + + 
headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
+  }
+
+
+  def taskRow(taskData: (TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])): Seq[Node] = {
+    def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
+      trace.map(e => <span style="display:block;">{e.toString}</span>)
+    val (info, metrics, exception) = taskData
+    <tr>
+      <td>{info.taskId}</td>
+      <td sorttable_customkey={metrics.map{m => m.executorRunTime.toString}.getOrElse("1")}>
+        {metrics.map{m => parent.formatDuration(m.executorRunTime)}.getOrElse("")}
+      </td>
+      <td>{info.taskLocality}</td>
+      <td>{info.hostPort}</td>
+      <td>{dateFmt.format(new Date(info.launchTime))}</td>
+      {metrics.flatMap{m => m.shuffleReadMetrics}.map{s =>
+        <td>{Utils.memoryBytesToString(s.remoteBytesRead)}</td>}.getOrElse("")}
+      {metrics.flatMap{m => m.shuffleWriteMetrics}.map{s =>
+        <td>{Utils.memoryBytesToString(s.shuffleBytesWritten)}</td>}.getOrElse("")}
+      <td>{exception.map(e =>
+        <span>
+          {e.className} ({e.description})<br/>
+          {fmtStackTrace(e.stackTrace)}
+        </span>).getOrElse("")}
+      </td>
+    </tr>
+  }
+}
diff --git a/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala b/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala
new file mode 100644
index 0000000000..252600aad4
--- /dev/null
+++ b/core/src/main/scala/spark/ui/storage/BlockManagerUI.scala
@@ -0,0 +1,24 @@
+package spark.ui.storage
+
+import scala.concurrent.duration._
+
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.Handler
+
+import spark.{Logging, SparkContext}
+import spark.ui.JettyUtils._
+
+/** Web UI showing storage status of all RDDs in the given SparkContext. */
+private[spark] class BlockManagerUI(val sc: SparkContext) extends Logging {
+  implicit val timeout = Duration.create(
+    System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+
+  val indexPage = new IndexPage(this)
+  val rddPage = new RDDPage(this)
+
+  def getHandlers = Seq[(String, Handler)](
+    ("/storage/rdd", (request: HttpServletRequest) => rddPage.render(request)),
+    ("/storage", (request: HttpServletRequest) => indexPage.render(request))
+  )
+}
diff --git a/core/src/main/scala/spark/ui/storage/IndexPage.scala b/core/src/main/scala/spark/ui/storage/IndexPage.scala
new file mode 100644
index 0000000000..d284134391
--- /dev/null
+++ b/core/src/main/scala/spark/ui/storage/IndexPage.scala
@@ -0,0 +1,64 @@
+package spark.ui.storage
+
+import javax.servlet.http.HttpServletRequest
+
+import scala.xml.Node
+
+import spark.storage.{RDDInfo, StorageUtils}
+import spark.Utils
+import spark.ui.UIUtils._
+import spark.ui.Page._
+
+/** Page showing list of RDDs currently stored in the cluster */
+private[spark] class IndexPage(parent: BlockManagerUI) {
+  val sc = parent.sc
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val storageStatusList = sc.getExecutorStorageStatus
+    // Calculate macro-level statistics
+    val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
+    val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
+    val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
+      .reduceOption(_+_).getOrElse(0L)
+
+    val rddHeaders = Seq(
+      "RDD Name",
+      "Storage Level",
+      "Cached Partitions",
+      "Fraction Partitions Cached",
+      "Size in Memory",
+      "Size on Disk")
+    val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
+    val rddTable = listingTable(rddHeaders, rddRow, rdds)
+
+    val content =
+      <div class="row">
+        <div class="span12">
+          <ul class="unstyled">
<li><strong>Memory:</strong> + {Utils.memoryBytesToString(maxMem - remainingMem)} Used + ({Utils.memoryBytesToString(remainingMem)} Available) </li> + <li><strong>Disk:</strong> {Utils.memoryBytesToString(diskSpaceUsed)} Used </li> + </ul> + </div> + </div> ++ {rddTable}; + + headerSparkPage(content, parent.sc, "Spark Storage ", Storage) + } + + def rddRow(rdd: RDDInfo): Seq[Node] = { + <tr> + <td> + <a href={"/storage/rdd?id=%s".format(rdd.id)}> + {rdd.name} + </a> + </td> + <td>{rdd.storageLevel.description} + </td> + <td>{rdd.numCachedPartitions}</td> + <td>{rdd.numCachedPartitions / rdd.numPartitions.toDouble}</td> + <td>{Utils.memoryBytesToString(rdd.memSize)}</td> + <td>{Utils.memoryBytesToString(rdd.diskSize)}</td> + </tr> + } +} diff --git a/core/src/main/scala/spark/ui/storage/RDDPage.scala b/core/src/main/scala/spark/ui/storage/RDDPage.scala new file mode 100644 index 0000000000..0cb1e47ea5 --- /dev/null +++ b/core/src/main/scala/spark/ui/storage/RDDPage.scala @@ -0,0 +1,104 @@ +package spark.ui.storage + +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import spark.storage.{StorageStatus, StorageUtils} +import spark.ui.UIUtils._ +import spark.Utils +import spark.storage.BlockManagerMasterActor.BlockStatus +import spark.ui.Page._ + +/** Page showing storage details for a given RDD */ +private[spark] class RDDPage(parent: BlockManagerUI) { + val sc = parent.sc + + def render(request: HttpServletRequest): Seq[Node] = { + val id = request.getParameter("id") + val prefix = "rdd_" + id.toString + val storageStatusList = sc.getExecutorStorageStatus + val filteredStorageStatusList = StorageUtils. + filterStorageStatusByPrefix(storageStatusList, prefix) + val rddInfo = StorageUtils.rddInfoFromStorageStatus(filteredStorageStatusList, sc).head + + val workerHeaders = Seq("Host", "Memory Usage", "Disk Usage") + val workers = filteredStorageStatusList.map((prefix, _)) + val workerTable = listingTable(workerHeaders, workerRow, workers) + + val blockHeaders = Seq("Block Name", "Storage Level", "Size in Memory", "Size on Disk") + val blocks = filteredStorageStatusList.flatMap(_.blocks).toArray.sortWith(_._1 < _._1) + val blockTable = listingTable(blockHeaders, blockRow, blocks) + + val content = + <div class="row"> + <div class="span12"> + <ul class="unstyled"> + <li> + <strong>Storage Level:</strong> + {rddInfo.storageLevel.description} + </li> + <li> + <strong>Cached Partitions:</strong> + {rddInfo.numCachedPartitions} + </li> + <li> + <strong>Total Partitions:</strong> + {rddInfo.numPartitions} + </li> + <li> + <strong>Memory Size:</strong> + {Utils.memoryBytesToString(rddInfo.memSize)} + </li> + <li> + <strong>Disk Size:</strong> + {Utils.memoryBytesToString(rddInfo.diskSize)} + </li> + </ul> + </div> + </div> + <hr/> + <div class="row"> + <div class="span12"> + {workerTable} + </div> + </div> + <hr/> + <div class="row"> + <div class="span12"> + <h3> RDD Summary </h3> + {blockTable} + </div> + </div>; + + headerSparkPage(content, parent.sc, "RDD Info: " + rddInfo.name, Jobs) + } + + def blockRow(blk: (String, BlockStatus)): Seq[Node] = { + val (id, block) = blk + <tr> + <td>{id}</td> + <td> + {block.storageLevel.description} + </td> + <td sorttable_customkey={block.memSize.toString}> + {Utils.memoryBytesToString(block.memSize)} + </td> + <td sorttable_customkey={block.diskSize.toString}> + {Utils.memoryBytesToString(block.diskSize)} + </td> + </tr> + } + + def workerRow(worker: (String, StorageStatus)): Seq[Node] = { + val (prefix, status) = worker + <tr> + 
<td>{status.blockManagerId.host + ":" + status.blockManagerId.port}</td>
+      <td>
+        {Utils.memoryBytesToString(status.memUsed(prefix))}
+        ({Utils.memoryBytesToString(status.memRemaining)} Total Available)
+      </td>
+      <td>{Utils.memoryBytesToString(status.diskUsed(prefix))}</td>
+    </tr>
+  }
+}
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index ea39888c21..c381a0510b 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -1,20 +1,10 @@
 package spark.util
-import akka.actor.{Props, ActorSystem, ExtendedActorSystem}
+import akka.actor.{ActorSystem, ExtendedActorSystem}
 import com.typesafe.config.ConfigFactory
 import scala.concurrent.duration._
-import akka.pattern.ask
-import akka.remote.RemoteActorRefProvider
-
-import spray.routing.Route
-import spray.io.IOExtension
-import spray.routing.HttpServiceActor
-import spray.can.server.{HttpServer, ServerSettings}
-import spray.io.SingletonHandler
 import scala.concurrent.Await
-import spark.{Utils, SparkException}
-
-import java.util.concurrent.TimeoutException
+import akka.remote.RemoteActorRefProvider
 
 /**
  * Various utility classes for working with Akka.
@@ -65,29 +55,4 @@ private[spark] object AkkaUtils {
     return (actorSystem, boundPort)
   }
-  /**
-   * Creates a Spray HTTP server bound to a given IP and port with a given Spray Route object to
-   * handle requests. Returns the bound port or throws a SparkException on failure.
-   * TODO: Not changing ip to host here - is it required ?
-   */
-  def startSprayServer(actorSystem: ActorSystem, ip: String, port: Int, route: Route, name: String = "HttpServer") = {
-    val ioWorker = IOExtension(actorSystem).ioBridge()
-    val httpService = actorSystem.actorOf(Props(HttpServiceActor(route)))
-    val server = actorSystem.actorOf(
-      Props(new HttpServer(ioWorker, SingletonHandler(httpService), ServerSettings())), name = name)
-    actorSystem.registerOnTermination { actorSystem.stop(ioWorker) }
-    val timeout = 3.seconds
-    val future = server.ask(HttpServer.Bind(ip, port))(timeout)
-    try {
-      Await.result(future, timeout) match {
-        case bound: HttpServer.Bound =>
-          server
-        case other: Any =>
-          throw new SparkException("Failed to bind web UI to port " + port + ": " + other)
-      }
-    } catch {
-      case e: TimeoutException =>
-        throw new SparkException("Failed to bind web UI to port " + port)
-    }
-  }
 }
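Taken as a whole, the patch replaces the Spray, actor-based UI and its startSprayServer helper (deleted above) with a Jetty server fed by SparkListener callbacks. The sketch below is hypothetical (TaskCountListener and its companion example are invented names), but it overrides the same callback set that JobProgressListener overrides and registers through the same sc.addSparkListener hook used by JobProgressUI.start():

package spark.ui

import spark.SparkContext
import spark.scheduler._

// Hypothetical listener: counts finished tasks, mirroring in miniature
// how JobProgressListener accumulates per-stage state for the web UI.
private[spark] class TaskCountListener extends SparkListener {
  var tasksEnded = 0

  override def onJobStart(jobStart: SparkListenerJobStart) {}

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {}

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    tasksEnded += 1
  }

  override def onStageCompleted(stageCompleted: StageCompleted) {
    println("Stage %s done; %s tasks ended so far".format(
      stageCompleted.stageInfo.stage.id, tasksEnded))
  }

  override def onJobEnd(jobEnd: SparkListenerJobEnd) {}
}

object TaskCountListenerExample {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Listener Example")
    sc.addSparkListener(new TaskCountListener) // the same hook JobProgressUI.start() uses
    sc.makeRDD(1 to 1000, 10).count()
    sc.stop()
  }
}

No component polls the scheduler; all UI state arrives through these listener events, which is what allows SparkUI to decouple bind() from start() as its comments describe.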