aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala8
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala154
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala22
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala20
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala (renamed from core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala)70
-rw-r--r--yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala71
-rw-r--r--yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala34
-rw-r--r--yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala48
-rw-r--r--yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationId.scala42
9 files changed, 431 insertions, 38 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 50ae7ffeec..13ef4dfd64 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -117,6 +117,10 @@ private[spark] class ApplicationMaster(
private var delegationTokenRenewerOption: Option[AMDelegationTokenRenewer] = None
+ def getAttemptId(): ApplicationAttemptId = {
+ client.getAttemptId()
+ }
+
final def run(): Int = {
try {
val appAttemptId = client.getAttemptId()
@@ -662,6 +666,10 @@ object ApplicationMaster extends Logging {
master.sparkContextStopped(sc)
}
+ private[spark] def getAttemptId(): ApplicationAttemptId = {
+ master.getAttemptId
+ }
+
}
/**
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
new file mode 100644
index 0000000000..c064521845
--- /dev/null
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerExtensionService.scala
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.util.Utils
+
+/**
+ * An extension service that can be loaded into a Spark YARN scheduler.
+ * A Service that can be started and stopped.
+ *
+ * 1. For implementations to be loadable by `SchedulerExtensionServices`,
+ * they must provide an empty constructor.
+ * 2. The `stop()` operation MUST be idempotent, and succeed even if `start()` was
+ * never invoked.
+ */
+trait SchedulerExtensionService {
+
+ /**
+ * Start the extension service. This should be a no-op if
+ * called more than once.
+ * @param binding binding to the spark application and YARN
+ */
+ def start(binding: SchedulerExtensionServiceBinding): Unit
+
+ /**
+ * Stop the service
+ * The `stop()` operation MUST be idempotent, and succeed even if `start()` was
+ * never invoked.
+ */
+ def stop(): Unit
+}
+
+/**
+ * Binding information for a [[SchedulerExtensionService]].
+ *
+ * The attempt ID will be set if the service is started within a YARN application master;
+ * there is then a different attempt ID for every time that AM is restarted.
+ * When the service binding is instantiated in client mode, there's no attempt ID, as it lacks
+ * this information.
+ * @param sparkContext current spark context
+ * @param applicationId YARN application ID
+ * @param attemptId YARN attemptID. This will always be unset in client mode, and always set in
+ * cluster mode.
+ */
+case class SchedulerExtensionServiceBinding(
+ sparkContext: SparkContext,
+ applicationId: ApplicationId,
+ attemptId: Option[ApplicationAttemptId] = None)
+
+/**
+ * Container for [[SchedulerExtensionService]] instances.
+ *
+ * Loads Extension Services from the configuration property
+ * `"spark.yarn.services"`, instantiates and starts them.
+ * When stopped, it stops all child entries.
+ *
+ * The order in which child extension services are started and stopped
+ * is undefined.
+ */
+private[spark] class SchedulerExtensionServices extends SchedulerExtensionService
+ with Logging {
+ private var serviceOption: Option[String] = None
+ private var services: List[SchedulerExtensionService] = Nil
+ private val started = new AtomicBoolean(false)
+ private var binding: SchedulerExtensionServiceBinding = _
+
+ /**
+ * Binding operation will load the named services and call bind on them too; the
+ * entire set of services are then ready for `init()` and `start()` calls.
+ *
+ * @param binding binding to the spark application and YARN
+ */
+ def start(binding: SchedulerExtensionServiceBinding): Unit = {
+ if (started.getAndSet(true)) {
+ logWarning("Ignoring re-entrant start operation")
+ return
+ }
+ require(binding.sparkContext != null, "Null context parameter")
+ require(binding.applicationId != null, "Null appId parameter")
+ this.binding = binding
+ val sparkContext = binding.sparkContext
+ val appId = binding.applicationId
+ val attemptId = binding.attemptId
+ logInfo(s"Starting Yarn extension services with app $appId and attemptId $attemptId")
+
+ serviceOption = sparkContext.getConf.getOption(SchedulerExtensionServices.SPARK_YARN_SERVICES)
+ services = serviceOption
+ .map { s =>
+ s.split(",").map(_.trim()).filter(!_.isEmpty)
+ .map { sClass =>
+ val instance = Utils.classForName(sClass)
+ .newInstance()
+ .asInstanceOf[SchedulerExtensionService]
+ // bind this service
+ instance.start(binding)
+ logInfo(s"Service $sClass started")
+ instance
+ }.toList
+ }.getOrElse(Nil)
+ }
+
+ /**
+ * Get the list of services.
+ *
+ * @return a list of services; Nil until the service is started
+ */
+ def getServices: List[SchedulerExtensionService] = services
+
+ /**
+ * Stop the services; idempotent.
+ *
+ */
+ override def stop(): Unit = {
+ if (started.getAndSet(false)) {
+ logInfo(s"Stopping $this")
+ services.foreach { s =>
+ Utils.tryLogNonFatalError(s.stop())
+ }
+ }
+ }
+
+ override def toString(): String = s"""SchedulerExtensionServices
+ |(serviceOption=$serviceOption,
+ | services=$services,
+ | started=$started)""".stripMargin
+}
+
+private[spark] object SchedulerExtensionServices {
+
+ /**
+ * A list of comma separated services to instantiate in the scheduler
+ */
+ val SPARK_YARN_SERVICES = "spark.yarn.services"
+}
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 20771f6554..0e27a2665e 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster
import scala.collection.mutable.ArrayBuffer
-import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
+import org.apache.hadoop.yarn.api.records.YarnApplicationState
import org.apache.spark.{SparkException, Logging, SparkContext}
import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
@@ -33,7 +33,6 @@ private[spark] class YarnClientSchedulerBackend(
with Logging {
private var client: Client = null
- private var appId: ApplicationId = null
private var monitorThread: MonitorThread = null
/**
@@ -54,13 +53,12 @@ private[spark] class YarnClientSchedulerBackend(
val args = new ClientArguments(argsArrayBuf.toArray, conf)
totalExpectedExecutors = args.numExecutors
client = new Client(args, conf)
- appId = client.submitApplication()
+ bindToYarn(client.submitApplication(), None)
// SPARK-8687: Ensure all necessary properties have already been set before
// we initialize our driver scheduler backend, which serves these properties
// to the executors
super.start()
-
waitForApplication()
// SPARK-8851: In yarn-client mode, the AM still does the credentials refresh. The driver
@@ -116,8 +114,8 @@ private[spark] class YarnClientSchedulerBackend(
* This assumes both `client` and `appId` have already been set.
*/
private def waitForApplication(): Unit = {
- assert(client != null && appId != null, "Application has not been submitted yet!")
- val (state, _) = client.monitorApplication(appId, returnOnRunning = true) // blocking
+ assert(client != null && appId.isDefined, "Application has not been submitted yet!")
+ val (state, _) = client.monitorApplication(appId.get, returnOnRunning = true) // blocking
if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
@@ -125,7 +123,7 @@ private[spark] class YarnClientSchedulerBackend(
"It might have been killed or unable to launch application master.")
}
if (state == YarnApplicationState.RUNNING) {
- logInfo(s"Application $appId has started running.")
+ logInfo(s"Application ${appId.get} has started running.")
}
}
@@ -141,7 +139,7 @@ private[spark] class YarnClientSchedulerBackend(
override def run() {
try {
- val (state, _) = client.monitorApplication(appId, logApplicationReport = false)
+ val (state, _) = client.monitorApplication(appId.get, logApplicationReport = false)
logError(s"Yarn application has already exited with state $state!")
allowInterrupt = false
sc.stop()
@@ -163,7 +161,7 @@ private[spark] class YarnClientSchedulerBackend(
* This assumes both `client` and `appId` have already been set.
*/
private def asyncMonitorApplication(): MonitorThread = {
- assert(client != null && appId != null, "Application has not been submitted yet!")
+ assert(client != null && appId.isDefined, "Application has not been submitted yet!")
val t = new MonitorThread
t.setName("Yarn application state monitor")
t.setDaemon(true)
@@ -193,10 +191,4 @@ private[spark] class YarnClientSchedulerBackend(
logInfo("Stopped")
}
- override def applicationId(): String = {
- Option(appId).map(_.toString).getOrElse {
- logWarning("Application ID is not initialized yet.")
- super.applicationId
- }
- }
}
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index 50b699f11b..ced597bed3 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -21,7 +21,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.SparkContext
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
+import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils
@@ -31,26 +31,12 @@ private[spark] class YarnClusterSchedulerBackend(
extends YarnSchedulerBackend(scheduler, sc) {
override def start() {
+ val attemptId = ApplicationMaster.getAttemptId
+ bindToYarn(attemptId.getApplicationId(), Some(attemptId))
super.start()
totalExpectedExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sc.conf)
}
- override def applicationId(): String =
- // In YARN Cluster mode, the application ID is expected to be set, so log an error if it's
- // not found.
- sc.getConf.getOption("spark.yarn.app.id").getOrElse {
- logError("Application ID is not set.")
- super.applicationId
- }
-
- override def applicationAttemptId(): Option[String] =
- // In YARN Cluster mode, the attempt ID is expected to be set, so log an error if it's
- // not found.
- sc.getConf.getOption("spark.yarn.app.attemptId").orElse {
- logError("Application attempt ID is not set.")
- super.applicationAttemptId
- }
-
override def getDriverLogUrls: Option[Map[String, String]] = {
var driverLogs: Option[Map[String, String]] = None
try {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 80da37b09b..e3dd87798f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -17,17 +17,17 @@
package org.apache.spark.scheduler.cluster
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.{Future, ExecutionContext}
+import scala.concurrent.{ExecutionContext, Future}
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.rpc._
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.scheduler._
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
import org.apache.spark.ui.JettyUtils
-import org.apache.spark.util.{ThreadUtils, RpcUtils}
-
-import scala.util.control.NonFatal
+import org.apache.spark.util.{RpcUtils, ThreadUtils}
/**
* Abstract Yarn scheduler backend that contains common logic
@@ -51,6 +51,64 @@ private[spark] abstract class YarnSchedulerBackend(
private implicit val askTimeout = RpcUtils.askRpcTimeout(sc.conf)
+ /** Application ID. */
+ protected var appId: Option[ApplicationId] = None
+
+ /** Attempt ID. This is unset for client-mode schedulers */
+ private var attemptId: Option[ApplicationAttemptId] = None
+
+ /** Scheduler extension services. */
+ private val services: SchedulerExtensionServices = new SchedulerExtensionServices()
+
+ /**
+ * Bind to YARN. This *must* be done before calling [[start()]].
+ *
+ * @param appId YARN application ID
+ * @param attemptId Optional YARN attempt ID
+ */
+ protected def bindToYarn(appId: ApplicationId, attemptId: Option[ApplicationAttemptId]): Unit = {
+ this.appId = Some(appId)
+ this.attemptId = attemptId
+ }
+
+ override def start() {
+ require(appId.isDefined, "application ID unset")
+ val binding = SchedulerExtensionServiceBinding(sc, appId.get, attemptId)
+ services.start(binding)
+ super.start()
+ }
+
+ override def stop(): Unit = {
+ try {
+ super.stop()
+ } finally {
+ services.stop()
+ }
+ }
+
+ /**
+ * Get the attempt ID for this run, if the cluster manager supports multiple
+ * attempts. Applications run in client mode will not have attempt IDs.
+ *
+ * @return The application attempt id, if available.
+ */
+ override def applicationAttemptId(): Option[String] = {
+ attemptId.map(_.toString)
+ }
+
+ /**
+ * Get an application ID associated with the job.
+ * This returns the string value of [[appId]] if set, otherwise
+ * the locally-generated ID from the superclass.
+ * @return The application ID
+ */
+ override def applicationId(): String = {
+ appId.map(_.toString).getOrElse {
+ logWarning("Application ID is not initialized yet.")
+ super.applicationId
+ }
+ }
+
/**
* Request executors from the ApplicationMaster by specifying the total number desired.
* This includes executors already pending or running.
diff --git a/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala b/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala
new file mode 100644
index 0000000000..b4d1b0a3d2
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/scheduler/cluster/ExtensionServiceIntegrationSuite.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.{LocalSparkContext, Logging, SparkConf, SparkContext, SparkFunSuite}
+
+/**
+ * Test the integration with [[SchedulerExtensionServices]]
+ */
+class ExtensionServiceIntegrationSuite extends SparkFunSuite
+ with LocalSparkContext with BeforeAndAfter
+ with Logging {
+
+ val applicationId = new StubApplicationId(0, 1111L)
+ val attemptId = new StubApplicationAttemptId(applicationId, 1)
+
+ /*
+ * Setup phase creates the spark context
+ */
+ before {
+ val sparkConf = new SparkConf()
+ sparkConf.set(SchedulerExtensionServices.SPARK_YARN_SERVICES,
+ classOf[SimpleExtensionService].getName())
+ sparkConf.setMaster("local").setAppName("ExtensionServiceIntegrationSuite")
+ sc = new SparkContext(sparkConf)
+ }
+
+ test("Instantiate") {
+ val services = new SchedulerExtensionServices()
+ assertResult(Nil, "non-nil service list") {
+ services.getServices
+ }
+ services.start(SchedulerExtensionServiceBinding(sc, applicationId))
+ services.stop()
+ }
+
+ test("Contains SimpleExtensionService Service") {
+ val services = new SchedulerExtensionServices()
+ try {
+ services.start(SchedulerExtensionServiceBinding(sc, applicationId))
+ val serviceList = services.getServices
+ assert(serviceList.nonEmpty, "empty service list")
+ val (service :: Nil) = serviceList
+ val simpleService = service.asInstanceOf[SimpleExtensionService]
+ assert(simpleService.started.get, "service not started")
+ services.stop()
+ assert(!simpleService.started.get, "service not stopped")
+ } finally {
+ services.stop()
+ }
+ }
+}
+
+
diff --git a/yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala b/yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala
new file mode 100644
index 0000000000..9b8c98cda8
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/scheduler/cluster/SimpleExtensionService.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+private[spark] class SimpleExtensionService extends SchedulerExtensionService {
+
+ /** started flag; set in the `start()` call, stopped in `stop()`. */
+ val started = new AtomicBoolean(false)
+
+ override def start(binding: SchedulerExtensionServiceBinding): Unit = {
+ started.set(true)
+ }
+
+ override def stop(): Unit = {
+ started.set(false)
+ }
+}
diff --git a/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala b/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala
new file mode 100644
index 0000000000..4b57b9509a
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationAttemptId.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.hadoop.yarn.api.records.{ApplicationAttemptId, ApplicationId}
+
+/**
+ * A stub application ID; can be set in constructor and/or updated later.
+ * @param applicationId application ID
+ * @param attempt an attempt counter
+ */
+class StubApplicationAttemptId(var applicationId: ApplicationId, var attempt: Int)
+ extends ApplicationAttemptId {
+
+ override def setApplicationId(appID: ApplicationId): Unit = {
+ applicationId = appID
+ }
+
+ override def getAttemptId: Int = {
+ attempt
+ }
+
+ override def setAttemptId(attemptId: Int): Unit = {
+ attempt = attemptId
+ }
+
+ override def getApplicationId: ApplicationId = {
+ applicationId
+ }
+
+ override def build(): Unit = {
+ }
+}
diff --git a/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationId.scala b/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationId.scala
new file mode 100644
index 0000000000..bffa0e09be
--- /dev/null
+++ b/yarn/src/test/scala/org/apache/spark/scheduler/cluster/StubApplicationId.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster
+
+import org.apache.hadoop.yarn.api.records.ApplicationId
+
+/**
+ * Simple Testing Application Id; ID and cluster timestamp are set in constructor
+ * and cannot be updated.
+ * @param id app id
+ * @param clusterTimestamp timestamp
+ */
+private[spark] class StubApplicationId(id: Int, clusterTimestamp: Long) extends ApplicationId {
+ override def getId: Int = {
+ id
+ }
+
+ override def getClusterTimestamp: Long = {
+ clusterTimestamp
+ }
+
+ override def setId(id: Int): Unit = {}
+
+ override def setClusterTimestamp(clusterTimestamp: Long): Unit = {}
+
+ override def build(): Unit = {}
+}