diff options
author | Matei Zaharia <matei.zaharia@gmail.com> | 2013-07-29 19:44:33 -0700 |
---|---|---|
committer | Matei Zaharia <matei.zaharia@gmail.com> | 2013-07-29 19:44:33 -0700 |
commit | 468a36c00526872396196458fd7875fd06ac7108 (patch) | |
tree | e9cb2f707e5ba8b01b352dac391908a087da49c9 | |
parent | 1e1ffb192a412d19d368fbf1f32de6f3dffbbec7 (diff) | |
parent | 81720e13fc9e1f475dd1333babfa08f3f806a5d0 (diff) | |
download | spark-468a36c00526872396196458fd7875fd06ac7108.tar.gz spark-468a36c00526872396196458fd7875fd06ac7108.tar.bz2 spark-468a36c00526872396196458fd7875fd06ac7108.zip |
Merge pull request #746 from rxin/cleanup
Internal cleanup
22 files changed, 285 insertions, 397 deletions
diff --git a/core/src/main/scala/spark/Cache.scala b/core/src/main/scala/spark/Cache.scala deleted file mode 100644 index b0c83ce59d..0000000000 --- a/core/src/main/scala/spark/Cache.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package spark - -import java.util.concurrent.atomic.AtomicInteger - -private[spark] sealed trait CachePutResponse -private[spark] case class CachePutSuccess(size: Long) extends CachePutResponse -private[spark] case class CachePutFailure() extends CachePutResponse - -/** - * An interface for caches in Spark, to allow for multiple implementations. Caches are used to store - * both partitions of cached RDDs and broadcast variables on Spark executors. Caches are also aware - * of which entries are part of the same dataset (for example, partitions in the same RDD). The key - * for each value in a cache is a (datasetID, partition) pair. - * - * A single Cache instance gets created on each machine and is shared by all caches (i.e. both the - * RDD split cache and the broadcast variable cache), to enable global replacement policies. - * However, because these several independent modules all perform caching, it is important to give - * them separate key namespaces, so that an RDD and a broadcast variable (for example) do not use - * the same key. For this purpose, Cache has the notion of KeySpaces. Each client module must first - * ask for a KeySpace, and then call get() and put() on that space using its own keys. - * - * This abstract class handles the creation of key spaces, so that subclasses need only deal with - * keys that are unique across modules. - */ -private[spark] abstract class Cache { - private val nextKeySpaceId = new AtomicInteger(0) - private def newKeySpaceId() = nextKeySpaceId.getAndIncrement() - - def newKeySpace() = new KeySpace(this, newKeySpaceId()) - - /** - * Get the value for a given (datasetId, partition), or null if it is not - * found. - */ - def get(datasetId: Any, partition: Int): Any - - /** - * Attempt to put a value in the cache; returns CachePutFailure if this was - * not successful (e.g. because the cache replacement policy forbids it), and - * CachePutSuccess if successful. If size estimation is available, the cache - * implementation should set the size field in CachePutSuccess. - */ - def put(datasetId: Any, partition: Int, value: Any): CachePutResponse - - /** - * Report the capacity of the cache partition. By default this just reports - * zero. Specific implementations can choose to provide the capacity number. - */ - def getCapacity: Long = 0L -} - -/** - * A key namespace in a Cache. - */ -private[spark] class KeySpace(cache: Cache, val keySpaceId: Int) { - def get(datasetId: Any, partition: Int): Any = - cache.get((keySpaceId, datasetId), partition) - - def put(datasetId: Any, partition: Int, value: Any): CachePutResponse = - cache.put((keySpaceId, datasetId), partition, value) - - def getCapacity: Long = cache.getCapacity -} diff --git a/core/src/main/scala/spark/SoftReferenceCache.scala b/core/src/main/scala/spark/SoftReferenceCache.scala deleted file mode 100644 index f41a379582..0000000000 --- a/core/src/main/scala/spark/SoftReferenceCache.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package spark - -import com.google.common.collect.MapMaker - -/** - * An implementation of Cache that uses soft references. - */ -private[spark] class SoftReferenceCache extends Cache { - val map = new MapMaker().softValues().makeMap[Any, Any]() - - override def get(datasetId: Any, partition: Int): Any = - map.get((datasetId, partition)) - - override def put(datasetId: Any, partition: Int, value: Any): CachePutResponse = { - map.put((datasetId, partition), value) - return CachePutSuccess(0) - } -} diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala index e1f8aff6f5..7c37a16615 100644 --- a/core/src/main/scala/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/spark/deploy/DeployMessage.scala @@ -17,109 +17,107 @@ package spark.deploy +import scala.collection.immutable.List + +import spark.Utils import spark.deploy.ExecutorState.ExecutorState import spark.deploy.master.{WorkerInfo, ApplicationInfo} import spark.deploy.worker.ExecutorRunner -import scala.collection.immutable.List -import spark.Utils -private[spark] sealed trait DeployMessage extends Serializable +private[deploy] sealed trait DeployMessage extends Serializable -// Worker to Master +private[deploy] object DeployMessages { -private[spark] -case class RegisterWorker( - id: String, - host: String, - port: Int, - cores: Int, - memory: Int, - webUiPort: Int, - publicAddress: String) - extends DeployMessage { - Utils.checkHost(host, "Required hostname") - assert (port > 0) -} + // Worker to Master -private[spark] -case class ExecutorStateChanged( - appId: String, - execId: Int, - state: ExecutorState, - message: Option[String], - exitStatus: Option[Int]) - extends DeployMessage + case class RegisterWorker( + id: String, + host: String, + port: Int, + cores: Int, + memory: Int, + webUiPort: Int, + publicAddress: String) + extends DeployMessage { + Utils.checkHost(host, "Required hostname") + assert (port > 0) + } -private[spark] case class Heartbeat(workerId: String) extends DeployMessage + case class ExecutorStateChanged( + appId: String, + execId: Int, + state: ExecutorState, + message: Option[String], + exitStatus: Option[Int]) + extends DeployMessage -// Master to Worker + case class Heartbeat(workerId: String) extends DeployMessage -private[spark] case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage -private[spark] case class RegisterWorkerFailed(message: String) extends DeployMessage -private[spark] case class KillExecutor(appId: String, execId: Int) extends DeployMessage + // Master to Worker -private[spark] case class LaunchExecutor( - appId: String, - execId: Int, - appDesc: ApplicationDescription, - cores: Int, - memory: Int, - sparkHome: String) - extends DeployMessage + case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage -// Client to Master + case class RegisterWorkerFailed(message: String) extends DeployMessage -private[spark] case class RegisterApplication(appDescription: ApplicationDescription) - extends DeployMessage + case class KillExecutor(appId: String, execId: Int) extends DeployMessage -// Master to Client + case class LaunchExecutor( + appId: String, + execId: Int, + appDesc: ApplicationDescription, + cores: Int, + memory: Int, + sparkHome: String) + extends DeployMessage -private[spark] -case class RegisteredApplication(appId: String) extends DeployMessage + // Client to Master -private[spark] -case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) { - Utils.checkHostPort(hostPort, "Required hostport") -} + case class RegisterApplication(appDescription: ApplicationDescription) + extends DeployMessage -private[spark] -case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String], - exitStatus: Option[Int]) + // Master to Client -private[spark] -case class ApplicationRemoved(message: String) + case class RegisteredApplication(appId: String) extends DeployMessage -// Internal message in Client + case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) { + Utils.checkHostPort(hostPort, "Required hostport") + } -private[spark] case object StopClient + case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String], + exitStatus: Option[Int]) -// MasterWebUI To Master + case class ApplicationRemoved(message: String) -private[spark] case object RequestMasterState + // Internal message in Client -// Master to MasterWebUI + case object StopClient -private[spark] -case class MasterState(host: String, port: Int, workers: Array[WorkerInfo], - activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo]) { + // MasterWebUI To Master - Utils.checkHost(host, "Required hostname") - assert (port > 0) + case object RequestMasterState - def uri = "spark://" + host + ":" + port -} + // Master to MasterWebUI + + case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo], + activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo]) { + + Utils.checkHost(host, "Required hostname") + assert (port > 0) + + def uri = "spark://" + host + ":" + port + } -// WorkerWebUI to Worker -private[spark] case object RequestWorkerState + // WorkerWebUI to Worker + case object RequestWorkerState -// Worker to WorkerWebUI + // Worker to WorkerWebUI -private[spark] -case class WorkerState(host: String, port: Int, workerId: String, executors: List[ExecutorRunner], - finishedExecutors: List[ExecutorRunner], masterUrl: String, cores: Int, memory: Int, - coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) { + case class WorkerStateResponse(host: String, port: Int, workerId: String, + executors: List[ExecutorRunner], finishedExecutors: List[ExecutorRunner], masterUrl: String, + cores: Int, memory: Int, coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) { - Utils.checkHost(host, "Required hostname") - assert (port > 0) + Utils.checkHost(host, "Required hostname") + assert (port > 0) + } } diff --git a/core/src/main/scala/spark/deploy/JsonProtocol.scala b/core/src/main/scala/spark/deploy/JsonProtocol.scala index 64f89623e1..bd1db7c294 100644 --- a/core/src/main/scala/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/spark/deploy/JsonProtocol.scala @@ -17,9 +17,12 @@ package spark.deploy -import master.{ApplicationInfo, WorkerInfo} import net.liftweb.json.JsonDSL._ -import worker.ExecutorRunner + +import spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} +import spark.deploy.master.{ApplicationInfo, WorkerInfo} +import spark.deploy.worker.ExecutorRunner + private[spark] object JsonProtocol { def writeWorkerInfo(obj: WorkerInfo) = { @@ -57,7 +60,7 @@ private[spark] object JsonProtocol { ("appdesc" -> writeApplicationDescription(obj.appDesc)) } - def writeMasterState(obj: MasterState) = { + def writeMasterState(obj: MasterStateResponse) = { ("url" -> ("spark://" + obj.uri)) ~ ("workers" -> obj.workers.toList.map(writeWorkerInfo)) ~ ("cores" -> obj.workers.map(_.cores).sum) ~ @@ -68,7 +71,7 @@ private[spark] object JsonProtocol { ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) } - def writeWorkerState(obj: WorkerState) = { + def writeWorkerState(obj: WorkerStateResponse) = { ("id" -> obj.workerId) ~ ("masterurl" -> obj.masterUrl) ~ ("masterwebuiurl" -> obj.masterWebUiUrl) ~ diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala index 29e494f495..9d5ba8a796 100644 --- a/core/src/main/scala/spark/deploy/client/Client.scala +++ b/core/src/main/scala/spark/deploy/client/Client.scala @@ -17,21 +17,23 @@ package spark.deploy.client -import spark.deploy._ +import java.util.concurrent.TimeoutException + import akka.actor._ +import akka.actor.Terminated import akka.pattern.ask import akka.util.Duration -import akka.util.duration._ -import java.util.concurrent.TimeoutException -import spark.{SparkException, Logging} +import akka.remote.RemoteClientDisconnected import akka.remote.RemoteClientLifeCycleEvent import akka.remote.RemoteClientShutdown -import spark.deploy.RegisterApplication -import spark.deploy.master.Master -import akka.remote.RemoteClientDisconnected -import akka.actor.Terminated import akka.dispatch.Await +import spark.Logging +import spark.deploy.{ApplicationDescription, ExecutorState} +import spark.deploy.DeployMessages._ +import spark.deploy.master.Master + + /** * The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description, * and a listener for cluster events, and calls back the listener when various events occur. diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index 9692af5295..202d5bcdb7 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -17,21 +17,22 @@ package spark.deploy.master -import akka.actor._ -import akka.actor.Terminated -import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown} -import akka.util.duration._ - import java.text.SimpleDateFormat import java.util.Date import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} -import spark.deploy._ +import akka.actor._ +import akka.actor.Terminated +import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown} +import akka.util.duration._ + import spark.{Logging, SparkException, Utils} +import spark.deploy.{ApplicationDescription, ExecutorState} +import spark.deploy.DeployMessages._ +import spark.deploy.master.ui.MasterWebUI import spark.metrics.MetricsSystem import spark.util.AkkaUtils -import ui.MasterWebUI private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging { @@ -168,7 +169,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } case RequestMasterState => { - sender ! MasterState(host, port, workers.toArray, apps.toArray, completedApps.toArray) + sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray) } } @@ -233,20 +234,27 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) { logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) worker.addExecutor(exec) - worker.actor ! LaunchExecutor(exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome) - exec.application.driver ! ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory) + worker.actor ! LaunchExecutor( + exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome) + exec.application.driver ! ExecutorAdded( + exec.id, worker.id, worker.hostPort, exec.cores, exec.memory) } def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int, publicAddress: String): WorkerInfo = { - // There may be one or more refs to dead workers on this same node (w/ different ID's), remove them. - workers.filter(w => (w.host == host && w.port == port) && (w.state == WorkerState.DEAD)).foreach(workers -= _) + // There may be one or more refs to dead workers on this same node (w/ different ID's), + // remove them. + workers.filter { w => + (w.host == host && w.port == port) && (w.state == WorkerState.DEAD) + }.foreach { w => + workers -= w + } val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress) workers += worker idToWorker(worker.id) = worker actorToWorker(sender) = worker addressToWorker(sender.path.address) = worker - return worker + worker } def removeWorker(worker: WorkerInfo) { @@ -257,7 +265,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act addressToWorker -= worker.actor.path.address for (exec <- worker.executors.values) { logInfo("Telling app of lost executor: " + exec.id) - exec.application.driver ! ExecutorUpdated(exec.id, ExecutorState.LOST, Some("worker lost"), None) + exec.application.driver ! ExecutorUpdated( + exec.id, ExecutorState.LOST, Some("worker lost"), None) exec.application.removeExecutor(exec) } } @@ -277,7 +286,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= desc.memoryPerSlave)) { logWarning("Could not find any workers with enough memory for " + firstApp.get.id) } - return app + app } def finishApplication(app: ApplicationInfo) { diff --git a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala index 32264af393..b4c62bc224 100644 --- a/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/spark/deploy/master/ui/ApplicationPage.scala @@ -17,6 +17,8 @@ package spark.deploy.master.ui +import scala.xml.Node + import akka.dispatch.Await import akka.pattern.ask import akka.util.duration._ @@ -25,9 +27,8 @@ import javax.servlet.http.HttpServletRequest import net.liftweb.json.JsonAST.JValue -import scala.xml.Node - -import spark.deploy.{RequestMasterState, JsonProtocol, MasterState} +import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} +import spark.deploy.JsonProtocol import spark.deploy.master.ExecutorInfo import spark.ui.UIUtils @@ -38,7 +39,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) { /** Executor details for a particular application */ def renderJson(request: HttpServletRequest): JValue = { val appId = request.getParameter("appId") - val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState] + val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse] val state = Await.result(stateFuture, 30 seconds) val app = state.activeApps.find(_.id == appId).getOrElse({ state.completedApps.find(_.id == appId).getOrElse(null) @@ -49,7 +50,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) { /** Executor details for a particular application */ def render(request: HttpServletRequest): Seq[Node] = { val appId = request.getParameter("appId") - val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState] + val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse] val state = Await.result(stateFuture, 30 seconds) val app = state.activeApps.find(_.id == appId).getOrElse({ state.completedApps.find(_.id == appId).getOrElse(null) diff --git a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala index b05197c1b9..557df89b41 100644 --- a/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala +++ b/core/src/main/scala/spark/deploy/master/ui/IndexPage.scala @@ -17,18 +17,20 @@ package spark.deploy.master.ui -import akka.dispatch.Await -import akka.pattern.ask -import akka.util.duration._ - import javax.servlet.http.HttpServletRequest import scala.xml.Node -import spark.deploy.{RequestMasterState, DeployWebUI, MasterState} +import akka.dispatch.Await +import akka.pattern.ask +import akka.util.duration._ + import spark.Utils -import spark.ui.UIUtils +import spark.deploy.DeployWebUI +import spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} import spark.deploy.master.{ApplicationInfo, WorkerInfo} +import spark.ui.UIUtils + private[spark] class IndexPage(parent: MasterWebUI) { val master = parent.master @@ -36,7 +38,7 @@ private[spark] class IndexPage(parent: MasterWebUI) { /** Index view listing applications and executors */ def render(request: HttpServletRequest): Seq[Node] = { - val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterState] + val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse] val state = Await.result(stateFuture, 30 seconds) val workerHeaders = Seq("Id", "Address", "State", "Cores", "Memory") diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala index 47d3390928..345dfe879c 100644 --- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -19,14 +19,12 @@ package spark.deploy.worker import java.io._ import java.lang.System.getenv -import spark.deploy.{ExecutorState, ExecutorStateChanged, ApplicationDescription} + import akka.actor.ActorRef + import spark.{Utils, Logging} -import java.net.{URI, URL} -import org.apache.hadoop.fs.{Path, FileSystem} -import org.apache.hadoop.conf.Configuration -import scala.Some -import spark.deploy.ExecutorStateChanged +import spark.deploy.{ExecutorState, ApplicationDescription} +import spark.deploy.DeployMessages.ExecutorStateChanged /** * Manages the execution of one executor process. diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index 8fa0d12b82..0e46fa281e 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -17,22 +17,24 @@ package spark.deploy.worker -import scala.collection.mutable.{ArrayBuffer, HashMap} +import java.text.SimpleDateFormat +import java.util.Date +import java.io.File + +import scala.collection.mutable.HashMap + import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated} +import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected} import akka.util.duration._ + import spark.{Logging, Utils} -import spark.util.AkkaUtils -import spark.deploy._ -import spark.metrics.MetricsSystem -import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected} -import java.text.SimpleDateFormat -import java.util.Date -import spark.deploy.RegisterWorker -import spark.deploy.LaunchExecutor -import spark.deploy.RegisterWorkerFailed +import spark.deploy.ExecutorState +import spark.deploy.DeployMessages._ import spark.deploy.master.Master -import java.io.File -import ui.WorkerWebUI +import spark.deploy.worker.ui.WorkerWebUI +import spark.metrics.MetricsSystem +import spark.util.AkkaUtils + private[spark] class Worker( host: String, @@ -164,7 +166,7 @@ private[spark] class Worker( masterDisconnected() case RequestWorkerState => { - sender ! WorkerState(host, port, workerId, executors.values.toList, + sender ! WorkerStateResponse(host, port, workerId, executors.values.toList, finishedExecutors.values.toList, masterUrl, cores, memory, coresUsed, memoryUsed, masterWebUiUrl) } diff --git a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala index 7548a26c2e..1619c6a4c2 100644 --- a/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala +++ b/core/src/main/scala/spark/deploy/worker/ui/IndexPage.scala @@ -17,34 +17,36 @@ package spark.deploy.worker.ui +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + import akka.dispatch.Await import akka.pattern.ask import akka.util.duration._ -import javax.servlet.http.HttpServletRequest - import net.liftweb.json.JsonAST.JValue -import scala.xml.Node - -import spark.deploy.{RequestWorkerState, JsonProtocol, WorkerState} -import spark.deploy.worker.ExecutorRunner import spark.Utils +import spark.deploy.JsonProtocol +import spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse} +import spark.deploy.worker.ExecutorRunner import spark.ui.UIUtils + private[spark] class IndexPage(parent: WorkerWebUI) { val workerActor = parent.worker.self val worker = parent.worker val timeout = parent.timeout def renderJson(request: HttpServletRequest): JValue = { - val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerState] + val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse] val workerState = Await.result(stateFuture, 30 seconds) JsonProtocol.writeWorkerState(workerState) } def render(request: HttpServletRequest): Seq[Node] = { - val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerState] + val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse] val workerState = Await.result(stateFuture, 30 seconds) val executorHeaders = Seq("ExecutorID", "Cores", "Memory", "Job Details", "Logs") @@ -69,7 +71,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) { <p><a href={workerState.masterWebUiUrl}>Back to Master</a></p> </div> </div> - <hr/> + <hr/> <div class="row"> <!-- Running Executors --> <div class="span12"> @@ -88,7 +90,8 @@ private[spark] class IndexPage(parent: WorkerWebUI) { </div> </div>; - UIUtils.basicSparkPage(content, "Spark Worker on %s:%s".format(workerState.host, workerState.port)) + UIUtils.basicSparkPage(content, "Spark Worker on %s:%s".format( + workerState.host, workerState.port)) } def executorRow(executor: ExecutorRunner): Seq[Node] = { diff --git a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala index f4003da732..e47fe50021 100644 --- a/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala +++ b/core/src/main/scala/spark/executor/StandaloneExecutorBackend.scala @@ -18,19 +18,16 @@ package spark.executor import java.nio.ByteBuffer -import spark.Logging -import spark.TaskState.TaskState -import spark.util.AkkaUtils + import akka.actor.{ActorRef, Actor, Props, Terminated} import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected} -import java.util.concurrent.{TimeUnit, ThreadPoolExecutor, SynchronousQueue} -import spark.scheduler.cluster._ -import spark.scheduler.cluster.RegisteredExecutor -import spark.scheduler.cluster.LaunchTask -import spark.scheduler.cluster.RegisterExecutorFailed -import spark.scheduler.cluster.RegisterExecutor -import spark.Utils + +import spark.{Logging, Utils} +import spark.TaskState.TaskState import spark.deploy.SparkHadoopUtil +import spark.scheduler.cluster.StandaloneClusterMessages._ +import spark.util.AkkaUtils + private[spark] class StandaloneExecutorBackend( driverUrl: String, diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala index ac9e5ef94d..05c29eb72f 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala @@ -17,46 +17,47 @@ package spark.scheduler.cluster -import spark.TaskState.TaskState import java.nio.ByteBuffer -import spark.util.SerializableBuffer + +import spark.TaskState.TaskState import spark.Utils +import spark.util.SerializableBuffer + private[spark] sealed trait StandaloneClusterMessage extends Serializable -// Driver to executors -private[spark] -case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage +private[spark] object StandaloneClusterMessages { -private[spark] -case class RegisteredExecutor(sparkProperties: Seq[(String, String)]) - extends StandaloneClusterMessage + // Driver to executors + case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage -private[spark] -case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage + case class RegisteredExecutor(sparkProperties: Seq[(String, String)]) + extends StandaloneClusterMessage -// Executors to driver -private[spark] -case class RegisterExecutor(executorId: String, hostPort: String, cores: Int) - extends StandaloneClusterMessage { - Utils.checkHostPort(hostPort, "Expected host port") -} + case class RegisterExecutorFailed(message: String) extends StandaloneClusterMessage -private[spark] -case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, data: SerializableBuffer) - extends StandaloneClusterMessage + // Executors to driver + case class RegisterExecutor(executorId: String, hostPort: String, cores: Int) + extends StandaloneClusterMessage { + Utils.checkHostPort(hostPort, "Expected host port") + } + + case class StatusUpdate(executorId: String, taskId: Long, state: TaskState, + data: SerializableBuffer) extends StandaloneClusterMessage -private[spark] -object StatusUpdate { - /** Alternate factory method that takes a ByteBuffer directly for the data field */ - def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = { - StatusUpdate(executorId, taskId, state, new SerializableBuffer(data)) + object StatusUpdate { + /** Alternate factory method that takes a ByteBuffer directly for the data field */ + def apply(executorId: String, taskId: Long, state: TaskState, data: ByteBuffer) + : StatusUpdate = { + StatusUpdate(executorId, taskId, state, new SerializableBuffer(data)) + } } -} -// Internal messages in driver -private[spark] case object ReviveOffers extends StandaloneClusterMessage -private[spark] case object StopDriver extends StandaloneClusterMessage + // Internal messages in driver + case object ReviveOffers extends StandaloneClusterMessage -private[spark] case class RemoveExecutor(executorId: String, reason: String) - extends StandaloneClusterMessage + case object StopDriver extends StandaloneClusterMessage + + case class RemoveExecutor(executorId: String, reason: String) extends StandaloneClusterMessage + +} diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala index 03a64e0192..075a7cbf7e 100644 --- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala @@ -17,17 +17,18 @@ package spark.scheduler.cluster +import java.util.concurrent.atomic.AtomicInteger + import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import akka.actor._ -import akka.util.duration._ +import akka.dispatch.Await import akka.pattern.ask +import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent} import akka.util.Duration import spark.{Utils, SparkException, Logging, TaskState} -import akka.dispatch.Await -import java.util.concurrent.atomic.AtomicInteger -import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent} +import spark.scheduler.cluster.StandaloneClusterMessages._ /** * A standalone scheduler backend, which waits for standalone executors to connect to it through diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala index 3186f7c85b..76128e8cff 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala @@ -23,6 +23,7 @@ import akka.pattern.ask import akka.util.Duration import spark.{Logging, SparkException} +import spark.storage.BlockManagerMessages._ private[spark] class BlockManagerMaster(var driverActor: ActorRef) extends Logging { diff --git a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala index 244000d952..011bb6b83d 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMasterActor.scala @@ -29,6 +29,8 @@ import akka.util.Duration import akka.util.duration._ import spark.{Logging, Utils, SparkException} +import spark.storage.BlockManagerMessages._ + /** * BlockManagerMasterActor is an actor on the master node to track statuses of diff --git a/core/src/main/scala/spark/storage/BlockManagerMessages.scala b/core/src/main/scala/spark/storage/BlockManagerMessages.scala index 01de4ccb8f..9375a9ca54 100644 --- a/core/src/main/scala/spark/storage/BlockManagerMessages.scala +++ b/core/src/main/scala/spark/storage/BlockManagerMessages.scala @@ -22,102 +22,89 @@ import java.io.{Externalizable, ObjectInput, ObjectOutput} import akka.actor.ActorRef -////////////////////////////////////////////////////////////////////////////////// -// Messages from the master to slaves. -////////////////////////////////////////////////////////////////////////////////// -private[spark] -sealed trait ToBlockManagerSlave - -// Remove a block from the slaves that have it. This can only be used to remove -// blocks that the master knows about. -private[spark] -case class RemoveBlock(blockId: String) extends ToBlockManagerSlave - -// Remove all blocks belonging to a specific RDD. -private[spark] case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave - - -////////////////////////////////////////////////////////////////////////////////// -// Messages from slaves to the master. -////////////////////////////////////////////////////////////////////////////////// -private[spark] -sealed trait ToBlockManagerMaster - -private[spark] -case class RegisterBlockManager( - blockManagerId: BlockManagerId, - maxMemSize: Long, - sender: ActorRef) - extends ToBlockManagerMaster - -private[spark] -case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster - -private[spark] -class UpdateBlockInfo( - var blockManagerId: BlockManagerId, - var blockId: String, - var storageLevel: StorageLevel, - var memSize: Long, - var diskSize: Long) - extends ToBlockManagerMaster - with Externalizable { - - def this() = this(null, null, null, 0, 0) // For deserialization only - - override def writeExternal(out: ObjectOutput) { - blockManagerId.writeExternal(out) - out.writeUTF(blockId) - storageLevel.writeExternal(out) - out.writeLong(memSize) - out.writeLong(diskSize) +private[storage] object BlockManagerMessages { + ////////////////////////////////////////////////////////////////////////////////// + // Messages from the master to slaves. + ////////////////////////////////////////////////////////////////////////////////// + sealed trait ToBlockManagerSlave + + // Remove a block from the slaves that have it. This can only be used to remove + // blocks that the master knows about. + case class RemoveBlock(blockId: String) extends ToBlockManagerSlave + + // Remove all blocks belonging to a specific RDD. + case class RemoveRdd(rddId: Int) extends ToBlockManagerSlave + + + ////////////////////////////////////////////////////////////////////////////////// + // Messages from slaves to the master. + ////////////////////////////////////////////////////////////////////////////////// + sealed trait ToBlockManagerMaster + + case class RegisterBlockManager( + blockManagerId: BlockManagerId, + maxMemSize: Long, + sender: ActorRef) + extends ToBlockManagerMaster + + case class HeartBeat(blockManagerId: BlockManagerId) extends ToBlockManagerMaster + + class UpdateBlockInfo( + var blockManagerId: BlockManagerId, + var blockId: String, + var storageLevel: StorageLevel, + var memSize: Long, + var diskSize: Long) + extends ToBlockManagerMaster + with Externalizable { + + def this() = this(null, null, null, 0, 0) // For deserialization only + + override def writeExternal(out: ObjectOutput) { + blockManagerId.writeExternal(out) + out.writeUTF(blockId) + storageLevel.writeExternal(out) + out.writeLong(memSize) + out.writeLong(diskSize) + } + + override def readExternal(in: ObjectInput) { + blockManagerId = BlockManagerId(in) + blockId = in.readUTF() + storageLevel = StorageLevel(in) + memSize = in.readLong() + diskSize = in.readLong() + } } - override def readExternal(in: ObjectInput) { - blockManagerId = BlockManagerId(in) - blockId = in.readUTF() - storageLevel = StorageLevel(in) - memSize = in.readLong() - diskSize = in.readLong() + object UpdateBlockInfo { + def apply(blockManagerId: BlockManagerId, + blockId: String, + storageLevel: StorageLevel, + memSize: Long, + diskSize: Long): UpdateBlockInfo = { + new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize) + } + + // For pattern-matching + def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = { + Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize)) + } } -} -private[spark] -object UpdateBlockInfo { - def apply(blockManagerId: BlockManagerId, - blockId: String, - storageLevel: StorageLevel, - memSize: Long, - diskSize: Long): UpdateBlockInfo = { - new UpdateBlockInfo(blockManagerId, blockId, storageLevel, memSize, diskSize) - } + case class GetLocations(blockId: String) extends ToBlockManagerMaster - // For pattern-matching - def unapply(h: UpdateBlockInfo): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = { - Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize)) - } -} + case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster -private[spark] -case class GetLocations(blockId: String) extends ToBlockManagerMaster + case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster -private[spark] -case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster + case class RemoveExecutor(execId: String) extends ToBlockManagerMaster -private[spark] -case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster + case object StopBlockManagerMaster extends ToBlockManagerMaster -private[spark] -case class RemoveExecutor(execId: String) extends ToBlockManagerMaster + case object GetMemoryStatus extends ToBlockManagerMaster -private[spark] -case object StopBlockManagerMaster extends ToBlockManagerMaster + case object ExpireDeadHosts extends ToBlockManagerMaster -private[spark] -case object GetMemoryStatus extends ToBlockManagerMaster - -private[spark] -case object ExpireDeadHosts extends ToBlockManagerMaster - -private[spark] -case object GetStorageStatus extends ToBlockManagerMaster + case object GetStorageStatus extends ToBlockManagerMaster +} diff --git a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala index 45cffad810..6e5fb43732 100644 --- a/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala +++ b/core/src/main/scala/spark/storage/BlockManagerSlaveActor.scala @@ -19,7 +19,7 @@ package spark.storage import akka.actor.Actor -import spark.{Logging, SparkException, Utils} +import spark.storage.BlockManagerMessages._ /** diff --git a/core/src/main/scala/spark/storage/BlockManagerSource.scala b/core/src/main/scala/spark/storage/BlockManagerSource.scala index 4faa715c94..2aecd1ea71 100644 --- a/core/src/main/scala/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/spark/storage/BlockManagerSource.scala @@ -3,7 +3,7 @@ package spark.storage import com.codahale.metrics.{Gauge,MetricRegistry} import spark.metrics.source.Source -import spark.storage._ + private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source { val metricRegistry = new MetricRegistry() diff --git a/core/src/main/scala/spark/storage/BlockMessage.scala b/core/src/main/scala/spark/storage/BlockMessage.scala index ab72dbb62b..bcce26b7c1 100644 --- a/core/src/main/scala/spark/storage/BlockMessage.scala +++ b/core/src/main/scala/spark/storage/BlockMessage.scala @@ -22,7 +22,6 @@ import java.nio.ByteBuffer import scala.collection.mutable.StringBuilder import scala.collection.mutable.ArrayBuffer -import spark._ import spark.network._ private[spark] case class GetBlock(id: String) diff --git a/core/src/main/scala/spark/storage/BlockMessageArray.scala b/core/src/main/scala/spark/storage/BlockMessageArray.scala index b0229d6124..ee2fc167d5 100644 --- a/core/src/main/scala/spark/storage/BlockMessageArray.scala +++ b/core/src/main/scala/spark/storage/BlockMessageArray.scala @@ -19,7 +19,6 @@ package spark.storage import java.nio.ByteBuffer -import scala.collection.mutable.StringBuilder import scala.collection.mutable.ArrayBuffer import spark._ @@ -113,7 +112,7 @@ private[spark] object BlockMessageArray { def main(args: Array[String]) { val blockMessages = - (0 until 10).map(i => { + (0 until 10).map { i => if (i % 2 == 0) { val buffer = ByteBuffer.allocate(100) buffer.clear @@ -121,7 +120,7 @@ private[spark] object BlockMessageArray { } else { BlockMessage.fromGetBlock(GetBlock(i.toString)) } - }) + } val blockMessageArray = new BlockMessageArray(blockMessages) println("Block message array created") diff --git a/core/src/main/scala/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/spark/storage/BlockObjectWriter.scala index 01ed6e8c1f..3812009ca1 100644 --- a/core/src/main/scala/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/spark/storage/BlockObjectWriter.scala @@ -17,8 +17,6 @@ package spark.storage -import java.nio.ByteBuffer - /** * An interface for writing JVM objects to some underlying storage. This interface allows |