author    Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-13 15:31:11 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2012-10-13 15:31:11 -0700
commit    8815aeba0c5366ca8cd89905d6f350235af5d50a (patch)
tree      bfbe6c6cfe4957209e3bda7800dadc6ef45f43c9
parent    84979499dba39e28242c34fc3a3278af271aabfc (diff)
Take executor environment vars as an argument to SparkContext
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                                 68
-rw-r--r--  core/src/main/scala/spark/api/java/JavaSparkContext.scala                    70
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala                 12
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala  14
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala        14
-rw-r--r--  core/src/main/scala/spark/util/StatCounter.scala                              2
-rw-r--r--  core/src/test/scala/spark/DistributedSuite.scala                              6
7 files changed, 107 insertions(+), 79 deletions(-)
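
For context, a short usage sketch of the new five-argument constructor this commit introduces, mirroring the DistributedSuite test added below; the master URL, Spark home path, and variable name are placeholders, not part of the commit:

    import spark.SparkContext

    // All concrete values below are placeholders for illustration.
    val sc = new SparkContext(
      "spark://host:7077",            // cluster URL
      "my-job",                       // job name shown on the cluster web UI
      "/opt/spark",                   // location of Spark on the worker nodes
      Nil,                            // no extra JARs in this sketch
      Map("MY_SETTING" -> "value"))   // environment variables forwarded to executors
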
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 6b81a6f259..becf737597 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -4,8 +4,9 @@ import java.io._
import java.util.concurrent.atomic.AtomicInteger
import java.net.{URI, URLClassLoader}
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection.Map
import scala.collection.generic.Growable
+import scala.collection.mutable.{ArrayBuffer, HashMap}
import akka.actor.Actor
import akka.actor.Actor._
@@ -50,22 +51,36 @@ import spark.storage.BlockManagerMaster
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
* cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
*
- * @constructor Returns a new SparkContext.
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- * @param jobName A name for your job, to display on the cluster web UI
- * @param sparkHome Location where Spark is installed on cluster nodes
+ * @param jobName A name for your job, to display on the cluster web UI.
+ * @param sparkHome Location where Spark is installed on cluster nodes.
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
+ * @param environment Environment variables to set on worker nodes.
*/
-class SparkContext(master: String, jobName: String, val sparkHome: String, jars: Seq[String])
+class SparkContext(
+ master: String,
+ jobName: String,
+ val sparkHome: String,
+ jars: Seq[String],
+ environment: Map[String, String])
extends Logging {
/**
- * @constructor Returns a new SparkContext.
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
* @param jobName A name for your job, to display on the cluster web UI
+ * @param sparkHome Location where Spark is installed on cluster nodes.
+ * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+ * system or HDFS, HTTP, HTTPS, or FTP URLs.
*/
- def this(master: String, jobName: String) = this(master, jobName, null, Nil)
+ def this(master: String, jobName: String, sparkHome: String, jars: Seq[String]) =
+ this(master, jobName, sparkHome, jars, Map())
+
+ /**
+ * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+ * @param jobName A name for your job, to display on the cluster web UI
+ */
+ def this(master: String, jobName: String) = this(master, jobName, null, Nil, Map())
// Ensure logging is initialized before we spawn any threads
initLogging()
@@ -92,15 +107,20 @@ class SparkContext(master: String, jobName: String, val sparkHome: String, jars:
private[spark] val addedFiles = HashMap[String, Long]()
private[spark] val addedJars = HashMap[String, Long]()
- // Environment variables to pass to our executors
- private[spark] val executorEnvs = HashMap[String, String]()
- Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH",
- "SPARK_JAVA_OPTS", "SPARK_TESTING").foreach { key => executorEnvs.put(key, System.getenv(key)) }
-
-
// Add each JAR given through the constructor
jars.foreach { addJar(_) }
+ // Environment variables to pass to our executors
+ private[spark] val executorEnvs = HashMap[String, String]()
+ for (key <- Seq("SPARK_MEM", "SPARK_CLASSPATH", "SPARK_LIBRARY_PATH", "SPARK_JAVA_OPTS",
+ "SPARK_TESTING")) {
+ val value = System.getenv(key)
+ if (value != null) {
+ executorEnvs(key) = value
+ }
+ }
+ executorEnvs ++= environment
+
// Create and start the scheduler
private var taskScheduler: TaskScheduler = {
// Regular expression used for local[N] master format
@@ -439,12 +459,6 @@ class SparkContext(master: String, jobName: String, val sparkHome: String, jars:
addedJars.clear()
}
- /* Sets an environment variable that will be passed to the executors */
- def putExecutorEnv(key: String, value: String) {
- logInfo("Setting executor environment variable " + key + "=" + value)
- executorEnvs.put(key,value)
- }
-
/** Shut down the SparkContext. */
def stop() {
dagScheduler.stop()
@@ -558,16 +572,12 @@ class SparkContext(master: String, jobName: String, val sparkHome: String, jars:
private var nextShuffleId = new AtomicInteger(0)
- private[spark] def newShuffleId(): Int = {
- nextShuffleId.getAndIncrement()
- }
+ private[spark] def newShuffleId(): Int = nextShuffleId.getAndIncrement()
private var nextRddId = new AtomicInteger(0)
/** Register a new RDD, returning its RDD ID */
- private[spark] def newRddId(): Int = {
- nextRddId.getAndIncrement()
- }
+ private[spark] def newRddId(): Int = nextRddId.getAndIncrement()
}
/**
@@ -649,8 +659,10 @@ object SparkContext {
implicit def writableWritableConverter[T <: Writable]() =
new WritableConverter[T](_.erasure.asInstanceOf[Class[T]], _.asInstanceOf[T])
- // Find the JAR from which a given class was loaded, to make it easy for users to pass
- // their JARs to SparkContext
+ /**
+ * Find the JAR from which a given class was loaded, to make it easy for users to pass
+ * their JARs to SparkContext
+ */
def jarOfClass(cls: Class[_]): Seq[String] = {
val uri = cls.getResource("/" + cls.getName.replace('.', '/') + ".class")
if (uri != null) {
@@ -666,7 +678,7 @@ object SparkContext {
}
}
- // Find the JAR that contains the class of a particular object
+ /** Find the JAR that contains the class of a particular object */
def jarOfObject(obj: AnyRef): Seq[String] = jarOfClass(obj.getClass)
}
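
As the jarOfClass/jarOfObject doc comments above suggest, a driver program can locate its own JAR and pass it through the new constructor. A minimal sketch; MyJob, the URL, the path, and the environment entry are hypothetical:

    import spark.SparkContext

    object MyJob {
      def main(args: Array[String]) {
        // jarOfObject finds the JAR this object was loaded from (documented above).
        val jars = SparkContext.jarOfObject(this)
        val sc = new SparkContext("spark://host:7077", "my-job", "/opt/spark", jars,
          Map("MY_SETTING" -> "value"))
        // ... build and run RDDs with sc ...
        sc.stop()
      }
    }
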
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
index c78b09c750..edbb187b1b 100644
--- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
@@ -1,51 +1,65 @@
package spark.api.java
-import spark.{Accumulator, AccumulatorParam, RDD, SparkContext}
-import spark.SparkContext.IntAccumulatorParam
-import spark.SparkContext.DoubleAccumulatorParam
-import spark.broadcast.Broadcast
+import java.util.{Map => JMap}
+import scala.collection.JavaConversions
import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.InputFormat
import org.apache.hadoop.mapred.JobConf
-
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
+import spark.{Accumulator, AccumulatorParam, RDD, SparkContext}
+import spark.SparkContext.IntAccumulatorParam
+import spark.SparkContext.DoubleAccumulatorParam
+import spark.broadcast.Broadcast
-import scala.collection.JavaConversions
-
+/**
+ * A Java-friendly version of [[spark.SparkContext]] that returns [[spark.api.java.JavaRDD]]s and
+ * works with Java collections instead of Scala ones.
+ */
class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround {
/**
- * @constructor Returns a new SparkContext.
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- * @param frameworkName A name for your job, to display on the cluster web UI
+ * @param jobName A name for your job, to display on the cluster web UI
*/
- def this(master: String, frameworkName: String) = this(new SparkContext(master, frameworkName))
+ def this(master: String, jobName: String) = this(new SparkContext(master, jobName))
/**
- * @constructor Returns a new SparkContext.
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- * @param frameworkName A name for your job, to display on the cluster web UI
+ * @param jobName A name for your job, to display on the cluster web UI
* @param sparkHome The SPARK_HOME directory on the slave nodes
- * @param jarFile A path to a local jar file containing this job
+ * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+ * system or HDFS, HTTP, HTTPS, or FTP URLs.
*/
- def this(master: String, frameworkName: String, sparkHome: String, jarFile: String) =
- this(new SparkContext(master, frameworkName, sparkHome, Seq(jarFile)))
+ def this(master: String, jobName: String, sparkHome: String, jarFile: String) =
+ this(new SparkContext(master, jobName, sparkHome, Seq(jarFile)))
/**
- * @constructor Returns a new SparkContext.
* @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
- * @param frameworkName A name for your job, to display on the cluster web UI
+ * @param jobName A name for your job, to display on the cluster web UI
* @param sparkHome The SPARK_HOME directory on the slave nodes
- * @param jars A set of jar files relating to this job
+ * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+ * system or HDFS, HTTP, HTTPS, or FTP URLs.
*/
- def this(master: String, frameworkName: String, sparkHome: String, jars: Array[String]) =
- this(new SparkContext(master, frameworkName, sparkHome, jars.toSeq))
+ def this(master: String, jobName: String, sparkHome: String, jars: Array[String]) =
+ this(new SparkContext(master, jobName, sparkHome, jars.toSeq))
- val env = sc.env
+ /**
+ * @param master Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]).
+ * @param jobName A name for your job, to display on the cluster web UI
+ * @param sparkHome The SPARK_HOME directory on the slave nodes
+ * @param jars Collection of JARs to send to the cluster. These can be paths on the local file
+ * system or HDFS, HTTP, HTTPS, or FTP URLs.
+ * @param environment Environment variables to set on worker nodes
+ */
+ def this(master: String, jobName: String, sparkHome: String, jars: Array[String],
+ environment: JMap[String, String]) =
+ this(new SparkContext(master, jobName, sparkHome, jars.toSeq, environment))
+
+ private[spark] val env = sc.env
/** Distribute a local Scala collection to form an RDD. */
def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
@@ -81,13 +95,13 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
def parallelizeDoubles(list: java.util.List[java.lang.Double]): JavaDoubleRDD =
parallelizeDoubles(list, sc.defaultParallelism)
- /**
+ /**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
def textFile(path: String): JavaRDD[String] = sc.textFile(path)
- /**
+ /**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
@@ -155,6 +169,11 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
}
+ /**
+ * Get an RDD for a Hadoop-readable dataset from a Hadoop JobConf giving its InputFormat and any
+ * other necessary info (e.g. file name for a filesystem-based dataset, table name for HyperTable,
+ * etc).
+ */
def hadoopRDD[K, V, F <: InputFormat[K, V]](
conf: JobConf,
inputFormatClass: Class[F],
@@ -166,7 +185,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
}
- /**Get an RDD for a Hadoop file with an arbitrary InputFormat */
+ /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
def hadoopFile[K, V, F <: InputFormat[K, V]](
path: String,
inputFormatClass: Class[F],
@@ -179,6 +198,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
}
+ /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
def hadoopFile[K, V, F <: InputFormat[K, V]](
path: String,
inputFormatClass: Class[F],
@@ -264,7 +284,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
def accumulator[T](initialValue: T, accumulatorParam: AccumulatorParam[T]): Accumulator[T] =
sc.accumulator(initialValue)(accumulatorParam)
- /**
+ /**
* Broadcast a read-only variable to the cluster, returning a [[spark.Broadcast]] object for
* reading it in distributed functions. The variable will be sent to each cluster only once.
*/
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index 637e763c9e..07ae7bca78 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -116,14 +116,12 @@ private[spark] class ExecutorRunner(
val builder = new ProcessBuilder(command: _*).directory(executorDir)
val env = builder.environment()
for ((key, value) <- jobDesc.command.environment) {
- if (value == null) {
- logInfo("Environment variable not set: " + key)
- } else {
- env.put(key, value)
- }
+ env.put(key, value)
}
- // In case we are running this from within the Spark Shell
- // so we are not creating a parent process.
+ env.put("SPARK_CORES", cores.toString)
+ env.put("SPARK_MEMORY", memory.toString)
+ // In case we are running this from within the Spark Shell, avoid creating a "scala"
+ // parent process for the executor command
env.put("SPARK_LAUNCH_WITH_SCALA", "0")
process = builder.start()
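
With ExecutorRunner now exporting the job's environment plus SPARK_CORES and SPARK_MEMORY into the executor process, tasks can observe these values through System.getenv, as the new DistributedSuite test does. A sketch under that assumption (standalone deploy mode, reusing the sc from the first sketch above; the variable names are placeholders):

    val seen = sc.parallelize(1 to 2, 2)
      .map(_ => (System.getenv("MY_SETTING"), System.getenv("SPARK_CORES")))
      .collect()
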
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index 29dd36be15..c45c7df69c 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -114,15 +114,11 @@ private[spark] class CoarseMesosSchedulerBackend(
val command = "\"%s\" spark.executor.StandaloneExecutorBackend %s %s %s %d".format(
runScript, masterUrl, offer.getSlaveId.getValue, offer.getHostname, numCores)
val environment = Environment.newBuilder()
- sc.executorEnvs.foreach { case(key, value) =>
- if (value == null) {
- logInfo("Environment variable not set: " + key)
- } else {
- environment.addVariables(Environment.Variable.newBuilder()
- .setName(key)
- .setValue(value)
- .build())
- }
+ sc.executorEnvs.foreach { case (key, value) =>
+ environment.addVariables(Environment.Variable.newBuilder()
+ .setName(key)
+ .setValue(value)
+ .build())
}
return CommandInfo.newBuilder().setValue(command).setEnvironment(environment).build()
}
diff --git a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
index c4aee5c9cb..cdfe1f2563 100644
--- a/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/MesosSchedulerBackend.scala
@@ -85,15 +85,11 @@ private[spark] class MesosSchedulerBackend(
}
val execScript = new File(sparkHome, "spark-executor").getCanonicalPath
val environment = Environment.newBuilder()
- sc.executorEnvs.foreach { case(key, value) =>
- if (value == null) {
- logInfo("Environment variable not set: " + key)
- } else {
- environment.addVariables(Environment.Variable.newBuilder()
- .setName(key)
- .setValue(value)
- .build())
- }
+ sc.executorEnvs.foreach { case (key, value) =>
+ environment.addVariables(Environment.Variable.newBuilder()
+ .setName(key)
+ .setValue(value)
+ .build())
}
val memory = Resource.newBuilder()
.setName("mem")
diff --git a/core/src/main/scala/spark/util/StatCounter.scala b/core/src/main/scala/spark/util/StatCounter.scala
index 9d7e2b804b..5f80180339 100644
--- a/core/src/main/scala/spark/util/StatCounter.scala
+++ b/core/src/main/scala/spark/util/StatCounter.scala
@@ -14,7 +14,7 @@ class StatCounter(values: TraversableOnce[Double]) extends Serializable {
merge(values)
- /** @constructor Initialize the StatCounter with no values. */
+ /** Initialize the StatCounter with no values. */
def this() = this(Nil)
/** Add a value into this StatCounter, updating the internal statistics. */
diff --git a/core/src/test/scala/spark/DistributedSuite.scala b/core/src/test/scala/spark/DistributedSuite.scala
index 2ea38d4a5d..cacc2796b6 100644
--- a/core/src/test/scala/spark/DistributedSuite.scala
+++ b/core/src/test/scala/spark/DistributedSuite.scala
@@ -182,4 +182,10 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
assert(data.count() === 4000000)
System.clearProperty("spark.storage.memoryFraction")
}
+
+ test("passing environment variables to cluster") {
+ sc = new SparkContext(clusterUrl, "test", null, Nil, Map("TEST_VAR" -> "TEST_VALUE"))
+ val values = sc.parallelize(1 to 2, 2).map(x => System.getenv("TEST_VAR")).collect()
+ assert(values.toSeq === Seq("TEST_VALUE", "TEST_VALUE"))
+ }
}