1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
|
package spark.deploy.worker.ui
import akka.actor.ActorRef
import akka.util.{Duration, Timeout}
import java.io.{FileInputStream, File}
import javax.servlet.http.HttpServletRequest
import org.eclipse.jetty.server.{Handler, Server}
import spark.{Utils, Logging}
import spark.ui.JettyUtils
import spark.ui.JettyUtils._
/**
 * Web UI server for the standalone worker.
 *
 * Registers handlers for static resources, the index page (HTML and JSON) and a
 * `/log` endpoint that returns the tail of a log file located under `workDir`.
 *
 * @param worker        actor reference of the worker; not used directly by this class
 * @param workDir       directory under which `appId/executorId/logType` files are looked up
 * @param requestedPort port to bind to; when empty, the `worker.ui.port` system property is
 *                      used, falling back to `WorkerWebUI.DEFAULT_PORT`
 */
private[spark]
class WorkerWebUI(val worker: ActorRef, val workDir: File, requestedPort: Option[Int] = None)
  extends Logging {

  /** Implicit timeout built from `spark.akka.askTimeout` (in seconds, default 10). */
  implicit val timeout: Timeout = Timeout(
    Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds"))

  val host = Utils.localHostName()

  /** Port requested for the server; the port actually bound is stored in `boundPort`. */
  val port: Int = requestedPort.getOrElse(
    System.getProperty("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)

  // Both are populated by start(); they stay None until the server has been started.
  var server: Option[Server] = None
  var boundPort: Option[Int] = None

  val indexPage = new IndexPage(this)

  // The function literals are turned into Jetty handlers by the conversions
  // imported from JettyUtils._
  val handlers = Array[(String, Handler)](
    ("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)),
    ("/log", (request: HttpServletRequest) => log(request)),
    ("/json", (request: HttpServletRequest) => indexPage.renderJson(request)),
    ("*", (request: HttpServletRequest) => indexPage.render(request))
  )

  /**
   * Starts the Jetty server on all interfaces and records the server and the port it
   * reports as bound. Any exception during startup is logged and the JVM exits with
   * status 1.
   */
  def start(): Unit = {
    try {
      val (jetty, actualPort) = JettyUtils.startJettyServer("0.0.0.0", port, handlers)
      server = Some(jetty)
      boundPort = Some(actualPort)
      logInfo("Started Worker web UI at http://%s:%d".format(host, actualPort))
    } catch {
      case e: Exception =>
        logError("Failed to create Worker JettyUtils", e)
        System.exit(1)
    }
  }

  /**
   * Returns the tail of the log file `workDir/appId/executorId/logType`, prefixed by a
   * one-line header.
   *
   * Request parameters read: `appId`, `executorId`, `logType` and the optional `numBytes`.
   * `numBytes` defaults to 100 KB when absent or not a valid integer, and is clamped to
   * the range [0, 1 MB]. The header reports the clamped value, i.e. the number of bytes
   * actually requested from the file.
   *
   * The three path parameters come from the HTTP request and are therefore untrusted:
   * a value that is `..` or contains a path separator is rejected so the request cannot
   * read files outside `workDir`.
   *
   * @param request the incoming servlet request
   * @return the header followed by the log tail, or an error message for rejected input
   */
  def log(request: HttpServletRequest): String = {
    val appId = request.getParameter("appId")
    val executorId = request.getParameter("executorId")
    val logType = request.getParameter("logType")

    val maxBytes = 1024 * 1024 // Guard against OOM
    val defaultBytes = 100 * 1024
    val requestedBytes = Option(request.getParameter("numBytes")).map { raw =>
      try {
        raw.trim.toInt
      } catch {
        // A malformed value should not fail the whole request; use the default size.
        case _: NumberFormatException => defaultBytes
      }
    }.getOrElse(defaultBytes)
    // Negative sizes are meaningless and oversized ones risk exhausting memory.
    val numBytes = math.max(0, math.min(requestedBytes, maxBytes))

    if (!Seq(appId, executorId, logType).forall(isSafePathComponent)) {
      "Error: appId, executorId and logType must not contain path separators or '..'"
    } else {
      val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType)
      val pre =
        "==== Last %s bytes of %s/%s/%s ====\n".format(numBytes, appId, executorId, logType)
      pre + Utils.lastNBytes(path, numBytes)
    }
  }

  /**
   * Checks that a request parameter can only name a single path element.
   * A missing (null) parameter is accepted because it is formatted as the literal
   * text "null" and so cannot escape the work directory.
   */
  private def isSafePathComponent(name: String): Boolean = {
    name == null || (
      name != ".." &&
      name.indexOf('/') < 0 &&
      name.indexOf('\\') < 0 &&
      name.indexOf(File.separatorChar) < 0)
  }

  /** Stops the Jetty server if it was started; does nothing otherwise. */
  def stop(): Unit = {
    server.foreach(_.stop())
  }
}
/** Constants used by the worker web UI. */
private[spark] object WorkerWebUI {

  /**
   * Fallback value for the `worker.ui.port` system property. Kept as a String because it
   * is supplied as the default of `System.getProperty` and parsed with `toInt` afterwards.
   */
  val DEFAULT_PORT: String = "8081"

  /** Resource directory handed to the static-content handler. */
  val STATIC_RESOURCE_DIR: String = "spark/ui/static"
}
|