aboutsummaryrefslogtreecommitdiff
path: root/core/src/main
diff options
context:
space:
mode:
authorMatei Zaharia <matei.zaharia@gmail.com>2013-07-12 20:28:21 -0700
committerMatei Zaharia <matei.zaharia@gmail.com>2013-07-12 20:28:21 -0700
commit5a7835c152acf0411e117c2fb2bca1177d8938f4 (patch)
treed6f4e58674e594b8ac9456cecea892f4ce007ebb /core/src/main
parent71ccca0cc126c3dec12df10bca9e56a04a7a39e5 (diff)
parent73984b96a8450674b472676eaa855cc2df68a754 (diff)
downloadspark-5a7835c152acf0411e117c2fb2bca1177d8938f4.tar.gz
spark-5a7835c152acf0411e117c2fb2bca1177d8938f4.tar.bz2
spark-5a7835c152acf0411e117c2fb2bca1177d8938f4.zip
Merge pull request #691 from karenfeng/logpaging
Create log pages
Diffstat (limited to 'core/src/main')
-rw-r--r--core/src/main/scala/spark/Utils.scala11
-rw-r--r--core/src/main/scala/spark/deploy/master/Master.scala2
-rw-r--r--core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala4
-rw-r--r--core/src/main/scala/spark/deploy/worker/Worker.scala2
-rw-r--r--core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala11
-rw-r--r--core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala103
6 files changed, 112 insertions, 21 deletions
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index d2bf151cbf..c6a3f97872 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -627,15 +627,16 @@ private object Utils extends Logging {
callSiteInfo.firstUserLine)
}
- /** Return a string containing the last `n` bytes of a file. */
- def lastNBytes(path: String, n: Int): String = {
+ /** Return a string containing part of a file from byte 'start' to 'end'. */
+ def offsetBytes(path: String, start: Long, end: Long): String = {
val file = new File(path)
val length = file.length()
- val buff = new Array[Byte](math.min(n, length.toInt))
- val skip = math.max(0, length - n)
+ val effectiveEnd = math.min(length, end)
+ val effectiveStart = math.max(0, start)
+ val buff = new Array[Byte]((effectiveEnd-effectiveStart).toInt)
val stream = new FileInputStream(file)
- stream.skip(skip)
+ stream.skip(effectiveStart)
stream.read(buff)
stream.close()
Source.fromBytes(buff).mkString
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index dbdc8e1057..4dd6c448a9 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -71,7 +71,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
} else {
addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
context.watch(sender) // This doesn't work with remote actors but helps for testing
- sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort)
+ sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get)
schedule()
}
}
diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
index 33a16b5d84..8553377d8f 100644
--- a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
@@ -90,9 +90,9 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
<td>{executor.memory}</td>
<td>{executor.state}</td>
<td>
- <a href={"%s/log?appId=%s&executorId=%s&logType=stdout"
+ <a href={"%s/logPage?appId=%s&executorId=%s&logType=stdout"
.format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a>
- <a href={"%s/log?appId=%s&executorId=%s&logType=stderr"
+ <a href={"%s/logPage?appId=%s&executorId=%s&logType=stderr"
.format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a>
</td>
</tr>
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 6ae1cef940..f20ea42d7f 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -77,7 +77,7 @@ private[spark] class Worker(
sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
logInfo("Spark home: " + sparkHome)
createWorkDir()
- webUi = new WorkerWebUI(self, workDir, Some(webUiPort))
+ webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
webUi.start()
connectToMaster()
}
diff --git a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
index e466129c1a..c515f2e238 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
@@ -16,17 +16,18 @@ import spark.Utils
import spark.ui.UIUtils
private[spark] class IndexPage(parent: WorkerWebUI) {
+ val workerActor = parent.worker.self
val worker = parent.worker
val timeout = parent.timeout
def renderJson(request: HttpServletRequest): JValue = {
- val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState]
+ val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerState]
val workerState = Await.result(stateFuture, 30 seconds)
JsonProtocol.writeWorkerState(workerState)
}
def render(request: HttpServletRequest): Seq[Node] = {
- val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState]
+ val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerState]
val workerState = Await.result(stateFuture, 30 seconds)
val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs")
@@ -88,11 +89,11 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
</ul>
</td>
<td>
- <a href={"log?appId=%s&executorId=%s&logType=stdout"
+ <a href={"logPage?appId=%s&executorId=%s&logType=stdout"
.format(executor.appId, executor.execId)}>stdout</a>
- <a href={"log?appId=%s&executorId=%s&logType=stderr"
+ <a href={"logPage?appId=%s&executorId=%s&logType=stderr"
.format(executor.appId, executor.execId)}>stderr</a>
- </td>
+ </td>
</tr>
}
diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
index 16564d5619..ccd55c1ce4 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -9,15 +9,17 @@ import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server}
+import spark.deploy.worker.Worker
import spark.{Utils, Logging}
import spark.ui.JettyUtils
import spark.ui.JettyUtils._
+import spark.ui.UIUtils
/**
* Web UI server for the standalone worker.
*/
private[spark]
-class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option[Int] = None)
+class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
extends Logging {
implicit val timeout = Timeout(
Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
@@ -33,6 +35,7 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option
val handlers = Array[(String, Handler)](
("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)),
("/log", (request: HttpServletRequest) => log(request)),
+ ("/logPage", (request: HttpServletRequest) => logPage(request)),
("/json", (request: HttpServletRequest) => indexPage.renderJson(request)),
("*", (request: HttpServletRequest) => indexPage.render(request))
)
@@ -51,18 +54,104 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option
}
def log(request: HttpServletRequest): String = {
+ val defaultBytes = 100 * 1024
val appId = request.getParameter("appId")
val executorId = request.getParameter("executorId")
val logType = request.getParameter("logType")
+ val offset = Option(request.getParameter("offset")).map(_.toLong)
+ val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
+ val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
- val maxBytes = 1024 * 1024 // Guard against OOM
- val defaultBytes = 100 * 1024
- val numBytes = Option(request.getParameter("numBytes"))
- .flatMap(s => Some(s.toInt)).getOrElse(defaultBytes)
+ val (startByte, endByte) = getByteRange(path, offset, byteLength)
+ val file = new File(path)
+ val logLength = file.length
+ val pre = "==== Bytes %s-%s of %s of %s/%s/%s ====\n"
+ .format(startByte, endByte, logLength, appId, executorId, logType)
+ pre + Utils.offsetBytes(path, startByte, endByte)
+ }
+
+ def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = {
+ val defaultBytes = 100 * 1024
+ val appId = request.getParameter("appId")
+ val executorId = request.getParameter("executorId")
+ val logType = request.getParameter("logType")
+ val offset = Option(request.getParameter("offset")).map(_.toLong)
+ val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
- val pre = "==== Last %s bytes of %s/%s/%s ====\n".format(numBytes, appId, executorId, logType)
- pre + Utils.lastNBytes(path, math.min(numBytes, maxBytes))
+
+ val (startByte, endByte) = getByteRange(path, offset, byteLength)
+ val file = new File(path)
+ val logLength = file.length
+
+ val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node>
+
+ val linkToMaster = <p><a href={worker.masterWebUiUrl}>Back to Master</a></p>
+
+ val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>
+
+ val backButton =
+ if (startByte > 0) {
+ <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s"
+ .format(appId, executorId, logType, math.max(startByte-byteLength, 0),
+ byteLength)}>
+ <button>Previous {Utils.memoryBytesToString(math.min(byteLength, startByte))}</button>
+ </a>
+ }
+ else {
+ <button disabled="disabled">Previous 0 B</button>
+ }
+
+ val nextButton =
+ if (endByte < logLength) {
+ <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s".
+ format(appId, executorId, logType, endByte, byteLength)}>
+ <button>Next {Utils.memoryBytesToString(math.min(byteLength, logLength-endByte))}</button>
+ </a>
+ }
+ else {
+ <button disabled="disabled">Next 0 B</button>
+ }
+
+ val content =
+ <html>
+ <body>
+ {linkToMaster}
+ <hr />
+ <div>
+ <div style="float:left;width:40%">{backButton}</div>
+ <div style="float:left;">{range}</div>
+ <div style="float:right;">{nextButton}</div>
+ </div>
+ <br />
+ <div style="height:500px;overflow:auto;padding:5px;">
+ <pre>{logText}</pre>
+ </div>
+ </body>
+ </html>
+ UIUtils.basicSparkPage(content, logType + " log page for " + appId)
+ }
+
+ /** Determine the byte range for a log or log page. */
+ def getByteRange(path: String, offset: Option[Long], byteLength: Int)
+ : (Long, Long) = {
+ val defaultBytes = 100 * 1024
+ val maxBytes = 1024 * 1024
+
+ val file = new File(path)
+ val logLength = file.length()
+ val getOffset = offset.getOrElse(logLength-defaultBytes)
+
+ val startByte =
+ if (getOffset < 0) 0L
+ else if (getOffset > logLength) logLength
+ else getOffset
+
+ val logPageLength = math.min(byteLength, maxBytes)
+
+ val endByte = math.min(startByte+logPageLength, logLength)
+
+ (startByte, endByte)
}
def stop() {