aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala9
1 files changed, 9 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 19ac7a826e..5fbcd6bb8a 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -17,6 +17,8 @@
package org.apache.spark.ui
+import java.util.concurrent.Semaphore
+
import scala.util.Random
import org.apache.spark.{SparkConf, SparkContext}
@@ -88,6 +90,8 @@ private[spark] object UIWorkloadGenerator {
("Job with delays", baseData.map(x => Thread.sleep(100)).count)
)
+ val barrier = new Semaphore(-nJobSet * jobs.size + 1)
+
(1 to nJobSet).foreach { _ =>
for ((desc, job) <- jobs) {
new Thread {
@@ -99,12 +103,17 @@ private[spark] object UIWorkloadGenerator {
} catch {
case e: Exception =>
println("Job Failed: " + desc)
+ } finally {
+ barrier.release()
}
}
}.start
Thread.sleep(INTER_JOB_WAIT_MS)
}
}
+
+ // Waiting for threads.
+ barrier.acquire()
sc.stop()
}
}