author     Marcelo Vanzin <vanzin@cloudera.com>    2015-10-09 15:28:09 -0500
committer  Imran Rashid <irashid@cloudera.com>     2015-10-09 15:28:09 -0500
commit     015f7ef503d5544f79512b6333326749a1f0c48b (patch)
tree       990942b40bd374f632c3954cd4aab3741dd17f63 /yarn/src
parent     70f44ad2d836236c74e1336a7368982d5fe3abff (diff)
[SPARK-8673] [LAUNCHER] API and infrastructure for communicating with child apps.
This change adds an API that encapsulates information about an app launched using the library. It also creates a socket-based communication layer for apps that are launched as child processes; the launching application listens for connections from launched apps, and once communication is established, the channel can be used to send updates to the launching app, or to send commands to the child app.

The change also includes hooks for local, standalone/client and yarn masters.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #7052 from vanzin/SPARK-8673.
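As a quick orientation, here is a minimal sketch of how a launching application might drive a child app through the new API, based on the SparkLauncher and SparkAppHandle calls exercised in the test changes below (the jar path and main class are placeholders, not part of this change):

    import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

    object LauncherExample {
      def main(args: Array[String]): Unit = {
        // Start the app as a child process; the returned handle is backed by the
        // socket-based channel described above.
        val handle = new SparkLauncher()
          .setSparkHome(sys.env("SPARK_HOME"))
          .setMaster("yarn-client")
          .setAppResource("/path/to/app.jar")    // placeholder resource
          .setMainClass("com.example.MyApp")     // placeholder main class
          .setConf("spark.executor.instances", "1")
          .startApplication()

        // Poll the handle until the child app reports a terminal state; the tests
        // below do the same thing with scalatest's `eventually`.
        while (!handle.getState().isFinal()) {
          Thread.sleep(1000)
        }

        // handle.stop() asks the app to shut down over the channel, while
        // handle.kill() forcibly terminates the child process.
        if (handle.getState() != SparkAppHandle.State.FINISHED) {
          handle.kill()
        }
      }
    }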
Diffstat (limited to 'yarn/src')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                           |  43
-rw-r--r--  yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala |  10
-rw-r--r--  yarn/src/test/resources/log4j.properties                                                |   7
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala             | 127
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala                 |  76
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala      |   4
6 files changed, 193 insertions, 74 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index eb3b7fb885..cec81b9406 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -55,8 +55,8 @@ import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
+import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.launcher.YarnCommandBuilderUtils
import org.apache.spark.util.Utils
private[spark] class Client(
@@ -70,8 +70,6 @@ private[spark] class Client(
def this(clientArgs: ClientArguments, spConf: SparkConf) =
this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
- def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())
-
private val yarnClient = YarnClient.createYarnClient
private val yarnConf = new YarnConfiguration(hadoopConf)
private var credentials: Credentials = null
@@ -84,10 +82,27 @@ private[spark] class Client(
private var principal: String = null
private var keytab: String = null
+ private val launcherBackend = new LauncherBackend() {
+ override def onStopRequest(): Unit = {
+ if (isClusterMode && appId != null) {
+ yarnClient.killApplication(appId)
+ } else {
+ setState(SparkAppHandle.State.KILLED)
+ stop()
+ }
+ }
+ }
private val fireAndForget = isClusterMode &&
!sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)
+ private var appId: ApplicationId = null
+
+ def reportLauncherState(state: SparkAppHandle.State): Unit = {
+ launcherBackend.setState(state)
+ }
+
def stop(): Unit = {
+ launcherBackend.close()
yarnClient.stop()
// Unset YARN mode system env variable, to allow switching between cluster types.
System.clearProperty("SPARK_YARN_MODE")
@@ -103,6 +118,7 @@ private[spark] class Client(
def submitApplication(): ApplicationId = {
var appId: ApplicationId = null
try {
+ launcherBackend.connect()
// Setup the credentials before doing anything else,
// so we have don't have issues at any point.
setupCredentials()
@@ -116,6 +132,8 @@ private[spark] class Client(
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
appId = newAppResponse.getApplicationId()
+ reportLauncherState(SparkAppHandle.State.SUBMITTED)
+ launcherBackend.setAppId(appId.toString())
// Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)
@@ -881,6 +899,20 @@ private[spark] class Client(
}
}
+ if (lastState != state) {
+ state match {
+ case YarnApplicationState.RUNNING =>
+ reportLauncherState(SparkAppHandle.State.RUNNING)
+ case YarnApplicationState.FINISHED =>
+ reportLauncherState(SparkAppHandle.State.FINISHED)
+ case YarnApplicationState.FAILED =>
+ reportLauncherState(SparkAppHandle.State.FAILED)
+ case YarnApplicationState.KILLED =>
+ reportLauncherState(SparkAppHandle.State.KILLED)
+ case _ =>
+ }
+ }
+
if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
@@ -928,8 +960,8 @@ private[spark] class Client(
* throw an appropriate SparkException.
*/
def run(): Unit = {
- val appId = submitApplication()
- if (fireAndForget) {
+ this.appId = submitApplication()
+ if (!launcherBackend.isConnected() && fireAndForget) {
val report = getApplicationReport(appId)
val state = report.getYarnApplicationState
logInfo(s"Application report for $appId (state: $state)")
@@ -971,6 +1003,7 @@ private[spark] class Client(
}
object Client extends Logging {
+
def main(argStrings: Array[String]) {
if (!sys.props.contains("SPARK_SUBMIT")) {
logWarning("WARNING: This client is deprecated and will be removed in a " +
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 36d5759554..20771f6554 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.spark.{SparkException, Logging, SparkContext}
import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
+import org.apache.spark.launcher.SparkAppHandle
import org.apache.spark.scheduler.TaskSchedulerImpl
private[spark] class YarnClientSchedulerBackend(
@@ -177,6 +178,15 @@ private[spark] class YarnClientSchedulerBackend(
if (monitorThread != null) {
monitorThread.stopMonitor()
}
+
+ // Report a final state to the launcher if one is connected. This is needed since in client
+ // mode this backend doesn't let the app monitor loop run to completion, so it does not report
+ // the final state itself.
+ //
+ // Note: there's not enough information at this point to provide a better final state,
+ // so assume the application was successful.
+ client.reportLauncherState(SparkAppHandle.State.FINISHED)
+
super.stop()
YarnSparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
client.stop()
diff --git a/yarn/src/test/resources/log4j.properties b/yarn/src/test/resources/log4j.properties
index 6b8a5dbf63..6b9a799954 100644
--- a/yarn/src/test/resources/log4j.properties
+++ b/yarn/src/test/resources/log4j.properties
@@ -23,6 +23,9 @@ log4j.appender.file.file=target/unit-tests.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss.SSS} %t %p %c{1}: %m%n
-# Ignore messages below warning level from Jetty, because it's a bit verbose
-log4j.logger.org.spark-project.jetty=WARN
+# Ignore messages below warning level from a few verbose libraries.
+log4j.logger.com.sun.jersey=WARN
log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.eclipse.jetty=WARN
+log4j.logger.org.mortbay=WARN
+log4j.logger.org.spark-project.jetty=WARN
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
index 17c59ff06e..12494b0105 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/BaseYarnClusterSuite.scala
@@ -22,15 +22,18 @@ import java.util.Properties
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+import scala.language.postfixOps
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.Files
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.server.MiniYARNCluster
import org.scalatest.{BeforeAndAfterAll, Matchers}
+import org.scalatest.concurrent.Eventually._
import org.apache.spark._
-import org.apache.spark.launcher.TestClasspathBuilder
+import org.apache.spark.launcher._
import org.apache.spark.util.Utils
abstract class BaseYarnClusterSuite
@@ -46,13 +49,14 @@ abstract class BaseYarnClusterSuite
|log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
|log4j.logger.org.apache.hadoop=WARN
|log4j.logger.org.eclipse.jetty=WARN
+ |log4j.logger.org.mortbay=WARN
|log4j.logger.org.spark-project.jetty=WARN
""".stripMargin
private var yarnCluster: MiniYARNCluster = _
protected var tempDir: File = _
private var fakeSparkJar: File = _
- private var hadoopConfDir: File = _
+ protected var hadoopConfDir: File = _
private var logConfDir: File = _
def newYarnConfig(): YarnConfiguration
@@ -120,15 +124,77 @@ abstract class BaseYarnClusterSuite
clientMode: Boolean,
klass: String,
appArgs: Seq[String] = Nil,
- sparkArgs: Seq[String] = Nil,
+ sparkArgs: Seq[(String, String)] = Nil,
extraClassPath: Seq[String] = Nil,
extraJars: Seq[String] = Nil,
extraConf: Map[String, String] = Map(),
- extraEnv: Map[String, String] = Map()): Unit = {
+ extraEnv: Map[String, String] = Map()): SparkAppHandle.State = {
val master = if (clientMode) "yarn-client" else "yarn-cluster"
- val props = new Properties()
+ val propsFile = createConfFile(extraClassPath = extraClassPath, extraConf = extraConf)
+ val env = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv
+
+ val launcher = new SparkLauncher(env.asJava)
+ if (klass.endsWith(".py")) {
+ launcher.setAppResource(klass)
+ } else {
+ launcher.setMainClass(klass)
+ launcher.setAppResource(fakeSparkJar.getAbsolutePath())
+ }
+ launcher.setSparkHome(sys.props("spark.test.home"))
+ .setMaster(master)
+ .setConf("spark.executor.instances", "1")
+ .setPropertiesFile(propsFile)
+ .addAppArgs(appArgs.toArray: _*)
+
+ sparkArgs.foreach { case (name, value) =>
+ if (value != null) {
+ launcher.addSparkArg(name, value)
+ } else {
+ launcher.addSparkArg(name)
+ }
+ }
+ extraJars.foreach(launcher.addJar)
- props.setProperty("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath())
+ val handle = launcher.startApplication()
+ try {
+ eventually(timeout(2 minutes), interval(1 second)) {
+ assert(handle.getState().isFinal())
+ }
+ } finally {
+ handle.kill()
+ }
+
+ handle.getState()
+ }
+
+ /**
+ * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
+ * any sort of error when the job process finishes successfully, but the job itself fails. So
+ * the tests enforce that something is written to a file after everything is ok to indicate
+ * that the job succeeded.
+ */
+ protected def checkResult(finalState: SparkAppHandle.State, result: File): Unit = {
+ checkResult(finalState, result, "success")
+ }
+
+ protected def checkResult(
+ finalState: SparkAppHandle.State,
+ result: File,
+ expected: String): Unit = {
+ finalState should be (SparkAppHandle.State.FINISHED)
+ val resultString = Files.toString(result, UTF_8)
+ resultString should be (expected)
+ }
+
+ protected def mainClassName(klass: Class[_]): String = {
+ klass.getName().stripSuffix("$")
+ }
+
+ protected def createConfFile(
+ extraClassPath: Seq[String] = Nil,
+ extraConf: Map[String, String] = Map()): String = {
+ val props = new Properties()
+ props.put("spark.yarn.jar", "local:" + fakeSparkJar.getAbsolutePath())
val testClasspath = new TestClasspathBuilder()
.buildClassPath(
@@ -138,69 +204,28 @@ abstract class BaseYarnClusterSuite
.asScala
.mkString(File.pathSeparator)
- props.setProperty("spark.driver.extraClassPath", testClasspath)
- props.setProperty("spark.executor.extraClassPath", testClasspath)
+ props.put("spark.driver.extraClassPath", testClasspath)
+ props.put("spark.executor.extraClassPath", testClasspath)
// SPARK-4267: make sure java options are propagated correctly.
props.setProperty("spark.driver.extraJavaOptions", "-Dfoo=\"one two three\"")
props.setProperty("spark.executor.extraJavaOptions", "-Dfoo=\"one two three\"")
- yarnCluster.getConfig.asScala.foreach { e =>
+ yarnCluster.getConfig().asScala.foreach { e =>
props.setProperty("spark.hadoop." + e.getKey(), e.getValue())
}
-
sys.props.foreach { case (k, v) =>
if (k.startsWith("spark.")) {
props.setProperty(k, v)
}
}
-
extraConf.foreach { case (k, v) => props.setProperty(k, v) }
val propsFile = File.createTempFile("spark", ".properties", tempDir)
val writer = new OutputStreamWriter(new FileOutputStream(propsFile), UTF_8)
props.store(writer, "Spark properties.")
writer.close()
-
- val extraJarArgs = if (extraJars.nonEmpty) Seq("--jars", extraJars.mkString(",")) else Nil
- val mainArgs =
- if (klass.endsWith(".py")) {
- Seq(klass)
- } else {
- Seq("--class", klass, fakeSparkJar.getAbsolutePath())
- }
- val argv =
- Seq(
- new File(sys.props("spark.test.home"), "bin/spark-submit").getAbsolutePath(),
- "--master", master,
- "--num-executors", "1",
- "--properties-file", propsFile.getAbsolutePath()) ++
- extraJarArgs ++
- sparkArgs ++
- mainArgs ++
- appArgs
-
- Utils.executeAndGetOutput(argv,
- extraEnvironment = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv)
- }
-
- /**
- * This is a workaround for an issue with yarn-cluster mode: the Client class will not provide
- * any sort of error when the job process finishes successfully, but the job itself fails. So
- * the tests enforce that something is written to a file after everything is ok to indicate
- * that the job succeeded.
- */
- protected def checkResult(result: File): Unit = {
- checkResult(result, "success")
- }
-
- protected def checkResult(result: File, expected: String): Unit = {
- val resultString = Files.toString(result, UTF_8)
- resultString should be (expected)
- }
-
- protected def mainClassName(klass: Class[_]): String = {
- klass.getName().stripSuffix("$")
+ propsFile.getAbsolutePath()
}
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index f1601cd161..d1cd0c89b5 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -19,16 +19,20 @@ package org.apache.spark.deploy.yarn
import java.io.File
import java.net.URL
+import java.util.{HashMap => JHashMap, Properties}
import scala.collection.mutable
+import scala.concurrent.duration._
+import scala.language.postfixOps
import com.google.common.base.Charsets.UTF_8
import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.scalatest.Matchers
+import org.scalatest.concurrent.Eventually._
import org.apache.spark._
-import org.apache.spark.launcher.TestClasspathBuilder
+import org.apache.spark.launcher._
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationStart,
SparkListenerExecutorAdded}
import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -82,10 +86,8 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
test("run Spark in yarn-cluster mode unsuccessfully") {
// Don't provide arguments so the driver will fail.
- val exception = intercept[SparkException] {
- runSpark(false, mainClassName(YarnClusterDriver.getClass))
- fail("Spark application should have failed.")
- }
+ val finalState = runSpark(false, mainClassName(YarnClusterDriver.getClass))
+ finalState should be (SparkAppHandle.State.FAILED)
}
test("run Python application in yarn-client mode") {
@@ -104,11 +106,42 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
testUseClassPathFirst(false)
}
+ test("monitor app using launcher library") {
+ val env = new JHashMap[String, String]()
+ env.put("YARN_CONF_DIR", hadoopConfDir.getAbsolutePath())
+
+ val propsFile = createConfFile()
+ val handle = new SparkLauncher(env)
+ .setSparkHome(sys.props("spark.test.home"))
+ .setConf("spark.ui.enabled", "false")
+ .setPropertiesFile(propsFile)
+ .setMaster("yarn-client")
+ .setAppResource("spark-internal")
+ .setMainClass(mainClassName(YarnLauncherTestApp.getClass))
+ .startApplication()
+
+ try {
+ eventually(timeout(30 seconds), interval(100 millis)) {
+ handle.getState() should be (SparkAppHandle.State.RUNNING)
+ }
+
+ handle.getAppId() should not be (null)
+ handle.getAppId() should startWith ("application_")
+ handle.stop()
+
+ eventually(timeout(30 seconds), interval(100 millis)) {
+ handle.getState() should be (SparkAppHandle.State.KILLED)
+ }
+ } finally {
+ handle.kill()
+ }
+ }
+
private def testBasicYarnApp(clientMode: Boolean): Unit = {
val result = File.createTempFile("result", null, tempDir)
- runSpark(clientMode, mainClassName(YarnClusterDriver.getClass),
+ val finalState = runSpark(clientMode, mainClassName(YarnClusterDriver.getClass),
appArgs = Seq(result.getAbsolutePath()))
- checkResult(result)
+ checkResult(finalState, result)
}
private def testPySpark(clientMode: Boolean): Unit = {
@@ -143,11 +176,11 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
val pyFiles = Seq(pyModule.getAbsolutePath(), mod2Archive.getPath()).mkString(",")
val result = File.createTempFile("result", null, tempDir)
- runSpark(clientMode, primaryPyFile.getAbsolutePath(),
- sparkArgs = Seq("--py-files", pyFiles),
+ val finalState = runSpark(clientMode, primaryPyFile.getAbsolutePath(),
+ sparkArgs = Seq("--py-files" -> pyFiles),
appArgs = Seq(result.getAbsolutePath()),
extraEnv = extraEnv)
- checkResult(result)
+ checkResult(finalState, result)
}
private def testUseClassPathFirst(clientMode: Boolean): Unit = {
@@ -156,15 +189,15 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
val userJar = TestUtils.createJarWithFiles(Map("test.resource" -> "OVERRIDDEN"), tempDir)
val driverResult = File.createTempFile("driver", null, tempDir)
val executorResult = File.createTempFile("executor", null, tempDir)
- runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
+ val finalState = runSpark(clientMode, mainClassName(YarnClasspathTest.getClass),
appArgs = Seq(driverResult.getAbsolutePath(), executorResult.getAbsolutePath()),
extraClassPath = Seq(originalJar.getPath()),
extraJars = Seq("local:" + userJar.getPath()),
extraConf = Map(
"spark.driver.userClassPathFirst" -> "true",
"spark.executor.userClassPathFirst" -> "true"))
- checkResult(driverResult, "OVERRIDDEN")
- checkResult(executorResult, "OVERRIDDEN")
+ checkResult(finalState, driverResult, "OVERRIDDEN")
+ checkResult(finalState, executorResult, "OVERRIDDEN")
}
}
@@ -211,8 +244,8 @@ private object YarnClusterDriver extends Logging with Matchers {
data should be (Set(1, 2, 3, 4))
result = "success"
} finally {
- sc.stop()
Files.write(result, status, UTF_8)
+ sc.stop()
}
// verify log urls are present
@@ -297,3 +330,18 @@ private object YarnClasspathTest extends Logging {
}
}
+
+private object YarnLauncherTestApp {
+
+ def main(args: Array[String]): Unit = {
+ // Do not stop the application; the test will stop it using the launcher lib. Just run a task
+ // that will prevent the process from exiting.
+ val sc = new SparkContext(new SparkConf())
+ sc.parallelize(Seq(1)).foreach { i =>
+ this.synchronized {
+ wait()
+ }
+ }
+ }
+
+}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
index a85e5772a0..c17e8695c2 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnShuffleIntegrationSuite.scala
@@ -53,7 +53,7 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
logInfo("Shuffle service port = " + shuffleServicePort)
val result = File.createTempFile("result", null, tempDir)
- runSpark(
+ val finalState = runSpark(
false,
mainClassName(YarnExternalShuffleDriver.getClass),
appArgs = Seq(result.getAbsolutePath(), registeredExecFile.getAbsolutePath),
@@ -62,7 +62,7 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
"spark.shuffle.service.port" -> shuffleServicePort.toString
)
)
- checkResult(result)
+ checkResult(finalState, result)
assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists())
}
}