aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorKaren Feng <karenfeng.us@gmail.com>2013-07-10 11:47:57 -0700
committerKaren Feng <karenfeng.us@gmail.com>2013-07-10 11:47:57 -0700
commitcfb6447ac4903a870dd268836dc0d8952491d591 (patch)
tree52eaec5d8cb13ac5e506a21d2597093ecfb4041f /core
parent620a6974c6603f1c0e5a7cea8f0387a5d18f2e5e (diff)
downloadspark-cfb6447ac4903a870dd268836dc0d8952491d591.tar.gz
spark-cfb6447ac4903a870dd268836dc0d8952491d591.tar.bz2
spark-cfb6447ac4903a870dd268836dc0d8952491d591.zip
Fixed for nonexistent bytes, added unit tests, changed stdout-page to stdout
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/spark/Utils.scala6
-rw-r--r--core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala2
-rw-r--r--core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala2
-rw-r--r--core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala18
-rw-r--r--core/src/test/scala/spark/ui/UISuite.scala28
5 files changed, 46 insertions, 10 deletions
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 849edc13ee..512ac92d89 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -625,8 +625,10 @@ private object Utils extends Logging {
def offsetBytes(path: String, a: Long, b: Long): String = {
val file = new File(path)
val length = file.length()
- val buff = new Array[Byte](math.min((b-a).toInt, length.toInt))
- val skip = math.max(0, a)
+ val B = math.min(length, b)
+ val A = math.max(0, a)
+ val buff = new Array[Byte]((B-A).toInt)
+ val skip = A
val stream = new FileInputStream(file)
stream.skip(skip)
diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
index 49ced0d320..dae9995779 100644
--- a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala
@@ -38,7 +38,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
state.completedApps.find(_.id == appId).getOrElse(null)
})
- val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Log Pages")
+ val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Logs")
val executors = app.executors.values.toSeq
val executorTable = UIUtils.listingTable(executorHeaders, executorRow, executors)
diff --git a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
index af9943853f..a9c6c6d519 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala
@@ -29,7 +29,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState]
val workerState = Await.result(stateFuture, 30 seconds)
- val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Log Pages")
+ val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs")
val runningExecutorTable =
UIUtils.listingTable(executorHeaders, executorRow, workerState.executors)
val finishedExecutorTable =
diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
index 602881d5f1..24356b0f63 100644
--- a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -71,7 +71,7 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option
val appId = request.getParameter("appId")
val executorId = request.getParameter("executorId")
val logType = request.getParameter("logType")
- val offset = Option(request.getParameter("offset")).map(_.toLong).getOrElse(0).asInstanceOf[Long]
+ val offset = Option(request.getParameter("offset")).map(_.toLong).getOrElse(0L)
val maxBytes = 1024 * 1024
val defaultBytes = 100 * 1024
@@ -80,12 +80,18 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option
val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
val logLength = new File(path).length()
val logPageLength = math.min(byteLength, maxBytes)
- val logText = <node>{Utils.offsetBytes(path, offset, offset+logPageLength)}</node>
+
+ val fixedOffset =
+ if (offset < 0) 0
+ else if (offset > logLength) logLength
+ else offset
+
+ val logText = <node>{Utils.offsetBytes(path, fixedOffset, fixedOffset+logPageLength)}</node>
val backButton =
- if (offset > 0) {
+ if (fixedOffset > 0) {
<a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s"
- .format(appId, executorId, logType, math.max(offset-logPageLength, 0), logPageLength)}>
+ .format(appId, executorId, logType, math.max(fixedOffset-logPageLength, 0), logPageLength)}>
<button style="float:left">back</button>
</a>
}
@@ -94,9 +100,9 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option
}
val nextButton =
- if (offset+logPageLength < logLength) {
+ if (fixedOffset+logPageLength < logLength) {
<a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s".
- format(appId, executorId, logType, offset+logPageLength, logPageLength)}>
+ format(appId, executorId, logType, fixedOffset+logPageLength, logPageLength)}>
<button style="float:right">next</button>
</a>
}
diff --git a/core/src/test/scala/spark/ui/UISuite.scala b/core/src/test/scala/spark/ui/UISuite.scala
index e4bb3abc33..b7a822c4bc 100644
--- a/core/src/test/scala/spark/ui/UISuite.scala
+++ b/core/src/test/scala/spark/ui/UISuite.scala
@@ -74,4 +74,32 @@ class UISuite extends FunSuite {
FileUtils.deleteDirectory(tmpDir)
}
+
+ test("reading offset bytes of a file") {
+ val tmpDir2 = Files.createTempDir()
+ val f1Path = tmpDir2 + "/f1"
+ val f1 = new FileOutputStream(f1Path)
+ f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(Charsets.UTF_8))
+ f1.close()
+
+ // Read first few bytes
+ assert(Utils.offsetBytes(f1Path, 0, 5) === "1\n2\n3")
+
+ // Read some middle bytes
+ assert(Utils.offsetBytes(f1Path, 4, 11) === "3\n4\n5\n6")
+
+ // Read last few bytes
+ assert(Utils.offsetBytes(f1Path, 12, 18) === "7\n8\n9\n")
+
+ //Read some nonexistent bytes in the beginning
+ assert(Utils.offsetBytes(f1Path, -5, 5) === "1\n2\n3")
+
+ //Read some nonexistent bytes at the end
+ assert(Utils.offsetBytes(f1Path, 12, 22) === "7\n8\n9\n")
+
+ //Read some nonexistent bytes on both ends
+ assert(Utils.offsetBytes(f1Path, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n")
+
+ FileUtils.deleteDirectory(tmpDir2)
+ }
}