aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorKousuke Saruta <sarutak@oss.nttdata.co.jp>2015-04-14 14:00:49 -0700
committerAndrew Or <andrew@databricks.com>2015-04-14 14:01:55 -0700
commit4d4b24927417b2c17810e94d6d46c37491c68869 (patch)
tree98f86d116a45aabf1c9de6ab63bb7b0097fe81da /yarn
parent65774370a1275e25cd8a3357e397d116767793a9 (diff)
downloadspark-4d4b24927417b2c17810e94d6d46c37491c68869.tar.gz
spark-4d4b24927417b2c17810e94d6d46c37491c68869.tar.bz2
spark-4d4b24927417b2c17810e94d6d46c37491c68869.zip
[SPARK-6769][YARN][TEST] Usage of the ListenerBus in YarnClusterSuite is wrong
In YarnClusterSuite, a test case uses `SaveExecutorInfo` to handle ExecutorAddedEvent as follows. ``` private class SaveExecutorInfo extends SparkListener { val addedExecutorInfos = mutable.Map[String, ExecutorInfo]() override def onExecutorAdded(executor: SparkListenerExecutorAdded) { addedExecutorInfos(executor.executorId) = executor.executorInfo } } ... listener = new SaveExecutorInfo val sc = new SparkContext(new SparkConf() .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns")) sc.addSparkListener(listener) val status = new File(args(0)) var result = "failure" try { val data = sc.parallelize(1 to 4, 4).collect().toSet assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) data should be (Set(1, 2, 3, 4)) result = "success" } finally { sc.stop() Files.write(result, status, UTF_8) } ``` But, the usage is wrong because Executors will spawn during initializing SparkContext and SparkContext#addSparkListener should be invoked after the initialization, thus after Executors spawn, so SaveExecutorInfo cannot handle ExecutorAddedEvent. Following code refers the result of the handling ExecutorAddedEvent. Because of the reason above, we cannot reach the assertion. ``` // verify log urls are present listener.addedExecutorInfos.values.foreach { info => assert(info.logUrlMap.nonEmpty) } ``` Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp> Closes #5417 from sarutak/SPARK-6769 and squashes the following commits: 8adc8ba [Kousuke Saruta] Fixed compile error e258530 [Kousuke Saruta] Fixed style 591cf3e [Kousuke Saruta] Fixed style 48ec89a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-6769 860c965 [Kousuke Saruta] Simplified code 207d325 [Kousuke Saruta] Added findListenersByClass method to ListenerBus 2408c84 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-6769 2d7e409 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-6769 3874adf [Kousuke Saruta] Fixed the usage of listener bus in LogUrlsStandaloneSuite 153a91b [Kousuke Saruta] Fixed the usage of listener bus in YarnClusterSuite
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala17
1 files changed, 10 insertions, 7 deletions
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 76952e3341..a18c94d4ab 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -33,7 +33,7 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite, Matchers}
import org.apache.spark.{Logging, SparkConf, SparkContext, SparkException, TestUtils}
import org.apache.spark.scheduler.cluster.ExecutorInfo
-import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
+import org.apache.spark.scheduler.{SparkListenerJobStart, SparkListener, SparkListenerExecutorAdded}
import org.apache.spark.util.Utils
/**
@@ -282,10 +282,10 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
}
-private class SaveExecutorInfo extends SparkListener {
+private[spark] class SaveExecutorInfo extends SparkListener {
val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
- override def onExecutorAdded(executor : SparkListenerExecutorAdded) {
+ override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
addedExecutorInfos(executor.executorId) = executor.executorInfo
}
}
@@ -293,7 +293,6 @@ private class SaveExecutorInfo extends SparkListener {
private object YarnClusterDriver extends Logging with Matchers {
val WAIT_TIMEOUT_MILLIS = 10000
- var listener: SaveExecutorInfo = null
def main(args: Array[String]): Unit = {
if (args.length != 1) {
@@ -306,10 +305,9 @@ private object YarnClusterDriver extends Logging with Matchers {
System.exit(1)
}
- listener = new SaveExecutorInfo
val sc = new SparkContext(new SparkConf()
+ .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
.setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
- sc.addSparkListener(listener)
val status = new File(args(0))
var result = "failure"
try {
@@ -323,7 +321,12 @@ private object YarnClusterDriver extends Logging with Matchers {
}
// verify log urls are present
- listener.addedExecutorInfos.values.foreach { info =>
+ val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
+ assert(listeners.size === 1)
+ val listener = listeners(0)
+ val executorInfos = listener.addedExecutorInfos.values
+ assert(executorInfos.nonEmpty)
+ executorInfos.foreach { info =>
assert(info.logUrlMap.nonEmpty)
}
}