-rwxr-xr-x  conf/streaming-env.sh.template | 22
-rwxr-xr-x  run | 4
-rwxr-xr-x  startTrigger.sh | 3
-rw-r--r--  streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala | 1
-rw-r--r--  streaming/src/main/scala/spark/streaming/RawInputDStream.scala | 2
-rw-r--r--  streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala | 15
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala | 4
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/Grep2.scala | 64
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala | 11
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala | 102
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCount2.scala | 114
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala | 57
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordMax2.scala | 75
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala | 98
14 files changed, 193 insertions(+), 379 deletions(-)
diff --git a/conf/streaming-env.sh.template b/conf/streaming-env.sh.template
new file mode 100755
index 0000000000..6b4094c515
--- /dev/null
+++ b/conf/streaming-env.sh.template
@@ -0,0 +1,22 @@
+#!/usr/bin/env bash
+
+# This file contains a few additional settings that are useful for
+# running streaming jobs in Spark. Copy this file as streaming-env.sh.
+# Note that this shell script will be read after spark-env.sh, so settings
+# in this file may override similar settings (if present) in spark-env.sh.
+
+
+# Using concurrent GC is strongly recommended as it can significantly
+# reduce GC-related pauses.
+
+SPARK_JAVA_OPTS+=" -XX:+UseConcMarkSweepGC"
+
+# Using Kryo serialization can improve serialization performance
+# and therefore the throughput of Spark Streaming programs. However,
+# using Kryo serialization with custom classes may require you to
+# register the classes with Kryo. Refer to the Spark documentation
+# for more details.
+
+# SPARK_JAVA_OPTS+=" -Dspark.serializer=spark.KryoSerializer"
+
+export SPARK_JAVA_OPTS
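
The note above about registering classes refers to the Kryo registration hook described in the Spark tuning documentation of this period. A minimal sketch of that pattern, not part of this patch, with MyClass, MyRegistrator and mypackage as hypothetical names:

    import com.esotericsoftware.kryo.Kryo
    import spark.KryoRegistrator

    // Hypothetical application class that gets shipped between nodes.
    class MyClass(val id: Int, val text: String)

    // Registers custom classes with Kryo so they serialize compactly.
    class MyRegistrator extends KryoRegistrator {
      override def registerClasses(kryo: Kryo) {
        kryo.register(classOf[MyClass])
      }
    }

    // Enabled through system properties before creating the context, matching the
    // commented-out spark.serializer option in streaming-env.sh.template above:
    //   System.setProperty("spark.serializer", "spark.KryoSerializer")
    //   System.setProperty("spark.kryo.registrator", "mypackage.MyRegistrator")
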
diff --git a/run b/run
index a363599cf0..d91430ad2e 100755
--- a/run
+++ b/run
@@ -13,6 +13,10 @@ if [ -e $FWDIR/conf/spark-env.sh ] ; then
. $FWDIR/conf/spark-env.sh
fi
+if [ -e $FWDIR/conf/streaming-env.sh ] ; then
+ . $FWDIR/conf/streaming-env.sh
+fi
+
if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
if [ `command -v scala` ]; then
RUNNER="scala"
diff --git a/startTrigger.sh b/startTrigger.sh
deleted file mode 100755
index 373dbda93e..0000000000
--- a/startTrigger.sh
+++ /dev/null
@@ -1,3 +0,0 @@
-#!/bin/bash
-
-./run spark.streaming.SentenceGenerator localhost 7078 sentences.txt 1
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index 07ef79415d..d0fef70f7e 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -47,6 +47,7 @@ class NetworkInputTracker(
val result = queue.synchronized {
queue.dequeueAll(x => true)
}
+ logInfo("Stream " + receiverId + " received " + result.size + " blocks")
result.toArray
}
diff --git a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
index e022b85fbe..03726bfba6 100644
--- a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
@@ -69,7 +69,7 @@ class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: S
}
def onStop() {
- blockPushingThread.interrupt()
+ if (blockPushingThread != null) blockPushingThread.interrupt()
}
/** Read a buffer fully from a given Channel */
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
index 6df82c0df3..b07d51fa6b 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
@@ -31,10 +31,14 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
"must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")"
)
- super.persist(StorageLevel.MEMORY_ONLY)
-
+ // Reduce each batch of data using reduceByKey, which will then be further reduced over
+ // the window by this ReducedWindowedDStream
val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
+ // Persist RDDs to memory by default as these RDDs are going to be reused.
+ super.persist(StorageLevel.MEMORY_ONLY_SER)
+ reducedStream.persist(StorageLevel.MEMORY_ONLY_SER)
+
def windowTime: Time = _windowTime
override def dependencies = List(reducedStream)
@@ -57,13 +61,6 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
this
}
- protected[streaming] override def setRememberDuration(time: Time) {
- if (rememberDuration == null || rememberDuration < time) {
- rememberDuration = time
- dependencies.foreach(_.setRememberDuration(rememberDuration + windowTime))
- }
- }
-
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
val reduceF = reduceFunc
val invReduceF = invReduceFunc
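
For orientation, this class backs the invertible form of reduceByKeyAndWindow that the updated examples later in this diff call; a minimal sketch of that call, assuming a per-batch counts DStream built elsewhere (e.g. with splitAndCountPartitions from the new RawTextHelper):

    import spark.streaming._
    import spark.streaming.StreamingContext._
    import spark.streaming.util.RawTextHelper._

    // counts: DStream[(String, Long)] of per-batch word counts (assumed to exist).
    def windowedCounts(counts: DStream[(String, Long)]): DStream[(String, Long)] =
      counts.reduceByKeyAndWindow(
        add _,        // reduce values that enter the window
        subtract _,   // inverse-reduce values that leave the window
        Seconds(30),  // window duration
        Seconds(1),   // slide duration; must be a multiple of the parent DStream's slide time
        10)           // number of reduce partitions
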
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index eb83aaee7a..ab6d6e8dea 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -124,7 +124,7 @@ final class StreamingContext (
def rawNetworkStream[T: ClassManifest](
hostname: String,
port: Int,
- storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[T] = {
val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel)
graph.addInputStream(inputStream)
@@ -132,7 +132,7 @@ final class StreamingContext (
}
/**
- * This function creates a input stream that monitors a Hadoop-compatible
+ * This function creates an input stream that monitors a Hadoop-compatible filesystem
* for new files and executes the necessary processing on them.
*/
def fileStream[
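
The new default above only matters when callers omit the storage level; a minimal sketch of both call forms, assuming an existing StreamingContext named ssc and a hypothetical localhost:9999 endpoint:

    import spark.storage.StorageLevel

    // With this patch, omitting the level falls back to MEMORY_AND_DISK_SER_2
    // (serialized, replicated on two nodes, spilling to disk under memory pressure).
    val withDefault = ssc.rawNetworkStream[String]("localhost", 9999)

    // The raw examples below still pass a memory-only serialized level explicitly.
    val memoryOnly = ssc.rawNetworkStream[String]("localhost", 9999, StorageLevel.MEMORY_ONLY_SER_2)
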
diff --git a/streaming/src/main/scala/spark/streaming/examples/Grep2.scala b/streaming/src/main/scala/spark/streaming/examples/Grep2.scala
deleted file mode 100644
index b1faa65c17..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/Grep2.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-package spark.streaming.examples
-
-import spark.SparkContext
-import SparkContext._
-import spark.streaming._
-import StreamingContext._
-
-import spark.storage.StorageLevel
-
-import scala.util.Sorting
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Queue
-import scala.collection.JavaConversions.mapAsScalaMap
-
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
-
-
-object Grep2 {
-
- def warmup(sc: SparkContext) {
- (0 until 10).foreach {i =>
- sc.parallelize(1 to 20000000, 1000)
- .map(x => (x % 337, x % 1331))
- .reduceByKey(_ + _)
- .count()
- }
- }
-
- def main (args: Array[String]) {
-
- if (args.length != 6) {
- println ("Usage: Grep2 <host> <file> <mapTasks> <reduceTasks> <batchMillis> <chkptMillis>")
- System.exit(1)
- }
-
- val Array(master, file, mapTasks, reduceTasks, batchMillis, chkptMillis) = args
-
- val batchDuration = Milliseconds(batchMillis.toLong)
-
- val ssc = new StreamingContext(master, "Grep2")
- ssc.setBatchDuration(batchDuration)
-
- //warmup(ssc.sc)
-
- val data = ssc.sc.textFile(file, mapTasks.toInt).persist(
- new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas
- println("Data count: " + data.count())
- println("Data count: " + data.count())
- println("Data count: " + data.count())
-
- val sentences = new ConstantInputDStream(ssc, data)
- ssc.registerInputStream(sentences)
-
- sentences.filter(_.contains("Culpepper")).count().foreachRDD(r =>
- println("Grep count: " + r.collect().mkString))
-
- ssc.start()
-
- while(true) { Thread.sleep(1000) }
- }
-}
-
-
diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
index b1e1a613fe..ffbea6e55d 100644
--- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
@@ -2,8 +2,10 @@ package spark.streaming.examples
import spark.util.IntParam
import spark.storage.StorageLevel
+
import spark.streaming._
import spark.streaming.StreamingContext._
+import spark.streaming.util.RawTextHelper._
object GrepRaw {
def main(args: Array[String]) {
@@ -17,16 +19,13 @@ object GrepRaw {
// Create the context and set the batch size
val ssc = new StreamingContext(master, "GrepRaw")
ssc.setBatchDuration(Milliseconds(batchMillis))
+ warmUp(ssc.sc)
- // Make sure some tasks have started on each node
- ssc.sc.parallelize(1 to 1000, 1000).count()
- ssc.sc.parallelize(1 to 1000, 1000).count()
- ssc.sc.parallelize(1 to 1000, 1000).count()
val rawStreams = (1 to numStreams).map(_ =>
- ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
+ ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
val union = new UnionDStream(rawStreams)
- union.filter(_.contains("Culpepper")).count().foreachRDD(r =>
+ union.filter(_.contains("Alice")).count().foreachRDD(r =>
println("Grep count: " + r.collect().mkString))
ssc.start()
}
diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
index 750cb7445f..0411bde1a7 100644
--- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
@@ -1,94 +1,50 @@
package spark.streaming.examples
-import spark.util.IntParam
-import spark.SparkContext
-import spark.SparkContext._
import spark.storage.StorageLevel
+import spark.util.IntParam
+
import spark.streaming._
import spark.streaming.StreamingContext._
+import spark.streaming.util.RawTextHelper._
-import WordCount2_ExtraFunctions._
+import java.util.UUID
object TopKWordCountRaw {
- def moreWarmup(sc: SparkContext) {
- (0 until 40).foreach {i =>
- sc.parallelize(1 to 20000000, 1000)
- .map(_ % 1331).map(_.toString)
- .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
- .collect()
- }
- }
-
+
def main(args: Array[String]) {
- if (args.length != 7) {
- System.err.println("Usage: TopKWordCountRaw <master> <streams> <host> <port> <batchMs> <chkptMs> <reduces>")
+ if (args.length != 4) {
+ System.err.println("Usage: TopKWordCountRaw <master> <# streams> <port> <HDFS checkpoint directory>")
System.exit(1)
}
- val Array(master, IntParam(streams), host, IntParam(port), IntParam(batchMs),
- IntParam(chkptMs), IntParam(reduces)) = args
-
- // Create the context and set the batch size
- val ssc = new StreamingContext(master, "TopKWordCountRaw")
- ssc.setBatchDuration(Milliseconds(batchMs))
-
- // Make sure some tasks have started on each node
- moreWarmup(ssc.sc)
-
- val rawStreams = (1 to streams).map(_ =>
- ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
- val union = new UnionDStream(rawStreams)
-
- val windowedCounts = union.mapPartitions(splitAndCountPartitions)
- .reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(batchMs), reduces)
- windowedCounts.persist().checkpoint(Milliseconds(chkptMs))
- //.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMs))
-
- def topK(data: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] = {
- val taken = new Array[(String, Long)](k)
-
- var i = 0
- var len = 0
- var done = false
- var value: (String, Long) = null
- var swap: (String, Long) = null
- var count = 0
-
- while(data.hasNext) {
- value = data.next
- count += 1
- println("count = " + count)
- if (len == 0) {
- taken(0) = value
- len = 1
- } else if (len < k || value._2 > taken(len - 1)._2) {
- if (len < k) {
- len += 1
- }
- taken(len - 1) = value
- i = len - 1
- while(i > 0 && taken(i - 1)._2 < taken(i)._2) {
- swap = taken(i)
- taken(i) = taken(i-1)
- taken(i - 1) = swap
- i -= 1
- }
- }
- }
- println("Took " + len + " out of " + count + " items")
- return taken.toIterator
- }
+ val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
+ val k = 10
- val k = 50
+ // Create the context, set the batch size, and set the checkpoint directory.
+ // The checkpoint directory is necessary for fault-tolerance: the counts are saved
+ // periodically to HDFS
+ val ssc = new StreamingContext(master, "TopKWordCountRaw")
+ ssc.setBatchDuration(Seconds(1))
+ ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
+
+ // Warm up the JVMs on master and slave for JIT compilation to kick in
+ /*warmUp(ssc.sc)*/
+
+ // Set up the raw network streams that will connect to localhost:port to the raw text
+ // senders on the slaves and generate the top K words of the last 30 seconds
+ val lines = (1 to numStreams).map(_ => {
+ ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
+ })
+ val union = new UnionDStream(lines.toArray)
+ val counts = union.mapPartitions(splitAndCountPartitions)
+ val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k))
partialTopKWindowedCounts.foreachRDD(rdd => {
val collectedCounts = rdd.collect
- println("Collected " + collectedCounts.size + " items")
- topK(collectedCounts.toIterator, k).foreach(println)
+ println("Collected " + collectedCounts.size + " words from partial top words")
+ println("Top " + k + " words are " + topK(collectedCounts.toIterator, k).mkString(","))
})
-// windowedCounts.foreachRDD(r => println("Element count: " + r.count()))
-
ssc.start()
}
}
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
deleted file mode 100644
index 865026033e..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-package spark.streaming.examples
-
-import spark.SparkContext
-import SparkContext._
-import spark.streaming._
-import StreamingContext._
-
-import spark.storage.StorageLevel
-
-import scala.util.Sorting
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Queue
-import scala.collection.JavaConversions.mapAsScalaMap
-
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
-
-
-object WordCount2_ExtraFunctions {
-
- def add(v1: Long, v2: Long) = (v1 + v2)
-
- def subtract(v1: Long, v2: Long) = (v1 - v2)
-
- def max(v1: Long, v2: Long) = math.max(v1, v2)
-
- def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
- //val map = new java.util.HashMap[String, Long]
- val map = new OLMap[String]
- var i = 0
- var j = 0
- while (iter.hasNext) {
- val s = iter.next()
- i = 0
- while (i < s.length) {
- j = i
- while (j < s.length && s.charAt(j) != ' ') {
- j += 1
- }
- if (j > i) {
- val w = s.substring(i, j)
- val c = map.getLong(w)
- map.put(w, c + 1)
-/*
- if (c == null) {
- map.put(w, 1)
- } else {
- map.put(w, c + 1)
- }
-*/
- }
- i = j
- while (i < s.length && s.charAt(i) == ' ') {
- i += 1
- }
- }
- }
- map.toIterator.map{case (k, v) => (k, v)}
- }
-}
-
-object WordCount2 {
-
- def warmup(sc: SparkContext) {
- (0 until 3).foreach {i =>
- sc.parallelize(1 to 20000000, 500)
- .map(x => (x % 337, x % 1331))
- .reduceByKey(_ + _, 100)
- .count()
- }
- }
-
- def main (args: Array[String]) {
-
- if (args.length != 6) {
- println ("Usage: WordCount2 <host> <file> <mapTasks> <reduceTasks> <batchMillis> <chkptMillis>")
- System.exit(1)
- }
-
- val Array(master, file, mapTasks, reduceTasks, batchMillis, chkptMillis) = args
-
- val batchDuration = Milliseconds(batchMillis.toLong)
-
- val ssc = new StreamingContext(master, "WordCount2")
- ssc.setBatchDuration(batchDuration)
-
- //warmup(ssc.sc)
-
- val data = ssc.sc.textFile(file, mapTasks.toInt).persist(
- new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas
- println("Data count: " + data.map(x => if (x == "") 1 else x.split(" ").size / x.split(" ").size).count())
- println("Data count: " + data.count())
- println("Data count: " + data.count())
-
- val sentences = new ConstantInputDStream(ssc, data)
- ssc.registerInputStream(sentences)
-
- import WordCount2_ExtraFunctions._
-
- val windowedCounts = sentences
- .mapPartitions(splitAndCountPartitions)
- .reduceByKeyAndWindow(add _, subtract _, Seconds(30), batchDuration, reduceTasks.toInt)
-
- windowedCounts.persist().checkpoint(Milliseconds(chkptMillis.toLong))
- //.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMillis.toLong))
- windowedCounts.foreachRDD(r => println("Element count: " + r.count()))
-
- ssc.start()
-
- while(true) { Thread.sleep(1000) }
- }
-}
-
-
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
index d1ea9a9cd5..571428c0fe 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
@@ -1,50 +1,43 @@
package spark.streaming.examples
-import spark.util.IntParam
-import spark.SparkContext
-import spark.SparkContext._
import spark.storage.StorageLevel
+import spark.util.IntParam
+
import spark.streaming._
import spark.streaming.StreamingContext._
+import spark.streaming.util.RawTextHelper._
-import WordCount2_ExtraFunctions._
+import java.util.UUID
object WordCountRaw {
- def moreWarmup(sc: SparkContext) {
- (0 until 40).foreach {i =>
- sc.parallelize(1 to 20000000, 1000)
- .map(_ % 1331).map(_.toString)
- .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
- .collect()
- }
- }
def main(args: Array[String]) {
- if (args.length != 7) {
- System.err.println("Usage: WordCountRaw <master> <streams> <host> <port> <batchMs> <chkptMs> <reduces>")
+ if (args.length != 4) {
+ System.err.println("Usage: WordCountRaw <master> <# streams> <port> <HDFS checkpoint directory> ")
System.exit(1)
}
- val Array(master, IntParam(streams), host, IntParam(port), IntParam(batchMs),
- IntParam(chkptMs), IntParam(reduces)) = args
+ val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
- // Create the context and set the batch size
+ // Create the context, set the batch size, and set the checkpoint directory.
+ // The checkpoint directory is necessary for fault-tolerance: the counts are saved
+ // periodically to HDFS
val ssc = new StreamingContext(master, "WordCountRaw")
- ssc.setBatchDuration(Milliseconds(batchMs))
-
- // Make sure some tasks have started on each node
- moreWarmup(ssc.sc)
-
- val rawStreams = (1 to streams).map(_ =>
- ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
- val union = new UnionDStream(rawStreams)
-
- val windowedCounts = union.mapPartitions(splitAndCountPartitions)
- .reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(batchMs), reduces)
- windowedCounts.persist().checkpoint(chkptMs)
- //.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMs))
-
- windowedCounts.foreachRDD(r => println("Element count: " + r.count()))
+ ssc.setBatchDuration(Seconds(1))
+ ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
+
+ // Warm up the JVMs on master and slave for JIT compilation to kick in
+ warmUp(ssc.sc)
+
+ // Set up the raw network streams that will connect to localhost:port to the raw text
+ // senders on the slaves and generate counts of words over the last 30 seconds
+ val lines = (1 to numStreams).map(_ => {
+ ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
+ })
+ val union = new UnionDStream(lines.toArray)
+ val counts = union.mapPartitions(splitAndCountPartitions)
+ val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
+ windowedCounts.foreachRDD(r => println("# unique words = " + r.count()))
ssc.start()
}
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala b/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala
deleted file mode 100644
index 6a9c8a9a69..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-package spark.streaming.examples
-
-import spark.SparkContext
-import SparkContext._
-import spark.streaming._
-import StreamingContext._
-
-import spark.storage.StorageLevel
-
-import scala.util.Sorting
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Queue
-import scala.collection.JavaConversions.mapAsScalaMap
-
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
-
-
-object WordMax2 {
-
- def warmup(sc: SparkContext) {
- (0 until 10).foreach {i =>
- sc.parallelize(1 to 20000000, 1000)
- .map(x => (x % 337, x % 1331))
- .reduceByKey(_ + _)
- .count()
- }
- }
-
- def main (args: Array[String]) {
-
- if (args.length != 6) {
- println ("Usage: WordMax2 <host> <file> <mapTasks> <reduceTasks> <batchMillis> <chkptMillis>")
- System.exit(1)
- }
-
- val Array(master, file, mapTasks, reduceTasks, batchMillis, chkptMillis) = args
-
- val batchDuration = Milliseconds(batchMillis.toLong)
-
- val ssc = new StreamingContext(master, "WordMax2")
- ssc.setBatchDuration(batchDuration)
-
- //warmup(ssc.sc)
-
- val data = ssc.sc.textFile(file, mapTasks.toInt).persist(
- new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas
- println("Data count: " + data.count())
- println("Data count: " + data.count())
- println("Data count: " + data.count())
-
- val sentences = new ConstantInputDStream(ssc, data)
- ssc.registerInputStream(sentences)
-
- import WordCount2_ExtraFunctions._
-
- val windowedCounts = sentences
- .mapPartitions(splitAndCountPartitions)
- .reduceByKey(add _, reduceTasks.toInt)
- .persist()
- .checkpoint(Milliseconds(chkptMillis.toLong))
- //.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMillis.toLong))
- .reduceByKeyAndWindow(max _, Seconds(10), batchDuration, reduceTasks.toInt)
- .persist()
- .checkpoint(Milliseconds(chkptMillis.toLong))
- //.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMillis.toLong))
- windowedCounts.foreachRDD(r => println("Element count: " + r.count()))
-
- ssc.start()
-
- while(true) { Thread.sleep(1000) }
- }
-}
-
-
diff --git a/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala
new file mode 100644
index 0000000000..f31ae39a16
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala
@@ -0,0 +1,98 @@
+package spark.streaming.util
+
+import spark.SparkContext
+import spark.SparkContext._
+import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+import scala.collection.JavaConversions.mapAsScalaMap
+
+object RawTextHelper {
+
+ /**
+ * Splits lines and counts the words in them using a specialized object-to-long hashmap
+ * (to avoid the boxing/unboxing overhead of Long in a Java/Scala HashMap)
+ */
+ def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
+ val map = new OLMap[String]
+ var i = 0
+ var j = 0
+ while (iter.hasNext) {
+ val s = iter.next()
+ i = 0
+ while (i < s.length) {
+ j = i
+ while (j < s.length && s.charAt(j) != ' ') {
+ j += 1
+ }
+ if (j > i) {
+ val w = s.substring(i, j)
+ val c = map.getLong(w)
+ map.put(w, c + 1)
+ }
+ i = j
+ while (i < s.length && s.charAt(i) == ' ') {
+ i += 1
+ }
+ }
+ }
+ map.toIterator.map{case (k, v) => (k, v)}
+ }
+
+ /**
+ * Gets the top k words in terms of word counts. Assumes that each word exists only once
+ * in the `data` iterator (that is, the counts have been reduced).
+ */
+ def topK(data: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] = {
+ val taken = new Array[(String, Long)](k)
+
+ var i = 0
+ var len = 0
+ var done = false
+ var value: (String, Long) = null
+ var swap: (String, Long) = null
+ var count = 0
+
+ while(data.hasNext) {
+ value = data.next
+ if (value != null) {
+ count += 1
+ if (len == 0) {
+ taken(0) = value
+ len = 1
+ } else if (len < k || value._2 > taken(len - 1)._2) {
+ if (len < k) {
+ len += 1
+ }
+ taken(len - 1) = value
+ i = len - 1
+ while(i > 0 && taken(i - 1)._2 < taken(i)._2) {
+ swap = taken(i)
+ taken(i) = taken(i-1)
+ taken(i - 1) = swap
+ i -= 1
+ }
+ }
+ }
+ }
+ return taken.toIterator
+ }
+
+ /**
+ * Warms up the SparkContext on the master and slaves by running tasks to force JIT
+ * compilation to kick in before the real workload starts.
+ */
+ def warmUp(sc: SparkContext) {
+ for(i <- 0 to 4) {
+ sc.parallelize(1 to 200000, 1000)
+ .map(_ % 1331).map(_.toString)
+ .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
+ .count()
+ }
+ }
+
+ def add(v1: Long, v2: Long) = (v1 + v2)
+
+ def subtract(v1: Long, v2: Long) = (v1 - v2)
+
+ def max(v1: Long, v2: Long) = math.max(v1, v2)
+}
+
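
To make the helper semantics concrete, a small sketch of topK over an already-reduced local iterator; the words and counts are made up for illustration:

    import spark.streaming.util.RawTextHelper

    // Already-reduced (word, count) pairs, as topK expects.
    val counts = Iterator(("alice", 12L), ("spark", 7L), ("stream", 25L), ("word", 3L))

    // Prints (stream,25) then (alice,12): highest counts first. If fewer than k pairs
    // are supplied, the unfilled trailing slots come back as null.
    RawTextHelper.topK(counts, 2).foreach(println)
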