diff options
author | Kousuke Saruta <sarutak@oss.nttdata.co.jp> | 2015-03-25 13:27:15 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-03-25 13:27:15 -0700 |
commit | acef51defb991bcdc99b76cf2a126afd6d60ec70 (patch) | |
tree | a2933a2773e291a1a474b44f3d4241cdcf7b3b76 /core | |
parent | 883b7e9030e1a3948acee17608e51dcd9f4d55e1 (diff) | |
download | spark-acef51defb991bcdc99b76cf2a126afd6d60ec70.tar.gz spark-acef51defb991bcdc99b76cf2a126afd6d60ec70.tar.bz2 spark-acef51defb991bcdc99b76cf2a126afd6d60ec70.zip |
[SPARK-6537] UIWorkloadGenerator: The main thread should not stop SparkContext until all jobs finish
The main thread of UIWorkloadGenerator spawn sub threads to launch jobs but the main thread stop SparkContext without waiting for finishing those threads.
Author: Kousuke Saruta <sarutak@oss.nttdata.co.jp>
Closes #5187 from sarutak/SPARK-6537 and squashes the following commits:
4e9307a [Kousuke Saruta] Fixed UIWorkloadGenerator so that the main thread stop SparkContext after all jobs finish
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala | 9 |
1 files changed, 9 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala index 19ac7a826e..5fbcd6bb8a 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala @@ -17,6 +17,8 @@ package org.apache.spark.ui +import java.util.concurrent.Semaphore + import scala.util.Random import org.apache.spark.{SparkConf, SparkContext} @@ -88,6 +90,8 @@ private[spark] object UIWorkloadGenerator { ("Job with delays", baseData.map(x => Thread.sleep(100)).count) ) + val barrier = new Semaphore(-nJobSet * jobs.size + 1) + (1 to nJobSet).foreach { _ => for ((desc, job) <- jobs) { new Thread { @@ -99,12 +103,17 @@ private[spark] object UIWorkloadGenerator { } catch { case e: Exception => println("Job Failed: " + desc) + } finally { + barrier.release() } } }.start Thread.sleep(INTER_JOB_WAIT_MS) } } + + // Waiting for threads. + barrier.acquire() sc.stop() } } |