diff options
Diffstat (limited to 'yarn/src/test')
-rw-r--r-- | yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 31 |
1 files changed, 28 insertions, 3 deletions
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 7165918e1b..eda40efc4c 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -21,16 +21,17 @@ import java.io.File import java.util.concurrent.TimeUnit import scala.collection.JavaConversions._ +import scala.collection.mutable import com.google.common.base.Charsets import com.google.common.io.Files -import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} - import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.server.MiniYARNCluster +import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers} import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException} -import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.scheduler.cluster.ExecutorInfo +import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded} import org.apache.spark.util.Utils class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging { @@ -143,6 +144,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit var result = File.createTempFile("result", null, tempDir) YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath())) checkResult(result) + + // verify log urls are present + YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info => + assert(info.logUrlMap.nonEmpty) + } } test("run Spark in yarn-cluster mode") { @@ -156,6 +162,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit "--num-executors", "1") Client.main(args) checkResult(result) + + // verify log urls are present. + YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info => + assert(info.logUrlMap.nonEmpty) + } } test("run Spark in yarn-cluster mode unsuccessfully") { @@ -203,8 +214,19 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit } +private class SaveExecutorInfo extends SparkListener { + val addedExecutorInfos = mutable.Map[String, ExecutorInfo]() + + override def onExecutorAdded(executor : SparkListenerExecutorAdded) { + addedExecutorInfos(executor.executorId) = executor.executorInfo + } +} + private object YarnClusterDriver extends Logging with Matchers { + val WAIT_TIMEOUT_MILLIS = 10000 + var listener: SaveExecutorInfo = null + def main(args: Array[String]) = { if (args.length != 2) { System.err.println( @@ -216,12 +238,15 @@ private object YarnClusterDriver extends Logging with Matchers { System.exit(1) } + listener = new SaveExecutorInfo val sc = new SparkContext(new SparkConf().setMaster(args(0)) .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns")) + sc.addSparkListener(listener) val status = new File(args(1)) var result = "failure" try { val data = sc.parallelize(1 to 4, 4).collect().toSet + assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) data should be (Set(1, 2, 3, 4)) result = "success" } finally { |