diff options
Diffstat (limited to 'core/src/main')
6 files changed, 112 insertions, 21 deletions
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index d2bf151cbf..c6a3f97872 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -627,15 +627,16 @@ private object Utils extends Logging { callSiteInfo.firstUserLine) } - /** Return a string containing the last `n` bytes of a file. */ - def lastNBytes(path: String, n: Int): String = { + /** Return a string containing part of a file from byte 'start' to 'end'. */ + def offsetBytes(path: String, start: Long, end: Long): String = { val file = new File(path) val length = file.length() - val buff = new Array[Byte](math.min(n, length.toInt)) - val skip = math.max(0, length - n) + val effectiveEnd = math.min(length, end) + val effectiveStart = math.max(0, start) + val buff = new Array[Byte]((effectiveEnd-effectiveStart).toInt) val stream = new FileInputStream(file) - stream.skip(skip) + stream.skip(effectiveStart) stream.read(buff) stream.close() Source.fromBytes(buff).mkString diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index dbdc8e1057..4dd6c448a9 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -71,7 +71,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } else { addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) context.watch(sender) // This doesn't work with remote actors but helps for testing - sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort) + sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get) schedule() } } diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala index 33a16b5d84..8553377d8f 100644 --- a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala @@ -90,9 +90,9 @@ private[spark] class ApplicationPage(parent: MasterWebUI) { <td>{executor.memory}</td> <td>{executor.state}</td> <td> - <a href={"%s/log?appId=%s&executorId=%s&logType=stdout" + <a href={"%s/logPage?appId=%s&executorId=%s&logType=stdout" .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a> - <a href={"%s/log?appId=%s&executorId=%s&logType=stderr" + <a href={"%s/logPage?appId=%s&executorId=%s&logType=stderr" .format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a> </td> </tr> diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 6ae1cef940..f20ea42d7f 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -77,7 +77,7 @@ private[spark] class Worker( sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse(".")) logInfo("Spark home: " + sparkHome) createWorkDir() - webUi = new WorkerWebUI(self, workDir, Some(webUiPort)) + webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) webUi.start() connectToMaster() } diff --git a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala index e466129c1a..c515f2e238 100644 --- a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala +++ b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala @@ -16,17 +16,18 @@ import spark.Utils import spark.ui.UIUtils private[spark] class IndexPage(parent: WorkerWebUI) { + val workerActor = parent.worker.self val worker = parent.worker val timeout = parent.timeout def renderJson(request: HttpServletRequest): JValue = { - val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState] + val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerState] val workerState = Await.result(stateFuture, 30 seconds) JsonProtocol.writeWorkerState(workerState) } def render(request: HttpServletRequest): Seq[Node] = { - val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState] + val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerState] val workerState = Await.result(stateFuture, 30 seconds) val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs") @@ -88,11 +89,11 @@ private[spark] class IndexPage(parent: WorkerWebUI) { </ul> </td> <td> - <a href={"log?appId=%s&executorId=%s&logType=stdout" + <a href={"logPage?appId=%s&executorId=%s&logType=stdout" .format(executor.appId, executor.execId)}>stdout</a> - <a href={"log?appId=%s&executorId=%s&logType=stderr" + <a href={"logPage?appId=%s&executorId=%s&logType=stderr" .format(executor.appId, executor.execId)}>stderr</a> - </td> + </td> </tr> } diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala index 16564d5619..ccd55c1ce4 100644 --- a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala @@ -9,15 +9,17 @@ import javax.servlet.http.HttpServletRequest import org.eclipse.jetty.server.{Handler, Server} +import spark.deploy.worker.Worker import spark.{Utils, Logging} import spark.ui.JettyUtils import spark.ui.JettyUtils._ +import spark.ui.UIUtils /** * Web UI server for the standalone worker. */ private[spark] -class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option[Int] = None) +class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None) extends Logging { implicit val timeout = Timeout( Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")) @@ -33,6 +35,7 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option val handlers = Array[(String, Handler)]( ("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)), ("/log", (request: HttpServletRequest) => log(request)), + ("/logPage", (request: HttpServletRequest) => logPage(request)), ("/json", (request: HttpServletRequest) => indexPage.renderJson(request)), ("*", (request: HttpServletRequest) => indexPage.render(request)) ) @@ -51,18 +54,104 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option } def log(request: HttpServletRequest): String = { + val defaultBytes = 100 * 1024 val appId = request.getParameter("appId") val executorId = request.getParameter("executorId") val logType = request.getParameter("logType") + val offset = Option(request.getParameter("offset")).map(_.toLong) + val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes) + val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType) - val maxBytes = 1024 * 1024 // Guard against OOM - val defaultBytes = 100 * 1024 - val numBytes = Option(request.getParameter("numBytes")) - .flatMap(s => Some(s.toInt)).getOrElse(defaultBytes) + val (startByte, endByte) = getByteRange(path, offset, byteLength) + val file = new File(path) + val logLength = file.length + val pre = "==== Bytes %s-%s of %s of %s/%s/%s ====\n" + .format(startByte, endByte, logLength, appId, executorId, logType) + pre + Utils.offsetBytes(path, startByte, endByte) + } + + def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = { + val defaultBytes = 100 * 1024 + val appId = request.getParameter("appId") + val executorId = request.getParameter("executorId") + val logType = request.getParameter("logType") + val offset = Option(request.getParameter("offset")).map(_.toLong) + val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes) val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType) - val pre = "==== Last %s bytes of %s/%s/%s ====\n".format(numBytes, appId, executorId, logType) - pre + Utils.lastNBytes(path, math.min(numBytes, maxBytes)) + + val (startByte, endByte) = getByteRange(path, offset, byteLength) + val file = new File(path) + val logLength = file.length + + val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node> + + val linkToMaster = <p><a href={worker.masterWebUiUrl}>Back to Master</a></p> + + val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span> + + val backButton = + if (startByte > 0) { + <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s" + .format(appId, executorId, logType, math.max(startByte-byteLength, 0), + byteLength)}> + <button>Previous {Utils.memoryBytesToString(math.min(byteLength, startByte))}</button> + </a> + } + else { + <button disabled="disabled">Previous 0 B</button> + } + + val nextButton = + if (endByte < logLength) { + <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s". + format(appId, executorId, logType, endByte, byteLength)}> + <button>Next {Utils.memoryBytesToString(math.min(byteLength, logLength-endByte))}</button> + </a> + } + else { + <button disabled="disabled">Next 0 B</button> + } + + val content = + <html> + <body> + {linkToMaster} + <hr /> + <div> + <div style="float:left;width:40%">{backButton}</div> + <div style="float:left;">{range}</div> + <div style="float:right;">{nextButton}</div> + </div> + <br /> + <div style="height:500px;overflow:auto;padding:5px;"> + <pre>{logText}</pre> + </div> + </body> + </html> + UIUtils.basicSparkPage(content, logType + " log page for " + appId) + } + + /** Determine the byte range for a log or log page. */ + def getByteRange(path: String, offset: Option[Long], byteLength: Int) + : (Long, Long) = { + val defaultBytes = 100 * 1024 + val maxBytes = 1024 * 1024 + + val file = new File(path) + val logLength = file.length() + val getOffset = offset.getOrElse(logLength-defaultBytes) + + val startByte = + if (getOffset < 0) 0L + else if (getOffset > logLength) logLength + else getOffset + + val logPageLength = math.min(byteLength, maxBytes) + + val endByte = math.min(startByte+logPageLength, logLength) + + (startByte, endByte) } def stop() { |