diff options
Diffstat (limited to 'yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala')
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 29 |
1 files changed, 22 insertions, 7 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index d094302362..23742eab62 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -36,9 +36,11 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.Logging +import org.apache.spark.internal.config._ import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef} import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason} import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId import org.apache.spark.util.ThreadUtils /** @@ -61,7 +63,6 @@ private[yarn] class YarnAllocator( sparkConf: SparkConf, amClient: AMRMClient[ContainerRequest], appAttemptId: ApplicationAttemptId, - args: ApplicationMasterArguments, securityMgr: SecurityManager) extends Logging { @@ -83,8 +84,23 @@ private[yarn] class YarnAllocator( new ConcurrentHashMap[ContainerId, java.lang.Boolean]) @volatile private var numExecutorsRunning = 0 - // Used to generate a unique ID per executor - private var executorIdCounter = 0 + + /** + * Used to generate a unique ID per executor + * + * Init `executorIdCounter`. when AM restart, `executorIdCounter` will reset to 0. Then + * the id of new executor will start from 1, this will conflict with the executor has + * already created before. So, we should initialize the `executorIdCounter` by getting + * the max executorId from driver. + * + * And this situation of executorId conflict is just in yarn client mode, so this is an issue + * in yarn client mode. For more details, can check in jira. + * + * @see SPARK-12864 + */ + private var executorIdCounter: Int = + driverRef.askWithRetry[Int](RetrieveLastAllocatedExecutorId) + @volatile private var numExecutorsFailed = 0 @volatile private var targetNumExecutors = @@ -107,12 +123,12 @@ private[yarn] class YarnAllocator( private val containerIdToExecutorId = new HashMap[ContainerId, String] // Executor memory in MB. - protected val executorMemory = args.executorMemory + protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt // Additional memory overhead. protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse( math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt // Number of cores per executor. - protected val executorCores = args.executorCores + protected val executorCores = sparkConf.get(EXECUTOR_CORES) // Resource capability requested for each executors private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores) @@ -132,11 +148,10 @@ private[yarn] class YarnAllocator( classOf[Array[String]], classOf[Array[String]], classOf[Priority], classOf[Boolean], classOf[String])) } catch { - case e: NoSuchMethodException => { + case e: NoSuchMethodException => logWarning(s"Node label expression $expr will be ignored because YARN version on" + " classpath does not support it.") None - } } } |