Diffstat (limited to 'core/src/main/scala/org/apache')
74 files changed, 1785 insertions, 351 deletions
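Several hunks in this patch replace string-parsed configuration reads with typed SparkConf accessors (the existing getInt/getLong plus the getBoolean getter added below). A minimal sketch of the resulting pattern, assuming only a plain SparkConf; the key names and defaults are taken from hunks later in the patch:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
    // Before: conf.get("spark.broadcast.compress", "true").toBoolean
    // After:  typed accessor with a typed default value
    val compress        = conf.getBoolean("spark.broadcast.compress", true)
    val bufferSize      = conf.getInt("spark.buffer.size", 65536)
    val workerTimeoutMs = conf.getLong("spark.worker.timeout", 60) * 1000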
diff --git a/core/src/main/scala/org/apache/spark/Logging.scala b/core/src/main/scala/org/apache/spark/Logging.scala index d519fc5a29..4a34989e50 100644 --- a/core/src/main/scala/org/apache/spark/Logging.scala +++ b/core/src/main/scala/org/apache/spark/Logging.scala @@ -104,13 +104,15 @@ trait Logging { // If Log4j doesn't seem initialized, load a default properties file val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements if (!log4jInitialized) { - val defaultLogProps = "org/apache/spark/default-log4j.properties" + val defaultLogProps = "org/apache/spark/log4j-defaults.properties" val classLoader = this.getClass.getClassLoader Option(classLoader.getResource(defaultLogProps)) match { - case Some(url) => PropertyConfigurator.configure(url) - case None => System.err.println(s"Spark was unable to load $defaultLogProps") + case Some(url) => + PropertyConfigurator.configure(url) + log.info(s"Using Spark's default log4j profile: $defaultLogProps") + case None => + System.err.println(s"Spark was unable to load $defaultLogProps") } - log.info(s"Using Spark's default log4j profile: $defaultLogProps") } Logging.initialized = true diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index cdae167aef..77b8ca1cce 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -55,7 +55,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging { private val timeout = AkkaUtils.askTimeout(conf) // Set to the MapOutputTrackerActor living on the driver - var trackerActor: Either[ActorRef, ActorSelection] = _ + var trackerActor: ActorRef = _ protected val mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]] @@ -71,17 +71,7 @@ private[spark] class MapOutputTracker(conf: SparkConf) extends Logging { // throw a SparkException if this fails. private def askTracker(message: Any): Any = { try { - /* - The difference between ActorRef and ActorSelection is well explained here: - http://doc.akka.io/docs/akka/2.2.3/project/migration-guide-2.1.x-2.2.x.html#Use_actorSelection_instead_of_actorFor - In spark a map output tracker can be either started on Driver where it is created which - is an ActorRef or it can be on executor from where it is looked up which is an - actorSelection. - */ - val future = trackerActor match { - case Left(a: ActorRef) => a.ask(message)(timeout) - case Right(b: ActorSelection) => b.ask(message)(timeout) - } + val future = trackerActor.ask(message)(timeout) Await.result(future, timeout) } catch { case e: Exception => diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index 31b0773bfe..9b043f06dd 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -61,7 +61,8 @@ object Partitioner { } /** - * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using Java's `Object.hashCode`. + * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using + * Java's `Object.hashCode`. 
* * Java arrays have hashCodes that are based on the arrays' identities rather than their contents, * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will @@ -84,8 +85,8 @@ class HashPartitioner(partitions: Int) extends Partitioner { } /** - * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly equal ranges. - * Determines the ranges by sampling the RDD passed in. + * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly + * equal ranges. The ranges are determined by sampling the content of the RDD passed in. */ class RangePartitioner[K <% Ordered[K]: ClassTag, V]( partitions: Int, diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 98343e9532..2de32231e8 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -24,7 +24,7 @@ import com.typesafe.config.ConfigFactory * * @param loadDefaults whether to load values from the system properties and classpath */ -class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable { +class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable with Logging { /** Create a SparkConf that loads defaults from system properties and the classpath */ def this() = this(true) @@ -67,7 +67,8 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable { /** Set JAR files to distribute to the cluster. */ def setJars(jars: Seq[String]): SparkConf = { - set("spark.jars", jars.mkString(",")) + for (jar <- jars if (jar == null)) logWarning("null jar passed to SparkContext constructor") + set("spark.jars", jars.filter(_ != null).mkString(",")) } /** Set JAR files to distribute to the cluster. (Java-friendly version.) */ @@ -164,6 +165,11 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable { getOption(key).map(_.toDouble).getOrElse(defaultValue) } + /** Get a parameter as a boolean, falling back to a default if not set */ + def getBoolean(key: String, defaultValue: Boolean): Boolean = { + getOption(key).map(_.toBoolean).getOrElse(defaultValue) + } + /** Get all executor environment variables set on this SparkConf */ def getExecutorEnv: Seq[(String, String)] = { val prefix = "spark.executorEnv." @@ -171,6 +177,9 @@ class SparkConf(loadDefaults: Boolean) extends Serializable with Cloneable { .map{case (k, v) => (k.substring(prefix.length), v)} } + /** Get all akka conf variables set on this SparkConf */ + def getAkkaConf: Seq[(String, String)] = getAll.filter {case (k, v) => k.startsWith("akka.")} + /** Does the configuration contain a given parameter? 
*/ def contains(key: String): Boolean = settings.contains(key) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 84bd0f7ffd..66c226e491 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -31,9 +31,9 @@ import scala.reflect.{ClassTag, classTag} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, -FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable} + FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable} import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, -TextInputFormat} + TextInputFormat} import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob} import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} import org.apache.mesos.MesosNativeLibrary @@ -49,7 +49,7 @@ import org.apache.spark.scheduler.local.LocalBackend import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils} import org.apache.spark.ui.SparkUI import org.apache.spark.util.{Utils, TimeStampedHashMap, MetadataCleaner, MetadataCleanerType, -ClosureCleaner} + ClosureCleaner} /** * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark @@ -116,6 +116,10 @@ class SparkContext( throw new SparkException("An application must be set in your configuration") } + if (conf.getBoolean("spark.logConf", false)) { + logInfo("Spark configuration:\n" + conf.toDebugString) + } + // Set Spark driver host and port system properties conf.setIfMissing("spark.driver.host", Utils.localHostName()) conf.setIfMissing("spark.driver.port", "0") @@ -169,10 +173,16 @@ class SparkContext( // Environment variables to pass to our executors private[spark] val executorEnvs = HashMap[String, String]() // Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner - for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", "SPARK_TESTING"); + for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS"); value <- Option(System.getenv(key))) { executorEnvs(key) = value } + // Convert java options to env vars as a work around + // since we can't set env vars directly in sbt. + for { (envKey, propKey) <- Seq(("SPARK_HOME", "spark.home"), ("SPARK_TESTING", "spark.testing")) + value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} { + executorEnvs(envKey) = value + } // Since memory can be set with a system property too, use that executorEnvs("SPARK_MEM") = executorMemory + "m" executorEnvs ++= conf.getExecutorEnv @@ -234,6 +244,10 @@ class SparkContext( localProperties.set(new Properties()) } + /** + * Set a local property that affects jobs submitted from this thread, such as the + * Spark fair scheduler pool. + */ def setLocalProperty(key: String, value: String) { if (localProperties.get() == null) { localProperties.set(new Properties()) @@ -245,6 +259,10 @@ class SparkContext( } } + /** + * Get a local property set in this thread, or null if it is missing. See + * [[org.apache.spark.SparkContext.setLocalProperty]]. 
+ */ def getLocalProperty(key: String): String = Option(localProperties.get).map(_.getProperty(key)).getOrElse(null) @@ -255,7 +273,7 @@ class SparkContext( } /** - * Assigns a group id to all the jobs started by this thread until the group id is set to a + * Assigns a group ID to all the jobs started by this thread until the group ID is set to a * different value or cleared. * * Often, a unit of execution in an application consists of multiple Spark actions or jobs. @@ -278,7 +296,7 @@ class SparkContext( setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId) } - /** Clear the job group id and its description. */ + /** Clear the current thread's job group ID and its description. */ def clearJobGroup() { setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null) setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null) @@ -507,15 +525,15 @@ class SparkContext( // Methods for creating shared variables /** - * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add" values - * to using the `+=` method. Only the driver can access the accumulator's `value`. + * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add" + * values to using the `+=` method. Only the driver can access the accumulator's `value`. */ def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) = new Accumulator(initialValue, param) /** - * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values with `+=`. - * Only the driver can access the accumuable's `value`. + * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values + * with `+=`. Only the driver can access the accumuable's `value`. * @tparam T accumulator type * @tparam R type that can be added to the accumulator */ @@ -528,14 +546,16 @@ class SparkContext( * Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by * standard mutable collections. So you can use this with mutable Map, Set, etc. */ - def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T](initialValue: R) = { + def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T] + (initialValue: R) = { val param = new GrowableAccumulableParam[R,T] new Accumulable(initialValue, param) } /** - * Broadcast a read-only variable to the cluster, returning a [[org.apache.spark.broadcast.Broadcast]] object for - * reading it in distributed functions. The variable will be sent to each cluster only once. + * Broadcast a read-only variable to the cluster, returning a + * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions. + * The variable will be sent to each cluster only once. */ def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T](value, isLocal) @@ -729,6 +749,26 @@ class SparkContext( } /** + * Support function for API backtraces. + */ + def setCallSite(site: String) { + setLocalProperty("externalCallSite", site) + } + + /** + * Support function for API backtraces. + */ + def clearCallSite() { + setLocalProperty("externalCallSite", null) + } + + private[spark] def getCallSite(): String = { + val callSite = getLocalProperty("externalCallSite") + if (callSite == null) return Utils.formatSparkCallSite + callSite + } + + /** * Run a function on a given set of partitions in an RDD and pass the results to the given * handler function. This is the main entry point for all actions in Spark. 
The allowLocal * flag specifies whether the scheduler can run the computation on the driver rather than @@ -740,7 +780,7 @@ class SparkContext( partitions: Seq[Int], allowLocal: Boolean, resultHandler: (Int, U) => Unit) { - val callSite = Utils.formatSparkCallSite + val callSite = getCallSite val cleanedFunc = clean(func) logInfo("Starting job: " + callSite) val start = System.nanoTime @@ -824,7 +864,7 @@ class SparkContext( func: (TaskContext, Iterator[T]) => U, evaluator: ApproximateEvaluator[U, R], timeout: Long): PartialResult[R] = { - val callSite = Utils.formatSparkCallSite + val callSite = getCallSite logInfo("Starting job: " + callSite) val start = System.nanoTime val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout, @@ -844,7 +884,7 @@ class SparkContext( resultFunc: => R): SimpleFutureAction[R] = { val cleanF = clean(processPartition) - val callSite = Utils.formatSparkCallSite + val callSite = getCallSite val waiter = dagScheduler.submitJob( rdd, (context: TaskContext, iter: Iterator[T]) => cleanF(iter), @@ -980,7 +1020,8 @@ object SparkContext { implicit def stringToText(s: String) = new Text(s) - private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T]): ArrayWritable = { + private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T]) + : ArrayWritable = { def anyToWritable[U <% Writable](u: U): Writable = u new ArrayWritable(classTag[T].runtimeClass.asInstanceOf[Class[Writable]], @@ -1003,7 +1044,9 @@ object SparkContext { implicit def booleanWritableConverter() = simpleWritableConverter[Boolean, BooleanWritable](_.get) - implicit def bytesWritableConverter() = simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes) + implicit def bytesWritableConverter() = { + simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes) + } implicit def stringWritableConverter() = simpleWritableConverter[String, Text](_.toString) @@ -1019,7 +1062,8 @@ object SparkContext { if (uri != null) { val uriStr = uri.toString if (uriStr.startsWith("jar:file:")) { - // URI will be of the form "jar:file:/path/foo.jar!/package/cls.class", so pull out the /path/foo.jar + // URI will be of the form "jar:file:/path/foo.jar!/package/cls.class", + // so pull out the /path/foo.jar List(uriStr.substring("jar:file:".length, uriStr.indexOf('!'))) } else { Nil @@ -1173,7 +1217,7 @@ object SparkContext { case mesosUrl @ MESOS_REGEX(_) => MesosNativeLibrary.load() val scheduler = new TaskSchedulerImpl(sc) - val coarseGrained = sc.conf.get("spark.mesos.coarse", "false").toBoolean + val coarseGrained = sc.conf.getBoolean("spark.mesos.coarse", false) val url = mesosUrl.stripPrefix("mesos://") // strip scheme from raw Mesos URLs val backend = if (coarseGrained) { new CoarseMesosSchedulerBackend(scheduler, sc, url, appName) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 634a94f0a7..e093e2f162 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -17,11 +17,10 @@ package org.apache.spark -import collection.mutable -import serializer.Serializer +import scala.collection.mutable +import scala.concurrent.Await import akka.actor._ -import akka.remote.RemoteActorRefProvider import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.metrics.MetricsSystem @@ -157,17 +156,18 @@ object SparkEnv extends Logging { conf.get("spark.closure.serializer", 
"org.apache.spark.serializer.JavaSerializer"), conf) - def registerOrLookup(name: String, newActor: => Actor): Either[ActorRef, ActorSelection] = { + def registerOrLookup(name: String, newActor: => Actor): ActorRef = { if (isDriver) { logInfo("Registering " + name) - Left(actorSystem.actorOf(Props(newActor), name = name)) + actorSystem.actorOf(Props(newActor), name = name) } else { val driverHost: String = conf.get("spark.driver.host", "localhost") - val driverPort: Int = conf.get("spark.driver.port", "7077").toInt + val driverPort: Int = conf.getInt("spark.driver.port", 7077) Utils.checkHost(driverHost, "Expected hostname") - val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name) - logInfo("Connecting to " + name + ": " + url) - Right(actorSystem.actorSelection(url)) + val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name" + val timeout = AkkaUtils.lookupTimeout(conf) + logInfo(s"Connecting to $name: $url") + Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout) } } diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index 103a1c2051..618d95015f 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -127,10 +127,6 @@ class SparkHadoopWriter(@transient jobConf: JobConf) cmtr.commitJob(getJobContext()) } - def cleanup() { - getOutputCommitter().cleanupJob(getJobContext()) - } - // ********* Private Functions ********* private def getOutputFormat(): OutputFormat[AnyRef,AnyRef] = { diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala index da30cf619a..b0dedc6f4e 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala @@ -207,13 +207,13 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav * e.g. for the array * [1,10,20,50] the buckets are [1,10) [10,20) [20,50] * e.g 1<=x<10 , 10<=x<20, 20<=x<50 - * And on the input of 1 and 50 we would have a histogram of 1,0,0 - * + * And on the input of 1 and 50 we would have a histogram of 1,0,0 + * * Note: if your histogram is evenly spaced (e.g. [0, 10, 20, 30]) this can be switched * from an O(log n) inseration to O(1) per element. (where n = # buckets) if you set evenBuckets * to true. * buckets must be sorted and not contain any duplicates. - * buckets array must be at least two elements + * buckets array must be at least two elements * All NaN entries are treated the same. If you have a NaN bucket it must be * the maximum value of the last position and all NaN entries will be counted * in that bucket. 
@@ -225,6 +225,12 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav def histogram(buckets: Array[Double], evenBuckets: Boolean): Array[Long] = { srdd.histogram(buckets.map(_.toDouble), evenBuckets) } + + /** Assign a name to this RDD */ + def setName(name: String): JavaDoubleRDD = { + srdd.setName(name) + this + } } object JavaDoubleRDD { diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 55c87450ac..0fb7e195b3 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -647,6 +647,12 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = { rdd.countApproxDistinctByKey(relativeSD, numPartitions) } + + /** Assign a name to this RDD */ + def setName(name: String): JavaPairRDD[K, V] = { + rdd.setName(name) + this + } } object JavaPairRDD { diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index 037cd1c774..7d48ce01cf 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -127,6 +127,12 @@ JavaRDDLike[T, JavaRDD[T]] { wrapRDD(rdd.subtract(other, p)) override def toString = rdd.toString + + /** Assign a name to this RDD */ + def setName(name: String): JavaRDD[T] = { + rdd.setName(name) + this + } } object JavaRDD { diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 924d8af060..ebbbbd8806 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -245,6 +245,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { } /** + * Return an array that contains all of the elements in this RDD. + */ + def toArray(): JList[T] = collect() + + /** * Return an array that contains all of the elements in a specific partition of this RDD. */ def collectPartitions(partitionIds: Array[Int]): Array[JList[T]] = { @@ -455,4 +460,5 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { */ def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD) + def name(): String = rdd.name } diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 0680a065a3..7a6f044965 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -411,10 +411,82 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork * changed at runtime. */ def getConf: SparkConf = sc.getConf + + /** + * Pass-through to SparkContext.setCallSite. For API support only. + */ + def setCallSite(site: String) { + sc.setCallSite(site) + } + + /** + * Pass-through to SparkContext.setCallSite. For API support only. + */ + def clearCallSite() { + sc.clearCallSite() + } + + /** + * Set a local property that affects jobs submitted from this thread, such as the + * Spark fair scheduler pool. 
+ */ + def setLocalProperty(key: String, value: String): Unit = sc.setLocalProperty(key, value) + + /** + * Get a local property set in this thread, or null if it is missing. See + * [[org.apache.spark.api.java.JavaSparkContext.setLocalProperty]]. + */ + def getLocalProperty(key: String): String = sc.getLocalProperty(key) + + /** + * Assigns a group ID to all the jobs started by this thread until the group ID is set to a + * different value or cleared. + * + * Often, a unit of execution in an application consists of multiple Spark actions or jobs. + * Application programmers can use this method to group all those jobs together and give a + * group description. Once set, the Spark web UI will associate such jobs with this group. + * + * The application can also use [[org.apache.spark.api.java.JavaSparkContext.cancelJobGroup]] + * to cancel all running jobs in this group. For example, + * {{{ + * // In the main thread: + * sc.setJobGroup("some_job_to_cancel", "some job description"); + * rdd.map(...).count(); + * + * // In a separate thread: + * sc.cancelJobGroup("some_job_to_cancel"); + * }}} + */ + def setJobGroup(groupId: String, description: String): Unit = sc.setJobGroup(groupId, description) + + /** Clear the current thread's job group ID and its description. */ + def clearJobGroup(): Unit = sc.clearJobGroup() + + /** + * Cancel active jobs for the specified group. See + * [[org.apache.spark.api.java.JavaSparkContext.setJobGroup]] for more information. + */ + def cancelJobGroup(groupId: String): Unit = sc.cancelJobGroup(groupId) + + /** Cancel all jobs that have been scheduled or are running. */ + def cancelAllJobs(): Unit = sc.cancelAllJobs() } object JavaSparkContext { implicit def fromSparkContext(sc: SparkContext): JavaSparkContext = new JavaSparkContext(sc) implicit def toSparkContext(jsc: JavaSparkContext): SparkContext = jsc.sc + + /** + * Find the JAR from which a given class was loaded, to make it easy for users to pass + * their JARs to SparkContext. + */ + def jarOfClass(cls: Class[_]): Array[String] = SparkContext.jarOfClass(cls).toArray + + /** + * Find the JAR that contains the class of a particular object, to make it easy for users + * to pass their JARs to SparkContext. In most cases you can call jarOfObject(this) in + * your driver program. 
+ */ + def jarOfObject(obj: AnyRef): Array[String] = SparkContext.jarOfObject(obj).toArray } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 32cc70e8c9..40c519b5bd 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -41,7 +41,7 @@ private[spark] class PythonRDD[T: ClassTag]( accumulator: Accumulator[JList[Array[Byte]]]) extends RDD[Array[Byte]](parent) { - val bufferSize = conf.get("spark.buffer.size", "65536").toInt + val bufferSize = conf.getInt("spark.buffer.size", 65536) override def getPartitions = parent.partitions @@ -250,7 +250,7 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort: Utils.checkHost(serverHost, "Expected hostname") - val bufferSize = SparkEnv.get.conf.get("spark.buffer.size", "65536").toInt + val bufferSize = SparkEnv.get.conf.getInt("spark.buffer.size", 65536) override def zero(value: JList[Array[Byte]]): JList[Array[Byte]] = new JArrayList diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index db596d5fcc..0eacda3d7d 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -92,8 +92,8 @@ private object HttpBroadcast extends Logging { def initialize(isDriver: Boolean, conf: SparkConf) { synchronized { if (!initialized) { - bufferSize = conf.get("spark.buffer.size", "65536").toInt - compress = conf.get("spark.broadcast.compress", "true").toBoolean + bufferSize = conf.getInt("spark.buffer.size", 65536) + compress = conf.getBoolean("spark.broadcast.compress", true) if (isDriver) { createServer(conf) conf.set("spark.httpBroadcast.uri", serverUri) diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 9530938278..fdf92eca4f 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -180,7 +180,7 @@ extends Logging { initialized = false } - lazy val BLOCK_SIZE = conf.get("spark.broadcast.blockSize", "4096").toInt * 1024 + lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024 def blockifyObject[T](obj: T): TorrentInfo = { val byteArray = Utils.serialize[T](obj) diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index 19d393a0db..e38459b883 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy private[spark] class ApplicationDescription( val name: String, - val maxCores: Int, /* Integer.MAX_VALUE denotes an unlimited number of cores */ + val maxCores: Option[Int], val memoryPerSlave: Int, val command: Command, val sparkHome: String, diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala new file mode 100644 index 0000000000..e133893f6c --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy + +import scala.collection.JavaConversions._ +import scala.collection.mutable.Map +import scala.concurrent._ + +import akka.actor._ +import akka.pattern.ask +import org.apache.log4j.{Level, Logger} + +import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.deploy.DeployMessages._ +import org.apache.spark.deploy.master.{DriverState, Master} +import org.apache.spark.util.{AkkaUtils, Utils} +import akka.actor.Actor.emptyBehavior +import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent} + +/** + * Proxy that relays messages to the driver. + */ +class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging { + var masterActor: ActorSelection = _ + val timeout = AkkaUtils.askTimeout(conf) + + override def preStart() = { + masterActor = context.actorSelection(Master.toAkkaUrl(driverArgs.master)) + + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) + + println(s"Sending ${driverArgs.cmd} command to ${driverArgs.master}") + + driverArgs.cmd match { + case "launch" => + // TODO: We could add an env variable here and intercept it in `sc.addJar` that would + // truncate filesystem paths similar to what YARN does. For now, we just require + // people call `addJar` assuming the jar is in the same directory. + val env = Map[String, String]() + System.getenv().foreach{case (k, v) => env(k) = v} + + val mainClass = "org.apache.spark.deploy.worker.DriverWrapper" + val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++ + driverArgs.driverOptions, env) + + val driverDescription = new DriverDescription( + driverArgs.jarUrl, + driverArgs.memory, + driverArgs.cores, + driverArgs.supervise, + command) + + masterActor ! RequestSubmitDriver(driverDescription) + + case "kill" => + val driverId = driverArgs.driverId + val killFuture = masterActor ! RequestKillDriver(driverId) + } + } + + /* Find out driver status then exit the JVM */ + def pollAndReportStatus(driverId: String) { + println(s"... waiting before polling master for driver state") + Thread.sleep(5000) + println("... polling master for driver state") + val statusFuture = (masterActor ? 
RequestDriverStatus(driverId))(timeout) + .mapTo[DriverStatusResponse] + val statusResponse = Await.result(statusFuture, timeout) + + statusResponse.found match { + case false => + println(s"ERROR: Cluster master did not recognize $driverId") + System.exit(-1) + case true => + println(s"State of $driverId is ${statusResponse.state.get}") + // Worker node, if present + (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match { + case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) => + println(s"Driver running on $hostPort ($id)") + case _ => + } + // Exception, if present + statusResponse.exception.map { e => + println(s"Exception from cluster was: $e") + System.exit(-1) + } + System.exit(0) + } + } + + override def receive = { + + case SubmitDriverResponse(success, driverId, message) => + println(message) + if (success) pollAndReportStatus(driverId.get) else System.exit(-1) + + case KillDriverResponse(driverId, success, message) => + println(message) + if (success) pollAndReportStatus(driverId) else System.exit(-1) + + case DisassociatedEvent(_, remoteAddress, _) => + println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.") + System.exit(-1) + + case AssociationErrorEvent(cause, _, remoteAddress, _) => + println(s"Error connecting to master ${driverArgs.master} ($remoteAddress), exiting.") + println(s"Cause was: $cause") + System.exit(-1) + } +} + +/** + * Executable utility for starting and terminating drivers inside of a standalone cluster. + */ +object Client { + def main(args: Array[String]) { + val conf = new SparkConf() + val driverArgs = new ClientArguments(args) + + if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) { + conf.set("spark.akka.logLifecycleEvents", "true") + } + conf.set("spark.akka.askTimeout", "10") + conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING")) + Logger.getRootLogger.setLevel(driverArgs.logLevel) + + // TODO: See if we can initialize akka so return messages are sent back using the same TCP + // flow. Else, this (sadly) requires the DriverClient be routable from the Master. + val (actorSystem, _) = AkkaUtils.createActorSystem( + "driverClient", Utils.localHostName(), 0, false, conf) + + actorSystem.actorOf(Props(classOf[ClientActor], driverArgs, conf)) + + actorSystem.awaitTermination() + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala new file mode 100644 index 0000000000..db67c6d1bb --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/ClientArguments.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy + +import java.net.URL + +import scala.collection.mutable.ListBuffer + +import org.apache.log4j.Level + +/** + * Command-line parser for the driver client. + */ +private[spark] class ClientArguments(args: Array[String]) { + val defaultCores = 1 + val defaultMemory = 512 + + var cmd: String = "" // 'launch' or 'kill' + var logLevel = Level.WARN + + // launch parameters + var master: String = "" + var jarUrl: String = "" + var mainClass: String = "" + var supervise: Boolean = false + var memory: Int = defaultMemory + var cores: Int = defaultCores + private var _driverOptions = ListBuffer[String]() + def driverOptions = _driverOptions.toSeq + + // kill parameters + var driverId: String = "" + + parse(args.toList) + + def parse(args: List[String]): Unit = args match { + case ("--cores" | "-c") :: value :: tail => + cores = value.toInt + parse(tail) + + case ("--memory" | "-m") :: value :: tail => + memory = value.toInt + parse(tail) + + case ("--supervise" | "-s") :: tail => + supervise = true + parse(tail) + + case ("--help" | "-h") :: tail => + printUsageAndExit(0) + + case ("--verbose" | "-v") :: tail => + logLevel = Level.INFO + parse(tail) + + case "launch" :: _master :: _jarUrl :: _mainClass :: tail => + cmd = "launch" + + try { + new URL(_jarUrl) + } catch { + case e: Exception => + println(s"Jar url '${_jarUrl}' is not a valid URL.") + println(s"Jar must be in URL format (e.g. hdfs://XX, file://XX)") + printUsageAndExit(-1) + } + + jarUrl = _jarUrl + master = _master + mainClass = _mainClass + _driverOptions ++= tail + + case "kill" :: _master :: _driverId :: tail => + cmd = "kill" + master = _master + driverId = _driverId + + case _ => + printUsageAndExit(1) + } + + /** + * Print usage and exit JVM with the given exit code. + */ + def printUsageAndExit(exitCode: Int) { + // TODO: It wouldn't be too hard to allow users to submit their app and dependency jars + // separately similar to in the YARN client. + val usage = + s""" + |Usage: DriverClient [options] launch <active-master> <jar-url> <main-class> [driver options] + |Usage: DriverClient kill <active-master> <driver-id> + | + |Options: + | -c CORES, --cores CORES Number of cores to request (default: $defaultCores) + | -m MEMORY, --memory MEMORY Megabytes of memory to request (default: $defaultMemory) + | -s, --supervise Whether to restart the driver on failure + | -v, --verbose Print more debugging output + """.stripMargin + System.err.println(usage) + System.exit(exitCode) + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 275331724a..5e824e1a67 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -20,12 +20,12 @@ package org.apache.spark.deploy import scala.collection.immutable.List import org.apache.spark.deploy.ExecutorState.ExecutorState -import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo} +import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo} +import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.deploy.master.RecoveryState.MasterState -import org.apache.spark.deploy.worker.ExecutorRunner +import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner} import org.apache.spark.util.Utils - private[deploy] sealed trait DeployMessage extends Serializable /** Contains messages sent between Scheduler actor nodes. 
*/ @@ -54,7 +54,14 @@ private[deploy] object DeployMessages { exitStatus: Option[Int]) extends DeployMessage - case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription]) + case class DriverStateChanged( + driverId: String, + state: DriverState, + exception: Option[Exception]) + extends DeployMessage + + case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription], + driverIds: Seq[String]) case class Heartbeat(workerId: String) extends DeployMessage @@ -76,14 +83,18 @@ private[deploy] object DeployMessages { sparkHome: String) extends DeployMessage - // Client to Master + case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage + + case class KillDriver(driverId: String) extends DeployMessage + + // AppClient to Master case class RegisterApplication(appDescription: ApplicationDescription) extends DeployMessage case class MasterChangeAcknowledged(appId: String) - // Master to Client + // Master to AppClient case class RegisteredApplication(appId: String, masterUrl: String) extends DeployMessage @@ -97,11 +108,28 @@ private[deploy] object DeployMessages { case class ApplicationRemoved(message: String) - // Internal message in Client + // DriverClient <-> Master + + case class RequestSubmitDriver(driverDescription: DriverDescription) extends DeployMessage + + case class SubmitDriverResponse(success: Boolean, driverId: Option[String], message: String) + extends DeployMessage + + case class RequestKillDriver(driverId: String) extends DeployMessage + + case class KillDriverResponse(driverId: String, success: Boolean, message: String) + extends DeployMessage + + case class RequestDriverStatus(driverId: String) extends DeployMessage + + case class DriverStatusResponse(found: Boolean, state: Option[DriverState], + workerId: Option[String], workerHostPort: Option[String], exception: Option[Exception]) + + // Internal message in AppClient - case object StopClient + case object StopAppClient - // Master to Worker & Client + // Master to Worker & AppClient case class MasterChanged(masterUrl: String, masterWebUiUrl: String) @@ -113,6 +141,7 @@ private[deploy] object DeployMessages { case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo], activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo], + activeDrivers: Array[DriverInfo], completedDrivers: Array[DriverInfo], status: MasterState) { Utils.checkHost(host, "Required hostname") @@ -128,14 +157,15 @@ private[deploy] object DeployMessages { // Worker to WorkerWebUI case class WorkerStateResponse(host: String, port: Int, workerId: String, - executors: List[ExecutorRunner], finishedExecutors: List[ExecutorRunner], masterUrl: String, + executors: List[ExecutorRunner], finishedExecutors: List[ExecutorRunner], + drivers: List[DriverRunner], finishedDrivers: List[DriverRunner], masterUrl: String, cores: Int, memory: Int, coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String) { Utils.checkHost(host, "Required hostname") assert (port > 0) } - // Actor System to Worker + // Liveness checks in various places case object SendHeartbeat } diff --git a/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala new file mode 100644 index 0000000000..58c95dc4f9 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/DriverDescription.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy + +private[spark] class DriverDescription( + val jarUrl: String, + val mem: Int, + val cores: Int, + val supervise: Boolean, + val command: Command) + extends Serializable { + + override def toString: String = s"DriverDescription (${command.mainClass})" +} diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala index 9bbd635ab9..1415e2f3d1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala @@ -24,7 +24,8 @@ import scala.concurrent.duration._ import akka.actor._ import akka.pattern.ask -import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} +import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent} + import org.apache.spark.{Logging, SparkConf, SparkException} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ @@ -32,16 +33,17 @@ import org.apache.spark.deploy.master.Master import org.apache.spark.util.AkkaUtils /** - * The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description, - * and a listener for cluster events, and calls back the listener when various events occur. + * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL, + * an app description, and a listener for cluster events, and calls back the listener when various + * events occur. * * @param masterUrls Each url should look like spark://host:port. */ -private[spark] class Client( +private[spark] class AppClient( actorSystem: ActorSystem, masterUrls: Array[String], appDescription: ApplicationDescription, - listener: ClientListener, + listener: AppClientListener, conf: SparkConf) extends Logging { @@ -110,6 +112,12 @@ private[spark] class Client( } } + private def isPossibleMaster(remoteUrl: Address) = { + masterUrls.map(s => Master.toAkkaUrl(s)) + .map(u => AddressFromURIString(u).hostPort) + .contains(remoteUrl.hostPort) + } + override def receive = { case RegisteredApplication(appId_, masterUrl) => appId = appId_ @@ -145,7 +153,10 @@ private[spark] class Client( logWarning(s"Connection to $address failed; waiting for master to reconnect...") markDisconnected() - case StopClient => + case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) => + logWarning(s"Could not connect to $address: $cause") + + case StopAppClient => markDead() sender ! 
true context.stop(self) @@ -178,7 +189,7 @@ private[spark] class Client( if (actor != null) { try { val timeout = AkkaUtils.askTimeout(conf) - val future = actor.ask(StopClient)(timeout) + val future = actor.ask(StopAppClient)(timeout) Await.result(future, timeout) } catch { case e: TimeoutException => diff --git a/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala index be7a11bd15..55d4ef1b31 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClientListener.scala @@ -24,7 +24,7 @@ package org.apache.spark.deploy.client * * Users of this API should *not* block inside the callback methods. */ -private[spark] trait ClientListener { +private[spark] trait AppClientListener { def connected(appId: String): Unit /** Disconnection may be a temporary state, as we fail over to a new Master. */ diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala index ef649fd80c..ffa909c26b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala @@ -23,7 +23,7 @@ import org.apache.spark.deploy.{Command, ApplicationDescription} private[spark] object TestClient { - class TestListener extends ClientListener with Logging { + class TestListener extends AppClientListener with Logging { def connected(id: String) { logInfo("Connected to master, got app ID " + id) } @@ -48,10 +48,10 @@ private[spark] object TestClient { val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0, conf = new SparkConf) val desc = new ApplicationDescription( - "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), + "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), "dummy-spark-home", "ignored") val listener = new TestListener - val client = new Client(actorSystem, Array(url), desc, listener, new SparkConf) + val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf) client.start() actorSystem.awaitTermination() } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index 5150b7c7de..3e26379166 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -28,7 +28,8 @@ private[spark] class ApplicationInfo( val desc: ApplicationDescription, val submitDate: Date, val driver: ActorRef, - val appUiUrl: String) + val appUiUrl: String, + defaultCores: Int) extends Serializable { @transient var state: ApplicationState.Value = _ @@ -81,7 +82,9 @@ private[spark] class ApplicationInfo( } } - def coresLeft: Int = desc.maxCores - coresGranted + private val myMaxCores = desc.maxCores.getOrElse(defaultCores) + + def coresLeft: Int = myMaxCores - coresGranted private var _retryCount = 0 diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala new file mode 100644 index 0000000000..33377931d6 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverInfo.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.master + +import java.util.Date + +import org.apache.spark.deploy.DriverDescription + +private[spark] class DriverInfo( + val startTime: Long, + val id: String, + val desc: DriverDescription, + val submitDate: Date) + extends Serializable { + + @transient var state: DriverState.Value = DriverState.SUBMITTED + /* If we fail when launching the driver, the exception is stored here. */ + @transient var exception: Option[Exception] = None + /* Most recent worker assigned to this driver */ + @transient var worker: Option[WorkerInfo] = None +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala b/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala new file mode 100644 index 0000000000..26a68bade3 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/DriverState.scala @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.master + +private[spark] object DriverState extends Enumeration { + + type DriverState = Value + + // SUBMITTED: Submitted but not yet scheduled on a worker + // RUNNING: Has been allocated to a worker to run + // FINISHED: Previously ran and exited cleanly + // RELAUNCHING: Exited non-zero or due to worker failure, but has not yet started running again + // UNKNOWN: The state of the driver is temporarily not known due to master failure recovery + // KILLED: A user manually killed this driver + // FAILED: The driver exited non-zero and was not supervised + // ERROR: Unable to run or restart due to an unrecoverable error (e.g. 
missing jar file) + val SUBMITTED, RUNNING, FINISHED, RELAUNCHING, UNKNOWN, KILLED, FAILED, ERROR = Value +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala index 043945a211..74bb9ebf1d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala @@ -19,8 +19,6 @@ package org.apache.spark.deploy.master import java.io._ -import scala.Serializable - import akka.serialization.Serialization import org.apache.spark.Logging @@ -47,6 +45,15 @@ private[spark] class FileSystemPersistenceEngine( new File(dir + File.separator + "app_" + app.id).delete() } + override def addDriver(driver: DriverInfo) { + val driverFile = new File(dir + File.separator + "driver_" + driver.id) + serializeIntoFile(driverFile, driver) + } + + override def removeDriver(driver: DriverInfo) { + new File(dir + File.separator + "driver_" + driver.id).delete() + } + override def addWorker(worker: WorkerInfo) { val workerFile = new File(dir + File.separator + "worker_" + worker.id) serializeIntoFile(workerFile, worker) @@ -56,13 +63,15 @@ private[spark] class FileSystemPersistenceEngine( new File(dir + File.separator + "worker_" + worker.id).delete() } - override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = { + override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = { val sortedFiles = new File(dir).listFiles().sortBy(_.getName) val appFiles = sortedFiles.filter(_.getName.startsWith("app_")) val apps = appFiles.map(deserializeFromFile[ApplicationInfo]) + val driverFiles = sortedFiles.filter(_.getName.startsWith("driver_")) + val drivers = driverFiles.map(deserializeFromFile[DriverInfo]) val workerFiles = sortedFiles.filter(_.getName.startsWith("worker_")) val workers = workerFiles.map(deserializeFromFile[WorkerInfo]) - (apps, workers) + (apps, drivers, workers) } private def serializeIntoFile(file: File, value: AnyRef) { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 7b696cfcca..d9ea96afcf 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -23,19 +23,22 @@ import java.util.Date import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.concurrent.Await import scala.concurrent.duration._ +import scala.util.Random import akka.actor._ import akka.pattern.ask import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} import akka.serialization.SerializationExtension -import org.apache.spark.{SparkConf, SparkContext, Logging, SparkException} -import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} + +import org.apache.spark.{SparkConf, Logging, SparkException} +import org.apache.spark.deploy.{ApplicationDescription, DriverDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.MasterMessages._ import org.apache.spark.deploy.master.ui.MasterWebUI import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{AkkaUtils, Utils} +import org.apache.spark.deploy.master.DriverState.DriverState private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging { import context.dispatcher // 
to use Akka's scheduler.schedule() @@ -43,13 +46,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act val conf = new SparkConf val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs - val WORKER_TIMEOUT = conf.get("spark.worker.timeout", "60").toLong * 1000 - val RETAINED_APPLICATIONS = conf.get("spark.deploy.retainedApplications", "200").toInt - val REAPER_ITERATIONS = conf.get("spark.dead.worker.persistence", "15").toInt + val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000 + val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 200) + val REAPER_ITERATIONS = conf.getInt("spark.dead.worker.persistence", 15) val RECOVERY_DIR = conf.get("spark.deploy.recoveryDirectory", "") val RECOVERY_MODE = conf.get("spark.deploy.recoveryMode", "NONE") - var nextAppNumber = 0 val workers = new HashSet[WorkerInfo] val idToWorker = new HashMap[String, WorkerInfo] val actorToWorker = new HashMap[ActorRef, WorkerInfo] @@ -59,9 +61,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act val idToApp = new HashMap[String, ApplicationInfo] val actorToApp = new HashMap[ActorRef, ApplicationInfo] val addressToApp = new HashMap[Address, ApplicationInfo] - val waitingApps = new ArrayBuffer[ApplicationInfo] val completedApps = new ArrayBuffer[ApplicationInfo] + var nextAppNumber = 0 + + val drivers = new HashSet[DriverInfo] + val completedDrivers = new ArrayBuffer[DriverInfo] + val waitingDrivers = new ArrayBuffer[DriverInfo] // Drivers currently spooled for scheduling + var nextDriverNumber = 0 Utils.checkHost(host, "Expected hostname") @@ -88,7 +95,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act // As a temporary workaround before better ways of configuring memory, we allow users to set // a flag that will perform round-robin scheduling across the nodes (spreading out each app // among all the nodes) instead of trying to consolidate each app onto a small # of nodes. - val spreadOutApps = conf.get("spark.deploy.spreadOut", "true").toBoolean + val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true) + + // Default maxCores for applications that don't specify it (i.e. pass Int.MaxValue) + val defaultCores = conf.getInt("spark.deploy.defaultCores", Int.MaxValue) + if (defaultCores < 1) { + throw new SparkException("spark.deploy.defaultCores must be positive") + } override def preStart() { logInfo("Starting Spark master at " + masterUrl) @@ -136,14 +149,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act override def receive = { case ElectedLeader => { - val (storedApps, storedWorkers) = persistenceEngine.readPersistedData() - state = if (storedApps.isEmpty && storedWorkers.isEmpty) + val (storedApps, storedDrivers, storedWorkers) = persistenceEngine.readPersistedData() + state = if (storedApps.isEmpty && storedDrivers.isEmpty && storedWorkers.isEmpty) RecoveryState.ALIVE else RecoveryState.RECOVERING logInfo("I have been elected leader! 
New state: " + state) if (state == RecoveryState.RECOVERING) { - beginRecovery(storedApps, storedWorkers) + beginRecovery(storedApps, storedDrivers, storedWorkers) context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() } } } @@ -170,6 +183,69 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } } + case RequestSubmitDriver(description) => { + if (state != RecoveryState.ALIVE) { + val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state." + sender ! SubmitDriverResponse(false, None, msg) + } else { + logInfo("Driver submitted " + description.command.mainClass) + val driver = createDriver(description) + persistenceEngine.addDriver(driver) + waitingDrivers += driver + drivers.add(driver) + schedule() + + // TODO: It might be good to instead have the submission client poll the master to determine + // the current status of the driver. For now it's simply "fire and forget". + + sender ! SubmitDriverResponse(true, Some(driver.id), + s"Driver successfully submitted as ${driver.id}") + } + } + + case RequestKillDriver(driverId) => { + if (state != RecoveryState.ALIVE) { + val msg = s"Can only kill drivers in ALIVE state. Current state: $state." + sender ! KillDriverResponse(driverId, success = false, msg) + } else { + logInfo("Asked to kill driver " + driverId) + val driver = drivers.find(_.id == driverId) + driver match { + case Some(d) => + if (waitingDrivers.contains(d)) { + waitingDrivers -= d + self ! DriverStateChanged(driverId, DriverState.KILLED, None) + } + else { + // We just notify the worker to kill the driver here. The final bookkeeping occurs + // on the return path when the worker submits a state change back to the master + // to notify it that the driver was successfully killed. + d.worker.foreach { w => + w.actor ! KillDriver(driverId) + } + } + // TODO: It would be nice for this to be a synchronous response + val msg = s"Kill request for $driverId submitted" + logInfo(msg) + sender ! KillDriverResponse(driverId, success = true, msg) + case None => + val msg = s"Driver $driverId has already finished or does not exist" + logWarning(msg) + sender ! KillDriverResponse(driverId, success = false, msg) + } + } + } + + case RequestDriverStatus(driverId) => { + (drivers ++ completedDrivers).find(_.id == driverId) match { + case Some(driver) => + sender ! DriverStatusResponse(found = true, Some(driver.state), + driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception) + case None => + sender ! 
DriverStatusResponse(found = false, None, None, None, None) + } + } + case RegisterApplication(description) => { if (state == RecoveryState.STANDBY) { // ignore, don't send response @@ -212,6 +288,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } } + case DriverStateChanged(driverId, state, exception) => { + state match { + case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED => + removeDriver(driverId, state, exception) + case _ => + throw new Exception(s"Received unexpected state update for driver $driverId: $state") + } + } + case Heartbeat(workerId) => { idToWorker.get(workerId) match { case Some(workerInfo) => @@ -233,7 +318,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act if (canCompleteRecovery) { completeRecovery() } } - case WorkerSchedulerStateResponse(workerId, executors) => { + case WorkerSchedulerStateResponse(workerId, executors, driverIds) => { idToWorker.get(workerId) match { case Some(worker) => logInfo("Worker has been re-registered: " + workerId) @@ -246,6 +331,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act worker.addExecutor(execInfo) execInfo.copyState(exec) } + + for (driverId <- driverIds) { + drivers.find(_.id == driverId).foreach { driver => + driver.worker = Some(worker) + driver.state = DriverState.RUNNING + worker.drivers(driverId) = driver + } + } case None => logWarning("Scheduler state from unknown worker: " + workerId) } @@ -263,7 +356,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act case RequestMasterState => { sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray, - state) + drivers.toArray, completedDrivers.toArray, state) } case CheckForWorkerTimeOut => { @@ -279,7 +372,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act workers.count(_.state == WorkerState.UNKNOWN) == 0 && apps.count(_.state == ApplicationState.UNKNOWN) == 0 - def beginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo]) { + def beginRecovery(storedApps: Seq[ApplicationInfo], storedDrivers: Seq[DriverInfo], + storedWorkers: Seq[WorkerInfo]) { for (app <- storedApps) { logInfo("Trying to recover app: " + app.id) try { @@ -291,6 +385,12 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } } + for (driver <- storedDrivers) { + // Here we just read in the list of drivers. Any drivers associated with now-lost workers + // will be re-launched when we detect that the worker is missing. 
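A rough sketch of how a submission client might exercise the RequestSubmitDriver / RequestDriverStatus handlers added above, assuming an ActorRef for the master, an ask timeout, and org.apache.spark.deploy.DeployMessages._ in scope; this client-side code is not part of the patch, and the response field order simply follows the constructions above:

    import scala.concurrent.Await
    import scala.concurrent.duration._
    import akka.actor.ActorRef
    import akka.pattern.ask
    import akka.util.Timeout
    import org.apache.spark.deploy.DriverDescription

    def submitAndCheck(master: ActorRef, desc: DriverDescription): Unit = {
      implicit val timeout = Timeout(30.seconds)
      Await.result(master ? RequestSubmitDriver(desc), timeout.duration) match {
        case SubmitDriverResponse(true, Some(driverId), message) =>
          // Per the TODO above, the client is expected to poll the master afterwards
          val status = Await.result(master ? RequestDriverStatus(driverId), timeout.duration)
          println(s"Submitted: $message; current status: $status")
        case SubmitDriverResponse(false, _, message) =>
          println(s"Submission rejected: $message")
      }
    }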
+ drivers += driver + } + for (worker <- storedWorkers) { logInfo("Trying to recover worker: " + worker.id) try { @@ -314,6 +414,18 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker) apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication) + // Reschedule drivers which were not claimed by any workers + drivers.filter(_.worker.isEmpty).foreach { d => + logWarning(s"Driver ${d.id} was not found after master recovery") + if (d.desc.supervise) { + logWarning(s"Re-launching ${d.id}") + relaunchDriver(d) + } else { + removeDriver(d.id, DriverState.ERROR, None) + logWarning(s"Did not re-launch ${d.id} because it was not supervised") + } + } + state = RecoveryState.ALIVE schedule() logInfo("Recovery complete - resuming operations!") @@ -334,6 +446,18 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act */ def schedule() { if (state != RecoveryState.ALIVE) { return } + + // First schedule drivers, they take strict precedence over applications + val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers + for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { + for (driver <- waitingDrivers) { + if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { + launchDriver(worker, driver) + waitingDrivers -= driver + } + } + } + // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app // in the queue, then the second app, etc. if (spreadOutApps) { @@ -420,13 +544,30 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act exec.id, ExecutorState.LOST, Some("worker lost"), None) exec.application.removeExecutor(exec) } + for (driver <- worker.drivers.values) { + if (driver.desc.supervise) { + logInfo(s"Re-launching ${driver.id}") + relaunchDriver(driver) + } else { + logInfo(s"Not re-launching ${driver.id} because it was not supervised") + removeDriver(driver.id, DriverState.ERROR, None) + } + } persistenceEngine.removeWorker(worker) } + def relaunchDriver(driver: DriverInfo) { + driver.worker = None + driver.state = DriverState.RELAUNCHING + waitingDrivers += driver + schedule() + } + def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = { val now = System.currentTimeMillis() val date = new Date(now) - new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl) + new ApplicationInfo( + now, newApplicationId(date), desc, date, driver, desc.appUiUrl, defaultCores) } def registerApplication(app: ApplicationInfo): Unit = { @@ -501,6 +642,41 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } } } + + def newDriverId(submitDate: Date): String = { + val appId = "driver-%s-%04d".format(DATE_FORMAT.format(submitDate), nextDriverNumber) + nextDriverNumber += 1 + appId + } + + def createDriver(desc: DriverDescription): DriverInfo = { + val now = System.currentTimeMillis() + val date = new Date(now) + new DriverInfo(now, newDriverId(date), desc, date) + } + + def launchDriver(worker: WorkerInfo, driver: DriverInfo) { + logInfo("Launching driver " + driver.id + " on worker " + worker.id) + worker.addDriver(driver) + driver.worker = Some(worker) + worker.actor ! 
LaunchDriver(driver.id, driver.desc) + driver.state = DriverState.RUNNING + } + + def removeDriver(driverId: String, finalState: DriverState, exception: Option[Exception]) { + drivers.find(d => d.id == driverId) match { + case Some(driver) => + logInfo(s"Removing driver: $driverId") + drivers -= driver + completedDrivers += driver + persistenceEngine.removeDriver(driver) + driver.state = finalState + driver.exception = exception + driver.worker.foreach(w => w.removeDriver(driver)) + case None => + logWarning(s"Asked to remove unknown driver: $driverId") + } + } } private[spark] object Master { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala index 94b986caf2..e3640ea4f7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala @@ -35,11 +35,15 @@ private[spark] trait PersistenceEngine { def removeWorker(worker: WorkerInfo) + def addDriver(driver: DriverInfo) + + def removeDriver(driver: DriverInfo) + /** * Returns the persisted data sorted by their respective ids (which implies that they're * sorted by time of creation). */ - def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) + def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) def close() {} } @@ -49,5 +53,8 @@ private[spark] class BlackHolePersistenceEngine extends PersistenceEngine { override def removeApplication(app: ApplicationInfo) {} override def addWorker(worker: WorkerInfo) {} override def removeWorker(worker: WorkerInfo) {} - override def readPersistedData() = (Nil, Nil) + override def addDriver(driver: DriverInfo) {} + override def removeDriver(driver: DriverInfo) {} + + override def readPersistedData() = (Nil, Nil, Nil) } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index e05f587b58..c5fa9cf7d7 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -17,8 +17,10 @@ package org.apache.spark.deploy.master -import akka.actor.ActorRef import scala.collection.mutable + +import akka.actor.ActorRef + import org.apache.spark.util.Utils private[spark] class WorkerInfo( @@ -35,7 +37,8 @@ private[spark] class WorkerInfo( Utils.checkHost(host, "Expected hostname") assert (port > 0) - @transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // fullId => info + @transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // executorId => info + @transient var drivers: mutable.HashMap[String, DriverInfo] = _ // driverId => info @transient var state: WorkerState.Value = _ @transient var coresUsed: Int = _ @transient var memoryUsed: Int = _ @@ -54,6 +57,7 @@ private[spark] class WorkerInfo( private def init() { executors = new mutable.HashMap + drivers = new mutable.HashMap state = WorkerState.ALIVE coresUsed = 0 memoryUsed = 0 @@ -83,6 +87,18 @@ private[spark] class WorkerInfo( executors.values.exists(_.application == app) } + def addDriver(driver: DriverInfo) { + drivers(driver.id) = driver + memoryUsed += driver.desc.mem + coresUsed += driver.desc.cores + } + + def removeDriver(driver: DriverInfo) { + drivers -= driver.id + memoryUsed -= driver.desc.mem + coresUsed -= driver.desc.cores + } + def webUiAddress : String = { "http://" + this.publicAddress 
+ ":" + this.webUiPort } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala index 52000d4f9c..f24f49ea8a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala @@ -49,6 +49,14 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf) zk.delete(WORKING_DIR + "/app_" + app.id) } + override def addDriver(driver: DriverInfo) { + serializeIntoFile(WORKING_DIR + "/driver_" + driver.id, driver) + } + + override def removeDriver(driver: DriverInfo) { + zk.delete(WORKING_DIR + "/driver_" + driver.id) + } + override def addWorker(worker: WorkerInfo) { serializeIntoFile(WORKING_DIR + "/worker_" + worker.id, worker) } @@ -61,13 +69,15 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf) zk.close() } - override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = { + override def readPersistedData(): (Seq[ApplicationInfo], Seq[DriverInfo], Seq[WorkerInfo]) = { val sortedFiles = zk.getChildren(WORKING_DIR).toList.sorted val appFiles = sortedFiles.filter(_.startsWith("app_")) val apps = appFiles.map(deserializeFromFile[ApplicationInfo]) + val driverFiles = sortedFiles.filter(_.startsWith("driver_")) + val drivers = driverFiles.map(deserializeFromFile[DriverInfo]) val workerFiles = sortedFiles.filter(_.startsWith("worker_")) val workers = workerFiles.map(deserializeFromFile[WorkerInfo]) - (apps, workers) + (apps, drivers, workers) } private def serializeIntoFile(path: String, value: AnyRef) { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala index 4ef762892c..a9af8df552 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.master.ui import scala.concurrent.Await +import scala.concurrent.duration._ import scala.xml.Node import akka.pattern.ask @@ -26,7 +27,7 @@ import net.liftweb.json.JsonAST.JValue import org.apache.spark.deploy.{DeployWebUI, JsonProtocol} import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState} -import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo} +import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo} import org.apache.spark.ui.UIUtils import org.apache.spark.util.Utils @@ -56,6 +57,16 @@ private[spark] class IndexPage(parent: MasterWebUI) { val completedApps = state.completedApps.sortBy(_.endTime).reverse val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps) + val driverHeaders = Seq("ID", "Submitted Time", "Worker", "State", "Cores", "Memory", "Main Class") + val activeDrivers = state.activeDrivers.sortBy(_.startTime).reverse + val activeDriversTable = UIUtils.listingTable(driverHeaders, driverRow, activeDrivers) + val completedDrivers = state.completedDrivers.sortBy(_.startTime).reverse + val completedDriversTable = UIUtils.listingTable(driverHeaders, driverRow, completedDrivers) + + // For now we only show driver information if the user has submitted drivers to the cluster. + // This is until we integrate the notion of drivers and applications in the UI. 
+ def hasDrivers = activeDrivers.length > 0 || completedDrivers.length > 0 + val content = <div class="row-fluid"> <div class="span12"> @@ -70,6 +81,9 @@ private[spark] class IndexPage(parent: MasterWebUI) { <li><strong>Applications:</strong> {state.activeApps.size} Running, {state.completedApps.size} Completed </li> + <li><strong>Drivers:</strong> + {state.activeDrivers.size} Running, + {state.completedDrivers.size} Completed </li> </ul> </div> </div> @@ -84,17 +98,39 @@ private[spark] class IndexPage(parent: MasterWebUI) { <div class="row-fluid"> <div class="span12"> <h4> Running Applications </h4> - {activeAppsTable} </div> </div> + <div> + {if (hasDrivers) + <div class="row-fluid"> + <div class="span12"> + <h4> Running Drivers </h4> + {activeDriversTable} + </div> + </div> + } + </div> + <div class="row-fluid"> <div class="span12"> <h4> Completed Applications </h4> {completedAppsTable} </div> + </div> + + <div> + {if (hasDrivers) + <div class="row-fluid"> + <div class="span12"> + <h4> Completed Drivers </h4> + {completedDriversTable} + </div> + </div> + } </div>; + UIUtils.basicSparkPage(content, "Spark Master at " + state.uri) } @@ -134,4 +170,20 @@ private[spark] class IndexPage(parent: MasterWebUI) { <td>{DeployWebUI.formatDuration(app.duration)}</td> </tr> } + + def driverRow(driver: DriverInfo): Seq[Node] = { + <tr> + <td>{driver.id} </td> + <td>{driver.submitDate}</td> + <td>{driver.worker.map(w => <a href={w.webUiAddress}>{w.id.toString}</a>).getOrElse("None")}</td> + <td>{driver.state}</td> + <td sorttable_customkey={driver.desc.cores.toString}> + {driver.desc.cores} + </td> + <td sorttable_customkey={driver.desc.mem.toString}> + {Utils.megabytesToString(driver.desc.mem.toLong)} + </td> + <td>{driver.desc.command.arguments(1)}</td> + </tr> + } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala new file mode 100644 index 0000000000..7507bf8ad0 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala @@ -0,0 +1,63 @@ +package org.apache.spark.deploy.worker + +import java.io.{File, FileOutputStream, IOException, InputStream} +import java.lang.System._ + +import org.apache.spark.Logging +import org.apache.spark.deploy.Command +import org.apache.spark.util.Utils + +/** + ** Utilities for running commands with the spark classpath. + */ +object CommandUtils extends Logging { + private[spark] def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = { + val runner = getEnv("JAVA_HOME", command).map(_ + "/bin/java").getOrElse("java") + + // SPARK-698: do not call the run.cmd script, as process.destroy() + // fails to kill a process tree on Windows + Seq(runner) ++ buildJavaOpts(command, memory, sparkHome) ++ Seq(command.mainClass) ++ + command.arguments + } + + private def getEnv(key: String, command: Command): Option[String] = + command.environment.get(key).orElse(Option(System.getenv(key))) + + /** + * Attention: this must always be aligned with the environment variables in the run scripts and + * the way the JAVA_OPTS are assembled there. 
+ */ + def buildJavaOpts(command: Command, memory: Int, sparkHome: String): Seq[String] = { + val libraryOpts = getEnv("SPARK_LIBRARY_PATH", command) + .map(p => List("-Djava.library.path=" + p)) + .getOrElse(Nil) + val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString).getOrElse(Nil) + val userOpts = getEnv("SPARK_JAVA_OPTS", command).map(Utils.splitCommandString).getOrElse(Nil) + val memoryOpts = Seq(s"-Xms${memory}M", s"-Xmx${memory}M") + + // Figure out our classpath with the external compute-classpath script + val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh" + val classPath = Utils.executeAndGetOutput( + Seq(sparkHome + "/bin/compute-classpath" + ext), + extraEnvironment=command.environment) + + Seq("-cp", classPath) ++ libraryOpts ++ workerLocalOpts ++ userOpts ++ memoryOpts + } + + /** Spawn a thread that will redirect a given stream to a file */ + def redirectStream(in: InputStream, file: File) { + val out = new FileOutputStream(file, true) + // TODO: It would be nice to add a shutdown hook here that explains why the output is + // terminating. Otherwise if the worker dies the executor logs will silently stop. + new Thread("redirect output to " + file) { + override def run() { + try { + Utils.copyStream(in, out, true) + } catch { + case e: IOException => + logInfo("Redirection to " + file + " closed: " + e.getMessage) + } + } + }.start() + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala new file mode 100644 index 0000000000..b4df1a0dd4 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.worker + +import java.io._ + +import scala.collection.JavaConversions._ +import scala.collection.mutable.Map + +import akka.actor.ActorRef +import com.google.common.base.Charsets +import com.google.common.io.Files +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileUtil, Path} + +import org.apache.spark.Logging +import org.apache.spark.deploy.{Command, DriverDescription} +import org.apache.spark.deploy.DeployMessages.DriverStateChanged +import org.apache.spark.deploy.master.DriverState +import org.apache.spark.deploy.master.DriverState.DriverState + +/** + * Manages the execution of one driver, including automatically restarting the driver on failure. 
+ */ +private[spark] class DriverRunner( + val driverId: String, + val workDir: File, + val sparkHome: File, + val driverDesc: DriverDescription, + val worker: ActorRef, + val workerUrl: String) + extends Logging { + + @volatile var process: Option[Process] = None + @volatile var killed = false + + // Populated once finished + var finalState: Option[DriverState] = None + var finalException: Option[Exception] = None + var finalExitCode: Option[Int] = None + + // Decoupled for testing + private[deploy] def setClock(_clock: Clock) = clock = _clock + private[deploy] def setSleeper(_sleeper: Sleeper) = sleeper = _sleeper + private var clock = new Clock { + def currentTimeMillis(): Long = System.currentTimeMillis() + } + private var sleeper = new Sleeper { + def sleep(seconds: Int): Unit = (0 until seconds).takeWhile(f => {Thread.sleep(1000); !killed}) + } + + /** Starts a thread to run and manage the driver. */ + def start() = { + new Thread("DriverRunner for " + driverId) { + override def run() { + try { + val driverDir = createWorkingDirectory() + val localJarFilename = downloadUserJar(driverDir) + + // Make sure user application jar is on the classpath + // TODO: If we add ability to submit multiple jars they should also be added here + val env = Map(driverDesc.command.environment.toSeq: _*) + env("SPARK_CLASSPATH") = env.getOrElse("SPARK_CLASSPATH", "") + s":$localJarFilename" + val newCommand = Command(driverDesc.command.mainClass, + driverDesc.command.arguments.map(substituteVariables), env) + val command = CommandUtils.buildCommandSeq(newCommand, driverDesc.mem, + sparkHome.getAbsolutePath) + launchDriver(command, env, driverDir, driverDesc.supervise) + } + catch { + case e: Exception => finalException = Some(e) + } + + val state = + if (killed) { DriverState.KILLED } + else if (finalException.isDefined) { DriverState.ERROR } + else { + finalExitCode match { + case Some(0) => DriverState.FINISHED + case _ => DriverState.FAILED + } + } + + finalState = Some(state) + + worker ! DriverStateChanged(driverId, state, finalException) + } + }.start() + } + + /** Terminate this driver (or prevent it from ever starting if not yet started) */ + def kill() { + synchronized { + process.foreach(p => p.destroy()) + killed = true + } + } + + /** Replace variables in a command argument passed to us */ + private def substituteVariables(argument: String): String = argument match { + case "{{WORKER_URL}}" => workerUrl + case other => other + } + + /** + * Creates the working directory for this driver. + * Will throw an exception if there are errors preparing the directory. + */ + private def createWorkingDirectory(): File = { + val driverDir = new File(workDir, driverId) + if (!driverDir.exists() && !driverDir.mkdirs()) { + throw new IOException("Failed to create directory " + driverDir) + } + driverDir + } + + /** + * Download the user jar into the supplied directory and return its local path. + * Will throw an exception if there are errors downloading the jar. 
+ */ + private def downloadUserJar(driverDir: File): String = { + + val jarPath = new Path(driverDesc.jarUrl) + + val emptyConf = new Configuration() + val jarFileSystem = jarPath.getFileSystem(emptyConf) + + val destPath = new File(driverDir.getAbsolutePath, jarPath.getName) + val jarFileName = jarPath.getName + val localJarFile = new File(driverDir, jarFileName) + val localJarFilename = localJarFile.getAbsolutePath + + if (!localJarFile.exists()) { // May already exist if running multiple workers on one node + logInfo(s"Copying user jar $jarPath to $destPath") + FileUtil.copy(jarFileSystem, jarPath, destPath, false, emptyConf) + } + + if (!localJarFile.exists()) { // Verify copy succeeded + throw new Exception(s"Did not see expected jar $jarFileName in $driverDir") + } + + localJarFilename + } + + private def launchDriver(command: Seq[String], envVars: Map[String, String], baseDir: File, + supervise: Boolean) { + val builder = new ProcessBuilder(command: _*).directory(baseDir) + envVars.map{ case(k,v) => builder.environment().put(k, v) } + + def initialize(process: Process) = { + // Redirect stdout and stderr to files + val stdout = new File(baseDir, "stdout") + CommandUtils.redirectStream(process.getInputStream, stdout) + + val stderr = new File(baseDir, "stderr") + val header = "Launch Command: %s\n%s\n\n".format( + command.mkString("\"", "\" \"", "\""), "=" * 40) + Files.append(header, stderr, Charsets.UTF_8) + CommandUtils.redirectStream(process.getErrorStream, stderr) + } + runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise) + } + + private[deploy] def runCommandWithRetry(command: ProcessBuilderLike, initialize: Process => Unit, + supervise: Boolean) { + // Time to wait between submission retries. + var waitSeconds = 1 + // A run of this many seconds resets the exponential back-off. 
+ val successfulRunDuration = 5 + + var keepTrying = !killed + + while (keepTrying) { + logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\"")) + + synchronized { + if (killed) { return } + process = Some(command.start()) + initialize(process.get) + } + + val processStart = clock.currentTimeMillis() + val exitCode = process.get.waitFor() + if (clock.currentTimeMillis() - processStart > successfulRunDuration * 1000) { + waitSeconds = 1 + } + + if (supervise && exitCode != 0 && !killed) { + logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.") + sleeper.sleep(waitSeconds) + waitSeconds = waitSeconds * 2 // exponential back-off + } + + keepTrying = supervise && exitCode != 0 && !killed + finalExitCode = Some(exitCode) + } + } +} + +private[deploy] trait Clock { + def currentTimeMillis(): Long +} + +private[deploy] trait Sleeper { + def sleep(seconds: Int) +} + +// Needed because ProcessBuilder is a final class and cannot be mocked +private[deploy] trait ProcessBuilderLike { + def start(): Process + def command: Seq[String] +} + +private[deploy] object ProcessBuilderLike { + def apply(processBuilder: ProcessBuilder) = new ProcessBuilderLike { + def start() = processBuilder.start() + def command = processBuilder.command() + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala new file mode 100644 index 0000000000..1640d5fee0 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala @@ -0,0 +1,31 @@ +package org.apache.spark.deploy.worker + +import akka.actor._ + +import org.apache.spark.SparkConf +import org.apache.spark.util.{AkkaUtils, Utils} + +/** + * Utility object for launching driver programs such that they share fate with the Worker process. + */ +object DriverWrapper { + def main(args: Array[String]) { + args.toList match { + case workerUrl :: mainClass :: extraArgs => + val (actorSystem, _) = AkkaUtils.createActorSystem("Driver", + Utils.localHostName(), 0, false, new SparkConf()) + actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher") + + // Delegate to supplied main class + val clazz = Class.forName(args(1)) + val mainMethod = clazz.getMethod("main", classOf[Array[String]]) + mainMethod.invoke(null, extraArgs.toArray[String]) + + actorSystem.shutdown() + + case _ => + System.err.println("Usage: DriverWrapper <workerUrl> <driverMainClass> [options]") + System.exit(-1) + } + } +}
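The restart policy in runCommandWithRetry above amounts to: sleep, then double the wait, on each short-lived failure, and reset the wait once a run survives longer than successfulRunDuration seconds. A standalone restatement of that bookkeeping (illustrative only, not code from the patch):

    // Returns (secondsToSleepNow, waitSecondsForTheNextFailure).
    // A run longer than the threshold resets the back-off to 1 second.
    def backoffAfterFailure(waitSeconds: Int, runSeconds: Long, threshold: Int = 5): (Int, Int) = {
      val sleepFor = if (runSeconds > threshold) 1 else waitSeconds
      (sleepFor, sleepFor * 2)
    }
    // Successive crashes within the threshold sleep 1s, 2s, 4s, 8s, ...;
    // killed or unsupervised drivers are not retried at all.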
\ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index fff9cb60c7..18885d7ca6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -18,17 +18,15 @@ package org.apache.spark.deploy.worker import java.io._ -import java.lang.System.getenv import akka.actor.ActorRef import com.google.common.base.Charsets import com.google.common.io.Files -import org.apache.spark.{Logging} -import org.apache.spark.deploy.{ExecutorState, ApplicationDescription} +import org.apache.spark.Logging +import org.apache.spark.deploy.{ExecutorState, ApplicationDescription, Command} import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged -import org.apache.spark.util.Utils /** * Manages the execution of one executor process. @@ -44,16 +42,17 @@ private[spark] class ExecutorRunner( val host: String, val sparkHome: File, val workDir: File, + val workerUrl: String, var state: ExecutorState.Value) extends Logging { val fullId = appId + "/" + execId var workerThread: Thread = null var process: Process = null - var shutdownHook: Thread = null - private def getAppEnv(key: String): Option[String] = - appDesc.command.environment.get(key).orElse(Option(getenv(key))) + // NOTE: This is now redundant with the automated shut-down enforced by the Executor. It might + // make sense to remove this in the future. + var shutdownHook: Thread = null def start() { workerThread = new Thread("ExecutorRunner for " + fullId) { @@ -92,55 +91,17 @@ private[spark] class ExecutorRunner( /** Replace variables such as {{EXECUTOR_ID}} and {{CORES}} in a command argument passed to us */ def substituteVariables(argument: String): String = argument match { + case "{{WORKER_URL}}" => workerUrl case "{{EXECUTOR_ID}}" => execId.toString case "{{HOSTNAME}}" => host case "{{CORES}}" => cores.toString case other => other } - def buildCommandSeq(): Seq[String] = { - val command = appDesc.command - val runner = getAppEnv("JAVA_HOME").map(_ + "/bin/java").getOrElse("java") - // SPARK-698: do not call the run.cmd script, as process.destroy() - // fails to kill a process tree on Windows - Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++ - (command.arguments ++ Seq(appId)).map(substituteVariables) - } - - /** - * Attention: this must always be aligned with the environment variables in the run scripts and - * the way the JAVA_OPTS are assembled there. 
- */ - def buildJavaOpts(): Seq[String] = { - val libraryOpts = getAppEnv("SPARK_LIBRARY_PATH") - .map(p => List("-Djava.library.path=" + p)) - .getOrElse(Nil) - val workerLocalOpts = Option(getenv("SPARK_JAVA_OPTS")).map(Utils.splitCommandString).getOrElse(Nil) - val userOpts = getAppEnv("SPARK_JAVA_OPTS").map(Utils.splitCommandString).getOrElse(Nil) - val memoryOpts = Seq("-Xms" + memory + "M", "-Xmx" + memory + "M") - - // Figure out our classpath with the external compute-classpath script - val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh" - val classPath = Utils.executeAndGetOutput( - Seq(sparkHome + "/bin/compute-classpath" + ext), - extraEnvironment=appDesc.command.environment) - - Seq("-cp", classPath) ++ libraryOpts ++ workerLocalOpts ++ userOpts ++ memoryOpts - } - - /** Spawn a thread that will redirect a given stream to a file */ - def redirectStream(in: InputStream, file: File) { - val out = new FileOutputStream(file, true) - new Thread("redirect output to " + file) { - override def run() { - try { - Utils.copyStream(in, out, true) - } catch { - case e: IOException => - logInfo("Redirection to " + file + " closed: " + e.getMessage) - } - } - }.start() + def getCommandSeq = { + val command = Command(appDesc.command.mainClass, + appDesc.command.arguments.map(substituteVariables) ++ Seq(appId), appDesc.command.environment) + CommandUtils.buildCommandSeq(command, memory, sparkHome.getAbsolutePath) } /** @@ -155,7 +116,7 @@ private[spark] class ExecutorRunner( } // Launch the process - val command = buildCommandSeq() + val command = getCommandSeq logInfo("Launch command: " + command.mkString("\"", "\" \"", "\"")) val builder = new ProcessBuilder(command: _*).directory(executorDir) val env = builder.environment() @@ -172,11 +133,11 @@ private[spark] class ExecutorRunner( // Redirect its stdout and stderr to files val stdout = new File(executorDir, "stdout") - redirectStream(process.getInputStream, stdout) + CommandUtils.redirectStream(process.getInputStream, stdout) val stderr = new File(executorDir, "stderr") Files.write(header, stderr, Charsets.UTF_8) - redirectStream(process.getErrorStream, stderr) + CommandUtils.redirectStream(process.getErrorStream, stderr) // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run // long-lived processes only. 
However, in the future, we might restart the executor a few diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index fcaf4e92b1..5182dcbb2a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -26,10 +26,12 @@ import scala.concurrent.duration._ import akka.actor._ import akka.remote.{DisassociatedEvent, RemotingLifecycleEvent} + import org.apache.spark.{Logging, SparkConf, SparkException} import org.apache.spark.deploy.{ExecutorDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ -import org.apache.spark.deploy.master.Master +import org.apache.spark.deploy.master.{DriverState, Master} +import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.deploy.worker.ui.WorkerWebUI import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{AkkaUtils, Utils} @@ -44,6 +46,8 @@ private[spark] class Worker( cores: Int, memory: Int, masterUrls: Array[String], + actorSystemName: String, + actorName: String, workDirPath: String = null, val conf: SparkConf) extends Actor with Logging { @@ -55,7 +59,7 @@ private[spark] class Worker( val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs // Send a heartbeat every (heartbeat timeout) / 4 milliseconds - val HEARTBEAT_MILLIS = conf.get("spark.worker.timeout", "60").toLong * 1000 / 4 + val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4 val REGISTRATION_TIMEOUT = 20.seconds val REGISTRATION_RETRIES = 3 @@ -68,6 +72,7 @@ private[spark] class Worker( var masterAddress: Address = null var activeMasterUrl: String = "" var activeMasterWebUiUrl : String = "" + val akkaUrl = "akka.tcp://%s@%s:%s/user/%s".format(actorSystemName, host, port, actorName) @volatile var registered = false @volatile var connected = false val workerId = generateWorkerId() @@ -75,6 +80,9 @@ private[spark] class Worker( var workDir: File = null val executors = new HashMap[String, ExecutorRunner] val finishedExecutors = new HashMap[String, ExecutorRunner] + val drivers = new HashMap[String, DriverRunner] + val finishedDrivers = new HashMap[String, DriverRunner] + val publicAddress = { val envVar = System.getenv("SPARK_PUBLIC_DNS") if (envVar != null) envVar else host @@ -185,7 +193,10 @@ private[spark] class Worker( val execs = executors.values. map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state)) - sender ! WorkerSchedulerStateResponse(workerId, execs.toList) + sender ! 
WorkerSchedulerStateResponse(workerId, execs.toList, drivers.keys.toSeq) + + case Heartbeat => + logInfo(s"Received heartbeat from driver ${sender.path}") case RegisterWorkerFailed(message) => if (!registered) { @@ -199,7 +210,7 @@ private[spark] class Worker( } else { logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, - self, workerId, host, new File(execSparkHome_), workDir, ExecutorState.RUNNING) + self, workerId, host, new File(execSparkHome_), workDir, akkaUrl, ExecutorState.RUNNING) executors(appId + "/" + execId) = manager manager.start() coresUsed += cores_ @@ -219,8 +230,8 @@ private[spark] class Worker( logInfo("Executor " + fullId + " finished with state " + state + message.map(" message " + _).getOrElse("") + exitStatus.map(" exitStatus " + _).getOrElse("")) - finishedExecutors(fullId) = executor executors -= fullId + finishedExecutors(fullId) = executor coresUsed -= executor.cores memoryUsed -= executor.memory } @@ -239,13 +250,52 @@ private[spark] class Worker( } } + case LaunchDriver(driverId, driverDesc) => { + logInfo(s"Asked to launch driver $driverId") + val driver = new DriverRunner(driverId, workDir, sparkHome, driverDesc, self, akkaUrl) + drivers(driverId) = driver + driver.start() + + coresUsed += driverDesc.cores + memoryUsed += driverDesc.mem + } + + case KillDriver(driverId) => { + logInfo(s"Asked to kill driver $driverId") + drivers.get(driverId) match { + case Some(runner) => + runner.kill() + case None => + logError(s"Asked to kill unknown driver $driverId") + } + } + + case DriverStateChanged(driverId, state, exception) => { + state match { + case DriverState.ERROR => + logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}") + case DriverState.FINISHED => + logInfo(s"Driver $driverId exited successfully") + case DriverState.KILLED => + logInfo(s"Driver $driverId was killed by user") + } + masterLock.synchronized { + master ! DriverStateChanged(driverId, state, exception) + } + val driver = drivers.remove(driverId).get + finishedDrivers(driverId) = driver + memoryUsed -= driver.driverDesc.mem + coresUsed -= driver.driverDesc.cores + } + case x: DisassociatedEvent if x.remoteAddress == masterAddress => logInfo(s"$x Disassociated !") masterDisconnected() case RequestWorkerState => { sender ! 
WorkerStateResponse(host, port, workerId, executors.values.toList, - finishedExecutors.values.toList, activeMasterUrl, cores, memory, + finishedExecutors.values.toList, drivers.values.toList, + finishedDrivers.values.toList, activeMasterUrl, cores, memory, coresUsed, memoryUsed, activeMasterWebUiUrl) } } @@ -282,10 +332,11 @@ private[spark] object Worker { // The LocalSparkCluster runs multiple local sparkWorkerX actor systems val conf = new SparkConf val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("") + val actorName = "Worker" val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port, conf = conf) actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory, - masterUrls, workDir, conf), name = "Worker") + masterUrls, systemName, actorName, workDir, conf), name = actorName) (actorSystem, boundPort) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala new file mode 100644 index 0000000000..0e0d0cd626 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerWatcher.scala @@ -0,0 +1,55 @@ +package org.apache.spark.deploy.worker + +import akka.actor.{Actor, Address, AddressFromURIString} +import akka.remote.{AssociatedEvent, AssociationErrorEvent, AssociationEvent, DisassociatedEvent, RemotingLifecycleEvent} + +import org.apache.spark.Logging +import org.apache.spark.deploy.DeployMessages.SendHeartbeat + +/** + * Actor which connects to a worker process and terminates the JVM if the connection is severed. + * Provides fate sharing between a worker and its associated child processes. + */ +private[spark] class WorkerWatcher(workerUrl: String) extends Actor + with Logging { + override def preStart() { + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) + + logInfo(s"Connecting to worker $workerUrl") + val worker = context.actorSelection(workerUrl) + worker ! SendHeartbeat // need to send a message here to initiate connection + } + + // Used to avoid shutting down JVM during tests + private[deploy] var isShutDown = false + private[deploy] def setTesting(testing: Boolean) = isTesting = testing + private var isTesting = false + + // Lets us filter events only from the worker's actor system + private val expectedHostPort = AddressFromURIString(workerUrl).hostPort + private def isWorker(address: Address) = address.hostPort == expectedHostPort + + def exitNonZero() = if (isTesting) isShutDown = true else System.exit(-1) + + override def receive = { + case AssociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) => + logInfo(s"Successfully connected to $workerUrl") + + case AssociationErrorEvent(cause, localAddress, remoteAddress, inbound) + if isWorker(remoteAddress) => + // These logs may not be seen if the worker (and associated pipe) has died + logError(s"Could not initialize connection to worker $workerUrl. Exiting.") + logError(s"Error was: $cause") + exitNonZero() + + case DisassociatedEvent(localAddress, remoteAddress, inbound) if isWorker(remoteAddress) => + // This log message will never be seen + logError(s"Lost connection to worker actor $workerUrl. Exiting.") + exitNonZero() + + case e: AssociationEvent => + // pass through association events relating to other remote actor systems + + case e => logWarning(s"Received unexpected actor system event: $e") + } +}
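WorkerWatcher gives driver and executor JVMs fate-sharing with their worker: the child's actor system registers a watcher on the worker's Akka URL and exits if the association drops. This is the wiring used by DriverWrapper above and CoarseGrainedExecutorBackend below; a condensed sketch, where the URL literal is only an example of the akka.tcp://<system>@<host>:<port>/user/<actor> format the Worker builds:

    import akka.actor.Props
    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.worker.WorkerWatcher
    import org.apache.spark.util.{AkkaUtils, Utils}

    val workerUrl = "akka.tcp://sparkWorker1@example-host:7078/user/Worker"  // illustrative
    val (actorSystem, _) =
      AkkaUtils.createActorSystem("Driver", Utils.localHostName(), 0, false, new SparkConf())
    actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")
    // From here on, losing the connection to the worker terminates this JVM (System.exit(-1)).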
\ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala index 0d59048313..925c6fb183 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala @@ -17,24 +17,20 @@ package org.apache.spark.deploy.worker.ui -import javax.servlet.http.HttpServletRequest - -import scala.xml.Node - -import scala.concurrent.duration._ import scala.concurrent.Await +import scala.xml.Node import akka.pattern.ask - +import javax.servlet.http.HttpServletRequest import net.liftweb.json.JsonAST.JValue import org.apache.spark.deploy.JsonProtocol import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse} -import org.apache.spark.deploy.worker.ExecutorRunner +import org.apache.spark.deploy.master.DriverState +import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner} import org.apache.spark.ui.UIUtils import org.apache.spark.util.Utils - private[spark] class IndexPage(parent: WorkerWebUI) { val workerActor = parent.worker.self val worker = parent.worker @@ -56,6 +52,16 @@ private[spark] class IndexPage(parent: WorkerWebUI) { val finishedExecutorTable = UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors) + val driverHeaders = Seq("DriverID", "Main Class", "State", "Cores", "Memory", "Logs", "Notes") + val runningDrivers = workerState.drivers.sortBy(_.driverId).reverse + val runningDriverTable = UIUtils.listingTable(driverHeaders, driverRow, runningDrivers) + val finishedDrivers = workerState.finishedDrivers.sortBy(_.driverId).reverse + def finishedDriverTable = UIUtils.listingTable(driverHeaders, driverRow, finishedDrivers) + + // For now we only show driver information if the user has submitted drivers to the cluster. + // This is until we integrate the notion of drivers and applications in the UI. 
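For reference, each driver gets its own work directory named after its driver ID (created by DriverRunner.createWorkingDirectory above), with stdout and stderr files written inside it; the worker web UI links added further down resolve logs from that layout. An illustrative sketch of the resulting paths (the workDir value is made up; the ID format follows Master.newDriverId):

    val workDir = new java.io.File("/opt/spark/work")       // illustrative
    val driverId = "driver-20140101120000-0000"
    val stdout = new java.io.File(new java.io.File(workDir, driverId), "stdout")
    // Served by the worker UI as: logPage?driverId=driver-20140101120000-0000&logType=stdout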
+ def hasDrivers = runningDrivers.length > 0 || finishedDrivers.length > 0 + val content = <div class="row-fluid"> <!-- Worker Details --> <div class="span12"> @@ -79,11 +85,33 @@ private[spark] class IndexPage(parent: WorkerWebUI) { </div> </div> + <div> + {if (hasDrivers) + <div class="row-fluid"> <!-- Running Drivers --> + <div class="span12"> + <h4> Running Drivers {workerState.drivers.size} </h4> + {runningDriverTable} + </div> + </div> + } + </div> + <div class="row-fluid"> <!-- Finished Executors --> <div class="span12"> <h4> Finished Executors </h4> {finishedExecutorTable} </div> + </div> + + <div> + {if (hasDrivers) + <div class="row-fluid"> <!-- Finished Drivers --> + <div class="span12"> + <h4> Finished Drivers </h4> + {finishedDriverTable} + </div> + </div> + } </div>; UIUtils.basicSparkPage(content, "Spark Worker at %s:%s".format( @@ -111,6 +139,27 @@ private[spark] class IndexPage(parent: WorkerWebUI) { .format(executor.appId, executor.execId)}>stderr</a> </td> </tr> + } + def driverRow(driver: DriverRunner): Seq[Node] = { + <tr> + <td>{driver.driverId}</td> + <td>{driver.driverDesc.command.arguments(1)}</td> + <td>{driver.finalState.getOrElse(DriverState.RUNNING)}</td> + <td sorttable_customkey={driver.driverDesc.cores.toString}> + {driver.driverDesc.cores.toString} + </td> + <td sorttable_customkey={driver.driverDesc.mem.toString}> + {Utils.megabytesToString(driver.driverDesc.mem)} + </td> + <td> + <a href={s"logPage?driverId=${driver.driverId}&logType=stdout"}>stdout</a> + <a href={s"logPage?driverId=${driver.driverId}&logType=stderr"}>stderr</a> + </td> + <td> + {driver.finalException.getOrElse("")} + </td> + </tr> + } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index c382034c99..8daa47b2b2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -69,30 +69,48 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I def log(request: HttpServletRequest): String = { val defaultBytes = 100 * 1024 - val appId = request.getParameter("appId") - val executorId = request.getParameter("executorId") + + val appId = Option(request.getParameter("appId")) + val executorId = Option(request.getParameter("executorId")) + val driverId = Option(request.getParameter("driverId")) val logType = request.getParameter("logType") val offset = Option(request.getParameter("offset")).map(_.toLong) val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes) - val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType) + + val path = (appId, executorId, driverId) match { + case (Some(a), Some(e), None) => + s"${workDir.getPath}/$appId/$executorId/$logType" + case (None, None, Some(d)) => + s"${workDir.getPath}/$driverId/$logType" + case _ => + throw new Exception("Request must specify either application or driver identifiers") + } val (startByte, endByte) = getByteRange(path, offset, byteLength) val file = new File(path) val logLength = file.length - val pre = "==== Bytes %s-%s of %s of %s/%s/%s ====\n" - .format(startByte, endByte, logLength, appId, executorId, logType) + val pre = s"==== Bytes $startByte-$endByte of $logLength of $path ====\n" pre + Utils.offsetBytes(path, startByte, endByte) } def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = { val defaultBytes = 100 * 1024 - val 
appId = request.getParameter("appId") - val executorId = request.getParameter("executorId") + val appId = Option(request.getParameter("appId")) + val executorId = Option(request.getParameter("executorId")) + val driverId = Option(request.getParameter("driverId")) val logType = request.getParameter("logType") val offset = Option(request.getParameter("offset")).map(_.toLong) val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes) - val path = "%s/%s/%s/%s".format(workDir.getPath, appId, executorId, logType) + + val (path, params) = (appId, executorId, driverId) match { + case (Some(a), Some(e), None) => + (s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e") + case (None, None, Some(d)) => + (s"${workDir.getPath}/$d/$logType", s"driverId=$d") + case _ => + throw new Exception("Request must specify either application or driver identifiers") + } val (startByte, endByte) = getByteRange(path, offset, byteLength) val file = new File(path) @@ -106,9 +124,8 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I val backButton = if (startByte > 0) { - <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s" - .format(appId, executorId, logType, math.max(startByte-byteLength, 0), - byteLength)}> + <a href={"?%s&logType=%s&offset=%s&byteLength=%s" + .format(params, logType, math.max(startByte-byteLength, 0), byteLength)}> <button type="button" class="btn btn-default"> Previous {Utils.bytesToString(math.min(byteLength, startByte))} </button> @@ -122,8 +139,8 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I val nextButton = if (endByte < logLength) { - <a href={"?appId=%s&executorId=%s&logType=%s&offset=%s&byteLength=%s". - format(appId, executorId, logType, endByte, byteLength)}> + <a href={"?%s&logType=%s&offset=%s&byteLength=%s". 
+ format(params, logType, endByte, byteLength)}> <button type="button" class="btn btn-default"> Next {Utils.bytesToString(math.min(byteLength, logLength-endByte))} </button> diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 53a2b94a52..f9e43e0e94 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -24,8 +24,9 @@ import akka.remote._ import org.apache.spark.{SparkConf, SparkContext, Logging} import org.apache.spark.TaskState.TaskState +import org.apache.spark.deploy.worker.WorkerWatcher import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ -import org.apache.spark.util.{Utils, AkkaUtils} +import org.apache.spark.util.{AkkaUtils, Utils} private[spark] class CoarseGrainedExecutorBackend( driverUrl: String, @@ -91,7 +92,8 @@ private[spark] class CoarseGrainedExecutorBackend( } private[spark] object CoarseGrainedExecutorBackend { - def run(driverUrl: String, executorId: String, hostname: String, cores: Int) { + def run(driverUrl: String, executorId: String, hostname: String, cores: Int, + workerUrl: Option[String]) { // Debug code Utils.checkHost(hostname) @@ -105,17 +107,24 @@ private[spark] object CoarseGrainedExecutorBackend { actorSystem.actorOf( Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId, sparkHostPort, cores), name = "Executor") + workerUrl.foreach{ url => + actorSystem.actorOf(Props(classOf[WorkerWatcher], url), name = "WorkerWatcher") + } actorSystem.awaitTermination() } def main(args: Array[String]) { - if (args.length < 4) { - //the reason we allow the last appid argument is to make it easy to kill rogue executors - System.err.println( - "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> <cores> " + - "[<appid>]") - System.exit(1) + args.length match { + case x if x < 4 => + System.err.println( + // Worker url is used in spark standalone mode to enforce fate-sharing with worker + "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " + + "<cores> [<workerUrl>]") + System.exit(1) + case 4 => + run(args(0), args(1), args(2), args(3).toInt, None) + case x if x > 4 => + run(args(0), args(1), args(2), args(3).toInt, Some(args(4))) } - run(args(0), args(1), args(2), args(3).toInt) } } diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 3c92c205ea..e51d274d33 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -141,11 +141,6 @@ private[spark] class Executor( val tr = runningTasks.get(taskId) if (tr != null) { tr.kill() - // We remove the task also in the finally block in TaskRunner.run. - // The reason we need to remove it here is because killTask might be called before the task - // is even launched, and never reaching that finally block. ConcurrentHashMap's remove is - // idempotent. 
- runningTasks.remove(taskId) } } @@ -167,6 +162,8 @@ private[spark] class Executor( class TaskRunner(execBackend: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) extends Runnable { + object TaskKilledException extends Exception + @volatile private var killed = false @volatile private var task: Task[Any] = _ @@ -200,9 +197,11 @@ private[spark] class Executor( // If this task has been killed before we deserialized it, let's quit now. Otherwise, // continue executing the task. if (killed) { - logInfo("Executor killed task " + taskId) - execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled)) - return + // Throw an exception rather than returning, because returning within a try{} block + // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl + // exception will be caught by the catch block, leading to an incorrect ExceptionFailure + // for the task. + throw TaskKilledException } attemptedTask = Some(task) @@ -216,9 +215,7 @@ private[spark] class Executor( // If the task has been killed, let's fail it. if (task.killed) { - logInfo("Executor killed task " + taskId) - execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled)) - return + throw TaskKilledException } val resultSer = SparkEnv.get.serializer.newInstance() @@ -260,6 +257,11 @@ private[spark] class Executor( execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason)) } + case TaskKilledException => { + logInfo("Executor killed task " + taskId) + execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled)) + } + case t: Throwable => { val serviceTime = (System.currentTimeMillis() - taskStart).toInt val metrics = attemptedTask.flatMap(t => t.metrics) diff --git a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala index a1e98845f6..5980177320 100644 --- a/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala +++ b/core/src/main/scala/org/apache/spark/io/CompressionCodec.scala @@ -71,7 +71,7 @@ class LZFCompressionCodec(conf: SparkConf) extends CompressionCodec { class SnappyCompressionCodec(conf: SparkConf) extends CompressionCodec { override def compressedOutputStream(s: OutputStream): OutputStream = { - val blockSize = conf.get("spark.io.compression.snappy.block.size", "32768").toInt + val blockSize = conf.getInt("spark.io.compression.snappy.block.size", 32768) new SnappyOutputStream(s, blockSize) } diff --git a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala index 46c40d0a2a..e6e01783c8 100644 --- a/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala +++ b/core/src/main/scala/org/apache/spark/network/ConnectionManager.scala @@ -54,22 +54,22 @@ private[spark] class ConnectionManager(port: Int, conf: SparkConf) extends Loggi private val selector = SelectorProvider.provider.openSelector() private val handleMessageExecutor = new ThreadPoolExecutor( - conf.get("spark.core.connection.handler.threads.min", "20").toInt, - conf.get("spark.core.connection.handler.threads.max", "60").toInt, - conf.get("spark.core.connection.handler.threads.keepalive", "60").toInt, TimeUnit.SECONDS, + conf.getInt("spark.core.connection.handler.threads.min", 20), + conf.getInt("spark.core.connection.handler.threads.max", 60), + conf.getInt("spark.core.connection.handler.threads.keepalive", 60), TimeUnit.SECONDS, new LinkedBlockingDeque[Runnable]()) private val 
handleReadWriteExecutor = new ThreadPoolExecutor( - conf.get("spark.core.connection.io.threads.min", "4").toInt, - conf.get("spark.core.connection.io.threads.max", "32").toInt, - conf.get("spark.core.connection.io.threads.keepalive", "60").toInt, TimeUnit.SECONDS, + conf.getInt("spark.core.connection.io.threads.min", 4), + conf.getInt("spark.core.connection.io.threads.max", 32), + conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS, new LinkedBlockingDeque[Runnable]()) // Use a different, yet smaller, thread pool - infrequently used with very short lived tasks : which should be executed asap private val handleConnectExecutor = new ThreadPoolExecutor( - conf.get("spark.core.connection.connect.threads.min", "1").toInt, - conf.get("spark.core.connection.connect.threads.max", "8").toInt, - conf.get("spark.core.connection.connect.threads.keepalive", "60").toInt, TimeUnit.SECONDS, + conf.getInt("spark.core.connection.connect.threads.min", 1), + conf.getInt("spark.core.connection.connect.threads.max", 8), + conf.getInt("spark.core.connection.connect.threads.keepalive", 60), TimeUnit.SECONDS, new LinkedBlockingDeque[Runnable]()) private val serverChannel = ServerSocketChannel.open() diff --git a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala index b729eb11c5..d87157e12c 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/ShuffleCopier.scala @@ -36,7 +36,7 @@ private[spark] class ShuffleCopier(conf: SparkConf) extends Logging { resultCollectCallback: (BlockId, Long, ByteBuf) => Unit) { val handler = new ShuffleCopier.ShuffleClientHandler(resultCollectCallback) - val connectTimeout = conf.get("spark.shuffle.netty.connect.timeout", "60000").toInt + val connectTimeout = conf.getInt("spark.shuffle.netty.connect.timeout", 60000) val fc = new FileClient(handler, connectTimeout) try { diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala index 6d4f46125f..83109d1a6f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala @@ -97,7 +97,7 @@ private[spark] object CheckpointRDD extends Logging { throw new IOException("Checkpoint failed: temporary path " + tempOutputPath + " already exists") } - val bufferSize = env.conf.get("spark.buffer.size", "65536").toInt + val bufferSize = env.conf.getInt("spark.buffer.size", 65536) val fileOutputStream = if (blockSize < 0) { fs.create(tempOutputPath, false, bufferSize) @@ -131,7 +131,7 @@ private[spark] object CheckpointRDD extends Logging { ): Iterator[T] = { val env = SparkEnv.get val fs = path.getFileSystem(broadcastedConf.value.value) - val bufferSize = env.conf.get("spark.buffer.size", "65536").toInt + val bufferSize = env.conf.getInt("spark.buffer.size", 65536) val fileInputStream = fs.open(path, bufferSize) val serializer = env.serializer.newInstance() val deserializeStream = serializer.deserializeStream(fileInputStream) diff --git a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala index 2662d48c84..73d15b9082 100644 --- a/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala @@ -76,7 +76,7 @@ class NewHadoopRDD[K, V]( val split = 
theSplit.asInstanceOf[NewHadoopPartition] logInfo("Input split: " + split.serializableHadoopSplit) val conf = confBroadcast.value.value - val attemptId = newTaskAttemptID(jobtrackerId, id, true, split.index, 0) + val attemptId = newTaskAttemptID(jobtrackerId, id, isMap = true, split.index, 0) val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId) val format = inputFormatClass.newInstance if (format.isInstanceOf[Configurable]) { diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 04a8d05988..c118ddfc01 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -18,35 +18,34 @@ package org.apache.spark.rdd import java.nio.ByteBuffer -import java.util.Date import java.text.SimpleDateFormat +import java.util.Date import java.util.{HashMap => JHashMap} -import scala.collection.{mutable, Map} +import scala.collection.Map +import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConversions._ import scala.reflect.{ClassTag, classTag} -import org.apache.hadoop.mapred._ -import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.io.SequenceFile.CompressionType -import org.apache.hadoop.mapred.FileOutputFormat -import org.apache.hadoop.mapred.OutputFormat +import org.apache.hadoop.io.compress.CompressionCodec +import org.apache.hadoop.mapred.{FileOutputCommitter, FileOutputFormat, JobConf, OutputFormat} import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} -import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} -import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil import org.apache.hadoop.mapreduce.{Job => NewAPIHadoopJob} import org.apache.hadoop.mapreduce.{RecordWriter => NewRecordWriter} +import org.apache.hadoop.mapreduce.lib.output.{FileOutputFormat => NewFileOutputFormat} import com.clearspring.analytics.stream.cardinality.HyperLogLog +// SparkHadoopWriter and SparkHadoopMapReduceUtil are actually source files defined in Spark. +import org.apache.hadoop.mapred.SparkHadoopWriter +import org.apache.hadoop.mapreduce.SparkHadoopMapReduceUtil import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.spark.partial.{BoundedDouble, PartialResult} -import org.apache.spark.Aggregator -import org.apache.spark.Partitioner import org.apache.spark.Partitioner.defaultPartitioner import org.apache.spark.util.SerializableHyperLogLog @@ -120,9 +119,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) } /** - * Merge the values for each key using an associative function and a neutral "zero value" which may - * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for - * list concatenation, 0 for addition, or 1 for multiplication.). + * Merge the values for each key using an associative function and a neutral "zero value" which + * may be added to the result an arbitrary number of times, and must not change the result + * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.). 
*/ def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = { // Serialize the zero value to a byte array so that we can get a new clone of it on each key @@ -138,18 +137,18 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) } /** - * Merge the values for each key using an associative function and a neutral "zero value" which may - * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for - * list concatenation, 0 for addition, or 1 for multiplication.). + * Merge the values for each key using an associative function and a neutral "zero value" which + * may be added to the result an arbitrary number of times, and must not change the result + * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.). */ def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)] = { foldByKey(zeroValue, new HashPartitioner(numPartitions))(func) } /** - * Merge the values for each key using an associative function and a neutral "zero value" which may - * be added to the result an arbitrary number of times, and must not change the result (e.g., Nil for - * list concatenation, 0 for addition, or 1 for multiplication.). + * Merge the values for each key using an associative function and a neutral "zero value" which + * may be added to the result an arbitrary number of times, and must not change the result + * (e.g., Nil for list concatenation, 0 for addition, or 1 for multiplication.). */ def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = { foldByKey(zeroValue, defaultPartitioner(self))(func) @@ -226,7 +225,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) } /** - * Return approximate number of distinct values for each key in this RDD. + * Return approximate number of distinct values for each key in this RDD. * The accuracy of approximation can be controlled through the relative standard deviation * (relativeSD) parameter, which also controls the amount of memory used. Lower values result in * more accurate counts but increase the memory footprint and vise versa. HashPartitions the @@ -579,7 +578,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) */ def saveAsHadoopFile[F <: OutputFormat[K, V]]( path: String, codec: Class[_ <: CompressionCodec]) (implicit fm: ClassTag[F]) { - saveAsHadoopFile(path, getKeyClass, getValueClass, fm.runtimeClass.asInstanceOf[Class[F]], codec) + val runtimeClass = fm.runtimeClass + saveAsHadoopFile(path, getKeyClass, getValueClass, runtimeClass.asInstanceOf[Class[F]], codec) } /** @@ -599,7 +599,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: NewOutputFormat[_, _]], - conf: Configuration = self.context.hadoopConfiguration) { + conf: Configuration = self.context.hadoopConfiguration) + { val job = new NewAPIHadoopJob(conf) job.setOutputKeyClass(keyClass) job.setOutputValueClass(valueClass) @@ -613,7 +614,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) // around by taking a mod. We expect that no task will be attempted 2 billion times. 
val attemptNumber = (context.attemptId % Int.MaxValue).toInt /* "reduce task" <split #> <attempt # = spark task #> */ - val attemptId = newTaskAttemptID(jobtrackerID, stageId, false, context.partitionId, attemptNumber) + val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId, + attemptNumber) val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId) val format = outputFormatClass.newInstance val committer = format.getOutputCommitter(hadoopContext) @@ -632,13 +634,12 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * however we're only going to use this local OutputCommitter for * setupJob/commitJob, so we just use a dummy "map" task. */ - val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, true, 0, 0) + val jobAttemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = true, 0, 0) val jobTaskContext = newTaskAttemptContext(wrappedConf.value, jobAttemptId) val jobCommitter = jobFormat.getOutputCommitter(jobTaskContext) jobCommitter.setupJob(jobTaskContext) val count = self.context.runJob(self, writeShard _).sum jobCommitter.commitJob(jobTaskContext) - jobCommitter.cleanupJob(jobTaskContext) } /** @@ -668,7 +669,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) codec: Option[Class[_ <: CompressionCodec]] = None) { conf.setOutputKeyClass(keyClass) conf.setOutputValueClass(valueClass) - // conf.setOutputFormat(outputFormatClass) // Doesn't work in Scala 2.9 due to what may be a generics bug + // Doesn't work in Scala 2.9 due to what may be a generics bug + // TODO: Should we uncomment this for Scala 2.10? + // conf.setOutputFormat(outputFormatClass) conf.set("mapred.output.format.class", outputFormatClass.getName) for (c <- codec) { conf.setCompressMapOutput(true) @@ -702,7 +705,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) throw new SparkException("Output value class not set") } - logInfo("Saving as hadoop file of type (" + keyClass.getSimpleName+ ", " + valueClass.getSimpleName+ ")") + logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " + + valueClass.getSimpleName+ ")") val writer = new SparkHadoopWriter(conf) writer.preSetup() @@ -728,7 +732,6 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) self.context.runJob(self, writeToFile _) writer.commitJob() - writer.cleanup() } /** diff --git a/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala new file mode 100644 index 0000000000..4c625d062e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/rdd/PartitionerAwareUnionRDD.scala @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.rdd + +import scala.reflect.ClassTag +import java.io.{ObjectOutputStream, IOException} +import org.apache.spark.{TaskContext, OneToOneDependency, SparkContext, Partition} + + +/** + * Class representing partitions of PartitionerAwareUnionRDD, which maintains the list of corresponding partitions + * of parent RDDs. + */ +private[spark] +class PartitionerAwareUnionRDDPartition( + @transient val rdds: Seq[RDD[_]], + val idx: Int + ) extends Partition { + var parents = rdds.map(_.partitions(idx)).toArray + + override val index = idx + override def hashCode(): Int = idx + + @throws(classOf[IOException]) + private def writeObject(oos: ObjectOutputStream) { + // Update the reference to parent partition at the time of task serialization + parents = rdds.map(_.partitions(index)).toArray + oos.defaultWriteObject() + } +} + +/** + * Class representing an RDD that can take multiple RDDs partitioned by the same partitioner and + * unify them into a single RDD while preserving the partitioner. So m RDDs with p partitions each + * will be unified to a single RDD with p partitions and the same partitioner. The preferred + * location for each partition of the unified RDD will be the most common preferred location + * of the corresponding partitions of the parent RDDs. For example, location of partition 0 + * of the unified RDD will be where most of partition 0 of the parent RDDs are located. + */ +private[spark] +class PartitionerAwareUnionRDD[T: ClassTag]( + sc: SparkContext, + var rdds: Seq[RDD[T]] + ) extends RDD[T](sc, rdds.map(x => new OneToOneDependency(x))) { + require(rdds.length > 0) + require(rdds.flatMap(_.partitioner).toSet.size == 1, + "Parent RDDs have different partitioners: " + rdds.flatMap(_.partitioner)) + + override val partitioner = rdds.head.partitioner + + override def getPartitions: Array[Partition] = { + val numPartitions = partitioner.get.numPartitions + (0 until numPartitions).map(index => { + new PartitionerAwareUnionRDDPartition(rdds, index) + }).toArray + } + + // Get the location where most of the partitions of parent RDDs are located + override def getPreferredLocations(s: Partition): Seq[String] = { + logDebug("Finding preferred location for " + this + ", partition " + s.index) + val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents + val locations = rdds.zip(parentPartitions).flatMap { + case (rdd, part) => { + val parentLocations = currPrefLocs(rdd, part) + logDebug("Location of " + rdd + " partition " + part.index + " = " + parentLocations) + parentLocations + } + } + val location = if (locations.isEmpty) { + None + } else { + // Find the location that maximum number of parent partitions prefer + Some(locations.groupBy(x => x).maxBy(_._2.length)._1) + } + logDebug("Selected location for " + this + ", partition " + s.index + " = " + location) + location.toSeq + } + + override def compute(s: Partition, context: TaskContext): Iterator[T] = { + val parentPartitions = s.asInstanceOf[PartitionerAwareUnionRDDPartition].parents + rdds.zip(parentPartitions).iterator.flatMap { + case (rdd, p) => rdd.iterator(p, context) + } + } + + override def clearDependencies() { + super.clearDependencies() + rdds = null + } + + // Get the *current* preferred locations from the DAGScheduler (as opposed to the static ones) + private def currPrefLocs(rdd: RDD[_], part: Partition): Seq[String] = { + rdd.context.getPreferredLocs(rdd, part.index).map(tl => tl.host) + } +} diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 6a7b0f8a86..f9dc12eee3 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -23,7 +23,6 @@ import scala.collection.Map import scala.collection.JavaConversions.mapAsScalaMap import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.HashMap import scala.reflect.{classTag, ClassTag} import org.apache.hadoop.io.BytesWritable @@ -52,11 +51,13 @@ import org.apache.spark._ * partitioned collection of elements that can be operated on in parallel. This class contains the * basic operations available on all RDDs, such as `map`, `filter`, and `persist`. In addition, * [[org.apache.spark.rdd.PairRDDFunctions]] contains operations available only on RDDs of key-value - * pairs, such as `groupByKey` and `join`; [[org.apache.spark.rdd.DoubleRDDFunctions]] contains - * operations available only on RDDs of Doubles; and [[org.apache.spark.rdd.SequenceFileRDDFunctions]] - * contains operations available on RDDs that can be saved as SequenceFiles. These operations are - * automatically available on any RDD of the right type (e.g. RDD[(Int, Int)] through implicit - * conversions when you `import org.apache.spark.SparkContext._`. + * pairs, such as `groupByKey` and `join`; + * [[org.apache.spark.rdd.DoubleRDDFunctions]] contains operations available only on RDDs of + * Doubles; and + * [[org.apache.spark.rdd.SequenceFileRDDFunctions]] contains operations available on RDDs that + * can be saved as SequenceFiles. + * These operations are automatically available on any RDD of the right type (e.g. RDD[(Int, Int)] + * through implicit conversions when you `import org.apache.spark.SparkContext._`. * * Internally, each RDD is characterized by five main properties: * @@ -235,12 +236,9 @@ abstract class RDD[T: ClassTag]( /** * Compute an RDD partition or read it from a checkpoint if the RDD is checkpointing. */ - private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = { - if (isCheckpointed) { - firstParent[T].iterator(split, context) - } else { - compute(split, context) - } + private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = + { + if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context) } // Transformations (return a new RDD) @@ -268,6 +266,9 @@ abstract class RDD[T: ClassTag]( def distinct(numPartitions: Int): RDD[T] = map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1) + /** + * Return a new RDD containing the distinct elements in this RDD. + */ def distinct(): RDD[T] = distinct(partitions.size) /** @@ -280,7 +281,7 @@ abstract class RDD[T: ClassTag]( * which can avoid performing a shuffle. */ def repartition(numPartitions: Int): RDD[T] = { - coalesce(numPartitions, true) + coalesce(numPartitions, shuffle = true) } /** @@ -646,7 +647,8 @@ abstract class RDD[T: ClassTag]( } /** - * Reduces the elements of this RDD using the specified commutative and associative binary operator. + * Reduces the elements of this RDD using the specified commutative and + * associative binary operator. */ def reduce(f: (T, T) => T): T = { val cleanF = sc.clean(f) @@ -953,7 +955,7 @@ abstract class RDD[T: ClassTag]( private var storageLevel: StorageLevel = StorageLevel.NONE /** Record user function generating this RDD. 
*/ - @transient private[spark] val origin = Utils.formatSparkCallSite + @transient private[spark] val origin = sc.getCallSite() private[spark] def elementClassTag: ClassTag[T] = classTag[T] diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala index 642dabaad5..bc688110f4 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDDCheckpointData.scala @@ -40,7 +40,7 @@ private[spark] object CheckpointState extends Enumeration { * manages the post-checkpoint state by providing the updated partitions, iterator and preferred locations * of the checkpointed RDD. */ -private[spark] class RDDCheckpointData[T: ClassTag](rdd: RDD[T]) +private[spark] class RDDCheckpointData[T: ClassTag](@transient rdd: RDD[T]) extends Logging with Serializable { import CheckpointState._ diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala index e22b1e53e8..c52d6175d2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala @@ -31,7 +31,7 @@ import org.apache.spark.util.Utils private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl) extends Logging { - private val THREADS = sparkEnv.conf.get("spark.resultGetter.threads", "4").toInt + private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4) private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool( THREADS, "Result resolver thread") diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index d94b706854..d4f74d3e18 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -51,15 +51,15 @@ private[spark] class TaskSchedulerImpl( isLocal: Boolean = false) extends TaskScheduler with Logging { - def this(sc: SparkContext) = this(sc, sc.conf.get("spark.task.maxFailures", "4").toInt) + def this(sc: SparkContext) = this(sc, sc.conf.getInt("spark.task.maxFailures", 4)) val conf = sc.conf // How often to check for speculative tasks - val SPECULATION_INTERVAL = conf.get("spark.speculation.interval", "100").toLong + val SPECULATION_INTERVAL = conf.getLong("spark.speculation.interval", 100) // Threshold above which we warn user initial TaskSet may be starved - val STARVATION_TIMEOUT = conf.get("spark.starvation.timeout", "15000").toLong + val STARVATION_TIMEOUT = conf.getLong("spark.starvation.timeout", 15000) // TaskSetManagers are not thread safe, so any access to one should be synchronized // on this class. 
@@ -125,7 +125,7 @@ private[spark] class TaskSchedulerImpl( override def start() { backend.start() - if (!isLocal && conf.get("spark.speculation", "false").toBoolean) { + if (!isLocal && conf.getBoolean("spark.speculation", false)) { logInfo("Starting speculative execution thread") import sc.env.actorSystem.dispatcher sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds, @@ -285,7 +285,8 @@ private[spark] class TaskSchedulerImpl( } } case None => - logInfo("Ignoring update from TID " + tid + " because its task set is gone") + logInfo("Ignoring update with state %s from TID %s because its task set is gone" + .format(state, tid)) } } catch { case e: Exception => logError("Exception in statusUpdate", e) @@ -328,7 +329,7 @@ private[spark] class TaskSchedulerImpl( // Have each task set throw a SparkException with the error for ((taskSetId, manager) <- activeTaskSets) { try { - manager.error(message) + manager.abort(message) } catch { case e: Exception => logError("Exception in error callback", e) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 67ad99a4d7..a10e5397ad 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -57,11 +57,11 @@ private[spark] class TaskSetManager( val conf = sched.sc.conf // CPUs to request per task - val CPUS_PER_TASK = conf.get("spark.task.cpus", "1").toInt + val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1) // Quantile of tasks at which to start speculation - val SPECULATION_QUANTILE = conf.get("spark.speculation.quantile", "0.75").toDouble - val SPECULATION_MULTIPLIER = conf.get("spark.speculation.multiplier", "1.5").toDouble + val SPECULATION_QUANTILE = conf.getDouble("spark.speculation.quantile", 0.75) + val SPECULATION_MULTIPLIER = conf.getDouble("spark.speculation.multiplier", 1.5) // Serializer for closures and tasks. 
val env = SparkEnv.get @@ -116,7 +116,7 @@ private[spark] class TaskSetManager( // How frequently to reprint duplicate exceptions in full, in milliseconds val EXCEPTION_PRINT_INTERVAL = - conf.get("spark.logging.exceptionPrintInterval", "10000").toLong + conf.getLong("spark.logging.exceptionPrintInterval", 10000) // Map of recent exceptions (identified by string representation and top stack frame) to // duplicate count (how many times the same exception has appeared) and time the full exception @@ -548,11 +548,6 @@ private[spark] class TaskSetManager( } } - def error(message: String) { - // Save the error message - abort("Error: " + message) - } - def abort(message: String) { // TODO: Kill running tasks if we were not terminated due to a Mesos error sched.dagScheduler.taskSetFailed(taskSet, message) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 2f5bcafe40..8d596a76c2 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -63,7 +63,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) // Periodically revive offers to allow delay scheduling to work - val reviveInterval = conf.get("spark.scheduler.revive.interval", "1000").toLong + val reviveInterval = conf.getLong("spark.scheduler.revive.interval", 1000) import context.dispatcher context.system.scheduler.schedule(0.millis, reviveInterval.millis, self, ReviveOffers) } @@ -209,8 +209,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A } override def defaultParallelism(): Int = { - conf.getOption("spark.default.parallelism").map(_.toInt).getOrElse( - math.max(totalCoreCount.get(), 2)) + conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2)) } // Called by subclasses when notified of a lost worker diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala index b44d1e43c8..d99c76117c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala @@ -33,7 +33,7 @@ private[spark] class SimrSchedulerBackend( val tmpPath = new Path(driverFilePath + "_tmp") val filePath = new Path(driverFilePath) - val maxCores = conf.get("spark.simr.executor.cores", "1").toInt + val maxCores = conf.getInt("spark.simr.executor.cores", 1) override def start() { super.start() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 9858717d13..faa6e1ebe8 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -20,7 +20,7 @@ package org.apache.spark.scheduler.cluster import scala.collection.mutable.HashMap import org.apache.spark.{Logging, SparkContext} -import org.apache.spark.deploy.client.{Client, ClientListener} +import org.apache.spark.deploy.client.{AppClient, AppClientListener} import 
org.apache.spark.deploy.{Command, ApplicationDescription} import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl} import org.apache.spark.util.Utils @@ -31,14 +31,14 @@ private[spark] class SparkDeploySchedulerBackend( masters: Array[String], appName: String) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) - with ClientListener + with AppClientListener with Logging { - var client: Client = null + var client: AppClient = null var stopping = false var shutdownCallback : (SparkDeploySchedulerBackend) => Unit = _ - val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt + val maxCores = conf.getOption("spark.cores.max").map(_.toInt) override def start() { super.start() @@ -47,14 +47,14 @@ private[spark] class SparkDeploySchedulerBackend( val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( conf.get("spark.driver.host"), conf.get("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) - val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}") + val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}") val command = Command( "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs) val sparkHome = sc.getSparkHome().getOrElse(null) val appDesc = new ApplicationDescription(appName, maxCores, sc.executorMemory, command, sparkHome, "http://" + sc.ui.appUIAddress) - client = new Client(sc.env.actorSystem, masters, appDesc, this, conf) + client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 08811520cf..e16d60c54c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -77,7 +77,7 @@ private[spark] class CoarseMesosSchedulerBackend( "Spark home is not set; set it through the spark.home system " + "property, the SPARK_HOME environment variable or the SparkContext constructor")) - val extraCoresPerSlave = conf.get("spark.mesos.extra.cores", "0").toInt + val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0) var nextMesosTaskId = 0 @@ -127,7 +127,7 @@ private[spark] class CoarseMesosSchedulerBackend( CoarseGrainedSchedulerBackend.ACTOR_NAME) val uri = conf.get("spark.executor.uri", null) if (uri == null) { - val runScript = new File(sparkHome, "spark-class").getCanonicalPath + val runScript = new File(sparkHome, "./bin/spark-class").getCanonicalPath command.setValue( "\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format( runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) @@ -136,7 +136,7 @@ private[spark] class CoarseMesosSchedulerBackend( // glob the directory "correctly". 
val basename = uri.split('/').last.split('.').head command.setValue( - "cd %s*; ./spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d" + "cd %s*; ./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d" .format(basename, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)) command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index bb278fb155..b428c82a48 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -102,12 +102,12 @@ private[spark] class MesosSchedulerBackend( .setEnvironment(environment) val uri = sc.conf.get("spark.executor.uri", null) if (uri == null) { - command.setValue(new File(sparkHome, "spark-executor").getCanonicalPath) + command.setValue(new File(sparkHome, "/sbin/spark-executor").getCanonicalPath) } else { // Grab everything to the first '.'. We'll use that and '*' to // glob the directory "correctly". val basename = uri.split('/').last.split('.').head - command.setValue("cd %s*; ./spark-executor".format(basename)) + command.setValue("cd %s*; ./sbin/spark-executor".format(basename)) command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) } val memory = Resource.newBuilder() @@ -340,5 +340,5 @@ private[spark] class MesosSchedulerBackend( } // TODO: query Mesos for number of cores - override def defaultParallelism() = sc.conf.get("spark.default.parallelism", "8").toInt + override def defaultParallelism() = sc.conf.getInt("spark.default.parallelism", 8) } diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index a24a3b04b8..c14cd47556 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -36,7 +36,7 @@ import org.apache.spark.storage.{GetBlock, GotBlock, PutBlock} */ class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serializer with Logging { private val bufferSize = { - conf.get("spark.kryoserializer.buffer.mb", "2").toInt * 1024 * 1024 + conf.getInt("spark.kryoserializer.buffer.mb", 2) * 1024 * 1024 } def newKryoOutput() = new KryoOutput(bufferSize) @@ -48,7 +48,7 @@ class KryoSerializer(conf: SparkConf) extends org.apache.spark.serializer.Serial // Allow disabling Kryo reference tracking if user knows their object graphs don't have loops. // Do this before we invoke the user registrator so the user registrator can override this. - kryo.setReferences(conf.get("spark.kryo.referenceTracking", "true").toBoolean) + kryo.setReferences(conf.getBoolean("spark.kryo.referenceTracking", true)) for (cls <- KryoSerializer.toRegister) kryo.register(cls) diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala index 160cca4d6c..9a5e3cb77e 100644 --- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala @@ -29,6 +29,9 @@ import org.apache.spark.util.{NextIterator, ByteBufferInputStream} * A serializer. 
Because some serialization libraries are not thread safe, this class is used to * create [[org.apache.spark.serializer.SerializerInstance]] objects that do the actual serialization and are * guaranteed to only be called from one thread at a time. + * + * Implementations of this trait should have a zero-arg constructor or a constructor that accepts a + * [[org.apache.spark.SparkConf]] as parameter. If both constructors are defined, the latter takes precedence. */ trait Serializer { def newInstance(): SerializerInstance diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala index 22465272f3..36a37af4f8 100644 --- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala +++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala @@ -27,6 +27,7 @@ import org.apache.spark.SparkConf * creating a new one. */ private[spark] class SerializerManager { + // TODO: Consider moving this into SparkConf itself to remove the global singleton. private val serializers = new ConcurrentHashMap[String, Serializer] private var _default: Serializer = _ @@ -53,8 +54,18 @@ private[spark] class SerializerManager { if (serializer == null) { val clsLoader = Thread.currentThread.getContextClassLoader val cls = Class.forName(clsName, true, clsLoader) - val constructor = cls.getConstructor(classOf[SparkConf]) - serializer = constructor.newInstance(conf).asInstanceOf[Serializer] + + // First try with the constructor that takes SparkConf. If we can't find one, + // use a no-arg constructor instead. + try { + val constructor = cls.getConstructor(classOf[SparkConf]) + serializer = constructor.newInstance(conf).asInstanceOf[Serializer] + } catch { + case _: NoSuchMethodException => + val constructor = cls.getConstructor() + serializer = constructor.newInstance().asInstanceOf[Serializer] + } + serializers.put(clsName, serializer) } serializer diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala index 47478631a1..4fa2ab96d9 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala @@ -327,7 +327,7 @@ object BlockFetcherIterator { fetchRequestsSync.put(request) } - copiers = startCopiers(conf.get("spark.shuffle.copier.threads", "6").toInt) + copiers = startCopiers(conf.getInt("spark.shuffle.copier.threads", 6)) logInfo("Started " + fetchRequestsSync.size + " remote gets in " + Utils.getUsedTimeMs(startTime)) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 6d2cda97b0..c56e2ca2df 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -58,8 +58,8 @@ private[spark] class BlockManager( // If we use Netty for shuffle, start a new Netty-based shuffle sender service. 
private val nettyPort: Int = { - val useNetty = conf.get("spark.shuffle.use.netty", "false").toBoolean - val nettyPortConfig = conf.get("spark.shuffle.sender.port", "0").toInt + val useNetty = conf.getBoolean("spark.shuffle.use.netty", false) + val nettyPortConfig = conf.getInt("spark.shuffle.sender.port", 0) if (useNetty) diskBlockManager.startShuffleBlockSender(nettyPortConfig) else 0 } @@ -72,14 +72,14 @@ private[spark] class BlockManager( // Max megabytes of data to keep in flight per reducer (to avoid over-allocating memory // for receiving shuffle outputs) val maxBytesInFlight = - conf.get("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024 + conf.getLong("spark.reducer.maxMbInFlight", 48) * 1024 * 1024 // Whether to compress broadcast variables that are stored - val compressBroadcast = conf.get("spark.broadcast.compress", "true").toBoolean + val compressBroadcast = conf.getBoolean("spark.broadcast.compress", true) // Whether to compress shuffle output that are stored - val compressShuffle = conf.get("spark.shuffle.compress", "true").toBoolean + val compressShuffle = conf.getBoolean("spark.shuffle.compress", true) // Whether to compress RDD partitions that are stored serialized - val compressRdds = conf.get("spark.rdd.compress", "false").toBoolean + val compressRdds = conf.getBoolean("spark.rdd.compress", false) val heartBeatFrequency = BlockManager.getHeartBeatFrequency(conf) @@ -443,7 +443,7 @@ private[spark] class BlockManager( : BlockFetcherIterator = { val iter = - if (conf.get("spark.shuffle.use.netty", "false").toBoolean) { + if (conf.getBoolean("spark.shuffle.use.netty", false)) { new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer) } else { new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer) @@ -469,7 +469,7 @@ private[spark] class BlockManager( def getDiskWriter(blockId: BlockId, file: File, serializer: Serializer, bufferSize: Int) : BlockObjectWriter = { val compressStream: OutputStream => OutputStream = wrapForCompression(blockId, _) - val syncWrites = conf.get("spark.shuffle.sync", "false").toBoolean + val syncWrites = conf.getBoolean("spark.shuffle.sync", false) new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream, syncWrites) } @@ -864,15 +864,15 @@ private[spark] object BlockManager extends Logging { val ID_GENERATOR = new IdGenerator def getMaxMemory(conf: SparkConf): Long = { - val memoryFraction = conf.get("spark.storage.memoryFraction", "0.66").toDouble + val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.66) (Runtime.getRuntime.maxMemory * memoryFraction).toLong } def getHeartBeatFrequency(conf: SparkConf): Long = - conf.get("spark.storage.blockManagerTimeoutIntervalMs", "60000").toLong / 4 + conf.getLong("spark.storage.blockManagerTimeoutIntervalMs", 60000) / 4 def getDisableHeartBeatsForTesting(conf: SparkConf): Boolean = - conf.get("spark.test.disableBlockManagerHeartBeat", "false").toBoolean + conf.getBoolean("spark.test.disableBlockManagerHeartBeat", false) /** * Attempt to clean up a ByteBuffer if it is memory-mapped. 
This uses an *unsafe* Sun API that diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index b5afe8cd23..c54e4f2664 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -28,11 +28,10 @@ import org.apache.spark.storage.BlockManagerMessages._ import org.apache.spark.util.AkkaUtils private[spark] -class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection], - conf: SparkConf) extends Logging { +class BlockManagerMaster(var driverActor : ActorRef, conf: SparkConf) extends Logging { - val AKKA_RETRY_ATTEMPTS: Int = conf.get("spark.akka.num.retries", "3").toInt - val AKKA_RETRY_INTERVAL_MS: Int = conf.get("spark.akka.retry.wait", "3000").toInt + val AKKA_RETRY_ATTEMPTS: Int = conf.getInt("spark.akka.num.retries", 3) + val AKKA_RETRY_INTERVAL_MS: Int = conf.getInt("spark.akka.retry.wait", 3000) val DRIVER_AKKA_ACTOR_NAME = "BlockManagerMaster" @@ -159,10 +158,7 @@ class BlockManagerMaster(var driverActor : Either[ActorRef, ActorSelection], while (attempts < AKKA_RETRY_ATTEMPTS) { attempts += 1 try { - val future = driverActor match { - case Left(a: ActorRef) => a.ask(message)(timeout) - case Right(b: ActorSelection) => b.ask(message)(timeout) - } + val future = driverActor.ask(message)(timeout) val result = Await.result(future, timeout) if (result == null) { throw new SparkException("BlockManagerMaster returned null") diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala index 58452d9657..2c1a4e2f5d 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala @@ -348,14 +348,19 @@ object BlockManagerMasterActor { if (storageLevel.isValid) { // isValid means it is either stored in-memory or on-disk. - _blocks.put(blockId, BlockStatus(storageLevel, memSize, diskSize)) + // But the memSize here indicates the data size in or dropped from memory, + // and the diskSize here indicates the data size in or dropped to disk. + // They can be both larger than 0, when a block is dropped from memory to disk. + // Therefore, a safe way to set BlockStatus is to set its info in accurate modes. 
if (storageLevel.useMemory) { + _blocks.put(blockId, BlockStatus(storageLevel, memSize, 0)) _remainingMem -= memSize logInfo("Added %s in memory on %s (size: %s, free: %s)".format( blockId, blockManagerId.hostPort, Utils.bytesToString(memSize), Utils.bytesToString(_remainingMem))) } if (storageLevel.useDisk) { + _blocks.put(blockId, BlockStatus(storageLevel, 0, diskSize)) logInfo("Added %s on disk on %s (size: %s)".format( blockId, blockManagerId.hostPort, Utils.bytesToString(diskSize))) } diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 55dcb3742c..edc1133172 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -38,7 +38,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD extends PathResolver with Logging { private val MAX_DIR_CREATION_ATTEMPTS: Int = 10 - private val subDirsPerLocalDir = shuffleManager.conf.get("spark.diskStore.subDirectories", "64").toInt + private val subDirsPerLocalDir = shuffleManager.conf.getInt("spark.diskStore.subDirectories", 64) // Create one local directory for each path mentioned in spark.local.dir; then, inside this // directory, create multiple subdirectories that we will hash files into, in order to avoid diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala index 39dc7bb19a..e2b24298a5 100644 --- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockManager.scala @@ -64,9 +64,9 @@ class ShuffleBlockManager(blockManager: BlockManager) { // Turning off shuffle file consolidation causes all shuffle Blocks to get their own file. // TODO: Remove this once the shuffle file consolidation feature is stable. val consolidateShuffleFiles = - conf.get("spark.shuffle.consolidateFiles", "false").toBoolean + conf.getBoolean("spark.shuffle.consolidateFiles", false) - private val bufferSize = conf.get("spark.shuffle.file.buffer.kb", "100").toInt * 1024 + private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024 /** * Contains all the state related to a particular shuffle. 
This includes a pool of unused diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala index dca98c6c05..729ba2c550 100644 --- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala +++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala @@ -95,7 +95,7 @@ private[spark] object ThreadingTest { val conf = new SparkConf() val serializer = new KryoSerializer(conf) val blockManagerMaster = new BlockManagerMaster( - Left(actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf)))), conf) + actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf))), conf) val blockManager = new BlockManager( "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf) val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i)) diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala index 58d47a201d..6ba15187d9 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala @@ -27,7 +27,7 @@ import org.apache.spark.scheduler.SchedulingMode /** * Continuously generates jobs that expose various features of the WebUI (internal testing tool). * - * Usage: ./run spark.ui.UIWorkloadGenerator [master] + * Usage: ./bin/spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR] */ private[spark] object UIWorkloadGenerator { @@ -36,7 +36,7 @@ private[spark] object UIWorkloadGenerator { def main(args: Array[String]) { if (args.length < 2) { - println("usage: ./spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]") + println("usage: ./bin/spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]") System.exit(1) } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala index b7b87250b9..bcd2824450 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressListener.scala @@ -33,7 +33,7 @@ import org.apache.spark.scheduler._ */ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkListener { // How many stages to remember - val RETAINED_STAGES = sc.conf.get("spark.ui.retained_stages", "1000").toInt + val RETAINED_STAGES = sc.conf.getInt("spark.ui.retainedStages", 1000) val DEFAULT_POOL_NAME = "default" val stageIdToPool = new HashMap[Int, String]() diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 362cea5e3e..761d378c7f 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -17,10 +17,13 @@ package org.apache.spark.util +import scala.collection.JavaConversions.mapAsJavaMap import scala.concurrent.duration.{Duration, FiniteDuration} import akka.actor.{ActorSystem, ExtendedActorSystem, IndestructibleActorSystem} import com.typesafe.config.ConfigFactory + +import org.apache.log4j.{Level, Logger} import org.apache.spark.SparkConf /** @@ -41,21 +44,29 @@ private[spark] object AkkaUtils { def createActorSystem(name: String, host: String, port: Int, indestructible: Boolean = false, conf: SparkConf): (ActorSystem, Int) = { - val akkaThreads = conf.get("spark.akka.threads", 
"4").toInt - val akkaBatchSize = conf.get("spark.akka.batchSize", "15").toInt + val akkaThreads = conf.getInt("spark.akka.threads", 4) + val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15) + + val akkaTimeout = conf.getInt("spark.akka.timeout", 100) - val akkaTimeout = conf.get("spark.akka.timeout", "100").toInt + val akkaFrameSize = conf.getInt("spark.akka.frameSize", 10) + val akkaLogLifecycleEvents = conf.getBoolean("spark.akka.logLifecycleEvents", false) + val lifecycleEvents = if (akkaLogLifecycleEvents) "on" else "off" + if (!akkaLogLifecycleEvents) { + // As a workaround for Akka issue #3787, we coerce the "EndpointWriter" log to be silent. + // See: https://www.assembla.com/spaces/akka/tickets/3787#/ + Option(Logger.getLogger("akka.remote.EndpointWriter")).map(l => l.setLevel(Level.FATAL)) + } - val akkaFrameSize = conf.get("spark.akka.frameSize", "10").toInt - val lifecycleEvents = - if (conf.get("spark.akka.logLifecycleEvents", "false").toBoolean) "on" else "off" + val logAkkaConfig = if (conf.getBoolean("spark.akka.logAkkaConfig", false)) "on" else "off" - val akkaHeartBeatPauses = conf.get("spark.akka.heartbeat.pauses", "600").toInt + val akkaHeartBeatPauses = conf.getInt("spark.akka.heartbeat.pauses", 600) val akkaFailureDetector = - conf.get("spark.akka.failure-detector.threshold", "300.0").toDouble - val akkaHeartBeatInterval = conf.get("spark.akka.heartbeat.interval", "1000").toInt + conf.getDouble("spark.akka.failure-detector.threshold", 300.0) + val akkaHeartBeatInterval = conf.getInt("spark.akka.heartbeat.interval", 1000) - val akkaConf = ConfigFactory.parseString( + val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]).withFallback( + ConfigFactory.parseString( s""" |akka.daemonic = on |akka.loggers = [""akka.event.slf4j.Slf4jLogger""] @@ -73,8 +84,11 @@ private[spark] object AkkaUtils { |akka.remote.netty.tcp.maximum-frame-size = ${akkaFrameSize}MiB |akka.remote.netty.tcp.execution-pool-size = $akkaThreads |akka.actor.default-dispatcher.throughput = $akkaBatchSize + |akka.log-config-on-start = $logAkkaConfig |akka.remote.log-remote-lifecycle-events = $lifecycleEvents - """.stripMargin) + |akka.log-dead-letters = $lifecycleEvents + |akka.log-dead-letters-during-shutdown = $lifecycleEvents + """.stripMargin)) val actorSystem = if (indestructible) { IndestructibleActorSystem(name, akkaConf) @@ -89,6 +103,11 @@ private[spark] object AkkaUtils { /** Returns the default Spark timeout to use for Akka ask operations. */ def askTimeout(conf: SparkConf): FiniteDuration = { - Duration.create(conf.get("spark.akka.askTimeout", "30").toLong, "seconds") + Duration.create(conf.getLong("spark.akka.askTimeout", 30), "seconds") + } + + /** Returns the default Spark timeout to use for Akka remote actor lookup. */ + def lookupTimeout(conf: SparkConf): FiniteDuration = { + Duration.create(conf.get("spark.akka.lookupTimeout", "30").toLong, "seconds") } } diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala index aa7f52cafb..3d1e90a352 100644 --- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala @@ -74,7 +74,7 @@ object MetadataCleanerType extends Enumeration { // initialization of StreamingContext. It's okay for users trying to configure stuff themselves. 
object MetadataCleaner { def getDelaySeconds(conf: SparkConf) = { - conf.get("spark.cleaner.ttl", "3500").toInt + conf.getInt("spark.cleaner.ttl", 3500) } def getDelaySeconds(conf: SparkConf, cleanerType: MetadataCleanerType.MetadataCleanerType): Int = diff --git a/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala b/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala index e9907e6c85..08b31ac64f 100644 --- a/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala +++ b/core/src/main/scala/org/apache/spark/util/XORShiftRandom.scala @@ -91,4 +91,4 @@ private[spark] object XORShiftRandom { } -}
\ No newline at end of file +}
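
The hunks above repeatedly convert configuration reads from the string-based conf.get(key, default).toInt/.toLong/.toBoolean/.toDouble idiom to the typed SparkConf getters. A minimal, hypothetical sketch of the new call pattern (the object name and the sample key/value below are illustrative only and are not part of the patch):

import org.apache.spark.SparkConf

object TypedConfExample {
  def main(args: Array[String]): Unit = {
    // Typed getters return the supplied default when the key is unset.
    val conf = new SparkConf().set("spark.shuffle.file.buffer.kb", "200")

    val bufferKb    = conf.getInt("spark.shuffle.file.buffer.kb", 100)     // 200 (set above)
    val useNetty    = conf.getBoolean("spark.shuffle.use.netty", false)    // false (unset)
    val askTimeout  = conf.getLong("spark.akka.askTimeout", 30)            // 30 (unset)
    val memFraction = conf.getDouble("spark.storage.memoryFraction", 0.66) // 0.66 (unset)

    println(s"buffer=${bufferKb}KB netty=$useNetty askTimeout=${askTimeout}s memFraction=$memFraction")
  }
}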