aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorroot <root@domU-12-31-39-0C-C8-96.compute-1.internal>2012-08-31 07:16:19 +0000
committerroot <root@domU-12-31-39-0C-C8-96.compute-1.internal>2012-08-31 07:16:19 +0000
commite1da274a486b6fd5903d9b3643dc07c79973b81d (patch)
tree5b75c4ad6875c41d74cafb1b631389a712696a3b /streaming
parent113277549c5ee1bcd58c7cebc365d28d92b74b4a (diff)
downloadspark-e1da274a486b6fd5903d9b3643dc07c79973b81d.tar.gz
spark-e1da274a486b6fd5903d9b3643dc07c79973b81d.tar.bz2
spark-e1da274a486b6fd5903d9b3643dc07c79973b81d.zip
WordCount tweaks
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/WordCount2.scala17
1 files changed, 10 insertions, 7 deletions
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
index d4b7461099..a090dcb85d 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
@@ -59,12 +59,12 @@ object WordCount2_ExtraFunctions {
object WordCount2 {
- def moreWarmup(sc: SparkContext) {
- (0 until 40).foreach {i =>
+ def warmup(sc: SparkContext) {
+ (0 until 10).foreach {i =>
sc.parallelize(1 to 20000000, 1000)
- .map(_ % 1331).map(_.toString)
- .mapPartitions(WordCount2_ExtraFunctions.splitAndCountPartitions).reduceByKey(_ + _, 10)
- .collect()
+ .map(x => (x % 337, x % 1331))
+ .reduceByKey(_ + _)
+ .count()
}
}
@@ -82,7 +82,10 @@ object WordCount2 {
val ssc = new StreamingContext(master, "WordCount2")
ssc.setBatchDuration(batchDuration)
- val data = ssc.sc.textFile(file, mapTasks.toInt).persist(StorageLevel.MEMORY_ONLY_DESER_2)
+ //warmup(ssc.sc)
+
+ val data = ssc.sc.textFile(file, mapTasks.toInt).persist(
+ new StorageLevel(false, true, true, 2)) // Memory only, deserialized, 2 replicas
println("Data count: " + data.count())
println("Data count: " + data.count())
println("Data count: " + data.count())
@@ -94,7 +97,7 @@ object WordCount2 {
val windowedCounts = sentences
.mapPartitions(splitAndCountPartitions)
- .reduceByKeyAndWindow(add _, subtract _, Seconds(10), batchDuration, reduceTasks.toInt)
+ .reduceByKeyAndWindow(add _, subtract _, Seconds(30), batchDuration, reduceTasks.toInt)
windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2,
Milliseconds(chkptMillis.toLong))
windowedCounts.print()