about | summary | refs | log | tree | commit | diff
path: root/streaming
diff options
context:
space:
mode:
author root <root@ip-10-190-138-117.ec2.internal> 2012-09-05 07:08:07 +0000
committer root <root@ip-10-190-138-117.ec2.internal> 2012-09-05 07:08:07 +0000
commit b7ad291ac52896af6cb1d882392f3d6fa0cf3b49 (patch)
tree a7e2259e338a223a0ec4c68cea490602cd126ee0 /streaming
parent 25fd684b89ac5bdc6675b0a5d5e3caa9fe608d92 (diff)
download spark-b7ad291ac52896af6cb1d882392f3d6fa0cf3b49.tar.gz
spark-b7ad291ac52896af6cb1d882392f3d6fa0cf3b49.tar.bz2
spark-b7ad291ac52896af6cb1d882392f3d6fa0cf3b49.zip
Tuning Akka for more connections
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/spark/streaming/examples/WordCount2.scala 10
1 files changed, 5 insertions, 5 deletions
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
index aa542ba07d..8561e7f079 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
@@ -62,10 +62,10 @@ object WordCount2_ExtraFunctions {
object WordCount2 {
def warmup(sc: SparkContext) {
- (0 until 10).foreach {i =>
- sc.parallelize(1 to 20000000, 1000)
+ (0 until 3).foreach {i =>
+ sc.parallelize(1 to 20000000, 500)
.map(x => (x % 337, x % 1331))
- .reduceByKey(_ + _)
+ .reduceByKey(_ + _, 100)
.count()
}
}
@@ -84,11 +84,11 @@ object WordCount2 {
val ssc = new StreamingContext(master, "WordCount2")
ssc.setBatchDuration(batchDuration)
- //warmup(ssc.sc)
+ warmup(ssc.sc)
val data = ssc.sc.textFile(file, mapTasks.toInt).persist(
new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas
- println("Data count: " + data.count())
+ println("Data count: " + data.map(x => if (x == "") 1 else x.split(" ").size / x.split(" ").size).count())
println("Data count: " + data.count())
println("Data count: " + data.count())