author    haoyuan <haoyuan@eecs.berkeley.edu>  2012-09-06 20:54:52 -0700
committer haoyuan <haoyuan@eecs.berkeley.edu>  2012-09-06 20:54:52 -0700
commit    381e2c7ac4cba952de2bf8b0090ff0799829cf30 (patch)
tree      3dccd042ebec761aed4078eaec1abe29d0a1cfbc /streaming
parent    0681bbc5d9e353e6400087d36a758e89d423bca3 (diff)
add warmup code for TopKWordCountRaw.scala
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala | 19 ++++++++++++++-----
1 file changed, 14 insertions(+), 5 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
index be3188c5ed..3ba07d0448 100644
--- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
@@ -1,11 +1,24 @@
 package spark.streaming.examples
 
 import spark.util.IntParam
+import spark.SparkContext
+import spark.SparkContext._
 import spark.storage.StorageLevel
 import spark.streaming._
 import spark.streaming.StreamingContext._
 
+import WordCount2_ExtraFunctions._
+
 object TopKWordCountRaw {
+  def moreWarmup(sc: SparkContext) {
+    (0 until 40).foreach {i =>
+      sc.parallelize(1 to 20000000, 1000)
+        .map(_ % 1331).map(_.toString)
+        .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
+        .collect()
+    }
+  }
+
   def main(args: Array[String]) {
     if (args.length != 7) {
       System.err.println("Usage: TopKWordCountRaw <master> <streams> <host> <port> <batchMs> <chkptMs> <reduces>")
@@ -20,16 +33,12 @@ object TopKWordCountRaw {
     ssc.setBatchDuration(Milliseconds(batchMs))
 
     // Make sure some tasks have started on each node
-    ssc.sc.parallelize(1 to 1000, 1000).count()
-    ssc.sc.parallelize(1 to 1000, 1000).count()
-    ssc.sc.parallelize(1 to 1000, 1000).count()
+    moreWarmup(ssc.sc)
 
     val rawStreams = (1 to streams).map(_ =>
       ssc.createRawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
     val union = new UnifiedDStream(rawStreams)
 
-    import WordCount2_ExtraFunctions._
-
     val windowedCounts = union.mapPartitions(splitAndCountPartitions)
       .reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(batchMs), reduces)
     windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2,
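
The heavier warmup this commit introduces likely exists to get every worker JVM past one-time costs (task scheduling, class loading, JIT compilation of the hot map/shuffle path) before the timed streaming workload starts: each of the 40 rounds runs a throwaway job with the same shape as the real word count. Below is a minimal standalone sketch of the idiom, assuming the Spark 0.x API this file uses; the real code reuses splitAndCountPartitions from WordCount2_ExtraFunctions, which is stood in for here by a hypothetical inline counter, and the local[4] master and round count are illustrative only.

    import spark.SparkContext
    import spark.SparkContext._

    object WarmupSketch {
      // Run several throwaway jobs so each worker loads classes, starts task
      // threads, and JIT-compiles the map/shuffle path before timing begins.
      def warmup(sc: SparkContext, rounds: Int) {
        (0 until rounds).foreach { _ =>
          sc.parallelize(1 to 20000000, 1000)    // large range, many partitions
            .map(_ % 1331).map(_.toString)       // same shape as the real job
            .mapPartitions { iter =>             // stand-in for splitAndCountPartitions
              val counts = new scala.collection.mutable.HashMap[String, Long]()
              iter.foreach(s => counts(s) = counts.getOrElse(s, 0L) + 1)
              counts.iterator
            }
            .reduceByKey(_ + _, 10)              // shuffle into 10 partitions
            .collect()                           // force the job to run
        }
      }

      def main(args: Array[String]) {
        val sc = new SparkContext("local[4]", "WarmupSketch")  // illustrative master
        warmup(sc, 2)  // a couple of rounds suffice for a local smoke test
      }
    }

Compared with the three small ssc.sc.parallelize(1 to 1000, 1000).count() jobs it replaces, this warmup exercises the same map and shuffle path the benchmark itself measures, so the first few timed batches are less likely to be skewed by cold JVMs.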