diff options
author | root <root@ip-10-190-138-117.ec2.internal> | 2012-09-05 07:08:07 +0000 |
---|---|---|
committer | root <root@ip-10-190-138-117.ec2.internal> | 2012-09-05 07:08:07 +0000 |
commit | b7ad291ac52896af6cb1d882392f3d6fa0cf3b49 (patch) | |
tree | a7e2259e338a223a0ec4c68cea490602cd126ee0 /streaming | |
parent | 25fd684b89ac5bdc6675b0a5d5e3caa9fe608d92 (diff) | |
download | spark-b7ad291ac52896af6cb1d882392f3d6fa0cf3b49.tar.gz spark-b7ad291ac52896af6cb1d882392f3d6fa0cf3b49.tar.bz2 spark-b7ad291ac52896af6cb1d882392f3d6fa0cf3b49.zip |
Tuning Akka for more connections
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/WordCount2.scala | 10 |
1 files changed, 5 insertions, 5 deletions
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala index aa542ba07d..8561e7f079 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala @@ -62,10 +62,10 @@ object WordCount2_ExtraFunctions { object WordCount2 { def warmup(sc: SparkContext) { - (0 until 10).foreach {i => - sc.parallelize(1 to 20000000, 1000) + (0 until 3).foreach {i => + sc.parallelize(1 to 20000000, 500) .map(x => (x % 337, x % 1331)) - .reduceByKey(_ + _) + .reduceByKey(_ + _, 100) .count() } } @@ -84,11 +84,11 @@ object WordCount2 { val ssc = new StreamingContext(master, "WordCount2") ssc.setBatchDuration(batchDuration) - //warmup(ssc.sc) + warmup(ssc.sc) val data = ssc.sc.textFile(file, mapTasks.toInt).persist( new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas - println("Data count: " + data.count()) + println("Data count: " + data.map(x => if (x == "") 1 else x.split(" ").size / x.split(" ").size).count()) println("Data count: " + data.count()) println("Data count: " + data.count()) |