diff options
author | Kousuke Saruta <sarutak@oss.nttdata.co.jp> | 2015-04-14 14:00:49 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-04-14 14:01:55 -0700 |
commit | 4d4b24927417b2c17810e94d6d46c37491c68869 (patch) | |
tree | 98f86d116a45aabf1c9de6ab63bb7b0097fe81da /core/src | |
parent | 65774370a1275e25cd8a3357e397d116767793a9 (diff) | |
download | spark-4d4b24927417b2c17810e94d6d46c37491c68869.tar.gz spark-4d4b24927417b2c17810e94d6d46c37491c68869.tar.bz2 spark-4d4b24927417b2c17810e94d6d46c37491c68869.zip |
[SPARK-6769][YARN][TEST] Usage of the ListenerBus in YarnClusterSuite is wrong
In YarnClusterSuite, a test case uses `SaveExecutorInfo` to handle ExecutorAddedEvent as follows.
```
private class SaveExecutorInfo extends SparkListener {
val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
addedExecutorInfos(executor.executorId) = executor.executorInfo
}
}
...
listener = new SaveExecutorInfo
val sc = new SparkContext(new SparkConf()
.setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
sc.addSparkListener(listener)
val status = new File(args(0))
var result = "failure"
try {
val data = sc.parallelize(1 to 4, 4).collect().toSet
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
data should be (Set(1, 2, 3, 4))
result = "success"
} finally {
sc.stop()
Files.write(result, status, UTF_8)
}
```
However, this usage is wrong: executors are spawned while the SparkContext is being initialized, and SparkContext#addSparkListener can only be invoked after that initialization has finished, i.e. after the executors have already been spawned, so SaveExecutorInfo never gets a chance to handle ExecutorAddedEvent.
The following code relies on the result of handling ExecutorAddedEvent. For the reason above, `addedExecutorInfos` stays empty, so the body of the `foreach` never runs and the assertion is never reached.
```
// verify log urls are present
listener.addedExecutorInfos.values.foreach { info =>
assert(info.logUrlMap.nonEmpty)
}
```
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes #5417 from sarutak/SPARK-6769 and squashes the following commits:
8adc8ba [Kousuke Saruta] Fixed compile error
e258530 [Kousuke Saruta] Fixed style
591cf3e [Kousuke Saruta] Fixed style
48ec89a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-6769
860c965 [Kousuke Saruta] Simplified code
207d325 [Kousuke Saruta] Added findListenersByClass method to ListenerBus
2408c84 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-6769
2d7e409 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-6769
3874adf [Kousuke Saruta] Fixed the usage of listener bus in LogUrlsStandaloneSuite
153a91b [Kousuke Saruta] Fixed the usage of listener bus in YarnClusterSuite
Diffstat (limited to 'core/src')
-rw-r--r-- | core/src/main/scala/org/apache/spark/util/ListenerBus.scala | 8 | ||||
-rw-r--r-- | core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala | 20 |
2 files changed, 19 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala index d60b8b9a31..a725767d08 100644 --- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala @@ -19,9 +19,12 @@ package org.apache.spark.util import java.util.concurrent.CopyOnWriteArrayList +import scala.collection.JavaConversions._ +import scala.reflect.ClassTag import scala.util.control.NonFatal import org.apache.spark.Logging +import org.apache.spark.scheduler.SparkListener /** * An event bus which posts events to its listeners. @@ -64,4 +67,9 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { */ def onPostEvent(listener: L, event: E): Unit + private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = { + val c = implicitly[ClassTag[T]].runtimeClass + listeners.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq + } + } diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala index 9cdb42814c..c93d16f8a1 100644 --- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy import java.net.URL +import scala.collection.JavaConversions._ import scala.collection.mutable import scala.io.Source @@ -65,16 +66,17 @@ class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext { new MySparkConf().setAll(getAll) } } - val conf = new MySparkConf() + val conf = new MySparkConf().set( + "spark.extraListeners", classOf[SaveExecutorInfo].getName) sc = new SparkContext("local-cluster[2,1,512]", "test", conf) - val listener = new SaveExecutorInfo - sc.addSparkListener(listener) - // Trigger a job so that executors get added sc.parallelize(1 to 100, 4).map(_.toString).count() 
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)) + val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo] + assert(listeners.size === 1) + val listener = listeners(0) listener.addedExecutorInfos.values.foreach { info => assert(info.logUrlMap.nonEmpty) info.logUrlMap.values.foreach { logUrl => @@ -82,12 +84,12 @@ class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext { } } } +} - private class SaveExecutorInfo extends SparkListener { - val addedExecutorInfos = mutable.Map[String, ExecutorInfo]() +private[spark] class SaveExecutorInfo extends SparkListener { + val addedExecutorInfos = mutable.Map[String, ExecutorInfo]() - override def onExecutorAdded(executor: SparkListenerExecutorAdded) { - addedExecutorInfos(executor.executorId) = executor.executorInfo - } + override def onExecutorAdded(executor: SparkListenerExecutorAdded) { + addedExecutorInfos(executor.executorId) = executor.executorInfo } } |