aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei.zaharia@gmail.com>2013-07-12 20:28:21 -0700
committerMatei Zaharia <matei.zaharia@gmail.com>2013-07-12 20:28:21 -0700
commit5a7835c152acf0411e117c2fb2bca1177d8938f4 (patch)
treed6f4e58674e594b8ac9456cecea892f4ce007ebb /core
parent71ccca0cc126c3dec12df10bca9e56a04a7a39e5 (diff)
parent73984b96a8450674b472676eaa855cc2df68a754 (diff)
downloadspark-5a7835c152acf0411e117c2fb2bca1177d8938f4.tar.gz
spark-5a7835c152acf0411e117c2fb2bca1177d8938f4.tar.bz2
spark-5a7835c152acf0411e117c2fb2bca1177d8938f4.zip
Merge pull request #691 from karenfeng/logpaging
Create log pages
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/Utils.scala11
-rw-r--r--core/src/main/scala/spark/deploy/master/Master.scala2
-rw-r--r--core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala4
-rw-r--r--core/src/main/scala/spark/deploy/worker/Worker.scala2
-rw-r--r--core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala11
-rw-r--r--core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala103
-rw-r--r--core/src/test/scala/spark/UtilsSuite.scala49
-rw-r--r--core/src/test/scala/spark/ui/UISuite.scala52
8 files changed, 162 insertions, 72 deletions
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index d2bf151cbf..c6a3f97872 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -627,15 +627,16 @@ private object Utils extends Logging {
callSiteInfo.firstUserLine)
}
- /** Return a string containing the last `n` bytes of a file. */
- def lastNBytes(path: String, n: Int): String = {
+ /** Return a string containing part of a file from byte 'start' to 'end'. */
+ def offsetBytes(path: String, start: Long, end: Long): String = {
val file = new File(path)
val length = file.length()
- val buff = new Array[Byte](math.min(n, length.toInt))
- val skip = math.max(0, length - n)
+ val effectiveEnd = math.min(length, end)
+ val effectiveStart = math.max(0, start)
+ val buff = new Array[Byte]((effectiveEnd-effectiveStart).toInt)
val stream = new FileInputStream(file)
- stream.skip(skip)
+ stream.skip(effectiveStart)
stream.read(buff)
stream.close()
Source.fromBytes(buff).mkString
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index dbdc8e1057..4dd6c448a9 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -71,7 +71,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
} else {
addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
context.watch(sender) // This doesn't work with remote actors but helps for testing
- sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort)
+ sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get)
schedule()
}
}
diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
index 33a16b5d84..8553377d8f 100644
--- a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
@@ -90,9 +90,9 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
<td>{executor.memory}</td>
<td>{executor.state}</td>
<td>
- <a href={"%s/log?appId=%s&executorId=%s&logType=stdout"
+ <a href={"%s/logPage?appId=%s&executorId=%s&logType=stdout"
.format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stdout</a>
- <a href={"%s/log?appId=%s&executorId=%s&logType=stderr"
+ <a href={"%s/logPage?appId=%s&executorId=%s&logType=stderr"
.format(executor.worker.webUiAddress, executor.application.id, executor.id)}>stderr</a>
</td>
</tr>
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 6ae1cef940..f20ea42d7f 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -77,7 +77,7 @@ private[spark] class Worker(
sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
logInfo("Spark home: " + sparkHome)
createWorkDir()
- webUi = new WorkerWebUI(self, workDir, Some(webUiPort))
+ webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
webUi.start()
connectToMaster()
}
diff --git a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
index e466129c1a..c515f2e238 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
@@ -16,17 +16,18 @@ import spark.Utils
import spark.ui.UIUtils
private[spark] class IndexPage(parent: WorkerWebUI) {
+ val workerActor = parent.worker.self
val worker = parent.worker
val timeout = parent.timeout
def renderJson(request: HttpServletRequest): JValue = {
- val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState]
+ val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerState]
val workerState = Await.result(stateFuture, 30 seconds)
JsonProtocol.writeWorkerState(workerState)
}
def render(request: HttpServletRequest): Seq[Node] = {
- val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState]
+ val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerState]
val workerState = Await.result(stateFuture, 30 seconds)
val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs")
@@ -88,11 +89,11 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
</ul>
</td>
<td>
- <a href={"log?appId=%s&executorId=%s&logType=stdout"
+ <a href={"logPage?appId=%s&executorId=%s&logType=stdout"
.format(executor.appId, executor.execId)}>stdout</a>
- <a href={"log?appId=%s&executorId=%s&logType=stderr"
+ <a href={"logPage?appId=%s&executorId=%s&logType=stderr"
.format(executor.appId, executor.execId)}>stderr</a>
- </td>
+ </td>
</tr>
}
diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
index 16564d5619..ccd55c1ce4 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -9,15 +9,17 @@ import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server}
+import spark.deploy.worker.Worker
import spark.{Utils, Logging}
import spark.ui.JettyUtils
import spark.ui.JettyUtils._
+import spark.ui.UIUtils
/**
* Web UI server for the standalone worker.
*/
private[spark]
-class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option[Int] = None)
+class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
extends Logging {
implicit val timeout = Timeout(
Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))
@@ -33,6 +35,7 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option
val handlers = Array[(String, Handler)](
("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)),
("/log", (request: HttpServletRequest) => log(request)),
+ ("/logPage", (request: HttpServletRequest) => logPage(request)),
("/json", (request: HttpServletRequest) => indexPage.renderJson(request)),
("*", (request: HttpServletRequest) => indexPage.render(request))
)
@@ -51,18 +54,104 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option
}
def log(request: HttpServletRequest): String = {
+ val defaultBytes = 100 * 1024
val appId = request.getParameter("appId")
val executorId = request.getParameter("executorId")
val logType = request.getParameter("logType")
+ val offset = Option(request.getParameter("offset")).map(_.toLong)
+ val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
+ val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
- val maxBytes = 1024 * 1024 // Guard against OOM
- val defaultBytes = 100 * 1024
- val numBytes = Option(request.getParameter("numBytes"))
- .flatMap(s => Some(s.toInt)).getOrElse(defaultBytes)
+ val (startByte, endByte) = getByteRange(path, offset, byteLength)
+ val file = new File(path)
+ val logLength = file.length
+ val pre = "==== Bytes %s-%s of %s of %s/%s/%s ====\n"
+ .format(startByte, endByte, logLength, appId, executorId, logType)
+ pre + Utils.offsetBytes(path, startByte, endByte)
+ }
+
+ def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = {
+ val defaultBytes = 100 * 1024
+ val appId = request.getParameter("appId")
+ val executorId = request.getParameter("executorId")
+ val logType = request.getParameter("logType")
+ val offset = Option(request.getParameter("offset")).map(_.toLong)
+ val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
- val pre = "==== Last %s bytes of %s/%s/%s ====\n".format(numBytes, appId, executorId, logType)
- pre + Utils.lastNBytes(path, math.min(numBytes, maxBytes))
+
+ val (startByte, endByte) = getByteRange(path, offset, byteLength)
+ val file = new File(path)
+ val logLength = file.length
+
+ val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node>
+
+ val linkToMaster = <p><a href={worker.masterWebUiUrl}>Back to Master</a></p>
+
+ val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>
+
+ val backButton =
+ if (startByte > 0) {
+ <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s"
+ .format(appId, executorId, logType, math.max(startByte-byteLength, 0),
+ byteLength)}>
+ <button>Previous {Utils.memoryBytesToString(math.min(byteLength, startByte))}</button>
+ </a>
+ }
+ else {
+ <button disabled="disabled">Previous 0 B</button>
+ }
+
+ val nextButton =
+ if (endByte < logLength) {
+ <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s".
+ format(appId, executorId, logType, endByte, byteLength)}>
+ <button>Next {Utils.memoryBytesToString(math.min(byteLength, logLength-endByte))}</button>
+ </a>
+ }
+ else {
+ <button disabled="disabled">Next 0 B</button>
+ }
+
+ val content =
+ <html>
+ <body>
+ {linkToMaster}
+ <hr />
+ <div>
+ <div style="float:left;width:40%">{backButton}</div>
+ <div style="float:left;">{range}</div>
+ <div style="float:right;">{nextButton}</div>
+ </div>
+ <br />
+ <div style="height:500px;overflow:auto;padding:5px;">
+ <pre>{logText}</pre>
+ </div>
+ </body>
+ </html>
+ UIUtils.basicSparkPage(content, logType + " log page for " + appId)
+ }
+
+ /** Determine the byte range for a log or log page. */
+ def getByteRange(path: String, offset: Option[Long], byteLength: Int)
+ : (Long, Long) = {
+ val defaultBytes = 100 * 1024
+ val maxBytes = 1024 * 1024
+
+ val file = new File(path)
+ val logLength = file.length()
+ val getOffset = offset.getOrElse(logLength-defaultBytes)
+
+ val startByte =
+ if (getOffset < 0) 0L
+ else if (getOffset > logLength) logLength
+ else getOffset
+
+ val logPageLength = math.min(byteLength, maxBytes)
+
+ val endByte = math.min(startByte+logPageLength, logLength)
+
+ (startByte, endByte)
}
def stop() {
diff --git a/core/src/test/scala/spark/UtilsSuite.scala b/core/src/test/scala/spark/UtilsSuite.scala
index 4a113e16bf..1e1260f606 100644
--- a/core/src/test/scala/spark/UtilsSuite.scala
+++ b/core/src/test/scala/spark/UtilsSuite.scala
@@ -1,7 +1,10 @@
package spark
+import com.google.common.base.Charsets
+import com.google.common.io.Files
+import java.io.{ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream, File}
import org.scalatest.FunSuite
-import java.io.{ByteArrayOutputStream, ByteArrayInputStream}
+import org.apache.commons.io.FileUtils
import scala.util.Random
class UtilsSuite extends FunSuite {
@@ -71,5 +74,49 @@ class UtilsSuite extends FunSuite {
assert(Utils.splitCommandString("''") === Seq(""))
assert(Utils.splitCommandString("\"\"") === Seq(""))
}
+
+ test("string formatting of time durations") {
+ val second = 1000
+ val minute = second * 60
+ val hour = minute * 60
+ def str = Utils.msDurationToString(_)
+
+ assert(str(123) === "123 ms")
+ assert(str(second) === "1.0 s")
+ assert(str(second + 462) === "1.5 s")
+ assert(str(hour) === "1.00 h")
+ assert(str(minute) === "1.0 m")
+ assert(str(minute + 4 * second + 34) === "1.1 m")
+ assert(str(10 * hour + minute + 4 * second) === "10.02 h")
+ assert(str(10 * hour + 59 * minute + 59 * second + 999) === "11.00 h")
+ }
+
+ test("reading offset bytes of a file") {
+ val tmpDir2 = Files.createTempDir()
+ val f1Path = tmpDir2 + "/f1"
+ val f1 = new FileOutputStream(f1Path)
+ f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(Charsets.UTF_8))
+ f1.close()
+
+ // Read first few bytes
+ assert(Utils.offsetBytes(f1Path, 0, 5) === "1\n2\n3")
+
+ // Read some middle bytes
+ assert(Utils.offsetBytes(f1Path, 4, 11) === "3\n4\n5\n6")
+
+ // Read last few bytes
+ assert(Utils.offsetBytes(f1Path, 12, 18) === "7\n8\n9\n")
+
+ // Read some nonexistent bytes in the beginning
+ assert(Utils.offsetBytes(f1Path, -5, 5) === "1\n2\n3")
+
+ // Read some nonexistent bytes at the end
+ assert(Utils.offsetBytes(f1Path, 12, 22) === "7\n8\n9\n")
+
+ // Read some nonexistent bytes on both ends
+ assert(Utils.offsetBytes(f1Path, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n")
+
+ FileUtils.deleteDirectory(tmpDir2)
+ }
}
diff --git a/core/src/test/scala/spark/ui/UISuite.scala b/core/src/test/scala/spark/ui/UISuite.scala
index e4bb3abc33..fc0c160720 100644
--- a/core/src/test/scala/spark/ui/UISuite.scala
+++ b/core/src/test/scala/spark/ui/UISuite.scala
@@ -1,14 +1,9 @@
package spark.ui
+import scala.util.{Failure, Success, Try}
+import java.net.ServerSocket
import org.scalatest.FunSuite
import org.eclipse.jetty.server.Server
-import java.net.ServerSocket
-import scala.util.{Failure, Success, Try}
-import spark.Utils
-import com.google.common.io.Files
-import org.apache.commons.io.FileUtils
-import java.io.{FileOutputStream, File}
-import com.google.common.base.Charsets
class UISuite extends FunSuite {
test("jetty port increases under contention") {
@@ -31,47 +26,4 @@ class UISuite extends FunSuite {
case Failure (e) =>
}
}
-
- test("string formatting of time durations") {
- val second = 1000
- val minute = second * 60
- val hour = minute * 60
- def str = Utils.msDurationToString(_)
-
- assert(str(123) === "123 ms")
- assert(str(second) === "1.0 s")
- assert(str(second + 462) === "1.5 s")
- assert(str(hour) === "1.00 h")
- assert(str(minute) === "1.0 m")
- assert(str(minute + 4 * second + 34) === "1.1 m")
- assert(str(10 * hour + minute + 4 * second) === "10.02 h")
- assert(str(10 * hour + 59 * minute + 59 * second + 999) === "11.00 h")
- }
-
- test("reading last n bytes of a file") {
- val tmpDir = Files.createTempDir()
-
- // File smaller than limit
- val f1Path = tmpDir + "/f1"
- val f1 = new FileOutputStream(f1Path)
- f1.write("a\nb\nc\nd".getBytes(Charsets.UTF_8))
- f1.close()
- assert(Utils.lastNBytes(f1Path, 1024) === "a\nb\nc\nd")
-
- // File larger than limit
- val f2Path = tmpDir + "/f2"
- val f2 = new FileOutputStream(f2Path)
- f2.write("1\n2\n3\n4\n5\n6\n7\n8\n".getBytes(Charsets.UTF_8))
- f2.close()
- assert(Utils.lastNBytes(f2Path, 8) === "5\n6\n7\n8\n")
-
- // Request limit too
- val f3Path = tmpDir + "/f2"
- val f3 = new FileOutputStream(f3Path)
- f3.write("1\n2\n3\n4\n5\n6\n7\n8\n".getBytes(Charsets.UTF_8))
- f3.close()
- assert(Utils.lastNBytes(f3Path, 8) === "5\n6\n7\n8\n")
-
- FileUtils.deleteDirectory(tmpDir)
- }
}