aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala17
1 files changed, 10 insertions, 7 deletions
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 76952e3341..a18c94d4ab 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -33,7 +33,7 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, TestUtils}
import org.apache.spark.scheduler.cluster.ExecutorInfo
-import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
+import org.apache.spark.scheduler.{SparkListenerJobStart, SparkListener, SparkListenerExecutorAdded}
import org.apache.spark.util.Utils
/**
@@ -282,10 +282,10 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
}
-private class SaveExecutorInfo extends SparkListener {
+private[spark] class SaveExecutorInfo extends SparkListener {
val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
- override def onExecutorAdded(executor : SparkListenerExecutorAdded) {
+ override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
addedExecutorInfos(executor.executorId) = executor.executorInfo
}
}
@@ -293,7 +293,6 @@ private class SaveExecutorInfo extends SparkListener {
private object YarnClusterDriver extends Logging with Matchers {
val WAIT_TIMEOUT_MILLIS = 10000
- var listener: SaveExecutorInfo = null
def main(args: Array[String]): Unit = {
if (args.length != 1) {
@@ -306,10 +305,9 @@ private object YarnClusterDriver extends Logging with Matchers {
System.exit(1)
}
- listener = new SaveExecutorInfo
val sc = new SparkContext(new SparkConf()
+ .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
.setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
- sc.addSparkListener(listener)
val status = new File(args(0))
var result = "failure"
try {
@@ -323,7 +321,12 @@ private object YarnClusterDriver extends Logging with Matchers {
}
// verify log urls are present
- listener.addedExecutorInfos.values.foreach { info =>
+ val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
+ assert(listeners.size === 1)
+ val listener = listeners(0)
+ val executorInfos = listener.addedExecutorInfos.values
+ assert(executorInfos.nonEmpty)
+ executorInfos.foreach { info =>
assert(info.logUrlMap.nonEmpty)
}
}