| author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-10-13 15:31:11 -0700 |
|---|---|---|
| committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-10-13 15:31:11 -0700 |
| commit | 8815aeba0c5366ca8cd89905d6f350235af5d50a | |
| tree | bfbe6c6cfe4957209e3bda7800dadc6ef45f43c9 | |
| parent | 84979499dba39e28242c34fc3a3278af271aabfc | |
Take executor environment vars as an argument to SparkContext
7 files changed, 107 insertions, 79 deletions
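This commit removes the mutable `putExecutorEnv` setter in favor of an `environment` constructor parameter that is merged into `executorEnvs` at construction time. A minimal usage sketch of the new five-argument constructor follows; the master URL, Spark home, and JAR path are hypothetical, while the environment map mirrors the new `DistributedSuite` test in the diff below:

```scala
import spark.SparkContext

// Executor environment variables are now supplied up front and merged into
// executorEnvs, rather than mutated later via the removed putExecutorEnv.
val sc = new SparkContext(
  "spark://host:7077",             // hypothetical cluster URL
  "EnvExample",                    // job name, shown on the cluster web UI
  "/opt/spark",                    // hypothetical SPARK_HOME on worker nodes
  Seq("target/my-job.jar"),        // hypothetical JAR to ship to the cluster
  Map("TEST_VAR" -> "TEST_VALUE")) // visible to System.getenv on executors
```

Fixing the environment at construction also lets the standalone and Mesos backends drop their per-variable null checks when copying `executorEnvs` into the executor launch environment, as the diff shows.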
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 6b81a6f259..becf737597 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -4,8 +4,9 @@ import java.io._
 import java.util.concurrent.atomic.AtomicInteger
 import java.net.{URI, URLClassLoader}
 
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.Map
 import scala.collection.generic.Growable
+import scala.collection.mutable.{ArrayBuffer, HashMap}
 
 import akka.actor.Actor
 import akka.actor.Actor._
@@ -50,22 +51,36 @@ import spark.storage.BlockManagerMaster
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
  * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
  *
- * @constructor Returns a new SparkContext.
  * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- * @param jobName A name for your job, to display on the cluster web UI
- * @param sparkHome Location where Spark is installed on cluster nodes
+ * @param jobName A name for your job, to display on the cluster web UI.
+ * @param sparkHome Location where Spark is installed on cluster nodes.
  * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
  *   system or HDFS, HTTP, HTTPS, or FTP URLs.
+ * @param environment Environment variables to set on worker nodes.
  */
-class SparkContext(master: String, jobName: String, val sparkHome: String, jars: Seq[String])
+class SparkContext(
+    master: String,
+    jobName: String,
+    val sparkHome: String,
+    jars: Seq[String],
+    environment: Map[String, String])
   extends Logging {
 
   /**
-   * @constructor Returns a new SparkContext.
    * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
    * @param jobName A name for your job, to display on the cluster web UI
+   * @param sparkHome Location where Spark is installed on cluster nodes.
+   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+   *   system or HDFS, HTTP, HTTPS, or FTP URLs.
    */
-  def this(master: String, jobName: String) = this(master, jobName, null, Nil)
+  def this(master: String, jobName: String, sparkHome: String, jars: Seq[String]) =
+    this(master, jobName, sparkHome, jars, Map())
+
+  /**
+   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+   * @param jobName A name for your job, to display on the cluster web UI
+   */
+  def this(master: String, jobName: String) = this(master, jobName, null, Nil, Map())
 
   // Ensure logging is initialized before we spawn any threads
   initLogging()
@@ -92,15 +107,20 @@ class SparkContext(master: String, jobName: String, val sparkHome: String, jars:
   private[spark] val addedFiles = HashMap[String, Long]()
   private[spark] val addedJars = HashMap[String, Long]()
 
-  // Environment variables to pass to our executors
-  private[spark] val executorEnvs = HashMap[String, String]()
-  Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH",
-    "SPARK_JAVA_OPTS", "SPARK_TESTING").foreach { key => executorEnvs.put(key, System.getenv(key)) }
-
-  // Add each JAR given through the constructor
   jars.foreach { addJar(_) }
 
+  // Environment variables to pass to our executors
+  private[spark] val executorEnvs = HashMap[String, String]()
+  for (key <- Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS",
+      "SPARK_TESTING")) {
+    val value = System.getenv(key)
+    if (value != null) {
+      executorEnvs(key) = value
+    }
+  }
+  executorEnvs ++= environment
+
   // Create and start the scheduler
   private var taskScheduler: TaskScheduler = {
     // Regular expression used for local[N] master format
@@ -439,12 +459,6 @@ class SparkContext(master: String, jobName: String, val sparkHome: String, jars:
     addedJars.clear()
   }
 
-  /* Sets an environment variable that will be passed to the executors */
-  def putExecutorEnv(key: String, value: String) {
-    logInfo("Setting executor environment variable " + key + "=" + value)
-    executorEnvs.put(key,value)
-  }
-
   /** Shut down the SparkContext. */
   def stop() {
     dagScheduler.stop()
@@ -558,16 +572,12 @@ class SparkContext(master: String, jobName: String, val sparkHome: String, jars:
 
   private var nextShuffleId = new AtomicInteger(0)
 
-  private[spark] def newShuffleId(): Int = {
-    nextShuffleId.getAndIncrement()
-  }
+  private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement()
 
   private var nextRddId = new AtomicInteger(0)
 
   /** Register a new RDD, returning its RDD ID */
-  private[spark] def newRddId(): Int = {
-    nextRddId.getAndIncrement()
-  }
+  private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
 }
 
 /**
@@ -649,8 +659,10 @@ object SparkContext {
   implicit def writableWritableConverter[T <: Writable]() =
     new WritableConverter[T](_.erasure.asInstanceOf[Class[T]], _.asInstanceOf[T])
 
-  // Find the JAR from which a given class was loaded, to make it easy for users to pass
-  // their JARs to SparkContext
+  /**
+   * Find the JAR from which a given class was loaded, to make it easy for users to pass
+   * their JARs to SparkContext
+   */
   def jarOfClass(cls: Class[_]): Seq[String] = {
     val uri = cls.getResource("/" + cls.getName.replace('.', '/') + ".class")
     if (uri != null) {
@@ -666,7 +678,7 @@ object SparkContext {
     }
   }
 
-  // Find the JAR that contains the class of a particular object
+  /** Find the JAR that contains the class of a particular object */
   def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass)
 }
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
index c78b09c750..edbb187b1b 100644
--- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
@@ -1,51 +1,65 @@
 package spark.api.java
 
-import spark.{Accumulator, AccumulatorParam, RDD, SparkContext}
-import spark.SparkContext.IntAccumulatorParam
-import spark.SparkContext.DoubleAccumulatorParam
-import spark.broadcast.Broadcast
+import java.util.{Map => JMap}
 
+import scala.collection.JavaConversions
 import scala.collection.JavaConversions._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapred.InputFormat
 import org.apache.hadoop.mapred.JobConf
-
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 
+import spark.{Accumulator, AccumulatorParam, RDD, SparkContext}
+import spark.SparkContext.IntAccumulatorParam
+import spark.SparkContext.DoubleAccumulatorParam
+import spark.broadcast.Broadcast
 
-import scala.collection.JavaConversions
-
+/**
+ * A Java-friendly version of [[spark.SparkContext]] that returns [[spark.api.java.JavaRDD]]s and
+ * works with Java collections instead of Scala ones.
+ */
 class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
 
   /**
-   * @constructor Returns a new SparkContext.
    * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
-   * @param frameworkName A name for your job, to display on the cluster web UI
+   * @param jobName A name for your job, to display on the cluster web UI
    */
-  def this(master: String, frameworkName: String) = this(new SparkContext(master, frameworkName))
+  def this(master: String, jobName: String) = this(new SparkContext(master, jobName))
 
   /**
-   * @constructor Returns a new SparkContext.
    * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
-   * @param frameworkName A name for your job, to display on the cluster web UI
+   * @param jobName A name for your job, to display on the cluster web UI
    * @param sparkHome The SPARK_HOME directory on the slave nodes
-   * @param jarFile A path to a local jar file containing this job
+   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+   *   system or HDFS, HTTP, HTTPS, or FTP URLs.
    */
-  def this(master: String, frameworkName: String, sparkHome: String, jarFile: String) =
-    this(new SparkContext(master, frameworkName, sparkHome, Seq(jarFile)))
+  def this(master: String, jobName: String, sparkHome: String, jarFile: String) =
+    this(new SparkContext(master, jobName, sparkHome, Seq(jarFile)))
 
   /**
-   * @constructor Returns a new SparkContext.
    * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
-   * @param frameworkName A name for your job, to display on the cluster web UI
+   * @param jobName A name for your job, to display on the cluster web UI
    * @param sparkHome The SPARK_HOME directory on the slave nodes
-   * @param jars A set of jar files relating to this job
+   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+   *   system or HDFS, HTTP, HTTPS, or FTP URLs.
    */
-  def this(master: String, frameworkName: String, sparkHome: String, jars: Array[String]) =
-    this(new SparkContext(master, frameworkName, sparkHome, jars.toSeq))
+  def this(master: String, jobName: String, sparkHome: String, jars: Array[String]) =
+    this(new SparkContext(master, jobName, sparkHome, jars.toSeq))
 
-  val env = sc.env
+  /**
+   * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+   * @param jobName A name for your job, to display on the cluster web UI
+   * @param sparkHome The SPARK_HOME directory on the slave nodes
+   * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+   *   system or HDFS, HTTP, HTTPS, or FTP URLs.
+   * @param environment Environment variables to set on worker nodes
+   */
+  def this(master: String, jobName: String, sparkHome: String, jars: Array[String],
+      environment: JMap[String, String]) =
+    this(new SparkContext(master, jobName, sparkHome, jars.toSeq, environment))
+
+  private[spark] val env = sc.env
 
   /** Distribute a local Scala collection to form an RDD. */
   def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
@@ -81,13 +95,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   def parallelizeDoubles(list: java.util.List[java.lang.Double]): JavaDoubleRDD =
     parallelizeDoubles(list, sc.defaultParallelism)
 
-  /** 
+  /**
   * Read a text file from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI, and return it as an RDD of Strings.
   */
   def textFile(path: String): JavaRDD[String] = sc.textFile(path)
 
-  /** 
+  /**
   * Read a text file from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI, and return it as an RDD of Strings.
   */
@@ -155,6 +169,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
   }
 
+  /**
+   * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its InputFormat and any
+   * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
+   * etc).
+   */
   def hadoopRDD[K, V, F <: InputFormat[K, V]](
       conf: JobConf,
       inputFormatClass: Class[F],
@@ -166,7 +185,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
   }
 
-  /**Get an RDD for a Hadoop file with an arbitrary InputFormat */
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
   def hadoopFile[K, V, F <: InputFormat[K, V]](
       path: String,
       inputFormatClass: Class[F],
@@ -179,6 +198,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
   }
 
+  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
   def hadoopFile[K, V, F <: InputFormat[K, V]](
       path: String,
       inputFormatClass: Class[F],
@@ -264,7 +284,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
   def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T] =
     sc.accumulator(initialValue)(accumulatorParam)
 
-  /** 
+  /**
   * Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for
   * reading it in distributed functions. The variable will be sent to each cluster only once.
   */
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 637e763c9e..07ae7bca78 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -116,14 +116,12 @@ private[spark] class ExecutorRunner(
     val builder = new ProcessBuilder(command: _*).directory(executorDir)
     val env = builder.environment()
     for ((key, value) <- jobDesc.command.environment) {
-      if (value == null) {
-        logInfo("Environment variable not set: " + key)
-      } else {
-        env.put(key, value)
-      }
+      env.put(key, value)
     }
-    // In case we are running this from within the Spark Shell
-    // so we are not creating a parent process.
+    env.put("SPARK_CORES", cores.toString)
+    env.put("SPARK_MEMORY", memory.toString)
+    // In case we are running this from within the Spark Shell, avoid creating a "scala"
+    // parent process for the executor command
     env.put("SPARK_LAUNCH_WITH_SCALA", "0")
 
     process = builder.start()
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index 29dd36be15..c45c7df69c 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -114,15 +114,11 @@ private[spark] class CoarseMesosSchedulerBackend(
     val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
       runScript, masterUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)
     val environment = Environment.newBuilder()
-    sc.executorEnvs.foreach { case(key, value) =>
-      if (value == null) {
-        logInfo("Environment variable not set: " + key)
-      } else {
-        environment.addVariables(Environment.Variable.newBuilder()
-          .setName(key)
-          .setValue(value)
-          .build())
-      }
+    sc.executorEnvs.foreach { case (key, value) =>
+      environment.addVariables(Environment.Variable.newBuilder()
+        .setName(key)
+        .setValue(value)
+        .build())
     }
     return CommandInfo.newBuilder().setValue(command).setEnvironment(environment).build()
   }
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index c4aee5c9cb..cdfe1f2563 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -85,15 +85,11 @@ private[spark] class MesosSchedulerBackend(
     }
     val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
     val environment = Environment.newBuilder()
-    sc.executorEnvs.foreach { case(key, value) =>
-      if (value == null) {
-        logInfo("Environment variable not set: " + key)
-      } else {
-        environment.addVariables(Environment.Variable.newBuilder()
-          .setName(key)
-          .setValue(value)
-          .build())
-      }
+    sc.executorEnvs.foreach { case (key, value) =>
+      environment.addVariables(Environment.Variable.newBuilder()
+        .setName(key)
+        .setValue(value)
+        .build())
     }
     val memory = Resource.newBuilder()
       .setName("mem")
diff --git a/core/src/main/scala/spark/util/StatCounter.scala b/core/src/main/scala/spark/util/StatCounter.scala
index 9d7e2b804b..5f80180339 100644
--- a/core/src/main/scala/spark/util/StatCounter.scala
+++ b/core/src/main/scala/spark/util/StatCounter.scala
@@ -14,7 +14,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
 
   merge(values)
 
-  /** @constructor Initialize the StatCounter with no values. */
+  /** Initialize the StatCounter with no values. */
   def this() = this(Nil)
 
   /** Add a value into this StatCounter, updating the internal statistics. */
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 2ea38d4a5d..cacc2796b6 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -182,4 +182,10 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
     assert(data.count() === 4000000)
     System.clearProperty("spark.storage.memoryFraction")
   }
+
+  test("passing environment variables to cluster") {
+    sc = new SparkContext(clusterUrl, "test", null, Nil, Map("TEST_VAR" -> "TEST_VALUE"))
+    val values = sc.parallelize(1 to 2, 2).map(x => System.getenv("TEST_VAR")).collect()
+    assert(values.toSeq === Seq("TEST_VALUE", "TEST_VALUE"))
+  }
 }