diff options
author | root <root@domU-12-31-39-0C-C8-96.compute-1.internal> | 2012-08-31 00:34:57 +0000 |
---|---|---|
committer | root <root@domU-12-31-39-0C-C8-96.compute-1.internal> | 2012-08-31 00:34:57 +0000 |
commit | d4d2cb670f9a19a6ab151e51398ba135445f1bb9 (patch) | |
tree | b5d0e7f3e91eccf01e1641c325e7bd7217657050 /streaming | |
parent | c4366eb76425d1c6aeaa7df750a2681a0da75db8 (diff) | |
download | spark-d4d2cb670f9a19a6ab151e51398ba135445f1bb9.tar.gz spark-d4d2cb670f9a19a6ab151e51398ba135445f1bb9.tar.bz2 spark-d4d2cb670f9a19a6ab151e51398ba135445f1bb9.zip |
Make checkpoint interval configurable in WordCount2
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/main/scala/spark/streaming/examples/WordCount2.scala | 15 |
1 files changed, 8 insertions, 7 deletions
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala index 3b2d909584..d4b7461099 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala @@ -70,17 +70,17 @@ object WordCount2 { def main (args: Array[String]) { - if (args.length != 5) { - println ("Usage: WordCount2 <host> <file> <mapTasks> <reduceTasks> <batchMillis>") + if (args.length != 6) { + println ("Usage: WordCount2 <host> <file> <mapTasks> <reduceTasks> <batchMillis> <chkptMillis>") System.exit(1) } - val Array(master, file, mapTasks, reduceTasks, batchMillis) = args + val Array(master, file, mapTasks, reduceTasks, batchMillis, chkptMillis) = args - val BATCH_DURATION = Milliseconds(batchMillis.toLong) + val batchDuration = Milliseconds(batchMillis.toLong) val ssc = new StreamingContext(master, "WordCount2") - ssc.setBatchDuration(BATCH_DURATION) + ssc.setBatchDuration(batchDuration) val data = ssc.sc.textFile(file, mapTasks.toInt).persist(StorageLevel.MEMORY_ONLY_DESER_2) println("Data count: " + data.count()) @@ -94,8 +94,9 @@ object WordCount2 { val windowedCounts = sentences .mapPartitions(splitAndCountPartitions) - .reduceByKeyAndWindow(add _, subtract _, Seconds(10), BATCH_DURATION, reduceTasks.toInt) - windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2, Seconds(10)) + .reduceByKeyAndWindow(add _, subtract _, Seconds(10), batchDuration, reduceTasks.toInt) + windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2, + Milliseconds(chkptMillis.toLong)) windowedCounts.print() ssc.start() |