aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/SparkEnv.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/SparkEnv.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala54
1 files changed, 30 insertions, 24 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 826f5c2d8c..634a94f0a7 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -40,7 +40,7 @@ import com.google.common.collect.MapMaker
* objects needs to have the right SparkEnv set. You can get the current environment with
* SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set.
*/
-class SparkEnv (
+class SparkEnv private[spark] (
val executorId: String,
val actorSystem: ActorSystem,
val serializerManager: SerializerManager,
@@ -54,7 +54,8 @@ class SparkEnv (
val connectionManager: ConnectionManager,
val httpFileServer: HttpFileServer,
val sparkFilesDir: String,
- val metricsSystem: MetricsSystem) {
+ val metricsSystem: MetricsSystem,
+ val conf: SparkConf) {
private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
@@ -62,7 +63,7 @@ class SparkEnv (
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
- def stop() {
+ private[spark] def stop() {
pythonWorkers.foreach { case(key, worker) => worker.stop() }
httpFileServer.stop()
mapOutputTracker.stop()
@@ -78,6 +79,7 @@ class SparkEnv (
//actorSystem.awaitTermination()
}
+ private[spark]
def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
synchronized {
val key = (pythonExec, envVars)
@@ -106,33 +108,35 @@ object SparkEnv extends Logging {
/**
* Returns the ThreadLocal SparkEnv.
*/
- def getThreadLocal : SparkEnv = {
+ def getThreadLocal: SparkEnv = {
env.get()
}
- def createFromSystemProperties(
+ private[spark] def create(
+ conf: SparkConf,
executorId: String,
hostname: String,
port: Int,
isDriver: Boolean,
isLocal: Boolean): SparkEnv = {
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port,
+ conf = conf)
// Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port),
// figure out which port number Akka actually bound to and set spark.driver.port to it.
if (isDriver && port == 0) {
- System.setProperty("spark.driver.port", boundPort.toString)
+ conf.set("spark.driver.port", boundPort.toString)
}
// set only if unset until now.
- if (System.getProperty("spark.hostPort", null) == null) {
+ if (!conf.contains("spark.hostPort")) {
if (!isDriver){
// unexpected
Utils.logErrorWithStack("Unexpected NOT to have spark.hostPort set")
}
Utils.checkHost(hostname)
- System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+ conf.set("spark.hostPort", hostname + ":" + boundPort)
}
val classLoader = Thread.currentThread.getContextClassLoader
@@ -140,25 +144,26 @@ object SparkEnv extends Logging {
// Create an instance of the class named by the given Java system property, or by
// defaultClassName if the property is not set, and return it as a T
def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
- val name = System.getProperty(propertyName, defaultClassName)
+ val name = conf.get(propertyName, defaultClassName)
Class.forName(name, true, classLoader).newInstance().asInstanceOf[T]
}
val serializerManager = new SerializerManager
val serializer = serializerManager.setDefault(
- System.getProperty("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))
+ conf.get("spark.serializer", "org.apache.spark.serializer.JavaSerializer"), conf)
val closureSerializer = serializerManager.get(
- System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))
+ conf.get("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"),
+ conf)
def registerOrLookup(name: String, newActor: => Actor): Either[ActorRef, ActorSelection] = {
if (isDriver) {
logInfo("Registering " + name)
Left(actorSystem.actorOf(Props(newActor), name = name))
} else {
- val driverHost: String = System.getProperty("spark.driver.host", "localhost")
- val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
+ val driverHost: String = conf.get("spark.driver.host", "localhost")
+ val driverPort: Int = conf.get("spark.driver.port", "7077").toInt
Utils.checkHost(driverHost, "Expected hostname")
val url = "akka.tcp://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
logInfo("Connecting to " + name + ": " + url)
@@ -168,21 +173,21 @@ object SparkEnv extends Logging {
val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
"BlockManagerMaster",
- new BlockManagerMasterActor(isLocal)))
- val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)
+ new BlockManagerMasterActor(isLocal, conf)), conf)
+ val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf)
val connectionManager = blockManager.connectionManager
- val broadcastManager = new BroadcastManager(isDriver)
+ val broadcastManager = new BroadcastManager(isDriver, conf)
val cacheManager = new CacheManager(blockManager)
// Have to assign trackerActor after initialization as MapOutputTrackerActor
// requires the MapOutputTracker itself
val mapOutputTracker = if (isDriver) {
- new MapOutputTrackerMaster()
+ new MapOutputTrackerMaster(conf)
} else {
- new MapOutputTracker()
+ new MapOutputTracker(conf)
}
mapOutputTracker.trackerActor = registerOrLookup(
"MapOutputTracker",
@@ -193,12 +198,12 @@ object SparkEnv extends Logging {
val httpFileServer = new HttpFileServer()
httpFileServer.initialize()
- System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)
+ conf.set("spark.fileserver.uri", httpFileServer.serverUri)
val metricsSystem = if (isDriver) {
- MetricsSystem.createMetricsSystem("driver")
+ MetricsSystem.createMetricsSystem("driver", conf)
} else {
- MetricsSystem.createMetricsSystem("executor")
+ MetricsSystem.createMetricsSystem("executor", conf)
}
metricsSystem.start()
@@ -212,7 +217,7 @@ object SparkEnv extends Logging {
}
// Warn about deprecated spark.cache.class property
- if (System.getProperty("spark.cache.class") != null) {
+ if (conf.contains("spark.cache.class")) {
logWarning("The spark.cache.class property is no longer being used! Specify storage " +
"levels using the RDD.persist() method instead.")
}
@@ -231,6 +236,7 @@ object SparkEnv extends Logging {
connectionManager,
httpFileServer,
sparkFilesDir,
- metricsSystem)
+ metricsSystem,
+ conf)
}
}