diff options
author | Patrick Wendell <pwendell@gmail.com> | 2013-07-01 09:33:22 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2013-07-01 09:33:22 -0700 |
commit | 735c951a09f71512cf7be834802d1e88567ca9ae (patch) | |
tree | ec9bf118621aad96354587f2a9f9b268ae75983c /core/src/main | |
parent | 5de326db7d5235dbf259a4c8b388cb24c40fd2ec (diff) | |
download | spark-735c951a09f71512cf7be834802d1e88567ca9ae.tar.gz spark-735c951a09f71512cf7be834802d1e88567ca9ae.tar.bz2 spark-735c951a09f71512cf7be834802d1e88567ca9ae.zip |
Adding test script
Diffstat (limited to 'core/src/main')
-rw-r--r-- | core/src/main/scala/spark/ui/UIWorkloadGenerator.scala | 61 |
1 file changed, 61 insertions, 0 deletions
package spark.ui

import scala.util.Random

import spark.SparkContext
import spark.SparkContext._

/**
 * Continuously generates jobs that expose various features of the WebUI (internal testing tool).
 *
 * Usage: ./run spark.ui.UIWorkloadGenerator [master]
 */
private[spark] object UIWorkloadGenerator {
  /** Number of partitions for the base RDD; also scales the amount of work per job. */
  val NUM_PARTITIONS = 100
  /** Pause between consecutive jobs so each one is distinguishable in the UI. */
  val INTER_JOB_WAIT_MS = 500

  def main(args: Array[String]) {
    // Fail fast with the documented usage instead of an ArrayIndexOutOfBoundsException
    // when the master URL is omitted.
    if (args.length < 1) {
      System.err.println("Usage: ./run spark.ui.UIWorkloadGenerator [master]")
      System.exit(1)
    }
    val master = args(0)
    val appName = "Spark UI Tester"
    val sc = new SparkContext(master, appName)

    // NOTE: Right now there is no easy way for us to show spark.job.annotation for a given phase,
    // but we pass it here anyways since it will be useful once we do.
    def setName(s: String) = {
      sc.addLocalProperties("spark.job.annotation", s)
    }
    val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS)
    // A fresh Random per call keeps the closure serialization-friendly on workers.
    def nextFloat() = (new Random()).nextFloat()

    // (description, job) pairs. Each job is a thunk that forces evaluation via count();
    // some jobs deliberately fail to exercise the failure-reporting parts of the UI.
    val jobs = Seq[(String, () => Long)](
      ("Count", baseData.count),
      ("Cache and Count", baseData.map(x => x).cache.count),
      ("Single Shuffle", baseData.map(x => (x % 10, x)).reduceByKey(_ + _).count),
      ("Entirely failed phase", baseData.map(x => throw new Exception).count),
      ("Partially failed phase", {
        baseData.map{x =>
          // Expect ~4 task failures per run of this job.
          val probFailure = (4.0 / NUM_PARTITIONS)
          if (nextFloat() < probFailure) {
            throw new Exception("This is a task failure")
          }
          1
        }.count
      }),
      ("Job with delays", baseData.map(x => Thread.sleep(1000)).count)
    )

    // Loop forever: this is an interactive testing tool, terminated externally.
    while (true) {
      for ((desc, job) <- jobs) {
        try {
          setName(desc)
          job()
          // Fixed typo in the status message ("funished" -> "finished").
          println("Job finished: " + desc)
        } catch {
          case e: Exception =>
            // Expected for the deliberately-failing jobs above; report and continue.
            println("Job Failed: " + desc)
        }
        Thread.sleep(INTER_JOB_WAIT_MS)
      }
    }
  }
}