aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorroot <root@domU-12-31-39-0C-C8-96.compute-1.internal>2012-08-31 00:34:57 +0000
committerroot <root@domU-12-31-39-0C-C8-96.compute-1.internal>2012-08-31 00:34:57 +0000
commitd4d2cb670f9a19a6ab151e51398ba135445f1bb9 (patch)
treeb5d0e7f3e91eccf01e1641c325e7bd7217657050 /streaming
parentc4366eb76425d1c6aeaa7df750a2681a0da75db8 (diff)
downloadspark-d4d2cb670f9a19a6ab151e51398ba135445f1bb9.tar.gz
spark-d4d2cb670f9a19a6ab151e51398ba135445f1bb9.tar.bz2
spark-d4d2cb670f9a19a6ab151e51398ba135445f1bb9.zip
Make checkpoint interval configurable in WordCount2
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/WordCount2.scala15
1 files changed, 8 insertions, 7 deletions
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
index 3b2d909584..d4b7461099 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
@@ -70,17 +70,17 @@ object WordCount2 {
def main (args: Array[String]) {
- if (args.length != 5) {
- println ("Usage: WordCount2 <host> <file> <mapTasks> <reduceTasks> <batchMillis>")
+ if (args.length != 6) {
+ println ("Usage: WordCount2 <host> <file> <mapTasks> <reduceTasks> <batchMillis> <chkptMillis>")
System.exit(1)
}
- val Array(master, file, mapTasks, reduceTasks, batchMillis) = args
+ val Array(master, file, mapTasks, reduceTasks, batchMillis, chkptMillis) = args
- val BATCH_DURATION = Milliseconds(batchMillis.toLong)
+ val batchDuration = Milliseconds(batchMillis.toLong)
val ssc = new StreamingContext(master, "WordCount2")
- ssc.setBatchDuration(BATCH_DURATION)
+ ssc.setBatchDuration(batchDuration)
val data = ssc.sc.textFile(file, mapTasks.toInt).persist(StorageLevel.MEMORY_ONLY_DESER_2)
println("Data count: " + data.count())
@@ -94,8 +94,9 @@ object WordCount2 {
val windowedCounts = sentences
.mapPartitions(splitAndCountPartitions)
- .reduceByKeyAndWindow(add _, subtract _, Seconds(10), BATCH_DURATION, reduceTasks.toInt)
- windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2, Seconds(10))
+ .reduceByKeyAndWindow(add _, subtract _, Seconds(10), batchDuration, reduceTasks.toInt)
+ windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2,
+ Milliseconds(chkptMillis.toLong))
windowedCounts.print()
ssc.start()