Diffstat (limited to 'core/src/main/scala/org/apache/spark/SparkEnv.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala  240
1 file changed, 240 insertions(+), 0 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
new file mode 100644
index 0000000000..478e5a0aaf
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.collection.mutable
+
+import akka.actor.{Actor, ActorRef, ActorSystem, Props}
+
+import org.apache.spark.broadcast.BroadcastManager
+import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.storage.{BlockManagerMasterActor, BlockManager, BlockManagerMaster}
+import org.apache.spark.network.ConnectionManager
+import org.apache.spark.serializer.{Serializer, SerializerManager}
+import org.apache.spark.util.{Utils, AkkaUtils}
+import org.apache.spark.api.python.PythonWorkerFactory
+
+
+/**
+ * Holds all the runtime environment objects for a running Spark instance (either driver or
+ * executor), including the serializer, Akka actor system, block manager, map output tracker, etc.
+ * Spark code currently finds the SparkEnv through a thread-local variable, so each thread that
+ * accesses these objects needs to have the right SparkEnv set. You can get the current
+ * environment with SparkEnv.get (e.g. after creating a SparkContext) and set it with
+ * SparkEnv.set.
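+ *
+ * A minimal usage sketch (hypothetical snippet; assumes a SparkContext has already been created,
+ * which installs a SparkEnv on its thread):
+ * {{{
+ *   val env = SparkEnv.get            // environment for the current thread (or the last one set)
+ *   val serializer = env.serializer   // e.g. pick up the configured serializer
+ *   SparkEnv.set(env)                 // call in another thread to make the same env visible there
+ * }}}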
+ */
+class SparkEnv (
+ val executorId: String,
+ val actorSystem: ActorSystem,
+ val serializerManager: SerializerManager,
+ val serializer: Serializer,
+ val closureSerializer: Serializer,
+ val cacheManager: CacheManager,
+ val mapOutputTracker: MapOutputTracker,
+ val shuffleFetcher: ShuffleFetcher,
+ val broadcastManager: BroadcastManager,
+ val blockManager: BlockManager,
+ val connectionManager: ConnectionManager,
+ val httpFileServer: HttpFileServer,
+ val sparkFilesDir: String,
+ val metricsSystem: MetricsSystem) {
+
+ private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
+
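+  // Selects the Hadoop utility implementation: when SPARK_YARN_MODE is set (as a system property
+  // or environment variable), the YARN-specific helper is loaded reflectively so that builds
+  // without the YARN module on the classpath can still run; otherwise the default is used.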
+ val hadoop = {
+    val yarnMode = java.lang.Boolean.valueOf(
+      System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
+    if (yarnMode) {
+      try {
+        Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
+          .newInstance.asInstanceOf[SparkHadoopUtil]
+ } catch {
+ case th: Throwable => throw new SparkException("Unable to load YARN support", th)
+ }
+ } else {
+ new SparkHadoopUtil
+ }
+ }
+
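+  // Shuts down every component that owns threads, sockets or child worker processes, and finally
+  // the actor system itself.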
+ def stop() {
+    pythonWorkers.foreach { case (key, worker) => worker.stop() }
+ httpFileServer.stop()
+ mapOutputTracker.stop()
+ shuffleFetcher.stop()
+ broadcastManager.stop()
+ blockManager.stop()
+ blockManager.master.stop()
+ metricsSystem.stop()
+ actorSystem.shutdown()
+ // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
+ // down, but let's call it anyway in case it gets fixed in a later release
+ actorSystem.awaitTermination()
+ }
+
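+  // Returns a socket connected to a Python worker, reusing one PythonWorkerFactory per
+  // (python executable, environment) pair across calls.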
+ def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
+ synchronized {
+ val key = (pythonExec, envVars)
+ pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
+ }
+ }
+}
+
+object SparkEnv extends Logging {
+ private val env = new ThreadLocal[SparkEnv]
+  @volatile private var lastSetSparkEnv: SparkEnv = _
+
+ def set(e: SparkEnv) {
+ lastSetSparkEnv = e
+ env.set(e)
+ }
+
+  /**
+   * Returns the SparkEnv of the current thread if one has been set; otherwise falls back to the
+   * SparkEnv most recently set on any thread.
+   */
+ def get: SparkEnv = {
+ Option(env.get()).getOrElse(lastSetSparkEnv)
+ }
+
+  /**
+   * Returns the SparkEnv set for the current thread, or null if none has been set on it.
+   */
+  def getThreadLocal: SparkEnv = {
+ env.get()
+ }
+
+ def createFromSystemProperties(
+ executorId: String,
+ hostname: String,
+ port: Int,
+ isDriver: Boolean,
+ isLocal: Boolean): SparkEnv = {
+
+ val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port)
+
+ // Bit of a hack: If this is the driver and our port was 0 (meaning bind to any free port),
+ // figure out which port number Akka actually bound to and set spark.driver.port to it.
+ if (isDriver && port == 0) {
+ System.setProperty("spark.driver.port", boundPort.toString)
+ }
+
+    // Set spark.hostPort only if it has not been set already.
+    if (System.getProperty("spark.hostPort", null) == null) {
+      if (!isDriver) {
+        // Executors are expected to arrive here with spark.hostPort already set.
+        Utils.logErrorWithStack("Unexpected NOT to have spark.hostPort set")
+      }
+ Utils.checkHost(hostname)
+ System.setProperty("spark.hostPort", hostname + ":" + boundPort)
+ }
+
+ val classLoader = Thread.currentThread.getContextClassLoader
+
+ // Create an instance of the class named by the given Java system property, or by
+ // defaultClassName if the property is not set, and return it as a T
+ def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
+ val name = System.getProperty(propertyName, defaultClassName)
+ Class.forName(name, true, classLoader).newInstance().asInstanceOf[T]
+ }
+
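+    // Set up the default serializer (spark.serializer) and a separate, independently configurable
+    // serializer for closures (spark.closure.serializer); both default to Java serialization.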
+ val serializerManager = new SerializerManager
+
+ val serializer = serializerManager.setDefault(
+ System.getProperty("spark.serializer", "org.apache.spark.serializer.JavaSerializer"))
+
+ val closureSerializer = serializerManager.get(
+ System.getProperty("spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer"))
+
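+    // Helper that registers an actor on the driver, or, on an executor, looks up the driver's
+    // copy of that actor via spark.driver.host and spark.driver.port.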
+ def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
+ if (isDriver) {
+ logInfo("Registering " + name)
+ actorSystem.actorOf(Props(newActor), name = name)
+ } else {
+ val driverHost: String = System.getProperty("spark.driver.host", "localhost")
+ val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
+ Utils.checkHost(driverHost, "Expected hostname")
+ val url = "akka://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
+ logInfo("Connecting to " + name + ": " + url)
+ actorSystem.actorFor(url)
+ }
+ }
+
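+    // Wire up the storage layer: the BlockManagerMasterActor lives on the driver, and each node
+    // creates its own BlockManager, which also provides the ConnectionManager used for transfers.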
+ val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
+ "BlockManagerMaster",
+ new BlockManagerMasterActor(isLocal)))
+ val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)
+
+ val connectionManager = blockManager.connectionManager
+
+ val broadcastManager = new BroadcastManager(isDriver)
+
+ val cacheManager = new CacheManager(blockManager)
+
+ // Have to assign trackerActor after initialization as MapOutputTrackerActor
+ // requires the MapOutputTracker itself
+ val mapOutputTracker = new MapOutputTracker()
+ mapOutputTracker.trackerActor = registerOrLookup(
+ "MapOutputTracker",
+ new MapOutputTrackerActor(mapOutputTracker))
+
+ val shuffleFetcher = instantiateClass[ShuffleFetcher](
+ "spark.shuffle.fetcher", "org.apache.spark.BlockStoreShuffleFetcher")
+
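+    // Start the HTTP file server and advertise its URI as a system property so that other
+    // components can download files from it.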
+ val httpFileServer = new HttpFileServer()
+ httpFileServer.initialize()
+ System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)
+
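+    // Create and start a metrics system registered under the role ("driver" or "executor") so the
+    // two kinds of processes report their metrics separately.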
+ val metricsSystem = if (isDriver) {
+ MetricsSystem.createMetricsSystem("driver")
+ } else {
+ MetricsSystem.createMetricsSystem("executor")
+ }
+ metricsSystem.start()
+
+ // Set the sparkFiles directory, used when downloading dependencies. In local mode,
+ // this is a temporary directory; in distributed mode, this is the executor's current working
+ // directory.
+ val sparkFilesDir: String = if (isDriver) {
+ Utils.createTempDir().getAbsolutePath
+ } else {
+ "."
+ }
+
+ // Warn about deprecated spark.cache.class property
+ if (System.getProperty("spark.cache.class") != null) {
+ logWarning("The spark.cache.class property is no longer being used! Specify storage " +
+ "levels using the RDD.persist() method instead.")
+ }
+
+ new SparkEnv(
+ executorId,
+ actorSystem,
+ serializerManager,
+ serializer,
+ closureSerializer,
+ cacheManager,
+ mapOutputTracker,
+ shuffleFetcher,
+ broadcastManager,
+ blockManager,
+ connectionManager,
+ httpFileServer,
+ sparkFilesDir,
+ metricsSystem)
+ }
+}