From 74737264c4a9b2a9a99bf3aa77928f6960bad78c Mon Sep 17 00:00:00 2001
From: Aaron Davidson
Date: Thu, 17 Oct 2013 18:51:19 -0700
Subject: Spark shell exits if it cannot create SparkContext

Mainly, this occurs if you provide a messed up MASTER url (one that
doesn't match one of our regexes). Previously, we would default to
Mesos, fail, and then start the shell anyway, except that any Spark
command would fail.
---
 repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

(limited to 'repl')

diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 36f54a22cf..48a8fa9328 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -845,7 +845,14 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
     val jars = Option(System.getenv("ADD_JARS")).map(_.split(','))
       .getOrElse(new Array[String](0))
       .map(new java.io.File(_).getAbsolutePath)
-    sparkContext = new SparkContext(master, "Spark shell", System.getenv("SPARK_HOME"), jars)
+    try {
+      sparkContext = new SparkContext(master, "Spark shell", System.getenv("SPARK_HOME"), jars)
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        echo("Failed to create SparkContext, exiting...")
+        sys.exit(1)
+    }
     sparkContext
   }
--
cgit v1.2.3
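For context on the failure mode above: SparkContext chooses a scheduler by matching the master string against a set of regexes (several of them are visible in the SparkContext hunk of the next patch), and a URL that matches none of them used to fall through to the Mesos branch. A minimal standalone sketch of that dispatch, with simplified hypothetical names rather than the actual SparkContext code:

// Simplified sketch of the master-URL dispatch (hypothetical helper, not Spark code).
object MasterDispatchSketch {
  val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
  val SPARK_REGEX = """spark://(.*)""".r

  def describe(master: String): String = master match {
    case "local"          => "local scheduler, single thread"
    case LOCAL_N_REGEX(n) => "local scheduler, " + n + " threads"
    case SPARK_REGEX(url) => "standalone cluster at " + url
    // A mistyped URL (e.g. "spark:/host:7077") matches nothing and lands here.
    case other            => "unrecognized URL, previously treated as a Mesos master: " + other
  }
}

With the try/catch above, the resulting SparkContext failure is now caught and the shell exits instead of starting with a context that cannot run anything.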
From 05a0df2b9e0e4c3d032404187c0adf6d6d881860 Mon Sep 17 00:00:00 2001
From: Ali Ghodsi
Date: Fri, 6 Sep 2013 21:30:42 -0700
Subject: Makes Spark SIMR ready.

---
 .../main/scala/org/apache/spark/SparkContext.scala | 22 +++++--
 .../executor/CoarseGrainedExecutorBackend.scala    |  5 ++
 .../cluster/CoarseGrainedClusterMessage.scala      |  4 ++
 .../cluster/CoarseGrainedSchedulerBackend.scala    | 20 +++++++
 .../scheduler/cluster/SimrSchedulerBackend.scala   | 69 ++++++++++++++++++++++
 .../scala/org/apache/spark/examples/SparkPi.scala  |  2 +-
 .../scala/org/apache/spark/repl/SparkILoop.scala   | 14 +++++
 7 files changed, 131 insertions(+), 5 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala

(limited to 'repl')

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 564466cfd5..3b39c97260 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -56,15 +56,21 @@ import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
-  SparkDeploySchedulerBackend, ClusterScheduler}
+  SparkDeploySchedulerBackend, ClusterScheduler, SimrSchedulerBackend}
+import org.apache.spark.scheduler.local.LocalScheduler
 import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
 import org.apache.spark.scheduler.local.LocalScheduler
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
-
-
+import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.storage.RDDInfo
+import org.apache.spark.storage.StorageStatus
+import scala.Some
+import org.apache.spark.scheduler.StageInfo
+import org.apache.spark.storage.RDDInfo
+import org.apache.spark.storage.StorageStatus
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -127,7 +133,7 @@ class SparkContext(
   val startTime = System.currentTimeMillis()
 
   // Add each JAR given through the constructor
-  if (jars != null) {
+  if (jars != null && jars != Seq(null)) {
     jars.foreach { addJar(_) }
   }
 
@@ -158,6 +164,8 @@ class SparkContext(
     val SPARK_REGEX = """spark://(.*)""".r
     // Regular expression for connection to Mesos cluster
     val MESOS_REGEX = """mesos://(.*)""".r
+    //Regular expression for connection to Simr cluster
+    val SIMR_REGEX = """simr://(.*)""".r
 
     master match {
       case "local" =>
@@ -176,6 +184,12 @@ class SparkContext(
         scheduler.initialize(backend)
         scheduler
 
+      case SIMR_REGEX(simrUrl) =>
+        val scheduler = new ClusterScheduler(this)
+        val backend = new SimrSchedulerBackend(scheduler, this, simrUrl)
+        scheduler.initialize(backend)
+        scheduler
+
       case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
         // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
         val memoryPerSlaveInt = memoryPerSlave.toInt
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 52b1c492b2..80ff4c59cb 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -80,6 +80,11 @@ private[spark] class CoarseGrainedExecutorBackend(
     case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
       logError("Driver terminated or disconnected! Shutting down.")
       System.exit(1)
+
+    case StopExecutor =>
+      logInfo("Driver commanded a shutdown")
+      context.stop(self)
+      context.system.shutdown()
   }
 
   override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
index a8230ec6bc..53316dae2a 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
@@ -60,6 +60,10 @@ private[spark] object CoarseGrainedClusterMessages {
 
   case object StopDriver extends CoarseGrainedClusterMessage
 
+  case object StopExecutor extends CoarseGrainedClusterMessage
+
+  case object StopExecutors extends CoarseGrainedClusterMessage
+
   case class RemoveExecutor(executorId: String, reason: String) extends CoarseGrainedClusterMessage
 
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index c0f1c6dbad..80a9b4667d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -101,6 +101,13 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
         sender ! true
         context.stop(self)
 
+      case StopExecutors =>
+        logInfo("Asking each executor to shut down")
+        for (executor <- executorActor.values) {
+          executor ! StopExecutor
+        }
+        sender ! true
+
       case RemoveExecutor(executorId, reason) =>
         removeExecutor(executorId, reason)
         sender ! true
@@ -170,6 +177,19 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
 
   private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
 
+  def stopExecutors() {
+    try {
+      if (driverActor != null) {
+        logInfo("Shutting down all executors")
+        val future = driverActor.ask(StopExecutors)(timeout)
+        Await.result(future, timeout)
+      }
+    } catch {
+      case e: Exception =>
+        throw new SparkException("Error asking standalone scheduler to shut down executors", e)
+    }
+  }
+
   override def stop() {
     try {
       if (driverActor != null) {
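The two new messages give the driver a way to fan a shutdown request out to its executors: a scheduler backend sends StopExecutors to the driver actor, which forwards StopExecutor to every registered executor and acknowledges the caller. A rough sketch of that fan-out, with plain Scala values standing in for the Akka actors (an illustration of the message flow, not the actual actor code):

// Hypothetical stand-ins for the driver actor and the executor send paths.
sealed trait Msg
case object StopExecutor extends Msg   // driver -> one executor backend
case object StopExecutors extends Msg  // scheduler backend -> driver actor

class DriverActorSketch(executors: Map[String, Msg => Unit]) {
  // Returns true as the acknowledgement that stopExecutors() awaits.
  def receive(msg: Msg): Boolean = msg match {
    case StopExecutors =>
      executors.values.foreach(send => send(StopExecutor)) // fan out to every executor
      true
    case _ => false
  }
}

The SimrSchedulerBackend introduced in the next file relies on this path by calling super.stopExecutors() before super.stop().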
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
new file mode 100644
index 0000000000..ae56244979
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
@@ -0,0 +1,69 @@
+package org.apache.spark.scheduler.cluster
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{Path, FileSystem}
+
+private[spark] class SimrSchedulerBackend(
+    scheduler: ClusterScheduler,
+    sc: SparkContext,
+    driverFilePath: String)
+  extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem)
+  with Logging {
+
+  val tmpPath = new Path(driverFilePath + "_tmp");
+  val filePath = new Path(driverFilePath);
+
+  val maxCores = System.getProperty("spark.simr.executor.cores", "1").toInt
+
+  override def start() {
+    super.start()
+
+    val driverUrl = "akka://spark@%s:%s/user/%s".format(
+      System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"),
+      CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
+    val conf = new Configuration()
+    val fs = FileSystem.get(conf)
+
+    logInfo("Writing to HDFS file: " + driverFilePath);
+    logInfo("Writing AKKA address: " + driverUrl);
+
+    // Create temporary file to prevent race condition where executors get empty driverUrl file
+    val temp = fs.create(tmpPath, true)
+    temp.writeUTF(driverUrl)
+    temp.writeInt(maxCores)
+    temp.close()
+
+    // "Atomic" rename
+    fs.rename(tmpPath, filePath);
+  }
+
+  override def stop() {
+    val conf = new Configuration()
+    val fs = FileSystem.get(conf)
+    fs.delete(new Path(driverFilePath), false);
+    super.stopExecutors()
+    super.stop()
+  }
+}
+
+
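SimrSchedulerBackend publishes the driver's Akka URL and executor core count by writing them to an HDFS file, going through a temporary file plus rename so readers never see a partially written file. The consuming side is not part of this patch; presumably the SIMR launcher reads the same two fields back. A hypothetical reader that mirrors the writeUTF/writeInt calls above:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical reader for the driver file written by SimrSchedulerBackend.start();
// the field order must match the writer: writeUTF(driverUrl) then writeInt(maxCores).
object SimrDriverFileReaderSketch {
  def read(driverFilePath: String): (String, Int) = {
    val fs = FileSystem.get(new Configuration())
    val in = fs.open(new Path(driverFilePath))
    try {
      val driverUrl = in.readUTF()
      val maxCores = in.readInt()
      (driverUrl, maxCores)
    } finally {
      in.close()
    }
  }
}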
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
index 5a2bc9b0d0..a689e5a360 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkPi.scala
@@ -38,6 +38,6 @@ object SparkPi {
       if (x*x + y*y < 1) 1 else 0
     }.reduce(_ + _)
     println("Pi is roughly " + 4.0 * count / n)
-    System.exit(0)
+    spark.stop()
   }
 }
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index 48a8fa9328..0ced284da6 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -633,6 +633,20 @@ class SparkILoop(in0: Option[BufferedReader], val out: PrintWriter, val master:
     Result(true, shouldReplay)
   }
 
+  def addAllClasspath(args: Seq[String]): Unit = {
+    var added = false
+    var totalClasspath = ""
+    for (arg <- args) {
+      val f = File(arg).normalize
+      if (f.exists) {
+        added = true
+        addedClasspath = ClassPath.join(addedClasspath, f.path)
+        totalClasspath = ClassPath.join(settings.classpath.value, addedClasspath)
+      }
+    }
+    if (added) replay()
+  }
+
   def addClasspath(arg: String): Unit = {
     val f = File(arg).normalize
     if (f.exists) {
--
cgit v1.2.3
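The SparkPi change reflects the pattern the rest of this patch enables: an application ends by stopping its SparkContext, which lets the scheduler backend (including the new SIMR backend) shut its executors down cleanly, rather than calling System.exit. A minimal, illustrative application skeleton following that pattern (the app name, master URL, and workload are placeholders):

import org.apache.spark.SparkContext

// Minimal example of ending an application with sc.stop() instead of System.exit(0).
object StopInsteadOfExit {
  def main(args: Array[String]) {
    val sc = new SparkContext("local[2]", "StopInsteadOfExit")
    val sum = sc.parallelize(1 to 100).reduce(_ + _)
    println("Sum is " + sum)
    sc.stop()  // gives the scheduler backend a chance to shut executors down cleanly
  }
}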