aboutsummaryrefslogtreecommitdiff
path: root/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
diff options
context:
space:
mode:
Diffstat (limited to 'yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala29
1 files changed, 22 insertions, 7 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index d094302362..23742eab62 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -36,9 +36,11 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef}
import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason}
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RetrieveLastAllocatedExecutorId
import org.apache.spark.util.ThreadUtils
/**
@@ -61,7 +63,6 @@ private[yarn] class YarnAllocator(
sparkConf: SparkConf,
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
- args: ApplicationMasterArguments,
securityMgr: SecurityManager)
extends Logging {
@@ -83,8 +84,23 @@ private[yarn] class YarnAllocator(
new ConcurrentHashMap[ContainerId, java.lang.Boolean])
@volatile private var numExecutorsRunning = 0
- // Used to generate a unique ID per executor
- private var executorIdCounter = 0
+
+ /**
+ * Used to generate a unique ID per executor.
+ *
+ * When the AM restarts, `executorIdCounter` would otherwise be reset to 0, so the IDs of
+ * new executors would start from 1 again and conflict with the executors that were
+ * already created before the restart. To avoid this, `executorIdCounter` is initialized
+ * with the max (last allocated) executor ID retrieved from the driver.
+ *
+ * This executor ID conflict only occurs in yarn-client mode, so the issue is specific
+ * to yarn-client mode. For more details, see the JIRA issue referenced below.
+ *
+ * @see SPARK-12864
+ */
+ private var executorIdCounter: Int =
+ driverRef.askWithRetry[Int](RetrieveLastAllocatedExecutorId)
+
@volatile private var numExecutorsFailed = 0
@volatile private var targetNumExecutors =
@@ -107,12 +123,12 @@ private[yarn] class YarnAllocator(
private val containerIdToExecutorId = new HashMap[ContainerId, String]
// Executor memory in MB.
- protected val executorMemory = args.executorMemory
+ protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
// Additional memory overhead.
protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
// Number of cores per executor.
- protected val executorCores = args.executorCores
+ protected val executorCores = sparkConf.get(EXECUTOR_CORES)
// Resource capability requested for each executors
private[yarn] val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
@@ -132,11 +148,10 @@ private[yarn] class YarnAllocator(
classOf[Array[String]], classOf[Array[String]], classOf[Priority], classOf[Boolean],
classOf[String]))
} catch {
- case e: NoSuchMethodException => {
+ case e: NoSuchMethodException =>
logWarning(s"Node label expression $expr will be ignored because YARN version on" +
" classpath does not support it.")
None
- }
}
}