aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorCodingCat <zhunansjtu@gmail.com>2014-01-22 19:32:51 -0500
committerCodingCat <zhunansjtu@gmail.com>2014-01-22 20:20:46 -0500
commit2b3c461451ac2c163956af258dfbf3f208596dbf (patch)
tree079d2536975019afe04e2a6d0b437082c30f1630 /core
parent29f4b6a2d9f42a727691444312964e59ef9b95ee (diff)
downloadspark-2b3c461451ac2c163956af258dfbf3f208596dbf.tar.gz
spark-2b3c461451ac2c163956af258dfbf3f208596dbf.tar.bz2
spark-2b3c461451ac2c163956af258dfbf3f208596dbf.zip
refactor sparkHome to val
clean code
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala7
1 files changed, 4 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index de45da2046..fbf2e0f30f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -76,7 +76,7 @@ private[spark] class Worker(
@volatile var registered = false
@volatile var connected = false
val workerId = generateWorkerId()
- var sparkHome: File = null
+ val sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
var workDir: File = null
val executors = new HashMap[String, ExecutorRunner]
val finishedExecutors = new HashMap[String, ExecutorRunner]
@@ -120,7 +120,6 @@ private[spark] class Worker(
assert(!registered)
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
host, port, cores, Utils.megabytesToString(memory)))
- sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
logInfo("Spark home: " + sparkHome)
createWorkDir()
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
@@ -211,7 +210,8 @@ private[spark] class Worker(
try {
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
- self, workerId, host, new File(appDesc.sparkHome.getOrElse(sparkHome.getAbsolutePath)),
+ self, workerId, host,
+ appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
workDir, akkaUrl, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
manager.start()
@@ -225,6 +225,7 @@ private[spark] class Worker(
logError("Failed to launch exector %s/%d for %s".format(appId, execId, appDesc.name))
if (executors.contains(appId + "/" + execId)) {
executors(appId + "/" + execId).kill()
+ executors -= appId + "/" + execId
}
masterLock.synchronized {
master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)