/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package org.apache.spark import java.io.File import java.net.Socket import scala.collection.mutable import scala.util.Properties import com.google.common.collect.MapMaker import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.python.PythonWorkerFactory import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.internal.Logging import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager} import org.apache.spark.metrics.MetricsSystem import org.apache.spark.network.BlockTransferService import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv} import org.apache.spark.scheduler.{LiveListenerBus, OutputCommitCoordinator} import org.apache.spark.scheduler.OutputCommitCoordinator.OutputCommitCoordinatorEndpoint import org.apache.spark.serializer.{JavaSerializer, Serializer, SerializerManager} import org.apache.spark.shuffle.ShuffleManager import org.apache.spark.storage._ import org.apache.spark.util.{RpcUtils, Utils} /** * :: DeveloperApi :: * Holds all the runtime environment objects for a running Spark instance (either master or worker), * including the serializer, RpcEnv, block manager, map output tracker, etc. Currently * Spark code finds the SparkEnv through a global variable, so all the threads can access the same * SparkEnv. It can be accessed by SparkEnv.get (e.g. after creating a SparkContext). * * NOTE: This is not intended for external use. This is exposed for Shark and may be made private * in a future release. */ @DeveloperApi class SparkEnv ( val executorId: String, private[spark] val rpcEnv: RpcEnv, val serializer: Serializer, val closureSerializer: Serializer, val serializerManager: SerializerManager, val mapOutputTracker: MapOutputTracker, val shuffleManager: ShuffleManager, val broadcastManager: BroadcastManager, val blockTransferService: BlockTransferService, val blockManager: BlockManager, val securityManager: SecurityManager, val sparkFilesDir: String, val metricsSystem: MetricsSystem, val memoryManager: MemoryManager, val outputCommitCoordinator: OutputCommitCoordinator, val conf: SparkConf) extends Logging { private[spark] var isStopped = false private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() // A general, soft-reference map for metadata needed during HadoopRDD split computation // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats). private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]() private var driverTmpDirToDelete: Option[String] = None private[spark] def stop() { if (!isStopped) { isStopped = true pythonWorkers.values.foreach(_.stop()) mapOutputTracker.stop() shuffleManager.stop() broadcastManager.stop() blockManager.stop() blockManager.master.stop() metricsSystem.stop() outputCommitCoordinator.stop() rpcEnv.shutdown() rpcEnv.awaitTermination() // Note that blockTransferService is stopped by BlockManager since it is started by it. // If we only stop sc, but the driver process still run as a services then we need to delete // the tmp dir, if not, it will create too many tmp dirs. // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the // current working dir in executor which we do not need to delete. driverTmpDirToDelete match { case Some(path) => try { Utils.deleteRecursively(new File(path)) } catch { case e: Exception => logWarning(s"Exception while deleting Spark temp dir: $path", e) } case None => // We just need to delete tmp dir created by driver, so do nothing on executor } } } private[spark] def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = { synchronized { val key = (pythonExec, envVars) pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create() } } private[spark] def destroyPythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) { synchronized { val key = (pythonExec, envVars) pythonWorkers.get(key).foreach(_.stopWorker(worker)) } } private[spark] def releasePythonWorker(pythonExec: String, envVars: Map[String, String], worker: Socket) { synchronized { val key = (pythonExec, envVars) pythonWorkers.get(key).foreach(_.releaseWorker(worker)) } } } object SparkEnv extends Logging { @volatile private var env: SparkEnv = _ private[spark] val driverSystemName = "sparkDriver" private[spark] val executorSystemName = "sparkExecutor" def set(e: SparkEnv) { env = e } /** * Returns the SparkEnv. */ def get: SparkEnv = { env } /** * Create a SparkEnv for the driver. */ private[spark] def createDriverEnv( conf: SparkConf, isLocal: Boolean, listenerBus: LiveListenerBus, numCores: Int, mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = { assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!") assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!") val hostname = conf.get("spark.driver.host") val port = conf.get("spark.driver.port").toInt create( conf, SparkContext.DRIVER_IDENTIFIER, hostname, port, isDriver = true, isLocal = isLocal, numUsableCores = numCores, listenerBus = listenerBus, mockOutputCommitCoordinator = mockOutputCommitCoordinator ) } /** * Create a SparkEnv for an executor. * In coarse-grained mode, the executor provides an RpcEnv that is already instantiated. */ private[spark] def createExecutorEnv( conf: SparkConf, executorId: String, hostname: String, port: Int, numCores: Int, isLocal: Boolean): SparkEnv = { val env = create( conf, executorId, hostname, port, isDriver = false, isLocal = isLocal, numUsableCores = numCores ) SparkEnv.set(env) env } /** * Helper method to create a SparkEnv for a driver or an executor. */ private def create( conf: SparkConf, executorId: String, hostname: String, port: Int, isDriver: Boolean, isLocal: Boolean, numUsableCores: Int, listenerBus: LiveListenerBus = null, mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = { // Listener bus is only used on the driver if (isDriver) { assert(listenerBus != null, "Attempted to create driver SparkEnv with null listener bus!") } val securityManager = new SecurityManager(conf) val systemName = if (isDriver) driverSystemName else executorSystemName val rpcEnv = RpcEnv.create(systemName, hostname, port, conf, securityManager, clientMode = !isDriver) // Figure out which port RpcEnv actually bound to in case the original port is 0 or occupied. // In the non-driver case, the RPC env's address may be null since it may not be listening // for incoming connections. if (isDriver) { conf.set("spark.driver.port", rpcEnv.address.port.toString) } else if (rpcEnv.address != null) { conf.set("spark.executor.port", rpcEnv.address.port.toString) } // Create an instance of the class with the given name, possibly initializing it with our conf def instantiateClass[T](className: String): T = { val cls = Utils.classForName(className) // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just // SparkConf, then one taking no arguments try { cls.getConstructor(classOf[SparkConf], java.lang.Boolean.TYPE) .newInstance(conf, new java.lang.Boolean(isDriver)) .asInstanceOf[T] } catch { case _: NoSuchMethodException => try { cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[T] } catch { case _: NoSuchMethodException => cls.getConstructor().newInstance().asInstanceOf[T] } } } // Create an instance of the class named by the given SparkConf property, or defaultClassName // if the property is not set, possibly initializing it with our conf def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = { instantiateClass[T](conf.get(propertyName, defaultClassName)) } val serializer = instantiateClassFromConf[Serializer]( "spark.serializer", "org.apache.spark.serializer.JavaSerializer") logDebug(s"Using serializer: ${serializer.getClass}") val serializerManager = new SerializerManager(serializer, conf) val closureSerializer = new JavaSerializer(conf) def registerOrLookupEndpoint( name: String, endpointCreator: => RpcEndpoint): RpcEndpointRef = { if (isDriver) { logInfo("Registering " + name) rpcEnv.setupEndpoint(name, endpointCreator) } else { RpcUtils.makeDriverRef(name, conf, rpcEnv) } } val mapOutputTracker = if (isDriver) { new MapOutputTrackerMaster(conf) } else { new MapOutputTrackerWorker(conf) } // Have to assign trackerEndpoint after initialization as MapOutputTrackerEndpoint // requires the MapOutputTracker itself mapOutputTracker.trackerEndpoint = registerOrLookupEndpoint(MapOutputTracker.ENDPOINT_NAME, new MapOutputTrackerMasterEndpoint( rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf)) // Let the user specify short names for shuffle managers val shortShuffleMgrNames = Map( "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName, "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName) val shuffleMgrName = conf.get("spark.shuffle.manager", "sort") val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false) val memoryManager: MemoryManager = if (useLegacyMemoryManager) { new StaticMemoryManager(conf, numUsableCores) } else { UnifiedMemoryManager(conf, numUsableCores) } val blockTransferService = new NettyBlockTransferService(conf, securityManager, hostname, numUsableCores) val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint( BlockManagerMaster.DRIVER_ENDPOINT_NAME, new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)), conf, isDriver) // NB: blockManager is not valid until initialize() is called later. val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster, serializerManager, conf, memoryManager, mapOutputTracker, shuffleManager, blockTransferService, securityManager, numUsableCores) val broadcastManager = new BroadcastManager(isDriver, conf, securityManager) val metricsSystem = if (isDriver) { // Don't start metrics system right now for Driver. // We need to wait for the task scheduler to give us an app ID. // Then we can start the metrics system. MetricsSystem.createMetricsSystem("driver", conf, securityManager) } else { // We need to set the executor ID before the MetricsSystem is created because sources and // sinks specified in the metrics configuration file will want to incorporate this executor's // ID into the metrics they report. conf.set("spark.executor.id", executorId) val ms = MetricsSystem.createMetricsSystem("executor", conf, securityManager) ms.start() ms } // Set the sparkFiles directory, used when downloading dependencies. In local mode, // this is a temporary directory; in distributed mode, this is the executor's current working // directory. val sparkFilesDir: String = if (isDriver) { Utils.createTempDir(Utils.getLocalDir(conf), "userFiles").getAbsolutePath } else { "." } val outputCommitCoordinator = mockOutputCommitCoordinator.getOrElse { new OutputCommitCoordinator(conf, isDriver) } val outputCommitCoordinatorRef = registerOrLookupEndpoint("OutputCommitCoordinator", new OutputCommitCoordinatorEndpoint(rpcEnv, outputCommitCoordinator)) outputCommitCoordinator.coordinatorRef = Some(outputCommitCoordinatorRef) val envInstance = new SparkEnv( executorId, rpcEnv, serializer, closureSerializer, serializerManager, mapOutputTracker, shuffleManager, broadcastManager, blockTransferService, blockManager, securityManager, sparkFilesDir, metricsSystem, memoryManager, outputCommitCoordinator, conf) // Add a reference to tmp dir created by driver, we will delete this tmp dir when stop() is // called, and we only need to do it for driver. Because driver may run as a service, and if we // don't delete this tmp dir when sc is stopped, then will create too many tmp dirs. if (isDriver) { envInstance.driverTmpDirToDelete = Some(sparkFilesDir) } envInstance } /** * Return a map representation of jvm information, Spark properties, system properties, and * class paths. Map keys define the category, and map values represent the corresponding * attributes as a sequence of KV pairs. This is used mainly for SparkListenerEnvironmentUpdate. */ private[spark] def environmentDetails( conf: SparkConf, schedulingMode: String, addedJars: Seq[String], addedFiles: Seq[String]): Map[String, Seq[(String, String)]] = { import Properties._ val jvmInformation = Seq( ("Java Version", s"$javaVersion ($javaVendor)"), ("Java Home", javaHome), ("Scala Version", versionString) ).sorted // Spark properties // This includes the scheduling mode whether or not it is configured (used by SparkUI) val schedulerMode = if (!conf.contains("spark.scheduler.mode")) { Seq(("spark.scheduler.mode", schedulingMode)) } else { Seq[(String, String)]() } val sparkProperties = (conf.getAll ++ schedulerMode).sorted // System properties that are not java classpaths val systemProperties = Utils.getSystemProperties.toSeq val otherProperties = systemProperties.filter { case (k, _) => k != "java.class.path" && !k.startsWith("spark.") }.sorted // Class paths including all added jars and files val classPathEntries = javaClassPath .split(File.pathSeparator) .filterNot(_.isEmpty) .map((_, "System Classpath")) val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User")) val classPaths = (addedJarsAndFiles ++ classPathEntries).sorted Map[String, Seq[(String, String)]]( "JVM Information" -> jvmInformation, "Spark Properties" -> sparkProperties, "System Properties" -> otherProperties, "Classpath Entries" -> classPaths) } }