diff options
52 files changed, 5141 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/BlockRDD.scala b/core/src/main/scala/spark/BlockRDD.scala new file mode 100644 index 0000000000..ea009f0f4f --- /dev/null +++ b/core/src/main/scala/spark/BlockRDD.scala @@ -0,0 +1,42 @@ +package spark + +import scala.collection.mutable.HashMap + +class BlockRDDSplit(val blockId: String, idx: Int) extends Split { + val index = idx +} + + +class BlockRDD[T: ClassManifest](sc: SparkContext, blockIds: Array[String]) extends RDD[T](sc) { + + @transient + val splits_ = (0 until blockIds.size).map(i => { + new BlockRDDSplit(blockIds(i), i).asInstanceOf[Split] + }).toArray + + @transient + lazy val locations_ = { + val blockManager = SparkEnv.get.blockManager + /*val locations = blockIds.map(id => blockManager.getLocations(id))*/ + val locations = blockManager.getLocations(blockIds) + HashMap(blockIds.zip(locations):_*) + } + + override def splits = splits_ + + override def compute(split: Split): Iterator[T] = { + val blockManager = SparkEnv.get.blockManager + val blockId = split.asInstanceOf[BlockRDDSplit].blockId + blockManager.get(blockId) match { + case Some(block) => block.asInstanceOf[Iterator[T]] + case None => + throw new Exception("Could not compute split, block " + blockId + " not found") + } + } + + override def preferredLocations(split: Split) = + locations_(split.asInstanceOf[BlockRDDSplit].blockId) + + override val dependencies: List[Dependency[_]] = Nil +} + diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index dd17d4d6b3..78c7618542 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -409,6 +409,11 @@ class SparkContext( * various Spark features. 
*/ object SparkContext { + + // TODO: temporary hack for using HDFS as input in streaming + var inputFile: String = null + var idealPartitions: Int = 1 + implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] { def addInPlace(t1: Double, t2: Double): Double = t1 + t2 def zero(initialValue: Double) = 0.0 diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 726d490738..c4ada2bf2a 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -8,7 +8,7 @@ object SparkBuild extends Build { // "1.0.1" for Apache releases, or "0.20.2-cdh3u3" for Cloudera Hadoop. val HADOOP_VERSION = "0.20.205.0" - lazy val root = Project("root", file("."), settings = sharedSettings) aggregate(core, repl, examples, bagel) + lazy val root = Project("root", file("."), settings = sharedSettings) aggregate(core, repl, examples, bagel, streaming) lazy val core = Project("core", file("core"), settings = coreSettings) @@ -18,6 +18,8 @@ object SparkBuild extends Build { lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn (core) + lazy val streaming = Project("streaming", file("streaming"), settings = streamingSettings) dependsOn (core) + def sharedSettings = Defaults.defaultSettings ++ Seq( organization := "org.spark-project", version := "0.6.0-SNAPSHOT", @@ -82,6 +84,8 @@ object SparkBuild extends Build { def bagelSettings = sharedSettings ++ Seq(name := "spark-bagel") + def streamingSettings = sharedSettings ++ Seq(name := "spark-streaming") + def extraAssemblySettings() = Seq(test in assembly := {}) ++ Seq( mergeStrategy in assembly := { case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard @@ -46,6 +46,7 @@ CORE_DIR="$FWDIR/core" REPL_DIR="$FWDIR/repl" EXAMPLES_DIR="$FWDIR/examples" BAGEL_DIR="$FWDIR/bagel" +STREAMING_DIR="$FWDIR/streaming" # Build up classpath CLASSPATH="$SPARK_CLASSPATH" @@ -55,6 +56,7 @@ CLASSPATH+=":$CORE_DIR/target/scala-$SCALA_VERSION/classes" 
CLASSPATH+=":$CORE_DIR/src/main/resources" CLASSPATH+=":$REPL_DIR/target/scala-$SCALA_VERSION/classes" CLASSPATH+=":$EXAMPLES_DIR/target/scala-$SCALA_VERSION/classes" +CLASSPATH+=":$STREAMING_DIR/target/scala-$SCALA_VERSION/classes" for jar in `find $CORE_DIR/lib -name '*jar'`; do CLASSPATH+=":$jar" done diff --git a/sentences.txt b/sentences.txt new file mode 100644 index 0000000000..fedf96c66e --- /dev/null +++ b/sentences.txt @@ -0,0 +1,3 @@ +Hello world! +What's up? +There is no cow level diff --git a/startTrigger.sh b/startTrigger.sh new file mode 100755 index 0000000000..0afce91a3e --- /dev/null +++ b/startTrigger.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +./run spark.stream.SentenceGenerator localhost 7078 sentences.txt 1 diff --git a/streaming/src/main/scala/spark/stream/BlockID.scala b/streaming/src/main/scala/spark/stream/BlockID.scala new file mode 100644 index 0000000000..a3fd046c9a --- /dev/null +++ b/streaming/src/main/scala/spark/stream/BlockID.scala @@ -0,0 +1,20 @@ +package spark.stream + +case class BlockID(sRds: String, sInterval: Interval, sPartition: Int) { + override def toString : String = ( + sRds + BlockID.sConnector + + sInterval.beginTime + BlockID.sConnector + + sInterval.endTime + BlockID.sConnector + + sPartition + ) +} + +object BlockID { + val sConnector = '-' + + def parse(name : String) = BlockID( + name.split(BlockID.sConnector)(0), + new Interval(name.split(BlockID.sConnector)(1).toLong, + name.split(BlockID.sConnector)(2).toLong), + name.split(BlockID.sConnector)(3).toInt) +}
\ No newline at end of file diff --git a/streaming/src/main/scala/spark/stream/ConnectionHandler.scala b/streaming/src/main/scala/spark/stream/ConnectionHandler.scala new file mode 100644 index 0000000000..73b82b76b8 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/ConnectionHandler.scala @@ -0,0 +1,157 @@ +package spark.stream + +import spark.Logging + +import scala.collection.mutable.{ArrayBuffer, SynchronizedQueue} + +import java.net._ +import java.io._ +import java.nio._ +import java.nio.charset._ +import java.nio.channels._ +import java.nio.channels.spi._ + +abstract class ConnectionHandler(host: String, port: Int, connect: Boolean) +extends Thread with Logging { + + val selector = SelectorProvider.provider.openSelector() + val interestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)] + + initLogging() + + override def run() { + try { + if (connect) { + connect() + } else { + listen() + } + + var interrupted = false + while(!interrupted) { + + preSelect() + + while(!interestChangeRequests.isEmpty) { + val (key, ops) = interestChangeRequests.dequeue + val lastOps = key.interestOps() + key.interestOps(ops) + + def intToOpStr(op: Int): String = { + val opStrs = new ArrayBuffer[String]() + if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ" + if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE" + if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT" + if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT" + if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " " + } + + logTrace("Changed ops from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]") + } + + selector.select() + interrupted = Thread.currentThread.isInterrupted + + val selectedKeys = selector.selectedKeys().iterator() + while (selectedKeys.hasNext) { + val key = selectedKeys.next.asInstanceOf[SelectionKey] + selectedKeys.remove() + if (key.isValid) { + if (key.isAcceptable) { + accept(key) + } else if (key.isConnectable) { + finishConnect(key) + } 
else if (key.isReadable) { + read(key) + } else if (key.isWritable) { + write(key) + } + } + } + } + } catch { + case e: Exception => { + logError("Error in select loop", e) + } + } + } + + def connect() { + val socketAddress = new InetSocketAddress(host, port) + val channel = SocketChannel.open() + channel.configureBlocking(false) + channel.socket.setReuseAddress(true) + channel.socket.setTcpNoDelay(true) + channel.connect(socketAddress) + channel.register(selector, SelectionKey.OP_CONNECT) + logInfo("Initiating connection to [" + socketAddress + "]") + } + + def listen() { + val channel = ServerSocketChannel.open() + channel.configureBlocking(false) + channel.socket.setReuseAddress(true) + channel.socket.setReceiveBufferSize(256 * 1024) + channel.socket.bind(new InetSocketAddress(port)) + channel.register(selector, SelectionKey.OP_ACCEPT) + logInfo("Listening on port " + port) + } + + def finishConnect(key: SelectionKey) { + try { + val channel = key.channel.asInstanceOf[SocketChannel] + val address = channel.socket.getRemoteSocketAddress + channel.finishConnect() + logInfo("Connected to [" + host + ":" + port + "]") + ready(key) + } catch { + case e: IOException => { + logError("Error finishing connect to " + host + ":" + port) + close(key) + } + } + } + + def accept(key: SelectionKey) { + try { + val serverChannel = key.channel.asInstanceOf[ServerSocketChannel] + val channel = serverChannel.accept() + val address = channel.socket.getRemoteSocketAddress + channel.configureBlocking(false) + logInfo("Accepted connection from [" + address + "]") + ready(channel.register(selector, 0)) + } catch { + case e: IOException => { + logError("Error accepting connection", e) + } + } + } + + def changeInterest(key: SelectionKey, ops: Int) { + logTrace("Added request to change ops to " + ops) + interestChangeRequests += ((key, ops)) + } + + def ready(key: SelectionKey) + + def preSelect() { + } + + def read(key: SelectionKey) { + throw new UnsupportedOperationException("Cannot 
read on connection of type " + this.getClass.toString) + } + + def write(key: SelectionKey) { + throw new UnsupportedOperationException("Cannot write on connection of type " + this.getClass.toString) + } + + def close(key: SelectionKey) { + try { + key.channel.close() + key.cancel() + Thread.currentThread.interrupt + } catch { + case e: Exception => logError("Error closing connection", e) + } + } +} diff --git a/streaming/src/main/scala/spark/stream/DumbTopKWordCount2_Special.scala b/streaming/src/main/scala/spark/stream/DumbTopKWordCount2_Special.scala new file mode 100644 index 0000000000..bd43f44b1a --- /dev/null +++ b/streaming/src/main/scala/spark/stream/DumbTopKWordCount2_Special.scala @@ -0,0 +1,138 @@ +package spark.stream + +import spark.SparkContext +import SparkContext._ +import SparkStreamContext._ + +import spark.storage.StorageLevel + +import scala.util.Sorting +import scala.collection.JavaConversions.mapAsScalaMap +import scala.collection.mutable.Queue + +import java.lang.{Long => JLong} + +object DumbTopKWordCount2_Special { + + def moreWarmup(sc: SparkContext) { + (0 until 20).foreach {i => + sc.parallelize(1 to 20000000, 500) + .map(_ % 100).map(_.toString) + .map(x => (x, 1)).reduceByKey(_ + _, 10) + .collect() + } + } + + def main (args: Array[String]) { + + if (args.length < 2) { + println ("Usage: SparkStreamContext <host> <# sentence streams>") + System.exit(1) + } + + val ssc = new SparkStreamContext(args(0), "WordCount2") + + val numSentenceStreams = if (args.length > 1) args(1).toInt else 1 + if (args.length > 2) { + ssc.setTempDir(args(2)) + } + + GrepCount2.warmConnectionManagers(ssc.sc) + moreWarmup(ssc.sc) + + val sentences = new UnifiedRDS( + (1 to numSentenceStreams).map(i => ssc.readTestStream("Sentences-" + i, 1000)).toArray + ) + + + def add(v1: JLong, v2: JLong) = (v1 + v2) + def subtract(v1: JLong, v2: JLong) = (v1 - v2) + + def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, JLong)] = { + val map = new 
java.util.HashMap[String, JLong] + var i = 0 + var j = 0 + while (iter.hasNext) { + val s = iter.next() + i = 0 + while (i < s.length) { + j = i + while (j < s.length && s.charAt(j) != ' ') { + j += 1 + } + if (j > i) { + val w = s.substring(i, j) + val c = map.get(w) + if (c == null) { + map.put(w, 1) + } else { + map.put(w, c + 1) + } + } + i = j + while (i < s.length && s.charAt(i) == ' ') { + i += 1 + } + } + } + map.toIterator + } + + + val wordCounts = sentences.mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10) + wordCounts.persist(StorageLevel.MEMORY_ONLY) + val windowedCounts = wordCounts.window(Seconds(10), Seconds(1)).reduceByKey(_ + _, 10) + + def topK(data: Iterator[(String, JLong)], k: Int): Iterator[(String, JLong)] = { + val taken = new Array[(String, JLong)](k) + + var i = 0 + var len = 0 + var done = false + var value: (String, JLong) = null + var swap: (String, JLong) = null + var count = 0 + + while(data.hasNext) { + value = data.next + count += 1 + /*println("count = " + count)*/ + if (len == 0) { + taken(0) = value + len = 1 + } else if (len < k || value._2 > taken(len - 1)._2) { + if (len < k) { + len += 1 + } + taken(len - 1) = value + i = len - 1 + while(i > 0 && taken(i - 1)._2 < taken(i)._2) { + swap = taken(i) + taken(i) = taken(i-1) + taken(i - 1) = swap + i -= 1 + } + } + } + println("Took " + len + " out of " + count + " items") + return taken.toIterator + } + + val k = 10 + val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k)) + partialTopKWindowedCounts.foreachRDD(rdd => { + val collectedCounts = rdd.collect + println("Collected " + collectedCounts.size + " items") + topK(collectedCounts.toIterator, k).foreach(println) + }) + + /* + windowedCounts.filter(_ == null).foreachRDD(rdd => { + val count = rdd.count + println("# of nulls = " + count) + })*/ + + ssc.run + } +} + diff --git a/streaming/src/main/scala/spark/stream/DumbWordCount2_Special.scala 
b/streaming/src/main/scala/spark/stream/DumbWordCount2_Special.scala new file mode 100644 index 0000000000..31d682348a --- /dev/null +++ b/streaming/src/main/scala/spark/stream/DumbWordCount2_Special.scala @@ -0,0 +1,92 @@ +package spark.stream + +import spark.SparkContext +import SparkContext._ +import SparkStreamContext._ + +import spark.storage.StorageLevel + +import scala.util.Sorting +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.HashMap +import scala.collection.mutable.Queue +import scala.collection.JavaConversions.mapAsScalaMap + +import java.lang.{Long => JLong} +import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} + +object DumbWordCount2_Special { + + def moreWarmup(sc: SparkContext) { + (0 until 20).foreach {i => + sc.parallelize(1 to 20000000, 500) + .map(_ % 100).map(_.toString) + .map(x => (x, 1)).reduceByKey(_ + _, 10) + .collect() + } + } + + def main (args: Array[String]) { + + if (args.length < 2) { + println ("Usage: SparkStreamContext <host> <# sentence streams>") + System.exit(1) + } + + val ssc = new SparkStreamContext(args(0), "WordCount2") + + val numSentenceStreams = if (args.length > 1) args(1).toInt else 1 + if (args.length > 2) { + ssc.setTempDir(args(2)) + } + + GrepCount2.warmConnectionManagers(ssc.sc) + moreWarmup(ssc.sc) + + val sentences = new UnifiedRDS( + (1 to numSentenceStreams).map(i => ssc.readTestStream("Sentences-" + i, 1000)).toArray + ) + + def add(v1: JLong, v2: JLong) = (v1 + v2) + def subtract(v1: JLong, v2: JLong) = (v1 - v2) + + def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, JLong)] = { + val map = new java.util.HashMap[String, JLong] + var i = 0 + var j = 0 + while (iter.hasNext) { + val s = iter.next() + i = 0 + while (i < s.length) { + j = i + while (j < s.length && s.charAt(j) != ' ') { + j += 1 + } + if (j > i) { + val w = s.substring(i, j) + val c = map.get(w) + if (c == null) { + map.put(w, 1) + } else { + map.put(w, c + 1) + } + } + i = j 
+ while (i < s.length && s.charAt(i) == ' ') { + i += 1 + } + } + } + + map.toIterator + } + + val wordCounts = sentences.mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10) + wordCounts.persist(StorageLevel.MEMORY_ONLY) + val windowedCounts = wordCounts.window(Seconds(10), Seconds(1)).reduceByKey(_ + _, 10) + windowedCounts.foreachRDD(_.collect) + + ssc.run + } +} + diff --git a/streaming/src/main/scala/spark/stream/FileStreamReceiver.scala b/streaming/src/main/scala/spark/stream/FileStreamReceiver.scala new file mode 100644 index 0000000000..026254d6e1 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/FileStreamReceiver.scala @@ -0,0 +1,70 @@ +package spark.stream + +import spark.Logging + +import scala.collection.mutable.HashSet +import scala.actors._ +import scala.actors.Actor._ +import scala.actors.remote._ +import scala.actors.remote.RemoteActor._ + +import org.apache.hadoop.fs._ +import org.apache.hadoop.conf._ +import org.apache.hadoop.io._ +import org.apache.hadoop.mapred._ +import org.apache.hadoop.util._ + +class FileStreamReceiver ( + inputName: String, + rootDirectory: String, + intervalDuration: Long) + extends Logging { + + val pollInterval = 100 + val sparkstreamScheduler = { + val host = System.getProperty("spark.master.host") + val port = System.getProperty("spark.master.port").toInt + 1 + RemoteActor.select(Node(host, port), 'SparkStreamScheduler) + } + val directory = new Path(rootDirectory) + val fs = directory.getFileSystem(new Configuration()) + val files = new HashSet[String]() + var time: Long = 0 + + def start() { + fs.mkdirs(directory) + files ++= getFiles() + + actor { + logInfo("Monitoring directory - " + rootDirectory) + while(true) { + testFiles(getFiles()) + Thread.sleep(pollInterval) + } + } + } + + def getFiles(): Iterable[String] = { + fs.listStatus(directory).map(_.getPath.toString) + } + + def testFiles(fileList: Iterable[String]) { + fileList.foreach(file => { + if (!files.contains(file)) { + if 
(!file.endsWith("_tmp")) { + notifyFile(file) + } + files += file + } + }) + } + + def notifyFile(file: String) { + logInfo("Notifying file " + file) + time += intervalDuration + val interval = Interval(LongTime(time), LongTime(time + intervalDuration)) + sparkstreamScheduler ! InputGenerated(inputName, interval, file) + } +} + + diff --git a/streaming/src/main/scala/spark/stream/GrepCount.scala b/streaming/src/main/scala/spark/stream/GrepCount.scala new file mode 100644 index 0000000000..45b90d4837 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/GrepCount.scala @@ -0,0 +1,39 @@ +package spark.stream + +import SparkStreamContext._ + +import scala.util.Sorting + +import spark.SparkContext +import spark.storage.StorageLevel + +object GrepCount { + var inputFile : String = null + var HDFS : String = null + var idealPartitions : Int = 0 + + def main (args: Array[String]) { + + if (args.length != 4) { + println ("Usage: GrepCount <host> <HDFS> <Input file> <Ideal Partitions>") + System.exit(1) + } + + HDFS = args(1) + inputFile = HDFS + args(2) + idealPartitions = args(3).toInt + println ("Input file: " + inputFile) + + val ssc = new SparkStreamContext(args(0), "GrepCount") + + SparkContext.idealPartitions = idealPartitions + SparkContext.inputFile = inputFile + + val sentences = ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 1000) + //sentences.print + val matching = sentences.filter(_.contains("light")) + matching.foreachRDD(rdd => println(rdd.count)) + + ssc.run + } +} diff --git a/streaming/src/main/scala/spark/stream/GrepCount2.scala b/streaming/src/main/scala/spark/stream/GrepCount2.scala new file mode 100644 index 0000000000..4eb65ba906 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/GrepCount2.scala @@ -0,0 +1,113 @@ +package spark.stream + +import SparkStreamContext._ + +import scala.util.Sorting + +import spark.SparkEnv +import spark.SparkContext +import spark.storage.StorageLevel +import spark.network.Message +import 
spark.network.ConnectionManagerId + +import java.nio.ByteBuffer + +object GrepCount2 { + + def startSparkEnvs(sc: SparkContext) { + + val dummy = sc.parallelize(0 to 1000, 100).persist(StorageLevel.DISK_AND_MEMORY) + sc.runJob(dummy, (_: Iterator[Int]) => {}) + + println("SparkEnvs started") + Thread.sleep(1000) + /*sc.runJob(sc.parallelize(0 to 1000, 100), (_: Iterator[Int]) => {})*/ + } + + def warmConnectionManagers(sc: SparkContext) { + val slaveConnManagerIds = sc.parallelize(0 to 100, 100).map( + i => SparkEnv.get.connectionManager.id).collect().distinct + println("\nSlave ConnectionManagerIds") + slaveConnManagerIds.foreach(println) + println + + Thread.sleep(1000) + val numSlaves = slaveConnManagerIds.size + val count = 3 + val size = 5 * 1024 * 1024 + val iterations = (500 * 1024 * 1024 / (numSlaves * size)).toInt + println("count = " + count + ", size = " + size + ", iterations = " + iterations) + + (0 until count).foreach(i => { + val resultStrs = sc.parallelize(0 until numSlaves, numSlaves).map(i => { + val connManager = SparkEnv.get.connectionManager + val thisConnManagerId = connManager.id + /*connManager.onReceiveMessage((msg: Message, id: ConnectionManagerId) => { + println("Received [" + msg + "] from [" + id + "]") + None + })*/ + + + val buffer = ByteBuffer.allocate(size).put(Array.tabulate[Byte](size)(x => x.toByte)) + buffer.flip + + val startTime = System.currentTimeMillis + val futures = (0 until iterations).map(i => { + slaveConnManagerIds.filter(_ != thisConnManagerId).map(slaveConnManagerId => { + val bufferMessage = Message.createBufferMessage(buffer.duplicate) + println("Sending [" + bufferMessage + "] to [" + slaveConnManagerId + "]") + connManager.sendMessageReliably(slaveConnManagerId, bufferMessage) + }) + }).flatMap(x => x) + val results = futures.map(f => f()) + val finishTime = System.currentTimeMillis + + + val mb = size * results.size / 1024.0 / 1024.0 + val ms = finishTime - startTime + + val resultStr = "Sent " + mb + " MB in 
" + ms + " ms at " + (mb / ms * 1000.0) + " MB/s" + println(resultStr) + System.gc() + resultStr + }).collect() + + println("---------------------") + println("Run " + i) + resultStrs.foreach(println) + println("---------------------") + }) + } + + + def main (args: Array[String]) { + + if (args.length < 2) { + println ("Usage: GrepCount2 <host> <# sentence streams>") + System.exit(1) + } + + val ssc = new SparkStreamContext(args(0), "GrepCount2") + + val numSentenceStreams = if (args.length > 1) args(1).toInt else 1 + if (args.length > 2) { + ssc.setTempDir(args(2)) + } + + /*startSparkEnvs(ssc.sc)*/ + warmConnectionManagers(ssc.sc) + + val sentences = new UnifiedRDS( + (1 to numSentenceStreams).map(i => ssc.readTestStream("Sentences-"+i, 500)).toArray + ) + + val matching = sentences.filter(_.contains("light")) + matching.foreachRDD(rdd => println(rdd.count)) + + ssc.run + } +} + + + + diff --git a/streaming/src/main/scala/spark/stream/GrepCountApprox.scala b/streaming/src/main/scala/spark/stream/GrepCountApprox.scala new file mode 100644 index 0000000000..a4be2cc936 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/GrepCountApprox.scala @@ -0,0 +1,54 @@ +package spark.stream + +import SparkStreamContext._ + +import scala.util.Sorting + +import spark.SparkContext +import spark.storage.StorageLevel + +object GrepCountApprox { + var inputFile : String = null + var hdfs : String = null + var idealPartitions : Int = 0 + + def main (args: Array[String]) { + + if (args.length != 5) { + println ("Usage: GrepCountApprox <host> <HDFS> <Input file> <Ideal Partitions> <Timeout>") + System.exit(1) + } + + hdfs = args(1) + inputFile = hdfs + args(2) + idealPartitions = args(3).toInt + val timeout = args(4).toLong + println ("Input file: " + inputFile) + + val ssc = new SparkStreamContext(args(0), "GrepCount") + + SparkContext.idealPartitions = idealPartitions + SparkContext.inputFile = inputFile + ssc.setTempDir(hdfs + "/tmp") + + val sentences = 
ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 1000) + //sentences.print + val matching = sentences.filter(_.contains("light")) + var i = 0 + val startTime = System.currentTimeMillis + matching.foreachRDD { rdd => + val myNum = i + val result = rdd.countApprox(timeout) + val initialTime = (System.currentTimeMillis - startTime) / 1000.0 + printf("APPROX\t%.2f\t%d\tinitial\t%.1f\t%.1f\n", initialTime, myNum, result.initialValue.mean, + result.initialValue.high - result.initialValue.low) + result.onComplete { r => + val finalTime = (System.currentTimeMillis - startTime) / 1000.0 + printf("APPROX\t%.2f\t%d\tfinal\t%.1f\t0.0\t%.1f\n", finalTime, myNum, r.mean, finalTime - initialTime) + } + i += 1 + } + + ssc.run + } +} diff --git a/streaming/src/main/scala/spark/stream/IdealPerformance.scala b/streaming/src/main/scala/spark/stream/IdealPerformance.scala new file mode 100644 index 0000000000..589fb2def0 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/IdealPerformance.scala @@ -0,0 +1,36 @@ +package spark.stream + +import scala.collection.mutable.Map + +object IdealPerformance { + val base: String = "The medium researcher counts around the pinched troop The empire breaks " + + "Matei Matei announces HY with a theorem " + + def main (args: Array[String]) { + val sentences: String = base * 100000 + + for (i <- 1 to 30) { + val start = System.nanoTime + + val words = sentences.split(" ") + + val pairs = words.map(word => (word, 1)) + + val counts = Map[String, Int]() + + println("Job " + i + " position A at " + (System.nanoTime - start) / 1e9) + + pairs.foreach((pair) => { + var t = counts.getOrElse(pair._1, 0) + counts(pair._1) = t + pair._2 + }) + println("Job " + i + " position B at " + (System.nanoTime - start) / 1e9) + + for ((word, count) <- counts) { + print(word + " " + count + "; ") + } + println + println("Job " + i + " finished in " + (System.nanoTime - start) / 1e9) + } + } +}
\ No newline at end of file diff --git a/streaming/src/main/scala/spark/stream/Interval.scala b/streaming/src/main/scala/spark/stream/Interval.scala new file mode 100644 index 0000000000..08d0ed95b4 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/Interval.scala @@ -0,0 +1,75 @@ +package spark.stream + +case class Interval (val beginTime: Time, val endTime: Time) { + + def this(beginMs: Long, endMs: Long) = this(new LongTime(beginMs), new LongTime(endMs)) + + def duration(): Time = endTime - beginTime + + def += (time: Time) { + beginTime += time + endTime += time + this + } + + def + (time: Time): Interval = { + new Interval(beginTime + time, endTime + time) + } + + def < (that: Interval): Boolean = { + if (this.duration != that.duration) { + throw new Exception("Comparing two intervals with different durations [" + this + ", " + that + "]") + } + this.endTime < that.endTime + } + + def <= (that: Interval) = (this < that || this == that) + + def > (that: Interval) = !(this <= that) + + def >= (that: Interval) = !(this < that) + + def next(): Interval = { + this + (endTime - beginTime) + } + + def isZero() = (beginTime.isZero && endTime.isZero) + + def toFormattedString = beginTime.toFormattedString + "-" + endTime.toFormattedString + + override def toString = "[" + beginTime + ", " + endTime + "]" +} + +object Interval { + + /* + implicit def longTupleToInterval (longTuple: (Long, Long)) = + Interval(longTuple._1, longTuple._2) + + implicit def intTupleToInterval (intTuple: (Int, Int)) = + Interval(intTuple._1, intTuple._2) + + implicit def string2Interval (str: String): Interval = { + val parts = str.split(",") + if (parts.length == 1) + return Interval.zero + return Interval (parts(0).toInt, parts(1).toInt) + } + + def getInterval (timeMs: Long, intervalDurationMs: Long): Interval = { + val intervalBeginMs = timeMs / intervalDurationMs * intervalDurationMs + Interval(intervalBeginMs, intervalBeginMs + intervalDurationMs) + } + */ + + def zero() = new 
Interval (Time.zero, Time.zero) + + def currentInterval(intervalDuration: LongTime): Interval = { + val time = LongTime(System.currentTimeMillis) + val intervalBegin = time.floor(intervalDuration) + Interval(intervalBegin, intervalBegin + intervalDuration) + } + +} + + diff --git a/streaming/src/main/scala/spark/stream/Job.scala b/streaming/src/main/scala/spark/stream/Job.scala new file mode 100644 index 0000000000..bfdd5db645 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/Job.scala @@ -0,0 +1,21 @@ +package spark.stream + +class Job(val time: Time, func: () => _) { + val id = Job.getNewId() + + def run() { + func() + } + + override def toString = "SparkStream Job " + id + ":" + time +} + +object Job { + var lastId = 1 + + def getNewId() = synchronized { + lastId += 1 + lastId + } +} + diff --git a/streaming/src/main/scala/spark/stream/JobManager.scala b/streaming/src/main/scala/spark/stream/JobManager.scala new file mode 100644 index 0000000000..5ea80b92aa --- /dev/null +++ b/streaming/src/main/scala/spark/stream/JobManager.scala @@ -0,0 +1,112 @@ +package spark.stream + +import spark.SparkEnv +import spark.Logging + +import scala.collection.mutable.PriorityQueue +import scala.actors._ +import scala.actors.Actor._ +import scala.actors.remote._ +import scala.actors.remote.RemoteActor._ +import scala.actors.scheduler.ResizableThreadPoolScheduler +import scala.actors.scheduler.ForkJoinScheduler + +sealed trait JobManagerMessage +case class RunJob(job: Job) extends JobManagerMessage +case class JobCompleted(handlerId: Int) extends JobManagerMessage + +class JobHandler(ssc: SparkStreamContext, val id: Int) extends DaemonActor with Logging { + + var busy = false + + def act() { + loop { + receive { + case job: Job => { + SparkEnv.set(ssc.env) + try { + logInfo("Starting " + job) + job.run() + logInfo("Finished " + job) + if (job.time.isInstanceOf[LongTime]) { + val longTime = job.time.asInstanceOf[LongTime] + logInfo("Total pushing + skew + processing delay 
for " + longTime + " is " + + (System.currentTimeMillis - longTime.milliseconds) / 1000.0 + " s") + } + } catch { + case e: Exception => logError("SparkStream job failed", e) + } + busy = false + reply(JobCompleted(id)) + } + } + } + } +} + +class JobManager(ssc: SparkStreamContext, numThreads: Int = 2) extends DaemonActor with Logging { + + implicit private val jobOrdering = new Ordering[Job] { + override def compare(job1: Job, job2: Job): Int = { + if (job1.time < job2.time) { + return 1 + } else if (job2.time < job1.time) { + return -1 + } else { + return 0 + } + } + } + + private val jobs = new PriorityQueue[Job]() + private val handlers = (0 until numThreads).map(i => new JobHandler(ssc, i)) + + def act() { + handlers.foreach(_.start) + loop { + receive { + case RunJob(job) => { + jobs += job + logInfo("Job " + job + " submitted") + runJob() + } + case JobCompleted(handlerId) => { + runJob() + } + } + } + } + + def runJob(): Unit = { + logInfo("Attempting to allocate job ") + if (jobs.size > 0) { + handlers.find(!_.busy).foreach(handler => { + val job = jobs.dequeue + logInfo("Allocating job " + job + " to handler " + handler.id) + handler.busy = true + handler ! job + }) + } + } +} + +object JobManager { + def main(args: Array[String]) { + val ssc = new SparkStreamContext("local[4]", "JobManagerTest") + val jobManager = new JobManager(ssc) + jobManager.start() + + val t = System.currentTimeMillis + for (i <- 1 to 10) { + jobManager ! 
RunJob(new Job( + LongTime(i), + () => { + Thread.sleep(500) + println("Job " + i + " took " + (System.currentTimeMillis - t) + " ms") + } + )) + } + Thread.sleep(6000) + } +} + diff --git a/streaming/src/main/scala/spark/stream/JobManager2.scala b/streaming/src/main/scala/spark/stream/JobManager2.scala new file mode 100644 index 0000000000..b69653b9a4 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/JobManager2.scala @@ -0,0 +1,37 @@ +package spark.stream + +import spark.{Logging, SparkEnv} +import java.util.concurrent.Executors + + +class JobManager2(ssc: SparkStreamContext, numThreads: Int = 1) extends Logging { + + class JobHandler(ssc: SparkStreamContext, job: Job) extends Runnable { + def run() { + SparkEnv.set(ssc.env) + try { + logInfo("Starting " + job) + job.run() + logInfo("Finished " + job) + if (job.time.isInstanceOf[LongTime]) { + val longTime = job.time.asInstanceOf[LongTime] + logInfo("Total notification + skew + processing delay for " + longTime + " is " + + (System.currentTimeMillis - longTime.milliseconds) / 1000.0 + " s") + if (System.getProperty("spark.stream.distributed", "false") == "true") { + TestInputBlockTracker.setEndTime(job.time) + } + } + } catch { + case e: Exception => logError("SparkStream job failed", e) + } + } + } + + initLogging() + + val jobExecutor = Executors.newFixedThreadPool(numThreads) + + def runJob(job: Job) { + jobExecutor.execute(new JobHandler(ssc, job)) + } +} diff --git a/streaming/src/main/scala/spark/stream/NetworkStreamReceiver.scala b/streaming/src/main/scala/spark/stream/NetworkStreamReceiver.scala new file mode 100644 index 0000000000..8be46cc927 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/NetworkStreamReceiver.scala @@ -0,0 +1,184 @@ +package spark.stream + +import spark.Logging +import spark.storage.StorageLevel + +import scala.math._ +import scala.collection.mutable.{Queue, HashMap, ArrayBuffer} +import scala.actors._ +import scala.actors.Actor._ +import scala.actors.remote._ 
/**
 * Groups incoming (time, data) tuples into per-interval buckets.
 * Assume all data coming in has non-decreasing timestamp.
 */
final class Inbox[T: ClassManifest] (intervalDuration: Time) {
  // Bucket currently being filled; null until the first tuple arrives.
  var currentBucket: (Interval, ArrayBuffer[T]) = null
  // Completed buckets waiting to be popped. Also used as the lock for all bucket state.
  val filledBuckets = new Queue[(Interval, ArrayBuffer[T])]()

  def += (tuple: (Time, T)) = addTuple(tuple)

  // Appends the tuple's data to the bucket of its interval. When the tuple belongs to
  // a different interval than the current bucket, the current bucket is moved to
  // filledBuckets first.
  def addTuple(tuple: (Time, T)) {
    val (timestamp, item) = tuple
    val bucketInterval = getInterval(timestamp)
    def emptyBucket() = (bucketInterval, new ArrayBuffer[T]())

    filledBuckets.synchronized {
      if (currentBucket == null) {
        currentBucket = emptyBucket()
      }

      if (bucketInterval != currentBucket._1) {
        filledBuckets += currentBucket
        currentBucket = emptyBucket()
      }

      currentBucket._2 += item
    }
  }

  // Interval of length intervalDuration that starts at time floored to intervalDuration.
  def getInterval(time: Time): Interval = {
    val start = time.floor(intervalDuration)
    Interval(start, start + intervalDuration)
  }

  def hasFilledBuckets(): Boolean = {
    filledBuckets.synchronized {
      filledBuckets.size > 0
    }
  }

  // Removes and returns the oldest filled bucket, or null when there is none.
  def popFilledBucket(): (Interval, ArrayBuffer[T]) = {
    filledBuckets.synchronized {
      if (filledBuckets.size == 0) null else filledBuckets.dequeue()
    }
  }
}
// Milliseconds from now until the next wall-clock multiple of intervalDurationMillis;
// used as the reactWithin timeout in act().
// The clock is read exactly once: reading it twice lets it tick past the interval
// boundary between the reads, which can yield a zero or negative timeout.
// With a single reading the result is always in (0, intervalDurationMillis].
def getSleepTime(): Long = {
  val now = System.currentTimeMillis
  val nextBoundary = (now / intervalDurationMillis + 1) * intervalDurationMillis
  nextBoundary - now
}
// Stores one interval's data as a block named "<inputName>-<interval>-<splitId>" and
// returns that block id. Returns null if anything fails (the failure is logged).
// When system property "spark.fake" is "true", nothing is stored but the id is
// still returned.
def writeToBlockManager(data: Array[T], interval: Interval): String = {
  try {
    val blockId = inputName + "-" + interval.toFormattedString + "-" + splitId
    val fakeMode = System.getProperty("spark.fake", "false") == "true"
    if (fakeMode) {
      logInfo("Fake block")
    } else {
      logInfo("Writing as block " + blockId )
      ssc.env.blockManager.put(blockId, data.toIterator, StorageLevel.DISK_AND_MEMORY)
    }
    blockId
  } catch {
    case e: Exception =>
      logError("Exception writing to block manager at interval " + interval + ": " + e.getMessage, e)
      null
  }
}
// Sets the storage level, checkpoint level and checkpoint interval of this RDS and
// returns this RDS. Throws UnsupportedOperationException if a storage level other
// than NONE was already assigned and the requested one differs from it.
def persist(
    storageLevel: StorageLevel,
    checkpointLevel: StorageLevel,
    checkpointInterval: Time): RDS[T] = {
  val levelAlreadyAssigned = this.storageLevel != StorageLevel.NONE
  if (levelAlreadyAssigned && this.storageLevel != storageLevel) {
    // TODO: not sure this is necessary for RDSes
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDS after it was already assigned a level")
  }
  this.checkpointInterval = checkpointInterval
  this.checkpointLevel = checkpointLevel
  this.storageLevel = storageLevel
  this
}
// Returns the RDD of this RDS for the given time: the one remembered in generatedRDDs
// if present, otherwise a freshly computed one when the time is valid for this RDS.
// Returns None when the time is not valid or compute() produces nothing.
def getOrCompute(time: Time): Option[RDD[T]] = {
  // orElse evaluates its argument lazily, so validity is only checked (and may only
  // throw) when no RDD was generated for this time before.
  generatedRDDs.get(time).orElse {
    if (!isTimeValid(time)) {
      None
    } else {
      compute(time).map { newRDD =>
        val fakeMode = System.getProperty("spark.fake", "false") == "true"
        if (!fakeMode || newRDD.getStorageLevel == StorageLevel.NONE) {
          val checkpointDue =
            checkpointInterval != null && (time - zeroTime).isMultipleOf(checkpointInterval)
          if (checkpointDue) {
            newRDD.persist(checkpointLevel)
            logInfo("Persisting " + newRDD + " to " + checkpointLevel + " at time " + time)
          } else if (storageLevel != StorageLevel.NONE) {
            newRDD.persist(storageLevel)
            logInfo("Persisting " + newRDD + " to " + storageLevel + " at time " + time)
          }
        }
        // Remember the RDD under a copy of the time key.
        generatedRDDs.put(time.copy(), newRDD)
        newRDD
      }
    }
  }
}
// Returns a new RDS holding only the elements for which filterFunc is true.
// The closure is passed through ssc.sc.clean, exactly as map, flatMap and
// mapPartitions in this class do with theirs.
def filter(filterFunc: T => Boolean) = new FilteredRDS(this, ssc.sc.clean(filterFunc))
// Groups the values of each key into an ArrayBuffer.
// numPartitions is passed unchanged to combineByKey (default 0).
def groupByKey(numPartitions: Int = 0): ShuffledRDS[K, V, ArrayBuffer[V]] = {
  def createCombiner(v: V): ArrayBuffer[V] = ArrayBuffer[V](v)
  def mergeValue(c: ArrayBuffer[V], v: V): ArrayBuffer[V] = (c += v)
  // Merge in place, as mergeValue does: `c1 ++ c2` would copy the accumulated
  // buffer on every merge.
  def mergeCombiner(c1: ArrayBuffer[V], c2: ArrayBuffer[V]): ArrayBuffer[V] = (c1 ++= c2)
  combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, numPartitions)
}
// Base class for RDSs that sit at the root of the graph: they have no parent RDSs
// and slide by their batch duration.
abstract class InputRDS[T: ClassManifest] (
    val inputName: String,
    val batchDuration: Time,
    ssc: SparkStreamContext)
  extends RDS[T](ssc) {

  // Records the reference under which the input data for the given time can be found.
  def setReference(time: Time, reference: AnyRef): Unit

  override def slideTime = batchDuration

  override def dependencies = List()
}
// Returns the pre-built `rdd` for any time that has a registered file reference.
// Throws if no reference was registered for validTime.
override def compute(validTime: Time): Option[RDD[String]] = {
  generatedFiles.get(validTime) match {
    case Some(file) =>
      logInfo("Reading from file " + file + " for time " + validTime)
      // Some(ssc.sc.textFile(file).asInstanceOf[RDD[String]])
      // The following line is for HDFS performance test. Should comment out the above line.
      Some(rdd)
    case None =>
      // Nothing can follow the throw; the dead `None` that used to sit here is gone.
      throw new Exception(this.toString + ": Reference missing for time " + validTime + "!!!")
  }
}
// RDD returned by compute() for every time when system property "spark.fake" is
// "true"; stays null otherwise.
@transient var rdd: RDD[T] = null

// In fake mode, build the RDD once from SparkContext.inputFile, optionally persist it
// at the level named by "spark.fake.cache", and run a count to materialize it.
if (System.getProperty("spark.fake", "false") == "true") {
  logInfo("Running initial count to cache fake RDD")
  rdd = ssc.sc.textFile(SparkContext.inputFile,
    SparkContext.idealPartitions).asInstanceOf[RDD[T]]
  val fakeCacheLevel = System.getProperty("spark.fake.cache", "")
  if (fakeCacheLevel == "MEMORY_ONLY_2") {
    rdd.persist(StorageLevel.MEMORY_ONLY_2)
  } else if (fakeCacheLevel == "MEMORY_ONLY_DESER_2") {
    // Previously persisted with MEMORY_ONLY_2 (copy-paste slip), so the DESER
    // setting had no effect.
    rdd.persist(StorageLevel.MEMORY_ONLY_DESER_2)
  } else if (fakeCacheLevel != "") {
    logError("Invalid fake cache level: " + fakeCacheLevel)
    System.exit(1)
  }
  rdd.count()
}
// RDS whose RDD at each time is the parent's RDD at that time with mapFunc applied
// to every element.
class MappedRDS[T: ClassManifest, U: ClassManifest] (
    parent: RDS[T],
    mapFunc: T => U)
  extends RDS[U](parent.ssc) {

  override def slideTime: Time = parent.slideTime

  override def dependencies = List(parent)

  // None when the parent has no RDD for validTime.
  override def compute(validTime: Time): Option[RDD[U]] = {
    for (parentRDD <- parent.getOrCompute(validTime)) yield parentRDD.map[U](mapFunc)
  }
}
// RDS whose RDD at each time is the union of all parents' RDDs at that time.
// All parents must share one SparkStreamContext and one slide time.
//
// The empty-array check lives inside the superclass argument because that argument is
// evaluated before the class body runs. With the check in the body, `parents(0)`
// failed first with an index error, so the IllegalArgumentException was never raised.
class UnifiedRDS[T: ClassManifest](parents: Array[RDS[T]])
extends RDS[T](
  if (parents.length == 0) throw new IllegalArgumentException("Empty array of parents")
  else parents(0).ssc) {

  if (parents.map(_.ssc).distinct.size > 1) {
    throw new IllegalArgumentException("Array of parents have different SparkStreamContexts")
  }

  if (parents.map(_.slideTime).distinct.size > 1) {
    throw new IllegalArgumentException("Array of parents have different slide times")
  }

  override def dependencies = parents.toList

  override def slideTime: Time = parents(0).slideTime

  // Unions the RDDs of all parents for validTime; throws if any parent has none.
  override def compute(validTime: Time): Option[RDD[T]] = {
    val rdds = new ArrayBuffer[RDD[T]]()
    // Every parent is asked first; the results are checked afterwards.
    parents.map(_.getOrCompute(validTime)).foreach(_ match {
      case Some(rdd) => rdds += rdd
      case None => throw new Exception("Could not generate RDD from a parent for unifying at time " + validTime)
    })
    if (rdds.size > 0) {
      Some(new UnionRDD(ssc.sc, rdds))
    } else {
      None
    }
  }
}
// Output RDS that produces no RDDs of its own; for each time it generates a job that
// applies foreachFunc to the parent's RDD and that time.
class PerRDDForEachRDS[T: ClassManifest] (
    parent: RDS[T],
    foreachFunc: (RDD[T], Time) => Unit)
  extends RDS[Unit](parent.ssc) {

  // Convenience constructor for functions that do not need the time.
  def this(parent: RDS[T], altForeachFunc: (RDD[T]) => Unit) =
    this(parent, (rdd: RDD[T], time: Time) => altForeachFunc(rdd))

  override def slideTime: Time = parent.slideTime

  override def dependencies = List(parent)

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  // None when the parent has no RDD for the given time.
  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time).map { parentRDD =>
      val jobFunc = () => {
        foreachFunc(parentRDD, time)
      }
      new Job(time, jobFunc)
    }
  }
}
// Computes the reduced-by-key RDD for the window ending at validTime incrementally:
// the previous window's result is cogrouped with the per-batch reduced RDDs that left
// the window ("old time steps") and those that entered it ("new time steps"); old
// values are removed with invReduceFunc and new values are added with reduceFunc.
//
// Changes from the previous version: the debug line now logs the number of new values
// (it used to concatenate the collection itself), and the group-count check reports
// the condition it actually tests. Control flow and loop structure are unchanged.
override def compute(validTime: Time): Option[RDD[(K, V)]] = {

  // Notation:
  //                      _____________________________
  // |  previous window   _________|___________________
  // |___________________|       current window        |  --------------> Time
  //                     |_____________________________|
  //
  // |________ _________|          |________ _________|
  //          |                             |
  //          V                             V
  //      old time steps                new time steps
  //
  def getAdjustedWindow(endTime: Time, windowTime: Time): Interval = {
    val beginTime =
      if (allowPartialWindows && endTime - windowTime < parent.zeroTime) {
        // Partial window: clamp the beginning to the parent's zero time
        parent.zeroTime
      } else {
        endTime - windowTime
      }
    Interval(beginTime, endTime)
  }

  val currentTime = validTime.copy
  val currentWindow = getAdjustedWindow(currentTime, windowTime)
  val previousWindow = getAdjustedWindow(currentTime - slideTime, windowTime)

  logInfo("Current window = " + currentWindow)
  logInfo("Previous window = " + previousWindow)
  logInfo("Parent.zeroTime = " + parent.zeroTime)

  if (allowPartialWindows) {
    if (currentTime - slideTime == parent.zeroTime) {
      // Very first window: there is no previous result to build on
      reducedRDS.getOrCompute(currentTime) match {
        case Some(rdd) => return Some(rdd)
        case None => throw new Exception("Could not get first reduced RDD for time " + currentTime)
      }
    }
  } else {
    if (previousWindow.beginTime < parent.zeroTime) {
      if (currentWindow.beginTime < parent.zeroTime) {
        return None
      } else {
        // If this is the first feasible window, then generate reduced value in the naive manner
        val reducedRDDs = new ArrayBuffer[RDD[(K, V)]]()
        var t = currentWindow.endTime
        while (t > currentWindow.beginTime) {
          reducedRDS.getOrCompute(t) match {
            case Some(rdd) => reducedRDDs += rdd
            case None => throw new Exception("Could not get reduced RDD for time " + t)
          }
          t -= reducedRDS.slideTime
        }
        if (reducedRDDs.size == 0) {
          throw new Exception("Could not generate the first RDD for time " + validTime)
        }
        return Some(new UnionRDD(ssc.sc, reducedRDDs).reduceByKey(reduceFunc, numPartitions))
      }
    }
  }

  // Get the RDD of the reduced value of the previous window
  val previousWindowRDD = getOrCompute(previousWindow.endTime) match {
    case Some(rdd) => rdd.asInstanceOf[RDD[(_, _)]]
    case None => throw new Exception("Could not get previous RDD for time " + previousWindow.endTime)
  }

  val oldRDDs = new ArrayBuffer[RDD[(_, _)]]()
  val newRDDs = new ArrayBuffer[RDD[(_, _)]]()

  // Get the RDDs of the reduced values in "old time steps"
  var t = currentWindow.beginTime
  while (t > previousWindow.beginTime) {
    reducedRDS.getOrCompute(t) match {
      case Some(rdd) => oldRDDs += rdd.asInstanceOf[RDD[(_, _)]]
      case None => throw new Exception("Could not get old reduced RDD for time " + t)
    }
    t -= reducedRDS.slideTime
  }

  // Get the RDDs of the reduced values in "new time steps"
  t = currentWindow.endTime
  while (t > previousWindow.endTime) {
    reducedRDS.getOrCompute(t) match {
      case Some(rdd) => newRDDs += rdd.asInstanceOf[RDD[(_, _)]]
      case None => throw new Exception("Could not get new reduced RDD for time " + t)
    }
    t -= reducedRDS.slideTime
  }

  // Cogroup order matters below: index 0 is the previous window, then the old
  // RDDs, then the new RDDs.
  val partitioner = new HashPartitioner(numPartitions)
  val allRDDs = new ArrayBuffer[RDD[(_, _)]]()
  allRDDs += previousWindowRDD
  allRDDs ++= oldRDDs
  allRDDs ++= newRDDs

  val numOldRDDs = oldRDDs.size
  val numNewRDDs = newRDDs.size
  logInfo("Generated numOldRDDs = " + numOldRDDs + ", numNewRDDs = " + numNewRDDs)
  logInfo("Generating CoGroupedRDD with " + allRDDs.size + " RDDs")
  val newRDD = new CoGroupedRDD[K](allRDDs.toSeq, partitioner).asInstanceOf[RDD[(K,Seq[Seq[V]])]].map(x => {
    val (key, value) = x
    logDebug("value.size = " + value.size + ", numOldRDDs = " + numOldRDDs + ", numNewRDDs = " + numNewRDDs)
    if (value.size != 1 + numOldRDDs + numNewRDDs) {
      throw new Exception("Unexpected number of groups: " + value.size +
        ", expected " + (1 + numOldRDDs + numNewRDDs))
    }

    // old values = reduced values of the "old time steps" that are eliminated from current window
    // new values = reduced values of the "new time steps" that are introduced to the current window
    // previous value = reduced value of the previous window

    // Getting reduced values "old time steps"
    val oldValues =
      (0 until numOldRDDs).map(i => value(1 + i)).filter(_.size > 0).map(x => x(0))
    // Getting reduced values "new time steps"
    val newValues =
      (0 until numNewRDDs).map(i => value(1 + numOldRDDs + i)).filter(_.size > 0).map(x => x(0))

    // If reduced value for the key does not exist in previous window, it should not exist in "old time steps"
    if (value(0).size == 0 && oldValues.size != 0) {
      throw new Exception("Unexpected: Key exists in old reduced values but not in previous reduced values")
    }

    // For the key, at least one of "old time steps", "new time steps" and previous window should have reduced values
    if (value(0).size == 0 && oldValues.size == 0 && newValues.size == 0) {
      throw new Exception("Unexpected: Key does not exist in any of old, new, or previour reduced values")
    }

    // Logic to generate the final reduced value for current window:
    //
    // If previous window did not have reduced value for the key
    //   Then, return reduced value of "new time steps" as the final value
    // Else, reduced value exists in previous window
    //   If "old" time steps did not have reduced value for the key
    //     Then, reduce previous window's reduced value with that of "new time steps" for final value
    //   Else, reduced values exists in "old time steps"
    //     If "new values" did not have reduced value for the key
    //       Then, inverse-reduce "old values" from previous window's reduced value for final value
    //     Else, all 3 values exist, combine all of them together
    //
    logDebug("# old values = " + oldValues.size + ", # new values = " + newValues.size)
    val finalValue = {
      if (value(0).size == 0) {
        newValues.reduce(reduceFunc)
      } else {
        val prevValue = value(0)(0)
        logDebug("prev value = " + prevValue)
        if (oldValues.size == 0) {
          // assuming newValue.size > 0 (all 3 cannot be zero, as checked earlier)
          val temp = newValues.reduce(reduceFunc)
          reduceFunc(prevValue, temp)
        } else if (newValues.size == 0) {
          invReduceFunc(prevValue, oldValues.reduce(reduceFunc))
        } else {
          val tempValue = invReduceFunc(prevValue, oldValues.reduce(reduceFunc))
          reduceFunc(tempValue, newValues.reduce(reduceFunc))
        }
      }
    }
    (key, finalValue)
  })
  //newRDD.persist(StorageLevel.MEMORY_ONLY_DESER_2)
  Some(newRDD)
}
extends SchedulerMessage + +class Scheduler( + ssc: SparkStreamContext, + inputRDSs: Array[InputRDS[_]], + outputRDSs: Array[RDS[_]]) +extends Actor with Logging { + + class InputState (inputNames: Array[String]) { + val inputsLeft = new HashSet[String]() + inputsLeft ++= inputNames + + val startTime = System.currentTimeMillis + + def delay() = System.currentTimeMillis - startTime + + def addGeneratedInput(inputName: String) = inputsLeft -= inputName + + def areAllInputsGenerated() = (inputsLeft.size == 0) + + override def toString(): String = { + val left = if (inputsLeft.size == 0) "" else inputsLeft.reduceLeft(_ + ", " + _) + return "Inputs left = [ " + left + " ]" + } + } + + + initLogging() + + val inputNames = inputRDSs.map(_.inputName).toArray + val inputStates = new HashMap[Interval, InputState]() + val currentJobs = System.getProperty("spark.stream.currentJobs", "1").toInt + val jobManager = new JobManager2(ssc, currentJobs) + + // TODO(Haoyuan): The following line is for performance test only. 
+ var cnt: Int = System.getProperty("spark.stream.fake.cnt", "60").toInt + var lastInterval: Interval = null + + + /*remote.register("SparkStreamScheduler", actorOf[Scheduler])*/ + logInfo("Registered actor on port ") + + /*jobManager.start()*/ + startStreamReceivers() + + def receive = { + case InputGenerated(inputName, interval, reference) => { + addGeneratedInput(inputName, interval, reference) + } + case Test() => logInfo("TEST PASSED") + } + + def addGeneratedInput(inputName: String, interval: Interval, reference: AnyRef = null) { + logInfo("Input " + inputName + " generated for interval " + interval) + inputStates.get(interval) match { + case None => inputStates.put(interval, new InputState(inputNames)) + case _ => + } + inputStates(interval).addGeneratedInput(inputName) + + inputRDSs.filter(_.inputName == inputName).foreach(inputRDS => { + inputRDS.setReference(interval.endTime, reference) + if (inputRDS.isInstanceOf[TestInputRDS]) { + TestInputBlockTracker.addBlocks(interval.endTime, reference) + } + } + ) + + def getNextInterval(): Option[Interval] = { + logDebug("Last interval is " + lastInterval) + val readyIntervals = inputStates.filter(_._2.areAllInputsGenerated).keys + /*inputState.foreach(println) */ + logDebug("InputState has " + inputStates.size + " intervals, " + readyIntervals.size + " ready intervals") + return readyIntervals.find(lastInterval == null || _.beginTime == lastInterval.endTime) + } + + var nextInterval = getNextInterval() + var count = 0 + while(nextInterval.isDefined) { + val inputState = inputStates.get(nextInterval.get).get + generateRDDsForInterval(nextInterval.get) + logInfo("Skew delay for " + nextInterval.get.endTime + " is " + (inputState.delay / 1000.0) + " s") + inputStates.remove(nextInterval.get) + lastInterval = nextInterval.get + nextInterval = getNextInterval() + count += 1 + /*if (nextInterval.size == 0 && inputState.size > 0) { + logDebug("Next interval not ready, pending intervals " + inputState.size) + }*/ + } + 
logDebug("RDDs generated for " + count + " intervals") + + /* + if (inputState(interval).areAllInputsGenerated) { + generateRDDsForInterval(interval) + lastInterval = interval + inputState.remove(interval) + } else { + logInfo("All inputs not generated for interval " + interval) + } + */ + } + + def generateRDDsForInterval (interval: Interval) { + logInfo("Generating RDDs for interval " + interval) + outputRDSs.foreach(outputRDS => { + if (!outputRDS.isInitialized) outputRDS.initialize(interval) + outputRDS.generateJob(interval.endTime) match { + case Some(job) => submitJob(job) + case None => + } + } + ) + // TODO(Haoyuan): This comment is for performance test only. + if (System.getProperty("spark.fake", "false") == "true" || System.getProperty("spark.stream.fake", "false") == "true") { + cnt -= 1 + if (cnt <= 0) { + logInfo("My time is up! " + cnt) + System.exit(1) + } + } + } + + def submitJob(job: Job) { + logInfo("Submitting " + job + " to JobManager") + /*jobManager ! RunJob(job)*/ + jobManager.runJob(job) + } + + def startStreamReceivers() { + val testStreamReceiverNames = new ArrayBuffer[(String, Long)]() + inputRDSs.foreach (inputRDS => { + inputRDS match { + case fileInputRDS: FileInputRDS => { + val fileStreamReceiver = new FileStreamReceiver( + fileInputRDS.inputName, + fileInputRDS.directory, + fileInputRDS.batchDuration.asInstanceOf[LongTime].milliseconds) + fileStreamReceiver.start() + } + case networkInputRDS: NetworkInputRDS[_] => { + val networkStreamReceiver = new NetworkStreamReceiver( + networkInputRDS.inputName, + networkInputRDS.batchDuration, + 0, + ssc, + if (ssc.tempDir == null) null else ssc.tempDir.toString) + networkStreamReceiver.start() + } + case testInputRDS: TestInputRDS => { + testStreamReceiverNames += + ((testInputRDS.inputName, testInputRDS.batchDuration.asInstanceOf[LongTime].milliseconds)) + } + } + }) + if (testStreamReceiverNames.size > 0) { + /*val testStreamCoordinator = new 
TestStreamCoordinator(testStreamReceiverNames.toArray)*/ + /*testStreamCoordinator.start()*/ + val actor = ssc.actorSystem.actorOf( + Props(new TestStreamCoordinator(testStreamReceiverNames.toArray)), + name = "TestStreamCoordinator") + } + } +} + diff --git a/streaming/src/main/scala/spark/stream/SenGeneratorForPerformanceTest.scala b/streaming/src/main/scala/spark/stream/SenGeneratorForPerformanceTest.scala new file mode 100644 index 0000000000..74fd54072f --- /dev/null +++ b/streaming/src/main/scala/spark/stream/SenGeneratorForPerformanceTest.scala @@ -0,0 +1,78 @@ +package spark.stream + +import scala.util.Random +import scala.io.Source +import scala.actors._ +import scala.actors.Actor._ +import scala.actors.remote._ +import scala.actors.remote.RemoteActor._ + +import java.net.InetSocketAddress + +/*import akka.actor.Actor._*/ +/*import akka.actor.ActorRef*/ + + +object SenGeneratorForPerformanceTest { + + def printUsage () { + println ("Usage: SentenceGenerator <target IP> <target port> <sentence file> [<sentences per second>]") + System.exit(0) + } + + def main (args: Array[String]) { + if (args.length < 3) { + printUsage + } + + val inputManagerIP = args(0) + val inputManagerPort = args(1).toInt + val sentenceFile = args(2) + val sentencesPerSecond = { + if (args.length > 3) args(3).toInt + else 10 + } + + val source = Source.fromFile(sentenceFile) + val lines = source.mkString.split ("\n") + source.close () + + try { + /*val inputManager = remote.actorFor("InputReceiver-Sentences",*/ + /* inputManagerIP, inputManagerPort)*/ + val inputManager = select(Node(inputManagerIP, inputManagerPort), Symbol("InputReceiver-Sentences")) + val sleepBetweenSentences = 1000.0 / sentencesPerSecond.toDouble - 1 + val random = new Random () + println ("Sending " + sentencesPerSecond + " sentences per second to " + inputManagerIP + ":" + inputManagerPort) + var lastPrintTime = System.currentTimeMillis() + var count = 0 + + while (true) { + /*if (!inputManager.tryTell (lines 
(random.nextInt (lines.length))))*/ + /*throw new Exception ("disconnected")*/ +// inputManager ! lines (random.nextInt (lines.length)) + for (i <- 0 to sentencesPerSecond) inputManager ! lines (0) + println(System.currentTimeMillis / 1000 + " s") +/* count += 1 + + if (System.currentTimeMillis - lastPrintTime >= 1000) { + println (count + " sentences sent last second") + count = 0 + lastPrintTime = System.currentTimeMillis + } + + Thread.sleep (sleepBetweenSentences.toLong) +*/ + val currentMs = System.currentTimeMillis / 1000; + Thread.sleep ((currentMs * 1000 + 1000) - System.currentTimeMillis) + } + } catch { + case e: Exception => + /*Thread.sleep (1000)*/ + } + } +} + + + + diff --git a/streaming/src/main/scala/spark/stream/SenderReceiverTest.scala b/streaming/src/main/scala/spark/stream/SenderReceiverTest.scala new file mode 100644 index 0000000000..69879b621c --- /dev/null +++ b/streaming/src/main/scala/spark/stream/SenderReceiverTest.scala @@ -0,0 +1,63 @@ +package spark.stream +import java.net.{Socket, ServerSocket} +import java.io.{ByteArrayOutputStream, DataOutputStream, DataInputStream, BufferedInputStream} + +object Receiver { + def main(args: Array[String]) { + val port = args(0).toInt + val lsocket = new ServerSocket(port) + println("Listening on port " + port ) + while(true) { + val socket = lsocket.accept() + (new Thread() { + override def run() { + val buffer = new Array[Byte](100000) + var count = 0 + val time = System.currentTimeMillis + try { + val is = new DataInputStream(new BufferedInputStream(socket.getInputStream)) + var loop = true + var string: String = null + while((string = is.readUTF) != null) { + count += 28 + } + } catch { + case e: Exception => e.printStackTrace + } + val timeTaken = System.currentTimeMillis - time + val tput = (count / 1024.0) / (timeTaken / 1000.0) + println("Data = " + count + " bytes\nTime = " + timeTaken + " ms\nTput = " + tput + " KB/s") + } + }).start() + } + } + +} + +object Sender { + + def main(args: 
Array[String]) { + try { + val host = args(0) + val port = args(1).toInt + val size = args(2).toInt + + val byteStream = new ByteArrayOutputStream() + val stringDataStream = new DataOutputStream(byteStream) + (0 until size).foreach(_ => stringDataStream.writeUTF("abcdedfghijklmnopqrstuvwxy")) + val bytes = byteStream.toByteArray() + println("Generated array of " + bytes.length + " bytes") + + /*val bytes = new Array[Byte](size)*/ + val socket = new Socket(host, port) + val os = socket.getOutputStream + os.write(bytes) + os.flush + socket.close() + + } catch { + case e: Exception => e.printStackTrace + } + } +} + diff --git a/streaming/src/main/scala/spark/stream/SentenceFileGenerator.scala b/streaming/src/main/scala/spark/stream/SentenceFileGenerator.scala new file mode 100644 index 0000000000..9aa441d9bb --- /dev/null +++ b/streaming/src/main/scala/spark/stream/SentenceFileGenerator.scala @@ -0,0 +1,92 @@ +package spark.stream + +import spark._ + +import scala.collection.mutable.ArrayBuffer +import scala.util.Random +import scala.io.Source + +import java.net.InetSocketAddress + +import org.apache.hadoop.fs._ +import org.apache.hadoop.conf._ +import org.apache.hadoop.io._ +import org.apache.hadoop.mapred._ +import org.apache.hadoop.util._ + +object SentenceFileGenerator { + + def printUsage () { + println ("Usage: SentenceFileGenerator <master> <target directory> <# partitions> <sentence file> [<sentences per second>]") + System.exit(0) + } + + def main (args: Array[String]) { + if (args.length < 4) { + printUsage + } + + val master = args(0) + val fs = new Path(args(1)).getFileSystem(new Configuration()) + val targetDirectory = new Path(args(1)).makeQualified(fs) + val numPartitions = args(2).toInt + val sentenceFile = args(3) + val sentencesPerSecond = { + if (args.length > 4) args(4).toInt + else 10 + } + + val source = Source.fromFile(sentenceFile) + val lines = source.mkString.split ("\n").toArray + source.close () + println("Read " + lines.length + " lines 
from file " + sentenceFile) + + val sentences = { + val buffer = ArrayBuffer[String]() + val random = new Random() + var i = 0 + while (i < sentencesPerSecond) { + buffer += lines(random.nextInt(lines.length)) + i += 1 + } + buffer.toArray + } + println("Generated " + sentences.length + " sentences") + + val sc = new SparkContext(master, "SentenceFileGenerator") + val sentencesRDD = sc.parallelize(sentences, numPartitions) + + val tempDirectory = new Path(targetDirectory, "_tmp") + + fs.mkdirs(targetDirectory) + fs.mkdirs(tempDirectory) + + var saveTimeMillis = System.currentTimeMillis + try { + while (true) { + val newDir = new Path(targetDirectory, "Sentences-" + saveTimeMillis) + val tmpNewDir = new Path(tempDirectory, "Sentences-" + saveTimeMillis) + println("Writing to file " + newDir) + sentencesRDD.saveAsTextFile(tmpNewDir.toString) + fs.rename(tmpNewDir, newDir) + saveTimeMillis += 1000 + val sleepTimeMillis = { + val currentTimeMillis = System.currentTimeMillis + if (saveTimeMillis < currentTimeMillis) { + 0 + } else { + saveTimeMillis - currentTimeMillis + } + } + println("Sleeping for " + sleepTimeMillis + " ms") + Thread.sleep(sleepTimeMillis) + } + } catch { + case e: Exception => + } + } +} + + + + diff --git a/streaming/src/main/scala/spark/stream/SentenceGenerator.scala b/streaming/src/main/scala/spark/stream/SentenceGenerator.scala new file mode 100644 index 0000000000..ef66e66047 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/SentenceGenerator.scala @@ -0,0 +1,103 @@ +package spark.stream + +import scala.util.Random +import scala.io.Source +import scala.actors._ +import scala.actors.Actor._ +import scala.actors.remote._ +import scala.actors.remote.RemoteActor._ + +import java.net.InetSocketAddress + + +object SentenceGenerator { + + def printUsage { + println ("Usage: SentenceGenerator <target IP> <target port> <sentence file> [<sentences per second>]") + System.exit(0) + } + + def generateRandomSentences(lines: Array[String], 
sentencesPerSecond: Int, streamReceiver: AbstractActor) { + val sleepBetweenSentences = 1000.0 / sentencesPerSecond.toDouble - 1 + val random = new Random () + + try { + var lastPrintTime = System.currentTimeMillis() + var count = 0 + while(true) { + streamReceiver ! lines(random.nextInt(lines.length)) + count += 1 + if (System.currentTimeMillis - lastPrintTime >= 1000) { + println (count + " sentences sent last second") + count = 0 + lastPrintTime = System.currentTimeMillis + } + Thread.sleep(sleepBetweenSentences.toLong) + } + } catch { + case e: Exception => + } + } + + def generateSameSentences(lines: Array[String], sentencesPerSecond: Int, streamReceiver: AbstractActor) { + try { + val numSentences = if (sentencesPerSecond <= 0) { + lines.length + } else { + sentencesPerSecond + } + var nextSendingTime = System.currentTimeMillis() + val pingInterval = if (System.getenv("INTERVAL") != null) { + System.getenv("INTERVAL").toInt + } else { + 2000 + } + while(true) { + (0 until numSentences).foreach(i => { + streamReceiver ! 
lines(i % lines.length) + }) + println ("Sent " + numSentences + " sentences") + nextSendingTime += pingInterval + val sleepTime = nextSendingTime - System.currentTimeMillis + if (sleepTime > 0) { + println ("Sleeping for " + sleepTime + " ms") + Thread.sleep(sleepTime) + } + } + } catch { + case e: Exception => + } + } + + def main(args: Array[String]) { + if (args.length < 3) { + printUsage + } + + val generateRandomly = false + + val streamReceiverIP = args(0) + val streamReceiverPort = args(1).toInt + val sentenceFile = args(2) + val sentencesPerSecond = if (args.length > 3) args(3).toInt else 10 + val sentenceInputName = if (args.length > 4) args(4) else "Sentences" + + println("Sending " + sentencesPerSecond + " sentences per second to " + + streamReceiverIP + ":" + streamReceiverPort + "/NetworkStreamReceiver-" + sentenceInputName) + val source = Source.fromFile(sentenceFile) + val lines = source.mkString.split ("\n") + source.close () + + val streamReceiver = select( + Node(streamReceiverIP, streamReceiverPort), + Symbol("NetworkStreamReceiver-" + sentenceInputName)) + if (generateRandomly) { + generateRandomSentences(lines, sentencesPerSecond, streamReceiver) + } else { + generateSameSentences(lines, sentencesPerSecond, streamReceiver) + } + } +} + + + diff --git a/streaming/src/main/scala/spark/stream/ShuffleTest.scala b/streaming/src/main/scala/spark/stream/ShuffleTest.scala new file mode 100644 index 0000000000..5ad56f6777 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/ShuffleTest.scala @@ -0,0 +1,22 @@ +package spark.stream +import spark.SparkContext +import SparkContext._ + +object ShuffleTest { + def main(args: Array[String]) { + + if (args.length < 1) { + println ("Usage: ShuffleTest <host>") + System.exit(1) + } + + val sc = new spark.SparkContext(args(0), "ShuffleTest") + val rdd = sc.parallelize(1 to 1000, 500).cache + + def time(f: => Unit) { val start = System.nanoTime; f; println((System.nanoTime - start) * 1.0e-6) } + + time { for 
(i <- 0 until 50) time { rdd.map(x => (x % 100, x)).reduceByKey(_ + _, 10).count } } + System.exit(0) + } +} + diff --git a/streaming/src/main/scala/spark/stream/SimpleWordCount.scala b/streaming/src/main/scala/spark/stream/SimpleWordCount.scala new file mode 100644 index 0000000000..c53fe35f44 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/SimpleWordCount.scala @@ -0,0 +1,30 @@ +package spark.stream + +import SparkStreamContext._ + +import scala.util.Sorting + +object SimpleWordCount { + + def main (args: Array[String]) { + + if (args.length < 1) { + println ("Usage: SparkStreamContext <host> [<temp directory>]") + System.exit(1) + } + + val ssc = new SparkStreamContext(args(0), "WordCount") + if (args.length > 1) { + ssc.setTempDir(args(1)) + } + val sentences = ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 2000) + /*sentences.print*/ + + val words = sentences.flatMap(_.split(" ")) + + val counts = words.map(x => (x, 1)).reduceByKey(_ + _, 1) + counts.print + + ssc.run + } +} diff --git a/streaming/src/main/scala/spark/stream/SimpleWordCount2.scala b/streaming/src/main/scala/spark/stream/SimpleWordCount2.scala new file mode 100644 index 0000000000..1a2c67cd4d --- /dev/null +++ b/streaming/src/main/scala/spark/stream/SimpleWordCount2.scala @@ -0,0 +1,51 @@ +package spark.stream + +import spark.SparkContext +import SparkContext._ +import SparkStreamContext._ + +import scala.util.Sorting + +object SimpleWordCount2 { + + def moreWarmup(sc: SparkContext) { + (0 until 20).foreach {i => + sc.parallelize(1 to 20000000, 500) + .map(_ % 100).map(_.toString) + .map(x => (x, 1)).reduceByKey(_ + _, 10) + .collect() + } + } + + def main (args: Array[String]) { + + if (args.length < 2) { + println ("Usage: SimpleWordCount2 <host> <# sentence streams>") + System.exit(1) + } + + val ssc = new SparkStreamContext(args(0), "SimpleWordCount2") + + val numSentenceStreams = if (args.length > 1) args(1).toInt else 1 + if (args.length > 2) { + 
ssc.setTempDir(args(2)) + } + + GrepCount2.warmConnectionManagers(ssc.sc) + moreWarmup(ssc.sc) + + val sentences = new UnifiedRDS( + (1 to numSentenceStreams).map(i => ssc.readTestStream("Sentences-" + i, 1000)).toArray + ) + + + val words = sentences.flatMap(_.split(" ")) + + val counts = words.map(x => (x, 1)).reduceByKey(_ + _, 10) + counts.foreachRDD(_.collect()) + /*words.foreachRDD(_.countByValue())*/ + + ssc.run + } +} + diff --git a/streaming/src/main/scala/spark/stream/SimpleWordCount2_Special.scala b/streaming/src/main/scala/spark/stream/SimpleWordCount2_Special.scala new file mode 100644 index 0000000000..9003a5dbb3 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/SimpleWordCount2_Special.scala @@ -0,0 +1,83 @@ +package spark.stream + +import spark.SparkContext +import SparkContext._ +import SparkStreamContext._ + +import scala.collection.JavaConversions.mapAsScalaMap +import scala.util.Sorting +import java.lang.{Long => JLong} + +object SimpleWordCount2_Special { + + def moreWarmup(sc: SparkContext) { + (0 until 20).foreach {i => + sc.parallelize(1 to 20000000, 500) + .map(_ % 100).map(_.toString) + .map(x => (x, 1)).reduceByKey(_ + _, 10) + .collect() + } + } + + def main (args: Array[String]) { + + if (args.length < 2) { + println ("Usage: SimpleWordCount2 <host> <# sentence streams>") + System.exit(1) + } + + val ssc = new SparkStreamContext(args(0), "SimpleWordCount2") + + val numSentenceStreams = if (args.length > 1) args(1).toInt else 1 + if (args.length > 2) { + ssc.setTempDir(args(2)) + } + + GrepCount2.warmConnectionManagers(ssc.sc) + moreWarmup(ssc.sc) + + val sentences = new UnifiedRDS( + (1 to numSentenceStreams).map(i => ssc.readTestStream("Sentences-" + i, 400)).toArray + ) + + + def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, JLong)] = { + val map = new java.util.HashMap[String, JLong] + var i = 0 + var j = 0 + while (iter.hasNext) { + val s = iter.next() + i = 0 + while (i < s.length) { + j = i + while (j 
< s.length && s.charAt(j) != ' ') { + j += 1 + } + if (j > i) { + val w = s.substring(i, j) + val c = map.get(w) + if (c == null) { + map.put(w, 1) + } else { + map.put(w, c + 1) + } + } + i = j + while (i < s.length && s.charAt(i) == ' ') { + i += 1 + } + } + } + map.toIterator + } + + + /*val words = sentences.flatMap(_.split(" "))*/ + /*val counts = words.map(x => (x, 1)).reduceByKey(_ + _, 10)*/ + val counts = sentences.mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10) + counts.foreachRDD(_.collect()) + + ssc.run + } +} + diff --git a/streaming/src/main/scala/spark/stream/SparkStreamContext.scala b/streaming/src/main/scala/spark/stream/SparkStreamContext.scala new file mode 100644 index 0000000000..0e65196e46 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/SparkStreamContext.scala @@ -0,0 +1,105 @@ +package spark.stream + +import spark.SparkContext +import spark.SparkEnv +import spark.Utils +import spark.Logging + +import scala.collection.mutable.ArrayBuffer + +import java.net.InetSocketAddress +import java.io.IOException +import java.util.UUID + +import org.apache.hadoop.fs.Path +import org.apache.hadoop.conf.Configuration + +import akka.actor._ +import akka.actor.Actor +import akka.util.duration._ + +class SparkStreamContext ( + master: String, + frameworkName: String, + val sparkHome: String = null, + val jars: Seq[String] = Nil) + extends Logging { + + initLogging() + + val sc = new SparkContext(master, frameworkName, sparkHome, jars) + val env = SparkEnv.get + val actorSystem = env.actorSystem + + @transient val inputRDSs = new ArrayBuffer[InputRDS[_]]() + @transient val outputRDSs = new ArrayBuffer[RDS[_]]() + + var tempDirRoot: String = null + var tempDir: Path = null + + def readNetworkStream[T: ClassManifest]( + name: String, + addresses: Array[InetSocketAddress], + batchDuration: Time): RDS[T] = { + + val inputRDS = new NetworkInputRDS[T](name, addresses, batchDuration, this) + inputRDSs += inputRDS + inputRDS + } + + def 
readNetworkStream[T: ClassManifest]( + name: String, + addresses: Array[String], + batchDuration: Long): RDS[T] = { + + def stringToInetSocketAddress (str: String): InetSocketAddress = { + val parts = str.split(":") + if (parts.length != 2) { + throw new IllegalArgumentException ("Address format error") + } + new InetSocketAddress(parts(0), parts(1).toInt) + } + + readNetworkStream( + name, + addresses.map(stringToInetSocketAddress).toArray, + LongTime(batchDuration)) + } + + def readFileStream(name: String, directory: String): RDS[String] = { + val path = new Path(directory) + val fs = path.getFileSystem(new Configuration()) + val qualPath = path.makeQualified(fs) + val inputRDS = new FileInputRDS(name, qualPath.toString, this) + inputRDSs += inputRDS + inputRDS + } + + def readTestStream(name: String, batchDuration: Long): RDS[String] = { + val inputRDS = new TestInputRDS(name, LongTime(batchDuration), this) + inputRDSs += inputRDS + inputRDS + } + + def registerOutputStream (outputRDS: RDS[_]) { + outputRDSs += outputRDS + } + + def setTempDir(dir: String) { + tempDirRoot = dir + } + + def run () { + val ctxt = this + val actor = actorSystem.actorOf( + Props(new Scheduler(ctxt, inputRDSs.toArray, outputRDSs.toArray)), + name = "SparkStreamScheduler") + logInfo("Registered actor") + actorSystem.awaitTermination() + } +} + +object SparkStreamContext { + implicit def rdsToPairRdsFunctions [K: ClassManifest, V: ClassManifest] (rds: RDS[(K,V)]) = + new PairRDSFunctions (rds) +} diff --git a/streaming/src/main/scala/spark/stream/TestGenerator.scala b/streaming/src/main/scala/spark/stream/TestGenerator.scala new file mode 100644 index 0000000000..738ce17452 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/TestGenerator.scala @@ -0,0 +1,107 @@ +package spark.stream + +import scala.util.Random +import scala.io.Source +import scala.actors._ +import scala.actors.Actor._ +import scala.actors.remote._ +import scala.actors.remote.RemoteActor._ + +import 
java.net.InetSocketAddress + + +object TestGenerator { + + def printUsage { + println ("Usage: SentenceGenerator <target IP> <target port> <sentence file> [<sentences per second>]") + System.exit(0) + } + /* + def generateRandomSentences(lines: Array[String], sentencesPerSecond: Int, streamReceiver: AbstractActor) { + val sleepBetweenSentences = 1000.0 / sentencesPerSecond.toDouble - 1 + val random = new Random () + + try { + var lastPrintTime = System.currentTimeMillis() + var count = 0 + while(true) { + streamReceiver ! lines(random.nextInt(lines.length)) + count += 1 + if (System.currentTimeMillis - lastPrintTime >= 1000) { + println (count + " sentences sent last second") + count = 0 + lastPrintTime = System.currentTimeMillis + } + Thread.sleep(sleepBetweenSentences.toLong) + } + } catch { + case e: Exception => + } + }*/ + + def generateSameSentences(lines: Array[String], sentencesPerSecond: Int, streamReceiver: AbstractActor) { + try { + val numSentences = if (sentencesPerSecond <= 0) { + lines.length + } else { + sentencesPerSecond + } + val sentences = lines.take(numSentences).toArray + + var nextSendingTime = System.currentTimeMillis() + val sendAsArray = true + while(true) { + if (sendAsArray) { + println("Sending as array") + streamReceiver !? sentences + } else { + println("Sending individually") + sentences.foreach(sentence => { + streamReceiver !? 
sentence + }) + } + println ("Sent " + numSentences + " sentences in " + (System.currentTimeMillis - nextSendingTime) + " ms") + nextSendingTime += 1000 + val sleepTime = nextSendingTime - System.currentTimeMillis + if (sleepTime > 0) { + println ("Sleeping for " + sleepTime + " ms") + Thread.sleep(sleepTime) + } + } + } catch { + case e: Exception => + } + } + + def main(args: Array[String]) { + if (args.length < 3) { + printUsage + } + + val generateRandomly = false + + val streamReceiverIP = args(0) + val streamReceiverPort = args(1).toInt + val sentenceFile = args(2) + val sentencesPerSecond = if (args.length > 3) args(3).toInt else 10 + val sentenceInputName = if (args.length > 4) args(4) else "Sentences" + + println("Sending " + sentencesPerSecond + " sentences per second to " + + streamReceiverIP + ":" + streamReceiverPort + "/NetworkStreamReceiver-" + sentenceInputName) + val source = Source.fromFile(sentenceFile) + val lines = source.mkString.split ("\n") + source.close () + + val streamReceiver = select( + Node(streamReceiverIP, streamReceiverPort), + Symbol("NetworkStreamReceiver-" + sentenceInputName)) + if (generateRandomly) { + /*generateRandomSentences(lines, sentencesPerSecond, streamReceiver)*/ + } else { + generateSameSentences(lines, sentencesPerSecond, streamReceiver) + } + } +} + + + diff --git a/streaming/src/main/scala/spark/stream/TestGenerator2.scala b/streaming/src/main/scala/spark/stream/TestGenerator2.scala new file mode 100644 index 0000000000..ceb4730e72 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/TestGenerator2.scala @@ -0,0 +1,119 @@ +package spark.stream + +import scala.util.Random +import scala.io.Source +import scala.actors._ +import scala.actors.Actor._ +import scala.actors.remote._ +import scala.actors.remote.RemoteActor._ + +import java.io.{DataOutputStream, ByteArrayOutputStream, DataInputStream} +import java.net.Socket + +object TestGenerator2 { + + def printUsage { + println ("Usage: SentenceGenerator <target 
IP> <target port> <sentence file> [<sentences per second>]") + System.exit(0) + } + + def sendSentences(streamReceiverHost: String, streamReceiverPort: Int, numSentences: Int, bytes: Array[Byte], intervalTime: Long){ + try { + println("Connecting to " + streamReceiverHost + ":" + streamReceiverPort) + val socket = new Socket(streamReceiverHost, streamReceiverPort) + + println("Sending " + numSentences+ " sentences / " + (bytes.length / 1024.0 / 1024.0) + " MB per " + intervalTime + " ms to " + streamReceiverHost + ":" + streamReceiverPort ) + val currentTime = System.currentTimeMillis + var targetTime = (currentTime / intervalTime + 1).toLong * intervalTime + Thread.sleep(targetTime - currentTime) + + while(true) { + val startTime = System.currentTimeMillis() + println("Sending at " + startTime + " ms with delay of " + (startTime - targetTime) + " ms") + val socketOutputStream = socket.getOutputStream + val parts = 10 + (0 until parts).foreach(i => { + val partStartTime = System.currentTimeMillis + + val offset = (i * bytes.length / parts).toInt + val len = math.min(((i + 1) * bytes.length / parts).toInt - offset, bytes.length) + socketOutputStream.write(bytes, offset, len) + socketOutputStream.flush() + val partFinishTime = System.currentTimeMillis + println("Sending part " + i + " of " + len + " bytes took " + (partFinishTime - partStartTime) + " ms") + val sleepTime = math.max(0, 1000 / parts - (partFinishTime - partStartTime) - 1) + Thread.sleep(sleepTime) + }) + + socketOutputStream.flush() + /*val socketInputStream = new DataInputStream(socket.getInputStream)*/ + /*val reply = socketInputStream.readUTF()*/ + val finishTime = System.currentTimeMillis() + println ("Sent " + bytes.length + " bytes in " + (finishTime - startTime) + " ms for interval [" + targetTime + ", " + (targetTime + intervalTime) + "]") + /*println("Received = " + reply)*/ + targetTime = targetTime + intervalTime + val sleepTime = (targetTime - finishTime) + 10 + if (sleepTime > 0) { + 
println("Sleeping for " + sleepTime + " ms") + Thread.sleep(sleepTime) + } else { + println("############################") + println("###### Skipping sleep ######") + println("############################") + } + } + } catch { + case e: Exception => println(e) + } + println("Stopped sending") + } + + def main(args: Array[String]) { + if (args.length < 4) { + printUsage + } + + val streamReceiverHost = args(0) + val streamReceiverPort = args(1).toInt + val sentenceFile = args(2) + val intervalTime = args(3).toLong + val sentencesPerInterval = if (args.length > 4) args(4).toInt else 0 + + println("Reading the file " + sentenceFile) + val source = Source.fromFile(sentenceFile) + val lines = source.mkString.split ("\n") + source.close() + + val numSentences = if (sentencesPerInterval <= 0) { + lines.length + } else { + sentencesPerInterval + } + + println("Generating sentences") + val sentences: Array[String] = if (numSentences <= lines.length) { + lines.take(numSentences).toArray + } else { + (0 until numSentences).map(i => lines(i % lines.length)).toArray + } + + println("Converting to byte array") + val byteStream = new ByteArrayOutputStream() + val stringDataStream = new DataOutputStream(byteStream) + /*stringDataStream.writeInt(sentences.size)*/ + sentences.foreach(stringDataStream.writeUTF) + val bytes = byteStream.toByteArray() + stringDataStream.close() + println("Generated array of " + bytes.length + " bytes") + + /*while(true) { */ + sendSentences(streamReceiverHost, streamReceiverPort, numSentences, bytes, intervalTime) + /*println("Sleeping for 5 seconds")*/ + /*Thread.sleep(5000)*/ + /*System.gc()*/ + /*}*/ + } +} + + + diff --git a/streaming/src/main/scala/spark/stream/TestGenerator4.scala b/streaming/src/main/scala/spark/stream/TestGenerator4.scala new file mode 100644 index 0000000000..edeb969d7c --- /dev/null +++ b/streaming/src/main/scala/spark/stream/TestGenerator4.scala @@ -0,0 +1,244 @@ +package spark.stream + +import spark.Logging + +import 
scala.util.Random +import scala.io.Source +import scala.collection.mutable.{ArrayBuffer, Queue} + +import java.net._ +import java.io._ +import java.nio._ +import java.nio.charset._ +import java.nio.channels._ + +import it.unimi.dsi.fastutil.io._ + +class TestGenerator4(targetHost: String, targetPort: Int, sentenceFile: String, intervalDuration: Long, sentencesPerInterval: Int) +extends Logging { + + class SendingConnectionHandler(host: String, port: Int, generator: TestGenerator4) + extends ConnectionHandler(host, port, true) { + + val buffers = new ArrayBuffer[ByteBuffer] + val newBuffers = new Queue[ByteBuffer] + var activeKey: SelectionKey = null + + def send(buffer: ByteBuffer) { + logDebug("Sending: " + buffer) + newBuffers.synchronized { + newBuffers.enqueue(buffer) + } + selector.wakeup() + buffer.synchronized { + buffer.wait() + } + } + + override def ready(key: SelectionKey) { + logDebug("Ready") + activeKey = key + val channel = key.channel.asInstanceOf[SocketChannel] + channel.register(selector, SelectionKey.OP_WRITE) + generator.startSending() + } + + override def preSelect() { + newBuffers.synchronized { + while(!newBuffers.isEmpty) { + val buffer = newBuffers.dequeue + buffers += buffer + logDebug("Added: " + buffer) + changeInterest(activeKey, SelectionKey.OP_WRITE) + } + } + } + + override def write(key: SelectionKey) { + try { + /*while(true) {*/ + val channel = key.channel.asInstanceOf[SocketChannel] + if (buffers.size > 0) { + val buffer = buffers(0) + val newBuffer = buffer.slice() + newBuffer.limit(math.min(newBuffer.remaining, 32768)) + val bytesWritten = channel.write(newBuffer) + buffer.position(buffer.position + bytesWritten) + if (bytesWritten == 0) return + if (buffer.remaining == 0) { + buffers -= buffer + buffer.synchronized { + buffer.notify() + } + } + /*changeInterest(key, SelectionKey.OP_WRITE)*/ + } else { + changeInterest(key, 0) + } + /*}*/ + } catch { + case e: IOException => { + if (e.toString.contains("pipe") || 
e.toString.contains("reset")) { + logError("Connection broken") + } else { + logError("Connection error", e) + } + close(key) + } + } + } + + override def close(key: SelectionKey) { + buffers.clear() + super.close(key) + } + } + + initLogging() + + val connectionHandler = new SendingConnectionHandler(targetHost, targetPort, this) + var sendingThread: Thread = null + var sendCount = 0 + val sendBatches = 5 + + def run() { + logInfo("Connection handler started") + connectionHandler.start() + connectionHandler.join() + if (sendingThread != null && !sendingThread.isInterrupted) { + sendingThread.interrupt + } + logInfo("Connection handler stopped") + } + + def startSending() { + sendingThread = new Thread() { + override def run() { + logInfo("STARTING TO SEND") + sendSentences() + logInfo("SENDING STOPPED AFTER " + sendCount) + connectionHandler.interrupt() + } + } + sendingThread.start() + } + + def stopSending() { + sendingThread.interrupt() + } + + def sendSentences() { + logInfo("Reading the file " + sentenceFile) + val source = Source.fromFile(sentenceFile) + val lines = source.mkString.split ("\n") + source.close() + + val numSentences = if (sentencesPerInterval <= 0) { + lines.length + } else { + sentencesPerInterval + } + + logInfo("Generating sentence buffer") + val sentences: Array[String] = if (numSentences <= lines.length) { + lines.take(numSentences).toArray + } else { + (0 until numSentences).map(i => lines(i % lines.length)).toArray + } + + /* + val sentences: Array[String] = if (numSentences <= lines.length) { + lines.take((numSentences / sendBatches).toInt).toArray + } else { + (0 until (numSentences/sendBatches)).map(i => lines(i % lines.length)).toArray + }*/ + + + val serializer = new spark.KryoSerializer().newInstance() + val byteStream = new FastByteArrayOutputStream(100 * 1024 * 1024) + serializer.serializeStream(byteStream).writeAll(sentences.toIterator.asInstanceOf[Iterator[Any]]).close() + byteStream.trim() + val sentenceBuffer = 
ByteBuffer.wrap(byteStream.array) + + logInfo("Sending " + numSentences+ " sentences / " + sentenceBuffer.limit + " bytes per " + intervalDuration + " ms to " + targetHost + ":" + targetPort ) + val currentTime = System.currentTimeMillis + var targetTime = (currentTime / intervalDuration + 1).toLong * intervalDuration + Thread.sleep(targetTime - currentTime) + + val totalBytes = sentenceBuffer.limit + + while(true) { + val batchesInCurrentInterval = sendBatches // if (sendCount < 10) 1 else sendBatches + + val startTime = System.currentTimeMillis() + logDebug("Sending # " + sendCount + " at " + startTime + " ms with delay of " + (startTime - targetTime) + " ms") + + (0 until batchesInCurrentInterval).foreach(i => { + try { + val position = (i * totalBytes / sendBatches).toInt + val limit = if (i == sendBatches - 1) { + totalBytes + } else { + ((i + 1) * totalBytes / sendBatches).toInt - 1 + } + + val partStartTime = System.currentTimeMillis + sentenceBuffer.limit(limit) + connectionHandler.send(sentenceBuffer) + val partFinishTime = System.currentTimeMillis + val sleepTime = math.max(0, intervalDuration / sendBatches - (partFinishTime - partStartTime) - 1) + Thread.sleep(sleepTime) + + } catch { + case ie: InterruptedException => return + case e: Exception => e.printStackTrace() + } + }) + sentenceBuffer.rewind() + + val finishTime = System.currentTimeMillis() + /*logInfo ("Sent " + sentenceBuffer.limit + " bytes in " + (finishTime - startTime) + " ms")*/ + targetTime = targetTime + intervalDuration //+ (if (sendCount < 3) 1000 else 0) + + val sleepTime = (targetTime - finishTime) + 20 + if (sleepTime > 0) { + logInfo("Sleeping for " + sleepTime + " ms") + Thread.sleep(sleepTime) + } else { + logInfo("###### Skipping sleep ######") + } + if (Thread.currentThread.isInterrupted) { + return + } + sendCount += 1 + } + } +} + +object TestGenerator4 { + def printUsage { + println("Usage: TestGenerator4 <target IP> <target port> <sentence file> <interval duration> 
[<sentences per second>]") + System.exit(0) + } + + def main(args: Array[String]) { + println("GENERATOR STARTED") + if (args.length < 4) { + printUsage + } + + + val streamReceiverHost = args(0) + val streamReceiverPort = args(1).toInt + val sentenceFile = args(2) + val intervalDuration = args(3).toLong + val sentencesPerInterval = if (args.length > 4) args(4).toInt else 0 + + while(true) { + val generator = new TestGenerator4(streamReceiverHost, streamReceiverPort, sentenceFile, intervalDuration, sentencesPerInterval) + generator.run() + Thread.sleep(2000) + } + println("GENERATOR STOPPED") + } +} diff --git a/streaming/src/main/scala/spark/stream/TestInputBlockTracker.scala b/streaming/src/main/scala/spark/stream/TestInputBlockTracker.scala new file mode 100644 index 0000000000..da3b964407 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/TestInputBlockTracker.scala @@ -0,0 +1,42 @@ +package spark.stream +import spark.Logging +import scala.collection.mutable.{ArrayBuffer, HashMap} + +object TestInputBlockTracker extends Logging { + initLogging() + val allBlockIds = new HashMap[Time, ArrayBuffer[String]]() + + def addBlocks(intervalEndTime: Time, reference: AnyRef) { + allBlockIds.getOrElseUpdate(intervalEndTime, new ArrayBuffer[String]()) ++= reference.asInstanceOf[Array[String]] + } + + def setEndTime(intervalEndTime: Time) { + try { + val endTime = System.currentTimeMillis + allBlockIds.get(intervalEndTime) match { + case Some(blockIds) => { + val numBlocks = blockIds.size + var totalDelay = 0d + blockIds.foreach(blockId => { + val inputTime = getInputTime(blockId) + val delay = (endTime - inputTime) / 1000.0 + totalDelay += delay + logInfo("End-to-end delay for block " + blockId + " is " + delay + " s") + }) + logInfo("Average end-to-end delay for time " + intervalEndTime + " is " + (totalDelay / numBlocks) + " s") + allBlockIds -= intervalEndTime + } + case None => throw new Exception("Unexpected") + } + } catch { + case e: Exception => 
logError(e.toString) + } + } + + def getInputTime(blockId: String): Long = { + val parts = blockId.split("-") + /*logInfo(blockId + " -> " + parts(4)) */ + parts(4).toLong + } +} + diff --git a/streaming/src/main/scala/spark/stream/TestStreamCoordinator.scala b/streaming/src/main/scala/spark/stream/TestStreamCoordinator.scala new file mode 100644 index 0000000000..add166fbd9 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/TestStreamCoordinator.scala @@ -0,0 +1,38 @@ +package spark.stream + +import spark.Logging + +import akka.actor._ +import akka.actor.Actor +import akka.actor.Actor._ + +sealed trait TestStreamCoordinatorMessage +case class GetStreamDetails extends TestStreamCoordinatorMessage +case class GotStreamDetails(name: String, duration: Long) extends TestStreamCoordinatorMessage +case class TestStarted extends TestStreamCoordinatorMessage + +class TestStreamCoordinator(streamDetails: Array[(String, Long)]) extends Actor with Logging { + + var index = 0 + + initLogging() + + logInfo("Created") + + def receive = { + case TestStarted => { + sender ! "OK" + } + + case GetStreamDetails => { + val streamDetail = if (index >= streamDetails.length) null else streamDetails(index) + sender ! 
GotStreamDetails(streamDetail._1, streamDetail._2) + index += 1 + if (streamDetail != null) { + logInfo("Allocated " + streamDetail._1 + " (" + index + "/" + streamDetails.length + ")" ) + } + } + } + +} + diff --git a/streaming/src/main/scala/spark/stream/TestStreamReceiver3.scala b/streaming/src/main/scala/spark/stream/TestStreamReceiver3.scala new file mode 100644 index 0000000000..9cc342040b --- /dev/null +++ b/streaming/src/main/scala/spark/stream/TestStreamReceiver3.scala @@ -0,0 +1,420 @@ +package spark.stream + +import spark._ +import spark.storage._ +import spark.util.AkkaUtils + +import scala.math._ +import scala.collection.mutable.{Queue, HashMap, ArrayBuffer, SynchronizedMap} + +import akka.actor._ +import akka.actor.Actor +import akka.dispatch._ +import akka.pattern.ask +import akka.util.duration._ + +import java.io.DataInputStream +import java.io.BufferedInputStream +import java.net.Socket +import java.net.ServerSocket +import java.util.LinkedHashMap + +import org.apache.hadoop.fs._ +import org.apache.hadoop.conf._ +import org.apache.hadoop.io._ +import org.apache.hadoop.mapred._ +import org.apache.hadoop.util._ + +import spark.Utils + + +class TestStreamReceiver3(actorSystem: ActorSystem, blockManager: BlockManager) +extends Thread with Logging { + + + class DataHandler( + inputName: String, + longIntervalDuration: LongTime, + shortIntervalDuration: LongTime, + blockManager: BlockManager + ) + extends Logging { + + class Block(var id: String, var shortInterval: Interval) { + val data = ArrayBuffer[String]() + var pushed = false + def longInterval = getLongInterval(shortInterval) + def empty() = (data.size == 0) + def += (str: String) = (data += str) + override def toString() = "Block " + id + } + + class Bucket(val longInterval: Interval) { + val blocks = new ArrayBuffer[Block]() + var filled = false + def += (block: Block) = blocks += block + def empty() = (blocks.size == 0) + def ready() = (filled && !blocks.exists(! 
_.pushed)) + def blockIds() = blocks.map(_.id).toArray + override def toString() = "Bucket [" + longInterval + ", " + blocks.size + " blocks]" + } + + initLogging() + + val shortIntervalDurationMillis = shortIntervalDuration.asInstanceOf[LongTime].milliseconds + val longIntervalDurationMillis = longIntervalDuration.asInstanceOf[LongTime].milliseconds + + var currentBlock: Block = null + var currentBucket: Bucket = null + + val blocksForPushing = new Queue[Block]() + val buckets = new HashMap[Interval, Bucket]() with SynchronizedMap[Interval, Bucket] + + val blockUpdatingThread = new Thread() { override def run() { keepUpdatingCurrentBlock() } } + val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } } + + def start() { + blockUpdatingThread.start() + blockPushingThread.start() + } + + def += (data: String) = addData(data) + + def addData(data: String) { + if (currentBlock == null) { + updateCurrentBlock() + } + currentBlock.synchronized { + currentBlock += data + } + } + + def getShortInterval(time: Time): Interval = { + val intervalBegin = time.floor(shortIntervalDuration) + Interval(intervalBegin, intervalBegin + shortIntervalDuration) + } + + def getLongInterval(shortInterval: Interval): Interval = { + val intervalBegin = shortInterval.beginTime.floor(longIntervalDuration) + Interval(intervalBegin, intervalBegin + longIntervalDuration) + } + + def updateCurrentBlock() { + /*logInfo("Updating current block")*/ + val currentTime: LongTime = LongTime(System.currentTimeMillis) + val shortInterval = getShortInterval(currentTime) + val longInterval = getLongInterval(shortInterval) + + def createBlock(reuseCurrentBlock: Boolean = false) { + val newBlockId = inputName + "-" + longInterval.toFormattedString + "-" + currentBucket.blocks.size + if (!reuseCurrentBlock) { + val newBlock = new Block(newBlockId, shortInterval) + /*logInfo("Created " + currentBlock)*/ + currentBlock = newBlock + } else { + currentBlock.shortInterval = shortInterval 
+ currentBlock.id = newBlockId + } + } + + def createBucket() { + val newBucket = new Bucket(longInterval) + buckets += ((longInterval, newBucket)) + currentBucket = newBucket + /*logInfo("Created " + currentBucket + ", " + buckets.size + " buckets")*/ + } + + if (currentBlock == null || currentBucket == null) { + createBucket() + currentBucket.synchronized { + createBlock() + } + return + } + + currentBlock.synchronized { + var reuseCurrentBlock = false + + if (shortInterval != currentBlock.shortInterval) { + if (!currentBlock.empty) { + blocksForPushing.synchronized { + blocksForPushing += currentBlock + blocksForPushing.notifyAll() + } + } + + currentBucket.synchronized { + if (currentBlock.empty) { + reuseCurrentBlock = true + } else { + currentBucket += currentBlock + } + + if (longInterval != currentBucket.longInterval) { + currentBucket.filled = true + if (currentBucket.ready) { + currentBucket.notifyAll() + } + createBucket() + } + } + + createBlock(reuseCurrentBlock) + } + } + } + + def pushBlock(block: Block) { + try{ + if (blockManager != null) { + logInfo("Pushing block") + val startTime = System.currentTimeMillis + + val bytes = blockManager.dataSerialize(block.data.toIterator) + val finishTime = System.currentTimeMillis + logInfo(block + " serialization delay is " + (finishTime - startTime) / 1000.0 + " s") + + blockManager.putBytes(block.id.toString, bytes, StorageLevel.DISK_AND_MEMORY_2) + /*blockManager.putBytes(block.id.toString, bytes, StorageLevel.DISK_AND_MEMORY_DESER_2)*/ + /*blockManager.put(block.id.toString, block.data.toIterator, StorageLevel.DISK_AND_MEMORY_DESER)*/ + /*blockManager.put(block.id.toString, block.data.toIterator, StorageLevel.DISK_AND_MEMORY)*/ + val finishTime1 = System.currentTimeMillis + logInfo(block + " put delay is " + (finishTime1 - startTime) / 1000.0 + " s") + } else { + logWarning(block + " not put as block manager is null") + } + } catch { + case e: Exception => logError("Exception writing " + block + " to 
blockmanager" , e) + } + } + + def getBucket(longInterval: Interval): Option[Bucket] = { + buckets.get(longInterval) + } + + def clearBucket(longInterval: Interval) { + buckets.remove(longInterval) + } + + def keepUpdatingCurrentBlock() { + logInfo("Thread to update current block started") + while(true) { + updateCurrentBlock() + val currentTimeMillis = System.currentTimeMillis + val sleepTimeMillis = (currentTimeMillis / shortIntervalDurationMillis + 1) * + shortIntervalDurationMillis - currentTimeMillis + 1 + Thread.sleep(sleepTimeMillis) + } + } + + def keepPushingBlocks() { + var loop = true + logInfo("Thread to push blocks started") + while(loop) { + val block = blocksForPushing.synchronized { + if (blocksForPushing.size == 0) { + blocksForPushing.wait() + } + blocksForPushing.dequeue + } + pushBlock(block) + block.pushed = true + block.data.clear() + + val bucket = buckets(block.longInterval) + bucket.synchronized { + if (bucket.ready) { + bucket.notifyAll() + } + } + } + } + } + + + class ConnectionListener(port: Int, dataHandler: DataHandler) + extends Thread with Logging { + initLogging() + override def run { + try { + val listener = new ServerSocket(port) + logInfo("Listening on port " + port) + while (true) { + new ConnectionHandler(listener.accept(), dataHandler).start(); + } + listener.close() + } catch { + case e: Exception => logError("", e); + } + } + } + + class ConnectionHandler(socket: Socket, dataHandler: DataHandler) extends Thread with Logging { + initLogging() + override def run { + logInfo("New connection from " + socket.getInetAddress() + ":" + socket.getPort) + val bytes = new Array[Byte](100 * 1024 * 1024) + try { + + val inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream, 1024 * 1024)) + /*val inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream))*/ + var str: String = null + str = inputStream.readUTF + while(str != null) { + dataHandler += str + str = inputStream.readUTF() + } + + 
/* + var loop = true + while(loop) { + val numRead = inputStream.read(bytes) + if (numRead < 0) { + loop = false + } + inbox += ((LongTime(SystemTime.currentTimeMillis), "test")) + }*/ + + inputStream.close() + } catch { + case e => logError("Error receiving data", e) + } + socket.close() + } + } + + initLogging() + + val masterHost = System.getProperty("spark.master.host") + val masterPort = System.getProperty("spark.master.port").toInt + + val akkaPath = "akka://spark@%s:%s/user/".format(masterHost, masterPort) + val sparkstreamScheduler = actorSystem.actorFor(akkaPath + "/SparkStreamScheduler") + val testStreamCoordinator = actorSystem.actorFor(akkaPath + "/TestStreamCoordinator") + + logInfo("Getting stream details from master " + masterHost + ":" + masterPort) + + val timeout = 50 millis + + var started = false + while (!started) { + askActor[String](testStreamCoordinator, TestStarted) match { + case Some(str) => { + started = true + logInfo("TestStreamCoordinator started") + } + case None => { + logInfo("TestStreamCoordinator not started yet") + Thread.sleep(200) + } + } + } + + val streamDetails = askActor[GotStreamDetails](testStreamCoordinator, GetStreamDetails) match { + case Some(details) => details + case None => throw new Exception("Could not get stream details") + } + logInfo("Stream details received: " + streamDetails) + + val inputName = streamDetails.name + val intervalDurationMillis = streamDetails.duration + val intervalDuration = LongTime(intervalDurationMillis) + + val dataHandler = new DataHandler( + inputName, + intervalDuration, + LongTime(TestStreamReceiver3.SHORT_INTERVAL_MILLIS), + blockManager) + + val connListener = new ConnectionListener(TestStreamReceiver3.PORT, dataHandler) + + // Send a message to an actor and return an option with its reply, or None if this times out + def askActor[T](actor: ActorRef, message: Any): Option[T] = { + try { + val future = actor.ask(message)(timeout) + return Some(Await.result(future, 
timeout).asInstanceOf[T]) + } catch { + case e: Exception => + logInfo("Error communicating with " + actor, e) + return None + } + } + + override def run() { + connListener.start() + dataHandler.start() + + var interval = Interval.currentInterval(intervalDuration) + var dataStarted = false + + while(true) { + waitFor(interval.endTime) + logInfo("Woken up at " + System.currentTimeMillis + " for " + interval) + dataHandler.getBucket(interval) match { + case Some(bucket) => { + logInfo("Found " + bucket + " for " + interval) + bucket.synchronized { + if (!bucket.ready) { + logInfo("Waiting for " + bucket) + bucket.wait() + logInfo("Wait over for " + bucket) + } + if (dataStarted || !bucket.empty) { + logInfo("Notifying " + bucket) + notifyScheduler(interval, bucket.blockIds) + dataStarted = true + } + bucket.blocks.clear() + dataHandler.clearBucket(interval) + } + } + case None => { + logInfo("Found none for " + interval) + if (dataStarted) { + logInfo("Notifying none") + notifyScheduler(interval, Array[String]()) + } + } + } + interval = interval.next + } + } + + def waitFor(time: Time) { + val currentTimeMillis = System.currentTimeMillis + val targetTimeMillis = time.asInstanceOf[LongTime].milliseconds + if (currentTimeMillis < targetTimeMillis) { + val sleepTime = (targetTimeMillis - currentTimeMillis) + Thread.sleep(sleepTime + 1) + } + } + + def notifyScheduler(interval: Interval, blockIds: Array[String]) { + try { + sparkstreamScheduler ! 
InputGenerated(inputName, interval, blockIds.toArray) + val time = interval.endTime.asInstanceOf[LongTime] + val delay = (System.currentTimeMillis - time.milliseconds) / 1000.0 + logInfo("Pushing delay for " + time + " is " + delay + " s") + } catch { + case _ => logError("Exception notifying scheduler at interval " + interval) + } + } +} + +object TestStreamReceiver3 { + + val PORT = 9999 + val SHORT_INTERVAL_MILLIS = 100 + + def main(args: Array[String]) { + System.setProperty("spark.master.host", Utils.localHostName) + System.setProperty("spark.master.port", "7078") + val details = Array(("Sentences", 2000L)) + val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localHostName, 7078) + actorSystem.actorOf(Props(new TestStreamCoordinator(details)), name = "TestStreamCoordinator") + new TestStreamReceiver3(actorSystem, null).start() + } +} + + + diff --git a/streaming/src/main/scala/spark/stream/TestStreamReceiver4.scala b/streaming/src/main/scala/spark/stream/TestStreamReceiver4.scala new file mode 100644 index 0000000000..e7bef75391 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/TestStreamReceiver4.scala @@ -0,0 +1,373 @@ +package spark.stream + +import spark._ +import spark.storage._ +import spark.util.AkkaUtils + +import scala.math._ +import scala.collection.mutable.{Queue, HashMap, ArrayBuffer, SynchronizedMap} + +import java.io._ +import java.nio._ +import java.nio.charset._ +import java.nio.channels._ +import java.util.concurrent.Executors + +import akka.actor._ +import akka.actor.Actor +import akka.dispatch._ +import akka.pattern.ask +import akka.util.duration._ + +class TestStreamReceiver4(actorSystem: ActorSystem, blockManager: BlockManager) +extends Thread with Logging { + + class DataHandler( + inputName: String, + longIntervalDuration: LongTime, + shortIntervalDuration: LongTime, + blockManager: BlockManager + ) + extends Logging { + + class Block(val id: String, val shortInterval: Interval, val buffer: ByteBuffer) { + var 
pushed = false + def longInterval = getLongInterval(shortInterval) + override def toString() = "Block " + id + } + + class Bucket(val longInterval: Interval) { + val blocks = new ArrayBuffer[Block]() + var filled = false + def += (block: Block) = blocks += block + def empty() = (blocks.size == 0) + def ready() = (filled && !blocks.exists(! _.pushed)) + def blockIds() = blocks.map(_.id).toArray + override def toString() = "Bucket [" + longInterval + ", " + blocks.size + " blocks]" + } + + initLogging() + + val syncOnLastShortInterval = true + + val shortIntervalDurationMillis = shortIntervalDuration.asInstanceOf[LongTime].milliseconds + val longIntervalDurationMillis = longIntervalDuration.asInstanceOf[LongTime].milliseconds + + val buffer = ByteBuffer.allocateDirect(100 * 1024 * 1024) + var currentShortInterval = Interval.currentInterval(shortIntervalDuration) + + val blocksForPushing = new Queue[Block]() + val buckets = new HashMap[Interval, Bucket]() with SynchronizedMap[Interval, Bucket] + + val bufferProcessingThread = new Thread() { override def run() { keepProcessingBuffers() } } + val blockPushingExecutor = Executors.newFixedThreadPool(5) + + + def start() { + buffer.clear() + if (buffer.remaining == 0) { + throw new Exception("Buffer initialization error") + } + bufferProcessingThread.start() + } + + def readDataToBuffer(func: ByteBuffer => Int): Int = { + buffer.synchronized { + if (buffer.remaining == 0) { + logInfo("Received first data for interval " + currentShortInterval) + } + func(buffer) + } + } + + def getLongInterval(shortInterval: Interval): Interval = { + val intervalBegin = shortInterval.beginTime.floor(longIntervalDuration) + Interval(intervalBegin, intervalBegin + longIntervalDuration) + } + + def processBuffer() { + + def readInt(buffer: ByteBuffer): Int = { + var offset = 0 + var result = 0 + while (offset < 32) { + val b = buffer.get() + result |= ((b & 0x7F) << offset) + if ((b & 0x80) == 0) { + return result + } + offset += 7 + } + throw 
new Exception("Malformed zigzag-encoded integer") + } + + val currentLongInterval = getLongInterval(currentShortInterval) + val startTime = System.currentTimeMillis + val newBuffer: ByteBuffer = buffer.synchronized { + buffer.flip() + if (buffer.remaining == 0) { + buffer.clear() + null + } else { + logDebug("Processing interval " + currentShortInterval + " with delay of " + (System.currentTimeMillis - startTime) + " ms") + val startTime1 = System.currentTimeMillis + var loop = true + var count = 0 + while(loop) { + buffer.mark() + try { + val len = readInt(buffer) + buffer.position(buffer.position + len) + count += 1 + } catch { + case e: Exception => { + buffer.reset() + loop = false + } + } + } + val bytesToCopy = buffer.position + val newBuf = ByteBuffer.allocate(bytesToCopy) + buffer.position(0) + newBuf.put(buffer.slice().limit(bytesToCopy).asInstanceOf[ByteBuffer]) + newBuf.flip() + buffer.position(bytesToCopy) + buffer.compact() + newBuf + } + } + + if (newBuffer != null) { + val bucket = buckets.getOrElseUpdate(currentLongInterval, new Bucket(currentLongInterval)) + bucket.synchronized { + val newBlockId = inputName + "-" + currentLongInterval.toFormattedString + "-" + currentShortInterval.toFormattedString + val newBlock = new Block(newBlockId, currentShortInterval, newBuffer) + if (syncOnLastShortInterval) { + bucket += newBlock + } + logDebug("Created " + newBlock + " with " + newBuffer.remaining + " bytes, creation delay is " + (System.currentTimeMillis - currentShortInterval.endTime.asInstanceOf[LongTime].milliseconds) / 1000.0 + " s" ) + blockPushingExecutor.execute(new Runnable() { def run() { pushAndNotifyBlock(newBlock) } }) + } + } + + val newShortInterval = Interval.currentInterval(shortIntervalDuration) + val newLongInterval = getLongInterval(newShortInterval) + + if (newLongInterval != currentLongInterval) { + buckets.get(currentLongInterval) match { + case Some(bucket) => { + bucket.synchronized { + bucket.filled = true + if (bucket.ready) { 
+ bucket.notifyAll() + } + } + } + case None => + } + buckets += ((newLongInterval, new Bucket(newLongInterval))) + } + + currentShortInterval = newShortInterval + } + + def pushBlock(block: Block) { + try{ + if (blockManager != null) { + val startTime = System.currentTimeMillis + logInfo(block + " put start delay is " + (startTime - block.shortInterval.endTime.asInstanceOf[LongTime].milliseconds) + " ms") + /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY)*/ + /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY_2)*/ + blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.MEMORY_ONLY_2) + /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.MEMORY_ONLY)*/ + /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY_DESER)*/ + /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY_DESER_2)*/ + val finishTime = System.currentTimeMillis + logInfo(block + " put delay is " + (finishTime - startTime) + " ms") + } else { + logWarning(block + " not put as block manager is null") + } + } catch { + case e: Exception => logError("Exception writing " + block + " to blockmanager" , e) + } + } + + def getBucket(longInterval: Interval): Option[Bucket] = { + buckets.get(longInterval) + } + + def clearBucket(longInterval: Interval) { + buckets.remove(longInterval) + } + + def keepProcessingBuffers() { + logInfo("Thread to process buffers started") + while(true) { + processBuffer() + val currentTimeMillis = System.currentTimeMillis + val sleepTimeMillis = (currentTimeMillis / shortIntervalDurationMillis + 1) * + shortIntervalDurationMillis - currentTimeMillis + 1 + Thread.sleep(sleepTimeMillis) + } + } + + def pushAndNotifyBlock(block: Block) { + pushBlock(block) + block.pushed = true + val bucket = if (syncOnLastShortInterval) { + buckets(block.longInterval) + } else { + var longInterval = block.longInterval + 
while(!buckets.contains(longInterval)) { + logWarning("Skipping bucket of " + longInterval + " for " + block) + longInterval = longInterval.next + } + val chosenBucket = buckets(longInterval) + logDebug("Choosing bucket of " + longInterval + " for " + block) + chosenBucket += block + chosenBucket + } + + bucket.synchronized { + if (bucket.ready) { + bucket.notifyAll() + } + } + + } + } + + + class ReceivingConnectionHandler(host: String, port: Int, dataHandler: DataHandler) + extends ConnectionHandler(host, port, false) { + + override def ready(key: SelectionKey) { + changeInterest(key, SelectionKey.OP_READ) + } + + override def read(key: SelectionKey) { + try { + val channel = key.channel.asInstanceOf[SocketChannel] + val bytesRead = dataHandler.readDataToBuffer(channel.read) + if (bytesRead < 0) { + close(key) + } + } catch { + case e: IOException => { + logError("Error reading", e) + close(key) + } + } + } + } + + initLogging() + + val masterHost = System.getProperty("spark.master.host", "localhost") + val masterPort = System.getProperty("spark.master.port", "7078").toInt + + val akkaPath = "akka://spark@%s:%s/user/".format(masterHost, masterPort) + val sparkstreamScheduler = actorSystem.actorFor(akkaPath + "/SparkStreamScheduler") + val testStreamCoordinator = actorSystem.actorFor(akkaPath + "/TestStreamCoordinator") + + logInfo("Getting stream details from master " + masterHost + ":" + masterPort) + + val streamDetails = askActor[GotStreamDetails](testStreamCoordinator, GetStreamDetails) match { + case Some(details) => details + case None => throw new Exception("Could not get stream details") + } + logInfo("Stream details received: " + streamDetails) + + val inputName = streamDetails.name + val intervalDurationMillis = streamDetails.duration + val intervalDuration = Milliseconds(intervalDurationMillis) + val shortIntervalDuration = Milliseconds(System.getProperty("spark.stream.shortinterval", "500").toInt) + + val dataHandler = new DataHandler(inputName, 
intervalDuration, shortIntervalDuration, blockManager) + val connectionHandler = new ReceivingConnectionHandler("localhost", 9999, dataHandler) + + val timeout = 100 millis + + // Send a message to an actor and return an option with its reply, or None if this times out + def askActor[T](actor: ActorRef, message: Any): Option[T] = { + try { + val future = actor.ask(message)(timeout) + return Some(Await.result(future, timeout).asInstanceOf[T]) + } catch { + case e: Exception => + logInfo("Error communicating with " + actor, e) + return None + } + } + + override def run() { + connectionHandler.start() + dataHandler.start() + + var interval = Interval.currentInterval(intervalDuration) + var dataStarted = false + + + while(true) { + waitFor(interval.endTime) + /*logInfo("Woken up at " + System.currentTimeMillis + " for " + interval)*/ + dataHandler.getBucket(interval) match { + case Some(bucket) => { + logDebug("Found " + bucket + " for " + interval) + bucket.synchronized { + if (!bucket.ready) { + logDebug("Waiting for " + bucket) + bucket.wait() + logDebug("Wait over for " + bucket) + } + if (dataStarted || !bucket.empty) { + logDebug("Notifying " + bucket) + notifyScheduler(interval, bucket.blockIds) + dataStarted = true + } + bucket.blocks.clear() + dataHandler.clearBucket(interval) + } + } + case None => { + logDebug("Found none for " + interval) + if (dataStarted) { + logDebug("Notifying none") + notifyScheduler(interval, Array[String]()) + } + } + } + interval = interval.next + } + } + + def waitFor(time: Time) { + val currentTimeMillis = System.currentTimeMillis + val targetTimeMillis = time.asInstanceOf[LongTime].milliseconds + if (currentTimeMillis < targetTimeMillis) { + val sleepTime = (targetTimeMillis - currentTimeMillis) + Thread.sleep(sleepTime + 1) + } + } + + def notifyScheduler(interval: Interval, blockIds: Array[String]) { + try { + sparkstreamScheduler ! 
InputGenerated(inputName, interval, blockIds.toArray) + val time = interval.endTime.asInstanceOf[LongTime] + val delay = (System.currentTimeMillis - time.milliseconds) + logInfo("Notification delay for " + time + " is " + delay + " ms") + } catch { + case e: Exception => logError("Exception notifying scheduler at interval " + interval + ": " + e) + } + } +} + + +object TestStreamReceiver4 { + def main(args: Array[String]) { + val details = Array(("Sentences", 2000L)) + val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localHostName, 7078) + actorSystem.actorOf(Props(new TestStreamCoordinator(details)), name = "TestStreamCoordinator") + new TestStreamReceiver4(actorSystem, null).start() + } +} diff --git a/streaming/src/main/scala/spark/stream/Time.scala b/streaming/src/main/scala/spark/stream/Time.scala new file mode 100644 index 0000000000..25369dfee5 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/Time.scala @@ -0,0 +1,85 @@ +package spark.stream + +abstract case class Time { + + // basic operations that must be overridden + def copy(): Time + def zero: Time + def < (that: Time): Boolean + def += (that: Time): Time + def -= (that: Time): Time + def floor(that: Time): Time + def isMultipleOf(that: Time): Boolean + + // derived operations composed of basic operations + def + (that: Time) = this.copy() += that + def - (that: Time) = this.copy() -= that + def * (times: Int) = { + var count = 0 + var result = this.copy() + while (count < times) { + result += this + count += 1 + } + result + } + def <= (that: Time) = (this < that || this == that) + def > (that: Time) = !(this <= that) + def >= (that: Time) = !(this < that) + def isZero = (this == zero) + def toFormattedString = toString +} + +object Time { + def Milliseconds(milliseconds: Long) = LongTime(milliseconds) + + def zero = LongTime(0) +} + +case class LongTime(var milliseconds: Long) extends Time { + + override def copy() = LongTime(this.milliseconds) + + override def zero = 
LongTime(0) + + override def < (that: Time): Boolean = + (this.milliseconds < that.asInstanceOf[LongTime].milliseconds) + + override def += (that: Time): Time = { + this.milliseconds += that.asInstanceOf[LongTime].milliseconds + this + } + + override def -= (that: Time): Time = { + this.milliseconds -= that.asInstanceOf[LongTime].milliseconds + this + } + + override def floor(that: Time): Time = { + val t = that.asInstanceOf[LongTime].milliseconds + val m = this.milliseconds / t + LongTime(m.toLong * t) + } + + override def isMultipleOf(that: Time): Boolean = + (this.milliseconds % that.asInstanceOf[LongTime].milliseconds == 0) + + override def isZero = (this.milliseconds == 0) + + override def toString = (milliseconds.toString + "ms") + + override def toFormattedString = milliseconds.toString +} + +object Milliseconds { + def apply(milliseconds: Long) = LongTime(milliseconds) +} + +object Seconds { + def apply(seconds: Long) = LongTime(seconds * 1000) +} + +object Minutes { + def apply(minutes: Long) = LongTime(minutes * 60000) +} + diff --git a/streaming/src/main/scala/spark/stream/TopContentCount.scala b/streaming/src/main/scala/spark/stream/TopContentCount.scala new file mode 100644 index 0000000000..a8cca4e793 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/TopContentCount.scala @@ -0,0 +1,97 @@ +package spark.stream + +import SparkStreamContext._ + +import spark.storage.StorageLevel + +import scala.util.Sorting + +object TopContentCount { + + case class Event(val country: String, val content: String) + + object Event { + def create(string: String): Event = { + val parts = string.split(":") + new Event(parts(0), parts(1)) + } + } + + def main(args: Array[String]) { + + if (args.length < 2) { + println ("Usage: GrepCount2 <host> <# sentence streams>") + System.exit(1) + } + + val ssc = new SparkStreamContext(args(0), "TopContentCount") + val sc = ssc.sc + val dummy = sc.parallelize(0 to 1000, 100).persist(StorageLevel.DISK_AND_MEMORY) + 
sc.runJob(dummy, (_: Iterator[Int]) => {}) + + + val numEventStreams = if (args.length > 1) args(1).toInt else 1 + if (args.length > 2) { + ssc.setTempDir(args(2)) + } + + val eventStrings = new UnifiedRDS( + (1 to numEventStreams).map(i => ssc.readTestStream("Events-" + i, 1000)).toArray + ) + + def parse(string: String) = { + val parts = string.split(":") + (parts(0), parts(1)) + } + + def add(v1: Int, v2: Int) = (v1 + v2) + def subtract(v1: Int, v2: Int) = (v1 - v2) + + val events = eventStrings.map(x => parse(x)) + /*events.print*/ + + val parallelism = 8 + val counts_per_content_per_country = events + .map(x => (x, 1)) + .reduceByKey(_ + _) + /*.reduceByKeyAndWindow(add _, subtract _, Seconds(5), Seconds(1), parallelism)*/ + /*counts_per_content_per_country.print*/ + + /* + counts_per_content_per_country.persist( + StorageLevel.MEMORY_ONLY_DESER, + StorageLevel.MEMORY_ONLY_DESER_2, + Seconds(1) + )*/ + + val counts_per_country = counts_per_content_per_country + .map(x => (x._1._1, (x._1._2, x._2))) + .groupByKey() + counts_per_country.print + + + def topK(data: Seq[(String, Int)], k: Int): Array[(String, Int)] = { + implicit val countOrdering = new Ordering[(String, Int)] { + override def compare(count1: (String, Int), count2: (String, Int)): Int = { + count2._2 - count1._2 + } + } + val array = data.toArray + Sorting.quickSort(array) + val taken = array.take(k) + taken + } + + val k = 10 + val topKContents_per_country = counts_per_country + .map(x => (x._1, topK(x._2, k))) + .map(x => (x._1, x._2.map(_.toString).reduceLeft(_ + ", " + _))) + + topKContents_per_country.print + + ssc.run + } +} + + + diff --git a/streaming/src/main/scala/spark/stream/TopKWordCount2.scala b/streaming/src/main/scala/spark/stream/TopKWordCount2.scala new file mode 100644 index 0000000000..7dd06dd5ee --- /dev/null +++ b/streaming/src/main/scala/spark/stream/TopKWordCount2.scala @@ -0,0 +1,103 @@ +package spark.stream + +import spark.SparkContext +import SparkContext._ +import 
SparkStreamContext._ + +import spark.storage.StorageLevel + +import scala.util.Sorting + +object TopKWordCount2 { + + def moreWarmup(sc: SparkContext) { + (0 until 20).foreach {i => + sc.parallelize(1 to 20000000, 500) + .map(_ % 100).map(_.toString) + .map(x => (x, 1)).reduceByKey(_ + _, 10) + .collect() + } + } + + def main (args: Array[String]) { + + if (args.length < 2) { + println ("Usage: SparkStreamContext <host> <# sentence streams>") + System.exit(1) + } + + val ssc = new SparkStreamContext(args(0), "WordCount2") + + val numSentenceStreams = if (args.length > 1) args(1).toInt else 1 + if (args.length > 2) { + ssc.setTempDir(args(2)) + } + + GrepCount2.warmConnectionManagers(ssc.sc) + moreWarmup(ssc.sc) + + val sentences = new UnifiedRDS( + (1 to numSentenceStreams).map(i => ssc.readTestStream("Sentences-" + i, 1000)).toArray + ) + + val words = sentences.flatMap(_.split(" ")) + + def add(v1: Int, v2: Int) = (v1 + v2) + def subtract(v1: Int, v2: Int) = (v1 - v2) + + val windowedCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(add _, subtract _, Seconds(10), Seconds(1), 10) + windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER, Seconds(1)) + + def topK(data: Iterator[(String, Int)], k: Int): Iterator[(String, Int)] = { + val taken = new Array[(String, Int)](k) + + var i = 0 + var len = 0 + var done = false + var value: (String, Int) = null + var swap: (String, Int) = null + var count = 0 + + while(data.hasNext) { + value = data.next + count += 1 + println("count = " + count) + if (len == 0) { + taken(0) = value + len = 1 + } else if (len < k || value._2 > taken(len - 1)._2) { + if (len < k) { + len += 1 + } + taken(len - 1) = value + i = len - 1 + while(i > 0 && taken(i - 1)._2 < taken(i)._2) { + swap = taken(i) + taken(i) = taken(i-1) + taken(i - 1) = swap + i -= 1 + } + } + } + println("Took " + len + " out of " + count + " items") + return taken.toIterator + } + + val k = 10 + val partialTopKWindowedCounts = 
windowedCounts.mapPartitions(topK(_, k)) + partialTopKWindowedCounts.foreachRDD(rdd => { + val collectedCounts = rdd.collect + println("Collected " + collectedCounts.size + " items") + topK(collectedCounts.toIterator, k).foreach(println) + }) + + /* + windowedCounts.filter(_ == null).foreachRDD(rdd => { + val count = rdd.count + println("# of nulls = " + count) + })*/ + + ssc.run + } +} + diff --git a/streaming/src/main/scala/spark/stream/TopKWordCount2_Special.scala b/streaming/src/main/scala/spark/stream/TopKWordCount2_Special.scala new file mode 100644 index 0000000000..e9f3f914ae --- /dev/null +++ b/streaming/src/main/scala/spark/stream/TopKWordCount2_Special.scala @@ -0,0 +1,142 @@ +package spark.stream + +import spark.SparkContext +import SparkContext._ +import SparkStreamContext._ + +import spark.storage.StorageLevel + +import scala.util.Sorting +import scala.collection.JavaConversions.mapAsScalaMap +import scala.collection.mutable.Queue + +import java.lang.{Long => JLong} + +object TopKWordCount2_Special { + + def moreWarmup(sc: SparkContext) { + (0 until 20).foreach {i => + sc.parallelize(1 to 20000000, 500) + .map(_ % 100).map(_.toString) + .map(x => (x, 1)).reduceByKey(_ + _, 10) + .collect() + } + } + + def main (args: Array[String]) { + + if (args.length < 2) { + println ("Usage: SparkStreamContext <host> <# sentence streams>") + System.exit(1) + } + + val ssc = new SparkStreamContext(args(0), "TopKWordCount2") + + val numSentenceStreams = if (args.length > 1) args(1).toInt else 1 + if (args.length > 2) { + ssc.setTempDir(args(2)) + } + + GrepCount2.warmConnectionManagers(ssc.sc) + /*moreWarmup(ssc.sc)*/ + + val sentences = new UnifiedRDS( + (1 to numSentenceStreams).map(i => ssc.readTestStream("Sentences-" + i, 500)).toArray + ) + + /*val words = sentences.flatMap(_.split(" "))*/ + + /*def add(v1: Int, v2: Int) = (v1 + v2) */ + /*def subtract(v1: Int, v2: Int) = (v1 - v2) */ + + def add(v1: JLong, v2: JLong) = (v1 + v2) + def subtract(v1: JLong, v2: 
JLong) = (v1 - v2) + + def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, JLong)] = { + val map = new java.util.HashMap[String, JLong] + var i = 0 + var j = 0 + while (iter.hasNext) { + val s = iter.next() + i = 0 + while (i < s.length) { + j = i + while (j < s.length && s.charAt(j) != ' ') { + j += 1 + } + if (j > i) { + val w = s.substring(i, j) + val c = map.get(w) + if (c == null) { + map.put(w, 1) + } else { + map.put(w, c + 1) + } + } + i = j + while (i < s.length && s.charAt(i) == ' ') { + i += 1 + } + } + } + map.toIterator + } + + + val windowedCounts = sentences.mapPartitions(splitAndCountPartitions).reduceByKeyAndWindow(add _, subtract _, Seconds(10), Milliseconds(500), 10) + /*windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER, Seconds(1))*/ + windowedCounts.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY, Milliseconds(500)) + + def topK(data: Iterator[(String, JLong)], k: Int): Iterator[(String, JLong)] = { + val taken = new Array[(String, JLong)](k) + + var i = 0 + var len = 0 + var done = false + var value: (String, JLong) = null + var swap: (String, JLong) = null + var count = 0 + + while(data.hasNext) { + value = data.next + count += 1 + println("count = " + count) + if (len == 0) { + taken(0) = value + len = 1 + } else if (len < k || value._2 > taken(len - 1)._2) { + if (len < k) { + len += 1 + } + taken(len - 1) = value + i = len - 1 + while(i > 0 && taken(i - 1)._2 < taken(i)._2) { + swap = taken(i) + taken(i) = taken(i-1) + taken(i - 1) = swap + i -= 1 + } + } + } + println("Took " + len + " out of " + count + " items") + return taken.toIterator + } + + val k = 50 + val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k)) + partialTopKWindowedCounts.foreachRDD(rdd => { + val collectedCounts = rdd.collect + println("Collected " + collectedCounts.size + " items") + topK(collectedCounts.toIterator, k).foreach(println) + }) + + /* + windowedCounts.filter(_ == 
null).foreachRDD(rdd => { + val count = rdd.count + println("# of nulls = " + count) + })*/ + + ssc.run + } +} + diff --git a/streaming/src/main/scala/spark/stream/WindowedRDS.scala b/streaming/src/main/scala/spark/stream/WindowedRDS.scala new file mode 100644 index 0000000000..a2e7966edb --- /dev/null +++ b/streaming/src/main/scala/spark/stream/WindowedRDS.scala @@ -0,0 +1,68 @@ +package spark.stream + +import spark.stream.SparkStreamContext._ + +import spark.RDD +import spark.UnionRDD +import spark.SparkContext._ + +import scala.collection.mutable.ArrayBuffer + +class WindowedRDS[T: ClassManifest]( + parent: RDS[T], + _windowTime: Time, + _slideTime: Time) + extends RDS[T](parent.ssc) { + + if (!_windowTime.isMultipleOf(parent.slideTime)) + throw new Exception("The window duration of WindowedRDS (" + _slideTime + ") " + + "must be multiple of the slide duration of parent RDS (" + parent.slideTime + ")") + + if (!_slideTime.isMultipleOf(parent.slideTime)) + throw new Exception("The slide duration of WindowedRDS (" + _slideTime + ") " + + "must be multiple of the slide duration of parent RDS (" + parent.slideTime + ")") + + val allowPartialWindows = true + + override def dependencies = List(parent) + + def windowTime: Time = _windowTime + + override def slideTime: Time = _slideTime + + override def compute(validTime: Time): Option[RDD[T]] = { + val parentRDDs = new ArrayBuffer[RDD[T]]() + val windowEndTime = validTime.copy() + val windowStartTime = if (allowPartialWindows && windowEndTime - windowTime < parent.zeroTime) { + parent.zeroTime + } else { + windowEndTime - windowTime + } + + logInfo("Window = " + windowStartTime + " - " + windowEndTime) + logInfo("Parent.zeroTime = " + parent.zeroTime) + + if (windowStartTime >= parent.zeroTime) { + // Walk back through time, from the 'windowEndTime' to 'windowStartTime' + // and get all parent RDDs from the parent RDS + var t = windowEndTime + while (t > windowStartTime) { + parent.getOrCompute(t) match { + case 
Some(rdd) => parentRDDs += rdd + case None => throw new Exception("Could not generate parent RDD for time " + t) + } + t -= parent.slideTime + } + } + + // Do a union of all parent RDDs to generate the new RDD + if (parentRDDs.size > 0) { + Some(new UnionRDD(ssc.sc, parentRDDs)) + } else { + None + } + } +} + + + diff --git a/streaming/src/main/scala/spark/stream/WordCount.scala b/streaming/src/main/scala/spark/stream/WordCount.scala new file mode 100644 index 0000000000..af825e46a8 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/WordCount.scala @@ -0,0 +1,62 @@ +package spark.stream + +import SparkStreamContext._ + +import scala.util.Sorting + +import spark.SparkContext +import spark.storage.StorageLevel + +object WordCount { + var inputFile : String = null + var HDFS : String = null + var idealPartitions : Int = 0 + + def main (args: Array[String]) { + + if (args.length != 4) { + println ("Usage: WordCount <host> <HDFS> <Input file> <Ideal Partitions>") + System.exit(1) + } + + HDFS = args(1) + inputFile = HDFS + args(2) + idealPartitions = args(3).toInt + println ("Input file: " + inputFile) + + val ssc = new SparkStreamContext(args(0), "WordCountWindow") + + SparkContext.idealPartitions = idealPartitions + SparkContext.inputFile = inputFile + + val sentences = ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 2000) + //sentences.print + + val words = sentences.flatMap(_.split(" ")) + + def add(v1: Int, v2: Int) = (v1 + v2) + def subtract(v1: Int, v2: Int) = (v1 - v2) + + //val windowedCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(2000), + // System.getProperty("spark.default.parallelism", "1").toInt) + //windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.DISK_AND_MEMORY_DESER_2, Seconds(5)) + //windowedCounts.print + + val parallelism = System.getProperty("spark.default.parallelism", "1").toInt + + //val localCounts = words.map(x => (x, 1)).reduceByKey(add _, 
parallelism) + //localCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2, Seconds(6)) + //val windowedCounts = localCounts.window(Seconds(30), Seconds(2)).reduceByKey(_ + _) + + val windowedCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(2), + parallelism) + windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2, Seconds(10)) + + //windowedCounts.print + windowedCounts.register + //windowedCounts.foreachRDD(rdd => rdd.collect.foreach(x => print(x+ " "))) + //windowedCounts.foreachRDD(rdd => rdd.collect.foreach(x => x)) + + ssc.run + } +} diff --git a/streaming/src/main/scala/spark/stream/WordCount1.scala b/streaming/src/main/scala/spark/stream/WordCount1.scala new file mode 100644 index 0000000000..501062b18d --- /dev/null +++ b/streaming/src/main/scala/spark/stream/WordCount1.scala @@ -0,0 +1,46 @@ +package spark.stream + +import SparkStreamContext._ + +import scala.util.Sorting + +import spark.SparkContext +import spark.storage.StorageLevel + +object WordCount1 { + var inputFile : String = null + var HDFS : String = null + var idealPartitions : Int = 0 + + def main (args: Array[String]) { + + if (args.length != 4) { + println ("Usage: WordCount <host> <HDFS> <Input file> <Ideal Partitions>") + System.exit(1) + } + + HDFS = args(1) + inputFile = HDFS + args(2) + idealPartitions = args(3).toInt + println ("Input file: " + inputFile) + + val ssc = new SparkStreamContext(args(0), "WordCountWindow") + + SparkContext.idealPartitions = idealPartitions + SparkContext.inputFile = inputFile + + val sentences = ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 1000) + //sentences.print + + val words = sentences.flatMap(_.split(" ")) + + def add(v1: Int, v2: Int) = (v1 + v2) + def subtract(v1: Int, v2: Int) = (v1 - v2) + + val windowedCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(add _, subtract _, Seconds(10), Seconds(1), 10) + 
windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER, Seconds(1)) + windowedCounts.foreachRDD(_.collect) + + ssc.run + } +} diff --git a/streaming/src/main/scala/spark/stream/WordCount2.scala b/streaming/src/main/scala/spark/stream/WordCount2.scala new file mode 100644 index 0000000000..24324e891a --- /dev/null +++ b/streaming/src/main/scala/spark/stream/WordCount2.scala @@ -0,0 +1,55 @@ +package spark.stream + +import spark.SparkContext +import SparkContext._ +import SparkStreamContext._ + +import spark.storage.StorageLevel + +import scala.util.Sorting + +object WordCount2 { + + def moreWarmup(sc: SparkContext) { + (0 until 20).foreach {i => + sc.parallelize(1 to 20000000, 500) + .map(_ % 100).map(_.toString) + .map(x => (x, 1)).reduceByKey(_ + _, 10) + .collect() + } + } + + def main (args: Array[String]) { + + if (args.length < 2) { + println ("Usage: SparkStreamContext <host> <# sentence streams>") + System.exit(1) + } + + val ssc = new SparkStreamContext(args(0), "WordCount2") + + val numSentenceStreams = if (args.length > 1) args(1).toInt else 1 + if (args.length > 2) { + ssc.setTempDir(args(2)) + } + + GrepCount2.warmConnectionManagers(ssc.sc) + /*moreWarmup(ssc.sc)*/ + + val sentences = new UnifiedRDS( + (1 to numSentenceStreams).map(i => ssc.readTestStream("Sentences-" + i, 1000)).toArray + ) + + val words = sentences.flatMap(_.split(" ")) + + def add(v1: Int, v2: Int) = (v1 + v2) + def subtract(v1: Int, v2: Int) = (v1 - v2) + + val windowedCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(add _, subtract _, Seconds(10), Seconds(1), 6) + windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER, Seconds(1)) + windowedCounts.foreachRDD(_.collect) + + ssc.run + } +} + diff --git a/streaming/src/main/scala/spark/stream/WordCount2_Special.scala b/streaming/src/main/scala/spark/stream/WordCount2_Special.scala new file mode 100644 index 0000000000..c6b1aaa57e --- /dev/null +++ 
b/streaming/src/main/scala/spark/stream/WordCount2_Special.scala @@ -0,0 +1,94 @@ +package spark.stream + +import spark.SparkContext +import SparkContext._ +import SparkStreamContext._ + +import spark.storage.StorageLevel + +import scala.util.Sorting +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.HashMap +import scala.collection.mutable.Queue +import scala.collection.JavaConversions.mapAsScalaMap + +import java.lang.{Long => JLong} +import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} + + +object WordCount2_ExtraFunctions { + + def add(v1: JLong, v2: JLong) = (v1 + v2) + + def subtract(v1: JLong, v2: JLong) = (v1 - v2) + + def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, JLong)] = { + val map = new java.util.HashMap[String, JLong] + var i = 0 + var j = 0 + while (iter.hasNext) { + val s = iter.next() + i = 0 + while (i < s.length) { + j = i + while (j < s.length && s.charAt(j) != ' ') { + j += 1 + } + if (j > i) { + val w = s.substring(i, j) + val c = map.get(w) + if (c == null) { + map.put(w, 1) + } else { + map.put(w, c + 1) + } + } + i = j + while (i < s.length && s.charAt(i) == ' ') { + i += 1 + } + } + } + map.toIterator + } +} + +object WordCount2_Special { + + def moreWarmup(sc: SparkContext) { + (0 until 40).foreach {i => + sc.parallelize(1 to 20000000, 1000) + .map(_ % 1331).map(_.toString) + .mapPartitions(WordCount2_ExtraFunctions.splitAndCountPartitions).reduceByKey(_ + _, 10) + .collect() + } + } + + def main (args: Array[String]) { + + if (args.length < 2) { + println ("Usage: SparkStreamContext <host> <# sentence streams>") + System.exit(1) + } + + val ssc = new SparkStreamContext(args(0), "WordCount2") + + val numSentenceStreams = if (args.length > 1) args(1).toInt else 1 + + GrepCount2.warmConnectionManagers(ssc.sc) + /*moreWarmup(ssc.sc)*/ + + val sentences = new UnifiedRDS( + (1 to numSentenceStreams).map(i => ssc.readTestStream("Sentences-" + i, 500)).toArray + ) + + val 
windowedCounts = sentences + .mapPartitions(WordCount2_ExtraFunctions.splitAndCountPartitions) + .reduceByKeyAndWindow(WordCount2_ExtraFunctions.add _, WordCount2_ExtraFunctions.subtract _, Seconds(10), Milliseconds(500), 10) + windowedCounts.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY, Milliseconds(500)) + windowedCounts.foreachRDD(_.collect) + + ssc.run + } +} + diff --git a/streaming/src/main/scala/spark/stream/WordCount3.scala b/streaming/src/main/scala/spark/stream/WordCount3.scala new file mode 100644 index 0000000000..455a8c9dbf --- /dev/null +++ b/streaming/src/main/scala/spark/stream/WordCount3.scala @@ -0,0 +1,49 @@ +package spark.stream + +import SparkStreamContext._ + +import scala.util.Sorting + +object WordCount3 { + + def main (args: Array[String]) { + + if (args.length < 1) { + println ("Usage: SparkStreamContext <host> [<temp directory>]") + System.exit(1) + } + + val ssc = new SparkStreamContext(args(0), "WordCount") + if (args.length > 1) { + ssc.setTempDir(args(1)) + } + val sentences = ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 1000) + /*sentences.print*/ + + val words = sentences.flatMap(_.split(" ")) + + def add(v1: Int, v2: Int) = (v1 + v2) + def subtract(v1: Int, v2: Int) = (v1 - v2) + + /*val windowedCounts = words.map(x => (x, 1)).window(Seconds(5), Seconds(1)).reduceByKey(add _, 1)*/ + val windowedCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(add _, subtract _, Seconds(5), Seconds(1), 1) + /*windowedCounts.print */ + + def topK(data: Seq[(String, Int)], k: Int): Array[(String, Int)] = { + implicit val countOrdering = new Ordering[(String, Int)] { + override def compare(count1: (String, Int), count2: (String, Int)): Int = { + count2._2 - count1._2 + } + } + val array = data.toArray + Sorting.quickSort(array) + array.take(k) + } + + val k = 10 + val topKWindowedCounts = windowedCounts.glom.flatMap(topK(_, k)).collect.flatMap(topK(_, k)) + topKWindowedCounts.print + + ssc.run + } +} diff --git 
a/streaming/src/main/scala/spark/stream/WordCountEc2.scala b/streaming/src/main/scala/spark/stream/WordCountEc2.scala new file mode 100644 index 0000000000..5b10026d7a --- /dev/null +++ b/streaming/src/main/scala/spark/stream/WordCountEc2.scala @@ -0,0 +1,41 @@ +package spark.stream + +import SparkStreamContext._ +import spark.SparkContext + +object WordCountEc2 { + var inputFile : String = null + var HDFS : String = null + var idealPartitions : Int = 0 + + def main (args: Array[String]) { + + if (args.length != 4) { + println ("Usage: SparkStreamContext <host> <HDFS> <Input file> <Ideal Partitions>") + System.exit(1) + } + + HDFS = args(1) + inputFile = HDFS + args(2) + idealPartitions = args(3).toInt + println ("Input file: " + inputFile) + + SparkContext.idealPartitions = idealPartitions + SparkContext.inputFile = inputFile + + val ssc = new SparkStreamContext(args(0), "Test") + val sentences = ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 1000) + /*sentences.foreach(println)*/ + + val words = sentences.flatMap(_.split(" ")) + /*words.foreach(println)*/ + + val counts = words.map(x => (x, 1)).reduceByKey(_ + _) + /*counts.foreach(println)*/ + + counts.foreachRDD(rdd => rdd.collect.foreach(x => x)) + /*counts.register*/ + + ssc.run + } +} diff --git a/streaming/src/main/scala/spark/stream/WordCountTrivialWindow.scala b/streaming/src/main/scala/spark/stream/WordCountTrivialWindow.scala new file mode 100644 index 0000000000..5469df71e9 --- /dev/null +++ b/streaming/src/main/scala/spark/stream/WordCountTrivialWindow.scala @@ -0,0 +1,51 @@ +package spark.stream + +import SparkStreamContext._ + +import scala.util.Sorting + +object WordCountTrivialWindow { + + def main (args: Array[String]) { + + if (args.length < 1) { + println ("Usage: SparkStreamContext <host> [<temp directory>]") + System.exit(1) + } + + val ssc = new SparkStreamContext(args(0), "WordCountTrivialWindow") + if (args.length > 1) { + ssc.setTempDir(args(1)) + } + val sentences = 
ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 1000) + /*sentences.print*/ + + val words = sentences.flatMap(_.split(" ")) + + /*val counts = words.map(x => (x, 1)).reduceByKey(_ + _, 1)*/ + /*counts.print*/ + + def add(v1: Int, v2: Int) = (v1 + v2) + def subtract(v1: Int, v2: Int) = (v1 - v2) + + val windowedCounts = words.map(x => (x, 1)).window(Seconds(5), Seconds(1)).reduceByKey(add _, 1) + /*windowedCounts.print */ + + def topK(data: Seq[(String, Int)], k: Int): Array[(String, Int)] = { + implicit val countOrdering = new Ordering[(String, Int)] { + override def compare(count1: (String, Int), count2: (String, Int)): Int = { + count2._2 - count1._2 + } + } + val array = data.toArray + Sorting.quickSort(array) + array.take(k) + } + + val k = 10 + val topKWindowedCounts = windowedCounts.glom.flatMap(topK(_, k)).collect.flatMap(topK(_, k)) + topKWindowedCounts.print + + ssc.run + } +} diff --git a/streaming/src/main/scala/spark/stream/WordMax.scala b/streaming/src/main/scala/spark/stream/WordMax.scala new file mode 100644 index 0000000000..fc075e6d9d --- /dev/null +++ b/streaming/src/main/scala/spark/stream/WordMax.scala @@ -0,0 +1,64 @@ +package spark.stream + +import SparkStreamContext._ + +import scala.util.Sorting + +import spark.SparkContext +import spark.storage.StorageLevel + +object WordMax { + var inputFile : String = null + var HDFS : String = null + var idealPartitions : Int = 0 + + def main (args: Array[String]) { + + if (args.length != 4) { + println ("Usage: WordCount <host> <HDFS> <Input file> <Ideal Partitions>") + System.exit(1) + } + + HDFS = args(1) + inputFile = HDFS + args(2) + idealPartitions = args(3).toInt + println ("Input file: " + inputFile) + + val ssc = new SparkStreamContext(args(0), "WordCountWindow") + + SparkContext.idealPartitions = idealPartitions + SparkContext.inputFile = inputFile + + val sentences = ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 2000) + //sentences.print + + val 
words = sentences.flatMap(_.split(" ")) + + def add(v1: Int, v2: Int) = (v1 + v2) + def subtract(v1: Int, v2: Int) = (v1 - v2) + def max(v1: Int, v2: Int) = (if (v1 > v2) v1 else v2) + + //val windowedCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(2000), + // System.getProperty("spark.default.parallelism", "1").toInt) + //windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.DISK_AND_MEMORY_DESER_2, Seconds(5)) + //windowedCounts.print + + val parallelism = System.getProperty("spark.default.parallelism", "1").toInt + + val localCounts = words.map(x => (x, 1)).reduceByKey(add _, parallelism) + //localCounts.persist(StorageLevel.MEMORY_ONLY_DESER) + localCounts.persist(StorageLevel.MEMORY_ONLY_DESER_2) + val windowedCounts = localCounts.window(Seconds(30), Seconds(2)).reduceByKey(max _) + + //val windowedCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(2), + // parallelism) + //windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER_2, Seconds(6)) + + //windowedCounts.print + windowedCounts.register + //windowedCounts.foreachRDD(rdd => rdd.collect.foreach(x => print(x+ " "))) + //windowedCounts.foreachRDD(rdd => rdd.collect.foreach(x => x)) + + ssc.run + } +} |