diff options
author | Karen Feng <karenfeng.us@gmail.com> | 2013-07-10 11:47:57 -0700 |
---|---|---|
committer | Karen Feng <karenfeng.us@gmail.com> | 2013-07-10 11:47:57 -0700 |
commit | cfb6447ac4903a870dd268836dc0d8952491d591 (patch) | |
tree | 52eaec5d8cb13ac5e506a21d2597093ecfb4041f /core/src | |
parent | 620a6974c6603f1c0e5a7cea8f0387a5d18f2e5e (diff) | |
download | spark-cfb6447ac4903a870dd268836dc0d8952491d591.tar.gz spark-cfb6447ac4903a870dd268836dc0d8952491d591.tar.bz2 spark-cfb6447ac4903a870dd268836dc0d8952491d591.zip |
Fixed for nonexistent bytes, added unit tests, changed stdout-page to stdout
Diffstat (limited to 'core/src')
5 files changed, 46 insertions, 10 deletions
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala index 849edc13ee..512ac92d89 100644 --- a/core/src/main/scala/spark/Utils.scala +++ b/core/src/main/scala/spark/Utils.scala @@ -625,8 +625,10 @@ private object Utils extends Logging { def offsetBytes(path: String, a: Long, b: Long): String = { val file = new File(path) val length = file.length() - val buff = new Array[Byte](math.min((b-a).toInt, length.toInt)) - val skip = math.max(0, a) + val B = math.min(length, b) + val A = math.max(0, a) + val buff = new Array[Byte]((B-A).toInt) + val skip = A val stream = new FileInputStream(file) stream.skip(skip) diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala index 49ced0d320..dae9995779 100644 --- a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala @@ -38,7 +38,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) { state.completedApps.find(_.id == appId).getOrElse(null) }) - val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Log Pages") + val executorHeaders = Seq("ExecutorID", "Worker", "Cores", "Memory", "State", "Logs") val executors = app.executors.values.toSeq val executorTable = UIUtils.listingTable(executorHeaders, executorRow, executors) diff --git a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala index af9943853f..a9c6c6d519 100644 --- a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala +++ b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala @@ -29,7 +29,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) { val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState] val workerState = Await.result(stateFuture, 30 seconds) - val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Log Pages") + val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs") val runningExecutorTable = UIUtils.listingTable(executorHeaders, executorRow, workerState.executors) val finishedExecutorTable = diff --git a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala index 602881d5f1..24356b0f63 100644 --- a/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/spark/deploy/worker/ui/WorkerWebUI.scala @@ -71,7 +71,7 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option val appId = request.getParameter("appId") val executorId = request.getParameter("executorId") val logType = request.getParameter("logType") - val offset = Option(request.getParameter("offset")).map(_.toLong).getOrElse(0).asInstanceOf[Long] + val offset = Option(request.getParameter("offset")).map(_.toLong).getOrElse(0L) val maxBytes = 1024 * 1024 val defaultBytes = 100 * 1024 @@ -80,12 +80,18 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType) val logLength = new File(path).length() val logPageLength = math.min(byteLength, maxBytes) - val logText = <node>{Utils.offsetBytes(path, offset, offset+logPageLength)}</node> + + val fixedOffset = + if (offset < 0) 0 + else if (offset > logLength) logLength + else offset + + val logText = <node>{Utils.offsetBytes(path, fixedOffset, fixedOffset+logPageLength)}</node> val backButton = - if (offset > 0) { + if (fixedOffset > 0) { <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s" - .format(appId, executorId, logType, math.max(offset-logPageLength, 0), logPageLength)}> + .format(appId, executorId, logType, math.max(fixedOffset-logPageLength, 0), logPageLength)}> <button style="float:left">back</button> </a> } @@ -94,9 +100,9 @@ class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option } val nextButton = - if (offset+logPageLength < logLength) { + if (fixedOffset+logPageLength < logLength) { <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s". - format(appId, executorId, logType, offset+logPageLength, logPageLength)}> + format(appId, executorId, logType, fixedOffset+logPageLength, logPageLength)}> <button style="float:right">next</button> </a> } diff --git a/core/src/test/scala/spark/ui/UISuite.scala b/core/src/test/scala/spark/ui/UISuite.scala index e4bb3abc33..b7a822c4bc 100644 --- a/core/src/test/scala/spark/ui/UISuite.scala +++ b/core/src/test/scala/spark/ui/UISuite.scala @@ -74,4 +74,32 @@ class UISuite extends FunSuite { FileUtils.deleteDirectory(tmpDir) } + + test("reading offset bytes of a file") { + val tmpDir2 = Files.createTempDir() + val f1Path = tmpDir2 + "/f1" + val f1 = new FileOutputStream(f1Path) + f1.write("1\n2\n3\n4\n5\n6\n7\n8\n9\n".getBytes(Charsets.UTF_8)) + f1.close() + + // Read first few bytes + assert(Utils.offsetBytes(f1Path, 0, 5) === "1\n2\n3") + + // Read some middle bytes + assert(Utils.offsetBytes(f1Path, 4, 11) === "3\n4\n5\n6") + + // Read last few bytes + assert(Utils.offsetBytes(f1Path, 12, 18) === "7\n8\n9\n") + + //Read some nonexistent bytes in the beginning + assert(Utils.offsetBytes(f1Path, -5, 5) === "1\n2\n3") + + //Read some nonexistent bytes at the end + assert(Utils.offsetBytes(f1Path, 12, 22) === "7\n8\n9\n") + + //Read some nonexistent bytes on both ends + assert(Utils.offsetBytes(f1Path, -3, 25) === "1\n2\n3\n4\n5\n6\n7\n8\n9\n") + + FileUtils.deleteDirectory(tmpDir2) + } } |