path: root/yarn
author    Marcelo Vanzin <vanzin@cloudera.com>  2014-10-03 11:36:24 -0700
committer Andrew Or <andrewor14@gmail.com>  2014-10-03 11:36:24 -0700
commit    fbe8e9856b23262193105e7bf86075f516f0db25 (patch)
tree      683966d1f22017dcb8c37b5a1b0d954983ba942a /yarn
parent    22f8e1ee7c4ea7b3bd4c6faaf0fe5b88a134ae12 (diff)
[SPARK-2778] [yarn] Add workaround for race in MiniYARNCluster.
Sometimes the cluster's start() method returns before the configuration has been updated, which is done by ClientRMService in, I assume, a separate thread (otherwise there would be no race). That can cause tests to fail if the old configuration data is read, since it will contain the wrong RM address.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #2605 from vanzin/SPARK-2778 and squashes the following commits:

8d02ce0 [Marcelo Vanzin] Minor cleanup.
5bebee7 [Marcelo Vanzin] [SPARK-2778] [yarn] Add workaround for race in MiniYARNCluster.
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 35
1 file changed, 31 insertions(+), 4 deletions(-)
diff --git a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 4b6635679f..a826b2a78a 100644
--- a/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/stable/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark.deploy.yarn
import java.io.File
+import java.util.concurrent.TimeUnit
import scala.collection.JavaConversions._
@@ -32,7 +33,7 @@ import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils
-class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers {
+class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
// log4j configuration for the Yarn containers, so that their output is collected
// by Yarn instead of trying to overwrite unit-tests.log.
@@ -66,7 +67,33 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers {
yarnCluster = new MiniYARNCluster(getClass().getName(), 1, 1, 1)
yarnCluster.init(new YarnConfiguration())
yarnCluster.start()
- yarnCluster.getConfig().foreach { e =>
+
+ // There's a race in MiniYARNCluster in which start() may return before the RM has updated
+ // its address in the configuration. You can see this in the logs by noticing that when
+ // MiniYARNCluster prints the address, it still has port "0" assigned, although later the
+ // test works sometimes:
+ //
+ // INFO MiniYARNCluster: MiniYARN ResourceManager address: blah:0
+ //
+ // That log message prints the contents of the RM_ADDRESS config variable. If you check it
+ // later on, it looks something like this:
+ //
+ // INFO YarnClusterSuite: RM address in configuration is blah:42631
+ //
+ // This hack loops for a bit waiting for the port to change, and fails the test if it hasn't
+ // done so in a timely manner (defined to be 10 seconds).
+ val config = yarnCluster.getConfig()
+ val deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(10)
+ while (config.get(YarnConfiguration.RM_ADDRESS).split(":")(1) == "0") {
+ if (System.currentTimeMillis() > deadline) {
+ throw new IllegalStateException("Timed out waiting for RM to come up.")
+ }
+ logDebug("RM address still not set in configuration, waiting...")
+ TimeUnit.MILLISECONDS.sleep(100)
+ }
+
+ logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
+ config.foreach { e =>
sys.props += ("spark.hadoop." + e.getKey() -> e.getValue())
}
@@ -86,13 +113,13 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers {
super.afterAll()
}
- ignore("run Spark in yarn-client mode") {
+ test("run Spark in yarn-client mode") {
var result = File.createTempFile("result", null, tempDir)
YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath()))
checkResult(result)
}
- ignore("run Spark in yarn-cluster mode") {
+ test("run Spark in yarn-cluster mode") {
val main = YarnClusterDriver.getClass.getName().stripSuffix("$")
var result = File.createTempFile("result", null, tempDir)