Diffstat (limited to 'core/src/main/scala/org/apache/spark/SparkContext.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/SparkContext.scala | 993 |
1 files changed, 993 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala new file mode 100644 index 0000000000..faf0c2362a --- /dev/null +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -0,0 +1,993 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.io._ +import java.net.URI +import java.util.Properties +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.Map +import scala.collection.generic.Growable +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.HashMap +import scala.util.DynamicVariable + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.io.ArrayWritable +import org.apache.hadoop.io.BooleanWritable +import org.apache.hadoop.io.BytesWritable +import org.apache.hadoop.io.DoubleWritable +import org.apache.hadoop.io.FloatWritable +import org.apache.hadoop.io.IntWritable +import org.apache.hadoop.io.LongWritable +import org.apache.hadoop.io.NullWritable +import org.apache.hadoop.io.Text +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred.FileInputFormat +import org.apache.hadoop.mapred.InputFormat +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapred.SequenceFileInputFormat +import org.apache.hadoop.mapred.TextInputFormat +import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} +import org.apache.hadoop.mapreduce.{Job => NewHadoopJob} +import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat} + +import org.apache.mesos.MesosNativeLibrary + +import org.apache.spark.deploy.LocalSparkCluster +import org.apache.spark.partial.{ApproximateEvaluator, PartialResult} +import org.apache.spark.rdd._ +import org.apache.spark.scheduler._ +import org.apache.spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, + ClusterScheduler, Schedulable, SchedulingMode} +import org.apache.spark.scheduler.local.LocalScheduler +import org.apache.spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} +import org.apache.spark.storage.{StorageUtils, BlockManagerSource} +import org.apache.spark.ui.SparkUI +import org.apache.spark.util.{ClosureCleaner, Utils, MetadataCleaner, TimeStampedHashMap} +import org.apache.spark.scheduler.StageInfo +import org.apache.spark.storage.RDDInfo +import org.apache.spark.storage.StorageStatus + +/** + * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark + * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. + * + * @param master Cluster URL to connect to (e.g. 
mesos://host:port, spark://host:port, local[4]). + * @param appName A name for your application, to display on the cluster web UI. + * @param sparkHome Location where Spark is installed on cluster nodes. + * @param jars Collection of JARs to send to the cluster. These can be paths on the local file + * system or HDFS, HTTP, HTTPS, or FTP URLs. + * @param environment Environment variables to set on worker nodes. + */ +class SparkContext( + val master: String, + val appName: String, + val sparkHome: String = null, + val jars: Seq[String] = Nil, + val environment: Map[String, String] = Map(), + // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc) too. + // This is typically generated from InputFormatInfo.computePreferredLocations .. host, set of data-local splits on host + val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map()) + extends Logging { + + // Ensure logging is initialized before we spawn any threads + initLogging() + + // Set Spark driver host and port system properties + if (System.getProperty("spark.driver.host") == null) { + System.setProperty("spark.driver.host", Utils.localHostName()) + } + if (System.getProperty("spark.driver.port") == null) { + System.setProperty("spark.driver.port", "0") + } + + val isLocal = (master == "local" || master.startsWith("local[")) + + // Create the Spark execution environment (cache, map output tracker, etc) + private[spark] val env = SparkEnv.createFromSystemProperties( + "<driver>", + System.getProperty("spark.driver.host"), + System.getProperty("spark.driver.port").toInt, + true, + isLocal) + SparkEnv.set(env) + + // Used to store a URL for each static file/jar together with the file's local timestamp + private[spark] val addedFiles = HashMap[String, Long]() + private[spark] val addedJars = HashMap[String, Long]() + + // Keeps track of all persisted RDDs + private[spark] val persistentRdds = new TimeStampedHashMap[Int, RDD[_]] + private[spark] val metadataCleaner = new MetadataCleaner("SparkContext", this.cleanup) + + // Initalize the Spark UI + private[spark] val ui = new SparkUI(this) + ui.bind() + + val startTime = System.currentTimeMillis() + + // Add each JAR given through the constructor + if (jars != null) { + jars.foreach { addJar(_) } + } + + // Environment variables to pass to our executors + private[spark] val executorEnvs = HashMap[String, String]() + // Note: SPARK_MEM is included for Mesos, but overwritten for standalone mode in ExecutorRunner + for (key <- Seq("SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS", "SPARK_TESTING")) { + val value = System.getenv(key) + if (value != null) { + executorEnvs(key) = value + } + } + // Since memory can be set with a system property too, use that + executorEnvs("SPARK_MEM") = SparkContext.executorMemoryRequested + "m" + if (environment != null) { + executorEnvs ++= environment + } + + // Create and start the scheduler + private var taskScheduler: TaskScheduler = { + // Regular expression used for local[N] master format + val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r + // Regular expression for local[N, maxRetries], used in tests with failing tasks + val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+)\s*,\s*([0-9]+)\]""".r + // Regular expression for simulating a Spark cluster of [N, cores, memory] locally + val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r + // Regular expression for connecting to Spark deploy clusters + val 
SPARK_REGEX = """(spark://.*)""".r + //Regular expression for connection to Mesos cluster + val MESOS_REGEX = """(mesos://.*)""".r + + master match { + case "local" => + new LocalScheduler(1, 0, this) + + case LOCAL_N_REGEX(threads) => + new LocalScheduler(threads.toInt, 0, this) + + case LOCAL_N_FAILURES_REGEX(threads, maxFailures) => + new LocalScheduler(threads.toInt, maxFailures.toInt, this) + + case SPARK_REGEX(sparkUrl) => + val scheduler = new ClusterScheduler(this) + val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName) + scheduler.initialize(backend) + scheduler + + case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) => + // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang. + val memoryPerSlaveInt = memoryPerSlave.toInt + if (SparkContext.executorMemoryRequested > memoryPerSlaveInt) { + throw new SparkException( + "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format( + memoryPerSlaveInt, SparkContext.executorMemoryRequested)) + } + + val scheduler = new ClusterScheduler(this) + val localCluster = new LocalSparkCluster( + numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt) + val sparkUrl = localCluster.start() + val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName) + scheduler.initialize(backend) + backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => { + localCluster.stop() + } + scheduler + + case "yarn-standalone" => + val scheduler = try { + val clazz = Class.forName("spark.scheduler.cluster.YarnClusterScheduler") + val cons = clazz.getConstructor(classOf[SparkContext]) + cons.newInstance(this).asInstanceOf[ClusterScheduler] + } catch { + // TODO: Enumerate the exact reasons why it can fail + // But irrespective of it, it means we cannot proceed ! + case th: Throwable => { + throw new SparkException("YARN mode not available ?", th) + } + } + val backend = new StandaloneSchedulerBackend(scheduler, this.env.actorSystem) + scheduler.initialize(backend) + scheduler + + case _ => + if (MESOS_REGEX.findFirstIn(master).isEmpty) { + logWarning("Master %s does not match expected format, parsing as Mesos URL".format(master)) + } + MesosNativeLibrary.load() + val scheduler = new ClusterScheduler(this) + val coarseGrained = System.getProperty("spark.mesos.coarse", "false").toBoolean + val masterWithoutProtocol = master.replaceFirst("^mesos://", "") // Strip initial mesos:// + val backend = if (coarseGrained) { + new CoarseMesosSchedulerBackend(scheduler, this, masterWithoutProtocol, appName) + } else { + new MesosSchedulerBackend(scheduler, this, masterWithoutProtocol, appName) + } + scheduler.initialize(backend) + scheduler + } + } + taskScheduler.start() + + @volatile private var dagScheduler = new DAGScheduler(taskScheduler) + dagScheduler.start() + + ui.start() + + /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. 
*/ + val hadoopConfiguration = { + val env = SparkEnv.get + val conf = env.hadoop.newConfiguration() + // Explicitly check for S3 environment variables + if (System.getenv("AWS_ACCESS_KEY_ID") != null && System.getenv("AWS_SECRET_ACCESS_KEY") != null) { + conf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) + conf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID")) + conf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) + conf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY")) + } + // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar" + for (key <- System.getProperties.toMap[String, String].keys if key.startsWith("spark.hadoop.")) { + conf.set(key.substring("spark.hadoop.".length), System.getProperty(key)) + } + val bufferSize = System.getProperty("spark.buffer.size", "65536") + conf.set("io.file.buffer.size", bufferSize) + conf + } + + private[spark] var checkpointDir: Option[String] = None + + // Thread Local variable that can be used by users to pass information down the stack + private val localProperties = new DynamicVariable[Properties](null) + + def initLocalProperties() { + localProperties.value = new Properties() + } + + def setLocalProperty(key: String, value: String) { + if (localProperties.value == null) { + localProperties.value = new Properties() + } + if (value == null) { + localProperties.value.remove(key) + } else { + localProperties.value.setProperty(key, value) + } + } + + /** Set a human readable description of the current job. */ + def setJobDescription(value: String) { + setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, value) + } + + // Post init + taskScheduler.postStartHook() + + val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler) + val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager) + + def initDriverMetrics() { + SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource) + SparkEnv.get.metricsSystem.registerSource(blockManagerSource) + } + + initDriverMetrics() + + // Methods for creating RDDs + + /** Distribute a local Scala collection to form an RDD. */ + def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = { + new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]()) + } + + /** Distribute a local Scala collection to form an RDD. */ + def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = { + parallelize(seq, numSlices) + } + + /** Distribute a local Scala collection to form an RDD, with one or more + * location preferences (hostnames of Spark nodes) for each object. + * Create a new partition for each collection item. */ + def makeRDD[T: ClassManifest](seq: Seq[(T, Seq[String])]): RDD[T] = { + val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap + new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs) + } + + /** + * Read a text file from HDFS, a local file system (available on all nodes), or any + * Hadoop-supported file system URI, and return it as an RDD of Strings. + */ + def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = { + hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits) + .map(pair => pair._2.toString) + } + + /** + * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its InputFormat and any + * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable, + * etc). 
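+ *
+ * A rough usage sketch (the input path below is illustrative):
+ * {{{
+ *   val conf = new JobConf()
+ *   FileInputFormat.setInputPaths(conf, "hdfs://namenode:8020/logs")
+ *   val records = sparkContext.hadoopRDD(conf, classOf[TextInputFormat],
+ *     classOf[LongWritable], classOf[Text])
+ * }}}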
+ */ + def hadoopRDD[K, V]( + conf: JobConf, + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V], + minSplits: Int = defaultMinSplits + ): RDD[(K, V)] = { + new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits) + } + + /** Get an RDD for a Hadoop file with an arbitrary InputFormat */ + def hadoopFile[K, V]( + path: String, + inputFormatClass: Class[_ <: InputFormat[K, V]], + keyClass: Class[K], + valueClass: Class[V], + minSplits: Int = defaultMinSplits + ) : RDD[(K, V)] = { + val conf = new JobConf(hadoopConfiguration) + FileInputFormat.setInputPaths(conf, path) + new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits) + } + + /** + * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys, + * values and the InputFormat so that users don't need to pass them directly. Instead, callers + * can just write, for example, + * {{{ + * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path, minSplits) + * }}} + */ + def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int) + (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]) + : RDD[(K, V)] = { + hadoopFile(path, + fm.erasure.asInstanceOf[Class[F]], + km.erasure.asInstanceOf[Class[K]], + vm.erasure.asInstanceOf[Class[V]], + minSplits) + } + + /** + * Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys, + * values and the InputFormat so that users don't need to pass them directly. Instead, callers + * can just write, for example, + * {{{ + * val file = sparkContext.hadoopFile[LongWritable, Text, TextInputFormat](path) + * }}} + */ + def hadoopFile[K, V, F <: InputFormat[K, V]](path: String) + (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]): RDD[(K, V)] = + hadoopFile[K, V, F](path, defaultMinSplits) + + /** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */ + def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String) + (implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]): RDD[(K, V)] = { + newAPIHadoopFile( + path, + fm.erasure.asInstanceOf[Class[F]], + km.erasure.asInstanceOf[Class[K]], + vm.erasure.asInstanceOf[Class[V]]) + } + + /** + * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat + * and extra configuration options to pass to the input format. + */ + def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]( + path: String, + fClass: Class[F], + kClass: Class[K], + vClass: Class[V], + conf: Configuration = hadoopConfiguration): RDD[(K, V)] = { + val job = new NewHadoopJob(conf) + NewFileInputFormat.addInputPath(job, new Path(path)) + val updatedConf = job.getConfiguration + new NewHadoopRDD(this, fClass, kClass, vClass, updatedConf) + } + + /** + * Get an RDD for a given Hadoop file with an arbitrary new API InputFormat + * and extra configuration options to pass to the input format. + */ + def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]]( + conf: Configuration = hadoopConfiguration, + fClass: Class[F], + kClass: Class[K], + vClass: Class[V]): RDD[(K, V)] = { + new NewHadoopRDD(this, fClass, kClass, vClass, conf) + } + + /** Get an RDD for a Hadoop SequenceFile with given key and value types. 
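+ *
+ * For example (a sketch; the path and split count are illustrative):
+ * {{{
+ *   val pairs = sparkContext.sequenceFile("hdfs://namenode:8020/data.seq",
+ *     classOf[Text], classOf[IntWritable], 8)
+ * }}}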
*/ + def sequenceFile[K, V](path: String, + keyClass: Class[K], + valueClass: Class[V], + minSplits: Int + ): RDD[(K, V)] = { + val inputFormatClass = classOf[SequenceFileInputFormat[K, V]] + hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits) + } + + /** Get an RDD for a Hadoop SequenceFile with given key and value types. */ + def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = + sequenceFile(path, keyClass, valueClass, defaultMinSplits) + + /** + * Version of sequenceFile() for types implicitly convertible to Writables through a + * WritableConverter. For example, to access a SequenceFile where the keys are Text and the + * values are IntWritable, you could simply write + * {{{ + * sparkContext.sequenceFile[String, Int](path, ...) + * }}} + * + * WritableConverters are provided in a somewhat strange way (by an implicit function) to support + * both subclasses of Writable and types for which we define a converter (e.g. Int to + * IntWritable). The most natural thing would've been to have implicit objects for the + * converters, but then we couldn't have an object for every subclass of Writable (you can't + * have a parameterized singleton object). We use functions instead to create a new converter + * for the appropriate type. In addition, we pass the converter a ClassManifest of its type to + * allow it to figure out the Writable class to use in the subclass case. + */ + def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits) + (implicit km: ClassManifest[K], vm: ClassManifest[V], + kcf: () => WritableConverter[K], vcf: () => WritableConverter[V]) + : RDD[(K, V)] = { + val kc = kcf() + val vc = vcf() + val format = classOf[SequenceFileInputFormat[Writable, Writable]] + val writables = hadoopFile(path, format, + kc.writableClass(km).asInstanceOf[Class[Writable]], + vc.writableClass(vm).asInstanceOf[Class[Writable]], minSplits) + writables.map{case (k,v) => (kc.convert(k), vc.convert(v))} + } + + /** + * Load an RDD saved as a SequenceFile containing serialized objects, with NullWritable keys and + * BytesWritable values that contain a serialized partition. This is still an experimental storage + * format and may not be supported exactly as is in future Spark releases. It will also be pretty + * slow if you use the default serializer (Java serialization), though the nice thing about it is + * that there's very little effort required to save arbitrary objects. + */ + def objectFile[T: ClassManifest]( + path: String, + minSplits: Int = defaultMinSplits + ): RDD[T] = { + sequenceFile(path, classOf[NullWritable], classOf[BytesWritable], minSplits) + .flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes)) + } + + + protected[spark] def checkpointFile[T: ClassManifest]( + path: String + ): RDD[T] = { + new CheckpointRDD[T](this, path) + } + + /** Build the union of a list of RDDs. */ + def union[T: ClassManifest](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds) + + /** Build the union of a list of RDDs passed as variable-length arguments. */ + def union[T: ClassManifest](first: RDD[T], rest: RDD[T]*): RDD[T] = + new UnionRDD(this, Seq(first) ++ rest) + + // Methods for creating shared variables + + /** + * Create an [[org.apache.spark.Accumulator]] variable of a given type, which tasks can "add" values + * to using the `+=` method. Only the driver can access the accumulator's `value`. 
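+ *
+ * A minimal sketch of typical use:
+ * {{{
+ *   val sum = sparkContext.accumulator(0)
+ *   sparkContext.parallelize(1 to 100).foreach(x => sum += x)
+ *   sum.value  // 5050, readable only on the driver
+ * }}}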
+ */ + def accumulator[T](initialValue: T)(implicit param: AccumulatorParam[T]) = + new Accumulator(initialValue, param) + + /** + * Create an [[org.apache.spark.Accumulable]] shared variable, to which tasks can add values with `+=`. + * Only the driver can access the accumuable's `value`. + * @tparam T accumulator type + * @tparam R type that can be added to the accumulator + */ + def accumulable[T, R](initialValue: T)(implicit param: AccumulableParam[T, R]) = + new Accumulable(initialValue, param) + + /** + * Create an accumulator from a "mutable collection" type. + * + * Growable and TraversableOnce are the standard APIs that guarantee += and ++=, implemented by + * standard mutable collections. So you can use this with mutable Map, Set, etc. + */ + def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T](initialValue: R) = { + val param = new GrowableAccumulableParam[R,T] + new Accumulable(initialValue, param) + } + + /** + * Broadcast a read-only variable to the cluster, returning a [[org.apache.spark.broadcast.Broadcast]] object for + * reading it in distributed functions. The variable will be sent to each cluster only once. + */ + def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T](value, isLocal) + + /** + * Add a file to be downloaded with this Spark job on every node. + * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported + * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs, + * use `SparkFiles.get(path)` to find its download location. + */ + def addFile(path: String) { + val uri = new URI(path) + val key = uri.getScheme match { + case null | "file" => env.httpFileServer.addFile(new File(uri.getPath)) + case _ => path + } + addedFiles(key) = System.currentTimeMillis + + // Fetch the file locally in case a job is executed locally. + // Jobs that run through LocalScheduler will already fetch the required dependencies, + // but jobs run in DAGScheduler.runLocally() will not so we must fetch the files here. + Utils.fetchFile(path, new File(SparkFiles.getRootDirectory)) + + logInfo("Added file " + path + " at " + key + " with timestamp " + addedFiles(key)) + } + + def addSparkListener(listener: SparkListener) { + dagScheduler.addSparkListener(listener) + } + + /** + * Return a map from the slave to the max memory available for caching and the remaining + * memory available for caching. + */ + def getExecutorMemoryStatus: Map[String, (Long, Long)] = { + env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) => + (blockManagerId.host + ":" + blockManagerId.port, mem) + } + } + + /** + * Return information about what RDDs are cached, if they are in mem or on disk, how much space + * they take, etc. + */ + def getRDDStorageInfo: Array[RDDInfo] = { + StorageUtils.rddInfoFromStorageStatus(getExecutorStorageStatus, this) + } + + /** + * Returns an immutable map of RDDs that have marked themselves as persistent via cache() call. + * Note that this does not necessarily mean the caching or computation was successful. 
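+ *
+ * For example (a sketch):
+ * {{{
+ *   val rdd = sparkContext.parallelize(1 to 10).cache()
+ *   sparkContext.getPersistentRDDs.contains(rdd.id)  // true once the RDD is marked persistent
+ * }}}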
+ */ + def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap + + def getStageInfo: Map[Stage,StageInfo] = { + dagScheduler.stageToInfos + } + + /** + * Return information about blocks stored in all of the slaves + */ + def getExecutorStorageStatus: Array[StorageStatus] = { + env.blockManager.master.getStorageStatus + } + + /** + * Return pools for fair scheduler + * TODO(xiajunluan): We should take nested pools into account + */ + def getAllPools: ArrayBuffer[Schedulable] = { + taskScheduler.rootPool.schedulableQueue + } + + /** + * Return the pool associated with the given name, if one exists + */ + def getPoolForName(pool: String): Option[Schedulable] = { + taskScheduler.rootPool.schedulableNameToSchedulable.get(pool) + } + + /** + * Return current scheduling mode + */ + def getSchedulingMode: SchedulingMode.SchedulingMode = { + taskScheduler.schedulingMode + } + + /** + * Clear the job's list of files added by `addFile` so that they do not get downloaded to + * any new nodes. + */ + def clearFiles() { + addedFiles.clear() + } + + /** + * Gets the locality information associated with the partition in a particular rdd + * @param rdd of interest + * @param partition to be looked up for locality + * @return list of preferred locations for the partition + */ + private [spark] def getPreferredLocs(rdd: RDD[_], partition: Int): Seq[TaskLocation] = { + dagScheduler.getPreferredLocs(rdd, partition) + } + + /** + * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future. + * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported + * filesystems), or an HTTP, HTTPS or FTP URI. + */ + def addJar(path: String) { + if (null == path) { + logWarning("null specified as parameter to addJar", + new SparkException("null specified as parameter to addJar")) + } else { + val env = SparkEnv.get + val uri = new URI(path) + val key = uri.getScheme match { + case null | "file" => + if (env.hadoop.isYarnMode()) { + logWarning("local jar specified as parameter to addJar under Yarn mode") + return + } + env.httpFileServer.addJar(new File(uri.getPath)) + case _ => path + } + addedJars(key) = System.currentTimeMillis + logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key)) + } + } + + /** + * Clear the job's list of JARs added by `addJar` so that they do not get downloaded to + * any new nodes. + */ + def clearJars() { + addedJars.clear() + } + + /** Shut down the SparkContext. */ + def stop() { + ui.stop() + // Do this only if not stopped already - best case effort. + // prevent NPE if stopped more than once. + val dagSchedulerCopy = dagScheduler + dagScheduler = null + if (dagSchedulerCopy != null) { + metadataCleaner.cancel() + dagSchedulerCopy.stop() + taskScheduler = null + // TODO: Cache.stop()? + env.stop() + // Clean up locally linked files + clearFiles() + clearJars() + SparkEnv.set(null) + ShuffleMapTask.clearCache() + ResultTask.clearCache() + logInfo("Successfully stopped SparkContext") + } else { + logInfo("SparkContext already stopped") + } + } + + + /** + * Get Spark's home location from either a value set through the constructor, + * or the spark.home Java property, or the SPARK_HOME environment variable + * (in that order of preference). If neither of these is set, return None. 
+ */ + private[spark] def getSparkHome(): Option[String] = { + if (sparkHome != null) { + Some(sparkHome) + } else if (System.getProperty("spark.home") != null) { + Some(System.getProperty("spark.home")) + } else if (System.getenv("SPARK_HOME") != null) { + Some(System.getenv("SPARK_HOME")) + } else { + None + } + } + + /** + * Run a function on a given set of partitions in an RDD and pass the results to the given + * handler function. This is the main entry point for all actions in Spark. The allowLocal + * flag specifies whether the scheduler can run the computation on the driver rather than + * shipping it out to the cluster, for short actions like first(). + */ + def runJob[T, U: ClassManifest]( + rdd: RDD[T], + func: (TaskContext, Iterator[T]) => U, + partitions: Seq[Int], + allowLocal: Boolean, + resultHandler: (Int, U) => Unit) { + val callSite = Utils.formatSparkCallSite + logInfo("Starting job: " + callSite) + val start = System.nanoTime + val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler, localProperties.value) + logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s") + rdd.doCheckpoint() + result + } + + /** + * Run a function on a given set of partitions in an RDD and return the results as an array. The + * allowLocal flag specifies whether the scheduler can run the computation on the driver rather + * than shipping it out to the cluster, for short actions like first(). + */ + def runJob[T, U: ClassManifest]( + rdd: RDD[T], + func: (TaskContext, Iterator[T]) => U, + partitions: Seq[Int], + allowLocal: Boolean + ): Array[U] = { + val results = new Array[U](partitions.size) + runJob[T, U](rdd, func, partitions, allowLocal, (index, res) => results(index) = res) + results + } + + /** + * Run a job on a given set of partitions of an RDD, but take a function of type + * `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`. + */ + def runJob[T, U: ClassManifest]( + rdd: RDD[T], + func: Iterator[T] => U, + partitions: Seq[Int], + allowLocal: Boolean + ): Array[U] = { + runJob(rdd, (context: TaskContext, iter: Iterator[T]) => func(iter), partitions, allowLocal) + } + + /** + * Run a job on all partitions in an RDD and return the results in an array. + */ + def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = { + runJob(rdd, func, 0 until rdd.partitions.size, false) + } + + /** + * Run a job on all partitions in an RDD and return the results in an array. + */ + def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U): Array[U] = { + runJob(rdd, func, 0 until rdd.partitions.size, false) + } + + /** + * Run a job on all partitions in an RDD and pass the results to a handler function. + */ + def runJob[T, U: ClassManifest]( + rdd: RDD[T], + processPartition: (TaskContext, Iterator[T]) => U, + resultHandler: (Int, U) => Unit) + { + runJob[T, U](rdd, processPartition, 0 until rdd.partitions.size, false, resultHandler) + } + + /** + * Run a job on all partitions in an RDD and pass the results to a handler function. + */ + def runJob[T, U: ClassManifest]( + rdd: RDD[T], + processPartition: Iterator[T] => U, + resultHandler: (Int, U) => Unit) + { + val processFunc = (context: TaskContext, iter: Iterator[T]) => processPartition(iter) + runJob[T, U](rdd, processFunc, 0 until rdd.partitions.size, false, resultHandler) + } + + /** + * Run a job that can return approximate results. 
+ */ + def runApproximateJob[T, U, R]( + rdd: RDD[T], + func: (TaskContext, Iterator[T]) => U, + evaluator: ApproximateEvaluator[U, R], + timeout: Long): PartialResult[R] = { + val callSite = Utils.formatSparkCallSite + logInfo("Starting job: " + callSite) + val start = System.nanoTime + val result = dagScheduler.runApproximateJob(rdd, func, evaluator, callSite, timeout, localProperties.value) + logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s") + result + } + + /** + * Clean a closure to make it ready to serialized and send to tasks + * (removes unreferenced variables in $outer's, updates REPL variables) + */ + private[spark] def clean[F <: AnyRef](f: F): F = { + ClosureCleaner.clean(f) + return f + } + + /** + * Set the directory under which RDDs are going to be checkpointed. The directory must + * be a HDFS path if running on a cluster. If the directory does not exist, it will + * be created. If the directory exists and useExisting is set to true, then the + * exisiting directory will be used. Otherwise an exception will be thrown to + * prevent accidental overriding of checkpoint files in the existing directory. + */ + def setCheckpointDir(dir: String, useExisting: Boolean = false) { + val env = SparkEnv.get + val path = new Path(dir) + val fs = path.getFileSystem(env.hadoop.newConfiguration()) + if (!useExisting) { + if (fs.exists(path)) { + throw new Exception("Checkpoint directory '" + path + "' already exists.") + } else { + fs.mkdirs(path) + } + } + checkpointDir = Some(dir) + } + + /** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). */ + def defaultParallelism: Int = taskScheduler.defaultParallelism + + /** Default min number of partitions for Hadoop RDDs when not given by user */ + def defaultMinSplits: Int = math.min(defaultParallelism, 2) + + private val nextShuffleId = new AtomicInteger(0) + + private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement() + + private val nextRddId = new AtomicInteger(0) + + /** Register a new RDD, returning its RDD ID */ + private[spark] def newRddId(): Int = nextRddId.getAndIncrement() + + /** Called by MetadataCleaner to clean up the persistentRdds map periodically */ + private[spark] def cleanup(cleanupTime: Long) { + persistentRdds.clearOldValues(cleanupTime) + } +} + +/** + * The SparkContext object contains a number of implicit conversions and parameters for use with + * various Spark features. + */ +object SparkContext { + val SPARK_JOB_DESCRIPTION = "spark.job.description" + + implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] { + def addInPlace(t1: Double, t2: Double): Double = t1 + t2 + def zero(initialValue: Double) = 0.0 + } + + implicit object IntAccumulatorParam extends AccumulatorParam[Int] { + def addInPlace(t1: Int, t2: Int): Int = t1 + t2 + def zero(initialValue: Int) = 0 + } + + implicit object LongAccumulatorParam extends AccumulatorParam[Long] { + def addInPlace(t1: Long, t2: Long) = t1 + t2 + def zero(initialValue: Long) = 0l + } + + implicit object FloatAccumulatorParam extends AccumulatorParam[Float] { + def addInPlace(t1: Float, t2: Float) = t1 + t2 + def zero(initialValue: Float) = 0f + } + + // TODO: Add AccumulatorParams for other types, e.g. 
lists and strings + + implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) = + new PairRDDFunctions(rdd) + + implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest]( + rdd: RDD[(K, V)]) = + new SequenceFileRDDFunctions(rdd) + + implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest]( + rdd: RDD[(K, V)]) = + new OrderedRDDFunctions[K, V, (K, V)](rdd) + + implicit def doubleRDDToDoubleRDDFunctions(rdd: RDD[Double]) = new DoubleRDDFunctions(rdd) + + implicit def numericRDDToDoubleRDDFunctions[T](rdd: RDD[T])(implicit num: Numeric[T]) = + new DoubleRDDFunctions(rdd.map(x => num.toDouble(x))) + + // Implicit conversions to common Writable types, for saveAsSequenceFile + + implicit def intToIntWritable(i: Int) = new IntWritable(i) + + implicit def longToLongWritable(l: Long) = new LongWritable(l) + + implicit def floatToFloatWritable(f: Float) = new FloatWritable(f) + + implicit def doubleToDoubleWritable(d: Double) = new DoubleWritable(d) + + implicit def boolToBoolWritable (b: Boolean) = new BooleanWritable(b) + + implicit def bytesToBytesWritable (aob: Array[Byte]) = new BytesWritable(aob) + + implicit def stringToText(s: String) = new Text(s) + + private implicit def arrayToArrayWritable[T <% Writable: ClassManifest](arr: Traversable[T]): ArrayWritable = { + def anyToWritable[U <% Writable](u: U): Writable = u + + new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]], + arr.map(x => anyToWritable(x)).toArray) + } + + // Helper objects for converting common types to Writable + private def simpleWritableConverter[T, W <: Writable: ClassManifest](convert: W => T) = { + val wClass = classManifest[W].erasure.asInstanceOf[Class[W]] + new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W])) + } + + implicit def intWritableConverter() = simpleWritableConverter[Int, IntWritable](_.get) + + implicit def longWritableConverter() = simpleWritableConverter[Long, LongWritable](_.get) + + implicit def doubleWritableConverter() = simpleWritableConverter[Double, DoubleWritable](_.get) + + implicit def floatWritableConverter() = simpleWritableConverter[Float, FloatWritable](_.get) + + implicit def booleanWritableConverter() = simpleWritableConverter[Boolean, BooleanWritable](_.get) + + implicit def bytesWritableConverter() = simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes) + + implicit def stringWritableConverter() = simpleWritableConverter[String, Text](_.toString) + + implicit def writableWritableConverter[T <: Writable]() = + new WritableConverter[T](_.erasure.asInstanceOf[Class[T]], _.asInstanceOf[T]) + + /** + * Find the JAR from which a given class was loaded, to make it easy for users to pass + * their JARs to SparkContext + */ + def jarOfClass(cls: Class[_]): Seq[String] = { + val uri = cls.getResource("/" + cls.getName.replace('.', '/') + ".class") + if (uri != null) { + val uriStr = uri.toString + if (uriStr.startsWith("jar:file:")) { + // URI will be of the form "jar:file:/path/foo.jar!/package/cls.class", so pull out the /path/foo.jar + List(uriStr.substring("jar:file:".length, uriStr.indexOf('!'))) + } else { + Nil + } + } else { + Nil + } + } + + /** Find the JAR that contains the class of a particular object */ + def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass) + + /** Get the amount of memory per executor requested through system properties or SPARK_MEM */ + private[spark] val executorMemoryRequested = { + // TODO: 
Might need to add some extra memory for the non-heap parts of the JVM + Option(System.getProperty("spark.executor.memory")) + .orElse(Option(System.getenv("SPARK_MEM"))) + .map(Utils.memoryStringToMb) + .getOrElse(512) + } +} + +/** + * A class encapsulating how to convert some type T to Writable. It stores both the Writable class + * corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion. + * The getter for the writable class takes a ClassManifest[T] in case this is a generic object + * that doesn't know the type of T when it is created. This sounds strange but is necessary to + * support converting subclasses of Writable to themselves (writableWritableConverter). + */ +private[spark] class WritableConverter[T]( + val writableClass: ClassManifest[T] => Class[_ <: Writable], + val convert: Writable => T) + extends Serializable
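
A rough end-to-end sketch of the API this file adds, as it would look from user code (the cluster URL, application name, JAR and input paths are illustrative, not taken from the change itself):

    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._   // brings the implicit conversions above into scope

    val sc = new SparkContext("local[4]", "ExampleApp", jars = Seq("target/example.jar"))
    val lines  = sc.textFile("hdfs://namenode:8020/input.txt")
    val counts = lines.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)
    val partitionSizes = sc.runJob(counts, (it: Iterator[(String, Int)]) => it.size)
    sc.stop()

reduceByKey comes from the implicit rddToPairRDDFunctions conversion defined in the companion object, and runJob is the entry point that all RDD actions ultimately go through.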