author     Matei Zaharia <matei@eecs.berkeley.edu>    2012-07-28 20:03:26 -0700
committer  Matei Zaharia <matei@eecs.berkeley.edu>    2012-07-28 20:03:26 -0700
commit     47b7ebad12a17218f6ca0301fc802c0e0a81d873 (patch)
tree       ae063922421acf6507a2bace2f6e1cb02ad914b6
parent     dc8763fcf782f8befdff6ec8ee5cbd701025ec87 (diff)
Added the Spark Streaming code, ported to Akka 2
-rw-r--r--  core/src/main/scala/spark/BlockRDD.scala  42
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala  5
-rw-r--r--  project/SparkBuild.scala  6
-rwxr-xr-x  run  2
-rw-r--r--  sentences.txt  3
-rwxr-xr-x  startTrigger.sh  3
-rw-r--r--  streaming/src/main/scala/spark/stream/BlockID.scala  20
-rw-r--r--  streaming/src/main/scala/spark/stream/ConnectionHandler.scala  157
-rw-r--r--  streaming/src/main/scala/spark/stream/DumbTopKWordCount2_Special.scala  138
-rw-r--r--  streaming/src/main/scala/spark/stream/DumbWordCount2_Special.scala  92
-rw-r--r--  streaming/src/main/scala/spark/stream/FileStreamReceiver.scala  70
-rw-r--r--  streaming/src/main/scala/spark/stream/GrepCount.scala  39
-rw-r--r--  streaming/src/main/scala/spark/stream/GrepCount2.scala  113
-rw-r--r--  streaming/src/main/scala/spark/stream/GrepCountApprox.scala  54
-rw-r--r--  streaming/src/main/scala/spark/stream/IdealPerformance.scala  36
-rw-r--r--  streaming/src/main/scala/spark/stream/Interval.scala  75
-rw-r--r--  streaming/src/main/scala/spark/stream/Job.scala  21
-rw-r--r--  streaming/src/main/scala/spark/stream/JobManager.scala  112
-rw-r--r--  streaming/src/main/scala/spark/stream/JobManager2.scala  37
-rw-r--r--  streaming/src/main/scala/spark/stream/NetworkStreamReceiver.scala  184
-rw-r--r--  streaming/src/main/scala/spark/stream/RDS.scala  607
-rw-r--r--  streaming/src/main/scala/spark/stream/ReducedWindowedRDS.scala  218
-rw-r--r--  streaming/src/main/scala/spark/stream/Scheduler.scala  181
-rw-r--r--  streaming/src/main/scala/spark/stream/SenGeneratorForPerformanceTest.scala  78
-rw-r--r--  streaming/src/main/scala/spark/stream/SenderReceiverTest.scala  63
-rw-r--r--  streaming/src/main/scala/spark/stream/SentenceFileGenerator.scala  92
-rw-r--r--  streaming/src/main/scala/spark/stream/SentenceGenerator.scala  103
-rw-r--r--  streaming/src/main/scala/spark/stream/ShuffleTest.scala  22
-rw-r--r--  streaming/src/main/scala/spark/stream/SimpleWordCount.scala  30
-rw-r--r--  streaming/src/main/scala/spark/stream/SimpleWordCount2.scala  51
-rw-r--r--  streaming/src/main/scala/spark/stream/SimpleWordCount2_Special.scala  83
-rw-r--r--  streaming/src/main/scala/spark/stream/SparkStreamContext.scala  105
-rw-r--r--  streaming/src/main/scala/spark/stream/TestGenerator.scala  107
-rw-r--r--  streaming/src/main/scala/spark/stream/TestGenerator2.scala  119
-rw-r--r--  streaming/src/main/scala/spark/stream/TestGenerator4.scala  244
-rw-r--r--  streaming/src/main/scala/spark/stream/TestInputBlockTracker.scala  42
-rw-r--r--  streaming/src/main/scala/spark/stream/TestStreamCoordinator.scala  38
-rw-r--r--  streaming/src/main/scala/spark/stream/TestStreamReceiver3.scala  420
-rw-r--r--  streaming/src/main/scala/spark/stream/TestStreamReceiver4.scala  373
-rw-r--r--  streaming/src/main/scala/spark/stream/Time.scala  85
-rw-r--r--  streaming/src/main/scala/spark/stream/TopContentCount.scala  97
-rw-r--r--  streaming/src/main/scala/spark/stream/TopKWordCount2.scala  103
-rw-r--r--  streaming/src/main/scala/spark/stream/TopKWordCount2_Special.scala  142
-rw-r--r--  streaming/src/main/scala/spark/stream/WindowedRDS.scala  68
-rw-r--r--  streaming/src/main/scala/spark/stream/WordCount.scala  62
-rw-r--r--  streaming/src/main/scala/spark/stream/WordCount1.scala  46
-rw-r--r--  streaming/src/main/scala/spark/stream/WordCount2.scala  55
-rw-r--r--  streaming/src/main/scala/spark/stream/WordCount2_Special.scala  94
-rw-r--r--  streaming/src/main/scala/spark/stream/WordCount3.scala  49
-rw-r--r--  streaming/src/main/scala/spark/stream/WordCountEc2.scala  41
-rw-r--r--  streaming/src/main/scala/spark/stream/WordCountTrivialWindow.scala  51
-rw-r--r--  streaming/src/main/scala/spark/stream/WordMax.scala  64
52 files changed, 5141 insertions, 1 deletion
diff --git a/core/src/main/scala/spark/BlockRDD.scala b/core/src/main/scala/spark/BlockRDD.scala
new file mode 100644
index 0000000000..ea009f0f4f
--- /dev/null
+++ b/core/src/main/scala/spark/BlockRDD.scala
@@ -0,0 +1,42 @@
+package spark
+
+import scala.collection.mutable.HashMap
+
+class BlockRDDSplit(val blockId: String, idx: Int) extends Split {
+ val index = idx
+}
+
+
+class BlockRDD[T: ClassManifest](sc: SparkContext, blockIds: Array[String]) extends RDD[T](sc) {
+
+ @transient
+ val splits_ = (0 until blockIds.size).map(i => {
+ new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split]
+ }).toArray
+
+ @transient
+ lazy val locations_ = {
+ val blockManager = SparkEnv.get.blockManager
+ /*val locations = blockIds.map(id => blockManager.getLocations(id))*/
+ val locations = blockManager.getLocations(blockIds)
+ HashMap(blockIds.zip(locations):_*)
+ }
+
+ override def splits = splits_
+
+ override def compute(split: Split): Iterator[T] = {
+ val blockManager = SparkEnv.get.blockManager
+ val blockId = split.asInstanceOf[BlockRDDSplit].blockId
+ blockManager.get(blockId) match {
+ case Some(block) => block.asInstanceOf[Iterator[T]]
+ case None =>
+ throw new Exception("Could not compute split, block " + blockId + " not found")
+ }
+ }
+
+ override def preferredLocations(split: Split) =
+ locations_(split.asInstanceOf[BlockRDDSplit].blockId)
+
+ override val dependencies: List[Dependency[_]] = Nil
+}
+
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index dd17d4d6b3..78c7618542 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -409,6 +409,11 @@ class SparkContext(
* various Spark features.
*/
object SparkContext {
+
+ // TODO: temporary hack for using HDFS as input in streaming
+ var inputFile: String = null
+ var idealPartitions: Int = 1
+
implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
def addInPlace(t1: Double, t2: Double): Double = t1 + t2
def zero(initialValue: Double) = 0.0
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 726d490738..c4ada2bf2a 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -8,7 +8,7 @@ object SparkBuild extends Build {
// "1.0.1" for Apache releases, or "0.20.2-cdh3u3" for Cloudera Hadoop.
val HADOOP_VERSION = "0.20.205.0"
- lazy val root = Project("root", file("."), settings = sharedSettings) aggregate(core, repl, examples, bagel)
+ lazy val root = Project("root", file("."), settings = sharedSettings) aggregate(core, repl, examples, bagel, streaming)
lazy val core = Project("core", file("core"), settings = coreSettings)
@@ -18,6 +18,8 @@ object SparkBuild extends Build {
lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn (core)
+ lazy val streaming = Project("streaming", file("streaming"), settings = streamingSettings) dependsOn (core)
+
def sharedSettings = Defaults.defaultSettings ++ Seq(
organization := "org.spark-project",
version := "0.6.0-SNAPSHOT",
@@ -82,6 +84,8 @@ object SparkBuild extends Build {
def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel")
+ def streamingSettings = sharedSettings ++ Seq(name := "spark-streaming")
+
def extraAssemblySettings() = Seq(test in assembly := {}) ++ Seq(
mergeStrategy in assembly := {
case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
diff --git a/run b/run
index 8f7256b4e5..e3e98f4280 100755
--- a/run
+++ b/run
@@ -46,6 +46,7 @@ CORE_DIR="$FWDIR/core"
REPL_DIR="$FWDIR/repl"
EXAMPLES_DIR="$FWDIR/examples"
BAGEL_DIR="$FWDIR/bagel"
+STREAMING_DIR="$FWDIR/streaming"
# Build up classpath
CLASSPATH="$SPARK_CLASSPATH"
@@ -55,6 +56,7 @@ CLASSPATH+=":$CORE_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH+=":$CORE_DIR/src/main/resources"
CLASSPATH+=":$REPL_DIR/target/scala-$SCALA_VERSION/classes"
CLASSPATH+=":$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes"
+CLASSPATH+=":$STREAMING_DIR/target/scala-$SCALA_VERSION/classes"
for jar in `find $CORE_DIR/lib -name '*jar'`; do
CLASSPATH+=":$jar"
done
diff --git a/sentences.txt b/sentences.txt
new file mode 100644
index 0000000000..fedf96c66e
--- /dev/null
+++ b/sentences.txt
@@ -0,0 +1,3 @@
+Hello world!
+What's up?
+There is no cow level
diff --git a/startTrigger.sh b/startTrigger.sh
new file mode 100755
index 0000000000..0afce91a3e
--- /dev/null
+++ b/startTrigger.sh
@@ -0,0 +1,3 @@
+#!/bin/bash
+
+./run spark.stream.SentenceGenerator localhost 7078 sentences.txt 1
diff --git a/streaming/src/main/scala/spark/stream/BlockID.scala b/streaming/src/main/scala/spark/stream/BlockID.scala
new file mode 100644
index 0000000000..a3fd046c9a
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/BlockID.scala
@@ -0,0 +1,20 @@
+package spark.stream
+
+case class BlockID(sRds: String, sInterval: Interval, sPartition: Int) {
+ override def toString : String = (
+ sRds + BlockID.sConnector +
+ sInterval.beginTime + BlockID.sConnector +
+ sInterval.endTime + BlockID.sConnector +
+ sPartition
+ )
+}
+
+object BlockID {
+ val sConnector = '-'
+
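+ // Illustrative round trip (an assumed sketch: relies on LongTime.toString printing the raw
+ // milliseconds and on the RDS name itself containing no '-'):
+ //   BlockID.parse(BlockID("sentences", new Interval(0L, 1000L), 3).toString)
+ //   should yield BlockID("sentences", new Interval(0L, 1000L), 3)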
+ def parse(name : String) = BlockID(
+ name.split(BlockID.sConnector)(0),
+ new Interval(name.split(BlockID.sConnector)(1).toLong,
+ name.split(BlockID.sConnector)(2).toLong),
+ name.split(BlockID.sConnector)(3).toInt)
+} \ No newline at end of file
diff --git a/streaming/src/main/scala/spark/stream/ConnectionHandler.scala b/streaming/src/main/scala/spark/stream/ConnectionHandler.scala
new file mode 100644
index 0000000000..73b82b76b8
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/ConnectionHandler.scala
@@ -0,0 +1,157 @@
+package spark.stream
+
+import spark.Logging
+
+import scala.collection.mutable.{ArrayBuffer, SynchronizedQueue}
+
+import java.net._
+import java.io._
+import java.nio._
+import java.nio.charset._
+import java.nio.channels._
+import java.nio.channels.spi._
+
+abstract class ConnectionHandler(host: String, port: Int, connect: Boolean)
+extends Thread with Logging {
+
+ val selector = SelectorProvider.provider.openSelector()
+ val interestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
+
+ initLogging()
+
+ override def run() {
+ try {
+ if (connect) {
+ connect()
+ } else {
+ listen()
+ }
+
+ var interrupted = false
+ while(!interrupted) {
+
+ preSelect()
+
+ while(!interestChangeRequests.isEmpty) {
+ val (key, ops) = interestChangeRequests.dequeue
+ val lastOps = key.interestOps()
+ key.interestOps(ops)
+
+ def intToOpStr(op: Int): String = {
+ val opStrs = new ArrayBuffer[String]()
+ if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ"
+ if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE"
+ if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT"
+ if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT"
+ if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " "
+ }
+
+ logTrace("Changed ops from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
+ }
+
+ selector.select()
+ interrupted = Thread.currentThread.isInterrupted
+
+ val selectedKeys = selector.selectedKeys().iterator()
+ while (selectedKeys.hasNext) {
+ val key = selectedKeys.next.asInstanceOf[SelectionKey]
+ selectedKeys.remove()
+ if (key.isValid) {
+ if (key.isAcceptable) {
+ accept(key)
+ } else if (key.isConnectable) {
+ finishConnect(key)
+ } else if (key.isReadable) {
+ read(key)
+ } else if (key.isWritable) {
+ write(key)
+ }
+ }
+ }
+ }
+ } catch {
+ case e: Exception => {
+ logError("Error in select loop", e)
+ }
+ }
+ }
+
+ def connect() {
+ val socketAddress = new InetSocketAddress(host, port)
+ val channel = SocketChannel.open()
+ channel.configureBlocking(false)
+ channel.socket.setReuseAddress(true)
+ channel.socket.setTcpNoDelay(true)
+ channel.connect(socketAddress)
+ channel.register(selector, SelectionKey.OP_CONNECT)
+ logInfo("Initiating connection to [" + socketAddress + "]")
+ }
+
+ def listen() {
+ val channel = ServerSocketChannel.open()
+ channel.configureBlocking(false)
+ channel.socket.setReuseAddress(true)
+ channel.socket.setReceiveBufferSize(256 * 1024)
+ channel.socket.bind(new InetSocketAddress(port))
+ channel.register(selector, SelectionKey.OP_ACCEPT)
+ logInfo("Listening on port " + port)
+ }
+
+ def finishConnect(key: SelectionKey) {
+ try {
+ val channel = key.channel.asInstanceOf[SocketChannel]
+ val address = channel.socket.getRemoteSocketAddress
+ channel.finishConnect()
+ logInfo("Connected to [" + host + ":" + port + "]")
+ ready(key)
+ } catch {
+ case e: IOException => {
+ logError("Error finishing connect to " + host + ":" + port)
+ close(key)
+ }
+ }
+ }
+
+ def accept(key: SelectionKey) {
+ try {
+ val serverChannel = key.channel.asInstanceOf[ServerSocketChannel]
+ val channel = serverChannel.accept()
+ val address = channel.socket.getRemoteSocketAddress
+ channel.configureBlocking(false)
+ logInfo("Accepted connection from [" + address + "]")
+ ready(channel.register(selector, 0))
+ } catch {
+ case e: IOException => {
+ logError("Error accepting connection", e)
+ }
+ }
+ }
+
+ def changeInterest(key: SelectionKey, ops: Int) {
+ logTrace("Added request to change ops to " + ops)
+ interestChangeRequests += ((key, ops))
+ }
+
+ def ready(key: SelectionKey)
+
+ def preSelect() {
+ }
+
+ def read(key: SelectionKey) {
+ throw new UnsupportedOperationException("Cannot read on connection of type " + this.getClass.toString)
+ }
+
+ def write(key: SelectionKey) {
+ throw new UnsupportedOperationException("Cannot write on connection of type " + this.getClass.toString)
+ }
+
+ def close(key: SelectionKey) {
+ try {
+ key.channel.close()
+ key.cancel()
+ Thread.currentThread.interrupt
+ } catch {
+ case e: Exception => logError("Error closing connection", e)
+ }
+ }
+}
diff --git a/streaming/src/main/scala/spark/stream/DumbTopKWordCount2_Special.scala b/streaming/src/main/scala/spark/stream/DumbTopKWordCount2_Special.scala
new file mode 100644
index 0000000000..bd43f44b1a
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/DumbTopKWordCount2_Special.scala
@@ -0,0 +1,138 @@
+package spark.stream
+
+import spark.SparkContext
+import SparkContext._
+import SparkStreamContext._
+
+import spark.storage.StorageLevel
+
+import scala.util.Sorting
+import scala.collection.JavaConversions.mapAsScalaMap
+import scala.collection.mutable.Queue
+
+import java.lang.{Long => JLong}
+
+object DumbTopKWordCount2_Special {
+
+ def moreWarmup(sc: SparkContext) {
+ (0 until 20).foreach {i =>
+ sc.parallelize(1 to 20000000, 500)
+ .map(_ % 100).map(_.toString)
+ .map(x => (x, 1)).reduceByKey(_ + _, 10)
+ .collect()
+ }
+ }
+
+ def main (args: Array[String]) {
+
+ if (args.length < 2) {
+ println ("Usage: SparkStreamContext <host> <# sentence streams>")
+ System.exit(1)
+ }
+
+ val ssc = new SparkStreamContext(args(0), "WordCount2")
+
+ val numSentenceStreams = if (args.length > 1) args(1).toInt else 1
+ if (args.length > 2) {
+ ssc.setTempDir(args(2))
+ }
+
+ GrepCount2.warmConnectionManagers(ssc.sc)
+ moreWarmup(ssc.sc)
+
+ val sentences = new UnifiedRDS(
+ (1 to numSentenceStreams).map(i => ssc.readTestStream("Sentences-" + i, 1000)).toArray
+ )
+
+
+ def add(v1: JLong, v2: JLong) = (v1 + v2)
+ def subtract(v1: JLong, v2: JLong) = (v1 - v2)
+
+ def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, JLong)] = {
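+ // Hand-rolled tokenizer: walks each sentence character by character, extracting
+ // space-delimited words and tallying them in a java.util.HashMap[String, JLong],
+ // instead of paying for a String.split array per line.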
+ val map = new java.util.HashMap[String, JLong]
+ var i = 0
+ var j = 0
+ while (iter.hasNext) {
+ val s = iter.next()
+ i = 0
+ while (i < s.length) {
+ j = i
+ while (j < s.length && s.charAt(j) != ' ') {
+ j += 1
+ }
+ if (j > i) {
+ val w = s.substring(i, j)
+ val c = map.get(w)
+ if (c == null) {
+ map.put(w, 1)
+ } else {
+ map.put(w, c + 1)
+ }
+ }
+ i = j
+ while (i < s.length && s.charAt(i) == ' ') {
+ i += 1
+ }
+ }
+ }
+ map.toIterator
+ }
+
+
+ val wordCounts = sentences.mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
+ wordCounts.persist(StorageLevel.MEMORY_ONLY)
+ val windowedCounts = wordCounts.window(Seconds(10), Seconds(1)).reduceByKey(_ + _, 10)
+
+ def topK(data: Iterator[(String, JLong)], k: Int): Iterator[(String, JLong)] = {
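+ // Keeps the k highest-count pairs seen so far in `taken`, ordered by descending count
+ // via insertion (roughly O(n * k) over n input pairs); used both per-partition and on
+ // the collected partial results below.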
+ val taken = new Array[(String, JLong)](k)
+
+ var i = 0
+ var len = 0
+ var done = false
+ var value: (String, JLong) = null
+ var swap: (String, JLong) = null
+ var count = 0
+
+ while(data.hasNext) {
+ value = data.next
+ count += 1
+ /*println("count = " + count)*/
+ if (len == 0) {
+ taken(0) = value
+ len = 1
+ } else if (len < k || value._2 > taken(len - 1)._2) {
+ if (len < k) {
+ len += 1
+ }
+ taken(len - 1) = value
+ i = len - 1
+ while(i > 0 && taken(i - 1)._2 < taken(i)._2) {
+ swap = taken(i)
+ taken(i) = taken(i-1)
+ taken(i - 1) = swap
+ i -= 1
+ }
+ }
+ }
+ println("Took " + len + " out of " + count + " items")
+ return taken.toIterator
+ }
+
+ val k = 10
+ val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k))
+ partialTopKWindowedCounts.foreachRDD(rdd => {
+ val collectedCounts = rdd.collect
+ println("Collected " + collectedCounts.size + " items")
+ topK(collectedCounts.toIterator, k).foreach(println)
+ })
+
+ /*
+ windowedCounts.filter(_ == null).foreachRDD(rdd => {
+ val count = rdd.count
+ println("# of nulls = " + count)
+ })*/
+
+ ssc.run
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/stream/DumbWordCount2_Special.scala b/streaming/src/main/scala/spark/stream/DumbWordCount2_Special.scala
new file mode 100644
index 0000000000..31d682348a
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/DumbWordCount2_Special.scala
@@ -0,0 +1,92 @@
+package spark.stream
+
+import spark.SparkContext
+import SparkContext._
+import SparkStreamContext._
+
+import spark.storage.StorageLevel
+
+import scala.util.Sorting
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Queue
+import scala.collection.JavaConversions.mapAsScalaMap
+
+import java.lang.{Long => JLong}
+import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+
+object DumbWordCount2_Special {
+
+ def moreWarmup(sc: SparkContext) {
+ (0 until 20).foreach {i =>
+ sc.parallelize(1 to 20000000, 500)
+ .map(_ % 100).map(_.toString)
+ .map(x => (x, 1)).reduceByKey(_ + _, 10)
+ .collect()
+ }
+ }
+
+ def main (args: Array[String]) {
+
+ if (args.length < 2) {
+ println ("Usage: SparkStreamContext <host> <# sentence streams>")
+ System.exit(1)
+ }
+
+ val ssc = new SparkStreamContext(args(0), "WordCount2")
+
+ val numSentenceStreams = if (args.length > 1) args(1).toInt else 1
+ if (args.length > 2) {
+ ssc.setTempDir(args(2))
+ }
+
+ GrepCount2.warmConnectionManagers(ssc.sc)
+ moreWarmup(ssc.sc)
+
+ val sentences = new UnifiedRDS(
+ (1 to numSentenceStreams).map(i => ssc.readTestStream("Sentences-" + i, 1000)).toArray
+ )
+
+ def add(v1: JLong, v2: JLong) = (v1 + v2)
+ def subtract(v1: JLong, v2: JLong) = (v1 - v2)
+
+ def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, JLong)] = {
+ val map = new java.util.HashMap[String, JLong]
+ var i = 0
+ var j = 0
+ while (iter.hasNext) {
+ val s = iter.next()
+ i = 0
+ while (i < s.length) {
+ j = i
+ while (j < s.length && s.charAt(j) != ' ') {
+ j += 1
+ }
+ if (j > i) {
+ val w = s.substring(i, j)
+ val c = map.get(w)
+ if (c == null) {
+ map.put(w, 1)
+ } else {
+ map.put(w, c + 1)
+ }
+ }
+ i = j
+ while (i < s.length && s.charAt(i) == ' ') {
+ i += 1
+ }
+ }
+ }
+
+ map.toIterator
+ }
+
+ val wordCounts = sentences.mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
+ wordCounts.persist(StorageLevel.MEMORY_ONLY)
+ val windowedCounts = wordCounts.window(Seconds(10), Seconds(1)).reduceByKey(_ + _, 10)
+ windowedCounts.foreachRDD(_.collect)
+
+ ssc.run
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/stream/FileStreamReceiver.scala b/streaming/src/main/scala/spark/stream/FileStreamReceiver.scala
new file mode 100644
index 0000000000..026254d6e1
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/FileStreamReceiver.scala
@@ -0,0 +1,70 @@
+package spark.stream
+
+import spark.Logging
+
+import scala.collection.mutable.HashSet
+import scala.actors._
+import scala.actors.Actor._
+import scala.actors.remote._
+import scala.actors.remote.RemoteActor._
+
+import org.apache.hadoop.fs._
+import org.apache.hadoop.conf._
+import org.apache.hadoop.io._
+import org.apache.hadoop.mapred._
+import org.apache.hadoop.util._
+
+class FileStreamReceiver (
+ inputName: String,
+ rootDirectory: String,
+ intervalDuration: Long)
+ extends Logging {
+
+ val pollInterval = 100
+ val sparkstreamScheduler = {
+ val host = System.getProperty("spark.master.host")
+ val port = System.getProperty("spark.master.port").toInt + 1
+ RemoteActor.select(Node(host, port), 'SparkStreamScheduler)
+ }
+ val directory = new Path(rootDirectory)
+ val fs = directory.getFileSystem(new Configuration())
+ val files = new HashSet[String]()
+ var time: Long = 0
+
+ def start() {
+ fs.mkdirs(directory)
+ files ++= getFiles()
+
+ actor {
+ logInfo("Monitoring directory - " + rootDirectory)
+ while(true) {
+ testFiles(getFiles())
+ Thread.sleep(pollInterval)
+ }
+ }
+ }
+
+ def getFiles(): Iterable[String] = {
+ fs.listStatus(directory).map(_.getPath.toString)
+ }
+
+ def testFiles(fileList: Iterable[String]) {
+ fileList.foreach(file => {
+ if (!files.contains(file)) {
+ if (!file.endsWith("_tmp")) {
+ notifyFile(file)
+ }
+ files += file
+ }
+ })
+ }
+
+ def notifyFile(file: String) {
+ logInfo("Notifying file " + file)
+ time += intervalDuration
+ val interval = Interval(LongTime(time), LongTime(time + intervalDuration))
+ sparkstreamScheduler ! InputGenerated(inputName, interval, file)
+ }
+}
+
+
diff --git a/streaming/src/main/scala/spark/stream/GrepCount.scala b/streaming/src/main/scala/spark/stream/GrepCount.scala
new file mode 100644
index 0000000000..45b90d4837
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/GrepCount.scala
@@ -0,0 +1,39 @@
+package spark.stream
+
+import SparkStreamContext._
+
+import scala.util.Sorting
+
+import spark.SparkContext
+import spark.storage.StorageLevel
+
+object GrepCount {
+ var inputFile : String = null
+ var HDFS : String = null
+ var idealPartitions : Int = 0
+
+ def main (args: Array[String]) {
+
+ if (args.length != 4) {
+ println ("Usage: GrepCount <host> <HDFS> <Input file> <Ideal Partitions>")
+ System.exit(1)
+ }
+
+ HDFS = args(1)
+ inputFile = HDFS + args(2)
+ idealPartitions = args(3).toInt
+ println ("Input file: " + inputFile)
+
+ val ssc = new SparkStreamContext(args(0), "GrepCount")
+
+ SparkContext.idealPartitions = idealPartitions
+ SparkContext.inputFile = inputFile
+
+ val sentences = ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 1000)
+ //sentences.print
+ val matching = sentences.filter(_.contains("light"))
+ matching.foreachRDD(rdd => println(rdd.count))
+
+ ssc.run
+ }
+}
diff --git a/streaming/src/main/scala/spark/stream/GrepCount2.scala b/streaming/src/main/scala/spark/stream/GrepCount2.scala
new file mode 100644
index 0000000000..4eb65ba906
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/GrepCount2.scala
@@ -0,0 +1,113 @@
+package spark.stream
+
+import SparkStreamContext._
+
+import scala.util.Sorting
+
+import spark.SparkEnv
+import spark.SparkContext
+import spark.storage.StorageLevel
+import spark.network.Message
+import spark.network.ConnectionManagerId
+
+import java.nio.ByteBuffer
+
+object GrepCount2 {
+
+ def startSparkEnvs(sc: SparkContext) {
+
+ val dummy = sc.parallelize(0 to 1000, 100).persist(StorageLevel.DISK_AND_MEMORY)
+ sc.runJob(dummy, (_: Iterator[Int]) => {})
+
+ println("SparkEnvs started")
+ Thread.sleep(1000)
+ /*sc.runJob(sc.parallelize(0 to 1000, 100), (_: Iterator[Int]) => {})*/
+ }
+
+ def warmConnectionManagers(sc: SparkContext) {
+ val slaveConnManagerIds = sc.parallelize(0 to 100, 100).map(
+ i => SparkEnv.get.connectionManager.id).collect().distinct
+ println("\nSlave ConnectionManagerIds")
+ slaveConnManagerIds.foreach(println)
+ println
+
+ Thread.sleep(1000)
+ val numSlaves = slaveConnManagerIds.size
+ val count = 3
+ val size = 5 * 1024 * 1024
+ val iterations = (500 * 1024 * 1024 / (numSlaves * size)).toInt
+ println("count = " + count + ", size = " + size + ", iterations = " + iterations)
+
+ (0 until count).foreach(i => {
+ val resultStrs = sc.parallelize(0 until numSlaves, numSlaves).map(i => {
+ val connManager = SparkEnv.get.connectionManager
+ val thisConnManagerId = connManager.id
+ /*connManager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => {
+ println("Received [" + msg + "] from [" + id + "]")
+ None
+ })*/
+
+
+ val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte))
+ buffer.flip
+
+ val startTime = System.currentTimeMillis
+ val futures = (0 until iterations).map(i => {
+ slaveConnManagerIds.filter(_ != thisConnManagerId).map(slaveConnManagerId => {
+ val bufferMessage = Message.createBufferMessage(buffer.duplicate)
+ println("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]")
+ connManager.sendMessageReliably(slaveConnManagerId, bufferMessage)
+ })
+ }).flatMap(x => x)
+ val results = futures.map(f => f())
+ val finishTime = System.currentTimeMillis
+
+
+ val mb = size * results.size / 1024.0 / 1024.0
+ val ms = finishTime - startTime
+
+ val resultStr = "Sent " + mb + " MB in " + ms + " ms at " + (mb / ms * 1000.0) + " MB/s"
+ println(resultStr)
+ System.gc()
+ resultStr
+ }).collect()
+
+ println("---------------------")
+ println("Run " + i)
+ resultStrs.foreach(println)
+ println("---------------------")
+ })
+ }
+
+
+ def main (args: Array[String]) {
+
+ if (args.length < 2) {
+ println ("Usage: GrepCount2 <host> <# sentence streams>")
+ System.exit(1)
+ }
+
+ val ssc = new SparkStreamContext(args(0), "GrepCount2")
+
+ val numSentenceStreams = if (args.length > 1) args(1).toInt else 1
+ if (args.length > 2) {
+ ssc.setTempDir(args(2))
+ }
+
+ /*startSparkEnvs(ssc.sc)*/
+ warmConnectionManagers(ssc.sc)
+
+ val sentences = new UnifiedRDS(
+ (1 to numSentenceStreams).map(i => ssc.readTestStream("Sentences-"+i, 500)).toArray
+ )
+
+ val matching = sentences.filter(_.contains("light"))
+ matching.foreachRDD(rdd => println(rdd.count))
+
+ ssc.run
+ }
+}
+
+
+
+
diff --git a/streaming/src/main/scala/spark/stream/GrepCountApprox.scala b/streaming/src/main/scala/spark/stream/GrepCountApprox.scala
new file mode 100644
index 0000000000..a4be2cc936
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/GrepCountApprox.scala
@@ -0,0 +1,54 @@
+package spark.stream
+
+import SparkStreamContext._
+
+import scala.util.Sorting
+
+import spark.SparkContext
+import spark.storage.StorageLevel
+
+object GrepCountApprox {
+ var inputFile : String = null
+ var hdfs : String = null
+ var idealPartitions : Int = 0
+
+ def main (args: Array[String]) {
+
+ if (args.length != 5) {
+ println ("Usage: GrepCountApprox <host> <HDFS> <Input file> <Ideal Partitions> <Timeout>")
+ System.exit(1)
+ }
+
+ hdfs = args(1)
+ inputFile = hdfs + args(2)
+ idealPartitions = args(3).toInt
+ val timeout = args(4).toLong
+ println ("Input file: " + inputFile)
+
+ val ssc = new SparkStreamContext(args(0), "GrepCount")
+
+ SparkContext.idealPartitions = idealPartitions
+ SparkContext.inputFile = inputFile
+ ssc.setTempDir(hdfs + "/tmp")
+
+ val sentences = ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 1000)
+ //sentences.print
+ val matching = sentences.filter(_.contains("light"))
+ var i = 0
+ val startTime = System.currentTimeMillis
+ matching.foreachRDD { rdd =>
+ val myNum = i
+ val result = rdd.countApprox(timeout)
+ val initialTime = (System.currentTimeMillis - startTime) / 1000.0
+ printf("APPROX\t%.2f\t%d\tinitial\t%.1f\t%.1f\n", initialTime, myNum, result.initialValue.mean,
+ result.initialValue.high - result.initialValue.low)
+ result.onComplete { r =>
+ val finalTime = (System.currentTimeMillis - startTime) / 1000.0
+ printf("APPROX\t%.2f\t%d\tfinal\t%.1f\t0.0\t%.1f\n", finalTime, myNum, r.mean, finalTime - initialTime)
+ }
+ i += 1
+ }
+
+ ssc.run
+ }
+}
diff --git a/streaming/src/main/scala/spark/stream/IdealPerformance.scala b/streaming/src/main/scala/spark/stream/IdealPerformance.scala
new file mode 100644
index 0000000000..589fb2def0
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/IdealPerformance.scala
@@ -0,0 +1,36 @@
+package spark.stream
+
+import scala.collection.mutable.Map
+
+object IdealPerformance {
+ val base: String = "The medium researcher counts around the pinched troop The empire breaks " +
+ "Matei Matei announces HY with a theorem "
+
+ def main (args: Array[String]) {
+ val sentences: String = base * 100000
+
+ for (i <- 1 to 30) {
+ val start = System.nanoTime
+
+ val words = sentences.split(" ")
+
+ val pairs = words.map(word => (word, 1))
+
+ val counts = Map[String, Int]()
+
+ println("Job " + i + " position A at " + (System.nanoTime - start) / 1e9)
+
+ pairs.foreach((pair) => {
+ var t = counts.getOrElse(pair._1, 0)
+ counts(pair._1) = t + pair._2
+ })
+ println("Job " + i + " position B at " + (System.nanoTime - start) / 1e9)
+
+ for ((word, count) <- counts) {
+ print(word + " " + count + "; ")
+ }
+ println
+ println("Job " + i + " finished in " + (System.nanoTime - start) / 1e9)
+ }
+ }
+} \ No newline at end of file
diff --git a/streaming/src/main/scala/spark/stream/Interval.scala b/streaming/src/main/scala/spark/stream/Interval.scala
new file mode 100644
index 0000000000..08d0ed95b4
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/Interval.scala
@@ -0,0 +1,75 @@
+package spark.stream
+
+case class Interval (val beginTime: Time, val endTime: Time) {
+
+ def this(beginMs: Long, endMs: Long) = this(new LongTime(beginMs), new LongTime(endMs))
+
+ def duration(): Time = endTime - beginTime
+
+ def += (time: Time) {
+ beginTime += time
+ endTime += time
+ this
+ }
+
+ def + (time: Time): Interval = {
+ new Interval(beginTime + time, endTime + time)
+ }
+
+ def < (that: Interval): Boolean = {
+ if (this.duration != that.duration) {
+ throw new Exception("Comparing two intervals with different durations [" + this + ", " + that + "]")
+ }
+ this.endTime < that.endTime
+ }
+
+ def <= (that: Interval) = (this < that || this == that)
+
+ def > (that: Interval) = !(this <= that)
+
+ def >= (that: Interval) = !(this < that)
+
+ def next(): Interval = {
+ this + (endTime - beginTime)
+ }
+
+ def isZero() = (beginTime.isZero && endTime.isZero)
+
+ def toFormattedString = beginTime.toFormattedString + "-" + endTime.toFormattedString
+
+ override def toString = "[" + beginTime + ", " + endTime + "]"
+}
+
+object Interval {
+
+ /*
+ implicit def longTupleToInterval (longTuple: (Long, Long)) =
+ Interval(longTuple._1, longTuple._2)
+
+ implicit def intTupleToInterval (intTuple: (Int, Int)) =
+ Interval(intTuple._1, intTuple._2)
+
+ implicit def string2Interval (str: String): Interval = {
+ val parts = str.split(",")
+ if (parts.length == 1)
+ return Interval.zero
+ return Interval (parts(0).toInt, parts(1).toInt)
+ }
+
+ def getInterval (timeMs: Long, intervalDurationMs: Long): Interval = {
+ val intervalBeginMs = timeMs / intervalDurationMs * intervalDurationMs
+ Interval(intervalBeginMs, intervalBeginMs + intervalDurationMs)
+ }
+ */
+
+ def zero() = new Interval (Time.zero, Time.zero)
+
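+ // Assuming LongTime.floor truncates down to the nearest multiple of intervalDuration,
+ // e.g. a current time of 12345 ms with a 1000 ms interval gives the interval [12000, 13000].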
+ def currentInterval(intervalDuration: LongTime): Interval = {
+ val time = LongTime(System.currentTimeMillis)
+ val intervalBegin = time.floor(intervalDuration)
+ Interval(intervalBegin, intervalBegin + intervalDuration)
+ }
+
+}
+
+
diff --git a/streaming/src/main/scala/spark/stream/Job.scala b/streaming/src/main/scala/spark/stream/Job.scala
new file mode 100644
index 0000000000..bfdd5db645
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/Job.scala
@@ -0,0 +1,21 @@
+package spark.stream
+
+class Job(val time: Time, func: () => _) {
+ val id = Job.getNewId()
+
+ def run() {
+ func()
+ }
+
+ override def toString = "SparkStream Job " + id + ":" + time
+}
+
+object Job {
+ var lastId = 1
+
+ def getNewId() = synchronized {
+ lastId += 1
+ lastId
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/stream/JobManager.scala b/streaming/src/main/scala/spark/stream/JobManager.scala
new file mode 100644
index 0000000000..5ea80b92aa
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/JobManager.scala
@@ -0,0 +1,112 @@
+package spark.stream
+
+import spark.SparkEnv
+import spark.Logging
+
+import scala.collection.mutable.PriorityQueue
+import scala.actors._
+import scala.actors.Actor._
+import scala.actors.remote._
+import scala.actors.remote.RemoteActor._
+import scala.actors.scheduler.ResizableThreadPoolScheduler
+import scala.actors.scheduler.ForkJoinScheduler
+
+sealed trait JobManagerMessage
+case class RunJob(job: Job) extends JobManagerMessage
+case class JobCompleted(handlerId: Int) extends JobManagerMessage
+
+class JobHandler(ssc: SparkStreamContext, val id: Int) extends DaemonActor with Logging {
+
+ var busy = false
+
+ def act() {
+ loop {
+ receive {
+ case job: Job => {
+ SparkEnv.set(ssc.env)
+ try {
+ logInfo("Starting " + job)
+ job.run()
+ logInfo("Finished " + job)
+ if (job.time.isInstanceOf[LongTime]) {
+ val longTime = job.time.asInstanceOf[LongTime]
+ logInfo("Total pushing + skew + processing delay for " + longTime + " is " +
+ (System.currentTimeMillis - longTime.milliseconds) / 1000.0 + " s")
+ }
+ } catch {
+ case e: Exception => logError("SparkStream job failed", e)
+ }
+ busy = false
+ reply(JobCompleted(id))
+ }
+ }
+ }
+ }
+}
+
+class JobManager(ssc: SparkStreamContext, numThreads: Int = 2) extends DaemonActor with Logging {
+
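+ // Reversed ordering: compare returns 1 when job1 is earlier than job2, so the
+ // max-oriented PriorityQueue below always dequeues the job with the earliest time.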
+ implicit private val jobOrdering = new Ordering[Job] {
+ override def compare(job1: Job, job2: Job): Int = {
+ if (job1.time < job2.time) {
+ return 1
+ } else if (job2.time < job1.time) {
+ return -1
+ } else {
+ return 0
+ }
+ }
+ }
+
+ private val jobs = new PriorityQueue[Job]()
+ private val handlers = (0 until numThreads).map(i => new JobHandler(ssc, i))
+
+ def act() {
+ handlers.foreach(_.start)
+ loop {
+ receive {
+ case RunJob(job) => {
+ jobs += job
+ logInfo("Job " + job + " submitted")
+ runJob()
+ }
+ case JobCompleted(handlerId) => {
+ runJob()
+ }
+ }
+ }
+ }
+
+ def runJob(): Unit = {
+ logInfo("Attempting to allocate job ")
+ if (jobs.size > 0) {
+ handlers.find(!_.busy).foreach(handler => {
+ val job = jobs.dequeue
+ logInfo("Allocating job " + job + " to handler " + handler.id)
+ handler.busy = true
+ handler ! job
+ })
+ }
+ }
+}
+
+object JobManager {
+ def main(args: Array[String]) {
+ val ssc = new SparkStreamContext("local[4]", "JobManagerTest")
+ val jobManager = new JobManager(ssc)
+ jobManager.start()
+
+ val t = System.currentTimeMillis
+ for (i <- 1 to 10) {
+ jobManager ! RunJob(new Job(
+ LongTime(i),
+ () => {
+ Thread.sleep(500)
+ println("Job " + i + " took " + (System.currentTimeMillis - t) + " ms")
+ }
+ ))
+ }
+ Thread.sleep(6000)
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/stream/JobManager2.scala b/streaming/src/main/scala/spark/stream/JobManager2.scala
new file mode 100644
index 0000000000..b69653b9a4
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/JobManager2.scala
@@ -0,0 +1,37 @@
+package spark.stream
+
+import spark.{Logging, SparkEnv}
+import java.util.concurrent.Executors
+
+
+class JobManager2(ssc: SparkStreamContext, numThreads: Int = 1) extends Logging {
+
+ class JobHandler(ssc: SparkStreamContext, job: Job) extends Runnable {
+ def run() {
+ SparkEnv.set(ssc.env)
+ try {
+ logInfo("Starting " + job)
+ job.run()
+ logInfo("Finished " + job)
+ if (job.time.isInstanceOf[LongTime]) {
+ val longTime = job.time.asInstanceOf[LongTime]
+ logInfo("Total notification + skew + processing delay for " + longTime + " is " +
+ (System.currentTimeMillis - longTime.milliseconds) / 1000.0 + " s")
+ if (System.getProperty("spark.stream.distributed", "false") == "true") {
+ TestInputBlockTracker.setEndTime(job.time)
+ }
+ }
+ } catch {
+ case e: Exception => logError("SparkStream job failed", e)
+ }
+ }
+ }
+
+ initLogging()
+
+ val jobExecutor = Executors.newFixedThreadPool(numThreads)
+
+ def runJob(job: Job) {
+ jobExecutor.execute(new JobHandler(ssc, job))
+ }
+}
diff --git a/streaming/src/main/scala/spark/stream/NetworkStreamReceiver.scala b/streaming/src/main/scala/spark/stream/NetworkStreamReceiver.scala
new file mode 100644
index 0000000000..8be46cc927
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/NetworkStreamReceiver.scala
@@ -0,0 +1,184 @@
+package spark.stream
+
+import spark.Logging
+import spark.storage.StorageLevel
+
+import scala.math._
+import scala.collection.mutable.{Queue, HashMap, ArrayBuffer}
+import scala.actors._
+import scala.actors.Actor._
+import scala.actors.remote._
+import scala.actors.remote.RemoteActor._
+
+import java.io.BufferedWriter
+import java.io.OutputStreamWriter
+
+import org.apache.hadoop.fs._
+import org.apache.hadoop.conf._
+import org.apache.hadoop.io._
+import org.apache.hadoop.mapred._
+import org.apache.hadoop.util._
+
+/*import akka.actor.Actor._*/
+
+class NetworkStreamReceiver[T: ClassManifest] (
+ inputName: String,
+ intervalDuration: Time,
+ splitId: Int,
+ ssc: SparkStreamContext,
+ tempDirectory: String)
+ extends DaemonActor
+ with Logging {
+
+ /**
+ * Assume all incoming data has non-decreasing timestamps.
+ */
+ final class Inbox[T: ClassManifest] (intervalDuration: Time) {
+ var currentBucket: (Interval, ArrayBuffer[T]) = null
+ val filledBuckets = new Queue[(Interval, ArrayBuffer[T])]()
+
+ def += (tuple: (Time, T)) = addTuple(tuple)
+
+ def addTuple(tuple: (Time, T)) {
+ val (time, data) = tuple
+ val interval = getInterval (time)
+
+ filledBuckets.synchronized {
+ if (currentBucket == null) {
+ currentBucket = (interval, new ArrayBuffer[T]())
+ }
+
+ if (interval != currentBucket._1) {
+ filledBuckets += currentBucket
+ currentBucket = (interval, new ArrayBuffer[T]())
+ }
+
+ currentBucket._2 += data
+ }
+ }
+
+ def getInterval(time: Time): Interval = {
+ val intervalBegin = time.floor(intervalDuration)
+ Interval (intervalBegin, intervalBegin + intervalDuration)
+ }
+
+ def hasFilledBuckets(): Boolean = {
+ filledBuckets.synchronized {
+ return filledBuckets.size > 0
+ }
+ }
+
+ def popFilledBucket(): (Interval, ArrayBuffer[T]) = {
+ filledBuckets.synchronized {
+ if (filledBuckets.size == 0) {
+ return null
+ }
+ return filledBuckets.dequeue()
+ }
+ }
+ }
+
+ val inbox = new Inbox[T](intervalDuration)
+ lazy val sparkstreamScheduler = {
+ val host = System.getProperty("spark.master.host")
+ val port = System.getProperty("spark.master.port").toInt
+ val url = "akka://spark@%s:%s/user/SparkStreamScheduler".format(host, port)
+ ssc.actorSystem.actorFor(url)
+ }
+ /*sparkstreamScheduler ! Test()*/
+
+ val intervalDurationMillis = intervalDuration.asInstanceOf[LongTime].milliseconds
+ val useBlockManager = true
+
+ initLogging()
+
+ override def act() {
+ // register the InputReceiver
+ val port = 7078
+ RemoteActor.alive(port)
+ RemoteActor.register(Symbol("NetworkStreamReceiver-"+inputName), self)
+ logInfo("Registered actor on port " + port)
+
+ loop {
+ reactWithin (getSleepTime) {
+ case TIMEOUT =>
+ flushInbox()
+ case data =>
+ val t = data.asInstanceOf[T]
+ inbox += (getTimeFromData(t), t)
+ }
+ }
+ }
+
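+ // Milliseconds left until the next intervalDuration boundary, so the reactWithin
+ // timeout above fires (and the inbox is flushed) roughly on interval edges.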
+ def getSleepTime(): Long = {
+ (System.currentTimeMillis / intervalDurationMillis + 1) *
+ intervalDurationMillis - System.currentTimeMillis
+ }
+
+ def getTimeFromData(data: T): Time = {
+ LongTime(System.currentTimeMillis)
+ }
+
+ def flushInbox() {
+ while (inbox.hasFilledBuckets) {
+ inbox.synchronized {
+ val (interval, data) = inbox.popFilledBucket()
+ val dataArray = data.toArray
+ logInfo("Received " + dataArray.length + " items at interval " + interval)
+ val reference = {
+ if (useBlockManager) {
+ writeToBlockManager(dataArray, interval)
+ } else {
+ writeToDisk(dataArray, interval)
+ }
+ }
+ if (reference != null) {
+ logInfo("Notifying scheduler")
+ sparkstreamScheduler ! InputGenerated(inputName, interval, reference.toString)
+ }
+ }
+ }
+ }
+
+ def writeToDisk(data: Array[T], interval: Interval): String = {
+ try {
+ // TODO(Haoyuan): For the current test, the following write-to-file lines could be
+ // commented out.
+ val fs = new Path(tempDirectory).getFileSystem(new Configuration())
+ val inputDir = new Path(
+ tempDirectory,
+ inputName + "-" + interval.toFormattedString)
+ val inputFile = new Path(inputDir, "part-" + splitId)
+ logInfo("Writing to file " + inputFile)
+ if (System.getProperty("spark.fake", "false") != "true") {
+ val writer = new BufferedWriter(new OutputStreamWriter(fs.create(inputFile, true)))
+ data.foreach(x => writer.write(x.toString + "\n"))
+ writer.close()
+ } else {
+ logInfo("Fake file")
+ }
+ inputFile.toString
+ } catch {
+ case e: Exception =>
+ logError("Exception writing to file at interval " + interval + ": " + e.getMessage, e)
+ null
+ }
+ }
+
+ def writeToBlockManager(data: Array[T], interval: Interval): String = {
+ try {
+ val blockId = inputName + "-" + interval.toFormattedString + "-" + splitId
+ if (System.getProperty("spark.fake", "false") != "true") {
+ logInfo("Writing as block " + blockId )
+ ssc.env.blockManager.put(blockId.toString, data.toIterator, StorageLevel.DISK_AND_MEMORY)
+ } else {
+ logInfo("Fake block")
+ }
+ blockId
+ } catch {
+ case e: Exception =>
+ logError("Exception writing to block manager at interval " + interval + ": " + e.getMessage, e)
+ null
+ }
+ }
+}
diff --git a/streaming/src/main/scala/spark/stream/RDS.scala b/streaming/src/main/scala/spark/stream/RDS.scala
new file mode 100644
index 0000000000..b83181b0d1
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/RDS.scala
@@ -0,0 +1,607 @@
+package spark.stream
+
+import spark.stream.SparkStreamContext._
+
+import spark.RDD
+import spark.BlockRDD
+import spark.UnionRDD
+import spark.Logging
+import spark.SparkContext
+import spark.SparkContext._
+import spark.storage.StorageLevel
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+
+import java.net.InetSocketAddress
+
+abstract class RDS[T: ClassManifest] (@transient val ssc: SparkStreamContext)
+extends Logging with Serializable {
+
+ initLogging()
+
+ /* ---------------------------------------------- */
+ /* Methods that must be implemented by subclasses */
+ /* ---------------------------------------------- */
+
+ // Time by which the window slides in this RDS
+ def slideTime: Time
+
+ // List of parent RDSs on which this RDS depends
+ def dependencies: List[RDS[_]]
+
+ // Key method that computes RDD for a valid time
+ def compute (validTime: Time): Option[RDD[T]]
+
+ /* --------------------------------------- */
+ /* Other general fields and methods of RDS */
+ /* --------------------------------------- */
+
+ // Variable to store the RDDs generated earlier in time
+ @transient private val generatedRDDs = new HashMap[Time, RDD[T]] ()
+
+ // Variable to be set to the first time seen by the RDS (effective time zero)
+ private[stream] var zeroTime: Time = null
+
+ // Variable to specify storage level
+ private var storageLevel: StorageLevel = StorageLevel.NONE
+
+ // Checkpoint level and checkpoint interval
+ private var checkpointLevel: StorageLevel = StorageLevel.NONE // NONE means don't checkpoint
+ private var checkpointInterval: Time = null
+
+ // Change this RDD's storage level
+ def persist(
+ storageLevel: StorageLevel,
+ checkpointLevel: StorageLevel,
+ checkpointInterval: Time): RDS[T] = {
+ if (this.storageLevel != StorageLevel.NONE && this.storageLevel != storageLevel) {
+ // TODO: not sure this is necessary for RDSes
+ throw new UnsupportedOperationException(
+ "Cannot change storage level of an RDS after it was already assigned a level")
+ }
+ this.storageLevel = storageLevel
+ this.checkpointLevel = checkpointLevel
+ this.checkpointInterval = checkpointInterval
+ this
+ }
+
+ def persist(newLevel: StorageLevel): RDS[T] = persist(newLevel, StorageLevel.NONE, null)
+
+ // Turn on the default caching level for this RDD
+ def persist(): RDS[T] = persist(StorageLevel.MEMORY_ONLY_DESER)
+
+ // Turn on the default caching level for this RDD
+ def cache(): RDS[T] = persist()
+
+ def isInitialized = (zeroTime != null)
+
+ // This method initializes the RDS by setting the "zero" time, based on which
+ // the validity of future times is calculated. This method also recursively initializes
+ // its parent RDSs.
+ def initialize(firstInterval: Interval) {
+ if (zeroTime == null) {
+ zeroTime = firstInterval.beginTime
+ }
+ logInfo(this + " initialized")
+ dependencies.foreach(_.initialize(firstInterval))
+ }
+
+ // This method checks whether the 'time' is valid wrt slideTime for generating RDD
+ private def isTimeValid (time: Time): Boolean = {
+ if (!isInitialized)
+ throw new Exception (this.toString + " has not been initialized")
+ if ((time - zeroTime).isMultipleOf(slideTime)) {
+ true
+ } else {
+ false
+ }
+ }
+
+ // This method either retrieves a precomputed RDD of this RDS,
+ // or computes the RDD (if the time is valid)
+ def getOrCompute(time: Time): Option[RDD[T]] = {
+
+ // if RDD was already generated, then retrieve it from HashMap
+ generatedRDDs.get(time) match {
+
+ // If an RDD was already generated and is being reused, then
+ // probably all RDDs in this RDS will be reused and hence should be cached
+ case Some(oldRDD) => Some(oldRDD)
+
+ // if RDD was not generated, and if the time is valid
+ // (based on sliding time of this RDS), then generate the RDD
+ case None =>
+ if (isTimeValid(time)) {
+ compute(time) match {
+ case Some(newRDD) =>
+ if (System.getProperty("spark.fake", "false") != "true" ||
+ newRDD.getStorageLevel == StorageLevel.NONE) {
+ if (checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval)) {
+ newRDD.persist(checkpointLevel)
+ logInfo("Persisting " + newRDD + " to " + checkpointLevel + " at time " + time)
+ } else if (storageLevel != StorageLevel.NONE) {
+ newRDD.persist(storageLevel)
+ logInfo("Persisting " + newRDD + " to " + storageLevel + " at time " + time)
+ }
+ }
+ generatedRDDs.put(time.copy(), newRDD)
+ Some(newRDD)
+ case None =>
+ None
+ }
+ } else {
+ None
+ }
+ }
+ }
+
+ // This method generates a SparkStream job for the given time
+ // and may need to be overridden by subclasses
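+ // The default implementation simply materializes the RDD with a no-op function;
+ // output RDSs such as PerElementForEachRDS and PerRDDForEachRDS override this to
+ // run their side-effecting functions instead.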
+ def generateJob(time: Time): Option[Job] = {
+ getOrCompute(time) match {
+ case Some(rdd) => {
+ val jobFunc = () => {
+ val emptyFunc = { (iterator: Iterator[T]) => {} }
+ ssc.sc.runJob(rdd, emptyFunc)
+ }
+ Some(new Job(time, jobFunc))
+ }
+ case None => None
+ }
+ }
+
+ /* -------------- */
+ /* RDS operations */
+ /* -------------- */
+
+ def map[U: ClassManifest](mapFunc: T => U) = new MappedRDS(this, ssc.sc.clean(mapFunc))
+
+ def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]) =
+ new FlatMappedRDS(this, ssc.sc.clean(flatMapFunc))
+
+ def filter(filterFunc: T => Boolean) = new FilteredRDS(this, filterFunc)
+
+ def glom() = new GlommedRDS(this)
+
+ def mapPartitions[U: ClassManifest](mapPartFunc: Iterator[T] => Iterator[U]) =
+ new MapPartitionedRDS(this, ssc.sc.clean(mapPartFunc))
+
+ def reduce(reduceFunc: (T, T) => T) = this.map(x => (1, x)).reduceByKey(reduceFunc, 1).map(_._2)
+
+ def count() = this.map(_ => 1).reduce(_ + _)
+
+ def collect() = this.map(x => (1, x)).groupByKey(1).map(_._2)
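+ // The dummy key (1) above funnels every element of a batch into a single group, so
+ // reduce and collect each produce one value (or one collection) per batch, and count
+ // reuses reduce over a stream of 1s.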
+
+ def foreach(foreachFunc: T => Unit) = {
+ val newrds = new PerElementForEachRDS(this, ssc.sc.clean(foreachFunc))
+ ssc.registerOutputStream(newrds)
+ newrds
+ }
+
+ def foreachRDD(foreachFunc: RDD[T] => Unit) = {
+ val newrds = new PerRDDForEachRDS(this, ssc.sc.clean(foreachFunc))
+ ssc.registerOutputStream(newrds)
+ newrds
+ }
+
+ def print() = {
+ def foreachFunc = (rdd: RDD[T], time: Time) => {
+ val first11 = rdd.take(11)
+ println ("-------------------------------------------")
+ println ("Time: " + time)
+ println ("-------------------------------------------")
+ first11.take(10).foreach(println)
+ if (first11.size > 10) println("...")
+ println()
+ }
+ val newrds = new PerRDDForEachRDS(this, ssc.sc.clean(foreachFunc))
+ ssc.registerOutputStream(newrds)
+ newrds
+ }
+
+ def window(windowTime: Time, slideTime: Time) = new WindowedRDS(this, windowTime, slideTime)
+
+ def batch(batchTime: Time) = window(batchTime, batchTime)
+
+ def reduceByWindow(reduceFunc: (T, T) => T, windowTime: Time, slideTime: Time) =
+ this.window(windowTime, slideTime).reduce(reduceFunc)
+
+ def reduceByWindow(
+ reduceFunc: (T, T) => T,
+ invReduceFunc: (T, T) => T,
+ windowTime: Time,
+ slideTime: Time) = {
+ this.map(x => (1, x))
+ .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowTime, slideTime, 1)
+ .map(_._2)
+ }
+
+ def countByWindow(windowTime: Time, slideTime: Time) = {
+ def add(v1: Int, v2: Int) = (v1 + v2)
+ def subtract(v1: Int, v2: Int) = (v1 - v2)
+ this.map(_ => 1).reduceByWindow(add _, subtract _, windowTime, slideTime)
+ }
+
+ def union(that: RDS[T]) = new UnifiedRDS(Array(this, that))
+
+ def register() = ssc.registerOutputStream(this)
+}
+
+
+class PairRDSFunctions[K: ClassManifest, V: ClassManifest](rds: RDS[(K,V)])
+extends Serializable {
+
+ def ssc = rds.ssc
+
+ /* ---------------------------------- */
+ /* RDS operations for key-value pairs */
+ /* ---------------------------------- */
+
+ def groupByKey(numPartitions: Int = 0): ShuffledRDS[K, V, ArrayBuffer[V]] = {
+ def createCombiner(v: V) = ArrayBuffer[V](v)
+ def mergeValue(c: ArrayBuffer[V], v: V) = (c += v)
+ def mergeCombiner(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = (c1 ++ c2)
+ combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, numPartitions)
+ }
+
+ def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int = 0): ShuffledRDS[K, V, V] = {
+ val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
+ combineByKey[V]((v: V) => v, cleanedReduceFunc, cleanedReduceFunc, numPartitions)
+ }
+
+ private def combineByKey[C: ClassManifest](
+ createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiner: (C, C) => C,
+ numPartitions: Int) : ShuffledRDS[K, V, C] = {
+ new ShuffledRDS[K, V, C](rds, createCombiner, mergeValue, mergeCombiner, numPartitions)
+ }
+
+ def groupByKeyAndWindow(
+ windowTime: Time,
+ slideTime: Time,
+ numPartitions: Int = 0): ShuffledRDS[K, V, ArrayBuffer[V]] = {
+ rds.window(windowTime, slideTime).groupByKey(numPartitions)
+ }
+
+ def reduceByKeyAndWindow(
+ reduceFunc: (V, V) => V,
+ windowTime: Time,
+ slideTime: Time,
+ numPartitions: Int = 0): ShuffledRDS[K, V, V] = {
+ rds.window(windowTime, slideTime).reduceByKey(ssc.sc.clean(reduceFunc), numPartitions)
+ }
+
+ // This method is the efficient sliding window reduce operation,
+ // which requires the specification of an inverse reduce function,
+ // so that new elements introduced in the window can be "added" using
+ // reduceFunc to the previous window's result and old elements can be
+ // "subtracted using invReduceFunc.
+ def reduceByKeyAndWindow(
+ reduceFunc: (V, V) => V,
+ invReduceFunc: (V, V) => V,
+ windowTime: Time,
+ slideTime: Time,
+ numPartitions: Int): ReducedWindowedRDS[K, V] = {
+
+ new ReducedWindowedRDS[K, V](
+ rds,
+ ssc.sc.clean(reduceFunc),
+ ssc.sc.clean(invReduceFunc),
+ windowTime,
+ slideTime,
+ numPartitions)
+ }
+}
+
+
+abstract class InputRDS[T: ClassManifest] (
+ val inputName: String,
+ val batchDuration: Time,
+ ssc: SparkStreamContext)
+extends RDS[T](ssc) {
+
+ override def dependencies = List()
+
+ override def slideTime = batchDuration
+
+ def setReference(time: Time, reference: AnyRef)
+}
+
+
+class FileInputRDS(
+ val fileInputName: String,
+ val directory: String,
+ ssc: SparkStreamContext)
+extends InputRDS[String](fileInputName, LongTime(1000), ssc) {
+
+ @transient val generatedFiles = new HashMap[Time,String]
+
+ // TODO(Haoyuan): This is for the performance test.
+ @transient
+ val rdd = ssc.sc.textFile(SparkContext.inputFile,
+ SparkContext.idealPartitions).asInstanceOf[RDD[String]]
+
+ override def compute(validTime: Time): Option[RDD[String]] = {
+ generatedFiles.get(validTime) match {
+ case Some(file) =>
+ logInfo("Reading from file " + file + " for time " + validTime)
+ // Some(ssc.sc.textFile(file).asInstanceOf[RDD[String]])
+ // The following line is for the HDFS performance test. Should comment out the above line.
+ Some(rdd)
+ case None =>
+ throw new Exception(this.toString + ": Reference missing for time " + validTime + "!!!")
+ None
+ }
+ }
+
+ def setReference(time: Time, reference: AnyRef) {
+ generatedFiles += ((time, reference.toString))
+ logInfo("Reference added for time " + time + " - " + reference.toString)
+ }
+}
+
+class NetworkInputRDS[T: ClassManifest](
+ val networkInputName: String,
+ val addresses: Array[InetSocketAddress],
+ batchDuration: Time,
+ ssc: SparkStreamContext)
+extends InputRDS[T](networkInputName, batchDuration, ssc) {
+
+
+ // TODO(Haoyuan): This is for the performance test.
+ @transient var rdd: RDD[T] = null
+
+ if (System.getProperty("spark.fake", "false") == "true") {
+ logInfo("Running initial count to cache fake RDD")
+ rdd = ssc.sc.textFile(SparkContext.inputFile,
+ SparkContext.idealPartitions).asInstanceOf[RDD[T]]
+ val fakeCacheLevel = System.getProperty("spark.fake.cache", "")
+ if (fakeCacheLevel == "MEMORY_ONLY_2") {
+ rdd.persist(StorageLevel.MEMORY_ONLY_2)
+ } else if (fakeCacheLevel == "MEMORY_ONLY_DESER_2") {
+ rdd.persist(StorageLevel.MEMORY_ONLY_2)
+ } else if (fakeCacheLevel != "") {
+ logError("Invalid fake cache level: " + fakeCacheLevel)
+ System.exit(1)
+ }
+ rdd.count()
+ }
+
+ @transient val references = new HashMap[Time,String]
+
+ override def compute(validTime: Time): Option[RDD[T]] = {
+ if (System.getProperty("spark.fake", "false") == "true") {
+ logInfo("Returning fake RDD at " + validTime)
+ return Some(rdd)
+ }
+ references.get(validTime) match {
+ case Some(reference) =>
+ if (reference.startsWith("file") || reference.startsWith("hdfs")) {
+ logInfo("Reading from file " + reference + " for time " + validTime)
+ Some(ssc.sc.textFile(reference).asInstanceOf[RDD[T]])
+ } else {
+ logInfo("Getting from BlockManager " + reference + " for time " + validTime)
+ Some(new BlockRDD(ssc.sc, Array(reference)))
+ }
+ case None =>
+ throw new Exception(this.toString + ": Reference missing for time " + validTime + "!!!")
+ None
+ }
+ }
+
+ def setReference(time: Time, reference: AnyRef) {
+ references += ((time, reference.toString))
+ logInfo("Reference added for time " + time + " - " + reference.toString)
+ }
+}
+
+
+class TestInputRDS(
+ val testInputName: String,
+ batchDuration: Time,
+ ssc: SparkStreamContext)
+extends InputRDS[String](testInputName, batchDuration, ssc) {
+
+ @transient val references = new HashMap[Time,Array[String]]
+
+ override def compute(validTime: Time): Option[RDD[String]] = {
+ references.get(validTime) match {
+ case Some(reference) =>
+ Some(new BlockRDD[String](ssc.sc, reference))
+ case None =>
+ throw new Exception(this.toString + ": Reference missing for time " + validTime + "!!!")
+ None
+ }
+ }
+
+ def setReference(time: Time, reference: AnyRef) {
+ references += ((time, reference.asInstanceOf[Array[String]]))
+ }
+}
+
+
+class MappedRDS[T: ClassManifest, U: ClassManifest] (
+ parent: RDS[T],
+ mapFunc: T => U)
+extends RDS[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(_.map[U](mapFunc))
+ }
+}
+
+
+class FlatMappedRDS[T: ClassManifest, U: ClassManifest](
+ parent: RDS[T],
+ flatMapFunc: T => Traversable[U])
+extends RDS[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
+ }
+}
+
+
+class FilteredRDS[T: ClassManifest](parent: RDS[T], filterFunc: T => Boolean)
+extends RDS[T](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[T]] = {
+ parent.getOrCompute(validTime).map(_.filter(filterFunc))
+ }
+}
+
+class MapPartitionedRDS[T: ClassManifest, U: ClassManifest](
+ parent: RDS[T],
+ mapPartFunc: Iterator[T] => Iterator[U])
+extends RDS[U](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[U]] = {
+ parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc))
+ }
+}
+
+class GlommedRDS[T: ClassManifest](parent: RDS[T]) extends RDS[Array[T]](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[Array[T]]] = {
+ parent.getOrCompute(validTime).map(_.glom())
+ }
+}
+
+
+class ShuffledRDS[K: ClassManifest, V: ClassManifest, C: ClassManifest](
+ parent: RDS[(K,V)],
+ createCombiner: V => C,
+ mergeValue: (C, V) => C,
+ mergeCombiner: (C, C) => C,
+ numPartitions: Int)
+ extends RDS [(K,C)] (parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[(K,C)]] = {
+ parent.getOrCompute(validTime) match {
+ case Some(rdd) =>
+ val newrdd = {
+ if (numPartitions > 0) {
+ rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, numPartitions)
+ } else {
+ rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner)
+ }
+ }
+ Some(newrdd)
+ case None => None
+ }
+ }
+}
+
+
+class UnifiedRDS[T: ClassManifest](parents: Array[RDS[T]])
+extends RDS[T](parents(0).ssc) {
+
+ if (parents.length == 0) {
+ throw new IllegalArgumentException("Empty array of parents")
+ }
+
+ if (parents.map(_.ssc).distinct.size > 1) {
+ throw new IllegalArgumentException("Array of parents have different SparkStreamContexts")
+ }
+
+ if (parents.map(_.slideTime).distinct.size > 1) {
+ throw new IllegalArgumentException("Array of parents have different slide times")
+ }
+
+ override def dependencies = parents.toList
+
+ override def slideTime: Time = parents(0).slideTime
+
+ override def compute(validTime: Time): Option[RDD[T]] = {
+ val rdds = new ArrayBuffer[RDD[T]]()
+ parents.map(_.getOrCompute(validTime)).foreach(_ match {
+ case Some(rdd) => rdds += rdd
+ case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime)
+ })
+ if (rdds.size > 0) {
+ Some(new UnionRDD(ssc.sc, rdds))
+ } else {
+ None
+ }
+ }
+}
+
+
+class PerElementForEachRDS[T: ClassManifest] (
+ parent: RDS[T],
+ foreachFunc: T => Unit)
+extends RDS[Unit](parent.ssc) {
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[Unit]] = None
+
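+ // This RDS produces no output RDD of its own; instead generateJob() wraps the parent's
+ // RDD for the given time in a Job that runs foreachFunc over every element.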
+ override def generateJob(time: Time): Option[Job] = {
+ parent.getOrCompute(time) match {
+ case Some(rdd) =>
+ val jobFunc = () => {
+ val sparkJobFunc = {
+ (iterator: Iterator[T]) => iterator.foreach(foreachFunc)
+ }
+ ssc.sc.runJob(rdd, sparkJobFunc)
+ }
+ Some(new Job(time, jobFunc))
+ case None => None
+ }
+ }
+}
+
+
+class PerRDDForEachRDS[T: ClassManifest] (
+ parent: RDS[T],
+ foreachFunc: (RDD[T], Time) => Unit)
+extends RDS[Unit](parent.ssc) {
+
+ def this(parent: RDS[T], altForeachFunc: (RDD[T]) => Unit) =
+ this(parent, (rdd: RDD[T], time: Time) => altForeachFunc(rdd))
+
+ override def dependencies = List(parent)
+
+ override def slideTime: Time = parent.slideTime
+
+ override def compute(validTime: Time): Option[RDD[Unit]] = None
+
+ override def generateJob(time: Time): Option[Job] = {
+ parent.getOrCompute(time) match {
+ case Some(rdd) =>
+ val jobFunc = () => {
+ foreachFunc(rdd, time)
+ }
+ Some(new Job(time, jobFunc))
+ case None => None
+ }
+ }
+}
diff --git a/streaming/src/main/scala/spark/stream/ReducedWindowedRDS.scala b/streaming/src/main/scala/spark/stream/ReducedWindowedRDS.scala
new file mode 100644
index 0000000000..d47654ccb9
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/ReducedWindowedRDS.scala
@@ -0,0 +1,218 @@
+package spark.stream
+
+import spark.stream.SparkStreamContext._
+
+import spark.RDD
+import spark.UnionRDD
+import spark.CoGroupedRDD
+import spark.HashPartitioner
+import spark.SparkContext._
+import spark.storage.StorageLevel
+
+import scala.collection.mutable.ArrayBuffer
+
+class ReducedWindowedRDS[K: ClassManifest, V: ClassManifest](
+ parent: RDS[(K, V)],
+ reduceFunc: (V, V) => V,
+ invReduceFunc: (V, V) => V,
+ _windowTime: Time,
+ _slideTime: Time,
+ numPartitions: Int)
+extends RDS[(K,V)](parent.ssc) {
+
+ if (!_windowTime.isMultipleOf(parent.slideTime))
+ throw new Exception("The window duration of ReducedWindowedRDS (" + _slideTime + ") " +
+ "must be multiple of the slide duration of parent RDS (" + parent.slideTime + ")")
+
+ if (!_slideTime.isMultipleOf(parent.slideTime))
+ throw new Exception("The slide duration of ReducedWindowedRDS (" + _slideTime + ") " +
+ "must be multiple of the slide duration of parent RDS (" + parent.slideTime + ")")
+
+ val reducedRDS = parent.reduceByKey(reduceFunc, numPartitions)
+ val allowPartialWindows = true
+ //reducedRDS.persist(StorageLevel.MEMORY_ONLY_DESER_2)
+
+ override def dependencies = List(reducedRDS)
+
+ def windowTime: Time = _windowTime
+
+ override def slideTime: Time = _slideTime
+
+ override def persist(
+ storageLevel: StorageLevel,
+ checkpointLevel: StorageLevel,
+ checkpointInterval: Time): RDS[(K,V)] = {
+ super.persist(storageLevel, checkpointLevel, checkpointInterval)
+ reducedRDS.persist(storageLevel, checkpointLevel, checkpointInterval)
+ }
+
+ override def compute(validTime: Time): Option[RDD[(K, V)]] = {
+
+
+ // Notation:
+ // _____________________________
+ // | previous window _________|___________________
+ // |___________________| current window | --------------> Time
+ // |_____________________________|
+ //
+ // |________ _________| |________ _________|
+ // | |
+ // V V
+ // old time steps new time steps
+ //
+ def getAdjustedWindow(endTime: Time, windowTime: Time): Interval = {
+ val beginTime =
+ if (allowPartialWindows && endTime - windowTime < parent.zeroTime) {
+ parent.zeroTime
+ } else {
+ endTime - windowTime
+ }
+ Interval(beginTime, endTime)
+ }
+
+ val currentTime = validTime.copy
+ val currentWindow = getAdjustedWindow(currentTime, windowTime)
+ val previousWindow = getAdjustedWindow(currentTime - slideTime, windowTime)
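+ // Example (assuming parent.slideTime = 1s, windowTime = 5s, slideTime = 2s, validTime = 10s):
+ // currentWindow = [5s, 10s] and previousWindow = [3s, 8s], so the "old time steps" are the
+ // intervals ending at 4s and 5s, and the "new time steps" are the intervals ending at 9s and 10s.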
+
+ logInfo("Current window = " + currentWindow)
+ logInfo("Previous window = " + previousWindow)
+ logInfo("Parent.zeroTime = " + parent.zeroTime)
+
+ if (allowPartialWindows) {
+ if (currentTime - slideTime == parent.zeroTime) {
+ reducedRDS.getOrCompute(currentTime) match {
+ case Some(rdd) => return Some(rdd)
+ case None => throw new Exception("Could not get first reduced RDD for time " + currentTime)
+ }
+ }
+ } else {
+ if (previousWindow.beginTime < parent.zeroTime) {
+ if (currentWindow.beginTime < parent.zeroTime) {
+ return None
+ } else {
+ // If this is the first feasible window, then generate reduced value in the naive manner
+ val reducedRDDs = new ArrayBuffer[RDD[(K, V)]]()
+ var t = currentWindow.endTime
+ while (t > currentWindow.beginTime) {
+ reducedRDS.getOrCompute(t) match {
+ case Some(rdd) => reducedRDDs += rdd
+ case None => throw new Exception("Could not get reduced RDD for time " + t)
+ }
+ t -= reducedRDS.slideTime
+ }
+ if (reducedRDDs.size == 0) {
+ throw new Exception("Could not generate the first RDD for time " + validTime)
+ }
+ return Some(new UnionRDD(ssc.sc, reducedRDDs).reduceByKey(reduceFunc, numPartitions))
+ }
+ }
+ }
+
+ // Get the RDD of the reduced value of the previous window
+ val previousWindowRDD = getOrCompute(previousWindow.endTime) match {
+ case Some(rdd) => rdd.asInstanceOf[RDD[(_, _)]]
+ case None => throw new Exception("Could not get previous RDD for time " + previousWindow.endTime)
+ }
+
+ val oldRDDs = new ArrayBuffer[RDD[(_, _)]]()
+ val newRDDs = new ArrayBuffer[RDD[(_, _)]]()
+
+ // Get the RDDs of the reduced values in "old time steps"
+ var t = currentWindow.beginTime
+ while (t > previousWindow.beginTime) {
+ reducedRDS.getOrCompute(t) match {
+ case Some(rdd) => oldRDDs += rdd.asInstanceOf[RDD[(_, _)]]
+ case None => throw new Exception("Could not get old reduced RDD for time " + t)
+ }
+ t -= reducedRDS.slideTime
+ }
+
+ // Get the RDDs of the reduced values in "new time steps"
+ t = currentWindow.endTime
+ while (t > previousWindow.endTime) {
+ reducedRDS.getOrCompute(t) match {
+ case Some(rdd) => newRDDs += rdd.asInstanceOf[RDD[(_, _)]]
+ case None => throw new Exception("Could not get new reduced RDD for time " + t)
+ }
+ t -= reducedRDS.slideTime
+ }
+
+ val partitioner = new HashPartitioner(numPartitions)
+ val allRDDs = new ArrayBuffer[RDD[(_, _)]]()
+ allRDDs += previousWindowRDD
+ allRDDs ++= oldRDDs
+ allRDDs ++= newRDDs
+
+
+ val numOldRDDs = oldRDDs.size
+ val numNewRDDs = newRDDs.size
+ logInfo("Generated numOldRDDs = " + numOldRDDs + ", numNewRDDs = " + numNewRDDs)
+ logInfo("Generating CoGroupedRDD with " + allRDDs.size + " RDDs")
+ val newRDD = new CoGroupedRDD[K](allRDDs.toSeq, partitioner).asInstanceOf[RDD[(K,Seq[Seq[V]])]].map(x => {
+ val (key, value) = x
+ logDebug("value.size = " + value.size + ", numOldRDDs = " + numOldRDDs + ", numNewRDDs = " + numNewRDDs)
+ if (value.size != 1 + numOldRDDs + numNewRDDs) {
+ throw new Exception("Number of groups not odd!")
+ }
+
+ // old values = reduced values of the "old time steps" that are eliminated from current window
+ // new values = reduced values of the "new time steps" that are introduced to the current window
+ // previous value = reduced value of the previous window
+
+ /*val numOldValues = (value.size - 1) / 2*/
+ // Getting reduced values "old time steps"
+ val oldValues =
+ (0 until numOldRDDs).map(i => value(1 + i)).filter(_.size > 0).map(x => x(0))
+ // Getting reduced values "new time steps"
+ val newValues =
+ (0 until numNewRDDs).map(i => value(1 + numOldRDDs + i)).filter(_.size > 0).map(x => x(0))
+
+ // If reduced value for the key does not exist in previous window, it should not exist in "old time steps"
+ if (value(0).size == 0 && oldValues.size != 0) {
+ throw new Exception("Unexpected: Key exists in old reduced values but not in previous reduced values")
+ }
+
+ // For the key, at least one of "old time steps", "new time steps" and previous window should have reduced values
+ if (value(0).size == 0 && oldValues.size == 0 && newValues.size == 0) {
+ throw new Exception("Unexpected: Key does not exist in any of old, new, or previour reduced values")
+ }
+
+ // Logic to generate the final reduced value for current window:
+ //
+ // If previous window did not have reduced value for the key
+ // Then, return reduced value of "new time steps" as the final value
+ // Else, reduced value exists in previous window
+ // If "old" time steps did not have reduced value for the key
+ // Then, reduce previous window's reduced value with that of "new time steps" for final value
+ // Else, reduced values exists in "old time steps"
+ // If "new values" did not have reduced value for the key
+ // Then, inverse-reduce "old values" from previous window's reduced value for final value
+ // Else, all 3 values exist, combine all of them together
+ //
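+ // Worked example (assuming reduceFunc = _ + _ and invReduceFunc = _ - _): if the previous
+ // window's value is 10, the old values are [2, 3] and the new values are [4], then the
+ // final value is reduceFunc(invReduceFunc(10, 2 + 3), 4) = (10 - 5) + 4 = 9.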
+ logDebug("# old values = " + oldValues.size + ", # new values = " + newValues)
+ val finalValue = {
+ if (value(0).size == 0) {
+ newValues.reduce(reduceFunc)
+ } else {
+ val prevValue = value(0)(0)
+ logDebug("prev value = " + prevValue)
+ if (oldValues.size == 0) {
+ // assuming newValues.size > 0 (all 3 cannot be zero, as checked earlier)
+ val temp = newValues.reduce(reduceFunc)
+ reduceFunc(prevValue, temp)
+ } else if (newValues.size == 0) {
+ invReduceFunc(prevValue, oldValues.reduce(reduceFunc))
+ } else {
+ val tempValue = invReduceFunc(prevValue, oldValues.reduce(reduceFunc))
+ reduceFunc(tempValue, newValues.reduce(reduceFunc))
+ }
+ }
+ }
+ (key, finalValue)
+ })
+ //newRDD.persist(StorageLevel.MEMORY_ONLY_DESER_2)
+ Some(newRDD)
+ }
+}
+
+
diff --git a/streaming/src/main/scala/spark/stream/Scheduler.scala b/streaming/src/main/scala/spark/stream/Scheduler.scala
new file mode 100644
index 0000000000..38946fef11
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/Scheduler.scala
@@ -0,0 +1,181 @@
+package spark.stream
+
+import spark.SparkEnv
+import spark.Logging
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+import scala.collection.mutable.ArrayBuffer
+
+import akka.actor._
+import akka.actor.Actor
+import akka.actor.Actor._
+import akka.util.duration._
+
+sealed trait SchedulerMessage
+case class InputGenerated(inputName: String, interval: Interval, reference: AnyRef = null) extends SchedulerMessage
+case class Test() extends SchedulerMessage
+
+class Scheduler(
+ ssc: SparkStreamContext,
+ inputRDSs: Array[InputRDS[_]],
+ outputRDSs: Array[RDS[_]])
+extends Actor with Logging {
+
+ class InputState (inputNames: Array[String]) {
+ val inputsLeft = new HashSet[String]()
+ inputsLeft ++= inputNames
+
+ val startTime = System.currentTimeMillis
+
+ def delay() = System.currentTimeMillis - startTime
+
+ def addGeneratedInput(inputName: String) = inputsLeft -= inputName
+
+ def areAllInputsGenerated() = (inputsLeft.size == 0)
+
+ override def toString(): String = {
+ val left = if (inputsLeft.size == 0) "" else inputsLeft.reduceLeft(_ + ", " + _)
+ return "Inputs left = [ " + left + " ]"
+ }
+ }
+
+
+ initLogging()
+
+ val inputNames = inputRDSs.map(_.inputName).toArray
+ val inputStates = new HashMap[Interval, InputState]()
+ val currentJobs = System.getProperty("spark.stream.currentJobs", "1").toInt
+ val jobManager = new JobManager2(ssc, currentJobs)
+
+ // TODO(Haoyuan): The following line is for performance test only.
+ var cnt: Int = System.getProperty("spark.stream.fake.cnt", "60").toInt
+ var lastInterval: Interval = null
+
+
+ /*remote.register("SparkStreamScheduler", actorOf[Scheduler])*/
+ logInfo("Registered actor on port ")
+
+ /*jobManager.start()*/
+ startStreamReceivers()
+
+ def receive = {
+ case InputGenerated(inputName, interval, reference) => {
+ addGeneratedInput(inputName, interval, reference)
+ }
+ case Test() => logInfo("TEST PASSED")
+ }
+
+ def addGeneratedInput(inputName: String, interval: Interval, reference: AnyRef = null) {
+ logInfo("Input " + inputName + " generated for interval " + interval)
+ inputStates.get(interval) match {
+ case None => inputStates.put(interval, new InputState(inputNames))
+ case _ =>
+ }
+ inputStates(interval).addGeneratedInput(inputName)
+
+ inputRDSs.filter(_.inputName == inputName).foreach(inputRDS => {
+ inputRDS.setReference(interval.endTime, reference)
+ if (inputRDS.isInstanceOf[TestInputRDS]) {
+ TestInputBlockTracker.addBlocks(interval.endTime, reference)
+ }
+ }
+ )
+
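+ // An interval is ready once every input RDS has reported its data for it. Ready intervals are
+ // processed in order: the next interval to process must begin where the last processed one ended.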
+ def getNextInterval(): Option[Interval] = {
+ logDebug("Last interval is " + lastInterval)
+ val readyIntervals = inputStates.filter(_._2.areAllInputsGenerated).keys
+ /*inputState.foreach(println) */
+ logDebug("InputState has " + inputStates.size + " intervals, " + readyIntervals.size + " ready intervals")
+ return readyIntervals.find(lastInterval == null || _.beginTime == lastInterval.endTime)
+ }
+
+ var nextInterval = getNextInterval()
+ var count = 0
+ while(nextInterval.isDefined) {
+ val inputState = inputStates.get(nextInterval.get).get
+ generateRDDsForInterval(nextInterval.get)
+ logInfo("Skew delay for " + nextInterval.get.endTime + " is " + (inputState.delay / 1000.0) + " s")
+ inputStates.remove(nextInterval.get)
+ lastInterval = nextInterval.get
+ nextInterval = getNextInterval()
+ count += 1
+ /*if (nextInterval.size == 0 && inputState.size > 0) {
+ logDebug("Next interval not ready, pending intervals " + inputState.size)
+ }*/
+ }
+ logDebug("RDDs generated for " + count + " intervals")
+
+ /*
+ if (inputState(interval).areAllInputsGenerated) {
+ generateRDDsForInterval(interval)
+ lastInterval = interval
+ inputState.remove(interval)
+ } else {
+ logInfo("All inputs not generated for interval " + interval)
+ }
+ */
+ }
+
+ def generateRDDsForInterval (interval: Interval) {
+ logInfo("Generating RDDs for interval " + interval)
+ outputRDSs.foreach(outputRDS => {
+ if (!outputRDS.isInitialized) outputRDS.initialize(interval)
+ outputRDS.generateJob(interval.endTime) match {
+ case Some(job) => submitJob(job)
+ case None =>
+ }
+ }
+ )
+ // TODO(Haoyuan): The following block is for performance testing only.
+ if (System.getProperty("spark.fake", "false") == "true" || System.getProperty("spark.stream.fake", "false") == "true") {
+ cnt -= 1
+ if (cnt <= 0) {
+ logInfo("My time is up! " + cnt)
+ System.exit(1)
+ }
+ }
+ }
+
+ def submitJob(job: Job) {
+ logInfo("Submitting " + job + " to JobManager")
+ /*jobManager ! RunJob(job)*/
+ jobManager.runJob(job)
+ }
+
+ def startStreamReceivers() {
+ val testStreamReceiverNames = new ArrayBuffer[(String, Long)]()
+ inputRDSs.foreach (inputRDS => {
+ inputRDS match {
+ case fileInputRDS: FileInputRDS => {
+ val fileStreamReceiver = new FileStreamReceiver(
+ fileInputRDS.inputName,
+ fileInputRDS.directory,
+ fileInputRDS.batchDuration.asInstanceOf[LongTime].milliseconds)
+ fileStreamReceiver.start()
+ }
+ case networkInputRDS: NetworkInputRDS[_] => {
+ val networkStreamReceiver = new NetworkStreamReceiver(
+ networkInputRDS.inputName,
+ networkInputRDS.batchDuration,
+ 0,
+ ssc,
+ if (ssc.tempDir == null) null else ssc.tempDir.toString)
+ networkStreamReceiver.start()
+ }
+ case testInputRDS: TestInputRDS => {
+ testStreamReceiverNames +=
+ ((testInputRDS.inputName, testInputRDS.batchDuration.asInstanceOf[LongTime].milliseconds))
+ }
+ }
+ })
+ if (testStreamReceiverNames.size > 0) {
+ /*val testStreamCoordinator = new TestStreamCoordinator(testStreamReceiverNames.toArray)*/
+ /*testStreamCoordinator.start()*/
+ val actor = ssc.actorSystem.actorOf(
+ Props(new TestStreamCoordinator(testStreamReceiverNames.toArray)),
+ name = "TestStreamCoordinator")
+ }
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/stream/SenGeneratorForPerformanceTest.scala b/streaming/src/main/scala/spark/stream/SenGeneratorForPerformanceTest.scala
new file mode 100644
index 0000000000..74fd54072f
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/SenGeneratorForPerformanceTest.scala
@@ -0,0 +1,78 @@
+package spark.stream
+
+import scala.util.Random
+import scala.io.Source
+import scala.actors._
+import scala.actors.Actor._
+import scala.actors.remote._
+import scala.actors.remote.RemoteActor._
+
+import java.net.InetSocketAddress
+
+/*import akka.actor.Actor._*/
+/*import akka.actor.ActorRef*/
+
+
+object SenGeneratorForPerformanceTest {
+
+ def printUsage () {
+ println ("Usage: SentenceGenerator <target IP> <target port> <sentence file> [<sentences per second>]")
+ System.exit(0)
+ }
+
+ def main (args: Array[String]) {
+ if (args.length < 3) {
+ printUsage
+ }
+
+ val inputManagerIP = args(0)
+ val inputManagerPort = args(1).toInt
+ val sentenceFile = args(2)
+ val sentencesPerSecond = {
+ if (args.length > 3) args(3).toInt
+ else 10
+ }
+
+ val source = Source.fromFile(sentenceFile)
+ val lines = source.mkString.split ("\n")
+ source.close ()
+
+ try {
+ /*val inputManager = remote.actorFor("InputReceiver-Sentences",*/
+ /* inputManagerIP, inputManagerPort)*/
+ val inputManager = select(Node(inputManagerIP, inputManagerPort), Symbol("InputReceiver-Sentences"))
+ val sleepBetweenSentences = 1000.0 / sentencesPerSecond.toDouble - 1
+ val random = new Random ()
+ println ("Sending " + sentencesPerSecond + " sentences per second to " + inputManagerIP + ":" + inputManagerPort)
+ var lastPrintTime = System.currentTimeMillis()
+ var count = 0
+
+ while (true) {
+ /*if (!inputManager.tryTell (lines (random.nextInt (lines.length))))*/
+ /*throw new Exception ("disconnected")*/
+// inputManager ! lines (random.nextInt (lines.length))
+ for (i <- 0 until sentencesPerSecond) inputManager ! lines (0)
+ println(System.currentTimeMillis / 1000 + " s")
+/* count += 1
+
+ if (System.currentTimeMillis - lastPrintTime >= 1000) {
+ println (count + " sentences sent last second")
+ count = 0
+ lastPrintTime = System.currentTimeMillis
+ }
+
+ Thread.sleep (sleepBetweenSentences.toLong)
+*/
+ val currentSecs = System.currentTimeMillis / 1000
+ Thread.sleep ((currentSecs * 1000 + 1000) - System.currentTimeMillis)
+ }
+ } catch {
+ case e: Exception =>
+ /*Thread.sleep (1000)*/
+ }
+ }
+}
+
+
+
+
diff --git a/streaming/src/main/scala/spark/stream/SenderReceiverTest.scala b/streaming/src/main/scala/spark/stream/SenderReceiverTest.scala
new file mode 100644
index 0000000000..69879b621c
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/SenderReceiverTest.scala
@@ -0,0 +1,63 @@
+package spark.stream
+import java.net.{Socket, ServerSocket}
+import java.io.{ByteArrayOutputStream, DataOutputStream, DataInputStream, BufferedInputStream}
+
+object Receiver {
+ def main(args: Array[String]) {
+ val port = args(0).toInt
+ val lsocket = new ServerSocket(port)
+ println("Listening on port " + port )
+ while(true) {
+ val socket = lsocket.accept()
+ (new Thread() {
+ override def run() {
+ val buffer = new Array[Byte](100000)
+ var count = 0
+ val time = System.currentTimeMillis
+ try {
+ val is = new DataInputStream(new BufferedInputStream(socket.getInputStream))
+ var string = is.readUTF
+ while (string != null) {
+ count += 28
+ string = is.readUTF()
+ }
+ } catch {
+ case e: Exception => e.printStackTrace
+ }
+ val timeTaken = System.currentTimeMillis - time
+ val tput = (count / 1024.0) / (timeTaken / 1000.0)
+ println("Data = " + count + " bytes\nTime = " + timeTaken + " ms\nTput = " + tput + " KB/s")
+ }
+ }).start()
+ }
+ }
+
+}
+
+object Sender {
+
+ def main(args: Array[String]) {
+ try {
+ val host = args(0)
+ val port = args(1).toInt
+ val size = args(2).toInt
+
+ val byteStream = new ByteArrayOutputStream()
+ val stringDataStream = new DataOutputStream(byteStream)
+ (0 until size).foreach(_ => stringDataStream.writeUTF("abcdedfghijklmnopqrstuvwxy"))
+ val bytes = byteStream.toByteArray()
+ println("Generated array of " + bytes.length + " bytes")
+
+ /*val bytes = new Array[Byte](size)*/
+ val socket = new Socket(host, port)
+ val os = socket.getOutputStream
+ os.write(bytes)
+ os.flush
+ socket.close()
+
+ } catch {
+ case e: Exception => e.printStackTrace
+ }
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/stream/SentenceFileGenerator.scala b/streaming/src/main/scala/spark/stream/SentenceFileGenerator.scala
new file mode 100644
index 0000000000..9aa441d9bb
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/SentenceFileGenerator.scala
@@ -0,0 +1,92 @@
+package spark.stream
+
+import spark._
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+import scala.io.Source
+
+import java.net.InetSocketAddress
+
+import org.apache.hadoop.fs._
+import org.apache.hadoop.conf._
+import org.apache.hadoop.io._
+import org.apache.hadoop.mapred._
+import org.apache.hadoop.util._
+
+object SentenceFileGenerator {
+
+ def printUsage () {
+ println ("Usage: SentenceFileGenerator <master> <target directory> <# partitions> <sentence file> [<sentences per second>]")
+ System.exit(0)
+ }
+
+ def main (args: Array[String]) {
+ if (args.length < 4) {
+ printUsage
+ }
+
+ val master = args(0)
+ val fs = new Path(args(1)).getFileSystem(new Configuration())
+ val targetDirectory = new Path(args(1)).makeQualified(fs)
+ val numPartitions = args(2).toInt
+ val sentenceFile = args(3)
+ val sentencesPerSecond = {
+ if (args.length > 4) args(4).toInt
+ else 10
+ }
+
+ val source = Source.fromFile(sentenceFile)
+ val lines = source.mkString.split ("\n").toArray
+ source.close ()
+ println("Read " + lines.length + " lines from file " + sentenceFile)
+
+ val sentences = {
+ val buffer = ArrayBuffer[String]()
+ val random = new Random()
+ var i = 0
+ while (i < sentencesPerSecond) {
+ buffer += lines(random.nextInt(lines.length))
+ i += 1
+ }
+ buffer.toArray
+ }
+ println("Generated " + sentences.length + " sentences")
+
+ val sc = new SparkContext(master, "SentenceFileGenerator")
+ val sentencesRDD = sc.parallelize(sentences, numPartitions)
+
+ val tempDirectory = new Path(targetDirectory, "_tmp")
+
+ fs.mkdirs(targetDirectory)
+ fs.mkdirs(tempDirectory)
+
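+ // Each second, write the same sentence RDD into a fresh "Sentences-<time>" directory under the
+ // temp path and then rename it into the target directory, so readers only see complete output.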
+ var saveTimeMillis = System.currentTimeMillis
+ try {
+ while (true) {
+ val newDir = new Path(targetDirectory, "Sentences-" + saveTimeMillis)
+ val tmpNewDir = new Path(tempDirectory, "Sentences-" + saveTimeMillis)
+ println("Writing to file " + newDir)
+ sentencesRDD.saveAsTextFile(tmpNewDir.toString)
+ fs.rename(tmpNewDir, newDir)
+ saveTimeMillis += 1000
+ val sleepTimeMillis = {
+ val currentTimeMillis = System.currentTimeMillis
+ if (saveTimeMillis < currentTimeMillis) {
+ 0
+ } else {
+ saveTimeMillis - currentTimeMillis
+ }
+ }
+ println("Sleeping for " + sleepTimeMillis + " ms")
+ Thread.sleep(sleepTimeMillis)
+ }
+ } catch {
+ case e: Exception =>
+ }
+ }
+}
+
+
+
+
diff --git a/streaming/src/main/scala/spark/stream/SentenceGenerator.scala b/streaming/src/main/scala/spark/stream/SentenceGenerator.scala
new file mode 100644
index 0000000000..ef66e66047
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/SentenceGenerator.scala
@@ -0,0 +1,103 @@
+package spark.stream
+
+import scala.util.Random
+import scala.io.Source
+import scala.actors._
+import scala.actors.Actor._
+import scala.actors.remote._
+import scala.actors.remote.RemoteActor._
+
+import java.net.InetSocketAddress
+
+
+object SentenceGenerator {
+
+ def printUsage {
+ println ("Usage: SentenceGenerator <target IP> <target port> <sentence file> [<sentences per second>]")
+ System.exit(0)
+ }
+
+ def generateRandomSentences(lines: Array[String], sentencesPerSecond: Int, streamReceiver: AbstractActor) {
+ val sleepBetweenSentences = 1000.0 / sentencesPerSecond.toDouble - 1
+ val random = new Random ()
+
+ try {
+ var lastPrintTime = System.currentTimeMillis()
+ var count = 0
+ while(true) {
+ streamReceiver ! lines(random.nextInt(lines.length))
+ count += 1
+ if (System.currentTimeMillis - lastPrintTime >= 1000) {
+ println (count + " sentences sent last second")
+ count = 0
+ lastPrintTime = System.currentTimeMillis
+ }
+ Thread.sleep(sleepBetweenSentences.toLong)
+ }
+ } catch {
+ case e: Exception =>
+ }
+ }
+
+ def generateSameSentences(lines: Array[String], sentencesPerSecond: Int, streamReceiver: AbstractActor) {
+ try {
+ val numSentences = if (sentencesPerSecond <= 0) {
+ lines.length
+ } else {
+ sentencesPerSecond
+ }
+ var nextSendingTime = System.currentTimeMillis()
+ val pingInterval = if (System.getenv("INTERVAL") != null) {
+ System.getenv("INTERVAL").toInt
+ } else {
+ 2000
+ }
+ while(true) {
+ (0 until numSentences).foreach(i => {
+ streamReceiver ! lines(i % lines.length)
+ })
+ println ("Sent " + numSentences + " sentences")
+ nextSendingTime += pingInterval
+ val sleepTime = nextSendingTime - System.currentTimeMillis
+ if (sleepTime > 0) {
+ println ("Sleeping for " + sleepTime + " ms")
+ Thread.sleep(sleepTime)
+ }
+ }
+ } catch {
+ case e: Exception =>
+ }
+ }
+
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ printUsage
+ }
+
+ val generateRandomly = false
+
+ val streamReceiverIP = args(0)
+ val streamReceiverPort = args(1).toInt
+ val sentenceFile = args(2)
+ val sentencesPerSecond = if (args.length > 3) args(3).toInt else 10
+ val sentenceInputName = if (args.length > 4) args(4) else "Sentences"
+
+ println("Sending " + sentencesPerSecond + " sentences per second to " +
+ streamReceiverIP + ":" + streamReceiverPort + "/NetworkStreamReceiver-" + sentenceInputName)
+ val source = Source.fromFile(sentenceFile)
+ val lines = source.mkString.split ("\n")
+ source.close ()
+
+ val streamReceiver = select(
+ Node(streamReceiverIP, streamReceiverPort),
+ Symbol("NetworkStreamReceiver-" + sentenceInputName))
+ if (generateRandomly) {
+ generateRandomSentences(lines, sentencesPerSecond, streamReceiver)
+ } else {
+ generateSameSentences(lines, sentencesPerSecond, streamReceiver)
+ }
+ }
+}
+
+
+
diff --git a/streaming/src/main/scala/spark/stream/ShuffleTest.scala b/streaming/src/main/scala/spark/stream/ShuffleTest.scala
new file mode 100644
index 0000000000..5ad56f6777
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/ShuffleTest.scala
@@ -0,0 +1,22 @@
+package spark.stream
+import spark.SparkContext
+import SparkContext._
+
+object ShuffleTest {
+ def main(args: Array[String]) {
+
+ if (args.length < 1) {
+ println ("Usage: ShuffleTest <host>")
+ System.exit(1)
+ }
+
+ val sc = new spark.SparkContext(args(0), "ShuffleTest")
+ val rdd = sc.parallelize(1 to 1000, 500).cache
+
+ def time(f: => Unit) { val start = System.nanoTime; f; println((System.nanoTime - start) * 1.0e-6) }
+
+ time { for (i <- 0 until 50) time { rdd.map(x => (x % 100, x)).reduceByKey(_ + _, 10).count } }
+ System.exit(0)
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/stream/SimpleWordCount.scala b/streaming/src/main/scala/spark/stream/SimpleWordCount.scala
new file mode 100644
index 0000000000..c53fe35f44
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/SimpleWordCount.scala
@@ -0,0 +1,30 @@
+package spark.stream
+
+import SparkStreamContext._
+
+import scala.util.Sorting
+
+object SimpleWordCount {
+
+ def main (args: Array[String]) {
+
+ if (args.length < 1) {
+ println ("Usage: SparkStreamContext <host> [<temp directory>]")
+ System.exit(1)
+ }
+
+ val ssc = new SparkStreamContext(args(0), "WordCount")
+ if (args.length > 1) {
+ ssc.setTempDir(args(1))
+ }
+ val sentences = ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 2000)
+ /*sentences.print*/
+
+ val words = sentences.flatMap(_.split(" "))
+
+ val counts = words.map(x => (x, 1)).reduceByKey(_ + _, 1)
+ counts.print
+
+ ssc.run
+ }
+}
diff --git a/streaming/src/main/scala/spark/stream/SimpleWordCount2.scala b/streaming/src/main/scala/spark/stream/SimpleWordCount2.scala
new file mode 100644
index 0000000000..1a2c67cd4d
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/SimpleWordCount2.scala
@@ -0,0 +1,51 @@
+package spark.stream
+
+import spark.SparkContext
+import SparkContext._
+import SparkStreamContext._
+
+import scala.util.Sorting
+
+object SimpleWordCount2 {
+
+ def moreWarmup(sc: SparkContext) {
+ (0 until 20).foreach {i =>
+ sc.parallelize(1 to 20000000, 500)
+ .map(_ % 100).map(_.toString)
+ .map(x => (x, 1)).reduceByKey(_ + _, 10)
+ .collect()
+ }
+ }
+
+ def main (args: Array[String]) {
+
+ if (args.length < 2) {
+ println ("Usage: SimpleWordCount2 <host> <# sentence streams>")
+ System.exit(1)
+ }
+
+ val ssc = new SparkStreamContext(args(0), "SimpleWordCount2")
+
+ val numSentenceStreams = if (args.length > 1) args(1).toInt else 1
+ if (args.length > 2) {
+ ssc.setTempDir(args(2))
+ }
+
+ GrepCount2.warmConnectionManagers(ssc.sc)
+ moreWarmup(ssc.sc)
+
+ val sentences = new UnifiedRDS(
+ (1 to numSentenceStreams).map(i => ssc.readTestStream("Sentences-" + i, 1000)).toArray
+ )
+
+
+ val words = sentences.flatMap(_.split(" "))
+
+ val counts = words.map(x => (x, 1)).reduceByKey(_ + _, 10)
+ counts.foreachRDD(_.collect())
+ /*words.foreachRDD(_.countByValue())*/
+
+ ssc.run
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/stream/SimpleWordCount2_Special.scala b/streaming/src/main/scala/spark/stream/SimpleWordCount2_Special.scala
new file mode 100644
index 0000000000..9003a5dbb3
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/SimpleWordCount2_Special.scala
@@ -0,0 +1,83 @@
+package spark.stream
+
+import spark.SparkContext
+import SparkContext._
+import SparkStreamContext._
+
+import scala.collection.JavaConversions.mapAsScalaMap
+import scala.util.Sorting
+import java.lang.{Long => JLong}
+
+object SimpleWordCount2_Special {
+
+ def moreWarmup(sc: SparkContext) {
+ (0 until 20).foreach {i =>
+ sc.parallelize(1 to 20000000, 500)
+ .map(_ % 100).map(_.toString)
+ .map(x => (x, 1)).reduceByKey(_ + _, 10)
+ .collect()
+ }
+ }
+
+ def main (args: Array[String]) {
+
+ if (args.length < 2) {
+ println ("Usage: SimpleWordCount2 <host> <# sentence streams>")
+ System.exit(1)
+ }
+
+ val ssc = new SparkStreamContext(args(0), "SimpleWordCount2")
+
+ val numSentenceStreams = if (args.length > 1) args(1).toInt else 1
+ if (args.length > 2) {
+ ssc.setTempDir(args(2))
+ }
+
+ GrepCount2.warmConnectionManagers(ssc.sc)
+ moreWarmup(ssc.sc)
+
+ val sentences = new UnifiedRDS(
+ (1 to numSentenceStreams).map(i => ssc.readTestStream("Sentences-" + i, 400)).toArray
+ )
+
+
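+ // Splits each sentence on spaces by hand and counts words into a single java.util.HashMap per
+ // partition, avoiding per-word tuple allocation before the reduceByKey shuffle.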
+ def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, JLong)] = {
+ val map = new java.util.HashMap[String, JLong]
+ var i = 0
+ var j = 0
+ while (iter.hasNext) {
+ val s = iter.next()
+ i = 0
+ while (i < s.length) {
+ j = i
+ while (j < s.length && s.charAt(j) != ' ') {
+ j += 1
+ }
+ if (j > i) {
+ val w = s.substring(i, j)
+ val c = map.get(w)
+ if (c == null) {
+ map.put(w, 1L)
+ } else {
+ map.put(w, c + 1)
+ }
+ }
+ i = j
+ while (i < s.length && s.charAt(i) == ' ') {
+ i += 1
+ }
+ }
+ }
+ map.toIterator
+ }
+
+
+ /*val words = sentences.flatMap(_.split(" "))*/
+ /*val counts = words.map(x => (x, 1)).reduceByKey(_ + _, 10)*/
+ val counts = sentences.mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
+ counts.foreachRDD(_.collect())
+
+ ssc.run
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/stream/SparkStreamContext.scala b/streaming/src/main/scala/spark/stream/SparkStreamContext.scala
new file mode 100644
index 0000000000..0e65196e46
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/SparkStreamContext.scala
@@ -0,0 +1,105 @@
+package spark.stream
+
+import spark.SparkContext
+import spark.SparkEnv
+import spark.Utils
+import spark.Logging
+
+import scala.collection.mutable.ArrayBuffer
+
+import java.net.InetSocketAddress
+import java.io.IOException
+import java.util.UUID
+
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.conf.Configuration
+
+import akka.actor._
+import akka.actor.Actor
+import akka.util.duration._
+
+class SparkStreamContext (
+ master: String,
+ frameworkName: String,
+ val sparkHome: String = null,
+ val jars: Seq[String] = Nil)
+ extends Logging {
+
+ initLogging()
+
+ val sc = new SparkContext(master, frameworkName, sparkHome, jars)
+ val env = SparkEnv.get
+ val actorSystem = env.actorSystem
+
+ @transient val inputRDSs = new ArrayBuffer[InputRDS[_]]()
+ @transient val outputRDSs = new ArrayBuffer[RDS[_]]()
+
+ var tempDirRoot: String = null
+ var tempDir: Path = null
+
+ def readNetworkStream[T: ClassManifest](
+ name: String,
+ addresses: Array[InetSocketAddress],
+ batchDuration: Time): RDS[T] = {
+
+ val inputRDS = new NetworkInputRDS[T](name, addresses, batchDuration, this)
+ inputRDSs += inputRDS
+ inputRDS
+ }
+
+ def readNetworkStream[T: ClassManifest](
+ name: String,
+ addresses: Array[String],
+ batchDuration: Long): RDS[T] = {
+
+ def stringToInetSocketAddress (str: String): InetSocketAddress = {
+ val parts = str.split(":")
+ if (parts.length != 2) {
+ throw new IllegalArgumentException ("Address format error")
+ }
+ new InetSocketAddress(parts(0), parts(1).toInt)
+ }
+
+ readNetworkStream(
+ name,
+ addresses.map(stringToInetSocketAddress).toArray,
+ LongTime(batchDuration))
+ }
+
+ def readFileStream(name: String, directory: String): RDS[String] = {
+ val path = new Path(directory)
+ val fs = path.getFileSystem(new Configuration())
+ val qualPath = path.makeQualified(fs)
+ val inputRDS = new FileInputRDS(name, qualPath.toString, this)
+ inputRDSs += inputRDS
+ inputRDS
+ }
+
+ def readTestStream(name: String, batchDuration: Long): RDS[String] = {
+ val inputRDS = new TestInputRDS(name, LongTime(batchDuration), this)
+ inputRDSs += inputRDS
+ inputRDS
+ }
+
+ def registerOutputStream (outputRDS: RDS[_]) {
+ outputRDSs += outputRDS
+ }
+
+ def setTempDir(dir: String) {
+ tempDirRoot = dir
+ }
+
+ def run () {
+ val ctxt = this
+ val actor = actorSystem.actorOf(
+ Props(new Scheduler(ctxt, inputRDSs.toArray, outputRDSs.toArray)),
+ name = "SparkStreamScheduler")
+ logInfo("Registered actor")
+ actorSystem.awaitTermination()
+ }
+}
+
+object SparkStreamContext {
+ implicit def rdsToPairRdsFunctions [K: ClassManifest, V: ClassManifest] (rds: RDS[(K,V)]) =
+ new PairRDSFunctions (rds)
+}
diff --git a/streaming/src/main/scala/spark/stream/TestGenerator.scala b/streaming/src/main/scala/spark/stream/TestGenerator.scala
new file mode 100644
index 0000000000..738ce17452
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/TestGenerator.scala
@@ -0,0 +1,107 @@
+package spark.stream
+
+import scala.util.Random
+import scala.io.Source
+import scala.actors._
+import scala.actors.Actor._
+import scala.actors.remote._
+import scala.actors.remote.RemoteActor._
+
+import java.net.InetSocketAddress
+
+
+object TestGenerator {
+
+ def printUsage {
+ println ("Usage: SentenceGenerator <target IP> <target port> <sentence file> [<sentences per second>]")
+ System.exit(0)
+ }
+ /*
+ def generateRandomSentences(lines: Array[String], sentencesPerSecond: Int, streamReceiver: AbstractActor) {
+ val sleepBetweenSentences = 1000.0 / sentencesPerSecond.toDouble - 1
+ val random = new Random ()
+
+ try {
+ var lastPrintTime = System.currentTimeMillis()
+ var count = 0
+ while(true) {
+ streamReceiver ! lines(random.nextInt(lines.length))
+ count += 1
+ if (System.currentTimeMillis - lastPrintTime >= 1000) {
+ println (count + " sentences sent last second")
+ count = 0
+ lastPrintTime = System.currentTimeMillis
+ }
+ Thread.sleep(sleepBetweenSentences.toLong)
+ }
+ } catch {
+ case e: Exception =>
+ }
+ }*/
+
+ def generateSameSentences(lines: Array[String], sentencesPerSecond: Int, streamReceiver: AbstractActor) {
+ try {
+ val numSentences = if (sentencesPerSecond <= 0) {
+ lines.length
+ } else {
+ sentencesPerSecond
+ }
+ val sentences = lines.take(numSentences).toArray
+
+ var nextSendingTime = System.currentTimeMillis()
+ val sendAsArray = true
+ while(true) {
+ if (sendAsArray) {
+ println("Sending as array")
+ streamReceiver !? sentences
+ } else {
+ println("Sending individually")
+ sentences.foreach(sentence => {
+ streamReceiver !? sentence
+ })
+ }
+ println ("Sent " + numSentences + " sentences in " + (System.currentTimeMillis - nextSendingTime) + " ms")
+ nextSendingTime += 1000
+ val sleepTime = nextSendingTime - System.currentTimeMillis
+ if (sleepTime > 0) {
+ println ("Sleeping for " + sleepTime + " ms")
+ Thread.sleep(sleepTime)
+ }
+ }
+ } catch {
+ case e: Exception =>
+ }
+ }
+
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ printUsage
+ }
+
+ val generateRandomly = false
+
+ val streamReceiverIP = args(0)
+ val streamReceiverPort = args(1).toInt
+ val sentenceFile = args(2)
+ val sentencesPerSecond = if (args.length > 3) args(3).toInt else 10
+ val sentenceInputName = if (args.length > 4) args(4) else "Sentences"
+
+ println("Sending " + sentencesPerSecond + " sentences per second to " +
+ streamReceiverIP + ":" + streamReceiverPort + "/NetworkStreamReceiver-" + sentenceInputName)
+ val source = Source.fromFile(sentenceFile)
+ val lines = source.mkString.split ("\n")
+ source.close ()
+
+ val streamReceiver = select(
+ Node(streamReceiverIP, streamReceiverPort),
+ Symbol("NetworkStreamReceiver-" + sentenceInputName))
+ if (generateRandomly) {
+ /*generateRandomSentences(lines, sentencesPerSecond, streamReceiver)*/
+ } else {
+ generateSameSentences(lines, sentencesPerSecond, streamReceiver)
+ }
+ }
+}
+
+
+
diff --git a/streaming/src/main/scala/spark/stream/TestGenerator2.scala b/streaming/src/main/scala/spark/stream/TestGenerator2.scala
new file mode 100644
index 0000000000..ceb4730e72
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/TestGenerator2.scala
@@ -0,0 +1,119 @@
+package spark.stream
+
+import scala.util.Random
+import scala.io.Source
+import scala.actors._
+import scala.actors.Actor._
+import scala.actors.remote._
+import scala.actors.remote.RemoteActor._
+
+import java.io.{DataOutputStream, ByteArrayOutputStream, DataInputStream}
+import java.net.Socket
+
+object TestGenerator2 {
+
+ def printUsage {
+ println ("Usage: SentenceGenerator <target IP> <target port> <sentence file> [<sentences per second>]")
+ System.exit(0)
+ }
+
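+ // Streams the pre-serialized sentence bytes to the receiver once per interval, writing the
+ // payload in 10 parts spaced roughly 100 ms apart to smooth out the network load.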
+ def sendSentences(streamReceiverHost: String, streamReceiverPort: Int, numSentences: Int, bytes: Array[Byte], intervalTime: Long){
+ try {
+ println("Connecting to " + streamReceiverHost + ":" + streamReceiverPort)
+ val socket = new Socket(streamReceiverHost, streamReceiverPort)
+
+ println("Sending " + numSentences+ " sentences / " + (bytes.length / 1024.0 / 1024.0) + " MB per " + intervalTime + " ms to " + streamReceiverHost + ":" + streamReceiverPort )
+ val currentTime = System.currentTimeMillis
+ var targetTime = (currentTime / intervalTime + 1).toLong * intervalTime
+ Thread.sleep(targetTime - currentTime)
+
+ while(true) {
+ val startTime = System.currentTimeMillis()
+ println("Sending at " + startTime + " ms with delay of " + (startTime - targetTime) + " ms")
+ val socketOutputStream = socket.getOutputStream
+ val parts = 10
+ (0 until parts).foreach(i => {
+ val partStartTime = System.currentTimeMillis
+
+ val offset = (i * bytes.length / parts).toInt
+ val len = math.min(((i + 1) * bytes.length / parts).toInt - offset, bytes.length)
+ socketOutputStream.write(bytes, offset, len)
+ socketOutputStream.flush()
+ val partFinishTime = System.currentTimeMillis
+ println("Sending part " + i + " of " + len + " bytes took " + (partFinishTime - partStartTime) + " ms")
+ val sleepTime = math.max(0, 1000 / parts - (partFinishTime - partStartTime) - 1)
+ Thread.sleep(sleepTime)
+ })
+
+ socketOutputStream.flush()
+ /*val socketInputStream = new DataInputStream(socket.getInputStream)*/
+ /*val reply = socketInputStream.readUTF()*/
+ val finishTime = System.currentTimeMillis()
+ println ("Sent " + bytes.length + " bytes in " + (finishTime - startTime) + " ms for interval [" + targetTime + ", " + (targetTime + intervalTime) + "]")
+ /*println("Received = " + reply)*/
+ targetTime = targetTime + intervalTime
+ val sleepTime = (targetTime - finishTime) + 10
+ if (sleepTime > 0) {
+ println("Sleeping for " + sleepTime + " ms")
+ Thread.sleep(sleepTime)
+ } else {
+ println("############################")
+ println("###### Skipping sleep ######")
+ println("############################")
+ }
+ }
+ } catch {
+ case e: Exception => println(e)
+ }
+ println("Stopped sending")
+ }
+
+ def main(args: Array[String]) {
+ if (args.length < 4) {
+ printUsage
+ }
+
+ val streamReceiverHost = args(0)
+ val streamReceiverPort = args(1).toInt
+ val sentenceFile = args(2)
+ val intervalTime = args(3).toLong
+ val sentencesPerInterval = if (args.length > 4) args(4).toInt else 0
+
+ println("Reading the file " + sentenceFile)
+ val source = Source.fromFile(sentenceFile)
+ val lines = source.mkString.split ("\n")
+ source.close()
+
+ val numSentences = if (sentencesPerInterval <= 0) {
+ lines.length
+ } else {
+ sentencesPerInterval
+ }
+
+ println("Generating sentences")
+ val sentences: Array[String] = if (numSentences <= lines.length) {
+ lines.take(numSentences).toArray
+ } else {
+ (0 until numSentences).map(i => lines(i % lines.length)).toArray
+ }
+
+ println("Converting to byte array")
+ val byteStream = new ByteArrayOutputStream()
+ val stringDataStream = new DataOutputStream(byteStream)
+ /*stringDataStream.writeInt(sentences.size)*/
+ sentences.foreach(stringDataStream.writeUTF)
+ val bytes = byteStream.toByteArray()
+ stringDataStream.close()
+ println("Generated array of " + bytes.length + " bytes")
+
+ /*while(true) { */
+ sendSentences(streamReceiverHost, streamReceiverPort, numSentences, bytes, intervalTime)
+ /*println("Sleeping for 5 seconds")*/
+ /*Thread.sleep(5000)*/
+ /*System.gc()*/
+ /*}*/
+ }
+}
+
+
+
diff --git a/streaming/src/main/scala/spark/stream/TestGenerator4.scala b/streaming/src/main/scala/spark/stream/TestGenerator4.scala
new file mode 100644
index 0000000000..edeb969d7c
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/TestGenerator4.scala
@@ -0,0 +1,244 @@
+package spark.stream
+
+import spark.Logging
+
+import scala.util.Random
+import scala.io.Source
+import scala.collection.mutable.{ArrayBuffer, Queue}
+
+import java.net._
+import java.io._
+import java.nio._
+import java.nio.charset._
+import java.nio.channels._
+
+import it.unimi.dsi.fastutil.io._
+
+class TestGenerator4(targetHost: String, targetPort: Int, sentenceFile: String, intervalDuration: Long, sentencesPerInterval: Int)
+extends Logging {
+
+ class SendingConnectionHandler(host: String, port: Int, generator: TestGenerator4)
+ extends ConnectionHandler(host, port, true) {
+
+ val buffers = new ArrayBuffer[ByteBuffer]
+ val newBuffers = new Queue[ByteBuffer]
+ var activeKey: SelectionKey = null
+
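+ // send() hands the buffer to the selector thread and blocks on the buffer's monitor until
+ // write() has drained it completely and calls notify().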
+ def send(buffer: ByteBuffer) {
+ logDebug("Sending: " + buffer)
+ newBuffers.synchronized {
+ newBuffers.enqueue(buffer)
+ }
+ selector.wakeup()
+ buffer.synchronized {
+ buffer.wait()
+ }
+ }
+
+ override def ready(key: SelectionKey) {
+ logDebug("Ready")
+ activeKey = key
+ val channel = key.channel.asInstanceOf[SocketChannel]
+ channel.register(selector, SelectionKey.OP_WRITE)
+ generator.startSending()
+ }
+
+ override def preSelect() {
+ newBuffers.synchronized {
+ while(!newBuffers.isEmpty) {
+ val buffer = newBuffers.dequeue
+ buffers += buffer
+ logDebug("Added: " + buffer)
+ changeInterest(activeKey, SelectionKey.OP_WRITE)
+ }
+ }
+ }
+
+ override def write(key: SelectionKey) {
+ try {
+ /*while(true) {*/
+ val channel = key.channel.asInstanceOf[SocketChannel]
+ if (buffers.size > 0) {
+ val buffer = buffers(0)
+ val newBuffer = buffer.slice()
+ newBuffer.limit(math.min(newBuffer.remaining, 32768))
+ val bytesWritten = channel.write(newBuffer)
+ buffer.position(buffer.position + bytesWritten)
+ if (bytesWritten == 0) return
+ if (buffer.remaining == 0) {
+ buffers -= buffer
+ buffer.synchronized {
+ buffer.notify()
+ }
+ }
+ /*changeInterest(key, SelectionKey.OP_WRITE)*/
+ } else {
+ changeInterest(key, 0)
+ }
+ /*}*/
+ } catch {
+ case e: IOException => {
+ if (e.toString.contains("pipe") || e.toString.contains("reset")) {
+ logError("Connection broken")
+ } else {
+ logError("Connection error", e)
+ }
+ close(key)
+ }
+ }
+ }
+
+ override def close(key: SelectionKey) {
+ buffers.clear()
+ super.close(key)
+ }
+ }
+
+ initLogging()
+
+ val connectionHandler = new SendingConnectionHandler(targetHost, targetPort, this)
+ var sendingThread: Thread = null
+ var sendCount = 0
+ val sendBatches = 5
+
+ def run() {
+ logInfo("Connection handler started")
+ connectionHandler.start()
+ connectionHandler.join()
+ if (sendingThread != null && !sendingThread.isInterrupted) {
+ sendingThread.interrupt
+ }
+ logInfo("Connection handler stopped")
+ }
+
+ def startSending() {
+ sendingThread = new Thread() {
+ override def run() {
+ logInfo("STARTING TO SEND")
+ sendSentences()
+ logInfo("SENDING STOPPED AFTER " + sendCount)
+ connectionHandler.interrupt()
+ }
+ }
+ sendingThread.start()
+ }
+
+ def stopSending() {
+ sendingThread.interrupt()
+ }
+
+ def sendSentences() {
+ logInfo("Reading the file " + sentenceFile)
+ val source = Source.fromFile(sentenceFile)
+ val lines = source.mkString.split ("\n")
+ source.close()
+
+ val numSentences = if (sentencesPerInterval <= 0) {
+ lines.length
+ } else {
+ sentencesPerInterval
+ }
+
+ logInfo("Generating sentence buffer")
+ val sentences: Array[String] = if (numSentences <= lines.length) {
+ lines.take(numSentences).toArray
+ } else {
+ (0 until numSentences).map(i => lines(i % lines.length)).toArray
+ }
+
+ /*
+ val sentences: Array[String] = if (numSentences <= lines.length) {
+ lines.take((numSentences / sendBatches).toInt).toArray
+ } else {
+ (0 until (numSentences/sendBatches)).map(i => lines(i % lines.length)).toArray
+ }*/
+
+
+ val serializer = new spark.KryoSerializer().newInstance()
+ val byteStream = new FastByteArrayOutputStream(100 * 1024 * 1024)
+ serializer.serializeStream(byteStream).writeAll(sentences.toIterator.asInstanceOf[Iterator[Any]]).close()
+ byteStream.trim()
+ val sentenceBuffer = ByteBuffer.wrap(byteStream.array)
+
+ logInfo("Sending " + numSentences+ " sentences / " + sentenceBuffer.limit + " bytes per " + intervalDuration + " ms to " + targetHost + ":" + targetPort )
+ val currentTime = System.currentTimeMillis
+ var targetTime = (currentTime / intervalDuration + 1).toLong * intervalDuration
+ Thread.sleep(targetTime - currentTime)
+
+ val totalBytes = sentenceBuffer.limit
+
+ while(true) {
+ val batchesInCurrentInterval = sendBatches // if (sendCount < 10) 1 else sendBatches
+
+ val startTime = System.currentTimeMillis()
+ logDebug("Sending # " + sendCount + " at " + startTime + " ms with delay of " + (startTime - targetTime) + " ms")
+
+ (0 until batchesInCurrentInterval).foreach(i => {
+ try {
+ val position = (i * totalBytes / sendBatches).toInt
+ val limit = if (i == sendBatches - 1) {
+ totalBytes
+ } else {
+ ((i + 1) * totalBytes / sendBatches).toInt - 1
+ }
+
+ val partStartTime = System.currentTimeMillis
+ sentenceBuffer.limit(limit)
+ connectionHandler.send(sentenceBuffer)
+ val partFinishTime = System.currentTimeMillis
+ val sleepTime = math.max(0, intervalDuration / sendBatches - (partFinishTime - partStartTime) - 1)
+ Thread.sleep(sleepTime)
+
+ } catch {
+ case ie: InterruptedException => return
+ case e: Exception => e.printStackTrace()
+ }
+ })
+ sentenceBuffer.rewind()
+
+ val finishTime = System.currentTimeMillis()
+ /*logInfo ("Sent " + sentenceBuffer.limit + " bytes in " + (finishTime - startTime) + " ms")*/
+ targetTime = targetTime + intervalDuration //+ (if (sendCount < 3) 1000 else 0)
+
+ val sleepTime = (targetTime - finishTime) + 20
+ if (sleepTime > 0) {
+ logInfo("Sleeping for " + sleepTime + " ms")
+ Thread.sleep(sleepTime)
+ } else {
+ logInfo("###### Skipping sleep ######")
+ }
+ if (Thread.currentThread.isInterrupted) {
+ return
+ }
+ sendCount += 1
+ }
+ }
+}
+
+object TestGenerator4 {
+ def printUsage {
+ println("Usage: TestGenerator4 <target IP> <target port> <sentence file> <interval duration> [<sentences per second>]")
+ System.exit(0)
+ }
+
+ def main(args: Array[String]) {
+ println("GENERATOR STARTED")
+ if (args.length < 4) {
+ printUsage
+ }
+
+
+ val streamReceiverHost = args(0)
+ val streamReceiverPort = args(1).toInt
+ val sentenceFile = args(2)
+ val intervalDuration = args(3).toLong
+ val sentencesPerInterval = if (args.length > 4) args(4).toInt else 0
+
+ while(true) {
+ val generator = new TestGenerator4(streamReceiverHost, streamReceiverPort, sentenceFile, intervalDuration, sentencesPerInterval)
+ generator.run()
+ Thread.sleep(2000)
+ }
+ println("GENERATOR STOPPED")
+ }
+}
diff --git a/streaming/src/main/scala/spark/stream/TestInputBlockTracker.scala b/streaming/src/main/scala/spark/stream/TestInputBlockTracker.scala
new file mode 100644
index 0000000000..da3b964407
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/TestInputBlockTracker.scala
@@ -0,0 +1,42 @@
+package spark.stream
+import spark.Logging
+import scala.collection.mutable.{ArrayBuffer, HashMap}
+
+object TestInputBlockTracker extends Logging {
+ initLogging()
+ val allBlockIds = new HashMap[Time, ArrayBuffer[String]]()
+
+ def addBlocks(intervalEndTime: Time, reference: AnyRef) {
+ allBlockIds.getOrElseUpdate(intervalEndTime, new ArrayBuffer[String]()) ++= reference.asInstanceOf[Array[String]]
+ }
+
+ def setEndTime(intervalEndTime: Time) {
+ try {
+ val endTime = System.currentTimeMillis
+ allBlockIds.get(intervalEndTime) match {
+ case Some(blockIds) => {
+ val numBlocks = blockIds.size
+ var totalDelay = 0d
+ blockIds.foreach(blockId => {
+ val inputTime = getInputTime(blockId)
+ val delay = (endTime - inputTime) / 1000.0
+ totalDelay += delay
+ logInfo("End-to-end delay for block " + blockId + " is " + delay + " s")
+ })
+ logInfo("Average end-to-end delay for time " + intervalEndTime + " is " + (totalDelay / numBlocks) + " s")
+ allBlockIds -= intervalEndTime
+ }
+ case None => throw new Exception("Unexpected")
+ }
+ } catch {
+ case e: Exception => logError(e.toString)
+ }
+ }
+
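+ // Assumes block IDs embed the input arrival time in milliseconds as their fifth
+ // "-"-separated field.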
+ def getInputTime(blockId: String): Long = {
+ val parts = blockId.split("-")
+ /*logInfo(blockId + " -> " + parts(4)) */
+ parts(4).toLong
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/stream/TestStreamCoordinator.scala b/streaming/src/main/scala/spark/stream/TestStreamCoordinator.scala
new file mode 100644
index 0000000000..add166fbd9
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/TestStreamCoordinator.scala
@@ -0,0 +1,38 @@
+package spark.stream
+
+import spark.Logging
+
+import akka.actor._
+import akka.actor.Actor
+import akka.actor.Actor._
+
+sealed trait TestStreamCoordinatorMessage
+case class GetStreamDetails extends TestStreamCoordinatorMessage
+case class GotStreamDetails(name: String, duration: Long) extends TestStreamCoordinatorMessage
+case class TestStarted extends TestStreamCoordinatorMessage
+
+class TestStreamCoordinator(streamDetails: Array[(String, Long)]) extends Actor with Logging {
+
+ var index = 0
+
+ initLogging()
+
+ logInfo("Created")
+
+ def receive = {
+ case TestStarted => {
+ sender ! "OK"
+ }
+
+ case GetStreamDetails => {
+ val streamDetail = if (index >= streamDetails.length) null else streamDetails(index)
+ sender ! GotStreamDetails(streamDetail._1, streamDetail._2)
+ index += 1
+ if (streamDetail != null) {
+ logInfo("Allocated " + streamDetail._1 + " (" + index + "/" + streamDetails.length + ")" )
+ }
+ }
+ }
+
+}
+
diff --git a/streaming/src/main/scala/spark/stream/TestStreamReceiver3.scala b/streaming/src/main/scala/spark/stream/TestStreamReceiver3.scala
new file mode 100644
index 0000000000..9cc342040b
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/TestStreamReceiver3.scala
@@ -0,0 +1,420 @@
+package spark.stream
+
+import spark._
+import spark.storage._
+import spark.util.AkkaUtils
+
+import scala.math._
+import scala.collection.mutable.{Queue, HashMap, ArrayBuffer, SynchronizedMap}
+
+import akka.actor._
+import akka.actor.Actor
+import akka.dispatch._
+import akka.pattern.ask
+import akka.util.duration._
+
+import java.io.DataInputStream
+import java.io.BufferedInputStream
+import java.net.Socket
+import java.net.ServerSocket
+import java.util.LinkedHashMap
+
+import org.apache.hadoop.fs._
+import org.apache.hadoop.conf._
+import org.apache.hadoop.io._
+import org.apache.hadoop.mapred._
+import org.apache.hadoop.util._
+
+import spark.Utils
+
+
+class TestStreamReceiver3(actorSystem: ActorSystem, blockManager: BlockManager)
+extends Thread with Logging {
+
+
+ class DataHandler(
+ inputName: String,
+ longIntervalDuration: LongTime,
+ shortIntervalDuration: LongTime,
+ blockManager: BlockManager
+ )
+ extends Logging {
+
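+ // Incoming strings are appended to the current Block (one per short interval). Blocks are
+ // grouped into Buckets (one per long interval) and pushed to the BlockManager by a
+ // background thread once their short interval has passed.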
+ class Block(var id: String, var shortInterval: Interval) {
+ val data = ArrayBuffer[String]()
+ var pushed = false
+ def longInterval = getLongInterval(shortInterval)
+ def empty() = (data.size == 0)
+ def += (str: String) = (data += str)
+ override def toString() = "Block " + id
+ }
+
+ class Bucket(val longInterval: Interval) {
+ val blocks = new ArrayBuffer[Block]()
+ var filled = false
+ def += (block: Block) = blocks += block
+ def empty() = (blocks.size == 0)
+ def ready() = (filled && !blocks.exists(! _.pushed))
+ def blockIds() = blocks.map(_.id).toArray
+ override def toString() = "Bucket [" + longInterval + ", " + blocks.size + " blocks]"
+ }
+
+ initLogging()
+
+ val shortIntervalDurationMillis = shortIntervalDuration.asInstanceOf[LongTime].milliseconds
+ val longIntervalDurationMillis = longIntervalDuration.asInstanceOf[LongTime].milliseconds
+
+ var currentBlock: Block = null
+ var currentBucket: Bucket = null
+
+ val blocksForPushing = new Queue[Block]()
+ val buckets = new HashMap[Interval, Bucket]() with SynchronizedMap[Interval, Bucket]
+
+ val blockUpdatingThread = new Thread() { override def run() { keepUpdatingCurrentBlock() } }
+ val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
+
+ def start() {
+ blockUpdatingThread.start()
+ blockPushingThread.start()
+ }
+
+ def += (data: String) = addData(data)
+
+ def addData(data: String) {
+ if (currentBlock == null) {
+ updateCurrentBlock()
+ }
+ currentBlock.synchronized {
+ currentBlock += data
+ }
+ }
+
+ def getShortInterval(time: Time): Interval = {
+ val intervalBegin = time.floor(shortIntervalDuration)
+ Interval(intervalBegin, intervalBegin + shortIntervalDuration)
+ }
+
+ def getLongInterval(shortInterval: Interval): Interval = {
+ val intervalBegin = shortInterval.beginTime.floor(longIntervalDuration)
+ Interval(intervalBegin, intervalBegin + longIntervalDuration)
+ }
+
+ def updateCurrentBlock() {
+ /*logInfo("Updating current block")*/
+ val currentTime: LongTime = LongTime(System.currentTimeMillis)
+ val shortInterval = getShortInterval(currentTime)
+ val longInterval = getLongInterval(shortInterval)
+
+ def createBlock(reuseCurrentBlock: Boolean = false) {
+ val newBlockId = inputName + "-" + longInterval.toFormattedString + "-" + currentBucket.blocks.size
+ if (!reuseCurrentBlock) {
+ val newBlock = new Block(newBlockId, shortInterval)
+ /*logInfo("Created " + currentBlock)*/
+ currentBlock = newBlock
+ } else {
+ currentBlock.shortInterval = shortInterval
+ currentBlock.id = newBlockId
+ }
+ }
+
+ def createBucket() {
+ val newBucket = new Bucket(longInterval)
+ buckets += ((longInterval, newBucket))
+ currentBucket = newBucket
+ /*logInfo("Created " + currentBucket + ", " + buckets.size + " buckets")*/
+ }
+
+ if (currentBlock == null || currentBucket == null) {
+ createBucket()
+ currentBucket.synchronized {
+ createBlock()
+ }
+ return
+ }
+
+ currentBlock.synchronized {
+ var reuseCurrentBlock = false
+
+ if (shortInterval != currentBlock.shortInterval) {
+ if (!currentBlock.empty) {
+ blocksForPushing.synchronized {
+ blocksForPushing += currentBlock
+ blocksForPushing.notifyAll()
+ }
+ }
+
+ currentBucket.synchronized {
+ if (currentBlock.empty) {
+ reuseCurrentBlock = true
+ } else {
+ currentBucket += currentBlock
+ }
+
+ if (longInterval != currentBucket.longInterval) {
+ currentBucket.filled = true
+ if (currentBucket.ready) {
+ currentBucket.notifyAll()
+ }
+ createBucket()
+ }
+ }
+
+ createBlock(reuseCurrentBlock)
+ }
+ }
+ }
+
+ def pushBlock(block: Block) {
+ try{
+ if (blockManager != null) {
+ logInfo("Pushing block")
+ val startTime = System.currentTimeMillis
+
+ val bytes = blockManager.dataSerialize(block.data.toIterator)
+ val finishTime = System.currentTimeMillis
+ logInfo(block + " serialization delay is " + (finishTime - startTime) / 1000.0 + " s")
+
+ blockManager.putBytes(block.id.toString, bytes, StorageLevel.DISK_AND_MEMORY_2)
+ /*blockManager.putBytes(block.id.toString, bytes, StorageLevel.DISK_AND_MEMORY_DESER_2)*/
+ /*blockManager.put(block.id.toString, block.data.toIterator, StorageLevel.DISK_AND_MEMORY_DESER)*/
+ /*blockManager.put(block.id.toString, block.data.toIterator, StorageLevel.DISK_AND_MEMORY)*/
+ val finishTime1 = System.currentTimeMillis
+ logInfo(block + " put delay is " + (finishTime1 - startTime) / 1000.0 + " s")
+ } else {
+ logWarning(block + " not put as block manager is null")
+ }
+ } catch {
+ case e: Exception => logError("Exception writing " + block + " to blockmanager" , e)
+ }
+ }
+
+ def getBucket(longInterval: Interval): Option[Bucket] = {
+ buckets.get(longInterval)
+ }
+
+ def clearBucket(longInterval: Interval) {
+ buckets.remove(longInterval)
+ }
+
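+    // Runs on blockUpdatingThread: sleeps until the next short-interval boundary and
+    // then rolls the current block.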
+ def keepUpdatingCurrentBlock() {
+ logInfo("Thread to update current block started")
+ while(true) {
+ updateCurrentBlock()
+ val currentTimeMillis = System.currentTimeMillis
+ val sleepTimeMillis = (currentTimeMillis / shortIntervalDurationMillis + 1) *
+ shortIntervalDurationMillis - currentTimeMillis + 1
+ Thread.sleep(sleepTimeMillis)
+ }
+ }
+
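+    // Runs on blockPushingThread: waits for queued blocks, pushes them to the block
+    // manager, and notifies waiters on the block's bucket once it becomes ready.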
+ def keepPushingBlocks() {
+ var loop = true
+ logInfo("Thread to push blocks started")
+ while(loop) {
+ val block = blocksForPushing.synchronized {
+ if (blocksForPushing.size == 0) {
+ blocksForPushing.wait()
+ }
+ blocksForPushing.dequeue
+ }
+ pushBlock(block)
+ block.pushed = true
+ block.data.clear()
+
+ val bucket = buckets(block.longInterval)
+ bucket.synchronized {
+ if (bucket.ready) {
+ bucket.notifyAll()
+ }
+ }
+ }
+ }
+ }
+
+
+ class ConnectionListener(port: Int, dataHandler: DataHandler)
+ extends Thread with Logging {
+ initLogging()
+ override def run {
+ try {
+ val listener = new ServerSocket(port)
+ logInfo("Listening on port " + port)
+ while (true) {
+ new ConnectionHandler(listener.accept(), dataHandler).start();
+ }
+ listener.close()
+ } catch {
+ case e: Exception => logError("", e);
+ }
+ }
+ }
+
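+  // Reads UTF-encoded strings off the socket and hands them to the DataHandler.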
+ class ConnectionHandler(socket: Socket, dataHandler: DataHandler) extends Thread with Logging {
+ initLogging()
+ override def run {
+ logInfo("New connection from " + socket.getInetAddress() + ":" + socket.getPort)
+      /*val bytes = new Array[Byte](100 * 1024 * 1024)*/  // only needed by the commented-out raw-read loop below
+ try {
+
+ val inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream, 1024 * 1024))
+ /*val inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream))*/
+ var str: String = null
+ str = inputStream.readUTF
+ while(str != null) {
+ dataHandler += str
+ str = inputStream.readUTF()
+ }
+
+ /*
+ var loop = true
+ while(loop) {
+ val numRead = inputStream.read(bytes)
+ if (numRead < 0) {
+ loop = false
+ }
+ inbox += ((LongTime(SystemTime.currentTimeMillis), "test"))
+ }*/
+
+ inputStream.close()
+ } catch {
+        case e: Exception => logError("Error receiving data", e)
+ }
+ socket.close()
+ }
+ }
+
+ initLogging()
+
+ val masterHost = System.getProperty("spark.master.host")
+ val masterPort = System.getProperty("spark.master.port").toInt
+
+  val akkaPath = "akka://spark@%s:%s/user".format(masterHost, masterPort)
+ val sparkstreamScheduler = actorSystem.actorFor(akkaPath + "/SparkStreamScheduler")
+ val testStreamCoordinator = actorSystem.actorFor(akkaPath + "/TestStreamCoordinator")
+
+ logInfo("Getting stream details from master " + masterHost + ":" + masterPort)
+
+ val timeout = 50 millis
+
+ var started = false
+ while (!started) {
+ askActor[String](testStreamCoordinator, TestStarted) match {
+ case Some(str) => {
+ started = true
+ logInfo("TestStreamCoordinator started")
+ }
+ case None => {
+ logInfo("TestStreamCoordinator not started yet")
+ Thread.sleep(200)
+ }
+ }
+ }
+
+ val streamDetails = askActor[GotStreamDetails](testStreamCoordinator, GetStreamDetails) match {
+ case Some(details) => details
+ case None => throw new Exception("Could not get stream details")
+ }
+ logInfo("Stream details received: " + streamDetails)
+
+ val inputName = streamDetails.name
+ val intervalDurationMillis = streamDetails.duration
+ val intervalDuration = LongTime(intervalDurationMillis)
+
+ val dataHandler = new DataHandler(
+ inputName,
+ intervalDuration,
+ LongTime(TestStreamReceiver3.SHORT_INTERVAL_MILLIS),
+ blockManager)
+
+ val connListener = new ConnectionListener(TestStreamReceiver3.PORT, dataHandler)
+
+ // Send a message to an actor and return an option with its reply, or None if this times out
+ def askActor[T](actor: ActorRef, message: Any): Option[T] = {
+ try {
+ val future = actor.ask(message)(timeout)
+ return Some(Await.result(future, timeout).asInstanceOf[T])
+ } catch {
+ case e: Exception =>
+ logInfo("Error communicating with " + actor, e)
+ return None
+ }
+ }
+
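+  // Main loop: for every (long) interval, wait until its end, wait for its bucket to be
+  // ready, and then tell the scheduler which block IDs make up that interval's input.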
+ override def run() {
+ connListener.start()
+ dataHandler.start()
+
+ var interval = Interval.currentInterval(intervalDuration)
+ var dataStarted = false
+
+ while(true) {
+ waitFor(interval.endTime)
+ logInfo("Woken up at " + System.currentTimeMillis + " for " + interval)
+ dataHandler.getBucket(interval) match {
+ case Some(bucket) => {
+ logInfo("Found " + bucket + " for " + interval)
+ bucket.synchronized {
+ if (!bucket.ready) {
+ logInfo("Waiting for " + bucket)
+ bucket.wait()
+ logInfo("Wait over for " + bucket)
+ }
+ if (dataStarted || !bucket.empty) {
+ logInfo("Notifying " + bucket)
+ notifyScheduler(interval, bucket.blockIds)
+ dataStarted = true
+ }
+ bucket.blocks.clear()
+ dataHandler.clearBucket(interval)
+ }
+ }
+ case None => {
+ logInfo("Found none for " + interval)
+ if (dataStarted) {
+ logInfo("Notifying none")
+ notifyScheduler(interval, Array[String]())
+ }
+ }
+ }
+ interval = interval.next
+ }
+ }
+
+ def waitFor(time: Time) {
+ val currentTimeMillis = System.currentTimeMillis
+ val targetTimeMillis = time.asInstanceOf[LongTime].milliseconds
+ if (currentTimeMillis < targetTimeMillis) {
+ val sleepTime = (targetTimeMillis - currentTimeMillis)
+ Thread.sleep(sleepTime + 1)
+ }
+ }
+
+ def notifyScheduler(interval: Interval, blockIds: Array[String]) {
+ try {
+ sparkstreamScheduler ! InputGenerated(inputName, interval, blockIds.toArray)
+ val time = interval.endTime.asInstanceOf[LongTime]
+ val delay = (System.currentTimeMillis - time.milliseconds) / 1000.0
+ logInfo("Pushing delay for " + time + " is " + delay + " s")
+ } catch {
+      case e: Exception => logError("Exception notifying scheduler at interval " + interval, e)
+ }
+ }
+}
+
+object TestStreamReceiver3 {
+
+ val PORT = 9999
+ val SHORT_INTERVAL_MILLIS = 100
+
+ def main(args: Array[String]) {
+ System.setProperty("spark.master.host", Utils.localHostName)
+ System.setProperty("spark.master.port", "7078")
+ val details = Array(("Sentences", 2000L))
+ val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localHostName, 7078)
+ actorSystem.actorOf(Props(new TestStreamCoordinator(details)), name = "TestStreamCoordinator")
+ new TestStreamReceiver3(actorSystem, null).start()
+ }
+}
+
+
+
diff --git a/streaming/src/main/scala/spark/stream/TestStreamReceiver4.scala b/streaming/src/main/scala/spark/stream/TestStreamReceiver4.scala
new file mode 100644
index 0000000000..e7bef75391
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/TestStreamReceiver4.scala
@@ -0,0 +1,373 @@
+package spark.stream
+
+import spark._
+import spark.storage._
+import spark.util.AkkaUtils
+
+import scala.math._
+import scala.collection.mutable.{Queue, HashMap, ArrayBuffer, SynchronizedMap}
+
+import java.io._
+import java.nio._
+import java.nio.charset._
+import java.nio.channels._
+import java.util.concurrent.Executors
+
+import akka.actor._
+import akka.actor.Actor
+import akka.dispatch._
+import akka.pattern.ask
+import akka.util.duration._
+
+class TestStreamReceiver4(actorSystem: ActorSystem, blockManager: BlockManager)
+extends Thread with Logging {
+
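+  // Accumulates raw bytes (written by the NIO connection handler below) into a shared
+  // direct ByteBuffer and periodically carves them into Blocks, one per short interval.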
+ class DataHandler(
+ inputName: String,
+ longIntervalDuration: LongTime,
+ shortIntervalDuration: LongTime,
+ blockManager: BlockManager
+ )
+ extends Logging {
+
+ class Block(val id: String, val shortInterval: Interval, val buffer: ByteBuffer) {
+ var pushed = false
+ def longInterval = getLongInterval(shortInterval)
+ override def toString() = "Block " + id
+ }
+
+ class Bucket(val longInterval: Interval) {
+ val blocks = new ArrayBuffer[Block]()
+ var filled = false
+ def += (block: Block) = blocks += block
+ def empty() = (blocks.size == 0)
+ def ready() = (filled && !blocks.exists(! _.pushed))
+ def blockIds() = blocks.map(_.id).toArray
+ override def toString() = "Bucket [" + longInterval + ", " + blocks.size + " blocks]"
+ }
+
+ initLogging()
+
+ val syncOnLastShortInterval = true
+
+ val shortIntervalDurationMillis = shortIntervalDuration.asInstanceOf[LongTime].milliseconds
+ val longIntervalDurationMillis = longIntervalDuration.asInstanceOf[LongTime].milliseconds
+
+ val buffer = ByteBuffer.allocateDirect(100 * 1024 * 1024)
+ var currentShortInterval = Interval.currentInterval(shortIntervalDuration)
+
+ val blocksForPushing = new Queue[Block]()
+ val buckets = new HashMap[Interval, Bucket]() with SynchronizedMap[Interval, Bucket]
+
+ val bufferProcessingThread = new Thread() { override def run() { keepProcessingBuffers() } }
+ val blockPushingExecutor = Executors.newFixedThreadPool(5)
+
+
+ def start() {
+ buffer.clear()
+ if (buffer.remaining == 0) {
+ throw new Exception("Buffer initialization error")
+ }
+ bufferProcessingThread.start()
+ }
+
+ def readDataToBuffer(func: ByteBuffer => Int): Int = {
+ buffer.synchronized {
+ if (buffer.remaining == 0) {
+ logInfo("Received first data for interval " + currentShortInterval)
+ }
+ func(buffer)
+ }
+ }
+
+ def getLongInterval(shortInterval: Interval): Interval = {
+ val intervalBegin = shortInterval.beginTime.floor(longIntervalDuration)
+ Interval(intervalBegin, intervalBegin + longIntervalDuration)
+ }
+
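+    // Extracts all complete length-prefixed records from the shared buffer, wraps them
+    // in a Block for the current short interval, and hands the block to the pushing
+    // executor; also rolls the bucket when the long interval changes.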
+ def processBuffer() {
+
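+      // Decodes a variable-length (7-bits-per-byte, continuation-bit) integer, used as
+      // the record length prefix.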
+ def readInt(buffer: ByteBuffer): Int = {
+ var offset = 0
+ var result = 0
+ while (offset < 32) {
+ val b = buffer.get()
+ result |= ((b & 0x7F) << offset)
+ if ((b & 0x80) == 0) {
+ return result
+ }
+ offset += 7
+ }
+        throw new Exception("Malformed variable-length encoded integer")
+ }
+
+ val currentLongInterval = getLongInterval(currentShortInterval)
+ val startTime = System.currentTimeMillis
+ val newBuffer: ByteBuffer = buffer.synchronized {
+ buffer.flip()
+ if (buffer.remaining == 0) {
+ buffer.clear()
+ null
+ } else {
+ logDebug("Processing interval " + currentShortInterval + " with delay of " + (System.currentTimeMillis - startTime) + " ms")
+ val startTime1 = System.currentTimeMillis
+ var loop = true
+ var count = 0
+ while(loop) {
+ buffer.mark()
+ try {
+ val len = readInt(buffer)
+ buffer.position(buffer.position + len)
+ count += 1
+ } catch {
+ case e: Exception => {
+ buffer.reset()
+ loop = false
+ }
+ }
+ }
+ val bytesToCopy = buffer.position
+ val newBuf = ByteBuffer.allocate(bytesToCopy)
+ buffer.position(0)
+ newBuf.put(buffer.slice().limit(bytesToCopy).asInstanceOf[ByteBuffer])
+ newBuf.flip()
+ buffer.position(bytesToCopy)
+ buffer.compact()
+ newBuf
+ }
+ }
+
+ if (newBuffer != null) {
+ val bucket = buckets.getOrElseUpdate(currentLongInterval, new Bucket(currentLongInterval))
+ bucket.synchronized {
+ val newBlockId = inputName + "-" + currentLongInterval.toFormattedString + "-" + currentShortInterval.toFormattedString
+ val newBlock = new Block(newBlockId, currentShortInterval, newBuffer)
+ if (syncOnLastShortInterval) {
+ bucket += newBlock
+ }
+ logDebug("Created " + newBlock + " with " + newBuffer.remaining + " bytes, creation delay is " + (System.currentTimeMillis - currentShortInterval.endTime.asInstanceOf[LongTime].milliseconds) / 1000.0 + " s" )
+ blockPushingExecutor.execute(new Runnable() { def run() { pushAndNotifyBlock(newBlock) } })
+ }
+ }
+
+ val newShortInterval = Interval.currentInterval(shortIntervalDuration)
+ val newLongInterval = getLongInterval(newShortInterval)
+
+ if (newLongInterval != currentLongInterval) {
+ buckets.get(currentLongInterval) match {
+ case Some(bucket) => {
+ bucket.synchronized {
+ bucket.filled = true
+ if (bucket.ready) {
+ bucket.notifyAll()
+ }
+ }
+ }
+ case None =>
+ }
+ buckets += ((newLongInterval, new Bucket(newLongInterval)))
+ }
+
+ currentShortInterval = newShortInterval
+ }
+
+ def pushBlock(block: Block) {
+ try{
+ if (blockManager != null) {
+ val startTime = System.currentTimeMillis
+ logInfo(block + " put start delay is " + (startTime - block.shortInterval.endTime.asInstanceOf[LongTime].milliseconds) + " ms")
+ /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY)*/
+ /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY_2)*/
+ blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.MEMORY_ONLY_2)
+ /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.MEMORY_ONLY)*/
+ /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY_DESER)*/
+ /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY_DESER_2)*/
+ val finishTime = System.currentTimeMillis
+ logInfo(block + " put delay is " + (finishTime - startTime) + " ms")
+ } else {
+ logWarning(block + " not put as block manager is null")
+ }
+ } catch {
+ case e: Exception => logError("Exception writing " + block + " to blockmanager" , e)
+ }
+ }
+
+ def getBucket(longInterval: Interval): Option[Bucket] = {
+ buckets.get(longInterval)
+ }
+
+ def clearBucket(longInterval: Interval) {
+ buckets.remove(longInterval)
+ }
+
+ def keepProcessingBuffers() {
+ logInfo("Thread to process buffers started")
+ while(true) {
+ processBuffer()
+ val currentTimeMillis = System.currentTimeMillis
+ val sleepTimeMillis = (currentTimeMillis / shortIntervalDurationMillis + 1) *
+ shortIntervalDurationMillis - currentTimeMillis + 1
+ Thread.sleep(sleepTimeMillis)
+ }
+ }
+
+ def pushAndNotifyBlock(block: Block) {
+ pushBlock(block)
+ block.pushed = true
+ val bucket = if (syncOnLastShortInterval) {
+ buckets(block.longInterval)
+ } else {
+ var longInterval = block.longInterval
+ while(!buckets.contains(longInterval)) {
+ logWarning("Skipping bucket of " + longInterval + " for " + block)
+ longInterval = longInterval.next
+ }
+ val chosenBucket = buckets(longInterval)
+ logDebug("Choosing bucket of " + longInterval + " for " + block)
+ chosenBucket += block
+ chosenBucket
+ }
+
+ bucket.synchronized {
+ if (bucket.ready) {
+ bucket.notifyAll()
+ }
+ }
+
+ }
+ }
+
+
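+  // Non-blocking connection handler that appends every chunk read from the socket
+  // channel directly into the DataHandler's buffer.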
+ class ReceivingConnectionHandler(host: String, port: Int, dataHandler: DataHandler)
+ extends ConnectionHandler(host, port, false) {
+
+ override def ready(key: SelectionKey) {
+ changeInterest(key, SelectionKey.OP_READ)
+ }
+
+ override def read(key: SelectionKey) {
+ try {
+ val channel = key.channel.asInstanceOf[SocketChannel]
+ val bytesRead = dataHandler.readDataToBuffer(channel.read)
+ if (bytesRead < 0) {
+ close(key)
+ }
+ } catch {
+ case e: IOException => {
+ logError("Error reading", e)
+ close(key)
+ }
+ }
+ }
+ }
+
+ initLogging()
+
+ val masterHost = System.getProperty("spark.master.host", "localhost")
+ val masterPort = System.getProperty("spark.master.port", "7078").toInt
+
+  val akkaPath = "akka://spark@%s:%s/user".format(masterHost, masterPort)
+ val sparkstreamScheduler = actorSystem.actorFor(akkaPath + "/SparkStreamScheduler")
+ val testStreamCoordinator = actorSystem.actorFor(akkaPath + "/TestStreamCoordinator")
+
+  // Initialize the ask timeout before askActor is first used (just below).
+  val timeout = 100 millis
+
+  logInfo("Getting stream details from master " + masterHost + ":" + masterPort)
+
+ val streamDetails = askActor[GotStreamDetails](testStreamCoordinator, GetStreamDetails) match {
+ case Some(details) => details
+ case None => throw new Exception("Could not get stream details")
+ }
+ logInfo("Stream details received: " + streamDetails)
+
+ val inputName = streamDetails.name
+ val intervalDurationMillis = streamDetails.duration
+ val intervalDuration = Milliseconds(intervalDurationMillis)
+ val shortIntervalDuration = Milliseconds(System.getProperty("spark.stream.shortinterval", "500").toInt)
+
+ val dataHandler = new DataHandler(inputName, intervalDuration, shortIntervalDuration, blockManager)
+ val connectionHandler = new ReceivingConnectionHandler("localhost", 9999, dataHandler)
+
+ // Send a message to an actor and return an option with its reply, or None if this times out
+ def askActor[T](actor: ActorRef, message: Any): Option[T] = {
+ try {
+ val future = actor.ask(message)(timeout)
+ return Some(Await.result(future, timeout).asInstanceOf[T])
+ } catch {
+ case e: Exception =>
+ logInfo("Error communicating with " + actor, e)
+ return None
+ }
+ }
+
+ override def run() {
+ connectionHandler.start()
+ dataHandler.start()
+
+ var interval = Interval.currentInterval(intervalDuration)
+ var dataStarted = false
+
+
+ while(true) {
+ waitFor(interval.endTime)
+ /*logInfo("Woken up at " + System.currentTimeMillis + " for " + interval)*/
+ dataHandler.getBucket(interval) match {
+ case Some(bucket) => {
+ logDebug("Found " + bucket + " for " + interval)
+ bucket.synchronized {
+ if (!bucket.ready) {
+ logDebug("Waiting for " + bucket)
+ bucket.wait()
+ logDebug("Wait over for " + bucket)
+ }
+ if (dataStarted || !bucket.empty) {
+ logDebug("Notifying " + bucket)
+ notifyScheduler(interval, bucket.blockIds)
+ dataStarted = true
+ }
+ bucket.blocks.clear()
+ dataHandler.clearBucket(interval)
+ }
+ }
+ case None => {
+ logDebug("Found none for " + interval)
+ if (dataStarted) {
+ logDebug("Notifying none")
+ notifyScheduler(interval, Array[String]())
+ }
+ }
+ }
+ interval = interval.next
+ }
+ }
+
+ def waitFor(time: Time) {
+ val currentTimeMillis = System.currentTimeMillis
+ val targetTimeMillis = time.asInstanceOf[LongTime].milliseconds
+ if (currentTimeMillis < targetTimeMillis) {
+ val sleepTime = (targetTimeMillis - currentTimeMillis)
+ Thread.sleep(sleepTime + 1)
+ }
+ }
+
+ def notifyScheduler(interval: Interval, blockIds: Array[String]) {
+ try {
+ sparkstreamScheduler ! InputGenerated(inputName, interval, blockIds.toArray)
+ val time = interval.endTime.asInstanceOf[LongTime]
+ val delay = (System.currentTimeMillis - time.milliseconds)
+ logInfo("Notification delay for " + time + " is " + delay + " ms")
+ } catch {
+ case e: Exception => logError("Exception notifying scheduler at interval " + interval + ": " + e)
+ }
+ }
+}
+
+
+object TestStreamReceiver4 {
+ def main(args: Array[String]) {
+ val details = Array(("Sentences", 2000L))
+ val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localHostName, 7078)
+ actorSystem.actorOf(Props(new TestStreamCoordinator(details)), name = "TestStreamCoordinator")
+ new TestStreamReceiver4(actorSystem, null).start()
+ }
+}
diff --git a/streaming/src/main/scala/spark/stream/Time.scala b/streaming/src/main/scala/spark/stream/Time.scala
new file mode 100644
index 0000000000..25369dfee5
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/Time.scala
@@ -0,0 +1,85 @@
+package spark.stream
+
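+// Abstract notion of stream time; concrete arithmetic is provided by LongTime below,
+// which measures time in milliseconds.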
+abstract class Time {
+
+ // basic operations that must be overridden
+ def copy(): Time
+ def zero: Time
+ def < (that: Time): Boolean
+ def += (that: Time): Time
+ def -= (that: Time): Time
+ def floor(that: Time): Time
+ def isMultipleOf(that: Time): Boolean
+
+ // derived operations composed of basic operations
+ def + (that: Time) = this.copy() += that
+ def - (that: Time) = this.copy() -= that
+ def * (times: Int) = {
+ var count = 0
+ var result = this.copy()
+ while (count < times) {
+ result += this
+ count += 1
+ }
+ result
+ }
+ def <= (that: Time) = (this < that || this == that)
+ def > (that: Time) = !(this <= that)
+ def >= (that: Time) = !(this < that)
+ def isZero = (this == zero)
+ def toFormattedString = toString
+}
+
+object Time {
+ def Milliseconds(milliseconds: Long) = LongTime(milliseconds)
+
+ def zero = LongTime(0)
+}
+
+case class LongTime(var milliseconds: Long) extends Time {
+
+ override def copy() = LongTime(this.milliseconds)
+
+ override def zero = LongTime(0)
+
+ override def < (that: Time): Boolean =
+ (this.milliseconds < that.asInstanceOf[LongTime].milliseconds)
+
+ override def += (that: Time): Time = {
+ this.milliseconds += that.asInstanceOf[LongTime].milliseconds
+ this
+ }
+
+ override def -= (that: Time): Time = {
+ this.milliseconds -= that.asInstanceOf[LongTime].milliseconds
+ this
+ }
+
+ override def floor(that: Time): Time = {
+ val t = that.asInstanceOf[LongTime].milliseconds
+ val m = this.milliseconds / t
+ LongTime(m.toLong * t)
+ }
+
+ override def isMultipleOf(that: Time): Boolean =
+ (this.milliseconds % that.asInstanceOf[LongTime].milliseconds == 0)
+
+ override def isZero = (this.milliseconds == 0)
+
+ override def toString = (milliseconds.toString + "ms")
+
+ override def toFormattedString = milliseconds.toString
+}
+
+object Milliseconds {
+ def apply(milliseconds: Long) = LongTime(milliseconds)
+}
+
+object Seconds {
+ def apply(seconds: Long) = LongTime(seconds * 1000)
+}
+
+object Minutes {
+ def apply(minutes: Long) = LongTime(minutes * 60000)
+}
+
diff --git a/streaming/src/main/scala/spark/stream/TopContentCount.scala b/streaming/src/main/scala/spark/stream/TopContentCount.scala
new file mode 100644
index 0000000000..a8cca4e793
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/TopContentCount.scala
@@ -0,0 +1,97 @@
+package spark.stream
+
+import SparkStreamContext._
+
+import spark.storage.StorageLevel
+
+import scala.util.Sorting
+
+object TopContentCount {
+
+ case class Event(val country: String, val content: String)
+
+ object Event {
+ def create(string: String): Event = {
+ val parts = string.split(":")
+ new Event(parts(0), parts(1))
+ }
+ }
+
+ def main(args: Array[String]) {
+
+ if (args.length < 2) {
+      println ("Usage: TopContentCount <host> <# event streams>")
+ System.exit(1)
+ }
+
+ val ssc = new SparkStreamContext(args(0), "TopContentCount")
+ val sc = ssc.sc
+ val dummy = sc.parallelize(0 to 1000, 100).persist(StorageLevel.DISK_AND_MEMORY)
+ sc.runJob(dummy, (_: Iterator[Int]) => {})
+
+
+ val numEventStreams = if (args.length > 1) args(1).toInt else 1
+ if (args.length > 2) {
+ ssc.setTempDir(args(2))
+ }
+
+ val eventStrings = new UnifiedRDS(
+ (1 to numEventStreams).map(i => ssc.readTestStream("Events-" + i, 1000)).toArray
+ )
+
+ def parse(string: String) = {
+ val parts = string.split(":")
+ (parts(0), parts(1))
+ }
+
+ def add(v1: Int, v2: Int) = (v1 + v2)
+ def subtract(v1: Int, v2: Int) = (v1 - v2)
+
+ val events = eventStrings.map(x => parse(x))
+ /*events.print*/
+
+ val parallelism = 8
+ val counts_per_content_per_country = events
+ .map(x => (x, 1))
+ .reduceByKey(_ + _)
+ /*.reduceByKeyAndWindow(add _, subtract _, Seconds(5), Seconds(1), parallelism)*/
+ /*counts_per_content_per_country.print*/
+
+ /*
+ counts_per_content_per_country.persist(
+ StorageLevel.MEMORY_ONLY_DESER,
+ StorageLevel.MEMORY_ONLY_DESER_2,
+ Seconds(1)
+ )*/
+
+ val counts_per_country = counts_per_content_per_country
+ .map(x => (x._1._1, (x._1._2, x._2)))
+ .groupByKey()
+ counts_per_country.print
+
+
+ def topK(data: Seq[(String, Int)], k: Int): Array[(String, Int)] = {
+ implicit val countOrdering = new Ordering[(String, Int)] {
+ override def compare(count1: (String, Int), count2: (String, Int)): Int = {
+ count2._2 - count1._2
+ }
+ }
+ val array = data.toArray
+ Sorting.quickSort(array)
+ val taken = array.take(k)
+ taken
+ }
+
+ val k = 10
+ val topKContents_per_country = counts_per_country
+ .map(x => (x._1, topK(x._2, k)))
+ .map(x => (x._1, x._2.map(_.toString).reduceLeft(_ + ", " + _)))
+
+ topKContents_per_country.print
+
+ ssc.run
+ }
+}
+
+
+
diff --git a/streaming/src/main/scala/spark/stream/TopKWordCount2.scala b/streaming/src/main/scala/spark/stream/TopKWordCount2.scala
new file mode 100644
index 0000000000..7dd06dd5ee
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/TopKWordCount2.scala
@@ -0,0 +1,103 @@
+package spark.stream
+
+import spark.SparkContext
+import SparkContext._
+import SparkStreamContext._
+
+import spark.storage.StorageLevel
+
+import scala.util.Sorting
+
+object TopKWordCount2 {
+
+ def moreWarmup(sc: SparkContext) {
+ (0 until 20).foreach {i =>
+ sc.parallelize(1 to 20000000, 500)
+ .map(_ % 100).map(_.toString)
+ .map(x => (x, 1)).reduceByKey(_ + _, 10)
+ .collect()
+ }
+ }
+
+ def main (args: Array[String]) {
+
+ if (args.length < 2) {
+      println ("Usage: TopKWordCount2 <host> <# sentence streams>")
+ System.exit(1)
+ }
+
+ val ssc = new SparkStreamContext(args(0), "WordCount2")
+
+ val numSentenceStreams = if (args.length > 1) args(1).toInt else 1
+ if (args.length > 2) {
+ ssc.setTempDir(args(2))
+ }
+
+ GrepCount2.warmConnectionManagers(ssc.sc)
+ moreWarmup(ssc.sc)
+
+ val sentences = new UnifiedRDS(
+ (1 to numSentenceStreams).map(i => ssc.readTestStream("Sentences-" + i, 1000)).toArray
+ )
+
+ val words = sentences.flatMap(_.split(" "))
+
+ def add(v1: Int, v2: Int) = (v1 + v2)
+ def subtract(v1: Int, v2: Int) = (v1 - v2)
+
+ val windowedCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(add _, subtract _, Seconds(10), Seconds(1), 10)
+ windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER, Seconds(1))
+
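+    // Streams over the iterator keeping a k-sized array sorted by count (simple
+    // insertion), so only a partial top-k is materialized per partition.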
+ def topK(data: Iterator[(String, Int)], k: Int): Iterator[(String, Int)] = {
+ val taken = new Array[(String, Int)](k)
+
+ var i = 0
+ var len = 0
+ var done = false
+ var value: (String, Int) = null
+ var swap: (String, Int) = null
+ var count = 0
+
+ while(data.hasNext) {
+ value = data.next
+ count += 1
+ println("count = " + count)
+ if (len == 0) {
+ taken(0) = value
+ len = 1
+ } else if (len < k || value._2 > taken(len - 1)._2) {
+ if (len < k) {
+ len += 1
+ }
+ taken(len - 1) = value
+ i = len - 1
+ while(i > 0 && taken(i - 1)._2 < taken(i)._2) {
+ swap = taken(i)
+ taken(i) = taken(i-1)
+ taken(i - 1) = swap
+ i -= 1
+ }
+ }
+ }
+ println("Took " + len + " out of " + count + " items")
+ return taken.toIterator
+ }
+
+ val k = 10
+ val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k))
+ partialTopKWindowedCounts.foreachRDD(rdd => {
+ val collectedCounts = rdd.collect
+ println("Collected " + collectedCounts.size + " items")
+ topK(collectedCounts.toIterator, k).foreach(println)
+ })
+
+ /*
+ windowedCounts.filter(_ == null).foreachRDD(rdd => {
+ val count = rdd.count
+ println("# of nulls = " + count)
+ })*/
+
+ ssc.run
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/stream/TopKWordCount2_Special.scala b/streaming/src/main/scala/spark/stream/TopKWordCount2_Special.scala
new file mode 100644
index 0000000000..e9f3f914ae
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/TopKWordCount2_Special.scala
@@ -0,0 +1,142 @@
+package spark.stream
+
+import spark.SparkContext
+import SparkContext._
+import SparkStreamContext._
+
+import spark.storage.StorageLevel
+
+import scala.util.Sorting
+import scala.collection.JavaConversions.mapAsScalaMap
+import scala.collection.mutable.Queue
+
+import java.lang.{Long => JLong}
+
+object TopKWordCount2_Special {
+
+ def moreWarmup(sc: SparkContext) {
+ (0 until 20).foreach {i =>
+ sc.parallelize(1 to 20000000, 500)
+ .map(_ % 100).map(_.toString)
+ .map(x => (x, 1)).reduceByKey(_ + _, 10)
+ .collect()
+ }
+ }
+
+ def main (args: Array[String]) {
+
+ if (args.length < 2) {
+      println ("Usage: TopKWordCount2_Special <host> <# sentence streams>")
+ System.exit(1)
+ }
+
+ val ssc = new SparkStreamContext(args(0), "TopKWordCount2")
+
+ val numSentenceStreams = if (args.length > 1) args(1).toInt else 1
+ if (args.length > 2) {
+ ssc.setTempDir(args(2))
+ }
+
+ GrepCount2.warmConnectionManagers(ssc.sc)
+ /*moreWarmup(ssc.sc)*/
+
+ val sentences = new UnifiedRDS(
+ (1 to numSentenceStreams).map(i => ssc.readTestStream("Sentences-" + i, 500)).toArray
+ )
+
+ /*val words = sentences.flatMap(_.split(" "))*/
+
+ /*def add(v1: Int, v2: Int) = (v1 + v2) */
+ /*def subtract(v1: Int, v2: Int) = (v1 - v2) */
+
+ def add(v1: JLong, v2: JLong) = (v1 + v2)
+ def subtract(v1: JLong, v2: JLong) = (v1 - v2)
+
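+    // Hand-rolled split-and-count: scans each sentence character by character and counts
+    // words in a java.util.HashMap, avoiding the intermediate arrays created by String.split.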
+ def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, JLong)] = {
+ val map = new java.util.HashMap[String, JLong]
+ var i = 0
+ var j = 0
+ while (iter.hasNext) {
+ val s = iter.next()
+ i = 0
+ while (i < s.length) {
+ j = i
+ while (j < s.length && s.charAt(j) != ' ') {
+ j += 1
+ }
+ if (j > i) {
+ val w = s.substring(i, j)
+ val c = map.get(w)
+ if (c == null) {
+ map.put(w, 1)
+ } else {
+ map.put(w, c + 1)
+ }
+ }
+ i = j
+ while (i < s.length && s.charAt(i) == ' ') {
+ i += 1
+ }
+ }
+ }
+ map.toIterator
+ }
+
+
+ val windowedCounts = sentences.mapPartitions(splitAndCountPartitions).reduceByKeyAndWindow(add _, subtract _, Seconds(10), Milliseconds(500), 10)
+ /*windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER, Seconds(1))*/
+ windowedCounts.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY, Milliseconds(500))
+
+ def topK(data: Iterator[(String, JLong)], k: Int): Iterator[(String, JLong)] = {
+ val taken = new Array[(String, JLong)](k)
+
+ var i = 0
+ var len = 0
+ var done = false
+ var value: (String, JLong) = null
+ var swap: (String, JLong) = null
+ var count = 0
+
+ while(data.hasNext) {
+ value = data.next
+ count += 1
+ println("count = " + count)
+ if (len == 0) {
+ taken(0) = value
+ len = 1
+ } else if (len < k || value._2 > taken(len - 1)._2) {
+ if (len < k) {
+ len += 1
+ }
+ taken(len - 1) = value
+ i = len - 1
+ while(i > 0 && taken(i - 1)._2 < taken(i)._2) {
+ swap = taken(i)
+ taken(i) = taken(i-1)
+ taken(i - 1) = swap
+ i -= 1
+ }
+ }
+ }
+ println("Took " + len + " out of " + count + " items")
+ return taken.toIterator
+ }
+
+ val k = 50
+ val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k))
+ partialTopKWindowedCounts.foreachRDD(rdd => {
+ val collectedCounts = rdd.collect
+ println("Collected " + collectedCounts.size + " items")
+ topK(collectedCounts.toIterator, k).foreach(println)
+ })
+
+ /*
+ windowedCounts.filter(_ == null).foreachRDD(rdd => {
+ val count = rdd.count
+ println("# of nulls = " + count)
+ })*/
+
+ ssc.run
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/stream/WindowedRDS.scala b/streaming/src/main/scala/spark/stream/WindowedRDS.scala
new file mode 100644
index 0000000000..a2e7966edb
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/WindowedRDS.scala
@@ -0,0 +1,68 @@
+package spark.stream
+
+import spark.stream.SparkStreamContext._
+
+import spark.RDD
+import spark.UnionRDD
+import spark.SparkContext._
+
+import scala.collection.mutable.ArrayBuffer
+
+class WindowedRDS[T: ClassManifest](
+ parent: RDS[T],
+ _windowTime: Time,
+ _slideTime: Time)
+ extends RDS[T](parent.ssc) {
+
+ if (!_windowTime.isMultipleOf(parent.slideTime))
+    throw new Exception("The window duration of WindowedRDS (" + _windowTime + ") " +
+ "must be multiple of the slide duration of parent RDS (" + parent.slideTime + ")")
+
+ if (!_slideTime.isMultipleOf(parent.slideTime))
+ throw new Exception("The slide duration of WindowedRDS (" + _slideTime + ") " +
+ "must be multiple of the slide duration of parent RDS (" + parent.slideTime + ")")
+
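+  // When true, windows that would start before the parent's zero time are truncated at
+  // zeroTime instead of producing no RDD.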
+ val allowPartialWindows = true
+
+ override def dependencies = List(parent)
+
+ def windowTime: Time = _windowTime
+
+ override def slideTime: Time = _slideTime
+
+ override def compute(validTime: Time): Option[RDD[T]] = {
+ val parentRDDs = new ArrayBuffer[RDD[T]]()
+ val windowEndTime = validTime.copy()
+ val windowStartTime = if (allowPartialWindows && windowEndTime - windowTime < parent.zeroTime) {
+ parent.zeroTime
+ } else {
+ windowEndTime - windowTime
+ }
+
+ logInfo("Window = " + windowStartTime + " - " + windowEndTime)
+ logInfo("Parent.zeroTime = " + parent.zeroTime)
+
+ if (windowStartTime >= parent.zeroTime) {
+ // Walk back through time, from the 'windowEndTime' to 'windowStartTime'
+ // and get all parent RDDs from the parent RDS
+ var t = windowEndTime
+ while (t > windowStartTime) {
+ parent.getOrCompute(t) match {
+ case Some(rdd) => parentRDDs += rdd
+ case None => throw new Exception("Could not generate parent RDD for time " + t)
+ }
+ t -= parent.slideTime
+ }
+ }
+
+ // Do a union of all parent RDDs to generate the new RDD
+ if (parentRDDs.size > 0) {
+ Some(new UnionRDD(ssc.sc, parentRDDs))
+ } else {
+ None
+ }
+ }
+}
+
+
+
diff --git a/streaming/src/main/scala/spark/stream/WordCount.scala b/streaming/src/main/scala/spark/stream/WordCount.scala
new file mode 100644
index 0000000000..af825e46a8
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/WordCount.scala
@@ -0,0 +1,62 @@
+package spark.stream
+
+import SparkStreamContext._
+
+import scala.util.Sorting
+
+import spark.SparkContext
+import spark.storage.StorageLevel
+
+object WordCount {
+ var inputFile : String = null
+ var HDFS : String = null
+ var idealPartitions : Int = 0
+
+ def main (args: Array[String]) {
+
+ if (args.length != 4) {
+ println ("Usage: WordCount <host> <HDFS> <Input file> <Ideal Partitions>")
+ System.exit(1)
+ }
+
+ HDFS = args(1)
+ inputFile = HDFS + args(2)
+ idealPartitions = args(3).toInt
+ println ("Input file: " + inputFile)
+
+ val ssc = new SparkStreamContext(args(0), "WordCountWindow")
+
+ SparkContext.idealPartitions = idealPartitions
+ SparkContext.inputFile = inputFile
+
+ val sentences = ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 2000)
+ //sentences.print
+
+ val words = sentences.flatMap(_.split(" "))
+
+ def add(v1: Int, v2: Int) = (v1 + v2)
+ def subtract(v1: Int, v2: Int) = (v1 - v2)
+
+ //val windowedCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(2000),
+ // System.getProperty("spark.default.parallelism", "1").toInt)
+ //windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.DISK_AND_MEMORY_DESER_2, Seconds(5))
+ //windowedCounts.print
+
+ val parallelism = System.getProperty("spark.default.parallelism", "1").toInt
+
+ //val localCounts = words.map(x => (x, 1)).reduceByKey(add _, parallelism)
+ //localCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2, Seconds(6))
+ //val windowedCounts = localCounts.window(Seconds(30), Seconds(2)).reduceByKey(_ + _)
+
+ val windowedCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(2),
+ parallelism)
+ windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2, Seconds(10))
+
+ //windowedCounts.print
+ windowedCounts.register
+ //windowedCounts.foreachRDD(rdd => rdd.collect.foreach(x => print(x+ " ")))
+ //windowedCounts.foreachRDD(rdd => rdd.collect.foreach(x => x))
+
+ ssc.run
+ }
+}
diff --git a/streaming/src/main/scala/spark/stream/WordCount1.scala b/streaming/src/main/scala/spark/stream/WordCount1.scala
new file mode 100644
index 0000000000..501062b18d
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/WordCount1.scala
@@ -0,0 +1,46 @@
+package spark.stream
+
+import SparkStreamContext._
+
+import scala.util.Sorting
+
+import spark.SparkContext
+import spark.storage.StorageLevel
+
+object WordCount1 {
+ var inputFile : String = null
+ var HDFS : String = null
+ var idealPartitions : Int = 0
+
+ def main (args: Array[String]) {
+
+ if (args.length != 4) {
+      println ("Usage: WordCount1 <host> <HDFS> <Input file> <Ideal Partitions>")
+ System.exit(1)
+ }
+
+ HDFS = args(1)
+ inputFile = HDFS + args(2)
+ idealPartitions = args(3).toInt
+ println ("Input file: " + inputFile)
+
+ val ssc = new SparkStreamContext(args(0), "WordCountWindow")
+
+ SparkContext.idealPartitions = idealPartitions
+ SparkContext.inputFile = inputFile
+
+ val sentences = ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 1000)
+ //sentences.print
+
+ val words = sentences.flatMap(_.split(" "))
+
+ def add(v1: Int, v2: Int) = (v1 + v2)
+ def subtract(v1: Int, v2: Int) = (v1 - v2)
+
+ val windowedCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(add _, subtract _, Seconds(10), Seconds(1), 10)
+ windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER, Seconds(1))
+ windowedCounts.foreachRDD(_.collect)
+
+ ssc.run
+ }
+}
diff --git a/streaming/src/main/scala/spark/stream/WordCount2.scala b/streaming/src/main/scala/spark/stream/WordCount2.scala
new file mode 100644
index 0000000000..24324e891a
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/WordCount2.scala
@@ -0,0 +1,55 @@
+package spark.stream
+
+import spark.SparkContext
+import SparkContext._
+import SparkStreamContext._
+
+import spark.storage.StorageLevel
+
+import scala.util.Sorting
+
+object WordCount2 {
+
+ def moreWarmup(sc: SparkContext) {
+ (0 until 20).foreach {i =>
+ sc.parallelize(1 to 20000000, 500)
+ .map(_ % 100).map(_.toString)
+ .map(x => (x, 1)).reduceByKey(_ + _, 10)
+ .collect()
+ }
+ }
+
+ def main (args: Array[String]) {
+
+ if (args.length < 2) {
+      println ("Usage: WordCount2 <host> <# sentence streams>")
+ System.exit(1)
+ }
+
+ val ssc = new SparkStreamContext(args(0), "WordCount2")
+
+ val numSentenceStreams = if (args.length > 1) args(1).toInt else 1
+ if (args.length > 2) {
+ ssc.setTempDir(args(2))
+ }
+
+ GrepCount2.warmConnectionManagers(ssc.sc)
+ /*moreWarmup(ssc.sc)*/
+
+ val sentences = new UnifiedRDS(
+ (1 to numSentenceStreams).map(i => ssc.readTestStream("Sentences-" + i, 1000)).toArray
+ )
+
+ val words = sentences.flatMap(_.split(" "))
+
+ def add(v1: Int, v2: Int) = (v1 + v2)
+ def subtract(v1: Int, v2: Int) = (v1 - v2)
+
+ val windowedCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(add _, subtract _, Seconds(10), Seconds(1), 6)
+ windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER, Seconds(1))
+ windowedCounts.foreachRDD(_.collect)
+
+ ssc.run
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/stream/WordCount2_Special.scala b/streaming/src/main/scala/spark/stream/WordCount2_Special.scala
new file mode 100644
index 0000000000..c6b1aaa57e
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/WordCount2_Special.scala
@@ -0,0 +1,94 @@
+package spark.stream
+
+import spark.SparkContext
+import SparkContext._
+import SparkStreamContext._
+
+import spark.storage.StorageLevel
+
+import scala.util.Sorting
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Queue
+import scala.collection.JavaConversions.mapAsScalaMap
+
+import java.lang.{Long => JLong}
+import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+
+
+object WordCount2_ExtraFunctions {
+
+ def add(v1: JLong, v2: JLong) = (v1 + v2)
+
+ def subtract(v1: JLong, v2: JLong) = (v1 - v2)
+
+ def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, JLong)] = {
+ val map = new java.util.HashMap[String, JLong]
+ var i = 0
+ var j = 0
+ while (iter.hasNext) {
+ val s = iter.next()
+ i = 0
+ while (i < s.length) {
+ j = i
+ while (j < s.length && s.charAt(j) != ' ') {
+ j += 1
+ }
+ if (j > i) {
+ val w = s.substring(i, j)
+ val c = map.get(w)
+ if (c == null) {
+ map.put(w, 1)
+ } else {
+ map.put(w, c + 1)
+ }
+ }
+ i = j
+ while (i < s.length && s.charAt(i) == ' ') {
+ i += 1
+ }
+ }
+ }
+ map.toIterator
+ }
+}
+
+object WordCount2_Special {
+
+ def moreWarmup(sc: SparkContext) {
+ (0 until 40).foreach {i =>
+ sc.parallelize(1 to 20000000, 1000)
+ .map(_ % 1331).map(_.toString)
+ .mapPartitions(WordCount2_ExtraFunctions.splitAndCountPartitions).reduceByKey(_ + _, 10)
+ .collect()
+ }
+ }
+
+ def main (args: Array[String]) {
+
+ if (args.length < 2) {
+      println ("Usage: WordCount2_Special <host> <# sentence streams>")
+ System.exit(1)
+ }
+
+ val ssc = new SparkStreamContext(args(0), "WordCount2")
+
+ val numSentenceStreams = if (args.length > 1) args(1).toInt else 1
+
+ GrepCount2.warmConnectionManagers(ssc.sc)
+ /*moreWarmup(ssc.sc)*/
+
+ val sentences = new UnifiedRDS(
+ (1 to numSentenceStreams).map(i => ssc.readTestStream("Sentences-" + i, 500)).toArray
+ )
+
+ val windowedCounts = sentences
+ .mapPartitions(WordCount2_ExtraFunctions.splitAndCountPartitions)
+ .reduceByKeyAndWindow(WordCount2_ExtraFunctions.add _, WordCount2_ExtraFunctions.subtract _, Seconds(10), Milliseconds(500), 10)
+ windowedCounts.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY, Milliseconds(500))
+ windowedCounts.foreachRDD(_.collect)
+
+ ssc.run
+ }
+}
+
diff --git a/streaming/src/main/scala/spark/stream/WordCount3.scala b/streaming/src/main/scala/spark/stream/WordCount3.scala
new file mode 100644
index 0000000000..455a8c9dbf
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/WordCount3.scala
@@ -0,0 +1,49 @@
+package spark.stream
+
+import SparkStreamContext._
+
+import scala.util.Sorting
+
+object WordCount3 {
+
+ def main (args: Array[String]) {
+
+ if (args.length < 1) {
+      println ("Usage: WordCount3 <host> [<temp directory>]")
+ System.exit(1)
+ }
+
+ val ssc = new SparkStreamContext(args(0), "WordCount")
+ if (args.length > 1) {
+ ssc.setTempDir(args(1))
+ }
+ val sentences = ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 1000)
+ /*sentences.print*/
+
+ val words = sentences.flatMap(_.split(" "))
+
+ def add(v1: Int, v2: Int) = (v1 + v2)
+ def subtract(v1: Int, v2: Int) = (v1 - v2)
+
+ /*val windowedCounts = words.map(x => (x, 1)).window(Seconds(5), Seconds(1)).reduceByKey(add _, 1)*/
+ val windowedCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(add _, subtract _, Seconds(5), Seconds(1), 1)
+ /*windowedCounts.print */
+
+ def topK(data: Seq[(String, Int)], k: Int): Array[(String, Int)] = {
+ implicit val countOrdering = new Ordering[(String, Int)] {
+ override def compare(count1: (String, Int), count2: (String, Int)): Int = {
+ count2._2 - count1._2
+ }
+ }
+ val array = data.toArray
+ Sorting.quickSort(array)
+ array.take(k)
+ }
+
+ val k = 10
+ val topKWindowedCounts = windowedCounts.glom.flatMap(topK(_, k)).collect.flatMap(topK(_, k))
+ topKWindowedCounts.print
+
+ ssc.run
+ }
+}
diff --git a/streaming/src/main/scala/spark/stream/WordCountEc2.scala b/streaming/src/main/scala/spark/stream/WordCountEc2.scala
new file mode 100644
index 0000000000..5b10026d7a
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/WordCountEc2.scala
@@ -0,0 +1,41 @@
+package spark.stream
+
+import SparkStreamContext._
+import spark.SparkContext
+
+object WordCountEc2 {
+ var inputFile : String = null
+ var HDFS : String = null
+ var idealPartitions : Int = 0
+
+ def main (args: Array[String]) {
+
+ if (args.length != 4) {
+      println ("Usage: WordCountEc2 <host> <HDFS> <Input file> <Ideal Partitions>")
+ System.exit(1)
+ }
+
+ HDFS = args(1)
+ inputFile = HDFS + args(2)
+ idealPartitions = args(3).toInt
+ println ("Input file: " + inputFile)
+
+ SparkContext.idealPartitions = idealPartitions
+ SparkContext.inputFile = inputFile
+
+ val ssc = new SparkStreamContext(args(0), "Test")
+ val sentences = ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 1000)
+ /*sentences.foreach(println)*/
+
+ val words = sentences.flatMap(_.split(" "))
+ /*words.foreach(println)*/
+
+ val counts = words.map(x => (x, 1)).reduceByKey(_ + _)
+ /*counts.foreach(println)*/
+
+ counts.foreachRDD(rdd => rdd.collect.foreach(x => x))
+ /*counts.register*/
+
+ ssc.run
+ }
+}
diff --git a/streaming/src/main/scala/spark/stream/WordCountTrivialWindow.scala b/streaming/src/main/scala/spark/stream/WordCountTrivialWindow.scala
new file mode 100644
index 0000000000..5469df71e9
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/WordCountTrivialWindow.scala
@@ -0,0 +1,51 @@
+package spark.stream
+
+import SparkStreamContext._
+
+import scala.util.Sorting
+
+object WordCountTrivialWindow {
+
+ def main (args: Array[String]) {
+
+ if (args.length < 1) {
+      println ("Usage: WordCountTrivialWindow <host> [<temp directory>]")
+ System.exit(1)
+ }
+
+ val ssc = new SparkStreamContext(args(0), "WordCountTrivialWindow")
+ if (args.length > 1) {
+ ssc.setTempDir(args(1))
+ }
+ val sentences = ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 1000)
+ /*sentences.print*/
+
+ val words = sentences.flatMap(_.split(" "))
+
+ /*val counts = words.map(x => (x, 1)).reduceByKey(_ + _, 1)*/
+ /*counts.print*/
+
+ def add(v1: Int, v2: Int) = (v1 + v2)
+ def subtract(v1: Int, v2: Int) = (v1 - v2)
+
+ val windowedCounts = words.map(x => (x, 1)).window(Seconds(5), Seconds(1)).reduceByKey(add _, 1)
+ /*windowedCounts.print */
+
+ def topK(data: Seq[(String, Int)], k: Int): Array[(String, Int)] = {
+ implicit val countOrdering = new Ordering[(String, Int)] {
+ override def compare(count1: (String, Int), count2: (String, Int)): Int = {
+ count2._2 - count1._2
+ }
+ }
+ val array = data.toArray
+ Sorting.quickSort(array)
+ array.take(k)
+ }
+
+ val k = 10
+ val topKWindowedCounts = windowedCounts.glom.flatMap(topK(_, k)).collect.flatMap(topK(_, k))
+ topKWindowedCounts.print
+
+ ssc.run
+ }
+}
diff --git a/streaming/src/main/scala/spark/stream/WordMax.scala b/streaming/src/main/scala/spark/stream/WordMax.scala
new file mode 100644
index 0000000000..fc075e6d9d
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/WordMax.scala
@@ -0,0 +1,64 @@
+package spark.stream
+
+import SparkStreamContext._
+
+import scala.util.Sorting
+
+import spark.SparkContext
+import spark.storage.StorageLevel
+
+object WordMax {
+ var inputFile : String = null
+ var HDFS : String = null
+ var idealPartitions : Int = 0
+
+ def main (args: Array[String]) {
+
+ if (args.length != 4) {
+      println ("Usage: WordMax <host> <HDFS> <Input file> <Ideal Partitions>")
+ System.exit(1)
+ }
+
+ HDFS = args(1)
+ inputFile = HDFS + args(2)
+ idealPartitions = args(3).toInt
+ println ("Input file: " + inputFile)
+
+ val ssc = new SparkStreamContext(args(0), "WordCountWindow")
+
+ SparkContext.idealPartitions = idealPartitions
+ SparkContext.inputFile = inputFile
+
+ val sentences = ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 2000)
+ //sentences.print
+
+ val words = sentences.flatMap(_.split(" "))
+
+ def add(v1: Int, v2: Int) = (v1 + v2)
+ def subtract(v1: Int, v2: Int) = (v1 - v2)
+ def max(v1: Int, v2: Int) = (if (v1 > v2) v1 else v2)
+
+ //val windowedCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(2000),
+ // System.getProperty("spark.default.parallelism", "1").toInt)
+ //windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.DISK_AND_MEMORY_DESER_2, Seconds(5))
+ //windowedCounts.print
+
+ val parallelism = System.getProperty("spark.default.parallelism", "1").toInt
+
+ val localCounts = words.map(x => (x, 1)).reduceByKey(add _, parallelism)
+ //localCounts.persist(StorageLevel.MEMORY_ONLY_DESER)
+ localCounts.persist(StorageLevel.MEMORY_ONLY_DESER_2)
+ val windowedCounts = localCounts.window(Seconds(30), Seconds(2)).reduceByKey(max _)
+
+ //val windowedCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(2),
+ // parallelism)
+ //windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2, Seconds(6))
+
+ //windowedCounts.print
+ windowedCounts.register
+ //windowedCounts.foreachRDD(rdd => rdd.collect.foreach(x => print(x+ " ")))
+ //windowedCounts.foreachRDD(rdd => rdd.collect.foreach(x => x))
+
+ ssc.run
+ }
+}