author     Matei Zaharia <matei@eecs.berkeley.edu>    2012-08-26 16:45:37 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>    2012-08-26 16:45:37 -0700
commit     23a29b6d197987fc294d867e16576214d5a5c776 (patch)
tree       4890653c5c9f0875d0d7e4dd79e6b5d0877b9f25 /streaming
parent     b120e24fe04b987cfb5c487251fc10e36b377d6b (diff)
parent     b08ff710af9b6592e3b43308ec4598bd3e6da084 (diff)
download   spark-23a29b6d197987fc294d867e16576214d5a5c776.tar.gz
           spark-23a29b6d197987fc294d867e16576214d5a5c776.tar.bz2
           spark-23a29b6d197987fc294d867e16576214d5a5c776.zip
Merge branch 'dev' of github.com:radlab/spark into dev
Diffstat (limited to 'streaming')
3 files changed, 106 insertions(+), 4 deletions(-)
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
index 11fa4e5443..d097896d0a 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
@@ -75,11 +75,12 @@ extends DStream[(K,V)](parent.ssc) {
     val previousWindow = getAdjustedWindow(currentTime - slideTime, windowTime)

     logInfo("Current window = " + currentWindow)
+    logInfo("Slide time = " + slideTime)
     logInfo("Previous window = " + previousWindow)
     logInfo("Parent.zeroTime = " + parent.zeroTime)

     if (allowPartialWindows) {
-      if (currentTime - slideTime == parent.zeroTime) {
+      if (currentTime - slideTime <= parent.zeroTime) {
         reducedStream.getOrCompute(currentTime) match {
           case Some(rdd) => return Some(rdd)
           case None => throw new Exception("Could not get first reduced RDD for time " + currentTime)
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
new file mode 100644
index 0000000000..83cbd31283
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
@@ -0,0 +1,98 @@
+package spark.streaming.examples
+
+import spark.SparkContext
+import SparkContext._
+import spark.streaming._
+import SparkStreamContext._
+
+import spark.storage.StorageLevel
+
+import scala.util.Sorting
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Queue
+import scala.collection.JavaConversions.mapAsScalaMap
+
+import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+
+
+object WordCount2_ExtraFunctions {
+
+  def add(v1: Long, v2: Long) = (v1 + v2)
+
+  def subtract(v1: Long, v2: Long) = (v1 - v2)
+
+  def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
+    //val map = new java.util.HashMap[String, Long]
+    val map = new OLMap[String]
+    var i = 0
+    var j = 0
+    while (iter.hasNext) {
+      val s = iter.next()
+      i = 0
+      while (i < s.length) {
+        j = i
+        while (j < s.length && s.charAt(j) != ' ') {
+          j += 1
+        }
+        if (j > i) {
+          val w = s.substring(i, j)
+          val c = map.getLong(w)
+          map.put(w, c + 1)
+/*
+          if (c == null) {
+            map.put(w, 1)
+          } else {
+            map.put(w, c + 1)
+          }
+*/
+        }
+        i = j
+        while (i < s.length && s.charAt(i) == ' ') {
+          i += 1
+        }
+      }
+    }
+    map.toIterator.map{case (k, v) => (k, v)}
+  }
+}
+
+object WordCount2 {
+
+  def moreWarmup(sc: SparkContext) {
+    (0 until 40).foreach {i =>
+      sc.parallelize(1 to 20000000, 1000)
+        .map(_ % 1331).map(_.toString)
+        .mapPartitions(WordCount2_ExtraFunctions.splitAndCountPartitions).reduceByKey(_ + _, 10)
+        .collect()
+    }
+  }
+
+  def main (args: Array[String]) {
+
+    if (args.length < 2) {
+      println ("Usage: SparkStreamContext <host> <file>")
+      System.exit(1)
+    }
+
+    val ssc = new SparkStreamContext(args(0), "WordCount2")
+    ssc.setBatchDuration(Seconds(1))
+
+    val sentences = new ConstantInputDStream(ssc, ssc.sc.textFile(args(1)).cache())
+    ssc.inputStreams += sentences
+
+    import WordCount2_ExtraFunctions._
+
+    val windowedCounts = sentences
+      .mapPartitions(splitAndCountPartitions)
+      .reduceByKeyAndWindow(add _, subtract _, Seconds(10), Seconds(1), 10)
+    windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER, Seconds(10))
+    windowedCounts.print()
+
+    ssc.start()
+
+    while(true) { Thread.sleep(1000) }
+  }
+}
+
+
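A note on the WordCount2 example above: reduceByKeyAndWindow is given both an add and a subtract function so that each new window can be computed incrementally from the previous one (add the newest batch, subtract the batch that slid out of the window) rather than re-reducing the whole 10-second window every second. A minimal sketch of that idea over plain per-batch count maps, assuming Long counts; slideWindow and its parameter names are illustrative only and are not part of this patch:

    // Illustrative only: incremental ("add/subtract") window update over per-batch counts.
    // newWindow = oldWindow + newestBatch - oldestBatch
    def slideWindow(
        oldWindow: Map[String, Long],
        newestBatch: Map[String, Long],
        oldestBatch: Map[String, Long]): Map[String, Long] = {
      // Fold in the batch that just entered the window: add(v1, v2) = v1 + v2
      val added = newestBatch.foldLeft(oldWindow) { case (acc, (k, v)) =>
        acc.updated(k, acc.getOrElse(k, 0L) + v)
      }
      // Fold out the batch that just left the window: subtract(v1, v2) = v1 - v2
      oldestBatch.foldLeft(added) { case (acc, (k, v)) =>
        acc.updated(k, acc.getOrElse(k, 0L) - v)
      }
    }

For example, if the old window holds ("spark" -> 3), the newest batch contributes ("spark" -> 2) and the batch sliding out contributed ("spark" -> 1), the new window holds ("spark" -> 4).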
diff --git a/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala b/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala
index 9fb1924798..3922dfbad6 100644
--- a/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala
+++ b/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala
@@ -19,9 +19,12 @@ object Receiver {
       val is = new DataInputStream(new BufferedInputStream(socket.getInputStream))
       var loop = true
       var string: String = null
-      while((string = is.readUTF) != null) {
-        count += 28
-      }
+      do {
+        string = is.readUTF()
+        if (string != null) {
+          count += 28
+        }
+      } while (string != null)
     } catch {
       case e: Exception => e.printStackTrace()
     }
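For context on this change: in Scala an assignment such as string = is.readUTF() is an expression of type Unit, so the Java-style condition (string = is.readUTF) != null compares Unit with null and is always true, meaning the loop never tests the value it just read. The do/while form reads first and then tests the value. A minimal standalone illustration of the pitfall, with a hypothetical nextLine() standing in for the socket read (not part of this patch):

    object AssignmentInConditionDemo {
      // Hypothetical source of lines; returns null to signal end of input.
      private val data = Iterator("a", "b", "c")
      def nextLine(): String = if (data.hasNext) data.next() else null

      def main(args: Array[String]): Unit = {
        var line: String = null
        var count = 0

        // Java idiom that does NOT work in Scala: (line = nextLine()) has type Unit,
        // so "(line = nextLine()) != null" is always true (the compiler warns about it).
        // while ((line = nextLine()) != null) { count += 1 }

        // Patched style: read first, then test the value that was actually read.
        do {
          line = nextLine()
          if (line != null) {
            count += 1
          }
        } while (line != null)

        println(count) // prints 3
      }
    }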