From a98f5a0ebb3e94f55439b81bee77b1def079d67c Mon Sep 17 00:00:00 2001 From: Harvey Feng Date: Fri, 15 Nov 2013 03:32:37 -0800 Subject: Misc style changes in the 'yarn' package. --- .../spark/deploy/yarn/ApplicationMaster.scala | 166 +++++++------ .../org/apache/spark/deploy/yarn/Client.scala | 134 ++++++----- .../apache/spark/deploy/yarn/WorkerRunnable.scala | 85 ++++--- .../spark/deploy/yarn/YarnAllocationHandler.scala | 264 +++++++++++++-------- 4 files changed, 376 insertions(+), 273 deletions(-) (limited to 'yarn') diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 0e47bd7a10..e4f3d3ef64 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -21,9 +21,13 @@ import java.io.IOException import java.net.Socket import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} + +import scala.collection.JavaConversions._ + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.net.NetUtils +import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.util.ShutdownHookManager import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.records._ @@ -31,35 +35,36 @@ import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} + import org.apache.spark.{SparkContext, Logging} import org.apache.spark.util.Utils -import scala.collection.JavaConversions._ + class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging { def this(args: ApplicationMasterArguments) = this(args, new Configuration()) private var rpc: YarnRPC = YarnRPC.create(conf) - private var resourceManager: AMRMProtocol = null - private var appAttemptId: ApplicationAttemptId = null - private var userThread: Thread = null + private var resourceManager: AMRMProtocol = _ + private var appAttemptId: ApplicationAttemptId = _ + private var userThread: Thread = _ private val yarnConf: YarnConfiguration = new YarnConfiguration(conf) private val fs = FileSystem.get(yarnConf) - private var yarnAllocator: YarnAllocationHandler = null - private var isFinished:Boolean = false - private var uiAddress: String = "" + private var yarnAllocator: YarnAllocationHandler = _ + private var isFinished: Boolean = false + private var uiAddress: String = _ private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES) private var isLastAMRetry: Boolean = true def run() { - // setup the directories so things go to yarn approved directories rather - // then user specified and /tmp + // Setup the directories so things go to yarn approved directories rather + // then user specified and /tmp. System.setProperty("spark.local.dir", getLocalDirs()) - // use priority 30 as its higher then HDFS. Its same priority as MapReduce is using + // Use priority 30 as its higher then HDFS. Its same priority as MapReduce is using. ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30) appAttemptId = getApplicationAttemptId() @@ -68,9 +73,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // Workaround until hadoop moves to something which has // https://issues.apache.org/jira/browse/HADOOP-8406 - fixed in (2.0.2-alpha but no 0.23 line) - // ignore result + // ignore result. // This does not, unfortunately, always work reliably ... but alleviates the bug a lot of times - // Hence args.workerCores = numCore disabled above. Any better option ? + // Hence args.workerCores = numCore disabled above. Any better option? // Compute number of threads for akka //val minimumMemory = appMasterResponse.getMinimumResourceCapability().getMemory() @@ -96,7 +101,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e waitForSparkContextInitialized() - // do this after spark master is up and SparkContext is created so that we can register UI Url + // Do this after spark master is up and SparkContext is created so that we can register UI Url val appMasterResponse: RegisterApplicationMasterResponse = registerApplicationMaster() // Allocate all containers @@ -115,12 +120,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X val localDirs = Option(System.getenv("YARN_LOCAL_DIRS")) .getOrElse(Option(System.getenv("LOCAL_DIRS")) - .getOrElse("")) + .getOrElse("")) if (localDirs.isEmpty()) { throw new Exception("Yarn Local dirs can't be empty") } - return localDirs + localDirs } private def getApplicationAttemptId(): ApplicationAttemptId = { @@ -129,7 +134,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e val containerId = ConverterUtils.toContainerId(containerIdString) val appAttemptId = containerId.getApplicationAttemptId() logInfo("ApplicationAttemptId: " + appAttemptId) - return appAttemptId + appAttemptId } private def registerWithResourceManager(): AMRMProtocol = { @@ -137,7 +142,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e YarnConfiguration.RM_SCHEDULER_ADDRESS, YarnConfiguration.DEFAULT_RM_SCHEDULER_ADDRESS)) logInfo("Connecting to ResourceManager at " + rmAddress) - return rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol] + rpc.getProxy(classOf[AMRMProtocol], rmAddress, conf).asInstanceOf[AMRMProtocol] } private def registerApplicationMaster(): RegisterApplicationMasterResponse = { @@ -145,12 +150,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest]) .asInstanceOf[RegisterApplicationMasterRequest] appMasterRequest.setApplicationAttemptId(appAttemptId) - // Setting this to master host,port - so that the ApplicationReport at client has some sensible info. + // Setting this to master host,port - so that the ApplicationReport at client has some + // sensible info. // Users can then monitor stderr/stdout on that node if required. appMasterRequest.setHost(Utils.localHostName()) appMasterRequest.setRpcPort(0) appMasterRequest.setTrackingUrl(uiAddress) - return resourceManager.registerApplicationMaster(appMasterRequest) + resourceManager.registerApplicationMaster(appMasterRequest) } private def waitForSparkMaster() { @@ -164,21 +170,25 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e try { val socket = new Socket(driverHost, driverPort.toInt) socket.close() - logInfo("Driver now available: " + driverHost + ":" + driverPort) + logInfo("Driver now available: %s:%s".format(driverHost, driverPort)) driverUp = true } catch { - case e: Exception => - logWarning("Failed to connect to driver at " + driverHost + ":" + driverPort + ", retrying") - Thread.sleep(100) - tries = tries + 1 + case e: Exception => { + logWarning("Failed to connect to driver at %s:%s, retrying ..."). + format(driverHost, driverPort) + Thread.sleep(100) + tries = tries + 1 + } } } } private def startUserClass(): Thread = { logInfo("Starting the user JAR in a separate Thread") - val mainMethod = Class.forName(args.userClass, false, Thread.currentThread.getContextClassLoader) - .getMethod("main", classOf[Array[String]]) + val mainMethod = Class.forName( + args.userClass, + false /* initialize */, + Thread.currentThread.getContextClassLoader).getMethod("main", classOf[Array[String]]) val t = new Thread { override def run() { var successed = false @@ -203,7 +213,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e } } t.start() - return t + t } // this need to happen before allocateWorkers @@ -225,12 +235,20 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e if (null != sparkContext) { uiAddress = sparkContext.ui.appUIAddress - this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, - sparkContext.preferredNodeLocationData) + this.yarnAllocator = YarnAllocationHandler.newAllocator( + yarnConf, + resourceManager, + appAttemptId, + args, + sparkContext.preferredNodeLocationData) } else { - logWarning("Unable to retrieve sparkContext inspite of waiting for " + count * waitTime + - ", numTries = " + numTries) - this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args) + logWarning("Unable to retrieve sparkContext inspite of waiting for %d, numTries = %d". + format(count * waitTime, numTries)) + this.yarnAllocator = YarnAllocationHandler.newAllocator( + yarnConf, + resourceManager, + appAttemptId, + args) } } } finally { @@ -246,35 +264,37 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // Wait until all containers have finished // TODO: This is a bit ugly. Can we make it nicer? // TODO: Handle container failure - while(yarnAllocator.getNumWorkersRunning < args.numWorkers && - // If user thread exists, then quit ! - userThread.isAlive) { - this.yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0)) - ApplicationMaster.incrementAllocatorLoop(1) - Thread.sleep(100) + // Exists the loop if the user thread exits. + while (yarnAllocator.getNumWorkersRunning < args.numWorkers && userThread.isAlive) { + val numContainersToAllocate = math.max( + args.numWorkers - yarnAllocator.getNumWorkersRunning, 0) + this.yarnAllocator.allocateContainers(numContainersToAllocate) + ApplicationMaster.incrementAllocatorLoop(1) + Thread.sleep(100) } } finally { - // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT : - // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks + // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT, + // so that the loop in ApplicationMaster#sparkContextInitialized() breaks. ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) } logInfo("All workers have launched.") - // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout + // Launch a progress reporter thread, else the app will get killed after expiration + // (def: 10mins) timeout. + // TODO(harvey): Verify the timeout if (userThread.isAlive) { - // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. - + // Ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapses. val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) - // must be <= timeoutInterval/ 2. - // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM. - // so atleast 1 minute or timeoutInterval / 10 - whichever is higher. + // Must be <= timeoutInterval/ 2. + // On other hand, also ensure that we are reasonably responsive without causing too many + // requests to RM. So, at least 1 minute or timeoutInterval / 10 - whichever is higher. val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L)) launchReporterThread(interval) } } - // TODO: We might want to extend this to allocate more containers in case they die ! + // TODO: We might want to extend this to allocate more containers in case they die. private def launchReporterThread(_sleepTime: Long): Thread = { val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime @@ -283,7 +303,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e while (userThread.isAlive) { val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning if (missingWorkerCount > 0) { - logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers") + logInfo("Allocating %d containers to make up for (potentially) lost containers". + format(missingWorkerCount)) yarnAllocator.allocateContainers(missingWorkerCount) } else sendProgress() @@ -291,16 +312,16 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e } } } - // setting to daemon status, though this is usually not a good idea. + // Setting to daemon status, though this is usually not a good idea. t.setDaemon(true) t.start() logInfo("Started progress reporter thread - sleep time : " + sleepTime) - return t + t } private def sendProgress() { logDebug("Sending progress") - // simulated with an allocate request with no nodes requested ... + // Simulated with an allocate request with no nodes requested ... yarnAllocator.allocateContainers(0) } @@ -320,7 +341,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e */ def finishApplicationMaster(status: FinalApplicationStatus) { - synchronized { if (isFinished) { return @@ -333,14 +353,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e .asInstanceOf[FinishApplicationMasterRequest] finishReq.setAppAttemptId(appAttemptId) finishReq.setFinishApplicationStatus(status) - // set tracking url to empty since we don't have a history server + // Set tracking url to empty since we don't have a history server. finishReq.setTrackingUrl("") resourceManager.finishApplicationMaster(finishReq) - } /** - * clean up the staging directory. + * Clean up the staging directory. */ private def cleanupStagingDir() { var stagingDirPath: Path = null @@ -356,13 +375,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e fs.delete(stagingDirPath, true) } } catch { - case e: IOException => - logError("Failed to cleanup staging dir " + stagingDirPath, e) + case ioe: IOException => + logError("Failed to cleanup staging dir " + stagingDirPath, ioe) } } - // The shutdown hook that runs when a signal is received AND during normal - // close of the JVM. + // The shutdown hook that runs when a signal is received AND during normal close of the JVM. class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable { def run() { @@ -372,15 +390,14 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir() } } - } object ApplicationMaster { - // number of times to wait for the allocator loop to complete. - // each loop iteration waits for 100ms, so maximum of 3 seconds. + // Number of times to wait for the allocator loop to complete. + // Each loop iteration waits for 100ms, so maximum of 3 seconds. // This is to ensure that we have reasonable number of containers before we start - // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be optimal as more - // containers are available. Might need to handle this better. + // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be + // optimal as more containers are available. Might need to handle this better. private val ALLOCATOR_LOOP_WAIT_COUNT = 30 def incrementAllocatorLoop(by: Int) { val count = yarnAllocatorLoop.getAndAdd(by) @@ -398,7 +415,8 @@ object ApplicationMaster { applicationMasters.add(master) } - val sparkContextRef: AtomicReference[SparkContext] = new AtomicReference[SparkContext](null) + val sparkContextRef: AtomicReference[SparkContext] = + new AtomicReference[SparkContext](null /* initialValue */) val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0) def sparkContextInitialized(sc: SparkContext): Boolean = { @@ -408,19 +426,21 @@ object ApplicationMaster { sparkContextRef.notifyAll() } - // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do System.exit - // Should not really have to do this, but it helps yarn to evict resources earlier. - // not to mention, prevent Client declaring failure even though we exit'ed properly. - // Note that this will unfortunately not properly clean up the staging files because it gets called to - // late and the filesystem is already shutdown. + // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do + // System.exit. + // Should not really have to do this, but it helps YARN to evict resources earlier. + // Not to mention, prevent the Client from declaring failure even though we exited properly. + // Note that this will unfortunately not properly clean up the staging files because it gets + // called too late, after the filesystem is already shutdown. if (modified) { Runtime.getRuntime().addShutdownHook(new Thread with Logging { - // This is not just to log, but also to ensure that log system is initialized for this instance when we actually are 'run' + // This is not only logs, but also ensures that log system is initialized for this instance + // when we are actually 'run'-ing. logInfo("Adding shutdown hook for context " + sc) override def run() { logInfo("Invoking sc stop from shutdown hook") sc.stop() - // best case ... + // Best case ... for (master <- applicationMasters) { master.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) } @@ -428,7 +448,7 @@ object ApplicationMaster { } ) } - // Wait for initialization to complete and atleast 'some' nodes can get allocated + // Wait for initialization to complete and atleast 'some' nodes can get allocated. yarnAllocatorLoop.synchronized { while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) { yarnAllocatorLoop.wait(1000L) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index c38bdd14ec..08699cc5f8 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -20,43 +20,43 @@ package org.apache.spark.deploy.yarn import java.net.{InetAddress, InetSocketAddress, UnknownHostException, URI} import java.nio.ByteBuffer +import scala.collection.JavaConversions._ +import scala.collection.mutable.HashMap +import scala.collection.mutable.Map + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil} -import org.apache.hadoop.fs.permission.FsPermission +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapred.Master import org.apache.hadoop.net.NetUtils -import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment -import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.api.protocolrecords._ +import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.YarnClientImpl import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{Apps, Records} -import scala.collection.mutable.HashMap -import scala.collection.mutable.Map -import scala.collection.JavaConversions._ - import org.apache.spark.Logging import org.apache.spark.util.Utils import org.apache.spark.deploy.SparkHadoopUtil + class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging { - + def this(args: ClientArguments) = this(new Configuration(), args) - + var rpc: YarnRPC = YarnRPC.create(conf) val yarnConf: YarnConfiguration = new YarnConfiguration(conf) val credentials = UserGroupInformation.getCurrentUser().getCredentials() private val SPARK_STAGING: String = ".sparkStaging" private val distCacheMgr = new ClientDistributedCacheManager() - // staging directory is private! -> rwx-------- + // Staging directory is private! -> rwx-------- val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short) - // app files are world-wide readable and owner writable -> rw-r--r-- + // App files are world-wide readable and owner writable -> rw-r--r-- val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short) def run() { @@ -79,7 +79,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl appContext.setUser(UserGroupInformation.getCurrentUser().getShortUserName()) submitApp(appContext) - + monitorApplication(appId) System.exit(0) } @@ -90,20 +90,25 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl def logClusterResourceDetails() { val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics - logInfo("Got Cluster metric info from ASM, numNodeManagers=" + clusterMetrics.getNumNodeManagers) + logInfo("Got Cluster metric info from ASM, numNodeManagers = " + + clusterMetrics.getNumNodeManagers) val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue) - logInfo("Queue info .. queueName=" + queueInfo.getQueueName + ", queueCurrentCapacity=" + queueInfo.getCurrentCapacity + - ", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", queueApplicationCount=" + queueInfo.getApplications.size + - ", queueChildQueueCount=" + queueInfo.getChildQueues.size) + logInfo("""Queue info ... queueName = %s, queueCurrentCapacity = %s, queueMaxCapacity = %s, + queueApplicationCount = %s, queueChildQueueCount = %s""".format( + queueInfo.getQueueName, + queueInfo.getCurrentCapacity, + queueInfo.getMaximumCapacity, + queueInfo.getApplications.size, + queueInfo.getChildQueues.size) } - + def verifyClusterResources(app: GetNewApplicationResponse) = { val maxMem = app.getMaximumResourceCapability().getMemory() logInfo("Max mem capabililty of a single resource in this cluster " + maxMem) - - // if we have requested more then the clusters max for a single resource then exit. + + // If we have requested more then the clusters max for a single resource then exit. if (args.workerMemory > maxMem) { logError("the worker size is to large to run on this cluster " + args.workerMemory); System.exit(1) @@ -114,10 +119,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl System.exit(1) } - // We could add checks to make sure the entire cluster has enough resources but that involves getting - // all the node reports and computing ourselves + // We could add checks to make sure the entire cluster has enough resources but that involves + // getting all the node reports and computing ourselves } - + def createApplicationSubmissionContext(appId: ApplicationId): ApplicationSubmissionContext = { logInfo("Setting up application submission context for ASM") val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) @@ -126,9 +131,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl return appContext } - /* - * see if two file systems are the same or not. - */ + /** See if two file systems are the same or not. */ private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = { val srcUri = srcFs.getUri() val dstUri = destFs.getUri() @@ -163,9 +166,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl return true; } - /** - * Copy the file into HDFS if needed. - */ + /** Copy the file into HDFS if needed. */ private def copyRemoteFile( dstDir: Path, originalPath: Path, @@ -181,9 +182,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl fs.setReplication(newPath, replication); if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION)) } - // resolve any symlinks in the URI path so using a "current" symlink - // to point to a specific version shows the specific version - // in the distributed cache configuration + // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific + // version shows the specific version in the distributed cache configuration val qualPath = fs.makeQualified(newPath) val fc = FileContext.getFileContext(qualPath.toUri(), conf) val destPath = fc.resolvePath(qualPath) @@ -192,8 +192,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = { logInfo("Preparing Local resources") - // Upload Spark and the application JAR to the remote file system if necessary - // Add them as local resources to the AM + // Upload Spark and the application JAR to the remote file system if necessary. Add them as + // local resources to the AM. val fs = FileSystem.get(conf) val delegTokenRenewer = Master.getMasterPrincipal(conf); @@ -276,7 +276,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl UserGroupInformation.getCurrentUser().addCredentials(credentials); return localResources } - + def setupLaunchEnv( localResources: HashMap[String, LocalResource], stagingDir: String): HashMap[String, String] = { @@ -289,16 +289,16 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_STAGING_DIR") = stagingDir - // set the environment variables to be passed on to the Workers + // Set the environment variables to be passed on to the Workers. distCacheMgr.setDistFilesEnv(env) distCacheMgr.setDistArchivesEnv(env) - // allow users to specify some environment variables + // Allow users to specify some environment variables. Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV")) - // Add each SPARK-* key to the environment + // Add each SPARK-* key to the environment. System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v } - return env + env } def userArgsToString(clientArgs: ClientArguments): String = { @@ -308,13 +308,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl for (arg <- args){ retval.append(prefix).append(" '").append(arg).append("' ") } - retval.toString } - def createContainerLaunchContext(newApp: GetNewApplicationResponse, - localResources: HashMap[String, LocalResource], - env: HashMap[String, String]): ContainerLaunchContext = { + def createContainerLaunchContext( + newApp: GetNewApplicationResponse, + localResources: HashMap[String, LocalResource], + env: HashMap[String, String]): ContainerLaunchContext = { logInfo("Setting up container launch context") val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) amContainer.setLocalResources(localResources) @@ -322,8 +322,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory() + // TODO(harvey): This can probably be a val. var amMemory = ((args.amMemory / minResMemory) * minResMemory) + - (if (0 != (args.amMemory % minResMemory)) minResMemory else 0) - YarnAllocationHandler.MEMORY_OVERHEAD + ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) - + YarnAllocationHandler.MEMORY_OVERHEAD) // Extra options for the JVM var JAVA_OPTS = "" @@ -334,14 +336,18 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " " - - // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out. - // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same - // node, spark gc effects all other containers performance (which can also be other spark containers) - // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in multi-tenant environments. Not sure how default java gc behaves if it is - // limited to subset of cores on a node. - if (env.isDefinedAt("SPARK_USE_CONC_INCR_GC") && java.lang.Boolean.parseBoolean(env("SPARK_USE_CONC_INCR_GC"))) { - // In our expts, using (default) throughput collector has severe perf ramnifications in multi-tenant machines + // Commenting it out for now - so that people can refer to the properties if required. Remove + // it once cpuset version is pushed out. The context is, default gc for server class machines + // end up using all cores to do gc - hence if there are multiple containers in same node, + // spark gc effects all other containers performance (which can also be other spark containers) + // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in + // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset + // of cores on a node. + val useConcurrentAndIncrementalGC = env.isDefinedAt("SPARK_USE_CONC_INCR_GC") && + java.lang.Boolean.parseBoolean(env("SPARK_USE_CONC_INCR_GC")) + if (useConcurrentAndIncrementalGC) { + // In our expts, using (default) throughput collector has severe perf ramnifications in + // multi-tenant machines JAVA_OPTS += " -XX:+UseConcMarkSweepGC " JAVA_OPTS += " -XX:+CMSIncrementalMode " JAVA_OPTS += " -XX:+CMSIncrementalPacing " @@ -353,7 +359,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl JAVA_OPTS += env("SPARK_JAVA_OPTS") + " " } - // Command for the ApplicationMaster + // Command for the ApplicationMaster. var javaCommand = "java"; val javaHome = System.getenv("JAVA_HOME") if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) { @@ -379,28 +385,28 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") logInfo("Command for the ApplicationMaster: " + commands(0)) amContainer.setCommands(commands) - + val capability = Records.newRecord(classOf[Resource]).asInstanceOf[Resource] - // Memory for the ApplicationMaster + // Memory for the ApplicationMaster. capability.setMemory(args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD) amContainer.setResource(capability) - // Setup security tokens + // Setup security tokens. val dob = new DataOutputBuffer() credentials.writeTokenStorageToStream(dob) amContainer.setContainerTokens(ByteBuffer.wrap(dob.getData())) - return amContainer + amContainer } - + def submitApp(appContext: ApplicationSubmissionContext) = { - // Submit the application to the applications manager + // Submit the application to the applications manager. logInfo("Submitting application to ASM") super.submitApplication(appContext) } - + def monitorApplication(appId: ApplicationId): Boolean = { - while(true) { + while (true) { Thread.sleep(1000) val report = super.getApplicationReport(appId) @@ -418,16 +424,16 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl "\t appTrackingUrl: " + report.getTrackingUrl() + "\n" + "\t appUser: " + report.getUser() ) - + val state = report.getYarnApplicationState() val dsStatus = report.getFinalApplicationStatus() if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) { - return true + return true } } - return true + true } } @@ -459,7 +465,7 @@ object Client { Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + Path.SEPARATOR + LOG4J_PROP) } - // normally the users app.jar is last in case conflicts with spark jars + // Normally the users app.jar is last in case conflicts with spark jars val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false") .toBoolean if (userClasspathFirst) { diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala index 7a66532254..d9eabf3bae 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala @@ -21,53 +21,60 @@ import java.net.URI import java.nio.ByteBuffer import java.security.PrivilegedExceptionAction +import scala.collection.JavaConversions._ +import scala.collection.mutable.HashMap + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.net.NetUtils import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api._ +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils} -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment - -import scala.collection.JavaConversions._ -import scala.collection.mutable.HashMap import org.apache.spark.Logging import org.apache.spark.util.Utils -class WorkerRunnable(container: Container, conf: Configuration, masterAddress: String, - slaveId: String, hostname: String, workerMemory: Int, workerCores: Int) - extends Runnable with Logging { - + +class WorkerRunnable( + container: Container, + conf: Configuration, + masterAddress: String, + slaveId: String, + hostname: String, + workerMemory: Int, + workerCores: Int) + extends Runnable with Logging { + var rpc: YarnRPC = YarnRPC.create(conf) var cm: ContainerManager = null val yarnConf: YarnConfiguration = new YarnConfiguration(conf) - + def run = { logInfo("Starting Worker Container") cm = connectToCM startContainer } - + def startContainer = { logInfo("Setting up ContainerLaunchContext") - + val ctx = Records.newRecord(classOf[ContainerLaunchContext]) .asInstanceOf[ContainerLaunchContext] - + ctx.setContainerId(container.getId()) ctx.setResource(container.getResource()) val localResources = prepareLocalResources ctx.setLocalResources(localResources) - + val env = prepareEnvironment ctx.setEnvironment(env) - + // Extra options for the JVM var JAVA_OPTS = "" // Set the JVM memory @@ -80,17 +87,21 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " " - - // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out. - // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same - // node, spark gc effects all other containers performance (which can also be other spark containers) - // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in multi-tenant environments. Not sure how default java gc behaves if it is - // limited to subset of cores on a node. + // Commenting it out for now - so that people can refer to the properties if required. Remove + // it once cpuset version is pushed out. + // The context is, default gc for server class machines end up using all cores to do gc - hence + // if there are multiple containers in same node, spark gc effects all other containers + // performance (which can also be other spark containers) + // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in + // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset + // of cores on a node. /* else { // If no java_opts specified, default to using -XX:+CMSIncrementalMode - // It might be possible that other modes/config is being done in SPARK_JAVA_OPTS, so we dont want to mess with it. - // In our expts, using (default) throughput collector has severe perf ramnifications in multi-tennent machines + // It might be possible that other modes/config is being done in SPARK_JAVA_OPTS, so we dont + // want to mess with it. + // In our expts, using (default) throughput collector has severe perf ramnifications in + // multi-tennent machines // The options are based on // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use%20the%20Concurrent%20Low%20Pause%20Collector|outline JAVA_OPTS += " -XX:+UseConcMarkSweepGC " @@ -117,8 +128,10 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S val commands = List[String](javaCommand + " -server " + // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling. - // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in an inconsistent state. - // TODO: If the OOM is not recoverable by rescheduling it on different node, then do 'something' to fail job ... akin to blacklisting trackers in mapred ? + // Not killing the task leaves various aspects of the worker and (to some extent) the jvm in + // an inconsistent state. + // TODO: If the OOM is not recoverable by rescheduling it on different node, then do + // 'something' to fail job ... akin to blacklisting trackers in mapred ? " -XX:OnOutOfMemoryError='kill %p' " + JAVA_OPTS + " org.apache.spark.executor.CoarseGrainedExecutorBackend " + @@ -130,7 +143,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S " 2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") logInfo("Setting up worker with commands: " + commands) ctx.setCommands(commands) - + // Send the start request to the ContainerManager val startReq = Records.newRecord(classOf[StartContainerRequest]) .asInstanceOf[StartContainerRequest] @@ -138,7 +151,8 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S cm.startContainer(startReq) } - private def setupDistributedCache(file: String, + private def setupDistributedCache( + file: String, rtype: LocalResourceType, localResources: HashMap[String, LocalResource], timestamp: String, @@ -153,12 +167,11 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S amJarRsrc.setSize(size.toLong) localResources(uri.getFragment()) = amJarRsrc } - - + def prepareLocalResources: HashMap[String, LocalResource] = { logInfo("Preparing Local resources") val localResources = HashMap[String, LocalResource]() - + if (System.getenv("SPARK_YARN_CACHE_FILES") != null) { val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',') val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',') @@ -180,30 +193,30 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S timeStamps(i), fileSizes(i), visibilities(i)) } } - + logInfo("Prepared Local resources " + localResources) return localResources } - + def prepareEnvironment: HashMap[String, String] = { val env = new HashMap[String, String]() Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env) - // allow users to specify some environment variables + // Allow users to specify some environment variables Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV")) System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v } return env } - + def connectToCM: ContainerManager = { val cmHostPortStr = container.getNodeId().getHost() + ":" + container.getNodeId().getPort() val cmAddress = NetUtils.createSocketAddr(cmHostPortStr) logInfo("Connecting to ContainerManager at " + cmHostPortStr) - // use doAs and remoteUser here so we can add the container token and not - // pollute the current users credentials with all of the individual container tokens + // Use doAs and remoteUser here so we can add the container token and not pollute the current + // users credentials with all of the individual container tokens val user = UserGroupInformation.createRemoteUser(container.getId().toString()); val containerToken = container.getContainerToken(); if (containerToken != null) { @@ -219,5 +232,5 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S }); return proxy; } - + } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 25da9aa917..a9fbc27613 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -17,55 +17,70 @@ package org.apache.spark.deploy.yarn +import java.lang.{Boolean => JBoolean} +import java.util.{Collections, Set => JSet} +import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap} +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection +import scala.collection.JavaConversions._ +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} + import org.apache.spark.Logging -import org.apache.spark.util.Utils import org.apache.spark.scheduler.SplitInfo -import scala.collection -import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId, ContainerId, Priority, Resource, ResourceRequest, ContainerStatus, Container} import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend} +import org.apache.spark.util.Utils + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.api.AMRMProtocol +import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId} +import org.apache.hadoop.yarn.api.records.{Container, ContainerId, ContainerStatus} +import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest} import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse} import org.apache.hadoop.yarn.util.{RackResolver, Records} -import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap} -import java.util.concurrent.atomic.AtomicInteger -import org.apache.hadoop.yarn.api.AMRMProtocol -import collection.JavaConversions._ -import collection.mutable.{ArrayBuffer, HashMap, HashSet} -import org.apache.hadoop.conf.Configuration -import java.util.{Collections, Set => JSet} -import java.lang.{Boolean => JBoolean} + object AllocationType extends Enumeration ("HOST", "RACK", "ANY") { type AllocationType = Value val HOST, RACK, ANY = Value } -// too many params ? refactor it 'somehow' ? -// needs to be mt-safe -// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive : should make it -// more proactive and decoupled. +// TODO: +// Too many params. +// Needs to be mt-safe +// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should +// make it more proactive and decoupled. + // Note that right now, we assume all node asks as uniform in terms of capabilities and priority // Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for more info // on how we are requesting for containers. -private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceManager: AMRMProtocol, - val appAttemptId: ApplicationAttemptId, - val maxWorkers: Int, val workerMemory: Int, val workerCores: Int, - val preferredHostToCount: Map[String, Int], - val preferredRackToCount: Map[String, Int]) +private[yarn] class YarnAllocationHandler( + val conf: Configuration, + val resourceManager: AMRMProtocol, + val appAttemptId: ApplicationAttemptId, + val maxWorkers: Int, + val workerMemory: Int, + val workerCores: Int, + val preferredHostToCount: Map[String, Int], + val preferredRackToCount: Map[String, Int]) extends Logging { - - // These three are locked on allocatedHostToContainersMap. Complementary data structures // allocatedHostToContainersMap : containers which are running : host, Set - // allocatedContainerToHostMap: container to host mapping - private val allocatedHostToContainersMap = new HashMap[String, collection.mutable.Set[ContainerId]]() + // allocatedContainerToHostMap: container to host mapping. + private val allocatedHostToContainersMap = + new HashMap[String, collection.mutable.Set[ContainerId]]() + private val allocatedContainerToHostMap = new HashMap[ContainerId, String]() - // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an allocated node) - // As with the two data structures above, tightly coupled with them, and to be locked on allocatedHostToContainersMap + + // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an + // allocated node) + // As with the two data structures above, tightly coupled with them, and to be locked on + // allocatedHostToContainersMap private val allocatedRackCount = new HashMap[String, Int]() - // containers which have been released. + // Containers which have been released. private val releasedContainerList = new CopyOnWriteArrayList[ContainerId]() - // containers to be released in next request to RM + // Containers to be released in next request to RM private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean] private val numWorkersRunning = new AtomicInteger() @@ -75,13 +90,13 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM def getNumWorkersRunning: Int = numWorkersRunning.intValue - def isResourceConstraintSatisfied(container: Container): Boolean = { container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) } def allocateContainers(workersToRequest: Int) { - // We need to send the request only once from what I understand ... but for now, not modifying this much. + // We need to send the request only once from what I understand ... but for now, not modifying + // this much. // Keep polling the Resource Manager for containers val amResp = allocateWorkerResources(workersToRequest).getAMResponse @@ -97,7 +112,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM val hostToContainers = new HashMap[String, ArrayBuffer[Container]]() - // ignore if not satisfying constraints { + // Ignore if not satisfying constraints { for (container <- _allocatedContainers) { if (isResourceConstraintSatisfied(container)) { // allocatedContainers += container @@ -111,8 +126,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM else releasedContainerList.add(container.getId()) } - // Find the appropriate containers to use - // Slightly non trivial groupBy I guess ... + // Find the appropriate containers to use. Slightly non trivial groupBy ... val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]() val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]() val offRackContainers = new HashMap[String, ArrayBuffer[Container]]() @@ -132,7 +146,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM remainingContainers = null } else if (requiredHostCount > 0) { - // container list has more containers than we need for data locality. + // Container list has more containers than we need for data locality. // Split into two : data local container count of (remainingContainers.size - requiredHostCount) // and rest as remainingContainer val (dataLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredHostCount) @@ -140,13 +154,13 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM // remainingContainers = remaining // yarn has nasty habit of allocating a tonne of containers on a host - discourage this : - // add remaining to release list. If we have insufficient containers, next allocation cycle - // will reallocate (but wont treat it as data local) + // add remaining to release list. If we have insufficient containers, next allocation + // cycle will reallocate (but wont treat it as data local) for (container <- remaining) releasedContainerList.add(container.getId()) remainingContainers = null } - // now rack local + // Now rack local if (remainingContainers != null){ val rack = YarnAllocationHandler.lookupRack(conf, candidateHost) @@ -159,15 +173,17 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM if (requiredRackCount >= remainingContainers.size){ // Add all to dataLocalContainers dataLocalContainers.put(rack, remainingContainers) - // all consumed + // All consumed remainingContainers = null } else if (requiredRackCount > 0) { // container list has more containers than we need for data locality. // Split into two : data local container count of (remainingContainers.size - requiredRackCount) // and rest as remainingContainer - val (rackLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredRackCount) - val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack, new ArrayBuffer[Container]()) + val (rackLocal, remaining) = remainingContainers.splitAt( + remainingContainers.size - requiredRackCount) + val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack, + new ArrayBuffer[Container]()) existingRackLocal ++= rackLocal remainingContainers = remaining @@ -183,8 +199,8 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM // Now that we have split the containers into various groups, go through them in order : // first host local, then rack local and then off rack (everything else). - // Note that the list we create below tries to ensure that not all containers end up within a host - // if there are sufficiently large number of hosts/containers. + // Note that the list we create below tries to ensure that not all containers end up within a + // host if there are sufficiently large number of hosts/containers. val allocatedContainers = new ArrayBuffer[Container](_allocatedContainers.size) allocatedContainers ++= ClusterScheduler.prioritizeContainers(dataLocalContainers) @@ -197,7 +213,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM val workerHostname = container.getNodeId.getHost val containerId = container.getId - assert (container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)) + assert(container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)) if (numWorkersRunningNow > maxWorkers) { logInfo("Ignoring container " + containerId + " at host " + workerHostname + @@ -207,19 +223,22 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM numWorkersRunning.decrementAndGet() } else { - // deallocate + allocate can result in reusing id's wrongly - so use a different counter (workerIdCounter) + // Deallocate + allocate can result in reusing id's wrongly - so use a different counter + // (workerIdCounter) val workerId = workerIdCounter.incrementAndGet().toString val driverUrl = "akka://spark@%s:%s/user/%s".format( System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), CoarseGrainedSchedulerBackend.ACTOR_NAME) logInfo("launching container on " + containerId + " host " + workerHostname) - // just to be safe, simply remove it from pendingReleaseContainers. Should not be there, but .. + // Just to be safe, simply remove it from pendingReleaseContainers. + // Should not be there, but .. pendingReleaseContainers.remove(containerId) val rack = YarnAllocationHandler.lookupRack(conf, workerHostname) allocatedHostToContainersMap.synchronized { - val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname, new HashSet[ContainerId]()) + val containerSet = allocatedHostToContainersMap.getOrElseUpdate(workerHostname, + new HashSet[ContainerId]()) containerSet += containerId allocatedContainerToHostMap.put(containerId, workerHostname) @@ -251,7 +270,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM pendingReleaseContainers.remove(containerId) } else { - // simply decrement count - next iteration of ReporterThread will take care of allocating ! + // Simply decrement count - next iteration of ReporterThread will take care of allocating. numWorkersRunning.decrementAndGet() logInfo("Container completed ? nodeId: " + containerId + ", state " + completedContainer.getState + " httpaddress: " + completedContainer.getDiagnostics) @@ -271,7 +290,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM allocatedContainerToHostMap -= containerId - // doing this within locked context, sigh ... move to outside ? + // Doing this within locked context, sigh ... move to outside ? val rack = YarnAllocationHandler.lookupRack(conf, host) if (rack != null) { val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1 @@ -350,17 +369,24 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost) if (requiredCount > 0) { - hostContainerRequests += - createResourceRequest(AllocationType.HOST, candidateHost, requiredCount, YarnAllocationHandler.PRIORITY) + hostContainerRequests += createResourceRequest( + AllocationType.HOST, + candidateHost, + requiredCount, + YarnAllocationHandler.PRIORITY) } } - val rackContainerRequests: List[ResourceRequest] = createRackResourceRequests(hostContainerRequests.toList) + val rackContainerRequests: List[ResourceRequest] = createRackResourceRequests( + hostContainerRequests.toList) - val anyContainerRequests: ResourceRequest = - createResourceRequest(AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY) + val anyContainerRequests: ResourceRequest = createResourceRequest( + AllocationType.ANY, + resource = null, + numWorkers, + YarnAllocationHandler.PRIORITY) - val containerRequests: ArrayBuffer[ResourceRequest] = - new ArrayBuffer[ResourceRequest](hostContainerRequests.size() + rackContainerRequests.size() + 1) + val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest]( + hostContainerRequests.size() + rackContainerRequests.size() + 1) containerRequests ++= hostContainerRequests containerRequests ++= rackContainerRequests @@ -378,55 +404,60 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM val releasedContainerList = createReleasedContainerList() req.addAllReleases(releasedContainerList) - - if (numWorkers > 0) { - logInfo("Allocating " + numWorkers + " worker containers with " + (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + " of memory each.") + logInfo("Allocating %d worker containers with %d of memory each.").format(numWorkers, + workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) } else { logDebug("Empty allocation req .. release : " + releasedContainerList) } - for (req <- resourceRequests) { - logInfo("rsrcRequest ... host : " + req.getHostName + ", numContainers : " + req.getNumContainers + - ", p = " + req.getPriority().getPriority + ", capability: " + req.getCapability) + for (request <- resourceRequests) { + logInfo("ResourceRequest (host : %s, num containers: %d, priority = %d , capability : %s)"). + format( + request.getHostName, + request.getNumContainers, + request.getPriority, + request.getCapability) } resourceManager.allocate(req) } - private def createResourceRequest(requestType: AllocationType.AllocationType, - resource:String, numWorkers: Int, priority: Int): ResourceRequest = { + private def createResourceRequest( + requestType: AllocationType.AllocationType, + resource:String, + numWorkers: Int, + priority: Int): ResourceRequest = { // If hostname specified, we need atleast two requests - node local and rack local. // There must be a third request - which is ANY : that will be specially handled. requestType match { case AllocationType.HOST => { - assert (YarnAllocationHandler.ANY_HOST != resource) - + assert(YarnAllocationHandler.ANY_HOST != resource) val hostname = resource val nodeLocal = createResourceRequestImpl(hostname, numWorkers, priority) - // add to host->rack mapping + // Add to host->rack mapping YarnAllocationHandler.populateRackInfo(conf, hostname) nodeLocal } - case AllocationType.RACK => { val rack = resource createResourceRequestImpl(rack, numWorkers, priority) } - - case AllocationType.ANY => { - createResourceRequestImpl(YarnAllocationHandler.ANY_HOST, numWorkers, priority) - } - - case _ => throw new IllegalArgumentException("Unexpected/unsupported request type .. " + requestType) + case AllocationType.ANY => createResourceRequestImpl( + YarnAllocationHandler.ANY_HOST, numWorkers, priority) + case _ => throw new IllegalArgumentException( + "Unexpected/unsupported request type: " + requestType) } } - private def createResourceRequestImpl(hostname:String, numWorkers: Int, priority: Int): ResourceRequest = { + private def createResourceRequestImpl( + hostname:String, + numWorkers: Int, + priority: Int): ResourceRequest = { val rsrcRequest = Records.newRecord(classOf[ResourceRequest]) val memCapability = Records.newRecord(classOf[Resource]) @@ -447,11 +478,11 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM def createReleasedContainerList(): ArrayBuffer[ContainerId] = { val retval = new ArrayBuffer[ContainerId](1) - // iterator on COW list ... + // Iterator on COW list ... for (container <- releasedContainerList.iterator()){ retval += container } - // remove from the original list. + // Remove from the original list. if (! retval.isEmpty) { releasedContainerList.removeAll(retval) for (v <- retval) pendingReleaseContainers.put(v, true) @@ -466,14 +497,14 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM object YarnAllocationHandler { val ANY_HOST = "*" - // all requests are issued with same priority : we do not (yet) have any distinction between + // All requests are issued with same priority : we do not (yet) have any distinction between // request types (like map/reduce in hadoop for example) val PRIORITY = 1 // Additional memory overhead - in mb val MEMORY_OVERHEAD = 384 - // host to rack map - saved from allocation requests + // Host to rack map - saved from allocation requests // We are expecting this not to change. // Note that it is possible for this to change : and RM will indicate that to us via update // response to allocate. But we are punting on handling that for now. @@ -481,38 +512,69 @@ object YarnAllocationHandler { private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]() - def newAllocator(conf: Configuration, - resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId, - args: ApplicationMasterArguments): YarnAllocationHandler = { - - new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers, - args.workerMemory, args.workerCores, Map[String, Int](), Map[String, Int]()) + def newAllocator( + conf: Configuration, + resourceManager: AMRMProtocol, + appAttemptId: ApplicationAttemptId, + args: ApplicationMasterArguments): YarnAllocationHandler = { + + new YarnAllocationHandler( + conf, + resourceManager, + appAttemptId, + args.numWorkers, + args.workerMemory, + args.workerCores, + Map[String, Int](), + Map[String, Int]()) } - def newAllocator(conf: Configuration, - resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId, - args: ApplicationMasterArguments, - map: collection.Map[String, collection.Set[SplitInfo]]): YarnAllocationHandler = { + def newAllocator( + conf: Configuration, + resourceManager: AMRMProtocol, + appAttemptId: ApplicationAttemptId, + args: ApplicationMasterArguments, + map: collection.Map[String, + collection.Set[SplitInfo]]): YarnAllocationHandler = { val (hostToCount, rackToCount) = generateNodeToWeight(conf, map) - - new YarnAllocationHandler(conf, resourceManager, appAttemptId, args.numWorkers, - args.workerMemory, args.workerCores, hostToCount, rackToCount) + new YarnAllocationHandler( + conf, + resourceManager, + appAttemptId, + args.numWorkers, + args.workerMemory, + args.workerCores, + hostToCount, + rackToCount) } - def newAllocator(conf: Configuration, - resourceManager: AMRMProtocol, appAttemptId: ApplicationAttemptId, - maxWorkers: Int, workerMemory: Int, workerCores: Int, - map: collection.Map[String, collection.Set[SplitInfo]]): YarnAllocationHandler = { + def newAllocator( + conf: Configuration, + resourceManager: AMRMProtocol, + appAttemptId: ApplicationAttemptId, + maxWorkers: Int, + workerMemory: Int, + workerCores: Int, + map: collection.Map[String, collection.Set[SplitInfo]]): YarnAllocationHandler = { val (hostToCount, rackToCount) = generateNodeToWeight(conf, map) - new YarnAllocationHandler(conf, resourceManager, appAttemptId, maxWorkers, - workerMemory, workerCores, hostToCount, rackToCount) + new YarnAllocationHandler( + conf, + resourceManager, + appAttemptId, + maxWorkers, + workerMemory, + workerCores, + hostToCount, + rackToCount) } // A simple method to copy the split info map. - private def generateNodeToWeight(conf: Configuration, input: collection.Map[String, collection.Set[SplitInfo]]) : + private def generateNodeToWeight( + conf: Configuration, + input: collection.Map[String, collection.Set[SplitInfo]]) : // host to count, rack to count (Map[String, Int], Map[String, Int]) = { @@ -536,7 +598,7 @@ object YarnAllocationHandler { } def lookupRack(conf: Configuration, host: String): String = { - if (! hostToRack.contains(host)) populateRackInfo(conf, host) + if (!hostToRack.contains(host)) populateRackInfo(conf, host) hostToRack.get(host) } @@ -559,10 +621,12 @@ object YarnAllocationHandler { val rack = rackInfo.getNetworkLocation hostToRack.put(hostname, rack) if (! rackToHostSet.containsKey(rack)) { - rackToHostSet.putIfAbsent(rack, Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]())) + rackToHostSet.putIfAbsent(rack, + Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]())) } rackToHostSet.get(rack).add(hostname) + // TODO(harvey): Figure out this comment... // Since RackResolver caches, we are disabling this for now ... } /* else { // right ? Else we will keep calling rack resolver in case we cant resolve rack info ... -- cgit v1.2.3