author     Ali Ghodsi <alig@cs.berkeley.edu>   2013-09-06 21:30:42 -0700
committer  Ali Ghodsi <alig@cs.berkeley.edu>   2013-10-24 11:59:51 -0700
commit     05a0df2b9e0e4c3d032404187c0adf6d6d881860 (patch)
tree       f0b8d432c03562c875f3a00c0f4e0d63b728327f /core
parent     dadfc63b0314060876ac1787d4de72b37221139c (diff)
Makes Spark SIMR ready.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 22
-rw-r--r--  core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala | 69
5 files changed, 116 insertions(+), 4 deletions(-)
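
SIMR (Spark In MapReduce) runs the Spark driver and its executors inside the map tasks of an ordinary Hadoop job, so driver and executors rendezvous through a file in HDFS rather than through a standalone or Mesos master. With this patch an application can point at such a cluster through the new simr:// master URL. A minimal usage sketch; the HDFS path and application name are hypothetical:

    // Minimal sketch, assuming SIMR has already been launched on the cluster.
    // The path after simr:// is the HDFS file that SimrSchedulerBackend publishes
    // the driver's Akka URL to (see the new file at the end of this diff).
    import org.apache.spark.SparkContext

    val sc = new SparkContext("simr://hdfs://namenode:9000/tmp/simr_driver_url", "SimrSketch")
    println(sc.parallelize(1 to 1000).count())
    sc.stop()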
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 564466cfd5..3b39c97260 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -56,15 +56,21 @@ import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
- SparkDeploySchedulerBackend, ClusterScheduler}
+ SparkDeploySchedulerBackend, ClusterScheduler, SimrSchedulerBackend}
+import org.apache.spark.scheduler.local.LocalScheduler
import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import org.apache.spark.scheduler.local.LocalScheduler
import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType,
TimeStampedHashMap, Utils}
-
-
+import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.storage.RDDInfo
+import org.apache.spark.storage.StorageStatus
+import scala.Some
+import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.storage.RDDInfo
+import org.apache.spark.storage.StorageStatus
/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -127,7 +133,7 @@ class SparkContext(
val startTime = System.currentTimeMillis()
// Add each JAR given through the constructor
- if (jars != null) {
+ if (jars != null && jars != Seq(null)) {
jars.foreach { addJar(_) }
}
@@ -158,6 +164,8 @@ class SparkContext(
val SPARK_REGEX = """spark://(.*)""".r
// Regular expression for connection to Mesos cluster
val MESOS_REGEX = """mesos://(.*)""".r
+ //Regular expression for connection to Simr cluster
+ val SIMR_REGEX = """simr://(.*)""".r
master match {
case "local" =>
@@ -176,6 +184,12 @@ class SparkContext(
scheduler.initialize(backend)
scheduler
+ case SIMR_REGEX(simrUrl) =>
+ val scheduler = new ClusterScheduler(this)
+ val backend = new SimrSchedulerBackend(scheduler, this, simrUrl)
+ scheduler.initialize(backend)
+ scheduler
+
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
// Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
val memoryPerSlaveInt = memoryPerSlave.toInt
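
For reference, the new case follows the same shape as the existing spark:// and mesos:// cases: the text after simr:// is captured by the regex and handed to SimrSchedulerBackend as the HDFS path of the driver-URL file. A small standalone sketch of the match; the path is hypothetical:

    // Standalone sketch of how the captured group is extracted; not Spark code.
    val SIMR_REGEX = """simr://(.*)""".r

    "simr://hdfs://namenode:9000/tmp/simr_driver_url" match {
      case SIMR_REGEX(driverFilePath) =>
        // driverFilePath == "hdfs://namenode:9000/tmp/simr_driver_url"
        println("driver file: " + driverFilePath)
      case _ =>
        println("not a SIMR master URL")
    }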
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 52b1c492b2..80ff4c59cb 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -80,6 +80,11 @@ private[spark] class CoarseGrainedExecutorBackend(
case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
logError("Driver terminated or disconnected! Shutting down.")
System.exit(1)
+
+ case StopExecutor =>
+ logInfo("Driver commanded a shutdown")
+ context.stop(self)
+ context.system.shutdown()
}
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
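
The new StopExecutor case lets the driver ask an executor to exit cleanly instead of relying on a disconnect. Stopping the actor and shutting down its ActorSystem ends the process only because the executor's main thread is (assumed here to be) blocked waiting for the actor system to terminate; a minimal standalone sketch of that lifecycle, not the actual CoarseGrainedExecutorBackend wiring:

    // Minimal sketch of the shutdown lifecycle (Akka 2.0-era API, as used in this file).
    import akka.actor.{Actor, ActorSystem, Props}

    object ExecutorLifecycleSketch {
      case object StopExecutor

      class Backend extends Actor {
        def receive = {
          case StopExecutor =>
            context.stop(self)          // stop this actor
            context.system.shutdown()   // lets awaitTermination() below return
        }
      }

      def main(args: Array[String]) {
        val system = ActorSystem("sketch")
        val backend = system.actorOf(Props(new Backend), "Executor")
        backend ! StopExecutor          // in Spark this is sent by the driver actor
        system.awaitTermination()       // JVM exits once the system is shut down
      }
    }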
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index a8230ec6bc..53316dae2a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -60,6 +60,10 @@ private[spark] object CoarseGrainedClusterMessages {
case object StopDriver extends CoarseGrainedClusterMessage
+ case object StopExecutor extends CoarseGrainedClusterMessage
+
+ case object StopExecutors extends CoarseGrainedClusterMessage
+
case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index c0f1c6dbad..80a9b4667d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -101,6 +101,13 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
sender ! true
context.stop(self)
+ case StopExecutors =>
+ logInfo("Asking each executor to shut down")
+ for (executor <- executorActor.values) {
+ executor ! StopExecutor
+ }
+ sender ! true
+
case RemoveExecutor(executorId, reason) =>
removeExecutor(executorId, reason)
sender ! true
@@ -170,6 +177,19 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+ def stopExecutors() {
+ try {
+ if (driverActor != null) {
+ logInfo("Shutting down all executors")
+ val future = driverActor.ask(StopExecutors)(timeout)
+ Await.result(future, timeout)
+ }
+ } catch {
+ case e: Exception =>
+ throw new SparkException("Error asking standalone scheduler to shut down executors", e)
+ }
+ }
+
override def stop() {
try {
if (driverActor != null) {
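
stopExecutors() gives subclasses a way to tear the executors down explicitly before the driver actor itself is stopped: the driver actor fans StopExecutor out to every registered executor and then acknowledges. A backend that owns its executors, like the SIMR backend added below, is expected to call it at the start of its own stop(); a hypothetical subclass showing just that call order:

    // Hypothetical subclass placed in the same package as the other backends;
    // only the teardown order matters, and it matches SimrSchedulerBackend.stop() below.
    package org.apache.spark.scheduler.cluster

    import akka.actor.ActorSystem

    private[spark] class ManagedBackendSketch(scheduler: ClusterScheduler, actorSystem: ActorSystem)
      extends CoarseGrainedSchedulerBackend(scheduler, actorSystem) {

      override def stop() {
        stopExecutors()   // driver actor sends StopExecutor to every registered executor
        super.stop()      // then the existing StopDriver shutdown path
      }
    }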
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
new file mode 100644
index 0000000000..ae56244979
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -0,0 +1,69 @@
+package org.apache.spark.scheduler.cluster
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+private[spark] class SimrSchedulerBackend(
+ scheduler: ClusterScheduler,
+ sc: SparkContext,
+ driverFilePath: String)
+ extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+ with Logging {
+
+ val tmpPath = new Path(driverFilePath + "_tmp");
+ val filePath = new Path(driverFilePath);
+
+ val maxCores = System.getProperty("spark.simr.executor.cores", "1").toInt
+
+ override def start() {
+ super.start()
+
+ val driverUrl = "akka://spark@%s:%s/user/%s".format(
+ System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
+ CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
+ val conf = new Configuration()
+ val fs = FileSystem.get(conf)
+
+ logInfo("Writing to HDFS file: " + driverFilePath);
+ logInfo("Writing AKKA address: " + driverUrl);
+
+ // Create temporary file to prevent race condition where executors get empty driverUrl file
+ val temp = fs.create(tmpPath, true)
+ temp.writeUTF(driverUrl)
+ temp.writeInt(maxCores)
+ temp.close()
+
+ // "Atomic" rename
+ fs.rename(tmpPath, filePath);
+ }
+
+ override def stop() {
+ val conf = new Configuration()
+ val fs = FileSystem.get(conf)
+ fs.delete(new Path(driverFilePath), false);
+ super.stopExecutors()
+ super.stop()
+ }
+}
+
+
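
The file written in start() is read back on the SIMR side (outside this repository): the executor launchers wait until the rename has made the file visible, then read the driver URL and core count in the same order they were written. A hedged sketch of such a reader, using the same Hadoop FileSystem API; the object and method names are made up:

    // Sketch of the consumer side; driverFilePath is the same path that
    // SimrSchedulerBackend.start() publishes above.
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object DriverFileReaderSketch {
      def readDriverInfo(driverFilePath: String): (String, Int) = {
        val fs = FileSystem.get(new Configuration())
        val in = fs.open(new Path(driverFilePath))
        try {
          val driverUrl = in.readUTF()   // matches temp.writeUTF(driverUrl)
          val maxCores  = in.readInt()   // matches temp.writeInt(maxCores)
          (driverUrl, maxCores)
        } finally {
          in.close()
        }
      }
    }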