author    | Tathagata Das <tathagata.das1565@gmail.com> | 2012-11-12 21:45:16 +0000
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2012-11-12 21:45:16 +0000
commit    | ae61ebaee64fad117155d65bcdfc8520bda0e6b4 (patch)
tree      | 12e0c6f0ddc70cfce11961f0ecd263c147eb10f0
parent    | 052d0b800ffe1bcfddc33a6fb3ad71e169b219bb (diff)
Fixed bugs in RawNetworkInputDStream and in its examples. Made ReducedWindowedDStream persist RDDs as MEMORY_ONLY_SER by default. Removed unnecessary examples. Added streaming-env.sh.template with recommended settings for streaming.
14 files changed, 193 insertions, 379 deletions
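The central change below is the storage level used for windowed reductions. As a reading aid, here is a hypothetical sketch (not part of this commit) of the user-facing effect; it uses only API calls that appear in this patch, and the master URL, port, and parallelism are illustrative:

// Illustrative only: mirrors the 0.6-era streaming API used in this patch.
import spark.streaming._
import spark.streaming.StreamingContext._

object WindowedCountSketch {
  def add(v1: Long, v2: Long) = v1 + v2
  def subtract(v1: Long, v2: Long) = v1 - v2

  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[4]", "WindowedCountSketch") // hypothetical master
    ssc.setBatchDuration(Seconds(1))

    // rawNetworkStream's default storage level is now MEMORY_AND_DISK_SER_2
    // (serialized, replicated, spilling to disk) instead of MEMORY_ONLY_SER_2.
    val words = ssc.rawNetworkStream[String]("localhost", 9999)

    // reduceByKeyAndWindow with an inverse ("subtract") function builds a
    // ReducedWindowedDStream; after this commit its RDDs are persisted with
    // StorageLevel.MEMORY_ONLY_SER by default, i.e. as serialized byte arrays.
    val windowedCounts = words.map(w => (w, 1L))
      .reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)

    windowedCounts.foreachRDD(r => println("# unique words = " + r.count()))
    ssc.start()
  }
}

Since these per-batch reductions are reused every time the window slides, keeping them serialized trades some CPU on access for a much smaller heap footprint.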
diff --git a/conf/streaming-env.sh.template b/conf/streaming-env.sh.template
new file mode 100755
index 0000000000..6b4094c515
--- /dev/null
+++ b/conf/streaming-env.sh.template
@@ -0,0 +1,22 @@
+#!/usr/bin/env bash
+
+# This file contains a few additional settings that are useful for
+# running streaming jobs in Spark. Copy this file as streaming-env.sh .
+# Note that this shell script will be read after spark-env.sh, so settings
+# in this file may override similar settings (if present) in spark-env.sh .
+
+
+# Using concurrent GC is strongly recommended as it can significantly
+# reduce GC related pauses.
+
+SPARK_JAVA_OPTS+=" -XX:+UseConcMarkSweepGC"
+
+# Using Kryo serialization can improve serialization performance
+# and therefore the throughput of Spark Streaming programs. However,
+# using Kryo serialization with custom classes may require you to
+# register the classes with Kryo. Refer to the Spark documentation
+# for more details.
+
+# SPARK_JAVA_OPTS+=" -Dspark.serializer=spark.KryoSerializer"
+
+export SPARK_JAVA_OPTS
diff --git a/run b/run
--- a/run
+++ b/run
@@ -13,6 +13,10 @@ if [ -e $FWDIR/conf/spark-env.sh ] ; then
   . $FWDIR/conf/spark-env.sh
 fi
 
+if [ -e $FWDIR/conf/streaming-env.sh ] ; then
+  . $FWDIR/conf/streaming-env.sh
+fi
+
 if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
   if [ `command -v scala` ]; then
     RUNNER="scala"
diff --git a/startTrigger.sh b/startTrigger.sh
deleted file mode 100755
index 373dbda93e..0000000000
--- a/startTrigger.sh
+++ /dev/null
@@ -1,3 +0,0 @@
-#!/bin/bash
-
-./run spark.streaming.SentenceGenerator localhost 7078 sentences.txt 1
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index 07ef79415d..d0fef70f7e 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -47,6 +47,7 @@ class NetworkInputTracker(
       val result = queue.synchronized {
         queue.dequeueAll(x => true)
       }
+      logInfo("Stream " + receiverId + " received " + result.size + " blocks")
       result.toArray
     }
diff --git a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
index e022b85fbe..03726bfba6 100644
--- a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
@@ -69,7 +69,7 @@ class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: S
   }
 
   def onStop() {
-    blockPushingThread.interrupt()
+    if (blockPushingThread != null) blockPushingThread.interrupt()
   }
 
   /** Read a buffer fully from a given Channel */
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
index 6df82c0df3..b07d51fa6b 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
@@ -31,10 +31,14 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
     "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")"
   )
 
-  super.persist(StorageLevel.MEMORY_ONLY)
-
+  // Reduce each batch of data using reduceByKey which will be further reduced by window
+  // by ReducedWindowedDStream
   val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
+  // Persist RDDs to memory by default as these RDDs are going to be reused.
+  super.persist(StorageLevel.MEMORY_ONLY_SER)
+  reducedStream.persist(StorageLevel.MEMORY_ONLY_SER)
+
   def windowTime: Time = _windowTime
 
   override def dependencies = List(reducedStream)
@@ -57,13 +61,6 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
     this
   }
 
-  protected[streaming] override def setRememberDuration(time: Time) {
-    if (rememberDuration == null || rememberDuration < time) {
-      rememberDuration = time
-      dependencies.foreach(_.setRememberDuration(rememberDuration + windowTime))
-    }
-  }
-
   override def compute(validTime: Time): Option[RDD[(K, V)]] = {
     val reduceF = reduceFunc
     val invReduceF = invReduceFunc
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index eb83aaee7a..ab6d6e8dea 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -124,7 +124,7 @@ final class StreamingContext (
   def rawNetworkStream[T: ClassManifest](
     hostname: String,
     port: Int,
-    storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
+    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): DStream[T] = {
     val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel)
     graph.addInputStream(inputStream)
@@ -132,7 +132,7 @@ final class StreamingContext (
   }
 
   /**
-   * This function creates a input stream that monitors a Hadoop-compatible
+   * This function creates an input stream that monitors a Hadoop-compatible filesystem
    * for new files and executes the necessary processing on them.
    */
  def fileStream[
diff --git a/streaming/src/main/scala/spark/streaming/examples/Grep2.scala b/streaming/src/main/scala/spark/streaming/examples/Grep2.scala
deleted file mode 100644
index b1faa65c17..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/Grep2.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-package spark.streaming.examples
-
-import spark.SparkContext
-import SparkContext._
-import spark.streaming._
-import StreamingContext._
-
-import spark.storage.StorageLevel
-
-import scala.util.Sorting
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Queue
-import scala.collection.JavaConversions.mapAsScalaMap
-
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
-
-
-object Grep2 {
-
-  def warmup(sc: SparkContext) {
-    (0 until 10).foreach {i =>
-      sc.parallelize(1 to 20000000, 1000)
-        .map(x => (x % 337, x % 1331))
-        .reduceByKey(_ + _)
-        .count()
-    }
-  }
-
-  def main (args: Array[String]) {
-
-    if (args.length != 6) {
-      println ("Usage: Grep2 <host> <file> <mapTasks> <reduceTasks> <batchMillis> <chkptMillis>")
-      System.exit(1)
-    }
-
-    val Array(master, file, mapTasks, reduceTasks, batchMillis, chkptMillis) = args
-
-    val batchDuration = Milliseconds(batchMillis.toLong)
-
-    val ssc = new StreamingContext(master, "Grep2")
-    ssc.setBatchDuration(batchDuration)
-
-    //warmup(ssc.sc)
-
-    val data = ssc.sc.textFile(file, mapTasks.toInt).persist(
-      new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas
-    println("Data count: " + data.count())
-    println("Data count: " + data.count())
-    println("Data count: " + data.count())
-
-    val sentences = new ConstantInputDStream(ssc, data)
-    ssc.registerInputStream(sentences)
-
-    sentences.filter(_.contains("Culpepper")).count().foreachRDD(r =>
-      println("Grep count: " + r.collect().mkString))
-
-    ssc.start()
-
-    while(true) { Thread.sleep(1000) }
-  }
-}
-
-
diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
index b1e1a613fe..ffbea6e55d 100644
--- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
@@ -2,8 +2,10 @@ package spark.streaming.examples
 
 import spark.util.IntParam
 import spark.storage.StorageLevel
+
 import spark.streaming._
 import spark.streaming.StreamingContext._
+import spark.streaming.util.RawTextHelper._
 
 object GrepRaw {
   def main(args: Array[String]) {
@@ -17,16 +19,13 @@ object GrepRaw {
     // Create the context and set the batch size
     val ssc = new StreamingContext(master, "GrepRaw")
     ssc.setBatchDuration(Milliseconds(batchMillis))
+    warmUp(ssc.sc)
 
-    // Make sure some tasks have started on each node
-    ssc.sc.parallelize(1 to 1000, 1000).count()
-    ssc.sc.parallelize(1 to 1000, 1000).count()
-    ssc.sc.parallelize(1 to 1000, 1000).count()
 
     val rawStreams = (1 to numStreams).map(_ =>
-      ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
+      ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
     val union = new UnionDStream(rawStreams)
-    union.filter(_.contains("Culpepper")).count().foreachRDD(r =>
+    union.filter(_.contains("Alice")).count().foreachRDD(r =>
       println("Grep count: " + r.collect().mkString))
     ssc.start()
   }
diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
index 750cb7445f..0411bde1a7 100644
--- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
@@ -1,94 +1,50 @@
 package spark.streaming.examples
 
-import spark.util.IntParam
-import spark.SparkContext
-import spark.SparkContext._
 import spark.storage.StorageLevel
+import spark.util.IntParam
+
 import spark.streaming._
 import spark.streaming.StreamingContext._
+import spark.streaming.util.RawTextHelper._
 
-import WordCount2_ExtraFunctions._
+import java.util.UUID
 
 object TopKWordCountRaw {
-  def moreWarmup(sc: SparkContext) {
-    (0 until 40).foreach {i =>
-      sc.parallelize(1 to 20000000, 1000)
-        .map(_ % 1331).map(_.toString)
-        .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
-        .collect()
-    }
-  }
-
+
   def main(args: Array[String]) {
-    if (args.length != 7) {
-      System.err.println("Usage: TopKWordCountRaw <master> <streams> <host> <port> <batchMs> <chkptMs> <reduces>")
+    if (args.length != 4) {
+      System.err.println("Usage: TopKWordCountRaw <master> <# streams> <port> <HDFS checkpoint directory>")
       System.exit(1)
     }
 
-    val Array(master, IntParam(streams), host, IntParam(port), IntParam(batchMs),
-      IntParam(chkptMs), IntParam(reduces)) = args
-
-    // Create the context and set the batch size
-    val ssc = new StreamingContext(master, "TopKWordCountRaw")
-    ssc.setBatchDuration(Milliseconds(batchMs))
-
-    // Make sure some tasks have started on each node
-    moreWarmup(ssc.sc)
-
-    val rawStreams = (1 to streams).map(_ =>
-      ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
-    val union = new UnionDStream(rawStreams)
-
-    val windowedCounts = union.mapPartitions(splitAndCountPartitions)
-      .reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(batchMs), reduces)
-    windowedCounts.persist().checkpoint(Milliseconds(chkptMs))
-    //.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMs))
-
-    def topK(data: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] = {
-      val taken = new Array[(String, Long)](k)
-
-      var i = 0
-      var len = 0
-      var done = false
-      var value: (String, Long) = null
-      var swap: (String, Long) = null
-      var count = 0
-
-      while(data.hasNext) {
-        value = data.next
-        count += 1
-        println("count = " + count)
-        if (len == 0) {
-          taken(0) = value
-          len = 1
-        } else if (len < k || value._2 > taken(len - 1)._2) {
-          if (len < k) {
-            len += 1
-          }
-          taken(len - 1) = value
-          i = len - 1
-          while(i > 0 && taken(i - 1)._2 < taken(i)._2) {
-            swap = taken(i)
-            taken(i) = taken(i-1)
-            taken(i - 1) = swap
-            i -= 1
-          }
-        }
-      }
-      println("Took " + len + " out of " + count + " items")
-      return taken.toIterator
-    }
+    val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
+    val k = 10
 
-    val k = 50
+    // Create the context, set the batch size and checkpoint directory.
+    // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
+    // periodically to HDFS
+    val ssc = new StreamingContext(master, "TopKWordCountRaw")
+    ssc.setBatchDuration(Seconds(1))
+    ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
+
+    // Warm up the JVMs on master and slave for JIT compilation to kick in
+    /*warmUp(ssc.sc)*/
+
+    // Set up the raw network streams that will connect to localhost:port to raw test
+    // senders on the slaves and generate top K words of last 30 seconds
+    val lines = (1 to numStreams).map(_ => {
+      ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
+    })
+    val union = new UnionDStream(lines.toArray)
+    val counts = union.mapPartitions(splitAndCountPartitions)
+    val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
     val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k))
     partialTopKWindowedCounts.foreachRDD(rdd => {
       val collectedCounts = rdd.collect
-      println("Collected " + collectedCounts.size + " items")
-      topK(collectedCounts.toIterator, k).foreach(println)
+      println("Collected " + collectedCounts.size + " words from partial top words")
+      println("Top " + k + " words are " + topK(collectedCounts.toIterator, k).mkString(","))
     })
-    // windowedCounts.foreachRDD(r => println("Element count: " + r.count()))
-
     ssc.start()
   }
 }
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
deleted file mode 100644
index 865026033e..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-package spark.streaming.examples
-
-import spark.SparkContext
-import SparkContext._
-import spark.streaming._
-import StreamingContext._
-
-import spark.storage.StorageLevel
-
-import scala.util.Sorting
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Queue
-import scala.collection.JavaConversions.mapAsScalaMap
-
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
-
-
-object WordCount2_ExtraFunctions {
-
-  def add(v1: Long, v2: Long) = (v1 + v2)
-
-  def subtract(v1: Long, v2: Long) = (v1 - v2)
-
-  def max(v1: Long, v2: Long) = math.max(v1, v2)
-
-  def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
-    //val map = new java.util.HashMap[String, Long]
-    val map = new OLMap[String]
-    var i = 0
-    var j = 0
-    while (iter.hasNext) {
-      val s = iter.next()
-      i = 0
-      while (i < s.length) {
-        j = i
-        while (j < s.length && s.charAt(j) != ' ') {
-          j += 1
-        }
-        if (j > i) {
-          val w = s.substring(i, j)
-          val c = map.getLong(w)
-          map.put(w, c + 1)
-/*
-          if (c == null) {
-            map.put(w, 1)
-          } else {
-            map.put(w, c + 1)
-          }
-*/
-        }
-        i = j
-        while (i < s.length && s.charAt(i) == ' ') {
-          i += 1
-        }
-      }
-    }
-    map.toIterator.map{case (k, v) => (k, v)}
-  }
-}
-
-object WordCount2 {
-
-  def warmup(sc: SparkContext) {
-    (0 until 3).foreach {i =>
-      sc.parallelize(1 to 20000000, 500)
-        .map(x => (x % 337, x % 1331))
-        .reduceByKey(_ + _, 100)
-        .count()
-    }
-  }
-
-  def main (args: Array[String]) {
-
-    if (args.length != 6) {
-      println ("Usage: WordCount2 <host> <file> <mapTasks> <reduceTasks> <batchMillis> <chkptMillis>")
-      System.exit(1)
-    }
-
-    val Array(master, file, mapTasks, reduceTasks, batchMillis, chkptMillis) = args
-
-    val batchDuration = Milliseconds(batchMillis.toLong)
-
-    val ssc = new StreamingContext(master, "WordCount2")
-    ssc.setBatchDuration(batchDuration)
-
-    //warmup(ssc.sc)
-
-    val data = ssc.sc.textFile(file, mapTasks.toInt).persist(
-      new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas
-    println("Data count: " + data.map(x => if (x == "") 1 else x.split(" ").size / x.split(" ").size).count())
-    println("Data count: " + data.count())
-    println("Data count: " + data.count())
-
-    val sentences = new ConstantInputDStream(ssc, data)
-    ssc.registerInputStream(sentences)
-
-    import WordCount2_ExtraFunctions._
-
-    val windowedCounts = sentences
-      .mapPartitions(splitAndCountPartitions)
-      .reduceByKeyAndWindow(add _, subtract _, Seconds(30), batchDuration, reduceTasks.toInt)
-
-    windowedCounts.persist().checkpoint(Milliseconds(chkptMillis.toLong))
-    //.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMillis.toLong))
-    windowedCounts.foreachRDD(r => println("Element count: " + r.count()))
-
-    ssc.start()
-
-    while(true) { Thread.sleep(1000) }
-  }
-}
-
-
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
index d1ea9a9cd5..571428c0fe 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
@@ -1,50 +1,43 @@
 package spark.streaming.examples
 
-import spark.util.IntParam
-import spark.SparkContext
-import spark.SparkContext._
 import spark.storage.StorageLevel
+import spark.util.IntParam
+
 import spark.streaming._
 import spark.streaming.StreamingContext._
+import spark.streaming.util.RawTextHelper._
 
-import WordCount2_ExtraFunctions._
+import java.util.UUID
 
 object WordCountRaw {
-  def moreWarmup(sc: SparkContext) {
-    (0 until 40).foreach {i =>
-      sc.parallelize(1 to 20000000, 1000)
-        .map(_ % 1331).map(_.toString)
-        .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
-        .collect()
-    }
-  }
 
   def main(args: Array[String]) {
-    if (args.length != 7) {
-      System.err.println("Usage: WordCountRaw <master> <streams> <host> <port> <batchMs> <chkptMs> <reduces>")
+    if (args.length != 4) {
+      System.err.println("Usage: WordCountRaw <master> <# streams> <port> <HDFS checkpoint directory>")
       System.exit(1)
     }
 
-    val Array(master, IntParam(streams), host, IntParam(port), IntParam(batchMs),
-      IntParam(chkptMs), IntParam(reduces)) = args
+    val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
 
-    // Create the context and set the batch size
+    // Create the context, set the batch size and checkpoint directory.
+    // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
+    // periodically to HDFS
     val ssc = new StreamingContext(master, "WordCountRaw")
-    ssc.setBatchDuration(Milliseconds(batchMs))
-
-    // Make sure some tasks have started on each node
-    moreWarmup(ssc.sc)
-
-    val rawStreams = (1 to streams).map(_ =>
-      ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
-    val union = new UnionDStream(rawStreams)
-
-    val windowedCounts = union.mapPartitions(splitAndCountPartitions)
-      .reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(batchMs), reduces)
-    windowedCounts.persist().checkpoint(chkptMs)
-    //.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMs))
-
-    windowedCounts.foreachRDD(r => println("Element count: " + r.count()))
+    ssc.setBatchDuration(Seconds(1))
+    ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
+
+    // Warm up the JVMs on master and slave for JIT compilation to kick in
+    warmUp(ssc.sc)
+
+    // Set up the raw network streams that will connect to localhost:port to raw test
+    // senders on the slaves and generate count of words of last 30 seconds
+    val lines = (1 to numStreams).map(_ => {
+      ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
+    })
+    val union = new UnionDStream(lines.toArray)
+    val counts = union.mapPartitions(splitAndCountPartitions)
+    val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
+    windowedCounts.foreachRDD(r => println("# unique words = " + r.count()))
 
     ssc.start()
   }
 }
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala b/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala
deleted file mode 100644
index 6a9c8a9a69..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-package spark.streaming.examples
-
-import spark.SparkContext
-import SparkContext._
-import spark.streaming._
-import StreamingContext._
-
-import spark.storage.StorageLevel
-
-import scala.util.Sorting
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Queue
-import scala.collection.JavaConversions.mapAsScalaMap
-
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
-
-
-object WordMax2 {
-
-  def warmup(sc: SparkContext) {
-    (0 until 10).foreach {i =>
-      sc.parallelize(1 to 20000000, 1000)
-        .map(x => (x % 337, x % 1331))
-        .reduceByKey(_ + _)
-        .count()
-    }
-  }
-
-  def main (args: Array[String]) {
-
-    if (args.length != 6) {
-      println ("Usage: WordMax2 <host> <file> <mapTasks> <reduceTasks> <batchMillis> <chkptMillis>")
-      System.exit(1)
-    }
-
-    val Array(master, file, mapTasks, reduceTasks, batchMillis, chkptMillis) = args
-
-    val batchDuration = Milliseconds(batchMillis.toLong)
-
-    val ssc = new StreamingContext(master, "WordMax2")
-    ssc.setBatchDuration(batchDuration)
-
-    //warmup(ssc.sc)
-
-    val data = ssc.sc.textFile(file, mapTasks.toInt).persist(
-      new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas
-    println("Data count: " + data.count())
-    println("Data count: " + data.count())
-    println("Data count: " + data.count())
-
-    val sentences = new ConstantInputDStream(ssc, data)
-    ssc.registerInputStream(sentences)
-
-    import WordCount2_ExtraFunctions._
-
-    val windowedCounts = sentences
-      .mapPartitions(splitAndCountPartitions)
-      .reduceByKey(add _, reduceTasks.toInt)
-      .persist()
-      .checkpoint(Milliseconds(chkptMillis.toLong))
-    //.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMillis.toLong))
-      .reduceByKeyAndWindow(max _, Seconds(10), batchDuration, reduceTasks.toInt)
-      .persist()
-      .checkpoint(Milliseconds(chkptMillis.toLong))
-    //.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMillis.toLong))
-    windowedCounts.foreachRDD(r => println("Element count: " + r.count()))
-
-    ssc.start()
-
-    while(true) { Thread.sleep(1000) }
-  }
-}
-
-
diff --git a/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala
new file mode 100644
index 0000000000..f31ae39a16
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala
@@ -0,0 +1,98 @@
+package spark.streaming.util
+
+import spark.SparkContext
+import spark.SparkContext._
+import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+import scala.collection.JavaConversions.mapAsScalaMap
+
+object RawTextHelper {
+
+  /**
+   * Splits lines and counts the words in them using a specialized object-to-long hashmap
+   * (to avoid the boxing-unboxing overhead of Long in java/scala HashMap)
+   */
+  def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
+    val map = new OLMap[String]
+    var i = 0
+    var j = 0
+    while (iter.hasNext) {
+      val s = iter.next()
+      i = 0
+      while (i < s.length) {
+        j = i
+        while (j < s.length && s.charAt(j) != ' ') {
+          j += 1
+        }
+        if (j > i) {
+          val w = s.substring(i, j)
+          val c = map.getLong(w)
+          map.put(w, c + 1)
+        }
+        i = j
+        while (i < s.length && s.charAt(i) == ' ') {
+          i += 1
+        }
+      }
+    }
+    map.toIterator.map{case (k, v) => (k, v)}
+  }
+
+  /**
+   * Gets the top k words in terms of word counts. Assumes that each word exists only once
+   * in the `data` iterator (that is, the counts have been reduced).
+   */
+  def topK(data: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] = {
+    val taken = new Array[(String, Long)](k)
+
+    var i = 0
+    var len = 0
+    var done = false
+    var value: (String, Long) = null
+    var swap: (String, Long) = null
+    var count = 0
+
+    while(data.hasNext) {
+      value = data.next
+      if (value != null) {
+        count += 1
+        if (len == 0) {
+          taken(0) = value
+          len = 1
+        } else if (len < k || value._2 > taken(len - 1)._2) {
+          if (len < k) {
+            len += 1
+          }
+          taken(len - 1) = value
+          i = len - 1
+          while(i > 0 && taken(i - 1)._2 < taken(i)._2) {
+            swap = taken(i)
+            taken(i) = taken(i-1)
+            taken(i - 1) = swap
+            i -= 1
+          }
+        }
+      }
+    }
+    return taken.toIterator
+  }
+
+  /**
+   * Warms up the SparkContext in master and slave by running a few tasks to force the JIT
+   * to kick in before the real workload starts.
+   */
+  def warmUp(sc: SparkContext) {
+    for(i <- 0 to 4) {
+      sc.parallelize(1 to 200000, 1000)
+        .map(_ % 1331).map(_.toString)
+        .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
+        .count()
+    }
+  }
+
+  def add(v1: Long, v2: Long) = (v1 + v2)
+
+  def subtract(v1: Long, v2: Long) = (v1 - v2)
+
+  def max(v1: Long, v2: Long) = math.max(v1, v2)
+}
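Since RawTextHelper is new in this commit, a brief hypothetical usage sketch (not part of the patch) may help; it exercises only the helpers defined above, with made-up input lines:

// Hypothetical driver for the new helper (not code from this commit).
import spark.streaming.util.RawTextHelper._

object RawTextHelperSketch {
  def main(args: Array[String]) {
    val lines = Iterator("to be or not to be", "that is the question")

    // splitAndCountPartitions folds the whole iterator into one fastutil map,
    // so it emits exactly one (word, count) pair per distinct word.
    val counts = splitAndCountPartitions(lines)

    // topK keeps the k largest counts in one pass by insertion into a k-sized
    // array: O(n * k) worst case, no full sort. It assumes the counts are
    // already reduced, which splitAndCountPartitions guarantees here.
    println("Top 3 words: " + topK(counts, 3).mkString(", "))
  }
}

Note that topK is applied twice in TopKWordCountRaw above: once per partition to shrink the data, and once on the collected partial results on the driver.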