aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2013-07-01 09:33:22 -0700
committerPatrick Wendell <pwendell@gmail.com>2013-07-01 09:33:22 -0700
commit735c951a09f71512cf7be834802d1e88567ca9ae (patch)
treeec9bf118621aad96354587f2a9f9b268ae75983c
parent5de326db7d5235dbf259a4c8b388cb24c40fd2ec (diff)
downloadspark-735c951a09f71512cf7be834802d1e88567ca9ae.tar.gz
spark-735c951a09f71512cf7be834802d1e88567ca9ae.tar.bz2
spark-735c951a09f71512cf7be834802d1e88567ca9ae.zip
Adding test script
-rw-r--r--core/src/main/scala/spark/ui/UIWorkloadGenerator.scala61
1 files changed, 61 insertions, 0 deletions
diff --git a/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
new file mode 100644
index 0000000000..24cfe36aaa
--- /dev/null
+++ b/core/src/main/scala/spark/ui/UIWorkloadGenerator.scala
@@ -0,0 +1,61 @@
+package spark.ui
+
+import scala.util.Random
+
+import spark.SparkContext
+import spark.SparkContext._
+
+/**
+ * Continuously generates jobs that expose various features of the WebUI (internal testing tool).
+ *
+ * Usage: ./run spark.ui.UIWorkloadGenerator [master]
+ */
+private[spark] object UIWorkloadGenerator {
+ val NUM_PARTITIONS = 100
+ val INTER_JOB_WAIT_MS = 500
+
+ def main(args: Array[String]) {
+ val master = args(0)
+ val appName = "Spark UI Tester"
+ val sc = new SparkContext(master, appName)
+
+ // NOTE: Right now there is no easy way for us to show spark.job.annotation for a given phase,
+ // but we pass it here anyways since it will be useful once we do.
+ def setName(s: String) = {
+ sc.addLocalProperties("spark.job.annotation", s)
+ }
+ val baseData = sc.makeRDD(1 to NUM_PARTITIONS * 10, NUM_PARTITIONS)
+ def nextFloat() = (new Random()).nextFloat()
+
+ val jobs = Seq[(String, () => Long)](
+ ("Count", baseData.count),
+ ("Cache and Count", baseData.map(x => x).cache.count),
+ ("Single Shuffle", baseData.map(x => (x % 10, x)).reduceByKey(_ + _).count),
+ ("Entirely failed phase", baseData.map(x => throw new Exception).count),
+ ("Partially failed phase", {
+ baseData.map{x =>
+ val probFailure = (4.0 / NUM_PARTITIONS)
+ if (nextFloat() < probFailure) {
+ throw new Exception("This is a task failure")
+ }
+ 1
+ }.count
+ }),
+ ("Job with delays", baseData.map(x => Thread.sleep(1000)).count)
+ )
+
+ while (true) {
+ for ((desc, job) <- jobs) {
+ try {
+ setName(desc)
+ job()
+ println("Job funished: " + desc)
+ } catch {
+ case e: Exception =>
+ println("Job Failed: " + desc)
+ }
+ Thread.sleep(INTER_JOB_WAIT_MS)
+ }
+ }
+ }
+}