From 381e2c7ac4cba952de2bf8b0090ff0799829cf30 Mon Sep 17 00:00:00 2001 From: haoyuan Date: Thu, 6 Sep 2012 20:54:52 -0700 Subject: add warmup code for TopKWordCountRaw.scala --- .../spark/streaming/examples/TopKWordCountRaw.scala | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) (limited to 'streaming') diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala index be3188c5ed..3ba07d0448 100644 --- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala @@ -1,11 +1,24 @@ package spark.streaming.examples import spark.util.IntParam +import spark.SparkContext +import spark.SparkContext._ import spark.storage.StorageLevel import spark.streaming._ import spark.streaming.StreamingContext._ +import WordCount2_ExtraFunctions._ + object TopKWordCountRaw { + def moreWarmup(sc: SparkContext) { + (0 until 40).foreach {i => + sc.parallelize(1 to 20000000, 1000) + .map(_ % 1331).map(_.toString) + .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10) + .collect() + } + } + def main(args: Array[String]) { if (args.length != 7) { System.err.println("Usage: TopKWordCountRaw ") @@ -20,16 +33,12 @@ object TopKWordCountRaw { ssc.setBatchDuration(Milliseconds(batchMs)) // Make sure some tasks have started on each node - ssc.sc.parallelize(1 to 1000, 1000).count() - ssc.sc.parallelize(1 to 1000, 1000).count() - ssc.sc.parallelize(1 to 1000, 1000).count() + moreWarmup(ssc.sc) val rawStreams = (1 to streams).map(_ => ssc.createRawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray val union = new UnifiedDStream(rawStreams) - import WordCount2_ExtraFunctions._ - val windowedCounts = union.mapPartitions(splitAndCountPartitions) .reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(batchMs), reduces) windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2, -- cgit v1.2.3