Diffstat (limited to 'yarn/src/test')
-rw-r--r-- yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 31
1 file changed, 28 insertions(+), 3 deletions(-)
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 7165918e1b..eda40efc4c 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -21,16 +21,17 @@ import java.io.File
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConversions._
+import scala.collection.mutable
 
 import com.google.common.base.Charsets
 import com.google.common.io.Files
-import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
-
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.server.MiniYARNCluster
+import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
 
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException}
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.scheduler.cluster.ExecutorInfo
+import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
 import org.apache.spark.util.Utils
 
 class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
@@ -143,6 +144,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
     var result = File.createTempFile("result", null, tempDir)
     YarnClusterDriver.main(Array("yarn-client", result.getAbsolutePath()))
     checkResult(result)
+
+    // verify log urls are present
+    YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info =>
+      assert(info.logUrlMap.nonEmpty)
+    }
   }
 
   test("run Spark in yarn-cluster mode") {
@@ -156,6 +162,11 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
       "--num-executors", "1")
     Client.main(args)
     checkResult(result)
+
+    // verify log urls are present.
+    YarnClusterDriver.listener.addedExecutorInfos.values.foreach { info =>
+      assert(info.logUrlMap.nonEmpty)
+    }
   }
 
   test("run Spark in yarn-cluster mode unsuccessfully") {
@@ -203,8 +214,19 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers with Logging {
 
 }
 
+private class SaveExecutorInfo extends SparkListener {
+  val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
+
+  override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
+    addedExecutorInfos(executor.executorId) = executor.executorInfo
+  }
+}
+
 private object YarnClusterDriver extends Logging with Matchers {
 
+  val WAIT_TIMEOUT_MILLIS = 10000
+  var listener: SaveExecutorInfo = null
+
   def main(args: Array[String]) = {
     if (args.length != 2) {
       System.err.println(
@@ -216,12 +238,15 @@ private object YarnClusterDriver extends Logging with Matchers {
       System.exit(1)
     }
 
+    listener = new SaveExecutorInfo
     val sc = new SparkContext(new SparkConf().setMaster(args(0))
       .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
+    sc.addSparkListener(listener)
     val status = new File(args(1))
     var result = "failure"
    try {
       val data = sc.parallelize(1 to 4, 4).collect().toSet
+      assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
       data should be (Set(1, 2, 3, 4))
       result = "success"
     } finally {
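
For reference, the listener wiring this patch adds follows the standard SparkListener pattern. Below is a minimal standalone sketch of the same idea (the object name, app name, and local master are hypothetical, not part of the patch); note that outside YARN the log-URL map may well be empty, since it is the YARN backend that populates ExecutorInfo.logUrlMap:

import scala.collection.mutable

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
import org.apache.spark.scheduler.cluster.ExecutorInfo

object ExecutorInfoListenerSketch {
  // Collects the ExecutorInfo of every executor the scheduler reports.
  private val added = mutable.Map[String, ExecutorInfo]()

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("listener-sketch"))
    sc.addSparkListener(new SparkListener {
      override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit = {
        added(event.executorId) = event.executorInfo
      }
    })
    try {
      // Run a trivial job so executors are registered and events are posted.
      sc.parallelize(1 to 4, 4).count()
      added.values.foreach(info => println(info.logUrlMap))
    } finally {
      sc.stop()
    }
  }
}

The suite itself asserts info.logUrlMap.nonEmpty only after sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS), because listener events are delivered asynchronously and may not have arrived when the job returns.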