aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorKousuke Saruta <sarutak@oss.nttdata.co.jp>2015-03-25 13:27:15 -0700
committerAndrew Or <andrew@databricks.com>2015-03-25 13:27:15 -0700
commitacef51defb991bcdc99b76cf2a126afd6d60ec70 (patch)
treea2933a2773e291a1a474b44f3d4241cdcf7b3b76 /core
parent883b7e9030e1a3948acee17608e51dcd9f4d55e1 (diff)
downloadspark-acef51defb991bcdc99b76cf2a126afd6d60ec70.tar.gz
spark-acef51defb991bcdc99b76cf2a126afd6d60ec70.tar.bz2
spark-acef51defb991bcdc99b76cf2a126afd6d60ec70.zip
[SPARK-6537] UIWorkloadGenerator: The main thread should not stop SparkContext until all jobs finish
The main thread of UIWorkloadGenerator spawn sub threads to launch jobs but the main thread stop SparkContext without waiting for finishing those threads. Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp> Closes #5187 from sarutak/SPARK-6537 and squashes the following commits: 4e9307a [Kousuke Saruta] Fixed UIWorkloadGenerator so that the main thread stop SparkContext after all jobs finish
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala9
1 files changed, 9 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 19ac7a826e..5fbcd6bb8a 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -17,6 +17,8 @@
package org.apache.spark.ui
+import java.util.concurrent.Semaphore
+
import scala.util.Random
import org.apache.spark.{SparkConf, SparkContext}
@@ -88,6 +90,8 @@ private[spark] object UIWorkloadGenerator {
("Job with delays", baseData.map(x => Thread.sleep(100)).count)
)
+ val barrier = new Semaphore(-nJobSet * jobs.size + 1)
+
(1 to nJobSet).foreach { _ =>
for ((desc, job) <- jobs) {
new Thread {
@@ -99,12 +103,17 @@ private[spark] object UIWorkloadGenerator {
} catch {
case e: Exception =>
println("Job Failed: " + desc)
+ } finally {
+ barrier.release()
}
}
}.start
Thread.sleep(INTER_JOB_WAIT_MS)
}
}
+
+ // Waiting for threads.
+ barrier.acquire()
sc.stop()
}
}