diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-10-25 18:28:43 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-10-25 18:28:43 -0700 |
commit | bab496c1201305f8ee4b0cfdd9b9e8041ea08d38 (patch) | |
tree | 1404edb223e2e7550bc93c2aeec65f3af04accdd /core | |
parent | d307db6e554885d33e22ac09b567c66a0e702fd1 (diff) | |
parent | eef261c89286ddbcdcc03684c1a5d0b94d6da321 (diff) | |
download | spark-bab496c1201305f8ee4b0cfdd9b9e8041ea08d38.tar.gz spark-bab496c1201305f8ee4b0cfdd9b9e8041ea08d38.tar.bz2 spark-bab496c1201305f8ee4b0cfdd9b9e8041ea08d38.zip |
Merge pull request #108 from alig/master
Changes to enable execution using HDFS as a synchronization point between the driver and executors, and to ensure that executors exit properly.
Diffstat (limited to 'core')
5 files changed, 108 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 564466cfd5..c9bc01cba5 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -56,16 +56,15 @@ import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} import org.apache.spark.rdd._ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, - SparkDeploySchedulerBackend, ClusterScheduler} + SparkDeploySchedulerBackend, ClusterScheduler, SimrSchedulerBackend} import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import org.apache.spark.scheduler.local.LocalScheduler +import org.apache.spark.scheduler.StageInfo import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils} import org.apache.spark.ui.SparkUI import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils} - - /** * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. @@ -158,6 +157,8 @@ class SparkContext( val SPARK_REGEX = """spark://(.*)""".r // Regular expression for connection to Mesos cluster val MESOS_REGEX = """mesos://(.*)""".r + // Regular expression for connection to Simr cluster + val SIMR_REGEX = """simr://(.*)""".r master match { case "local" => @@ -176,6 +177,12 @@ class SparkContext( scheduler.initialize(backend) scheduler + case SIMR_REGEX(simrUrl) => + val scheduler = new ClusterScheduler(this) + val backend = new SimrSchedulerBackend(scheduler, this, simrUrl) + scheduler.initialize(backend) + scheduler + case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) => // Check to make sure memory requested <= memoryPerSlave. 
Otherwise Spark will just hang. val memoryPerSlaveInt = memoryPerSlave.toInt @@ -680,8 +687,7 @@ class SparkContext( */ def addJar(path: String) { if (path == null) { - logWarning("null specified as parameter to addJar", - new SparkException("null specified as parameter to addJar")) + logWarning("null specified as parameter to addJar") } else { var key = "" if (path.contains("\\")) { diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 52b1c492b2..80ff4c59cb 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -80,6 +80,11 @@ private[spark] class CoarseGrainedExecutorBackend( case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => logError("Driver terminated or disconnected! Shutting down.") System.exit(1) + + case StopExecutor => + logInfo("Driver commanded a shutdown") + context.stop(self) + context.system.shutdown() } override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index a8230ec6bc..53316dae2a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -60,6 +60,10 @@ private[spark] object CoarseGrainedClusterMessages { case object StopDriver extends CoarseGrainedClusterMessage + case object StopExecutor extends CoarseGrainedClusterMessage + + case object StopExecutors extends CoarseGrainedClusterMessage + case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage } diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index c0f1c6dbad..70f3f88401 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -101,6 +101,13 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac sender ! true context.stop(self) + case StopExecutors => + logInfo("Asking each executor to shut down") + for (executor <- executorActor.values) { + executor ! StopExecutor + } + sender ! true + case RemoveExecutor(executorId, reason) => removeExecutor(executorId, reason) sender ! true @@ -170,11 +177,24 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") + def stopExecutors() { + try { + if (driverActor != null) { + logInfo("Shutting down all executors") + val future = driverActor.ask(StopExecutors)(timeout) + Await.ready(future, timeout) + } + } catch { + case e: Exception => + throw new SparkException("Error asking standalone scheduler to shut down executors", e) + } + } + override def stop() { try { if (driverActor != null) { val future = driverActor.ask(StopDriver)(timeout) - Await.result(future, timeout) + Await.ready(future, timeout) } } catch { case e: Exception => @@ -197,7 +217,7 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac def removeExecutor(executorId: String, reason: String) { try { val future = driverActor.ask(RemoveExecutor(executorId, reason))(timeout) - Await.result(future, timeout) + Await.ready(future, timeout) } catch { case e: Exception => throw new SparkException("Error notifying standalone scheduler's driver actor", e) diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala new file mode 100644 index 0000000000..d78bdbaa7a --- /dev/null +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{Path, FileSystem} +import org.apache.spark.{Logging, SparkContext} + +private[spark] class SimrSchedulerBackend( + scheduler: ClusterScheduler, + sc: SparkContext, + driverFilePath: String) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) + with Logging { + + val tmpPath = new Path(driverFilePath + "_tmp") + val filePath = new Path(driverFilePath) + + val maxCores = System.getProperty("spark.simr.executor.cores", "1").toInt + + override def start() { + super.start() + + val driverUrl = "akka://spark@%s:%s/user/%s".format( + System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), + CoarseGrainedSchedulerBackend.ACTOR_NAME) + + val conf = new Configuration() + val fs = FileSystem.get(conf) + + logInfo("Writing to HDFS file: " + driverFilePath) + logInfo("Writing Akka address: " + driverUrl) + + // Create temporary file to prevent race condition where executors get empty driverUrl file + val temp = fs.create(tmpPath, true) + temp.writeUTF(driverUrl) + temp.writeInt(maxCores) + temp.close() + + // "Atomic" rename + fs.rename(tmpPath, filePath) + } + + override def stop() { + val conf = new Configuration() + val fs = FileSystem.get(conf) + fs.delete(new Path(driverFilePath), false) + super.stopExecutors() + super.stop() + } +} |