path: root/core
author     Kousuke Saruta <sarutak@oss.nttdata.co.jp>  2015-04-14 14:00:49 -0700
committer  Andrew Or <andrew@databricks.com>           2015-04-14 14:01:55 -0700
commit     4d4b24927417b2c17810e94d6d46c37491c68869 (patch)
tree       98f86d116a45aabf1c9de6ab63bb7b0097fe81da /core
parent     65774370a1275e25cd8a3357e397d116767793a9 (diff)
[SPARK-6769][YARN][TEST] Usage of the ListenerBus in YarnClusterSuite is wrong
In YarnClusterSuite, a test case uses `SaveExecutorInfo` to handle `SparkListenerExecutorAdded` events as follows:

```
private class SaveExecutorInfo extends SparkListener {
  val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()

  override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
    addedExecutorInfos(executor.executorId) = executor.executorInfo
  }
}

...

listener = new SaveExecutorInfo
val sc = new SparkContext(new SparkConf()
  .setAppName("yarn \"test app\" 'with quotes' and \\back\\slashes and $dollarSigns"))
sc.addSparkListener(listener)

val status = new File(args(0))
var result = "failure"
try {
  val data = sc.parallelize(1 to 4, 4).collect().toSet
  assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
  data should be (Set(1, 2, 3, 4))
  result = "success"
} finally {
  sc.stop()
  Files.write(result, status, UTF_8)
}
```

This usage is wrong: executors spawn while the SparkContext is being initialized, but `SparkContext#addSparkListener` can only be invoked after that initialization completes. The listener is therefore registered only after the executors have already spawned, so `SaveExecutorInfo` never receives the `SparkListenerExecutorAdded` events. The following code then reads the result of that handling, and because of the reason above its assertion is never reached:

```
// verify log urls are present
listener.addedExecutorInfos.values.foreach { info =>
  assert(info.logUrlMap.nonEmpty)
}
```

Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>

Closes #5417 from sarutak/SPARK-6769 and squashes the following commits:

8adc8ba [Kousuke Saruta] Fixed compile error
e258530 [Kousuke Saruta] Fixed style
591cf3e [Kousuke Saruta] Fixed style
48ec89a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-6769
860c965 [Kousuke Saruta] Simplified code
207d325 [Kousuke Saruta] Added findListenersByClass method to ListenerBus
2408c84 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-6769
2d7e409 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-6769
3874adf [Kousuke Saruta] Fixed the usage of listener bus in LogUrlsStandaloneSuite
153a91b [Kousuke Saruta] Fixed the usage of listener bus in YarnClusterSuite
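For context, the corrected pattern the fix adopts: name the listener class in the `spark.extraListeners` setting, so SparkContext instantiates and registers it during its own initialization, before any executor is added. A minimal, self-contained sketch of that pattern (the `ListenerRegistrationExample` wrapper and the app name are illustrative, not part of the commit):

```scala
import scala.collection.mutable

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded}
import org.apache.spark.scheduler.cluster.ExecutorInfo

// Must be a top-level class with a no-arg constructor so that SparkContext
// can instantiate it by name from the spark.extraListeners setting.
class SaveExecutorInfo extends SparkListener {
  val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()

  override def onExecutorAdded(executor: SparkListenerExecutorAdded): Unit = {
    addedExecutorInfos(executor.executorId) = executor.executorInfo
  }
}

object ListenerRegistrationExample {
  def main(args: Array[String]): Unit = {
    // Named in the conf, the listener is registered during SparkContext
    // initialization -- before executors spawn -- so no event is missed.
    val conf = new SparkConf()
      .setAppName("extra-listeners-example")
      .setMaster("local-cluster[2,1,512]")
      .set("spark.extraListeners", classOf[SaveExecutorInfo].getName)
    val sc = new SparkContext(conf)
    try {
      sc.parallelize(1 to 100, 4).count() // trigger a job so executors get added
    } finally {
      sc.stop()
    }
  }
}
```

Because registration happens inside the SparkContext constructor, no `SparkListenerExecutorAdded` event can be missed; the test then retrieves the instance through the new `findListenersByClass` helper shown in the diff below.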
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/util/ListenerBus.scala              |  8 ++++++++
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala | 20 +++++++++++---------
2 files changed, 19 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
index d60b8b9a31..a725767d08 100644
--- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala
@@ -19,9 +19,12 @@ package org.apache.spark.util
import java.util.concurrent.CopyOnWriteArrayList

+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
import scala.util.control.NonFatal

import org.apache.spark.Logging
+import org.apache.spark.scheduler.SparkListener

/**
 * An event bus which posts events to its listeners.
@@ -64,4 +67,9 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {
   */
  def onPostEvent(listener: L, event: E): Unit

+  private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
+    val c = implicitly[ClassTag[T]].runtimeClass
+    listeners.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
+  }
+
}
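The new helper relies on a context-bound `ClassTag` to recover `T`'s runtime class, which the JVM would otherwise erase, and then filters the registered listeners by exact class. A self-contained sketch of the same lookup; `MiniBus` and the two listener classes are illustrative stand-ins, not Spark code:

```scala
import java.util.concurrent.CopyOnWriteArrayList

import scala.collection.JavaConversions._
import scala.reflect.ClassTag

// Illustrative stand-in for the ListenerBus trait.
class MiniBus[L <: AnyRef] {
  private val listeners = new CopyOnWriteArrayList[L]()

  def addListener(listener: L): Unit = listeners.add(listener)

  // The ClassTag context bound materializes T's class at runtime, so the
  // lookup can compare classes exactly instead of fighting type erasure.
  def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
    val c = implicitly[ClassTag[T]].runtimeClass
    listeners.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
  }
}

trait Listener
class MetricsListener extends Listener
class LoggingListener extends Listener

object MiniBusExample extends App {
  val bus = new MiniBus[Listener]
  bus.addListener(new MetricsListener)
  bus.addListener(new LoggingListener)
  // Only the MetricsListener instance is returned.
  assert(bus.findListenersByClass[MetricsListener]().size == 1)
}
```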
diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
index 9cdb42814c..c93d16f8a1 100644
--- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy
import java.net.URL

+import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.io.Source
@@ -65,16 +66,17 @@ class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext {
        new MySparkConf().setAll(getAll)
      }
    }
-    val conf = new MySparkConf()
+    val conf = new MySparkConf().set(
+      "spark.extraListeners", classOf[SaveExecutorInfo].getName)
    sc = new SparkContext("local-cluster[2,1,512]", "test", conf)

-    val listener = new SaveExecutorInfo
-    sc.addSparkListener(listener)
-
    // Trigger a job so that executors get added
    sc.parallelize(1 to 100, 4).map(_.toString).count()

    assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
+    val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
+    assert(listeners.size === 1)
+    val listener = listeners(0)
    listener.addedExecutorInfos.values.foreach { info =>
      assert(info.logUrlMap.nonEmpty)
      info.logUrlMap.values.foreach { logUrl =>
@@ -82,12 +84,12 @@ class LogUrlsStandaloneSuite extends FunSuite with LocalSparkContext {
      }
    }
  }
+}

-  private class SaveExecutorInfo extends SparkListener {
-    val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()
+private[spark] class SaveExecutorInfo extends SparkListener {
+  val addedExecutorInfos = mutable.Map[String, ExecutorInfo]()

-    override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
-      addedExecutorInfos(executor.executorId) = executor.executorInfo
-    }
+  override def onExecutorAdded(executor: SparkListenerExecutorAdded) {
+    addedExecutorInfos(executor.executorId) = executor.executorInfo
  }
}
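Read together, the suite no longer constructs the listener itself; it asks the bus for the instance that SparkContext created from `spark.extraListeners`. The retrieval step in isolation, as a sketch (names as in the suite above; `WAIT_TIMEOUT_MILLIS` is a suite constant):

```scala
// Trigger a job so that executors get added, then drain the listener bus
// before asserting on what the conf-registered listener observed.
sc.parallelize(1 to 100, 4).map(_.toString).count()
assert(sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))

// Fetch the instance SparkContext created from spark.extraListeners.
val listeners = sc.listenerBus.findListenersByClass[SaveExecutorInfo]
assert(listeners.size === 1)
listeners.head.addedExecutorInfos.values.foreach { info =>
  assert(info.logUrlMap.nonEmpty)
}
```

Looking the listener up by class, rather than keeping a field on the suite, is what allows the registration to move into SparkContext initialization, where executor-added events can still be observed.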