about | summary | refs | log | tree | commit | diff
path: root/core/src/main/scala
diff options
context:
space:
mode:
author Denny <dennybritz@gmail.com> 2012-08-02 16:00:33 -0700
committer Denny <dennybritz@gmail.com> 2012-08-02 16:00:33 -0700
commit 0008994044ca0d40facc489cbcd0371a66e7630f (patch)
tree 536240b872167b00deb696322bc4a4f70c66d21a /core/src/main/scala
parent 53008c2d8a8b28f4204eaafa89f0e8087dc11466 (diff)
parent 71a958b0b7f628fefa82c1c2284369a6a557fa7b (diff)
download spark-0008994044ca0d40facc489cbcd0371a66e7630f.tar.gz
spark-0008994044ca0d40facc489cbcd0371a66e7630f.tar.bz2
spark-0008994044ca0d40facc489cbcd0371a66e7630f.zip
merged dev branch
Diffstat (limited to 'core/src/main/scala')
-rw-r--r-- core/src/main/scala/spark/CacheTracker.scala 13
-rw-r--r-- core/src/main/scala/spark/deploy/DeployMessage.scala 29
-rw-r--r-- core/src/main/scala/spark/deploy/master/Master.scala 16
-rw-r--r-- core/src/main/scala/spark/deploy/master/MasterWebUI.scala 40
-rw-r--r-- core/src/main/scala/spark/deploy/master/WorkerInfo.scala 7
-rw-r--r-- core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala 20
-rw-r--r-- core/src/main/scala/spark/deploy/worker/Worker.scala 31
-rw-r--r-- core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala 29
-rw-r--r-- core/src/main/scala/spark/storage/BlockManager.scala 2
9 files changed, 150 insertions, 37 deletions
diff --git a/core/src/main/scala/spark/CacheTracker.scala b/core/src/main/scala/spark/CacheTracker.scala
index 19870408d3..22110832f8 100644
--- a/core/src/main/scala/spark/CacheTracker.scala
+++ b/core/src/main/scala/spark/CacheTracker.scala
@@ -222,11 +222,16 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl
// TODO: also register a listener for when it unloads
logInfo("Computing partition " + split)
try {
- val values = new ArrayBuffer[Any]
- values ++= rdd.compute(split)
- blockManager.put(key, values.iterator, storageLevel, false)
+ // BlockManager will iterate over results from compute to create RDD
+ blockManager.put(key, rdd.compute(split), storageLevel, false)
//future.apply() // Wait for the reply from the cache tracker
- return values.iterator.asInstanceOf[Iterator[T]]
+ blockManager.get(key) match {
+ case Some(values) =>
+ return values.asInstanceOf[Iterator[T]]
+ case None =>
+ logWarning("loading partition failed after computing it " + key)
+ return null
+ }
} finally {
loading.synchronized {
loading.remove(key)
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index cf5e42797b..141bbe4d57 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -1,12 +1,17 @@
package spark.deploy
import spark.deploy.ExecutorState.ExecutorState
+import spark.deploy.master.{WorkerInfo, JobInfo}
+import spark.deploy.worker.ExecutorRunner
+import scala.collection.immutable.List
+import scala.collection.mutable.HashMap
+
sealed trait DeployMessage extends Serializable
// Worker to Master
-case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int)
+case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int)
extends DeployMessage
case class ExecutorStateChanged(
@@ -18,7 +23,7 @@ case class ExecutorStateChanged(
// Master to Worker
-case object RegisteredWorker extends DeployMessage
+case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
case class RegisterWorkerFailed(message: String) extends DeployMessage
case class KillExecutor(jobId: String, execId: Int) extends DeployMessage
@@ -44,4 +49,22 @@ case class JobKilled(message: String)
// Internal message in Client
-case object StopClient
\ No newline at end of file
+case object StopClient
+
+// MasterWebUI To Master
+
+case object RequestMasterState
+
+// Master to MasterWebUI
+
+case class MasterState(uri : String, workers: List[WorkerInfo], activeJobs: List[JobInfo],
+ completedJobs: List[JobInfo])
+
+// WorkerWebUI to Worker
+case object RequestWorkerState
+
+// Worker to WorkerWebUI
+
+case class WorkerState(uri: String, workerId: String, executors: List[ExecutorRunner],
+ finishedExecutors: List[ExecutorRunner], masterUrl: String, cores: Int, memory: Int,
+ coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String)
\ No newline at end of file
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 9114004411..c98dddea7b 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -51,15 +51,15 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
}
override def receive = {
- case RegisterWorker(id, host, workerPort, cores, memory) => {
+ case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
host, workerPort, cores, Utils.memoryMegabytesToString(memory)))
if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
- addWorker(id, host, workerPort, cores, memory)
+ addWorker(id, host, workerPort, cores, memory, worker_webUiPort)
context.watch(sender) // This doesn't work with remote actors but helps for testing
- sender ! RegisteredWorker
+ sender ! RegisteredWorker("http://" + ip + ":" + webUiPort)
schedule()
}
}
@@ -112,6 +112,10 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
addressToWorker.get(address).foreach(removeWorker)
addressToJob.get(address).foreach(removeJob)
}
+
+ case RequestMasterState => {
+ sender ! MasterState(ip + ":" + port, workers.toList, jobs.toList, completedJobs.toList)
+ }
}
/**
@@ -131,6 +135,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
}
if (job.coresLeft == 0) {
waitingJobs -= job
+ job.state = JobState.RUNNING
}
}
}
@@ -143,8 +148,8 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
exec.job.actor ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory)
}
- def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int): WorkerInfo = {
- val worker = new WorkerInfo(id, host, port, cores, memory, sender)
+ def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int): WorkerInfo = {
+ val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort)
workers += worker
idToWorker(worker.id) = worker
actorToWorker(sender) = worker
@@ -186,6 +191,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
exec.worker.removeExecutor(exec)
exec.worker.actor ! KillExecutor(exec.job.id, exec.id)
}
+ job.state = JobState.FINISHED
schedule()
}
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index b0c871dd7b..f03c0a0229 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -1,17 +1,53 @@
package spark.deploy.master
import akka.actor.{ActorRef, ActorSystem}
+import akka.dispatch.Await
+import akka.pattern.ask
+import akka.util.Timeout
+import akka.util.duration._
import cc.spray.Directives
+import cc.spray.directives._
+import cc.spray.typeconversion.TwirlSupport._
+import spark.deploy._
class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
val RESOURCE_DIR = "spark/deploy/master/webui"
-
+ val STATIC_RESOURCE_DIR = "spark/deploy/static"
+
+ implicit val timeout = Timeout(1 seconds)
+
val handler = {
get {
path("") {
- getFromResource(RESOURCE_DIR + "/index.html")
+ completeWith {
+ val future = master ? RequestMasterState
+ future.map {
+ masterState => masterui.html.index.render(masterState.asInstanceOf[MasterState])
+ }
+ }
+ } ~
+ path("job") {
+ parameter("jobId") { jobId =>
+ completeWith {
+ val future = master ? RequestMasterState
+ future.map { state =>
+ val masterState = state.asInstanceOf[MasterState]
+
+ // A bit ugly and inefficient, but we won't have a number of jobs
+ // so large that it will make a significant difference.
+ (masterState.activeJobs ::: masterState.completedJobs).find(_.id == jobId) match {
+ case Some(job) => masterui.html.job_details.render(job)
+ case _ => null
+ }
+ }
+ }
+ }
+ } ~
+ pathPrefix("static") {
+ getFromResourceDirectory(STATIC_RESOURCE_DIR)
} ~
getFromResourceDirectory(RESOURCE_DIR)
}
}
+
}
diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
index af0be108ea..59474a0945 100644
--- a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
@@ -9,7 +9,8 @@ class WorkerInfo(
val port: Int,
val cores: Int,
val memory: Int,
- val actor: ActorRef) {
+ val actor: ActorRef,
+ val webUiPort: Int) {
var executors = new mutable.HashMap[String, ExecutorInfo] // fullId => info
@@ -32,4 +33,8 @@ class WorkerInfo(
memoryUsed -= exec.memory
}
}
+
+ def webUiAddress : String = {
+ "http://" + this.host + ":" + this.webUiPort
+ }
}
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index ecd558546b..3e24380810 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -14,16 +14,16 @@ import spark.deploy.ExecutorStateChanged
* Manages the execution of one executor process.
*/
class ExecutorRunner(
- jobId: String,
- execId: Int,
- jobDesc: JobDescription,
- cores: Int,
- memory: Int,
- worker: ActorRef,
- workerId: String,
- hostname: String,
- sparkHome: File,
- workDir: File)
+ val jobId: String,
+ val execId: Int,
+ val jobDesc: JobDescription,
+ val cores: Int,
+ val memory: Int,
+ val worker: ActorRef,
+ val workerId: String,
+ val hostname: String,
+ val sparkHome: File,
+ val workDir: File)
extends Logging {
val fullId = jobId + "/" + execId
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index f8142b0fca..0a80463c0b 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -23,11 +23,12 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas
val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r
var master: ActorRef = null
+ var masterWebUiUrl : String = ""
val workerId = generateWorkerId()
var sparkHome: File = null
var workDir: File = null
val executors = new HashMap[String, ExecutorRunner]
- val finishedExecutors = new ArrayBuffer[String]
+ val finishedExecutors = new HashMap[String, ExecutorRunner]
var coresUsed = 0
var memoryUsed = 0
@@ -67,7 +68,7 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas
val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
try {
master = context.actorFor(akkaUrl)
- master ! RegisterWorker(workerId, ip, port, cores, memory)
+ master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
} catch {
@@ -95,7 +96,8 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas
}
override def receive = {
- case RegisteredWorker =>
+ case RegisteredWorker(url) =>
+ masterWebUiUrl = url
logInfo("Successfully registered with master")
case RegisterWorkerFailed(message) =>
@@ -108,25 +110,36 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas
jobId, execId, jobDesc, cores_, memory_, self, workerId, ip, sparkHome, workDir)
executors(jobId + "/" + execId) = manager
manager.start()
+ coresUsed += cores_
+ memoryUsed += memory_
master ! ExecutorStateChanged(jobId, execId, ExecutorState.LOADING, None)
case ExecutorStateChanged(jobId, execId, state, message) =>
master ! ExecutorStateChanged(jobId, execId, state, message)
+ val fullId = jobId + "/" + execId
if (ExecutorState.isFinished(state)) {
- logInfo("Executor " + jobId + "/" + execId + " finished with state " + state)
- executors -= jobId + "/" + execId
- finishedExecutors += jobId + "/" + execId
+ val executor = executors(fullId)
+ logInfo("Executor " + fullId + " finished with state " + state)
+ finishedExecutors(fullId) = executor
+ executors -= fullId
+ coresUsed -= executor.cores
+ memoryUsed -= executor.memory
}
case KillExecutor(jobId, execId) =>
val fullId = jobId + "/" + execId
+ val executor = executors(fullId)
logInfo("Asked to kill executor " + fullId)
- executors(jobId + "/" + execId).kill()
- executors -= fullId
- finishedExecutors += fullId
+ executor.kill()
case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
masterDisconnected()
+
+ case RequestWorkerState => {
+ sender ! WorkerState(ip + ":" + port, workerId, executors.values.toList,
+ finishedExecutors.values.toList, masterUrl, cores, memory,
+ coresUsed, memoryUsed, masterWebUiUrl)
+ }
}
def masterDisconnected() {
diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index efd3822e61..58a05e1a38 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -1,17 +1,42 @@
package spark.deploy.worker
import akka.actor.{ActorRef, ActorSystem}
+import akka.dispatch.Await
+import akka.pattern.ask
+import akka.util.Timeout
+import akka.util.duration._
import cc.spray.Directives
+import cc.spray.typeconversion.TwirlSupport._
+import spark.deploy.{WorkerState, RequestWorkerState}
class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
val RESOURCE_DIR = "spark/deploy/worker/webui"
-
+ val STATIC_RESOURCE_DIR = "spark/deploy/static"
+
+ implicit val timeout = Timeout(1 seconds)
+
val handler = {
get {
path("") {
- getFromResource(RESOURCE_DIR + "/index.html")
+ completeWith{
+ val future = worker ? RequestWorkerState
+ future.map { workerState =>
+ workerui.html.index(workerState.asInstanceOf[WorkerState])
+ }
+ }
+ } ~
+ path("log") {
+ parameters("jobId", "executorId", "logType") { (jobId, executorId, logType) =>
+ respondWithMediaType(cc.spray.http.MediaTypes.`text/plain`) {
+ getFromFileName("work/" + jobId + "/" + executorId + "/" + logType)
+ }
+ }
+ } ~
+ pathPrefix("static") {
+ getFromResourceDirectory(STATIC_RESOURCE_DIR)
} ~
getFromResourceDirectory(RESOURCE_DIR)
}
}
+
}
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index 5067601198..cde74e5805 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -580,6 +580,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
object BlockManager {
def getMaxMemoryFromSystemProperties(): Long = {
val memoryFraction = System.getProperty("spark.storage.memoryFraction", "0.66").toDouble
- (Runtime.getRuntime.totalMemory * memoryFraction).toLong
+ (Runtime.getRuntime.maxMemory * memoryFraction).toLong
}
}