Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                            |  43
-rw-r--r--  yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala  |  10
-rw-r--r--  yarn/src/test/resources/log4j.properties                                                 |   7
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala              | 127
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala                  |  76
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala       |   4
6 files changed, 193 insertions(+), 74 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index eb3b7fb885..cec81b9406 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -55,8 +55,8 @@ import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.launcher.YarnCommandBuilderUtils
+import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
import org.apache.spark.util.Utils
private[spark] class Client(
@@ -70,8 +70,6 @@ private[spark] class Client(
def this(clientArgs: ClientArguments, spConf: SparkConf) =
this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
- def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())
-
private val yarnClient = YarnClient.createYarnClient
private val yarnConf = new YarnConfiguration(hadoopConf)
private var credentials: Credentials = null
@@ -84,10 +82,27 @@ private[spark] class Client(
private var principal: String = null
private var keytab: String = null
+ private val launcherBackend = new LauncherBackend() {
+ override def onStopRequest(): Unit = {
+ if (isClusterMode && appId != null) {
+ yarnClient.killApplication(appId)
+ } else {
+ setState(SparkAppHandle.State.KILLED)
+ stop()
+ }
+ }
+ }
private val fireAndForget = isClusterMode &&
!sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)
+ private var appId: ApplicationId = null
+
+ def reportLauncherState(state: SparkAppHandle.State): Unit = {
+ launcherBackend.setState(state)
+ }
+
def stop(): Unit = {
+ launcherBackend.close()
yarnClient.stop()
// Unset the YARN mode system property, to allow switching between cluster types.
System.clearProperty("SPARK_YARN_MODE")
@@ -103,6 +118,7 @@ private[spark] class Client(
def submitApplication(): ApplicationId = {
var appId: ApplicationId = null
try {
+ launcherBackend.connect()
// Set up the credentials before doing anything else,
// so we don't have issues at any point.
setupCredentials()
@@ -116,6 +132,8 @@ private[spark] class Client(
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
appId = newAppResponse.getApplicationId()
+ reportLauncherState(SparkAppHandle.State.SUBMITTED)
+ launcherBackend.setAppId(appId.toString())
// Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)
@@ -881,6 +899,20 @@ private[spark] class Client(
}
}
+ if (lastState != state) {
+ state match {
+ case YarnApplicationState.RUNNING =>
+ reportLauncherState(SparkAppHandle.State.RUNNING)
+ case YarnApplicationState.FINISHED =>
+ reportLauncherState(SparkAppHandle.State.FINISHED)
+ case YarnApplicationState.FAILED =>
+ reportLauncherState(SparkAppHandle.State.FAILED)
+ case YarnApplicationState.KILLED =>
+ reportLauncherState(SparkAppHandle.State.KILLED)
+ case _ =>
+ }
+ }
+
if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
@@ -928,8 +960,8 @@ private[spark] class Client(
* throw an appropriate SparkException.
*/
def run(): Unit = {
- val appId = submitApplication()
- if (fireAndForget) {
+ this.appId = submitApplication()
+ if (!launcherBackend.isConnected() && fireAndForget) {
val report = getApplicationReport(appId)
val state = report.getYarnApplicationState
logInfo(s"Application report for $appId (state: $state)")
@@ -971,6 +1003,7 @@ private[spark] class Client(
}
object Client extends Logging {
+
def main(argStrings: Array[String]) {
if (!sys.props.contains("SPARK_SUBMIT")) {
logWarning("WARNING: This client is deprecated and will be removed in a " +
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 36d5759554..20771f6554 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.spark.{SparkException, Logging, SparkContext}
import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
+import org.apache.spark.launcher.SparkAppHandle
import org.apache.spark.scheduler.TaskSchedulerImpl
private[spark] class YarnClientSchedulerBackend(
@@ -177,6 +178,15 @@ private[spark] class YarnClientSchedulerBackend(
if (monitorThread != null) {
monitorThread.stopMonitor()
}
+
+ // Report a final state to the launcher if one is connected. This is needed since in client
+ // mode this backend doesn't let the app monitor loop run to completion, so it does not report
+ // the final state itself.
+ //
+ // Note: there's not enough information at this point to provide a better final state,
+ // so assume the application was successful.
+ client.reportLauncherState(SparkAppHandle.State.FINISHED)
+
super.stop()
YarnSparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
client.stop()
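A condensed sketch of the client-mode lifecycle this change enables, assuming a launcher handle is attached:

    import org.apache.spark.{SparkConf, SparkContext}

    // yarn-client mode: the driver owns the lifecycle, so the app monitor loop
    // never reaches a terminal YARN state on its own.
    val sc = new SparkContext(new SparkConf().setMaster("yarn-client"))
    try {
      // ... run jobs ...
    } finally {
      sc.stop()   // tears down this backend, which now reports FINISHED to the handle
    }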
diff --git a/yarn/src/test/resources/log4j.properties b/yarn/src/test/resources/log4j.properties
index 6b8a5dbf63..6b9a799954 100644
--- a/yarn/src/test/resources/log4j.properties
+++ b/yarn/src/test/resources/log4j.properties
@@ -23,6 +23,9 @@ log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark-project.jetty=WARN
+# Ignore messages below warning level from a few verbose libraries.
+log4j.logger.com.sun.jersey=WARN
log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.org.mortbay=WARN
+log4j.logger.org.spark-project.jetty=WARN
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 17c59ff06e..12494b0105 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -22,15 +22,18 @@ import java.util.Properties
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.language.postfixOps
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster
import org.scalatest.{BeforeAndAfterAll, Matchers}
+import org.scalatest.concurrent.Eventually._
import org.apache.spark._
-import org.apache.spark.launcher.TestClasspathBuilder
+import org.apache.spark.launcher._
import org.apache.spark.util.Utils
abstract class BaseYarnClusterSuite
@@ -46,13 +49,14 @@ abstract class BaseYarnClusterSuite
|log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
|log4j.logger.org.apache.hadoop=WARN
|log4j.logger.org.eclipse.jetty=WARN
+ |log4j.logger.org.mortbay=WARN
|log4j.logger.org.spark-project.jetty=WARN
""".stripMargin
private var yarnCluster: MiniYARNCluster = _
protected var tempDir: File = _
private var fakeSparkJar: File = _
- private var hadoopConfDir: File = _
+ protected var hadoopConfDir: File = _
private var logConfDir: File = _
def newYarnConfig(): YarnConfiguration
@@ -120,15 +124,77 @@ abstract class BaseYarnClusterSuite
clientMode: Boolean,
klass: String,
appArgs: Seq[String] = Nil,
- sparkArgs: Seq[String] = Nil,
+ sparkArgs: Seq[(String, String)] = Nil,
extraClassPath: Seq[String] = Nil,
extraJars: Seq[String] = Nil,
extraConf: Map[String, String] = Map(),
- extraEnv: Map[String, String] = Map()): Unit = {
+ extraEnv: Map[String, String] = Map()): SparkAppHandle.State = {
val master = if (clientMode) "yarn-client" else "yarn-cluster"
- val props = new Properties()
- props.setProperty("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath())
+ val propsFile = createConfFile(extraClassPath = extraClassPath, extraConf = extraConf)
+ val env = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv
+
+ val launcher = new SparkLauncher(env.asJava)
+ if (klass.endsWith(".py")) {
+ launcher.setAppResource(klass)
+ } else {
+ launcher.setMainClass(klass)
+ launcher.setAppResource(fakeSparkJar.getAbsolutePath())
+ }
+ launcher.setSparkHome(sys.props("spark.test.home"))
+ .setMaster(master)
+ .setConf("spark.executor.instances", "1")
+ .setPropertiesFile(propsFile)
+ .addAppArgs(appArgs.toArray: _*)
+
+ sparkArgs.foreach { case (name, value) =>
+ if (value != null) {
+ launcher.addSparkArg(name, value)
+ } else {
+ launcher.addSparkArg(name)
+ }
+ }
+ extraJars.foreach(launcher.addJar)
+ val handle = launcher.startApplication()
+ try {
+ eventually(timeout(2 minutes), interval(1 second)) {
+ assert(handle.getState().isFinal())
+ }
+ } finally {
+ handle.kill()
+ }
+
+ handle.getState()
+ }
+
+ /**
+ * This is a workaround for an issue with yarn-cluster mode: the Client class reports no error
+ * when the job process exits cleanly even though the job itself has failed. So the tests
+ * require the job to write something to a file once everything has run correctly, to indicate
+ * that the job succeeded.
+ */
+ protected def checkResult(finalState: SparkAppHandle.State, result: File): Unit = {
+ checkResult(finalState, result, "success")
+ }
+
+ protected def checkResult(
+ finalState: SparkAppHandle.State,
+ result: File,
+ expected: String): Unit = {
+ finalState should be (SparkAppHandle.State.FINISHED)
+ val resultString = Files.toString(result, UTF_8)
+ resultString should be (expected)
+ }
+
+ protected def mainClassName(klass: Class[_]): String = {
+ klass.getName().stripSuffix("$")
+ }
+
+ protected def createConfFile(
+ extraClassPath: Seq[String] = Nil,
+ extraConf: Map[String, String] = Map()): String = {
+ val props = new Properties()
+ props.put("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath())
val testClasspath = new TestClasspathBuilder()
.buildClassPath(
@@ -138,69 +204,28 @@ abstract class BaseYarnClusterSuite
.asScala
.mkString(File.pathSeparator)
- props.setProperty("spark.driver.extraClassPath", testClasspath)
- props.setProperty("spark.executor.extraClassPath", testClasspath)
+ props.put("spark.driver.extraClassPath", testClasspath)
+ props.put("spark.executor.extraClassPath", testClasspath)
// SPARK-4267: make sure java options are propagated correctly.
props.setProperty("spark.driver.extraJavaOptions", "-Dfoo=\"one two three\"")
props.setProperty("spark.executor.extraJavaOptions", "-Dfoo=\"one two three\"")
- yarnCluster.getConfig.asScala.foreach { e =>
+ yarnCluster.getConfig().asScala.foreach { e =>
props.setProperty("spark.hadoop." + e.getKey(), e.getValue())
}
-
sys.props.foreach { case (k, v) =>
if (k.startsWith("spark.")) {
props.setProperty(k, v)
}
}
-
extraConf.foreach { case (k, v) => props.setProperty(k, v) }
val propsFile = File.createTempFile("spark", ".properties", tempDir)
val writer = new OutputStreamWriter(new FileOutputStream(propsFile), UTF_8)
props.store(writer, "Spark properties.")
writer.close()
-
- val extraJarArgs = if (extraJars.nonEmpty) Seq("--jars", extraJars.mkString(",")) else Nil
- val mainArgs =
- if (klass.endsWith(".py")) {
- Seq(klass)
- } else {
- Seq("--class", klass, fakeSparkJar.getAbsolutePath())
- }
- val argv =
- Seq(
- new File(sys.props("spark.test.home"), "bin/spark-submit").getAbsolutePath(),
- "--master", master,
- "--num-executors", "1",
- "--properties-file", propsFile.getAbsolutePath()) ++
- extraJarArgs ++
- sparkArgs ++
- mainArgs ++
- appArgs
-
- Utils.executeAndGetOutput(argv,
- extraEnvironment = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv)
- }
-
- /**
- * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
- * any sort of error when the job process finishes successfully, but the job itself fails. So
- * the tests enforce that something is written to a file after everything is ok to indicate
- * that the job succeeded.
- */
- protected def checkResult(result: File): Unit = {
- checkResult(result, "success")
- }
-
- protected def checkResult(result: File, expected: String): Unit = {
- val resultString = Files.toString(result, UTF_8)
- resultString should be (expected)
- }
-
- protected def mainClassName(klass: Class[_]): String = {
- klass.getName().stripSuffix("$")
+ propsFile.getAbsolutePath()
}
}
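The checkResult() convention above exists because a yarn-cluster submission can exit cleanly even when the job failed. A sketch of the driver-side half of that convention, mirroring the pattern YarnClusterDriver uses below; the helper name is illustrative:

    import java.io.File
    import com.google.common.base.Charsets.UTF_8
    import com.google.common.io.Files

    // Run a body of work and record "success" or "failure" in a marker file
    // that the suite reads back through checkResult().
    def runAndReport(statusFile: File)(body: => Unit): Unit = {
      var result = "failure"
      try {
        body
        result = "success"
      } finally {
        Files.write(result, statusFile, UTF_8)
      }
    }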
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index f1601cd161..d1cd0c89b5 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -19,16 +19,20 @@ package org.apache.spark.deploy.yarn
import java.io.File
import java.net.URL
+import java.util.{HashMap => JHashMap, Properties}
import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.scalatest.Matchers
+import org.scalatest.concurrent.Eventually._
import org.apache.spark._
-import org.apache.spark.launcher.TestClasspathBuilder
+import org.apache.spark.launcher._
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart,
SparkListenerExecutorAdded}
import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -82,10 +86,8 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
test("run Spark in yarn-cluster mode unsuccessfully") {
// Don't provide arguments so the driver will fail.
- val exception = intercept[SparkException] {
- runSpark(false, mainClassName(YarnClusterDriver.getClass))
- fail("Spark application should have failed.")
- }
+ val finalState = runSpark(false, mainClassName(YarnClusterDriver.getClass))
+ finalState should be (SparkAppHandle.State.FAILED)
}
test("run Python application in yarn-client mode") {
@@ -104,11 +106,42 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
testUseClassPathFirst(false)
}
+ test("monitor app using launcher library") {
+ val env = new JHashMap[String, String]()
+ env.put("YARN_CONF_DIR", hadoopConfDir.getAbsolutePath())
+
+ val propsFile = createConfFile()
+ val handle = new SparkLauncher(env)
+ .setSparkHome(sys.props("spark.test.home"))
+ .setConf("spark.ui.enabled", "false")
+ .setPropertiesFile(propsFile)
+ .setMaster("yarn-client")
+ .setAppResource("spark-internal")
+ .setMainClass(mainClassName(YarnLauncherTestApp.getClass))
+ .startApplication()
+
+ try {
+ eventually(timeout(30 seconds), interval(100 millis)) {
+ handle.getState() should be (SparkAppHandle.State.RUNNING)
+ }
+
+ handle.getAppId() should not be (null)
+ handle.getAppId() should startWith ("application_")
+ handle.stop()
+
+ eventually(timeout(30 seconds), interval(100 millis)) {
+ handle.getState() should be (SparkAppHandle.State.KILLED)
+ }
+ } finally {
+ handle.kill()
+ }
+ }
+
private def testBasicYarnApp(clientMode: Boolean): Unit = {
val result = File.createTempFile("result", null, tempDir)
- runSpark(clientMode, mainClassName(YarnClusterDriver.getClass),
+ val finalState = runSpark(clientMode, mainClassName(YarnClusterDriver.getClass),
appArgs = Seq(result.getAbsolutePath()))
- checkResult(result)
+ checkResult(finalState, result)
}
private def testPySpark(clientMode: Boolean): Unit = {
@@ -143,11 +176,11 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
val pyFiles = Seq(pyModule.getAbsolutePath(), mod2Archive.getPath()).mkString(",")
val result = File.createTempFile("result", null, tempDir)
- runSpark(clientMode, primaryPyFile.getAbsolutePath(),
- sparkArgs = Seq("--py-files", pyFiles),
+ val finalState = runSpark(clientMode, primaryPyFile.getAbsolutePath(),
+ sparkArgs = Seq("--py-files" -> pyFiles),
appArgs = Seq(result.getAbsolutePath()),
extraEnv = extraEnv)
- checkResult(result)
+ checkResult(finalState, result)
}
private def testUseClassPathFirst(clientMode: Boolean): Unit = {
@@ -156,15 +189,15 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
val userJar = TestUtils.createJarWithFiles(Map("test.resource" -> "OVERRIDDEN"), tempDir)
val driverResult = File.createTempFile("driver", null, tempDir)
val executorResult = File.createTempFile("executor", null, tempDir)
- runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
+ val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()),
extraClassPath = Seq(originalJar.getPath()),
extraJars = Seq("local:" + userJar.getPath()),
extraConf = Map(
"spark.driver.userClassPathFirst" -> "true",
"spark.executor.userClassPathFirst" -> "true"))
- checkResult(driverResult, "OVERRIDDEN")
- checkResult(executorResult, "OVERRIDDEN")
+ checkResult(finalState, driverResult, "OVERRIDDEN")
+ checkResult(finalState, executorResult, "OVERRIDDEN")
}
}
@@ -211,8 +244,8 @@ private object YarnClusterDriver extends Logging with Matchers {
data should be (Set(1, 2, 3, 4))
result = "success"
} finally {
- sc.stop()
Files.write(result, status, UTF_8)
+ sc.stop()
}
// verify log urls are present
@@ -297,3 +330,18 @@ private object YarnClasspathTest extends Logging {
}
}
+
+private object YarnLauncherTestApp {
+
+ def main(args: Array[String]): Unit = {
+ // Do not stop the application; the test will stop it using the launcher lib. Just run a task
+ // that will prevent the process from exiting.
+ val sc = new SparkContext(new SparkConf())
+ sc.parallelize(Seq(1)).foreach { i =>
+ this.synchronized {
+ wait()
+ }
+ }
+ }
+
+}
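The launcher test above exercises both shutdown paths a handle offers; a brief sketch of the distinction, assuming a running handle:

    // stop() sends a stop request over the launcher connection, answered by the
    // Client's LauncherBackend.onStopRequest() (kill the YARN app in cluster
    // mode, or mark the handle KILLED and stop the client otherwise).
    // kill() terminates the child process directly and serves as the fallback.
    try {
      handle.stop()
    } finally {
      handle.kill()
    }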
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
index a85e5772a0..c17e8695c2 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
@@ -53,7 +53,7 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
logInfo("Shuffle service port = " + shuffleServicePort)
val result = File.createTempFile("result", null, tempDir)
- runSpark(
+ val finalState = runSpark(
false,
mainClassName(YarnExternalShuffleDriver.getClass),
appArgs = Seq(result.getAbsolutePath(), registeredExecFile.getAbsolutePath),
@@ -62,7 +62,7 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
"spark.shuffle.service.port" -> shuffleServicePort.toString
)
)
- checkResult(result)
+ checkResult(finalState, result)
assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists())
}
}