author    Matei Zaharia <matei@eecs.berkeley.edu>  2012-08-26 16:45:37 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2012-08-26 16:45:37 -0700
commit    23a29b6d197987fc294d867e16576214d5a5c776
tree      4890653c5c9f0875d0d7e4dd79e6b5d0877b9f25
parent    b120e24fe04b987cfb5c487251fc10e36b377d6b
parent    b08ff710af9b6592e3b43308ec4598bd3e6da084
Merge branch 'dev' of github.com:radlab/spark into dev
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala    3
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCount2.scala      98
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala   9
3 files changed, 106 insertions(+), 4 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
index 11fa4e5443..d097896d0a 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
@@ -75,11 +75,12 @@ extends DStream[(K,V)](parent.ssc) {
     val previousWindow = getAdjustedWindow(currentTime - slideTime, windowTime)

     logInfo("Current window = " + currentWindow)
+    logInfo("Slide time = " + slideTime)
     logInfo("Previous window = " + previousWindow)
     logInfo("Parent.zeroTime = " + parent.zeroTime)

     if (allowPartialWindows) {
-      if (currentTime - slideTime == parent.zeroTime) {
+      if (currentTime - slideTime <= parent.zeroTime) {
         reducedStream.getOrCompute(currentTime) match {
           case Some(rdd) => return Some(rdd)
           case None => throw new Exception("Could not get first reduced RDD for time " + currentTime)
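
Editorial note (not part of the commit): the "==" to "<=" change above fixes a
boundary condition. A previous window only exists once the stream is more than
one slide past parent.zeroTime; the old equality test returned the plain
reduced RDD for exactly one batch time, and any batch strictly inside the
first slide fell through to the incremental subtract/add path, which has no
previous window to subtract from. A minimal sketch of the boundary test, with
illustrative times:

    // Illustrative values only: zeroTime and slideTime stand in for the
    // Time arithmetic in ReducedWindowedDStream, reduced to plain seconds.
    val zeroTime  = 0L
    val slideTime = 2L
    for (currentTime <- Seq(1L, 2L, 3L, 4L)) {
      // "<=" covers every batch within one slide of zeroTime;
      // "==" matched only currentTime == zeroTime + slideTime exactly.
      val isFirstWindow = currentTime - slideTime <= zeroTime
      println(s"t=${currentTime}s -> first window? $isFirstWindow")
    }
    // prints true, true, false, false for t = 1, 2, 3, 4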
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
new file mode 100644
index 0000000000..83cbd31283
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
@@ -0,0 +1,98 @@
+package spark.streaming.examples
+
+import spark.SparkContext
+import SparkContext._
+import spark.streaming._
+import SparkStreamContext._
+
+import spark.storage.StorageLevel
+
+import scala.util.Sorting
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Queue
+import scala.collection.JavaConversions.mapAsScalaMap
+
+import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+
+
+object WordCount2_ExtraFunctions {
+
+  def add(v1: Long, v2: Long) = (v1 + v2)
+
+  def subtract(v1: Long, v2: Long) = (v1 - v2)
+
+  def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
+    //val map = new java.util.HashMap[String, Long]
+    val map = new OLMap[String]
+    var i = 0
+    var j = 0
+    while (iter.hasNext) {
+      val s = iter.next()
+      i = 0
+      while (i < s.length) {
+        j = i
+        while (j < s.length && s.charAt(j) != ' ') {
+          j += 1
+        }
+        if (j > i) {
+          val w = s.substring(i, j)
+          val c = map.getLong(w)
+          map.put(w, c + 1)
+/*
+          if (c == null) {
+            map.put(w, 1)
+          } else {
+            map.put(w, c + 1)
+          }
+*/
+        }
+        i = j
+        while (i < s.length && s.charAt(i) == ' ') {
+          i += 1
+        }
+      }
+    }
+    map.toIterator.map{case (k, v) => (k, v)}
+  }
+}
+
+object WordCount2 {
+
+  def moreWarmup(sc: SparkContext) {
+    (0 until 40).foreach {i =>
+      sc.parallelize(1 to 20000000, 1000)
+        .map(_ % 1331).map(_.toString)
+        .mapPartitions(WordCount2_ExtraFunctions.splitAndCountPartitions).reduceByKey(_ + _, 10)
+        .collect()
+    }
+  }
+
+  def main (args: Array[String]) {
+
+    if (args.length < 2) {
+      println ("Usage: WordCount2 <host> <file>")
+      System.exit(1)
+    }
+
+    val ssc = new SparkStreamContext(args(0), "WordCount2")
+    ssc.setBatchDuration(Seconds(1))
+
+    val sentences = new ConstantInputDStream(ssc, ssc.sc.textFile(args(1)).cache())
+    ssc.inputStreams += sentences
+
+    import WordCount2_ExtraFunctions._
+
+    val windowedCounts = sentences
+      .mapPartitions(splitAndCountPartitions)
+      .reduceByKeyAndWindow(add _, subtract _, Seconds(10), Seconds(1), 10)
+    windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER, Seconds(10))
+    windowedCounts.print()
+
+    ssc.start()
+
+    while (true) { Thread.sleep(1000) }
+  }
+}
+
+
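Editorial note (not part of the commit): splitAndCountPartitions above avoids
String.split and per-word boxing by scanning character indices by hand and
accumulating counts in fastutil's Object2LongOpenHashMap, whose getLong
returns 0 for absent keys. A self-contained sketch of the same scanning idea
using only the standard library (splitAndCount and the sample sentence are
illustrative, not code from the commit):

    import scala.collection.mutable

    def splitAndCount(iter: Iterator[String]): Iterator[(String, Long)] = {
      // withDefaultValue(0L) plays the role of getLong's 0-for-missing default
      val counts = mutable.HashMap.empty[String, Long].withDefaultValue(0L)
      for (s <- iter) {
        var i = 0
        while (i < s.length) {
          var j = i
          while (j < s.length && s.charAt(j) != ' ') j += 1  // find end of word
          if (j > i) counts(s.substring(i, j)) += 1          // count the word
          i = j
          while (i < s.length && s.charAt(i) == ' ') i += 1  // skip spaces
        }
      }
      counts.iterator
    }

    // splitAndCount(Iterator("to be or not to be")).toMap
    // => Map(to -> 2, be -> 2, or -> 1, not -> 1)

The add/subtract pair passed to reduceByKeyAndWindow is what keeps the
10-second window cheap to maintain: each 1-second slide adds the newest
batch's counts and subtracts the batch that just left the window, rather than
re-reducing all ten batches from scratch.
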
diff --git a/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala b/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala
index 9fb1924798..3922dfbad6 100644
--- a/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala
+++ b/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala
@@ -19,9 +19,12 @@ object Receiver {
       val is = new DataInputStream(new BufferedInputStream(socket.getInputStream))
       var loop = true
       var string: String = null
-      while((string = is.readUTF) != null) {
-        count += 28
-      }
+      do {
+        string = is.readUTF()
+        if (string != null) {
+          count += 28
+        }
+      } while (string != null)
     } catch {
       case e: Exception => e.printStackTrace()
     }
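
Editorial note (not part of the commit): the receiver fix above replaces a C
idiom that does not translate to Scala. An assignment is an expression of type
Unit, so (string = is.readUTF) != null compares () with null and is always
true. Note also that DataInputStream.readUTF never returns null; it throws
EOFException at end of stream, so the loop actually terminates through the
surrounding catch block. A minimal sketch of the pitfall and the
read-then-test shape of the fix (the Iterator stands in for the socket
stream):

    var line: String = null
    val source = Iterator("a", "b")

    // while ((line = source.next()) != null) { ... }
    //   does not work in Scala: the condition is "() != null", always true.

    do {
      line = if (source.hasNext) source.next() else null  // read first...
      if (line != null) println(line)                     // ...then test
    } while (line != null)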