author      Denny <dennybritz@gmail.com>    2012-11-12 19:39:29 -0800
committer   Denny <dennybritz@gmail.com>    2012-11-12 19:39:29 -0800
commit      255b3e44c18e64a55afb184f39746780b391a496 (patch)
tree        479fedcfdbf7e68bc6dc73188421a49fbc356a82 /streaming
parent      0fd4c93f1c349f052f633fea64f975d53976bd9c (diff)
parent      564dd8c3f415746a68f05bde6ea2a0e7a7760b4c (diff)
download    spark-255b3e44c18e64a55afb184f39746780b391a496.tar.gz
            spark-255b3e44c18e64a55afb184f39746780b391a496.tar.bz2
            spark-255b3e44c18e64a55afb184f39746780b391a496.zip
Merge branch 'dev' into kafka
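This merge brings the streaming `dev` branch into the Kafka branch. Its headline change is a switch of the default DStream persistence level from deserialized to serialized in-memory storage: `DStream.persist()`, `ReducedWindowedDStream`, and `StateDStream` move from `MEMORY_ONLY` to `MEMORY_ONLY_SER`, and the raw network stream default becomes `MEMORY_AND_DISK_SER_2`. Serialized storage trades some CPU for a smaller, GC-friendlier footprint, which matters for windowed streams that keep many RDDs resident. A minimal sketch of the storage-level flag combinations involved — the field names are inferred from the inline comment in the deleted examples ("new StorageLevel(false, true, false, 3) // Memory only, serialized, 3 replicas") and are illustrative, not the library's definition:

```scala
// Sketch of the StorageLevel combinations referenced in this diff.
// Field order is assumed from the deleted examples' constructor call.
case class StorageLevel(
    useDisk: Boolean,
    useMemory: Boolean,
    deserialized: Boolean,
    replication: Int = 1)

object StorageLevel {
  val MEMORY_ONLY           = StorageLevel(false, true, true)     // old default: deserialized objects
  val MEMORY_ONLY_SER       = StorageLevel(false, true, false)    // new default: serialized bytes
  val MEMORY_ONLY_SER_2     = StorageLevel(false, true, false, 2) // serialized, replicated twice
  val MEMORY_AND_DISK_SER_2 = StorageLevel(true, true, false, 2)  // raw-stream default after this merge
}
```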
Diffstat (limited to 'streaming')
26 files changed, 331 insertions(+), 2055 deletions(-)
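Of the files modified below, FileInputDStream gets the most subtle fix. The Hadoop FileStatus API reports modification times only at one-second granularity, so the old `modTime <= lastModTime` cutoff could skip a new file whose modification time equals that of the newest file already processed. The patch therefore pairs the latest timestamp with the set of paths already seen at that timestamp. A self-contained sketch of that bookkeeping, with plain Scala standing in for the Hadoop types (`FileStat`, `NewFileTracker`, and `selectNewFiles` are illustrative names, not identifiers from the patch):

```scala
import scala.collection.mutable.HashSet

// Stand-in for Hadoop's FileStatus: a path plus a modification time
// that is only precise to the second.
case class FileStat(path: String, modTime: Long)

object NewFileTracker {
  var lastModTime = 0L                         // newest mod time already processed
  val lastModTimeFiles = new HashSet[String]() // paths already processed at that time

  /** Mirrors the PathFilter in the patched compute(): returns only unprocessed files. */
  def selectNewFiles(listing: Seq[FileStat]): Seq[FileStat] = {
    var latestModTime = 0L
    val latestModTimeFiles = new HashSet[String]()
    val selected = listing.filter { f =>
      if (f.modTime < lastModTime) {
        false // strictly older than everything already processed
      } else if (f.modTime == lastModTime && lastModTimeFiles.contains(f.path)) {
        false // same second as before, but this exact file was already processed
      } else {
        if (f.modTime > latestModTime) {
          latestModTime = f.modTime
          latestModTimeFiles.clear()
        }
        latestModTimeFiles += f.path
        true
      }
    }
    // Carry the bookkeeping forward, as compute() does after listing the directory.
    if (selected.nonEmpty) {
      if (lastModTime != latestModTime) {
        lastModTime = latestModTime
        lastModTimeFiles.clear()
      }
      lastModTimeFiles ++= latestModTimeFiles
    }
    selected
  }
}
```

Running `selectNewFiles` twice over the same listing yields the files once and then nothing; a file that later appears with the same one-second timestamp is still picked up exactly once.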
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala index b8324d11a3..e8bbf7d1c0 100644 --- a/streaming/src/main/scala/spark/streaming/DStream.scala +++ b/streaming/src/main/scala/spark/streaming/DStream.scala @@ -82,7 +82,7 @@ extends Serializable with Logging { this } - def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY) + def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER) // Turn on the default caching level for this RDD def cache(): DStream[T] = persist() diff --git a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala index 9d7361097b..88856364d2 100644 --- a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala @@ -6,7 +6,8 @@ import spark.rdd.UnionRDD import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat} -import java.io.{ObjectInputStream, IOException} + +import scala.collection.mutable.HashSet class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest]( @@ -19,7 +20,8 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K @transient private var path_ : Path = null @transient private var fs_ : FileSystem = null - var lastModTime: Long = 0 + var lastModTime = 0L + val lastModTimeFiles = new HashSet[String]() def path(): Path = { if (path_ == null) path_ = new Path(directory) @@ -40,22 +42,37 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K } override def stop() { } - + + /** + * Finds the files that were modified since the last time this method was called and makes + * a union RDD out of them. Note that this maintains the list of files that were processed + * in the latest modification time in the previous call to this method. This is because the + * modification time returned by the FileStatus API seems to return times only at the + * granularity of seconds. Hence, new files may have the same modification time as the + * latest modification time in the previous call to this method and the list of files + * maintained is used to filter the one that have been processed. 
+ */ override def compute(validTime: Time): Option[RDD[(K, V)]] = { + // Create the filter for selecting new files val newFilter = new PathFilter() { var latestModTime = 0L - + val latestModTimeFiles = new HashSet[String]() + def accept(path: Path): Boolean = { if (!filter.accept(path)) { return false } else { val modTime = fs.getFileStatus(path).getModificationTime() - if (modTime <= lastModTime) { + if (modTime < lastModTime){ + return false + } else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) { return false } if (modTime > latestModTime) { latestModTime = modTime + latestModTimeFiles.clear() } + latestModTimeFiles += path.toString return true } } @@ -64,7 +81,12 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K val newFiles = fs.listStatus(path, newFilter) logInfo("New files: " + newFiles.map(_.getPath).mkString(", ")) if (newFiles.length > 0) { - lastModTime = newFilter.latestModTime + // Update the modification time and the files processed for that modification time + if (lastModTime != newFilter.latestModTime) { + lastModTime = newFilter.latestModTime + lastModTimeFiles.clear() + } + lastModTimeFiles ++= newFilter.latestModTimeFiles } val newRDD = new UnionRDD(ssc.sc, newFiles.map( file => ssc.sc.newAPIHadoopFile[K, V, F](file.getPath.toString))) diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala index 4d9346edd8..4c42692295 100644 --- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala +++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala @@ -47,6 +47,7 @@ class NetworkInputTracker( val result = queue.synchronized { queue.dequeueAll(x => true) } + logInfo("Stream " + receiverId + " received " + result.size + " blocks") result.toArray } diff --git a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/RawInputDStream.scala index 90d8528d5b..d5db8e787d 100644 --- a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala +++ b/streaming/src/main/scala/spark/streaming/RawInputDStream.scala @@ -69,7 +69,7 @@ class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: S } def onStop() { - blockPushingThread.interrupt() + if (blockPushingThread != null) blockPushingThread.interrupt() } /** Read a buffer fully from a given Channel */ diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala index 6df82c0df3..b07d51fa6b 100644 --- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala +++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala @@ -31,10 +31,14 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( "must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")" ) - super.persist(StorageLevel.MEMORY_ONLY) - + // Reduce each batch of data using reduceByKey which will be further reduced by window + // by ReducedWindowedDStream val reducedStream = parent.reduceByKey(reduceFunc, partitioner) + // Persist RDDs to memory by default as these RDDs are going to be reused. 
+ super.persist(StorageLevel.MEMORY_ONLY_SER) + reducedStream.persist(StorageLevel.MEMORY_ONLY_SER) + def windowTime: Time = _windowTime override def dependencies = List(reducedStream) @@ -57,13 +61,6 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest]( this } - protected[streaming] override def setRememberDuration(time: Time) { - if (rememberDuration == null || rememberDuration < time) { - rememberDuration = time - dependencies.foreach(_.setRememberDuration(rememberDuration + windowTime)) - } - } - override def compute(validTime: Time): Option[RDD[(K, V)]] = { val reduceF = reduceFunc val invReduceF = invReduceFunc diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala index 0211df1343..cb261808f5 100644 --- a/streaming/src/main/scala/spark/streaming/StateDStream.scala +++ b/streaming/src/main/scala/spark/streaming/StateDStream.scala @@ -23,7 +23,7 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife rememberPartitioner: Boolean ) extends DStream[(K, S)](parent.ssc) { - super.persist(StorageLevel.MEMORY_ONLY) + super.persist(StorageLevel.MEMORY_ONLY_SER) override def dependencies = List(parent) diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala index e87d0cb7c8..4cba525f86 100644 --- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala @@ -149,7 +149,7 @@ final class StreamingContext ( def rawNetworkStream[T: ClassManifest]( hostname: String, port: Int, - storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2 + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): DStream[T] = { val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel) graph.addInputStream(inputStream) @@ -157,7 +157,7 @@ final class StreamingContext ( } /** - * This function creates a input stream that monitors a Hadoop-compatible + * This function creates a input stream that monitors a Hadoop-compatible filesystem * for new files and executes the necessary processing on them. 
*/ def fileStream[ diff --git a/streaming/src/main/scala/spark/streaming/examples/Grep2.scala b/streaming/src/main/scala/spark/streaming/examples/Grep2.scala deleted file mode 100644 index b1faa65c17..0000000000 --- a/streaming/src/main/scala/spark/streaming/examples/Grep2.scala +++ /dev/null @@ -1,64 +0,0 @@ -package spark.streaming.examples - -import spark.SparkContext -import SparkContext._ -import spark.streaming._ -import StreamingContext._ - -import spark.storage.StorageLevel - -import scala.util.Sorting -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.HashMap -import scala.collection.mutable.Queue -import scala.collection.JavaConversions.mapAsScalaMap - -import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} - - -object Grep2 { - - def warmup(sc: SparkContext) { - (0 until 10).foreach {i => - sc.parallelize(1 to 20000000, 1000) - .map(x => (x % 337, x % 1331)) - .reduceByKey(_ + _) - .count() - } - } - - def main (args: Array[String]) { - - if (args.length != 6) { - println ("Usage: Grep2 <host> <file> <mapTasks> <reduceTasks> <batchMillis> <chkptMillis>") - System.exit(1) - } - - val Array(master, file, mapTasks, reduceTasks, batchMillis, chkptMillis) = args - - val batchDuration = Milliseconds(batchMillis.toLong) - - val ssc = new StreamingContext(master, "Grep2") - ssc.setBatchDuration(batchDuration) - - //warmup(ssc.sc) - - val data = ssc.sc.textFile(file, mapTasks.toInt).persist( - new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas - println("Data count: " + data.count()) - println("Data count: " + data.count()) - println("Data count: " + data.count()) - - val sentences = new ConstantInputDStream(ssc, data) - ssc.registerInputStream(sentences) - - sentences.filter(_.contains("Culpepper")).count().foreachRDD(r => - println("Grep count: " + r.collect().mkString)) - - ssc.start() - - while(true) { Thread.sleep(1000) } - } -} - - diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala index b1e1a613fe..ffbea6e55d 100644 --- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala @@ -2,8 +2,10 @@ package spark.streaming.examples import spark.util.IntParam import spark.storage.StorageLevel + import spark.streaming._ import spark.streaming.StreamingContext._ +import spark.streaming.util.RawTextHelper._ object GrepRaw { def main(args: Array[String]) { @@ -17,16 +19,13 @@ object GrepRaw { // Create the context and set the batch size val ssc = new StreamingContext(master, "GrepRaw") ssc.setBatchDuration(Milliseconds(batchMillis)) + warmUp(ssc.sc) - // Make sure some tasks have started on each node - ssc.sc.parallelize(1 to 1000, 1000).count() - ssc.sc.parallelize(1 to 1000, 1000).count() - ssc.sc.parallelize(1 to 1000, 1000).count() val rawStreams = (1 to numStreams).map(_ => - ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray + ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray val union = new UnionDStream(rawStreams) - union.filter(_.contains("Culpepper")).count().foreachRDD(r => + union.filter(_.contains("Alice")).count().foreachRDD(r => println("Grep count: " + r.collect().mkString)) ssc.start() } diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala index 750cb7445f..0411bde1a7 
100644 --- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala @@ -1,94 +1,50 @@ package spark.streaming.examples -import spark.util.IntParam -import spark.SparkContext -import spark.SparkContext._ import spark.storage.StorageLevel +import spark.util.IntParam + import spark.streaming._ import spark.streaming.StreamingContext._ +import spark.streaming.util.RawTextHelper._ -import WordCount2_ExtraFunctions._ +import java.util.UUID object TopKWordCountRaw { - def moreWarmup(sc: SparkContext) { - (0 until 40).foreach {i => - sc.parallelize(1 to 20000000, 1000) - .map(_ % 1331).map(_.toString) - .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10) - .collect() - } - } - + def main(args: Array[String]) { - if (args.length != 7) { - System.err.println("Usage: TopKWordCountRaw <master> <streams> <host> <port> <batchMs> <chkptMs> <reduces>") + if (args.length != 4) { + System.err.println("Usage: WordCountRaw <master> <# streams> <port> <HDFS checkpoint directory> ") System.exit(1) } - val Array(master, IntParam(streams), host, IntParam(port), IntParam(batchMs), - IntParam(chkptMs), IntParam(reduces)) = args - - // Create the context and set the batch size - val ssc = new StreamingContext(master, "TopKWordCountRaw") - ssc.setBatchDuration(Milliseconds(batchMs)) - - // Make sure some tasks have started on each node - moreWarmup(ssc.sc) - - val rawStreams = (1 to streams).map(_ => - ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray - val union = new UnionDStream(rawStreams) - - val windowedCounts = union.mapPartitions(splitAndCountPartitions) - .reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(batchMs), reduces) - windowedCounts.persist().checkpoint(Milliseconds(chkptMs)) - //.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMs)) - - def topK(data: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] = { - val taken = new Array[(String, Long)](k) - - var i = 0 - var len = 0 - var done = false - var value: (String, Long) = null - var swap: (String, Long) = null - var count = 0 - - while(data.hasNext) { - value = data.next - count += 1 - println("count = " + count) - if (len == 0) { - taken(0) = value - len = 1 - } else if (len < k || value._2 > taken(len - 1)._2) { - if (len < k) { - len += 1 - } - taken(len - 1) = value - i = len - 1 - while(i > 0 && taken(i - 1)._2 < taken(i)._2) { - swap = taken(i) - taken(i) = taken(i-1) - taken(i - 1) = swap - i -= 1 - } - } - } - println("Took " + len + " out of " + count + " items") - return taken.toIterator - } + val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args + val k = 10 - val k = 50 + // Create the context, set the batch size and checkpoint directory. 
+ // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts + // periodically to HDFS + val ssc = new StreamingContext(master, "TopKWordCountRaw") + ssc.setBatchDuration(Seconds(1)) + ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1)) + + // Warm up the JVMs on master and slave for JIT compilation to kick in + /*warmUp(ssc.sc)*/ + + // Set up the raw network streams that will connect to localhost:port to raw test + // senders on the slaves and generate top K words of last 30 seconds + val lines = (1 to numStreams).map(_ => { + ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2) + }) + val union = new UnionDStream(lines.toArray) + val counts = union.mapPartitions(splitAndCountPartitions) + val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10) val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k)) partialTopKWindowedCounts.foreachRDD(rdd => { val collectedCounts = rdd.collect - println("Collected " + collectedCounts.size + " items") - topK(collectedCounts.toIterator, k).foreach(println) + println("Collected " + collectedCounts.size + " words from partial top words") + println("Top " + k + " words are " + topK(collectedCounts.toIterator, k).mkString(",")) }) -// windowedCounts.foreachRDD(r => println("Element count: " + r.count())) - ssc.start() } } diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala deleted file mode 100644 index 865026033e..0000000000 --- a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala +++ /dev/null @@ -1,114 +0,0 @@ -package spark.streaming.examples - -import spark.SparkContext -import SparkContext._ -import spark.streaming._ -import StreamingContext._ - -import spark.storage.StorageLevel - -import scala.util.Sorting -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.HashMap -import scala.collection.mutable.Queue -import scala.collection.JavaConversions.mapAsScalaMap - -import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} - - -object WordCount2_ExtraFunctions { - - def add(v1: Long, v2: Long) = (v1 + v2) - - def subtract(v1: Long, v2: Long) = (v1 - v2) - - def max(v1: Long, v2: Long) = math.max(v1, v2) - - def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = { - //val map = new java.util.HashMap[String, Long] - val map = new OLMap[String] - var i = 0 - var j = 0 - while (iter.hasNext) { - val s = iter.next() - i = 0 - while (i < s.length) { - j = i - while (j < s.length && s.charAt(j) != ' ') { - j += 1 - } - if (j > i) { - val w = s.substring(i, j) - val c = map.getLong(w) - map.put(w, c + 1) -/* - if (c == null) { - map.put(w, 1) - } else { - map.put(w, c + 1) - } -*/ - } - i = j - while (i < s.length && s.charAt(i) == ' ') { - i += 1 - } - } - } - map.toIterator.map{case (k, v) => (k, v)} - } -} - -object WordCount2 { - - def warmup(sc: SparkContext) { - (0 until 3).foreach {i => - sc.parallelize(1 to 20000000, 500) - .map(x => (x % 337, x % 1331)) - .reduceByKey(_ + _, 100) - .count() - } - } - - def main (args: Array[String]) { - - if (args.length != 6) { - println ("Usage: WordCount2 <host> <file> <mapTasks> <reduceTasks> <batchMillis> <chkptMillis>") - System.exit(1) - } - - val Array(master, file, mapTasks, reduceTasks, batchMillis, chkptMillis) = args - - val batchDuration = Milliseconds(batchMillis.toLong) - - val ssc = new 
StreamingContext(master, "WordCount2") - ssc.setBatchDuration(batchDuration) - - //warmup(ssc.sc) - - val data = ssc.sc.textFile(file, mapTasks.toInt).persist( - new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas - println("Data count: " + data.map(x => if (x == "") 1 else x.split(" ").size / x.split(" ").size).count()) - println("Data count: " + data.count()) - println("Data count: " + data.count()) - - val sentences = new ConstantInputDStream(ssc, data) - ssc.registerInputStream(sentences) - - import WordCount2_ExtraFunctions._ - - val windowedCounts = sentences - .mapPartitions(splitAndCountPartitions) - .reduceByKeyAndWindow(add _, subtract _, Seconds(30), batchDuration, reduceTasks.toInt) - - windowedCounts.persist().checkpoint(Milliseconds(chkptMillis.toLong)) - //.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMillis.toLong)) - windowedCounts.foreachRDD(r => println("Element count: " + r.count())) - - ssc.start() - - while(true) { Thread.sleep(1000) } - } -} - - diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala index d1ea9a9cd5..571428c0fe 100644 --- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala +++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala @@ -1,50 +1,43 @@ package spark.streaming.examples -import spark.util.IntParam -import spark.SparkContext -import spark.SparkContext._ import spark.storage.StorageLevel +import spark.util.IntParam + import spark.streaming._ import spark.streaming.StreamingContext._ +import spark.streaming.util.RawTextHelper._ -import WordCount2_ExtraFunctions._ +import java.util.UUID object WordCountRaw { - def moreWarmup(sc: SparkContext) { - (0 until 40).foreach {i => - sc.parallelize(1 to 20000000, 1000) - .map(_ % 1331).map(_.toString) - .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10) - .collect() - } - } def main(args: Array[String]) { - if (args.length != 7) { - System.err.println("Usage: WordCountRaw <master> <streams> <host> <port> <batchMs> <chkptMs> <reduces>") + if (args.length != 4) { + System.err.println("Usage: WordCountRaw <master> <# streams> <port> <HDFS checkpoint directory> ") System.exit(1) } - val Array(master, IntParam(streams), host, IntParam(port), IntParam(batchMs), - IntParam(chkptMs), IntParam(reduces)) = args + val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args - // Create the context and set the batch size + // Create the context, set the batch size and checkpoint directory. 
+ // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts + // periodically to HDFS val ssc = new StreamingContext(master, "WordCountRaw") - ssc.setBatchDuration(Milliseconds(batchMs)) - - // Make sure some tasks have started on each node - moreWarmup(ssc.sc) - - val rawStreams = (1 to streams).map(_ => - ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray - val union = new UnionDStream(rawStreams) - - val windowedCounts = union.mapPartitions(splitAndCountPartitions) - .reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(batchMs), reduces) - windowedCounts.persist().checkpoint(chkptMs) - //.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMs)) - - windowedCounts.foreachRDD(r => println("Element count: " + r.count())) + ssc.setBatchDuration(Seconds(1)) + ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1)) + + // Warm up the JVMs on master and slave for JIT compilation to kick in + warmUp(ssc.sc) + + // Set up the raw network streams that will connect to localhost:port to raw test + // senders on the slaves and generate count of words of last 30 seconds + val lines = (1 to numStreams).map(_ => { + ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2) + }) + val union = new UnionDStream(lines.toArray) + val counts = union.mapPartitions(splitAndCountPartitions) + val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10) + windowedCounts.foreachRDD(r => println("# unique words = " + r.count())) ssc.start() } diff --git a/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala b/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala deleted file mode 100644 index 6a9c8a9a69..0000000000 --- a/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala +++ /dev/null @@ -1,75 +0,0 @@ -package spark.streaming.examples - -import spark.SparkContext -import SparkContext._ -import spark.streaming._ -import StreamingContext._ - -import spark.storage.StorageLevel - -import scala.util.Sorting -import scala.collection.mutable.ArrayBuffer -import scala.collection.mutable.HashMap -import scala.collection.mutable.Queue -import scala.collection.JavaConversions.mapAsScalaMap - -import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} - - -object WordMax2 { - - def warmup(sc: SparkContext) { - (0 until 10).foreach {i => - sc.parallelize(1 to 20000000, 1000) - .map(x => (x % 337, x % 1331)) - .reduceByKey(_ + _) - .count() - } - } - - def main (args: Array[String]) { - - if (args.length != 6) { - println ("Usage: WordMax2 <host> <file> <mapTasks> <reduceTasks> <batchMillis> <chkptMillis>") - System.exit(1) - } - - val Array(master, file, mapTasks, reduceTasks, batchMillis, chkptMillis) = args - - val batchDuration = Milliseconds(batchMillis.toLong) - - val ssc = new StreamingContext(master, "WordMax2") - ssc.setBatchDuration(batchDuration) - - //warmup(ssc.sc) - - val data = ssc.sc.textFile(file, mapTasks.toInt).persist( - new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas - println("Data count: " + data.count()) - println("Data count: " + data.count()) - println("Data count: " + data.count()) - - val sentences = new ConstantInputDStream(ssc, data) - ssc.registerInputStream(sentences) - - import WordCount2_ExtraFunctions._ - - val windowedCounts = sentences - .mapPartitions(splitAndCountPartitions) - .reduceByKey(add _, reduceTasks.toInt) - .persist() - 
.checkpoint(Milliseconds(chkptMillis.toLong)) - //.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMillis.toLong)) - .reduceByKeyAndWindow(max _, Seconds(10), batchDuration, reduceTasks.toInt) - .persist() - .checkpoint(Milliseconds(chkptMillis.toLong)) - //.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMillis.toLong)) - windowedCounts.foreachRDD(r => println("Element count: " + r.count())) - - ssc.start() - - while(true) { Thread.sleep(1000) } - } -} - - diff --git a/streaming/src/main/scala/spark/streaming/util/ConnectionHandler.scala b/streaming/src/main/scala/spark/streaming/util/ConnectionHandler.scala deleted file mode 100644 index cde868a0c9..0000000000 --- a/streaming/src/main/scala/spark/streaming/util/ConnectionHandler.scala +++ /dev/null @@ -1,157 +0,0 @@ -package spark.streaming.util - -import spark.Logging - -import scala.collection.mutable.{ArrayBuffer, SynchronizedQueue} - -import java.net._ -import java.io._ -import java.nio._ -import java.nio.charset._ -import java.nio.channels._ -import java.nio.channels.spi._ - -abstract class ConnectionHandler(host: String, port: Int, connect: Boolean) -extends Thread with Logging { - - val selector = SelectorProvider.provider.openSelector() - val interestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)] - - initLogging() - - override def run() { - try { - if (connect) { - connect() - } else { - listen() - } - - var interrupted = false - while(!interrupted) { - - preSelect() - - while(!interestChangeRequests.isEmpty) { - val (key, ops) = interestChangeRequests.dequeue - val lastOps = key.interestOps() - key.interestOps(ops) - - def intToOpStr(op: Int): String = { - val opStrs = new ArrayBuffer[String]() - if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ" - if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE" - if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT" - if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT" - if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " " - } - - logTrace("Changed ops from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]") - } - - selector.select() - interrupted = Thread.currentThread.isInterrupted - - val selectedKeys = selector.selectedKeys().iterator() - while (selectedKeys.hasNext) { - val key = selectedKeys.next.asInstanceOf[SelectionKey] - selectedKeys.remove() - if (key.isValid) { - if (key.isAcceptable) { - accept(key) - } else if (key.isConnectable) { - finishConnect(key) - } else if (key.isReadable) { - read(key) - } else if (key.isWritable) { - write(key) - } - } - } - } - } catch { - case e: Exception => { - logError("Error in select loop", e) - } - } - } - - def connect() { - val socketAddress = new InetSocketAddress(host, port) - val channel = SocketChannel.open() - channel.configureBlocking(false) - channel.socket.setReuseAddress(true) - channel.socket.setTcpNoDelay(true) - channel.connect(socketAddress) - channel.register(selector, SelectionKey.OP_CONNECT) - logInfo("Initiating connection to [" + socketAddress + "]") - } - - def listen() { - val channel = ServerSocketChannel.open() - channel.configureBlocking(false) - channel.socket.setReuseAddress(true) - channel.socket.setReceiveBufferSize(256 * 1024) - channel.socket.bind(new InetSocketAddress(port)) - channel.register(selector, SelectionKey.OP_ACCEPT) - logInfo("Listening on port " + port) - } - - def finishConnect(key: SelectionKey) { - try { - val channel = key.channel.asInstanceOf[SocketChannel] - val address = 
channel.socket.getRemoteSocketAddress - channel.finishConnect() - logInfo("Connected to [" + host + ":" + port + "]") - ready(key) - } catch { - case e: IOException => { - logError("Error finishing connect to " + host + ":" + port) - close(key) - } - } - } - - def accept(key: SelectionKey) { - try { - val serverChannel = key.channel.asInstanceOf[ServerSocketChannel] - val channel = serverChannel.accept() - val address = channel.socket.getRemoteSocketAddress - channel.configureBlocking(false) - logInfo("Accepted connection from [" + address + "]") - ready(channel.register(selector, 0)) - } catch { - case e: IOException => { - logError("Error accepting connection", e) - } - } - } - - def changeInterest(key: SelectionKey, ops: Int) { - logTrace("Added request to change ops to " + ops) - interestChangeRequests += ((key, ops)) - } - - def ready(key: SelectionKey) - - def preSelect() { - } - - def read(key: SelectionKey) { - throw new UnsupportedOperationException("Cannot read on connection of type " + this.getClass.toString) - } - - def write(key: SelectionKey) { - throw new UnsupportedOperationException("Cannot write on connection of type " + this.getClass.toString) - } - - def close(key: SelectionKey) { - try { - key.channel.close() - key.cancel() - Thread.currentThread.interrupt - } catch { - case e: Exception => logError("Error closing connection", e) - } - } -} diff --git a/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala new file mode 100644 index 0000000000..f31ae39a16 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala @@ -0,0 +1,98 @@ +package spark.streaming.util + +import spark.SparkContext +import spark.SparkContext._ +import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} +import scala.collection.JavaConversions.mapAsScalaMap + +object RawTextHelper { + + /** + * Splits lines and counts the words in them using specialized object-to-long hashmap + * (to avoid boxing-unboxing overhead of Long in java/scala HashMap) + */ + def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = { + val map = new OLMap[String] + var i = 0 + var j = 0 + while (iter.hasNext) { + val s = iter.next() + i = 0 + while (i < s.length) { + j = i + while (j < s.length && s.charAt(j) != ' ') { + j += 1 + } + if (j > i) { + val w = s.substring(i, j) + val c = map.getLong(w) + map.put(w, c + 1) + } + i = j + while (i < s.length && s.charAt(i) == ' ') { + i += 1 + } + } + } + map.toIterator.map{case (k, v) => (k, v)} + } + + /** + * Gets the top k words in terms of word counts. Assumes that each word exists only once + * in the `data` iterator (that is, the counts have been reduced). 
+ */ + def topK(data: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] = { + val taken = new Array[(String, Long)](k) + + var i = 0 + var len = 0 + var done = false + var value: (String, Long) = null + var swap: (String, Long) = null + var count = 0 + + while(data.hasNext) { + value = data.next + if (value != null) { + count += 1 + if (len == 0) { + taken(0) = value + len = 1 + } else if (len < k || value._2 > taken(len - 1)._2) { + if (len < k) { + len += 1 + } + taken(len - 1) = value + i = len - 1 + while(i > 0 && taken(i - 1)._2 < taken(i)._2) { + swap = taken(i) + taken(i) = taken(i-1) + taken(i - 1) = swap + i -= 1 + } + } + } + } + return taken.toIterator + } + + /** + * Warms up the SparkContext in master and slave by running tasks to force JIT kick in + * before real workload starts. + */ + def warmUp(sc: SparkContext) { + for(i <- 0 to 4) { + sc.parallelize(1 to 200000, 1000) + .map(_ % 1331).map(_.toString) + .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10) + .count() + } + } + + def add(v1: Long, v2: Long) = (v1 + v2) + + def subtract(v1: Long, v2: Long) = (v1 - v2) + + def max(v1: Long, v2: Long) = math.max(v1, v2) +} + diff --git a/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala b/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala deleted file mode 100644 index 3922dfbad6..0000000000 --- a/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala +++ /dev/null @@ -1,67 +0,0 @@ -package spark.streaming.util - -import java.net.{Socket, ServerSocket} -import java.io.{ByteArrayOutputStream, DataOutputStream, DataInputStream, BufferedInputStream} - -object Receiver { - def main(args: Array[String]) { - val port = args(0).toInt - val lsocket = new ServerSocket(port) - println("Listening on port " + port ) - while(true) { - val socket = lsocket.accept() - (new Thread() { - override def run() { - val buffer = new Array[Byte](100000) - var count = 0 - val time = System.currentTimeMillis - try { - val is = new DataInputStream(new BufferedInputStream(socket.getInputStream)) - var loop = true - var string: String = null - do { - string = is.readUTF() - if (string != null) { - count += 28 - } - } while (string != null) - } catch { - case e: Exception => e.printStackTrace() - } - val timeTaken = System.currentTimeMillis - time - val tput = (count / 1024.0) / (timeTaken / 1000.0) - println("Data = " + count + " bytes\nTime = " + timeTaken + " ms\nTput = " + tput + " KB/s") - } - }).start() - } - } - -} - -object Sender { - - def main(args: Array[String]) { - try { - val host = args(0) - val port = args(1).toInt - val size = args(2).toInt - - val byteStream = new ByteArrayOutputStream() - val stringDataStream = new DataOutputStream(byteStream) - (0 until size).foreach(_ => stringDataStream.writeUTF("abcdedfghijklmnopqrstuvwxy")) - val bytes = byteStream.toByteArray() - println("Generated array of " + bytes.length + " bytes") - - /*val bytes = new Array[Byte](size)*/ - val socket = new Socket(host, port) - val os = socket.getOutputStream - os.write(bytes) - os.flush - socket.close() - - } catch { - case e: Exception => e.printStackTrace - } - } -} - diff --git a/streaming/src/main/scala/spark/streaming/util/SentenceFileGenerator.scala b/streaming/src/main/scala/spark/streaming/util/SentenceFileGenerator.scala deleted file mode 100644 index 94e8f7a849..0000000000 --- a/streaming/src/main/scala/spark/streaming/util/SentenceFileGenerator.scala +++ /dev/null @@ -1,92 +0,0 @@ -package spark.streaming.util - -import 
spark._ - -import scala.collection.mutable.ArrayBuffer -import scala.util.Random -import scala.io.Source - -import java.net.InetSocketAddress - -import org.apache.hadoop.fs._ -import org.apache.hadoop.conf._ -import org.apache.hadoop.io._ -import org.apache.hadoop.mapred._ -import org.apache.hadoop.util._ - -object SentenceFileGenerator { - - def printUsage () { - println ("Usage: SentenceFileGenerator <master> <target directory> <# partitions> <sentence file> [<sentences per second>]") - System.exit(0) - } - - def main (args: Array[String]) { - if (args.length < 4) { - printUsage - } - - val master = args(0) - val fs = new Path(args(1)).getFileSystem(new Configuration()) - val targetDirectory = new Path(args(1)).makeQualified(fs) - val numPartitions = args(2).toInt - val sentenceFile = args(3) - val sentencesPerSecond = { - if (args.length > 4) args(4).toInt - else 10 - } - - val source = Source.fromFile(sentenceFile) - val lines = source.mkString.split ("\n").toArray - source.close () - println("Read " + lines.length + " lines from file " + sentenceFile) - - val sentences = { - val buffer = ArrayBuffer[String]() - val random = new Random() - var i = 0 - while (i < sentencesPerSecond) { - buffer += lines(random.nextInt(lines.length)) - i += 1 - } - buffer.toArray - } - println("Generated " + sentences.length + " sentences") - - val sc = new SparkContext(master, "SentenceFileGenerator") - val sentencesRDD = sc.parallelize(sentences, numPartitions) - - val tempDirectory = new Path(targetDirectory, "_tmp") - - fs.mkdirs(targetDirectory) - fs.mkdirs(tempDirectory) - - var saveTimeMillis = System.currentTimeMillis - try { - while (true) { - val newDir = new Path(targetDirectory, "Sentences-" + saveTimeMillis) - val tmpNewDir = new Path(tempDirectory, "Sentences-" + saveTimeMillis) - println("Writing to file " + newDir) - sentencesRDD.saveAsTextFile(tmpNewDir.toString) - fs.rename(tmpNewDir, newDir) - saveTimeMillis += 1000 - val sleepTimeMillis = { - val currentTimeMillis = System.currentTimeMillis - if (saveTimeMillis < currentTimeMillis) { - 0 - } else { - saveTimeMillis - currentTimeMillis - } - } - println("Sleeping for " + sleepTimeMillis + " ms") - Thread.sleep(sleepTimeMillis) - } - } catch { - case e: Exception => - } - } -} - - - - diff --git a/streaming/src/main/scala/spark/streaming/util/ShuffleTest.scala b/streaming/src/main/scala/spark/streaming/util/ShuffleTest.scala deleted file mode 100644 index 60085f4f88..0000000000 --- a/streaming/src/main/scala/spark/streaming/util/ShuffleTest.scala +++ /dev/null @@ -1,23 +0,0 @@ -package spark.streaming.util - -import spark.SparkContext -import SparkContext._ - -object ShuffleTest { - def main(args: Array[String]) { - - if (args.length < 1) { - println ("Usage: ShuffleTest <host>") - System.exit(1) - } - - val sc = new spark.SparkContext(args(0), "ShuffleTest") - val rdd = sc.parallelize(1 to 1000, 500).cache - - def time(f: => Unit) { val start = System.nanoTime; f; println((System.nanoTime - start) * 1.0e-6) } - - time { for (i <- 0 until 50) time { rdd.map(x => (x % 100, x)).reduceByKey(_ + _, 10).count } } - System.exit(0) - } -} - diff --git a/streaming/src/main/scala/spark/streaming/util/TestGenerator.scala b/streaming/src/main/scala/spark/streaming/util/TestGenerator.scala deleted file mode 100644 index 23e9235c60..0000000000 --- a/streaming/src/main/scala/spark/streaming/util/TestGenerator.scala +++ /dev/null @@ -1,107 +0,0 @@ -package spark.streaming.util - -import scala.util.Random -import scala.io.Source -import scala.actors._ 
-import scala.actors.Actor._ -import scala.actors.remote._ -import scala.actors.remote.RemoteActor._ - -import java.net.InetSocketAddress - - -object TestGenerator { - - def printUsage { - println ("Usage: SentenceGenerator <target IP> <target port> <sentence file> [<sentences per second>]") - System.exit(0) - } - /* - def generateRandomSentences(lines: Array[String], sentencesPerSecond: Int, streamReceiver: AbstractActor) { - val sleepBetweenSentences = 1000.0 / sentencesPerSecond.toDouble - 1 - val random = new Random () - - try { - var lastPrintTime = System.currentTimeMillis() - var count = 0 - while(true) { - streamReceiver ! lines(random.nextInt(lines.length)) - count += 1 - if (System.currentTimeMillis - lastPrintTime >= 1000) { - println (count + " sentences sent last second") - count = 0 - lastPrintTime = System.currentTimeMillis - } - Thread.sleep(sleepBetweenSentences.toLong) - } - } catch { - case e: Exception => - } - }*/ - - def generateSameSentences(lines: Array[String], sentencesPerSecond: Int, streamReceiver: AbstractActor) { - try { - val numSentences = if (sentencesPerSecond <= 0) { - lines.length - } else { - sentencesPerSecond - } - val sentences = lines.take(numSentences).toArray - - var nextSendingTime = System.currentTimeMillis() - val sendAsArray = true - while(true) { - if (sendAsArray) { - println("Sending as array") - streamReceiver !? sentences - } else { - println("Sending individually") - sentences.foreach(sentence => { - streamReceiver !? sentence - }) - } - println ("Sent " + numSentences + " sentences in " + (System.currentTimeMillis - nextSendingTime) + " ms") - nextSendingTime += 1000 - val sleepTime = nextSendingTime - System.currentTimeMillis - if (sleepTime > 0) { - println ("Sleeping for " + sleepTime + " ms") - Thread.sleep(sleepTime) - } - } - } catch { - case e: Exception => - } - } - - def main(args: Array[String]) { - if (args.length < 3) { - printUsage - } - - val generateRandomly = false - - val streamReceiverIP = args(0) - val streamReceiverPort = args(1).toInt - val sentenceFile = args(2) - val sentencesPerSecond = if (args.length > 3) args(3).toInt else 10 - val sentenceInputName = if (args.length > 4) args(4) else "Sentences" - - println("Sending " + sentencesPerSecond + " sentences per second to " + - streamReceiverIP + ":" + streamReceiverPort + "/NetworkStreamReceiver-" + sentenceInputName) - val source = Source.fromFile(sentenceFile) - val lines = source.mkString.split ("\n") - source.close () - - val streamReceiver = select( - Node(streamReceiverIP, streamReceiverPort), - Symbol("NetworkStreamReceiver-" + sentenceInputName)) - if (generateRandomly) { - /*generateRandomSentences(lines, sentencesPerSecond, streamReceiver)*/ - } else { - generateSameSentences(lines, sentencesPerSecond, streamReceiver) - } - } -} - - - diff --git a/streaming/src/main/scala/spark/streaming/util/TestGenerator2.scala b/streaming/src/main/scala/spark/streaming/util/TestGenerator2.scala deleted file mode 100644 index ff840d084f..0000000000 --- a/streaming/src/main/scala/spark/streaming/util/TestGenerator2.scala +++ /dev/null @@ -1,119 +0,0 @@ -package spark.streaming.util - -import scala.util.Random -import scala.io.Source -import scala.actors._ -import scala.actors.Actor._ -import scala.actors.remote._ -import scala.actors.remote.RemoteActor._ - -import java.io.{DataOutputStream, ByteArrayOutputStream, DataInputStream} -import java.net.Socket - -object TestGenerator2 { - - def printUsage { - println ("Usage: SentenceGenerator <target IP> <target port> 
<sentence file> [<sentences per second>]") - System.exit(0) - } - - def sendSentences(streamReceiverHost: String, streamReceiverPort: Int, numSentences: Int, bytes: Array[Byte], intervalTime: Long){ - try { - println("Connecting to " + streamReceiverHost + ":" + streamReceiverPort) - val socket = new Socket(streamReceiverHost, streamReceiverPort) - - println("Sending " + numSentences+ " sentences / " + (bytes.length / 1024.0 / 1024.0) + " MB per " + intervalTime + " ms to " + streamReceiverHost + ":" + streamReceiverPort ) - val currentTime = System.currentTimeMillis - var targetTime = (currentTime / intervalTime + 1).toLong * intervalTime - Thread.sleep(targetTime - currentTime) - - while(true) { - val startTime = System.currentTimeMillis() - println("Sending at " + startTime + " ms with delay of " + (startTime - targetTime) + " ms") - val socketOutputStream = socket.getOutputStream - val parts = 10 - (0 until parts).foreach(i => { - val partStartTime = System.currentTimeMillis - - val offset = (i * bytes.length / parts).toInt - val len = math.min(((i + 1) * bytes.length / parts).toInt - offset, bytes.length) - socketOutputStream.write(bytes, offset, len) - socketOutputStream.flush() - val partFinishTime = System.currentTimeMillis - println("Sending part " + i + " of " + len + " bytes took " + (partFinishTime - partStartTime) + " ms") - val sleepTime = math.max(0, 1000 / parts - (partFinishTime - partStartTime) - 1) - Thread.sleep(sleepTime) - }) - - socketOutputStream.flush() - /*val socketInputStream = new DataInputStream(socket.getInputStream)*/ - /*val reply = socketInputStream.readUTF()*/ - val finishTime = System.currentTimeMillis() - println ("Sent " + bytes.length + " bytes in " + (finishTime - startTime) + " ms for interval [" + targetTime + ", " + (targetTime + intervalTime) + "]") - /*println("Received = " + reply)*/ - targetTime = targetTime + intervalTime - val sleepTime = (targetTime - finishTime) + 10 - if (sleepTime > 0) { - println("Sleeping for " + sleepTime + " ms") - Thread.sleep(sleepTime) - } else { - println("############################") - println("###### Skipping sleep ######") - println("############################") - } - } - } catch { - case e: Exception => println(e) - } - println("Stopped sending") - } - - def main(args: Array[String]) { - if (args.length < 4) { - printUsage - } - - val streamReceiverHost = args(0) - val streamReceiverPort = args(1).toInt - val sentenceFile = args(2) - val intervalTime = args(3).toLong - val sentencesPerInterval = if (args.length > 4) args(4).toInt else 0 - - println("Reading the file " + sentenceFile) - val source = Source.fromFile(sentenceFile) - val lines = source.mkString.split ("\n") - source.close() - - val numSentences = if (sentencesPerInterval <= 0) { - lines.length - } else { - sentencesPerInterval - } - - println("Generating sentences") - val sentences: Array[String] = if (numSentences <= lines.length) { - lines.take(numSentences).toArray - } else { - (0 until numSentences).map(i => lines(i % lines.length)).toArray - } - - println("Converting to byte array") - val byteStream = new ByteArrayOutputStream() - val stringDataStream = new DataOutputStream(byteStream) - /*stringDataStream.writeInt(sentences.size)*/ - sentences.foreach(stringDataStream.writeUTF) - val bytes = byteStream.toByteArray() - stringDataStream.close() - println("Generated array of " + bytes.length + " bytes") - - /*while(true) { */ - sendSentences(streamReceiverHost, streamReceiverPort, numSentences, bytes, intervalTime) - /*println("Sleeping 
for 5 seconds")*/ - /*Thread.sleep(5000)*/ - /*System.gc()*/ - /*}*/ - } -} - - - diff --git a/streaming/src/main/scala/spark/streaming/util/TestGenerator4.scala b/streaming/src/main/scala/spark/streaming/util/TestGenerator4.scala deleted file mode 100644 index 9c39ef3e12..0000000000 --- a/streaming/src/main/scala/spark/streaming/util/TestGenerator4.scala +++ /dev/null @@ -1,244 +0,0 @@ -package spark.streaming.util - -import spark.Logging - -import scala.util.Random -import scala.io.Source -import scala.collection.mutable.{ArrayBuffer, Queue} - -import java.net._ -import java.io._ -import java.nio._ -import java.nio.charset._ -import java.nio.channels._ - -import it.unimi.dsi.fastutil.io._ - -class TestGenerator4(targetHost: String, targetPort: Int, sentenceFile: String, intervalDuration: Long, sentencesPerInterval: Int) -extends Logging { - - class SendingConnectionHandler(host: String, port: Int, generator: TestGenerator4) - extends ConnectionHandler(host, port, true) { - - val buffers = new ArrayBuffer[ByteBuffer] - val newBuffers = new Queue[ByteBuffer] - var activeKey: SelectionKey = null - - def send(buffer: ByteBuffer) { - logDebug("Sending: " + buffer) - newBuffers.synchronized { - newBuffers.enqueue(buffer) - } - selector.wakeup() - buffer.synchronized { - buffer.wait() - } - } - - override def ready(key: SelectionKey) { - logDebug("Ready") - activeKey = key - val channel = key.channel.asInstanceOf[SocketChannel] - channel.register(selector, SelectionKey.OP_WRITE) - generator.startSending() - } - - override def preSelect() { - newBuffers.synchronized { - while(!newBuffers.isEmpty) { - val buffer = newBuffers.dequeue - buffers += buffer - logDebug("Added: " + buffer) - changeInterest(activeKey, SelectionKey.OP_WRITE) - } - } - } - - override def write(key: SelectionKey) { - try { - /*while(true) {*/ - val channel = key.channel.asInstanceOf[SocketChannel] - if (buffers.size > 0) { - val buffer = buffers(0) - val newBuffer = buffer.slice() - newBuffer.limit(math.min(newBuffer.remaining, 32768)) - val bytesWritten = channel.write(newBuffer) - buffer.position(buffer.position + bytesWritten) - if (bytesWritten == 0) return - if (buffer.remaining == 0) { - buffers -= buffer - buffer.synchronized { - buffer.notify() - } - } - /*changeInterest(key, SelectionKey.OP_WRITE)*/ - } else { - changeInterest(key, 0) - } - /*}*/ - } catch { - case e: IOException => { - if (e.toString.contains("pipe") || e.toString.contains("reset")) { - logError("Connection broken") - } else { - logError("Connection error", e) - } - close(key) - } - } - } - - override def close(key: SelectionKey) { - buffers.clear() - super.close(key) - } - } - - initLogging() - - val connectionHandler = new SendingConnectionHandler(targetHost, targetPort, this) - var sendingThread: Thread = null - var sendCount = 0 - val sendBatches = 5 - - def run() { - logInfo("Connection handler started") - connectionHandler.start() - connectionHandler.join() - if (sendingThread != null && !sendingThread.isInterrupted) { - sendingThread.interrupt - } - logInfo("Connection handler stopped") - } - - def startSending() { - sendingThread = new Thread() { - override def run() { - logInfo("STARTING TO SEND") - sendSentences() - logInfo("SENDING STOPPED AFTER " + sendCount) - connectionHandler.interrupt() - } - } - sendingThread.start() - } - - def stopSending() { - sendingThread.interrupt() - } - - def sendSentences() { - logInfo("Reading the file " + sentenceFile) - val source = Source.fromFile(sentenceFile) - val lines = source.mkString.split 
("\n") - source.close() - - val numSentences = if (sentencesPerInterval <= 0) { - lines.length - } else { - sentencesPerInterval - } - - logInfo("Generating sentence buffer") - val sentences: Array[String] = if (numSentences <= lines.length) { - lines.take(numSentences).toArray - } else { - (0 until numSentences).map(i => lines(i % lines.length)).toArray - } - - /* - val sentences: Array[String] = if (numSentences <= lines.length) { - lines.take((numSentences / sendBatches).toInt).toArray - } else { - (0 until (numSentences/sendBatches)).map(i => lines(i % lines.length)).toArray - }*/ - - - val serializer = new spark.KryoSerializer().newInstance() - val byteStream = new FastByteArrayOutputStream(100 * 1024 * 1024) - serializer.serializeStream(byteStream).writeAll(sentences.toIterator.asInstanceOf[Iterator[Any]]).close() - byteStream.trim() - val sentenceBuffer = ByteBuffer.wrap(byteStream.array) - - logInfo("Sending " + numSentences+ " sentences / " + sentenceBuffer.limit + " bytes per " + intervalDuration + " ms to " + targetHost + ":" + targetPort ) - val currentTime = System.currentTimeMillis - var targetTime = (currentTime / intervalDuration + 1).toLong * intervalDuration - Thread.sleep(targetTime - currentTime) - - val totalBytes = sentenceBuffer.limit - - while(true) { - val batchesInCurrentInterval = sendBatches // if (sendCount < 10) 1 else sendBatches - - val startTime = System.currentTimeMillis() - logDebug("Sending # " + sendCount + " at " + startTime + " ms with delay of " + (startTime - targetTime) + " ms") - - (0 until batchesInCurrentInterval).foreach(i => { - try { - val position = (i * totalBytes / sendBatches).toInt - val limit = if (i == sendBatches - 1) { - totalBytes - } else { - ((i + 1) * totalBytes / sendBatches).toInt - 1 - } - - val partStartTime = System.currentTimeMillis - sentenceBuffer.limit(limit) - connectionHandler.send(sentenceBuffer) - val partFinishTime = System.currentTimeMillis - val sleepTime = math.max(0, intervalDuration / sendBatches - (partFinishTime - partStartTime) - 1) - Thread.sleep(sleepTime) - - } catch { - case ie: InterruptedException => return - case e: Exception => e.printStackTrace() - } - }) - sentenceBuffer.rewind() - - val finishTime = System.currentTimeMillis() - /*logInfo ("Sent " + sentenceBuffer.limit + " bytes in " + (finishTime - startTime) + " ms")*/ - targetTime = targetTime + intervalDuration //+ (if (sendCount < 3) 1000 else 0) - - val sleepTime = (targetTime - finishTime) + 20 - if (sleepTime > 0) { - logInfo("Sleeping for " + sleepTime + " ms") - Thread.sleep(sleepTime) - } else { - logInfo("###### Skipping sleep ######") - } - if (Thread.currentThread.isInterrupted) { - return - } - sendCount += 1 - } - } -} - -object TestGenerator4 { - def printUsage { - println("Usage: TestGenerator4 <target IP> <target port> <sentence file> <interval duration> [<sentences per second>]") - System.exit(0) - } - - def main(args: Array[String]) { - println("GENERATOR STARTED") - if (args.length < 4) { - printUsage - } - - - val streamReceiverHost = args(0) - val streamReceiverPort = args(1).toInt - val sentenceFile = args(2) - val intervalDuration = args(3).toLong - val sentencesPerInterval = if (args.length > 4) args(4).toInt else 0 - - while(true) { - val generator = new TestGenerator4(streamReceiverHost, streamReceiverPort, sentenceFile, intervalDuration, sentencesPerInterval) - generator.run() - Thread.sleep(2000) - } - println("GENERATOR STOPPED") - } -} diff --git 
a/streaming/src/main/scala/spark/streaming/util/TestStreamCoordinator.scala b/streaming/src/main/scala/spark/streaming/util/TestStreamCoordinator.scala deleted file mode 100644 index f584f772bb..0000000000 --- a/streaming/src/main/scala/spark/streaming/util/TestStreamCoordinator.scala +++ /dev/null @@ -1,39 +0,0 @@ -package spark.streaming.util - -import spark.streaming._ -import spark.Logging - -import akka.actor._ -import akka.actor.Actor -import akka.actor.Actor._ - -sealed trait TestStreamCoordinatorMessage -case class GetStreamDetails extends TestStreamCoordinatorMessage -case class GotStreamDetails(name: String, duration: Long) extends TestStreamCoordinatorMessage -case class TestStarted extends TestStreamCoordinatorMessage - -class TestStreamCoordinator(streamDetails: Array[(String, Long)]) extends Actor with Logging { - - var index = 0 - - initLogging() - - logInfo("Created") - - def receive = { - case TestStarted => { - sender ! "OK" - } - - case GetStreamDetails => { - val streamDetail = if (index >= streamDetails.length) null else streamDetails(index) - sender ! GotStreamDetails(streamDetail._1, streamDetail._2) - index += 1 - if (streamDetail != null) { - logInfo("Allocated " + streamDetail._1 + " (" + index + "/" + streamDetails.length + ")" ) - } - } - } - -} - diff --git a/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala b/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala deleted file mode 100644 index 80ad924dd8..0000000000 --- a/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala +++ /dev/null @@ -1,421 +0,0 @@ -package spark.streaming.util - -import spark._ -import spark.storage._ -import spark.util.AkkaUtils -import spark.streaming._ - -import scala.math._ -import scala.collection.mutable.{Queue, HashMap, ArrayBuffer, SynchronizedMap} - -import akka.actor._ -import akka.actor.Actor -import akka.dispatch._ -import akka.pattern.ask -import akka.util.duration._ - -import java.io.DataInputStream -import java.io.BufferedInputStream -import java.net.Socket -import java.net.ServerSocket -import java.util.LinkedHashMap - -import org.apache.hadoop.fs._ -import org.apache.hadoop.conf._ -import org.apache.hadoop.io._ -import org.apache.hadoop.mapred._ -import org.apache.hadoop.util._ - -import spark.Utils - - -class TestStreamReceiver3(actorSystem: ActorSystem, blockManager: BlockManager) -extends Thread with Logging { - - - class DataHandler( - inputName: String, - longIntervalDuration: Time, - shortIntervalDuration: Time, - blockManager: BlockManager - ) - extends Logging { - - class Block(var id: String, var shortInterval: Interval) { - val data = ArrayBuffer[String]() - var pushed = false - def longInterval = getLongInterval(shortInterval) - def empty() = (data.size == 0) - def += (str: String) = (data += str) - override def toString() = "Block " + id - } - - class Bucket(val longInterval: Interval) { - val blocks = new ArrayBuffer[Block]() - var filled = false - def += (block: Block) = blocks += block - def empty() = (blocks.size == 0) - def ready() = (filled && !blocks.exists(! 
_.pushed)) - def blockIds() = blocks.map(_.id).toArray - override def toString() = "Bucket [" + longInterval + ", " + blocks.size + " blocks]" - } - - initLogging() - - val shortIntervalDurationMillis = shortIntervalDuration.toLong - val longIntervalDurationMillis = longIntervalDuration.toLong - - var currentBlock: Block = null - var currentBucket: Bucket = null - - val blocksForPushing = new Queue[Block]() - val buckets = new HashMap[Interval, Bucket]() with SynchronizedMap[Interval, Bucket] - - val blockUpdatingThread = new Thread() { override def run() { keepUpdatingCurrentBlock() } } - val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } } - - def start() { - blockUpdatingThread.start() - blockPushingThread.start() - } - - def += (data: String) = addData(data) - - def addData(data: String) { - if (currentBlock == null) { - updateCurrentBlock() - } - currentBlock.synchronized { - currentBlock += data - } - } - - def getShortInterval(time: Time): Interval = { - val intervalBegin = time.floor(shortIntervalDuration) - Interval(intervalBegin, intervalBegin + shortIntervalDuration) - } - - def getLongInterval(shortInterval: Interval): Interval = { - val intervalBegin = shortInterval.beginTime.floor(longIntervalDuration) - Interval(intervalBegin, intervalBegin + longIntervalDuration) - } - - def updateCurrentBlock() { - /*logInfo("Updating current block")*/ - val currentTime = Time(System.currentTimeMillis) - val shortInterval = getShortInterval(currentTime) - val longInterval = getLongInterval(shortInterval) - - def createBlock(reuseCurrentBlock: Boolean = false) { - val newBlockId = inputName + "-" + longInterval.toFormattedString + "-" + currentBucket.blocks.size - if (!reuseCurrentBlock) { - val newBlock = new Block(newBlockId, shortInterval) - /*logInfo("Created " + currentBlock)*/ - currentBlock = newBlock - } else { - currentBlock.shortInterval = shortInterval - currentBlock.id = newBlockId - } - } - - def createBucket() { - val newBucket = new Bucket(longInterval) - buckets += ((longInterval, newBucket)) - currentBucket = newBucket - /*logInfo("Created " + currentBucket + ", " + buckets.size + " buckets")*/ - } - - if (currentBlock == null || currentBucket == null) { - createBucket() - currentBucket.synchronized { - createBlock() - } - return - } - - currentBlock.synchronized { - var reuseCurrentBlock = false - - if (shortInterval != currentBlock.shortInterval) { - if (!currentBlock.empty) { - blocksForPushing.synchronized { - blocksForPushing += currentBlock - blocksForPushing.notifyAll() - } - } - - currentBucket.synchronized { - if (currentBlock.empty) { - reuseCurrentBlock = true - } else { - currentBucket += currentBlock - } - - if (longInterval != currentBucket.longInterval) { - currentBucket.filled = true - if (currentBucket.ready) { - currentBucket.notifyAll() - } - createBucket() - } - } - - createBlock(reuseCurrentBlock) - } - } - } - - def pushBlock(block: Block) { - try{ - if (blockManager != null) { - logInfo("Pushing block") - val startTime = System.currentTimeMillis - - val bytes = blockManager.dataSerialize("rdd_", block.data.toIterator) // TODO: Will this be an RDD block? 
- val finishTime = System.currentTimeMillis - logInfo(block + " serialization delay is " + (finishTime - startTime) / 1000.0 + " s") - - blockManager.putBytes(block.id.toString, bytes, StorageLevel.MEMORY_AND_DISK_SER_2) - /*blockManager.putBytes(block.id.toString, bytes, StorageLevel.DISK_AND_MEMORY_DESER_2)*/ - /*blockManager.put(block.id.toString, block.data.toIterator, StorageLevel.DISK_AND_MEMORY_DESER)*/ - /*blockManager.put(block.id.toString, block.data.toIterator, StorageLevel.DISK_AND_MEMORY)*/ - val finishTime1 = System.currentTimeMillis - logInfo(block + " put delay is " + (finishTime1 - startTime) / 1000.0 + " s") - } else { - logWarning(block + " not put as block manager is null") - } - } catch { - case e: Exception => logError("Exception writing " + block + " to blockmanager" , e) - } - } - - def getBucket(longInterval: Interval): Option[Bucket] = { - buckets.get(longInterval) - } - - def clearBucket(longInterval: Interval) { - buckets.remove(longInterval) - } - - def keepUpdatingCurrentBlock() { - logInfo("Thread to update current block started") - while(true) { - updateCurrentBlock() - val currentTimeMillis = System.currentTimeMillis - val sleepTimeMillis = (currentTimeMillis / shortIntervalDurationMillis + 1) * - shortIntervalDurationMillis - currentTimeMillis + 1 - Thread.sleep(sleepTimeMillis) - } - } - - def keepPushingBlocks() { - var loop = true - logInfo("Thread to push blocks started") - while(loop) { - val block = blocksForPushing.synchronized { - if (blocksForPushing.size == 0) { - blocksForPushing.wait() - } - blocksForPushing.dequeue - } - pushBlock(block) - block.pushed = true - block.data.clear() - - val bucket = buckets(block.longInterval) - bucket.synchronized { - if (bucket.ready) { - bucket.notifyAll() - } - } - } - } - } - - - class ConnectionListener(port: Int, dataHandler: DataHandler) - extends Thread with Logging { - initLogging() - override def run { - try { - val listener = new ServerSocket(port) - logInfo("Listening on port " + port) - while (true) { - new ConnectionHandler(listener.accept(), dataHandler).start(); - } - listener.close() - } catch { - case e: Exception => logError("", e); - } - } - } - - class ConnectionHandler(socket: Socket, dataHandler: DataHandler) extends Thread with Logging { - initLogging() - override def run { - logInfo("New connection from " + socket.getInetAddress() + ":" + socket.getPort) - val bytes = new Array[Byte](100 * 1024 * 1024) - try { - - val inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream, 1024 * 1024)) - /*val inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream))*/ - var str: String = null - str = inputStream.readUTF - while(str != null) { - dataHandler += str - str = inputStream.readUTF() - } - - /* - var loop = true - while(loop) { - val numRead = inputStream.read(bytes) - if (numRead < 0) { - loop = false - } - inbox += ((LongTime(SystemTime.currentTimeMillis), "test")) - }*/ - - inputStream.close() - } catch { - case e => logError("Error receiving data", e) - } - socket.close() - } - } - - initLogging() - - val masterHost = System.getProperty("spark.master.host") - val masterPort = System.getProperty("spark.master.port").toInt - - val akkaPath = "akka://spark@%s:%s/user/".format(masterHost, masterPort) - val sparkstreamScheduler = actorSystem.actorFor(akkaPath + "/SparkStreamScheduler") - val testStreamCoordinator = actorSystem.actorFor(akkaPath + "/TestStreamCoordinator") - - logInfo("Getting stream details from master " + masterHost + ":" + 
masterPort) - - val timeout = 50 millis - - var started = false - while (!started) { - askActor[String](testStreamCoordinator, TestStarted) match { - case Some(str) => { - started = true - logInfo("TestStreamCoordinator started") - } - case None => { - logInfo("TestStreamCoordinator not started yet") - Thread.sleep(200) - } - } - } - - val streamDetails = askActor[GotStreamDetails](testStreamCoordinator, GetStreamDetails) match { - case Some(details) => details - case None => throw new Exception("Could not get stream details") - } - logInfo("Stream details received: " + streamDetails) - - val inputName = streamDetails.name - val intervalDurationMillis = streamDetails.duration - val intervalDuration = Time(intervalDurationMillis) - - val dataHandler = new DataHandler( - inputName, - intervalDuration, - Time(TestStreamReceiver3.SHORT_INTERVAL_MILLIS), - blockManager) - - val connListener = new ConnectionListener(TestStreamReceiver3.PORT, dataHandler) - - // Send a message to an actor and return an option with its reply, or None if this times out - def askActor[T](actor: ActorRef, message: Any): Option[T] = { - try { - val future = actor.ask(message)(timeout) - return Some(Await.result(future, timeout).asInstanceOf[T]) - } catch { - case e: Exception => - logInfo("Error communicating with " + actor, e) - return None - } - } - - override def run() { - connListener.start() - dataHandler.start() - - var interval = Interval.currentInterval(intervalDuration) - var dataStarted = false - - while(true) { - waitFor(interval.endTime) - logInfo("Woken up at " + System.currentTimeMillis + " for " + interval) - dataHandler.getBucket(interval) match { - case Some(bucket) => { - logInfo("Found " + bucket + " for " + interval) - bucket.synchronized { - if (!bucket.ready) { - logInfo("Waiting for " + bucket) - bucket.wait() - logInfo("Wait over for " + bucket) - } - if (dataStarted || !bucket.empty) { - logInfo("Notifying " + bucket) - notifyScheduler(interval, bucket.blockIds) - dataStarted = true - } - bucket.blocks.clear() - dataHandler.clearBucket(interval) - } - } - case None => { - logInfo("Found none for " + interval) - if (dataStarted) { - logInfo("Notifying none") - notifyScheduler(interval, Array[String]()) - } - } - } - interval = interval.next - } - } - - def waitFor(time: Time) { - val currentTimeMillis = System.currentTimeMillis - val targetTimeMillis = time.milliseconds - if (currentTimeMillis < targetTimeMillis) { - val sleepTime = (targetTimeMillis - currentTimeMillis) - Thread.sleep(sleepTime + 1) - } - } - - def notifyScheduler(interval: Interval, blockIds: Array[String]) { - try { - sparkstreamScheduler ! 
InputGenerated(inputName, interval, blockIds.toArray) - val time = interval.endTime - val delay = (System.currentTimeMillis - time.milliseconds) / 1000.0 - logInfo("Pushing delay for " + time + " is " + delay + " s") - } catch { - case _ => logError("Exception notifying scheduler at interval " + interval) - } - } -} - -object TestStreamReceiver3 { - - val PORT = 9999 - val SHORT_INTERVAL_MILLIS = 100 - - def main(args: Array[String]) { - System.setProperty("spark.master.host", Utils.localHostName) - System.setProperty("spark.master.port", "7078") - val details = Array(("Sentences", 2000L)) - val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localHostName, 7078) - actorSystem.actorOf(Props(new TestStreamCoordinator(details)), name = "TestStreamCoordinator") - new TestStreamReceiver3(actorSystem, null).start() - } -} - - - diff --git a/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver4.scala b/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver4.scala deleted file mode 100644 index 31754870dd..0000000000 --- a/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver4.scala +++ /dev/null @@ -1,374 +0,0 @@ -package spark.streaming.util - -import spark.streaming._ -import spark._ -import spark.storage._ -import spark.util.AkkaUtils - -import scala.math._ -import scala.collection.mutable.{Queue, HashMap, ArrayBuffer, SynchronizedMap} - -import java.io._ -import java.nio._ -import java.nio.charset._ -import java.nio.channels._ -import java.util.concurrent.Executors - -import akka.actor._ -import akka.actor.Actor -import akka.dispatch._ -import akka.pattern.ask -import akka.util.duration._ - -class TestStreamReceiver4(actorSystem: ActorSystem, blockManager: BlockManager) -extends Thread with Logging { - - class DataHandler( - inputName: String, - longIntervalDuration: Time, - shortIntervalDuration: Time, - blockManager: BlockManager - ) - extends Logging { - - class Block(val id: String, val shortInterval: Interval, val buffer: ByteBuffer) { - var pushed = false - def longInterval = getLongInterval(shortInterval) - override def toString() = "Block " + id - } - - class Bucket(val longInterval: Interval) { - val blocks = new ArrayBuffer[Block]() - var filled = false - def += (block: Block) = blocks += block - def empty() = (blocks.size == 0) - def ready() = (filled && !blocks.exists(! 
_.pushed)) - def blockIds() = blocks.map(_.id).toArray - override def toString() = "Bucket [" + longInterval + ", " + blocks.size + " blocks]" - } - - initLogging() - - val syncOnLastShortInterval = true - - val shortIntervalDurationMillis = shortIntervalDuration.milliseconds - val longIntervalDurationMillis = longIntervalDuration.milliseconds - - val buffer = ByteBuffer.allocateDirect(100 * 1024 * 1024) - var currentShortInterval = Interval.currentInterval(shortIntervalDuration) - - val blocksForPushing = new Queue[Block]() - val buckets = new HashMap[Interval, Bucket]() with SynchronizedMap[Interval, Bucket] - - val bufferProcessingThread = new Thread() { override def run() { keepProcessingBuffers() } } - val blockPushingExecutor = Executors.newFixedThreadPool(5) - - - def start() { - buffer.clear() - if (buffer.remaining == 0) { - throw new Exception("Buffer initialization error") - } - bufferProcessingThread.start() - } - - def readDataToBuffer(func: ByteBuffer => Int): Int = { - buffer.synchronized { - if (buffer.remaining == 0) { - logInfo("Received first data for interval " + currentShortInterval) - } - func(buffer) - } - } - - def getLongInterval(shortInterval: Interval): Interval = { - val intervalBegin = shortInterval.beginTime.floor(longIntervalDuration) - Interval(intervalBegin, intervalBegin + longIntervalDuration) - } - - def processBuffer() { - - def readInt(buffer: ByteBuffer): Int = { - var offset = 0 - var result = 0 - while (offset < 32) { - val b = buffer.get() - result |= ((b & 0x7F) << offset) - if ((b & 0x80) == 0) { - return result - } - offset += 7 - } - throw new Exception("Malformed zigzag-encoded integer") - } - - val currentLongInterval = getLongInterval(currentShortInterval) - val startTime = System.currentTimeMillis - val newBuffer: ByteBuffer = buffer.synchronized { - buffer.flip() - if (buffer.remaining == 0) { - buffer.clear() - null - } else { - logDebug("Processing interval " + currentShortInterval + " with delay of " + (System.currentTimeMillis - startTime) + " ms") - val startTime1 = System.currentTimeMillis - var loop = true - var count = 0 - while(loop) { - buffer.mark() - try { - val len = readInt(buffer) - buffer.position(buffer.position + len) - count += 1 - } catch { - case e: Exception => { - buffer.reset() - loop = false - } - } - } - val bytesToCopy = buffer.position - val newBuf = ByteBuffer.allocate(bytesToCopy) - buffer.position(0) - newBuf.put(buffer.slice().limit(bytesToCopy).asInstanceOf[ByteBuffer]) - newBuf.flip() - buffer.position(bytesToCopy) - buffer.compact() - newBuf - } - } - - if (newBuffer != null) { - val bucket = buckets.getOrElseUpdate(currentLongInterval, new Bucket(currentLongInterval)) - bucket.synchronized { - val newBlockId = inputName + "-" + currentLongInterval.toFormattedString + "-" + currentShortInterval.toFormattedString - val newBlock = new Block(newBlockId, currentShortInterval, newBuffer) - if (syncOnLastShortInterval) { - bucket += newBlock - } - logDebug("Created " + newBlock + " with " + newBuffer.remaining + " bytes, creation delay is " + (System.currentTimeMillis - currentShortInterval.endTime.milliseconds) / 1000.0 + " s" ) - blockPushingExecutor.execute(new Runnable() { def run() { pushAndNotifyBlock(newBlock) } }) - } - } - - val newShortInterval = Interval.currentInterval(shortIntervalDuration) - val newLongInterval = getLongInterval(newShortInterval) - - if (newLongInterval != currentLongInterval) { - buckets.get(currentLongInterval) match { - case Some(bucket) => { - bucket.synchronized { - 
bucket.filled = true - if (bucket.ready) { - bucket.notifyAll() - } - } - } - case None => - } - buckets += ((newLongInterval, new Bucket(newLongInterval))) - } - - currentShortInterval = newShortInterval - } - - def pushBlock(block: Block) { - try{ - if (blockManager != null) { - val startTime = System.currentTimeMillis - logInfo(block + " put start delay is " + (startTime - block.shortInterval.endTime.milliseconds) + " ms") - /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY)*/ - /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY_2)*/ - blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.MEMORY_ONLY_2) - /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.MEMORY_ONLY)*/ - /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY_DESER)*/ - /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY_DESER_2)*/ - val finishTime = System.currentTimeMillis - logInfo(block + " put delay is " + (finishTime - startTime) + " ms") - } else { - logWarning(block + " not put as block manager is null") - } - } catch { - case e: Exception => logError("Exception writing " + block + " to blockmanager" , e) - } - } - - def getBucket(longInterval: Interval): Option[Bucket] = { - buckets.get(longInterval) - } - - def clearBucket(longInterval: Interval) { - buckets.remove(longInterval) - } - - def keepProcessingBuffers() { - logInfo("Thread to process buffers started") - while(true) { - processBuffer() - val currentTimeMillis = System.currentTimeMillis - val sleepTimeMillis = (currentTimeMillis / shortIntervalDurationMillis + 1) * - shortIntervalDurationMillis - currentTimeMillis + 1 - Thread.sleep(sleepTimeMillis) - } - } - - def pushAndNotifyBlock(block: Block) { - pushBlock(block) - block.pushed = true - val bucket = if (syncOnLastShortInterval) { - buckets(block.longInterval) - } else { - var longInterval = block.longInterval - while(!buckets.contains(longInterval)) { - logWarning("Skipping bucket of " + longInterval + " for " + block) - longInterval = longInterval.next - } - val chosenBucket = buckets(longInterval) - logDebug("Choosing bucket of " + longInterval + " for " + block) - chosenBucket += block - chosenBucket - } - - bucket.synchronized { - if (bucket.ready) { - bucket.notifyAll() - } - } - - } - } - - - class ReceivingConnectionHandler(host: String, port: Int, dataHandler: DataHandler) - extends ConnectionHandler(host, port, false) { - - override def ready(key: SelectionKey) { - changeInterest(key, SelectionKey.OP_READ) - } - - override def read(key: SelectionKey) { - try { - val channel = key.channel.asInstanceOf[SocketChannel] - val bytesRead = dataHandler.readDataToBuffer(channel.read) - if (bytesRead < 0) { - close(key) - } - } catch { - case e: IOException => { - logError("Error reading", e) - close(key) - } - } - } - } - - initLogging() - - val masterHost = System.getProperty("spark.master.host", "localhost") - val masterPort = System.getProperty("spark.master.port", "7078").toInt - - val akkaPath = "akka://spark@%s:%s/user/".format(masterHost, masterPort) - val sparkstreamScheduler = actorSystem.actorFor(akkaPath + "/SparkStreamScheduler") - val testStreamCoordinator = actorSystem.actorFor(akkaPath + "/TestStreamCoordinator") - - logInfo("Getting stream details from master " + masterHost + ":" + masterPort) - - val streamDetails = askActor[GotStreamDetails](testStreamCoordinator, GetStreamDetails) match { - case Some(details) => 
details - case None => throw new Exception("Could not get stream details") - } - logInfo("Stream details received: " + streamDetails) - - val inputName = streamDetails.name - val intervalDurationMillis = streamDetails.duration - val intervalDuration = Milliseconds(intervalDurationMillis) - val shortIntervalDuration = Milliseconds(System.getProperty("spark.stream.shortinterval", "500").toInt) - - val dataHandler = new DataHandler(inputName, intervalDuration, shortIntervalDuration, blockManager) - val connectionHandler = new ReceivingConnectionHandler("localhost", 9999, dataHandler) - - val timeout = 100 millis - - // Send a message to an actor and return an option with its reply, or None if this times out - def askActor[T](actor: ActorRef, message: Any): Option[T] = { - try { - val future = actor.ask(message)(timeout) - return Some(Await.result(future, timeout).asInstanceOf[T]) - } catch { - case e: Exception => - logInfo("Error communicating with " + actor, e) - return None - } - } - - override def run() { - connectionHandler.start() - dataHandler.start() - - var interval = Interval.currentInterval(intervalDuration) - var dataStarted = false - - - while(true) { - waitFor(interval.endTime) - /*logInfo("Woken up at " + System.currentTimeMillis + " for " + interval)*/ - dataHandler.getBucket(interval) match { - case Some(bucket) => { - logDebug("Found " + bucket + " for " + interval) - bucket.synchronized { - if (!bucket.ready) { - logDebug("Waiting for " + bucket) - bucket.wait() - logDebug("Wait over for " + bucket) - } - if (dataStarted || !bucket.empty) { - logDebug("Notifying " + bucket) - notifyScheduler(interval, bucket.blockIds) - dataStarted = true - } - bucket.blocks.clear() - dataHandler.clearBucket(interval) - } - } - case None => { - logDebug("Found none for " + interval) - if (dataStarted) { - logDebug("Notifying none") - notifyScheduler(interval, Array[String]()) - } - } - } - interval = interval.next - } - } - - def waitFor(time: Time) { - val currentTimeMillis = System.currentTimeMillis - val targetTimeMillis = time.milliseconds - if (currentTimeMillis < targetTimeMillis) { - val sleepTime = (targetTimeMillis - currentTimeMillis) - Thread.sleep(sleepTime + 1) - } - } - - def notifyScheduler(interval: Interval, blockIds: Array[String]) { - try { - sparkstreamScheduler ! 
InputGenerated(inputName, interval, blockIds.toArray) - val time = interval.endTime - val delay = (System.currentTimeMillis - time.milliseconds) - logInfo("Notification delay for " + time + " is " + delay + " ms") - } catch { - case e: Exception => logError("Exception notifying scheduler at interval " + interval + ": " + e) - } - } -} - - -object TestStreamReceiver4 { - def main(args: Array[String]) { - val details = Array(("Sentences", 2000L)) - val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localHostName, 7078) - actorSystem.actorOf(Props(new TestStreamCoordinator(details)), name = "TestStreamCoordinator") - new TestStreamReceiver4(actorSystem, null).start() - } -} diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala index 0450120061..0bcf207082 100644 --- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala @@ -15,12 +15,16 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { } after { + + if (ssc != null) ssc.stop() FileUtils.deleteDirectory(new File(checkpointDir)) } + var ssc: StreamingContext = null + override def framework = "CheckpointSuite" - override def batchDuration = Milliseconds(500) + override def batchDuration = Milliseconds(200) override def checkpointDir = "checkpoint" @@ -30,12 +34,12 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { test("basic stream+rdd recovery") { - assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second") + assert(batchDuration === Milliseconds(200), "batchDuration for this test must be 200 milliseconds") assert(checkpointInterval === batchDuration, "checkpointInterval for this test must be the same as batchDuration") System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") - val stateStreamCheckpointInterval = Seconds(2) + val stateStreamCheckpointInterval = Seconds(1) // this ensures checkpointing occurs at least once val firstNumBatches = (stateStreamCheckpointInterval.millis / batchDuration.millis) * 2 @@ -110,6 +114,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { runStreamsWithRealDelay(ssc, 4) ssc.stop() System.clearProperty("spark.streaming.manualClock.jump") + ssc = null } test("map and reduceByKey") { @@ -131,9 +136,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { .reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration) .checkpoint(Seconds(2)) } - for (i <- Seq(2, 3, 4)) { - testCheckpointedOperation(input, operation, output, i) - } + testCheckpointedOperation(input, operation, output, 3) } test("updateStateByKey") { @@ -148,9 +151,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { .checkpoint(Seconds(2)) .map(t => (t._1, t._2.self)) } - for (i <- Seq(2, 3, 4)) { - testCheckpointedOperation(input, operation, output, i) - } + testCheckpointedOperation(input, operation, output, 3) } @@ -171,7 +172,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { // Do half the computation (half the number of batches), create checkpoint file and quit - val ssc = setupStreams[U, V](input, operation) + ssc = setupStreams[U, V](input, operation) val output = runStreams[V](ssc, initialNumBatches, initialNumExpectedOutputs) verifyOutput[V](output, expectedOutput.take(initialNumBatches), true) Thread.sleep(1000) @@ -182,9 +183,10 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter { "
Restarting stream computation " + "\n-------------------------------------------\n" ) - val sscNew = new StreamingContext(checkpointDir) - val outputNew = runStreams[V](sscNew, nextNumBatches, nextNumExpectedOutputs) + ssc = new StreamingContext(checkpointDir) + val outputNew = runStreams[V](ssc, nextNumBatches, nextNumExpectedOutputs) verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true) + ssc = null } /** diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala index 8f892baab1..0957748603 100644 --- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala +++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala @@ -9,12 +9,19 @@ import spark.storage.StorageLevel import spark.Logging import scala.util.Random import org.apache.commons.io.FileUtils +import org.scalatest.BeforeAndAfter -class InputStreamsSuite extends TestSuiteBase { +class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter { System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock") + override def checkpointDir = "checkpoint" + + after { + FileUtils.deleteDirectory(new File(checkpointDir)) + } + test("network input stream") { // Start the server val serverPort = 9999 @@ -30,7 +37,7 @@ class InputStreamsSuite extends TestSuiteBase { ssc.registerOutputStream(outputStream) ssc.start() - // Feed data to the server to send to the Spark Streaming network receiver + // Feed data to the server to send to the network receiver val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] val input = Seq(1, 2, 3, 4, 5) val expectedOutput = input.map(_.toString) @@ -52,7 +59,7 @@ class InputStreamsSuite extends TestSuiteBase { logInfo("Stopping context") ssc.stop() - // Verify whether data received by Spark Streaming was as expected + // Verify whether data received was as expected logInfo("--------------------------------") logInfo("output.size = " + outputBuffer.size) logInfo("output") @@ -69,6 +76,49 @@ class InputStreamsSuite extends TestSuiteBase { } } + test("network input stream with checkpoint") { + // Start the server + val serverPort = 9999 + val server = new TestServer(serverPort) + server.start() + + // Set up the streaming context and input streams + var ssc = new StreamingContext(master, framework) + ssc.setBatchDuration(batchDuration) + ssc.checkpoint(checkpointDir, checkpointInterval) + val networkStream = ssc.networkTextStream("localhost", serverPort, StorageLevel.MEMORY_AND_DISK) + var outputStream = new TestOutputStream(networkStream, new ArrayBuffer[Seq[String]]) + ssc.registerOutputStream(outputStream) + ssc.start() + + // Feed data to the server to send to the network receiver + var clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + for (i <- Seq(1, 2, 3)) { + server.send(i.toString + "\n") + Thread.sleep(100) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(500) + assert(outputStream.output.size > 0) + ssc.stop() + + // Restart stream computation from checkpoint and feed more data to see whether + // they are being received and processed + logInfo("*********** RESTARTING ************") + ssc = new StreamingContext(checkpointDir) + ssc.start() + clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + for (i <- Seq(4, 5, 6)) { + server.send(i.toString + "\n") + Thread.sleep(100) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(500) + outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]]
+ assert(outputStream.output.size > 0) + ssc.stop() + } + test("file input stream") { // Create a temporary directory val dir = { var temp = File.createTempFile(".temp.", Random.nextInt().toString) temp.delete() temp.mkdirs() temp.deleteOnExit() - println("Created temp dir " + temp) + logInfo("Created temp dir " + temp) temp } @@ -84,7 +134,9 @@ val ssc = new StreamingContext(master, framework) ssc.setBatchDuration(batchDuration) val filestream = ssc.textFileStream(dir.toString) - val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] + val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]] + def output = outputBuffer.flatMap(x => x) + val outputStream = new TestOutputStream(filestream, outputBuffer) ssc.registerOutputStream(outputStream) ssc.start() @@ -96,36 +148,88 @@ Thread.sleep(1000) for (i <- 0 until input.size) { FileUtils.writeStringToFile(new File(dir, i.toString), input(i).toString + "\n") - Thread.sleep(500) + Thread.sleep(100) clock.addToTime(batchDuration.milliseconds) - Thread.sleep(500) + Thread.sleep(100) } val startTime = System.currentTimeMillis() - while (outputBuffer.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { - println("output.size = " + outputBuffer.size + ", expectedOutput.size = " + expectedOutput.size) + while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) { + //println("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size) Thread.sleep(100) } Thread.sleep(1000) val timeTaken = System.currentTimeMillis() - startTime assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms") - println("Stopping context") + logInfo("Stopping context") ssc.stop() // Verify whether data received by Spark Streaming was as expected logInfo("--------------------------------") - logInfo("output.size = " + outputBuffer.size) + logInfo("output.size = " + output.size) logInfo("output") - outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]")) + output.foreach(x => logInfo("[" + x.mkString(",") + "]")) logInfo("expected output.size = " + expectedOutput.size) logInfo("expected output") expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]")) logInfo("--------------------------------") - assert(outputBuffer.size === expectedOutput.size) - for (i <- 0 until outputBuffer.size) { - assert(outputBuffer(i).size === 1) - assert(outputBuffer(i).head === expectedOutput(i)) + assert(output.size === expectedOutput.size) + for (i <- 0 until output.size) { + assert(output(i).size === 1) + assert(output(i).head.toString === expectedOutput(i)) + } + } + + test("file input stream with checkpoint") { + // Create a temporary directory + val dir = { + var temp = File.createTempFile(".temp.", Random.nextInt().toString) + temp.delete() + temp.mkdirs() + temp.deleteOnExit() + logInfo("Created temp dir " + temp) + temp } + + // Set up the streaming context and input streams + var ssc = new StreamingContext(master, framework) + ssc.setBatchDuration(batchDuration) + ssc.checkpoint(checkpointDir, checkpointInterval) + val filestream = ssc.textFileStream(dir.toString) + var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]]) + ssc.registerOutputStream(outputStream) + ssc.start() + + // Create files and advance manual clock to process them + var clock =
ssc.scheduler.clock.asInstanceOf[ManualClock] + Thread.sleep(1000) + for (i <- Seq(1, 2, 3)) { + FileUtils.writeStringToFile(new File(dir, i.toString), i.toString + "\n") + Thread.sleep(100) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(500) + logInfo("Output = " + outputStream.output.mkString(",")) + assert(outputStream.output.size > 0) + ssc.stop() + + // Restart stream computation from checkpoint and create more files to see whether + // they are being processed + logInfo("*********** RESTARTING ************") + ssc = new StreamingContext(checkpointDir) + ssc.start() + clock = ssc.scheduler.clock.asInstanceOf[ManualClock] + Thread.sleep(500) + for (i <- Seq(4, 5, 6)) { + FileUtils.writeStringToFile(new File(dir, i.toString), i.toString + "\n") + Thread.sleep(100) + clock.addToTime(batchDuration.milliseconds) + } + Thread.sleep(500) + outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]] + logInfo("Output = " + outputStream.output.mkString(",")) + assert(outputStream.output.size > 0) + ssc.stop() } }
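Both new "with checkpoint" tests added above follow the same restart pattern: run a few batches with checkpointing enabled, stop the context, rebuild it from the checkpoint directory alone, and assert that the recovered graph keeps producing output. The condensed sketch below is illustrative only, not code from this commit; it assumes the test helpers visible in the diff (TestOutputStream, ManualClock), and the object name and placeholder values (master URL, batch duration, directory paths) are invented stand-ins for what TestSuiteBase normally supplies.

import java.io.File
import scala.collection.mutable.ArrayBuffer
import org.apache.commons.io.FileUtils
import spark.streaming._
import spark.streaming.util.ManualClock

object CheckpointRestartSketch {
  def main(args: Array[String]) {
    // Use the manual clock so batches run only when we advance time.
    System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
    val master = "local[2]"                // placeholder: any local master
    val framework = "CheckpointRestartSketch"
    val batchDuration = Milliseconds(200)  // placeholder batch size
    val checkpointDir = "checkpoint"
    val dir = new File("input")            // placeholder input directory
    dir.mkdirs()

    // Phase 1: run with checkpointing enabled, process one file, stop.
    var ssc = new StreamingContext(master, framework)
    ssc.setBatchDuration(batchDuration)
    ssc.checkpoint(checkpointDir, batchDuration)
    val filestream = ssc.textFileStream(dir.toString)
    var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]])
    ssc.registerOutputStream(outputStream)
    ssc.start()

    var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    FileUtils.writeStringToFile(new File(dir, "1"), "1\n")
    Thread.sleep(100)
    clock.addToTime(batchDuration.milliseconds)  // trigger one batch
    Thread.sleep(500)
    ssc.stop()

    // Phase 2: the one-argument constructor rebuilds the whole DStream graph
    // from the checkpoint files, so the output stream is looked up from the
    // recovered graph instead of being registered again.
    ssc = new StreamingContext(checkpointDir)
    ssc.start()
    clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
    FileUtils.writeStringToFile(new File(dir, "2"), "2\n")
    Thread.sleep(100)
    clock.addToTime(batchDuration.milliseconds)
    Thread.sleep(500)
    outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]]
    assert(outputStream.output.size > 0)  // the recovered graph produced output
    ssc.stop()
  }
}

The detail that makes the restart meaningful is the final lookup: the assertion can pass only if the TestOutputStream recovered via ssc.graph.getOutputStreams(), not the original object, collected output after the restart, which is what demonstrates that the graph itself survived the checkpoint.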