path: root/streaming
author     Denny <dennybritz@gmail.com>    2012-11-12 19:39:29 -0800
committer  Denny <dennybritz@gmail.com>    2012-11-12 19:39:29 -0800
commit     255b3e44c18e64a55afb184f39746780b391a496 (patch)
tree       479fedcfdbf7e68bc6dc73188421a49fbc356a82 /streaming
parent     0fd4c93f1c349f052f633fea64f975d53976bd9c (diff)
parent     564dd8c3f415746a68f05bde6ea2a0e7a7760b4c (diff)
Merge branch 'dev' into kafka
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/spark/streaming/DStream.scala                    |   2
-rw-r--r--  streaming/src/main/scala/spark/streaming/FileInputDStream.scala           |  34
-rw-r--r--  streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala        |   1
-rw-r--r--  streaming/src/main/scala/spark/streaming/RawInputDStream.scala            |   2
-rw-r--r--  streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala     |  15
-rw-r--r--  streaming/src/main/scala/spark/streaming/StateDStream.scala               |   2
-rw-r--r--  streaming/src/main/scala/spark/streaming/StreamingContext.scala           |   4
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/Grep2.scala             |  64
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala           |  11
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala  | 102
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCount2.scala        | 114
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala      |  57
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/WordMax2.scala          |  75
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/ConnectionHandler.scala     | 157
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala         |  98
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala    |  67
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/SentenceFileGenerator.scala |  92
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/ShuffleTest.scala           |  23
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/TestGenerator.scala         | 107
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/TestGenerator2.scala        | 119
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/TestGenerator4.scala        | 244
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/TestStreamCoordinator.scala |  39
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala   | 421
-rw-r--r--  streaming/src/main/scala/spark/streaming/util/TestStreamReceiver4.scala   | 374
-rw-r--r--  streaming/src/test/scala/spark/streaming/CheckpointSuite.scala            |  26
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala          | 136
26 files changed, 331 insertions, 2055 deletions
diff --git a/streaming/src/main/scala/spark/streaming/DStream.scala b/streaming/src/main/scala/spark/streaming/DStream.scala
index b8324d11a3..e8bbf7d1c0 100644
--- a/streaming/src/main/scala/spark/streaming/DStream.scala
+++ b/streaming/src/main/scala/spark/streaming/DStream.scala
@@ -82,7 +82,7 @@ extends Serializable with Logging {
this
}
- def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY)
+ def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)
// Turn on the default caching level for this RDD
def cache(): DStream[T] = persist()
diff --git a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
index 9d7361097b..88856364d2 100644
--- a/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/FileInputDStream.scala
@@ -6,7 +6,8 @@ import spark.rdd.UnionRDD
import org.apache.hadoop.fs.{FileSystem, Path, PathFilter}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
-import java.io.{ObjectInputStream, IOException}
+
+import scala.collection.mutable.HashSet
class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest](
@@ -19,7 +20,8 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
@transient private var path_ : Path = null
@transient private var fs_ : FileSystem = null
- var lastModTime: Long = 0
+ var lastModTime = 0L
+ val lastModTimeFiles = new HashSet[String]()
def path(): Path = {
if (path_ == null) path_ = new Path(directory)
@@ -40,22 +42,37 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
}
override def stop() { }
-
+
+ /**
+ * Finds the files that were modified since the last time this method was called and makes
+ * a union RDD out of them. Note that this maintains the list of files that were processed
+ * at the latest modification time seen in the previous call to this method. This is because
+ * the modification time returned by the FileStatus API seems to return times only at the
+ * granularity of seconds. Hence, new files may have the same modification time as the
+ * latest modification time seen in the previous call, and the maintained list of files is
+ * used to filter out the ones that have already been processed.
+ */
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
+ // Create the filter for selecting new files
val newFilter = new PathFilter() {
var latestModTime = 0L
-
+ val latestModTimeFiles = new HashSet[String]()
+
def accept(path: Path): Boolean = {
if (!filter.accept(path)) {
return false
} else {
val modTime = fs.getFileStatus(path).getModificationTime()
- if (modTime <= lastModTime) {
+ if (modTime < lastModTime){
+ return false
+ } else if (modTime == lastModTime && lastModTimeFiles.contains(path.toString)) {
return false
}
if (modTime > latestModTime) {
latestModTime = modTime
+ latestModTimeFiles.clear()
}
+ latestModTimeFiles += path.toString
return true
}
}
@@ -64,7 +81,12 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
val newFiles = fs.listStatus(path, newFilter)
logInfo("New files: " + newFiles.map(_.getPath).mkString(", "))
if (newFiles.length > 0) {
- lastModTime = newFilter.latestModTime
+ // Update the modification time and the files processed for that modification time
+ if (lastModTime != newFilter.latestModTime) {
+ lastModTime = newFilter.latestModTime
+ lastModTimeFiles.clear()
+ }
+ lastModTimeFiles ++= newFilter.latestModTimeFiles
}
val newRDD = new UnionRDD(ssc.sc, newFiles.map(
file => ssc.sc.newAPIHadoopFile[K, V, F](file.getPath.toString)))
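
The comment block above describes the new-file selection logic: because the FileStatus API reports modification times only at second granularity, FileInputDStream remembers both the latest modification time and the set of files already seen at that time. Below is a minimal, self-contained sketch of that logic (not part of the commit), using plain (path, modTime) pairs in place of the Hadoop FileSystem API.

    import scala.collection.mutable.HashSet

    object NewFileSelectionSketch {
      var lastModTime = 0L                          // latest mod time seen so far
      val lastModTimeFiles = new HashSet[String]()  // files already processed at that mod time

      /** Returns the files that are new since the previous call, correctly handling
        * several files that share the same second-granularity modification time. */
      def selectNewFiles(files: Seq[(String, Long)]): Seq[String] = {
        var latestModTime = 0L
        val latestModTimeFiles = new HashSet[String]()
        val selected = files.filter { case (path, modTime) =>
          if (modTime < lastModTime) {
            false                                   // strictly older than the watermark
          } else if (modTime == lastModTime && lastModTimeFiles.contains(path)) {
            false                                   // same mod time, but already processed
          } else {
            if (modTime > latestModTime) {
              latestModTime = modTime
              latestModTimeFiles.clear()
            }
            latestModTimeFiles += path
            true
          }
        }.map(_._1)
        if (selected.nonEmpty) {
          if (lastModTime != latestModTime) {       // advance the watermark
            lastModTime = latestModTime
            lastModTimeFiles.clear()
          }
          lastModTimeFiles ++= latestModTimeFiles   // remember files seen at the watermark
        }
        selected
      }
    }
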
diff --git a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
index 4d9346edd8..4c42692295 100644
--- a/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
+++ b/streaming/src/main/scala/spark/streaming/NetworkInputTracker.scala
@@ -47,6 +47,7 @@ class NetworkInputTracker(
val result = queue.synchronized {
queue.dequeueAll(x => true)
}
+ logInfo("Stream " + receiverId + " received " + result.size + " blocks")
result.toArray
}
diff --git a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala b/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
index 90d8528d5b..d5db8e787d 100644
--- a/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/RawInputDStream.scala
@@ -69,7 +69,7 @@ class RawNetworkReceiver(streamId: Int, host: String, port: Int, storageLevel: S
}
def onStop() {
- blockPushingThread.interrupt()
+ if (blockPushingThread != null) blockPushingThread.interrupt()
}
/** Read a buffer fully from a given Channel */
diff --git a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
index 6df82c0df3..b07d51fa6b 100644
--- a/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/ReducedWindowedDStream.scala
@@ -31,10 +31,14 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
"must be multiple of the slide duration of parent DStream (" + parent.slideTime + ")"
)
- super.persist(StorageLevel.MEMORY_ONLY)
-
+ // Reduce each batch of data using reduceByKey which will be further reduced by window
+ // by ReducedWindowedDStream
val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
+ // Persist RDDs to memory by default as these RDDs are going to be reused.
+ super.persist(StorageLevel.MEMORY_ONLY_SER)
+ reducedStream.persist(StorageLevel.MEMORY_ONLY_SER)
+
def windowTime: Time = _windowTime
override def dependencies = List(reducedStream)
@@ -57,13 +61,6 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
this
}
- protected[streaming] override def setRememberDuration(time: Time) {
- if (rememberDuration == null || rememberDuration < time) {
- rememberDuration = time
- dependencies.foreach(_.setRememberDuration(rememberDuration + windowTime))
- }
- }
-
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
val reduceF = reduceFunc
val invReduceF = invReduceFunc
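
The comments added above explain that each batch is reduced once with reduceByKey and that those per-batch results are then reused to update the window incrementally, which is why both the windowed stream and the per-batch reduced stream are persisted. A small, self-contained sketch of that incremental update (not part of the commit), using plain per-batch sums in place of RDDs:

    object IncrementalWindowSketch {
      def add(v1: Long, v2: Long) = v1 + v2
      def subtract(v1: Long, v2: Long) = v1 - v2

      def main(args: Array[String]) {
        val batchSums = Seq(4L, 7L, 1L, 9L, 3L)    // one already-reduced value per batch
        val windowSize = 3                         // window of 3 batches, sliding by 1 batch
        var window = batchSums.take(windowSize).reduce(add)
        println("window over batches 0-2 = " + window)
        for (t <- windowSize until batchSums.length) {
          // "inverse reduce" the batch that slid out, then reduce in the new batch
          window = add(subtract(window, batchSums(t - windowSize)), batchSums(t))
          println("window ending at batch " + t + " = " + window)
        }
      }
    }
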
diff --git a/streaming/src/main/scala/spark/streaming/StateDStream.scala b/streaming/src/main/scala/spark/streaming/StateDStream.scala
index 0211df1343..cb261808f5 100644
--- a/streaming/src/main/scala/spark/streaming/StateDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/StateDStream.scala
@@ -23,7 +23,7 @@ class StateDStream[K: ClassManifest, V: ClassManifest, S <: AnyRef : ClassManife
rememberPartitioner: Boolean
) extends DStream[(K, S)](parent.ssc) {
- super.persist(StorageLevel.MEMORY_ONLY)
+ super.persist(StorageLevel.MEMORY_ONLY_SER)
override def dependencies = List(parent)
diff --git a/streaming/src/main/scala/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
index e87d0cb7c8..4cba525f86 100644
--- a/streaming/src/main/scala/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/StreamingContext.scala
@@ -149,7 +149,7 @@ final class StreamingContext (
def rawNetworkStream[T: ClassManifest](
hostname: String,
port: Int,
- storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[T] = {
val inputStream = new RawInputDStream[T](this, hostname, port, storageLevel)
graph.addInputStream(inputStream)
@@ -157,7 +157,7 @@ final class StreamingContext (
}
/**
- * This function creates a input stream that monitors a Hadoop-compatible
+ * This function creates a input stream that monitors a Hadoop-compatible filesystem
* for new files and executes the necessary processing on them.
*/
def fileStream[
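
The default storage level of rawNetworkStream changes above from MEMORY_ONLY_SER_2 to MEMORY_AND_DISK_SER_2. A hedged sketch (not part of the commit, mirroring the example programs changed elsewhere in this diff) of how a caller can still request a specific level explicitly; the master, host, port and batch duration below are placeholder values:

    import spark.storage.StorageLevel
    import spark.streaming._

    object RawStreamSketch {
      def main(args: Array[String]) {
        val ssc = new StreamingContext("local", "RawStreamSketch")
        ssc.setBatchDuration(Seconds(1))
        // keep blocks serialized in memory only, replicated to 2 nodes, as GrepRaw does
        val lines = ssc.rawNetworkStream[String]("localhost", 9999, StorageLevel.MEMORY_ONLY_SER_2)
        lines.foreachRDD(r => println("received " + r.count() + " records"))
        ssc.start()
      }
    }
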
diff --git a/streaming/src/main/scala/spark/streaming/examples/Grep2.scala b/streaming/src/main/scala/spark/streaming/examples/Grep2.scala
deleted file mode 100644
index b1faa65c17..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/Grep2.scala
+++ /dev/null
@@ -1,64 +0,0 @@
-package spark.streaming.examples
-
-import spark.SparkContext
-import SparkContext._
-import spark.streaming._
-import StreamingContext._
-
-import spark.storage.StorageLevel
-
-import scala.util.Sorting
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Queue
-import scala.collection.JavaConversions.mapAsScalaMap
-
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
-
-
-object Grep2 {
-
- def warmup(sc: SparkContext) {
- (0 until 10).foreach {i =>
- sc.parallelize(1 to 20000000, 1000)
- .map(x => (x % 337, x % 1331))
- .reduceByKey(_ + _)
- .count()
- }
- }
-
- def main (args: Array[String]) {
-
- if (args.length != 6) {
- println ("Usage: Grep2 <host> <file> <mapTasks> <reduceTasks> <batchMillis> <chkptMillis>")
- System.exit(1)
- }
-
- val Array(master, file, mapTasks, reduceTasks, batchMillis, chkptMillis) = args
-
- val batchDuration = Milliseconds(batchMillis.toLong)
-
- val ssc = new StreamingContext(master, "Grep2")
- ssc.setBatchDuration(batchDuration)
-
- //warmup(ssc.sc)
-
- val data = ssc.sc.textFile(file, mapTasks.toInt).persist(
- new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas
- println("Data count: " + data.count())
- println("Data count: " + data.count())
- println("Data count: " + data.count())
-
- val sentences = new ConstantInputDStream(ssc, data)
- ssc.registerInputStream(sentences)
-
- sentences.filter(_.contains("Culpepper")).count().foreachRDD(r =>
- println("Grep count: " + r.collect().mkString))
-
- ssc.start()
-
- while(true) { Thread.sleep(1000) }
- }
-}
-
-
diff --git a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
index b1e1a613fe..ffbea6e55d 100644
--- a/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/GrepRaw.scala
@@ -2,8 +2,10 @@ package spark.streaming.examples
import spark.util.IntParam
import spark.storage.StorageLevel
+
import spark.streaming._
import spark.streaming.StreamingContext._
+import spark.streaming.util.RawTextHelper._
object GrepRaw {
def main(args: Array[String]) {
@@ -17,16 +19,13 @@ object GrepRaw {
// Create the context and set the batch size
val ssc = new StreamingContext(master, "GrepRaw")
ssc.setBatchDuration(Milliseconds(batchMillis))
+ warmUp(ssc.sc)
- // Make sure some tasks have started on each node
- ssc.sc.parallelize(1 to 1000, 1000).count()
- ssc.sc.parallelize(1 to 1000, 1000).count()
- ssc.sc.parallelize(1 to 1000, 1000).count()
val rawStreams = (1 to numStreams).map(_ =>
- ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
+ ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_SER_2)).toArray
val union = new UnionDStream(rawStreams)
- union.filter(_.contains("Culpepper")).count().foreachRDD(r =>
+ union.filter(_.contains("Alice")).count().foreachRDD(r =>
println("Grep count: " + r.collect().mkString))
ssc.start()
}
diff --git a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
index 750cb7445f..0411bde1a7 100644
--- a/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/TopKWordCountRaw.scala
@@ -1,94 +1,50 @@
package spark.streaming.examples
-import spark.util.IntParam
-import spark.SparkContext
-import spark.SparkContext._
import spark.storage.StorageLevel
+import spark.util.IntParam
+
import spark.streaming._
import spark.streaming.StreamingContext._
+import spark.streaming.util.RawTextHelper._
-import WordCount2_ExtraFunctions._
+import java.util.UUID
object TopKWordCountRaw {
- def moreWarmup(sc: SparkContext) {
- (0 until 40).foreach {i =>
- sc.parallelize(1 to 20000000, 1000)
- .map(_ % 1331).map(_.toString)
- .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
- .collect()
- }
- }
-
+
def main(args: Array[String]) {
- if (args.length != 7) {
- System.err.println("Usage: TopKWordCountRaw <master> <streams> <host> <port> <batchMs> <chkptMs> <reduces>")
+ if (args.length != 4) {
+ System.err.println("Usage: WordCountRaw <master> <# streams> <port> <HDFS checkpoint directory> ")
System.exit(1)
}
- val Array(master, IntParam(streams), host, IntParam(port), IntParam(batchMs),
- IntParam(chkptMs), IntParam(reduces)) = args
-
- // Create the context and set the batch size
- val ssc = new StreamingContext(master, "TopKWordCountRaw")
- ssc.setBatchDuration(Milliseconds(batchMs))
-
- // Make sure some tasks have started on each node
- moreWarmup(ssc.sc)
-
- val rawStreams = (1 to streams).map(_ =>
- ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
- val union = new UnionDStream(rawStreams)
-
- val windowedCounts = union.mapPartitions(splitAndCountPartitions)
- .reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(batchMs), reduces)
- windowedCounts.persist().checkpoint(Milliseconds(chkptMs))
- //.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMs))
-
- def topK(data: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] = {
- val taken = new Array[(String, Long)](k)
-
- var i = 0
- var len = 0
- var done = false
- var value: (String, Long) = null
- var swap: (String, Long) = null
- var count = 0
-
- while(data.hasNext) {
- value = data.next
- count += 1
- println("count = " + count)
- if (len == 0) {
- taken(0) = value
- len = 1
- } else if (len < k || value._2 > taken(len - 1)._2) {
- if (len < k) {
- len += 1
- }
- taken(len - 1) = value
- i = len - 1
- while(i > 0 && taken(i - 1)._2 < taken(i)._2) {
- swap = taken(i)
- taken(i) = taken(i-1)
- taken(i - 1) = swap
- i -= 1
- }
- }
- }
- println("Took " + len + " out of " + count + " items")
- return taken.toIterator
- }
+ val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
+ val k = 10
- val k = 50
+ // Create the context, set the batch size and checkpoint directory.
+ // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
+ // periodically to HDFS
+ val ssc = new StreamingContext(master, "TopKWordCountRaw")
+ ssc.setBatchDuration(Seconds(1))
+ ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
+
+ // Warm up the JVMs on master and slave for JIT compilation to kick in
+ /*warmUp(ssc.sc)*/
+
+ // Set up the raw network streams that will connect to localhost:port to raw test
+ // senders on the slaves and generate top K words of last 30 seconds
+ val lines = (1 to numStreams).map(_ => {
+ ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
+ })
+ val union = new UnionDStream(lines.toArray)
+ val counts = union.mapPartitions(splitAndCountPartitions)
+ val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
val partialTopKWindowedCounts = windowedCounts.mapPartitions(topK(_, k))
partialTopKWindowedCounts.foreachRDD(rdd => {
val collectedCounts = rdd.collect
- println("Collected " + collectedCounts.size + " items")
- topK(collectedCounts.toIterator, k).foreach(println)
+ println("Collected " + collectedCounts.size + " words from partial top words")
+ println("Top " + k + " words are " + topK(collectedCounts.toIterator, k).mkString(","))
})
-// windowedCounts.foreachRDD(r => println("Element count: " + r.count()))
-
ssc.start()
}
}
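
The program above computes a per-partition top k with mapPartitions(topK(_, k)) and then takes a final top k over the small collected partials on the driver. A self-contained sketch of that two-stage pattern (not part of the commit); it assumes, as RawTextHelper.topK does, that the counts were already reduced so each word appears in only one partition:

    object TwoStageTopKSketch {
      // simple stand-in for RawTextHelper.topK: highest counts first
      def topK(data: Iterator[(String, Long)], k: Int): Seq[(String, Long)] =
        data.toSeq.sortBy(x => -x._2).take(k)

      def main(args: Array[String]) {
        // already-reduced counts, so no word appears in more than one partition
        val partitions = Seq(
          Seq(("spark", 12L), ("streaming", 7L), ("kafka", 3L)),
          Seq(("window", 9L), ("reduce", 8L), ("dstream", 5L)))
        val k = 2
        val partialTopK = partitions.map(p => topK(p.iterator, k)) // computed on the workers
        val merged = partialTopK.flatten                           // small; collected to the driver
        println("Top " + k + " words are " + topK(merged.iterator, k).mkString(","))
      }
    }
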
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala b/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
deleted file mode 100644
index 865026033e..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/WordCount2.scala
+++ /dev/null
@@ -1,114 +0,0 @@
-package spark.streaming.examples
-
-import spark.SparkContext
-import SparkContext._
-import spark.streaming._
-import StreamingContext._
-
-import spark.storage.StorageLevel
-
-import scala.util.Sorting
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Queue
-import scala.collection.JavaConversions.mapAsScalaMap
-
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
-
-
-object WordCount2_ExtraFunctions {
-
- def add(v1: Long, v2: Long) = (v1 + v2)
-
- def subtract(v1: Long, v2: Long) = (v1 - v2)
-
- def max(v1: Long, v2: Long) = math.max(v1, v2)
-
- def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
- //val map = new java.util.HashMap[String, Long]
- val map = new OLMap[String]
- var i = 0
- var j = 0
- while (iter.hasNext) {
- val s = iter.next()
- i = 0
- while (i < s.length) {
- j = i
- while (j < s.length && s.charAt(j) != ' ') {
- j += 1
- }
- if (j > i) {
- val w = s.substring(i, j)
- val c = map.getLong(w)
- map.put(w, c + 1)
-/*
- if (c == null) {
- map.put(w, 1)
- } else {
- map.put(w, c + 1)
- }
-*/
- }
- i = j
- while (i < s.length && s.charAt(i) == ' ') {
- i += 1
- }
- }
- }
- map.toIterator.map{case (k, v) => (k, v)}
- }
-}
-
-object WordCount2 {
-
- def warmup(sc: SparkContext) {
- (0 until 3).foreach {i =>
- sc.parallelize(1 to 20000000, 500)
- .map(x => (x % 337, x % 1331))
- .reduceByKey(_ + _, 100)
- .count()
- }
- }
-
- def main (args: Array[String]) {
-
- if (args.length != 6) {
- println ("Usage: WordCount2 <host> <file> <mapTasks> <reduceTasks> <batchMillis> <chkptMillis>")
- System.exit(1)
- }
-
- val Array(master, file, mapTasks, reduceTasks, batchMillis, chkptMillis) = args
-
- val batchDuration = Milliseconds(batchMillis.toLong)
-
- val ssc = new StreamingContext(master, "WordCount2")
- ssc.setBatchDuration(batchDuration)
-
- //warmup(ssc.sc)
-
- val data = ssc.sc.textFile(file, mapTasks.toInt).persist(
- new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas
- println("Data count: " + data.map(x => if (x == "") 1 else x.split(" ").size / x.split(" ").size).count())
- println("Data count: " + data.count())
- println("Data count: " + data.count())
-
- val sentences = new ConstantInputDStream(ssc, data)
- ssc.registerInputStream(sentences)
-
- import WordCount2_ExtraFunctions._
-
- val windowedCounts = sentences
- .mapPartitions(splitAndCountPartitions)
- .reduceByKeyAndWindow(add _, subtract _, Seconds(30), batchDuration, reduceTasks.toInt)
-
- windowedCounts.persist().checkpoint(Milliseconds(chkptMillis.toLong))
- //.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMillis.toLong))
- windowedCounts.foreachRDD(r => println("Element count: " + r.count()))
-
- ssc.start()
-
- while(true) { Thread.sleep(1000) }
- }
-}
-
-
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
index d1ea9a9cd5..571428c0fe 100644
--- a/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
+++ b/streaming/src/main/scala/spark/streaming/examples/WordCountRaw.scala
@@ -1,50 +1,43 @@
package spark.streaming.examples
-import spark.util.IntParam
-import spark.SparkContext
-import spark.SparkContext._
import spark.storage.StorageLevel
+import spark.util.IntParam
+
import spark.streaming._
import spark.streaming.StreamingContext._
+import spark.streaming.util.RawTextHelper._
-import WordCount2_ExtraFunctions._
+import java.util.UUID
object WordCountRaw {
- def moreWarmup(sc: SparkContext) {
- (0 until 40).foreach {i =>
- sc.parallelize(1 to 20000000, 1000)
- .map(_ % 1331).map(_.toString)
- .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
- .collect()
- }
- }
def main(args: Array[String]) {
- if (args.length != 7) {
- System.err.println("Usage: WordCountRaw <master> <streams> <host> <port> <batchMs> <chkptMs> <reduces>")
+ if (args.length != 4) {
+ System.err.println("Usage: WordCountRaw <master> <# streams> <port> <HDFS checkpoint directory> ")
System.exit(1)
}
- val Array(master, IntParam(streams), host, IntParam(port), IntParam(batchMs),
- IntParam(chkptMs), IntParam(reduces)) = args
+ val Array(master, IntParam(numStreams), IntParam(port), checkpointDir) = args
- // Create the context and set the batch size
+ // Create the context, set the batch size and checkpoint directory.
+ // Checkpoint directory is necessary for achieving fault-tolerance, by saving counts
+ // periodically to HDFS
val ssc = new StreamingContext(master, "WordCountRaw")
- ssc.setBatchDuration(Milliseconds(batchMs))
-
- // Make sure some tasks have started on each node
- moreWarmup(ssc.sc)
-
- val rawStreams = (1 to streams).map(_ =>
- ssc.rawNetworkStream[String](host, port, StorageLevel.MEMORY_ONLY_2)).toArray
- val union = new UnionDStream(rawStreams)
-
- val windowedCounts = union.mapPartitions(splitAndCountPartitions)
- .reduceByKeyAndWindow(add _, subtract _, Seconds(30), Milliseconds(batchMs), reduces)
- windowedCounts.persist().checkpoint(chkptMs)
- //.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMs))
-
- windowedCounts.foreachRDD(r => println("Element count: " + r.count()))
+ ssc.setBatchDuration(Seconds(1))
+ ssc.checkpoint(checkpointDir + "/" + UUID.randomUUID.toString, Seconds(1))
+
+ // Warm up the JVMs on master and slave for JIT compilation to kick in
+ warmUp(ssc.sc)
+
+ // Set up the raw network streams that will connect to localhost:port to raw test
+ // senders on the slaves and generate count of words of last 30 seconds
+ val lines = (1 to numStreams).map(_ => {
+ ssc.rawNetworkStream[String]("localhost", port, StorageLevel.MEMORY_ONLY_SER_2)
+ })
+ val union = new UnionDStream(lines.toArray)
+ val counts = union.mapPartitions(splitAndCountPartitions)
+ val windowedCounts = counts.reduceByKeyAndWindow(add _, subtract _, Seconds(30), Seconds(1), 10)
+ windowedCounts.foreachRDD(r => println("# unique words = " + r.count()))
ssc.start()
}
diff --git a/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala b/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala
deleted file mode 100644
index 6a9c8a9a69..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/WordMax2.scala
+++ /dev/null
@@ -1,75 +0,0 @@
-package spark.streaming.examples
-
-import spark.SparkContext
-import SparkContext._
-import spark.streaming._
-import StreamingContext._
-
-import spark.storage.StorageLevel
-
-import scala.util.Sorting
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Queue
-import scala.collection.JavaConversions.mapAsScalaMap
-
-import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
-
-
-object WordMax2 {
-
- def warmup(sc: SparkContext) {
- (0 until 10).foreach {i =>
- sc.parallelize(1 to 20000000, 1000)
- .map(x => (x % 337, x % 1331))
- .reduceByKey(_ + _)
- .count()
- }
- }
-
- def main (args: Array[String]) {
-
- if (args.length != 6) {
- println ("Usage: WordMax2 <host> <file> <mapTasks> <reduceTasks> <batchMillis> <chkptMillis>")
- System.exit(1)
- }
-
- val Array(master, file, mapTasks, reduceTasks, batchMillis, chkptMillis) = args
-
- val batchDuration = Milliseconds(batchMillis.toLong)
-
- val ssc = new StreamingContext(master, "WordMax2")
- ssc.setBatchDuration(batchDuration)
-
- //warmup(ssc.sc)
-
- val data = ssc.sc.textFile(file, mapTasks.toInt).persist(
- new StorageLevel(false, true, false, 3)) // Memory only, serialized, 3 replicas
- println("Data count: " + data.count())
- println("Data count: " + data.count())
- println("Data count: " + data.count())
-
- val sentences = new ConstantInputDStream(ssc, data)
- ssc.registerInputStream(sentences)
-
- import WordCount2_ExtraFunctions._
-
- val windowedCounts = sentences
- .mapPartitions(splitAndCountPartitions)
- .reduceByKey(add _, reduceTasks.toInt)
- .persist()
- .checkpoint(Milliseconds(chkptMillis.toLong))
- //.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMillis.toLong))
- .reduceByKeyAndWindow(max _, Seconds(10), batchDuration, reduceTasks.toInt)
- .persist()
- .checkpoint(Milliseconds(chkptMillis.toLong))
- //.persist(StorageLevel.MEMORY_ONLY, StorageLevel.MEMORY_ONLY_2, Milliseconds(chkptMillis.toLong))
- windowedCounts.foreachRDD(r => println("Element count: " + r.count()))
-
- ssc.start()
-
- while(true) { Thread.sleep(1000) }
- }
-}
-
-
diff --git a/streaming/src/main/scala/spark/streaming/util/ConnectionHandler.scala b/streaming/src/main/scala/spark/streaming/util/ConnectionHandler.scala
deleted file mode 100644
index cde868a0c9..0000000000
--- a/streaming/src/main/scala/spark/streaming/util/ConnectionHandler.scala
+++ /dev/null
@@ -1,157 +0,0 @@
-package spark.streaming.util
-
-import spark.Logging
-
-import scala.collection.mutable.{ArrayBuffer, SynchronizedQueue}
-
-import java.net._
-import java.io._
-import java.nio._
-import java.nio.charset._
-import java.nio.channels._
-import java.nio.channels.spi._
-
-abstract class ConnectionHandler(host: String, port: Int, connect: Boolean)
-extends Thread with Logging {
-
- val selector = SelectorProvider.provider.openSelector()
- val interestChangeRequests = new SynchronizedQueue[(SelectionKey, Int)]
-
- initLogging()
-
- override def run() {
- try {
- if (connect) {
- connect()
- } else {
- listen()
- }
-
- var interrupted = false
- while(!interrupted) {
-
- preSelect()
-
- while(!interestChangeRequests.isEmpty) {
- val (key, ops) = interestChangeRequests.dequeue
- val lastOps = key.interestOps()
- key.interestOps(ops)
-
- def intToOpStr(op: Int): String = {
- val opStrs = new ArrayBuffer[String]()
- if ((op & SelectionKey.OP_READ) != 0) opStrs += "READ"
- if ((op & SelectionKey.OP_WRITE) != 0) opStrs += "WRITE"
- if ((op & SelectionKey.OP_CONNECT) != 0) opStrs += "CONNECT"
- if ((op & SelectionKey.OP_ACCEPT) != 0) opStrs += "ACCEPT"
- if (opStrs.size > 0) opStrs.reduceLeft(_ + " | " + _) else " "
- }
-
- logTrace("Changed ops from [" + intToOpStr(lastOps) + "] to [" + intToOpStr(ops) + "]")
- }
-
- selector.select()
- interrupted = Thread.currentThread.isInterrupted
-
- val selectedKeys = selector.selectedKeys().iterator()
- while (selectedKeys.hasNext) {
- val key = selectedKeys.next.asInstanceOf[SelectionKey]
- selectedKeys.remove()
- if (key.isValid) {
- if (key.isAcceptable) {
- accept(key)
- } else if (key.isConnectable) {
- finishConnect(key)
- } else if (key.isReadable) {
- read(key)
- } else if (key.isWritable) {
- write(key)
- }
- }
- }
- }
- } catch {
- case e: Exception => {
- logError("Error in select loop", e)
- }
- }
- }
-
- def connect() {
- val socketAddress = new InetSocketAddress(host, port)
- val channel = SocketChannel.open()
- channel.configureBlocking(false)
- channel.socket.setReuseAddress(true)
- channel.socket.setTcpNoDelay(true)
- channel.connect(socketAddress)
- channel.register(selector, SelectionKey.OP_CONNECT)
- logInfo("Initiating connection to [" + socketAddress + "]")
- }
-
- def listen() {
- val channel = ServerSocketChannel.open()
- channel.configureBlocking(false)
- channel.socket.setReuseAddress(true)
- channel.socket.setReceiveBufferSize(256 * 1024)
- channel.socket.bind(new InetSocketAddress(port))
- channel.register(selector, SelectionKey.OP_ACCEPT)
- logInfo("Listening on port " + port)
- }
-
- def finishConnect(key: SelectionKey) {
- try {
- val channel = key.channel.asInstanceOf[SocketChannel]
- val address = channel.socket.getRemoteSocketAddress
- channel.finishConnect()
- logInfo("Connected to [" + host + ":" + port + "]")
- ready(key)
- } catch {
- case e: IOException => {
- logError("Error finishing connect to " + host + ":" + port)
- close(key)
- }
- }
- }
-
- def accept(key: SelectionKey) {
- try {
- val serverChannel = key.channel.asInstanceOf[ServerSocketChannel]
- val channel = serverChannel.accept()
- val address = channel.socket.getRemoteSocketAddress
- channel.configureBlocking(false)
- logInfo("Accepted connection from [" + address + "]")
- ready(channel.register(selector, 0))
- } catch {
- case e: IOException => {
- logError("Error accepting connection", e)
- }
- }
- }
-
- def changeInterest(key: SelectionKey, ops: Int) {
- logTrace("Added request to change ops to " + ops)
- interestChangeRequests += ((key, ops))
- }
-
- def ready(key: SelectionKey)
-
- def preSelect() {
- }
-
- def read(key: SelectionKey) {
- throw new UnsupportedOperationException("Cannot read on connection of type " + this.getClass.toString)
- }
-
- def write(key: SelectionKey) {
- throw new UnsupportedOperationException("Cannot write on connection of type " + this.getClass.toString)
- }
-
- def close(key: SelectionKey) {
- try {
- key.channel.close()
- key.cancel()
- Thread.currentThread.interrupt
- } catch {
- case e: Exception => logError("Error closing connection", e)
- }
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala
new file mode 100644
index 0000000000..f31ae39a16
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/util/RawTextHelper.scala
@@ -0,0 +1,98 @@
+package spark.streaming.util
+
+import spark.SparkContext
+import spark.SparkContext._
+import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
+import scala.collection.JavaConversions.mapAsScalaMap
+
+object RawTextHelper {
+
+ /**
+ * Splits lines and counts the words in them using a specialized object-to-long hashmap
+ * (to avoid the boxing/unboxing overhead of Long in a java/scala HashMap)
+ */
+ def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
+ val map = new OLMap[String]
+ var i = 0
+ var j = 0
+ while (iter.hasNext) {
+ val s = iter.next()
+ i = 0
+ while (i < s.length) {
+ j = i
+ while (j < s.length && s.charAt(j) != ' ') {
+ j += 1
+ }
+ if (j > i) {
+ val w = s.substring(i, j)
+ val c = map.getLong(w)
+ map.put(w, c + 1)
+ }
+ i = j
+ while (i < s.length && s.charAt(i) == ' ') {
+ i += 1
+ }
+ }
+ }
+ map.toIterator.map{case (k, v) => (k, v)}
+ }
+
+ /**
+ * Gets the top k words in terms of word counts. Assumes that each word exists only once
+ * in the `data` iterator (that is, the counts have been reduced).
+ */
+ def topK(data: Iterator[(String, Long)], k: Int): Iterator[(String, Long)] = {
+ val taken = new Array[(String, Long)](k)
+
+ var i = 0
+ var len = 0
+ var done = false
+ var value: (String, Long) = null
+ var swap: (String, Long) = null
+ var count = 0
+
+ while(data.hasNext) {
+ value = data.next
+ if (value != null) {
+ count += 1
+ if (len == 0) {
+ taken(0) = value
+ len = 1
+ } else if (len < k || value._2 > taken(len - 1)._2) {
+ if (len < k) {
+ len += 1
+ }
+ taken(len - 1) = value
+ i = len - 1
+ while(i > 0 && taken(i - 1)._2 < taken(i)._2) {
+ swap = taken(i)
+ taken(i) = taken(i-1)
+ taken(i - 1) = swap
+ i -= 1
+ }
+ }
+ }
+ }
+ return taken.toIterator
+ }
+
+ /**
+ * Warms up the SparkContext on the master and slaves by running tasks to force the JIT
+ * to kick in before the real workload starts.
+ */
+ def warmUp(sc: SparkContext) {
+ for(i <- 0 to 4) {
+ sc.parallelize(1 to 200000, 1000)
+ .map(_ % 1331).map(_.toString)
+ .mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)
+ .count()
+ }
+ }
+
+ def add(v1: Long, v2: Long) = (v1 + v2)
+
+ def subtract(v1: Long, v2: Long) = (v1 - v2)
+
+ def max(v1: Long, v2: Long) = math.max(v1, v2)
+}
+
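
RawTextHelper above bundles the word-splitting, top-k and warm-up helpers that the example programs in this commit now import. A hedged usage sketch (not part of the commit); it assumes the fastutil dependency used by splitAndCountPartitions is on the classpath:

    import spark.streaming.util.RawTextHelper._

    object RawTextHelperSketch {
      def main(args: Array[String]) {
        val lines = Iterator("to be or not to be", "to stream or to batch")
        // per-partition word counts, e.g. ("to", 4), ("be", 2), ("or", 2), ...
        val counts = splitAndCountPartitions(lines).toSeq
        println("counts = " + counts.mkString(", "))
        // top 3 words by count; each word appears only once because the counts above
        // were already reduced for this partition
        println("top 3  = " + topK(counts.iterator, 3).mkString(", "))
      }
    }
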
diff --git a/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala b/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala
deleted file mode 100644
index 3922dfbad6..0000000000
--- a/streaming/src/main/scala/spark/streaming/util/SenderReceiverTest.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-package spark.streaming.util
-
-import java.net.{Socket, ServerSocket}
-import java.io.{ByteArrayOutputStream, DataOutputStream, DataInputStream, BufferedInputStream}
-
-object Receiver {
- def main(args: Array[String]) {
- val port = args(0).toInt
- val lsocket = new ServerSocket(port)
- println("Listening on port " + port )
- while(true) {
- val socket = lsocket.accept()
- (new Thread() {
- override def run() {
- val buffer = new Array[Byte](100000)
- var count = 0
- val time = System.currentTimeMillis
- try {
- val is = new DataInputStream(new BufferedInputStream(socket.getInputStream))
- var loop = true
- var string: String = null
- do {
- string = is.readUTF()
- if (string != null) {
- count += 28
- }
- } while (string != null)
- } catch {
- case e: Exception => e.printStackTrace()
- }
- val timeTaken = System.currentTimeMillis - time
- val tput = (count / 1024.0) / (timeTaken / 1000.0)
- println("Data = " + count + " bytes\nTime = " + timeTaken + " ms\nTput = " + tput + " KB/s")
- }
- }).start()
- }
- }
-
-}
-
-object Sender {
-
- def main(args: Array[String]) {
- try {
- val host = args(0)
- val port = args(1).toInt
- val size = args(2).toInt
-
- val byteStream = new ByteArrayOutputStream()
- val stringDataStream = new DataOutputStream(byteStream)
- (0 until size).foreach(_ => stringDataStream.writeUTF("abcdedfghijklmnopqrstuvwxy"))
- val bytes = byteStream.toByteArray()
- println("Generated array of " + bytes.length + " bytes")
-
- /*val bytes = new Array[Byte](size)*/
- val socket = new Socket(host, port)
- val os = socket.getOutputStream
- os.write(bytes)
- os.flush
- socket.close()
-
- } catch {
- case e: Exception => e.printStackTrace
- }
- }
-}
-
diff --git a/streaming/src/main/scala/spark/streaming/util/SentenceFileGenerator.scala b/streaming/src/main/scala/spark/streaming/util/SentenceFileGenerator.scala
deleted file mode 100644
index 94e8f7a849..0000000000
--- a/streaming/src/main/scala/spark/streaming/util/SentenceFileGenerator.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-package spark.streaming.util
-
-import spark._
-
-import scala.collection.mutable.ArrayBuffer
-import scala.util.Random
-import scala.io.Source
-
-import java.net.InetSocketAddress
-
-import org.apache.hadoop.fs._
-import org.apache.hadoop.conf._
-import org.apache.hadoop.io._
-import org.apache.hadoop.mapred._
-import org.apache.hadoop.util._
-
-object SentenceFileGenerator {
-
- def printUsage () {
- println ("Usage: SentenceFileGenerator <master> <target directory> <# partitions> <sentence file> [<sentences per second>]")
- System.exit(0)
- }
-
- def main (args: Array[String]) {
- if (args.length < 4) {
- printUsage
- }
-
- val master = args(0)
- val fs = new Path(args(1)).getFileSystem(new Configuration())
- val targetDirectory = new Path(args(1)).makeQualified(fs)
- val numPartitions = args(2).toInt
- val sentenceFile = args(3)
- val sentencesPerSecond = {
- if (args.length > 4) args(4).toInt
- else 10
- }
-
- val source = Source.fromFile(sentenceFile)
- val lines = source.mkString.split ("\n").toArray
- source.close ()
- println("Read " + lines.length + " lines from file " + sentenceFile)
-
- val sentences = {
- val buffer = ArrayBuffer[String]()
- val random = new Random()
- var i = 0
- while (i < sentencesPerSecond) {
- buffer += lines(random.nextInt(lines.length))
- i += 1
- }
- buffer.toArray
- }
- println("Generated " + sentences.length + " sentences")
-
- val sc = new SparkContext(master, "SentenceFileGenerator")
- val sentencesRDD = sc.parallelize(sentences, numPartitions)
-
- val tempDirectory = new Path(targetDirectory, "_tmp")
-
- fs.mkdirs(targetDirectory)
- fs.mkdirs(tempDirectory)
-
- var saveTimeMillis = System.currentTimeMillis
- try {
- while (true) {
- val newDir = new Path(targetDirectory, "Sentences-" + saveTimeMillis)
- val tmpNewDir = new Path(tempDirectory, "Sentences-" + saveTimeMillis)
- println("Writing to file " + newDir)
- sentencesRDD.saveAsTextFile(tmpNewDir.toString)
- fs.rename(tmpNewDir, newDir)
- saveTimeMillis += 1000
- val sleepTimeMillis = {
- val currentTimeMillis = System.currentTimeMillis
- if (saveTimeMillis < currentTimeMillis) {
- 0
- } else {
- saveTimeMillis - currentTimeMillis
- }
- }
- println("Sleeping for " + sleepTimeMillis + " ms")
- Thread.sleep(sleepTimeMillis)
- }
- } catch {
- case e: Exception =>
- }
- }
-}
-
-
-
-
diff --git a/streaming/src/main/scala/spark/streaming/util/ShuffleTest.scala b/streaming/src/main/scala/spark/streaming/util/ShuffleTest.scala
deleted file mode 100644
index 60085f4f88..0000000000
--- a/streaming/src/main/scala/spark/streaming/util/ShuffleTest.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-package spark.streaming.util
-
-import spark.SparkContext
-import SparkContext._
-
-object ShuffleTest {
- def main(args: Array[String]) {
-
- if (args.length < 1) {
- println ("Usage: ShuffleTest <host>")
- System.exit(1)
- }
-
- val sc = new spark.SparkContext(args(0), "ShuffleTest")
- val rdd = sc.parallelize(1 to 1000, 500).cache
-
- def time(f: => Unit) { val start = System.nanoTime; f; println((System.nanoTime - start) * 1.0e-6) }
-
- time { for (i <- 0 until 50) time { rdd.map(x => (x % 100, x)).reduceByKey(_ + _, 10).count } }
- System.exit(0)
- }
-}
-
diff --git a/streaming/src/main/scala/spark/streaming/util/TestGenerator.scala b/streaming/src/main/scala/spark/streaming/util/TestGenerator.scala
deleted file mode 100644
index 23e9235c60..0000000000
--- a/streaming/src/main/scala/spark/streaming/util/TestGenerator.scala
+++ /dev/null
@@ -1,107 +0,0 @@
-package spark.streaming.util
-
-import scala.util.Random
-import scala.io.Source
-import scala.actors._
-import scala.actors.Actor._
-import scala.actors.remote._
-import scala.actors.remote.RemoteActor._
-
-import java.net.InetSocketAddress
-
-
-object TestGenerator {
-
- def printUsage {
- println ("Usage: SentenceGenerator <target IP> <target port> <sentence file> [<sentences per second>]")
- System.exit(0)
- }
- /*
- def generateRandomSentences(lines: Array[String], sentencesPerSecond: Int, streamReceiver: AbstractActor) {
- val sleepBetweenSentences = 1000.0 / sentencesPerSecond.toDouble - 1
- val random = new Random ()
-
- try {
- var lastPrintTime = System.currentTimeMillis()
- var count = 0
- while(true) {
- streamReceiver ! lines(random.nextInt(lines.length))
- count += 1
- if (System.currentTimeMillis - lastPrintTime >= 1000) {
- println (count + " sentences sent last second")
- count = 0
- lastPrintTime = System.currentTimeMillis
- }
- Thread.sleep(sleepBetweenSentences.toLong)
- }
- } catch {
- case e: Exception =>
- }
- }*/
-
- def generateSameSentences(lines: Array[String], sentencesPerSecond: Int, streamReceiver: AbstractActor) {
- try {
- val numSentences = if (sentencesPerSecond <= 0) {
- lines.length
- } else {
- sentencesPerSecond
- }
- val sentences = lines.take(numSentences).toArray
-
- var nextSendingTime = System.currentTimeMillis()
- val sendAsArray = true
- while(true) {
- if (sendAsArray) {
- println("Sending as array")
- streamReceiver !? sentences
- } else {
- println("Sending individually")
- sentences.foreach(sentence => {
- streamReceiver !? sentence
- })
- }
- println ("Sent " + numSentences + " sentences in " + (System.currentTimeMillis - nextSendingTime) + " ms")
- nextSendingTime += 1000
- val sleepTime = nextSendingTime - System.currentTimeMillis
- if (sleepTime > 0) {
- println ("Sleeping for " + sleepTime + " ms")
- Thread.sleep(sleepTime)
- }
- }
- } catch {
- case e: Exception =>
- }
- }
-
- def main(args: Array[String]) {
- if (args.length < 3) {
- printUsage
- }
-
- val generateRandomly = false
-
- val streamReceiverIP = args(0)
- val streamReceiverPort = args(1).toInt
- val sentenceFile = args(2)
- val sentencesPerSecond = if (args.length > 3) args(3).toInt else 10
- val sentenceInputName = if (args.length > 4) args(4) else "Sentences"
-
- println("Sending " + sentencesPerSecond + " sentences per second to " +
- streamReceiverIP + ":" + streamReceiverPort + "/NetworkStreamReceiver-" + sentenceInputName)
- val source = Source.fromFile(sentenceFile)
- val lines = source.mkString.split ("\n")
- source.close ()
-
- val streamReceiver = select(
- Node(streamReceiverIP, streamReceiverPort),
- Symbol("NetworkStreamReceiver-" + sentenceInputName))
- if (generateRandomly) {
- /*generateRandomSentences(lines, sentencesPerSecond, streamReceiver)*/
- } else {
- generateSameSentences(lines, sentencesPerSecond, streamReceiver)
- }
- }
-}
-
-
-
diff --git a/streaming/src/main/scala/spark/streaming/util/TestGenerator2.scala b/streaming/src/main/scala/spark/streaming/util/TestGenerator2.scala
deleted file mode 100644
index ff840d084f..0000000000
--- a/streaming/src/main/scala/spark/streaming/util/TestGenerator2.scala
+++ /dev/null
@@ -1,119 +0,0 @@
-package spark.streaming.util
-
-import scala.util.Random
-import scala.io.Source
-import scala.actors._
-import scala.actors.Actor._
-import scala.actors.remote._
-import scala.actors.remote.RemoteActor._
-
-import java.io.{DataOutputStream, ByteArrayOutputStream, DataInputStream}
-import java.net.Socket
-
-object TestGenerator2 {
-
- def printUsage {
- println ("Usage: SentenceGenerator <target IP> <target port> <sentence file> [<sentences per second>]")
- System.exit(0)
- }
-
- def sendSentences(streamReceiverHost: String, streamReceiverPort: Int, numSentences: Int, bytes: Array[Byte], intervalTime: Long){
- try {
- println("Connecting to " + streamReceiverHost + ":" + streamReceiverPort)
- val socket = new Socket(streamReceiverHost, streamReceiverPort)
-
- println("Sending " + numSentences+ " sentences / " + (bytes.length / 1024.0 / 1024.0) + " MB per " + intervalTime + " ms to " + streamReceiverHost + ":" + streamReceiverPort )
- val currentTime = System.currentTimeMillis
- var targetTime = (currentTime / intervalTime + 1).toLong * intervalTime
- Thread.sleep(targetTime - currentTime)
-
- while(true) {
- val startTime = System.currentTimeMillis()
- println("Sending at " + startTime + " ms with delay of " + (startTime - targetTime) + " ms")
- val socketOutputStream = socket.getOutputStream
- val parts = 10
- (0 until parts).foreach(i => {
- val partStartTime = System.currentTimeMillis
-
- val offset = (i * bytes.length / parts).toInt
- val len = math.min(((i + 1) * bytes.length / parts).toInt - offset, bytes.length)
- socketOutputStream.write(bytes, offset, len)
- socketOutputStream.flush()
- val partFinishTime = System.currentTimeMillis
- println("Sending part " + i + " of " + len + " bytes took " + (partFinishTime - partStartTime) + " ms")
- val sleepTime = math.max(0, 1000 / parts - (partFinishTime - partStartTime) - 1)
- Thread.sleep(sleepTime)
- })
-
- socketOutputStream.flush()
- /*val socketInputStream = new DataInputStream(socket.getInputStream)*/
- /*val reply = socketInputStream.readUTF()*/
- val finishTime = System.currentTimeMillis()
- println ("Sent " + bytes.length + " bytes in " + (finishTime - startTime) + " ms for interval [" + targetTime + ", " + (targetTime + intervalTime) + "]")
- /*println("Received = " + reply)*/
- targetTime = targetTime + intervalTime
- val sleepTime = (targetTime - finishTime) + 10
- if (sleepTime > 0) {
- println("Sleeping for " + sleepTime + " ms")
- Thread.sleep(sleepTime)
- } else {
- println("############################")
- println("###### Skipping sleep ######")
- println("############################")
- }
- }
- } catch {
- case e: Exception => println(e)
- }
- println("Stopped sending")
- }
-
- def main(args: Array[String]) {
- if (args.length < 4) {
- printUsage
- }
-
- val streamReceiverHost = args(0)
- val streamReceiverPort = args(1).toInt
- val sentenceFile = args(2)
- val intervalTime = args(3).toLong
- val sentencesPerInterval = if (args.length > 4) args(4).toInt else 0
-
- println("Reading the file " + sentenceFile)
- val source = Source.fromFile(sentenceFile)
- val lines = source.mkString.split ("\n")
- source.close()
-
- val numSentences = if (sentencesPerInterval <= 0) {
- lines.length
- } else {
- sentencesPerInterval
- }
-
- println("Generating sentences")
- val sentences: Array[String] = if (numSentences <= lines.length) {
- lines.take(numSentences).toArray
- } else {
- (0 until numSentences).map(i => lines(i % lines.length)).toArray
- }
-
- println("Converting to byte array")
- val byteStream = new ByteArrayOutputStream()
- val stringDataStream = new DataOutputStream(byteStream)
- /*stringDataStream.writeInt(sentences.size)*/
- sentences.foreach(stringDataStream.writeUTF)
- val bytes = byteStream.toByteArray()
- stringDataStream.close()
- println("Generated array of " + bytes.length + " bytes")
-
- /*while(true) { */
- sendSentences(streamReceiverHost, streamReceiverPort, numSentences, bytes, intervalTime)
- /*println("Sleeping for 5 seconds")*/
- /*Thread.sleep(5000)*/
- /*System.gc()*/
- /*}*/
- }
-}
-
-
-
diff --git a/streaming/src/main/scala/spark/streaming/util/TestGenerator4.scala b/streaming/src/main/scala/spark/streaming/util/TestGenerator4.scala
deleted file mode 100644
index 9c39ef3e12..0000000000
--- a/streaming/src/main/scala/spark/streaming/util/TestGenerator4.scala
+++ /dev/null
@@ -1,244 +0,0 @@
-package spark.streaming.util
-
-import spark.Logging
-
-import scala.util.Random
-import scala.io.Source
-import scala.collection.mutable.{ArrayBuffer, Queue}
-
-import java.net._
-import java.io._
-import java.nio._
-import java.nio.charset._
-import java.nio.channels._
-
-import it.unimi.dsi.fastutil.io._
-
-class TestGenerator4(targetHost: String, targetPort: Int, sentenceFile: String, intervalDuration: Long, sentencesPerInterval: Int)
-extends Logging {
-
- class SendingConnectionHandler(host: String, port: Int, generator: TestGenerator4)
- extends ConnectionHandler(host, port, true) {
-
- val buffers = new ArrayBuffer[ByteBuffer]
- val newBuffers = new Queue[ByteBuffer]
- var activeKey: SelectionKey = null
-
- def send(buffer: ByteBuffer) {
- logDebug("Sending: " + buffer)
- newBuffers.synchronized {
- newBuffers.enqueue(buffer)
- }
- selector.wakeup()
- buffer.synchronized {
- buffer.wait()
- }
- }
-
- override def ready(key: SelectionKey) {
- logDebug("Ready")
- activeKey = key
- val channel = key.channel.asInstanceOf[SocketChannel]
- channel.register(selector, SelectionKey.OP_WRITE)
- generator.startSending()
- }
-
- override def preSelect() {
- newBuffers.synchronized {
- while(!newBuffers.isEmpty) {
- val buffer = newBuffers.dequeue
- buffers += buffer
- logDebug("Added: " + buffer)
- changeInterest(activeKey, SelectionKey.OP_WRITE)
- }
- }
- }
-
- override def write(key: SelectionKey) {
- try {
- /*while(true) {*/
- val channel = key.channel.asInstanceOf[SocketChannel]
- if (buffers.size > 0) {
- val buffer = buffers(0)
- val newBuffer = buffer.slice()
- newBuffer.limit(math.min(newBuffer.remaining, 32768))
- val bytesWritten = channel.write(newBuffer)
- buffer.position(buffer.position + bytesWritten)
- if (bytesWritten == 0) return
- if (buffer.remaining == 0) {
- buffers -= buffer
- buffer.synchronized {
- buffer.notify()
- }
- }
- /*changeInterest(key, SelectionKey.OP_WRITE)*/
- } else {
- changeInterest(key, 0)
- }
- /*}*/
- } catch {
- case e: IOException => {
- if (e.toString.contains("pipe") || e.toString.contains("reset")) {
- logError("Connection broken")
- } else {
- logError("Connection error", e)
- }
- close(key)
- }
- }
- }
-
- override def close(key: SelectionKey) {
- buffers.clear()
- super.close(key)
- }
- }
-
- initLogging()
-
- val connectionHandler = new SendingConnectionHandler(targetHost, targetPort, this)
- var sendingThread: Thread = null
- var sendCount = 0
- val sendBatches = 5
-
- def run() {
- logInfo("Connection handler started")
- connectionHandler.start()
- connectionHandler.join()
- if (sendingThread != null && !sendingThread.isInterrupted) {
- sendingThread.interrupt
- }
- logInfo("Connection handler stopped")
- }
-
- def startSending() {
- sendingThread = new Thread() {
- override def run() {
- logInfo("STARTING TO SEND")
- sendSentences()
- logInfo("SENDING STOPPED AFTER " + sendCount)
- connectionHandler.interrupt()
- }
- }
- sendingThread.start()
- }
-
- def stopSending() {
- sendingThread.interrupt()
- }
-
- def sendSentences() {
- logInfo("Reading the file " + sentenceFile)
- val source = Source.fromFile(sentenceFile)
- val lines = source.mkString.split ("\n")
- source.close()
-
- val numSentences = if (sentencesPerInterval <= 0) {
- lines.length
- } else {
- sentencesPerInterval
- }
-
- logInfo("Generating sentence buffer")
- val sentences: Array[String] = if (numSentences <= lines.length) {
- lines.take(numSentences).toArray
- } else {
- (0 until numSentences).map(i => lines(i % lines.length)).toArray
- }
-
- /*
- val sentences: Array[String] = if (numSentences <= lines.length) {
- lines.take((numSentences / sendBatches).toInt).toArray
- } else {
- (0 until (numSentences/sendBatches)).map(i => lines(i % lines.length)).toArray
- }*/
-
-
- val serializer = new spark.KryoSerializer().newInstance()
- val byteStream = new FastByteArrayOutputStream(100 * 1024 * 1024)
- serializer.serializeStream(byteStream).writeAll(sentences.toIterator.asInstanceOf[Iterator[Any]]).close()
- byteStream.trim()
- val sentenceBuffer = ByteBuffer.wrap(byteStream.array)
-
- logInfo("Sending " + numSentences+ " sentences / " + sentenceBuffer.limit + " bytes per " + intervalDuration + " ms to " + targetHost + ":" + targetPort )
- val currentTime = System.currentTimeMillis
- var targetTime = (currentTime / intervalDuration + 1).toLong * intervalDuration
- Thread.sleep(targetTime - currentTime)
-
- val totalBytes = sentenceBuffer.limit
-
- while(true) {
- val batchesInCurrentInterval = sendBatches // if (sendCount < 10) 1 else sendBatches
-
- val startTime = System.currentTimeMillis()
- logDebug("Sending # " + sendCount + " at " + startTime + " ms with delay of " + (startTime - targetTime) + " ms")
-
- (0 until batchesInCurrentInterval).foreach(i => {
- try {
- val position = (i * totalBytes / sendBatches).toInt
- val limit = if (i == sendBatches - 1) {
- totalBytes
- } else {
- ((i + 1) * totalBytes / sendBatches).toInt - 1
- }
-
- val partStartTime = System.currentTimeMillis
- sentenceBuffer.limit(limit)
- connectionHandler.send(sentenceBuffer)
- val partFinishTime = System.currentTimeMillis
- val sleepTime = math.max(0, intervalDuration / sendBatches - (partFinishTime - partStartTime) - 1)
- Thread.sleep(sleepTime)
-
- } catch {
- case ie: InterruptedException => return
- case e: Exception => e.printStackTrace()
- }
- })
- sentenceBuffer.rewind()
-
- val finishTime = System.currentTimeMillis()
- /*logInfo ("Sent " + sentenceBuffer.limit + " bytes in " + (finishTime - startTime) + " ms")*/
- targetTime = targetTime + intervalDuration //+ (if (sendCount < 3) 1000 else 0)
-
- val sleepTime = (targetTime - finishTime) + 20
- if (sleepTime > 0) {
- logInfo("Sleeping for " + sleepTime + " ms")
- Thread.sleep(sleepTime)
- } else {
- logInfo("###### Skipping sleep ######")
- }
- if (Thread.currentThread.isInterrupted) {
- return
- }
- sendCount += 1
- }
- }
-}
-
-object TestGenerator4 {
- def printUsage {
- println("Usage: TestGenerator4 <target IP> <target port> <sentence file> <interval duration> [<sentences per second>]")
- System.exit(0)
- }
-
- def main(args: Array[String]) {
- println("GENERATOR STARTED")
- if (args.length < 4) {
- printUsage
- }
-
-
- val streamReceiverHost = args(0)
- val streamReceiverPort = args(1).toInt
- val sentenceFile = args(2)
- val intervalDuration = args(3).toLong
- val sentencesPerInterval = if (args.length > 4) args(4).toInt else 0
-
- while(true) {
- val generator = new TestGenerator4(streamReceiverHost, streamReceiverPort, sentenceFile, intervalDuration, sentencesPerInterval)
- generator.run()
- Thread.sleep(2000)
- }
- println("GENERATOR STOPPED")
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/util/TestStreamCoordinator.scala b/streaming/src/main/scala/spark/streaming/util/TestStreamCoordinator.scala
deleted file mode 100644
index f584f772bb..0000000000
--- a/streaming/src/main/scala/spark/streaming/util/TestStreamCoordinator.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-package spark.streaming.util
-
-import spark.streaming._
-import spark.Logging
-
-import akka.actor._
-import akka.actor.Actor
-import akka.actor.Actor._
-
-sealed trait TestStreamCoordinatorMessage
-case class GetStreamDetails extends TestStreamCoordinatorMessage
-case class GotStreamDetails(name: String, duration: Long) extends TestStreamCoordinatorMessage
-case class TestStarted extends TestStreamCoordinatorMessage
-
-class TestStreamCoordinator(streamDetails: Array[(String, Long)]) extends Actor with Logging {
-
- var index = 0
-
- initLogging()
-
- logInfo("Created")
-
- def receive = {
- case TestStarted => {
- sender ! "OK"
- }
-
- case GetStreamDetails => {
- val streamDetail = if (index >= streamDetails.length) null else streamDetails(index)
- sender ! GotStreamDetails(streamDetail._1, streamDetail._2)
- index += 1
- if (streamDetail != null) {
- logInfo("Allocated " + streamDetail._1 + " (" + index + "/" + streamDetails.length + ")" )
- }
- }
- }
-
-}
-
diff --git a/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala b/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala
deleted file mode 100644
index 80ad924dd8..0000000000
--- a/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver3.scala
+++ /dev/null
@@ -1,421 +0,0 @@
-package spark.streaming.util
-
-import spark._
-import spark.storage._
-import spark.util.AkkaUtils
-import spark.streaming._
-
-import scala.math._
-import scala.collection.mutable.{Queue, HashMap, ArrayBuffer, SynchronizedMap}
-
-import akka.actor._
-import akka.actor.Actor
-import akka.dispatch._
-import akka.pattern.ask
-import akka.util.duration._
-
-import java.io.DataInputStream
-import java.io.BufferedInputStream
-import java.net.Socket
-import java.net.ServerSocket
-import java.util.LinkedHashMap
-
-import org.apache.hadoop.fs._
-import org.apache.hadoop.conf._
-import org.apache.hadoop.io._
-import org.apache.hadoop.mapred._
-import org.apache.hadoop.util._
-
-import spark.Utils
-
-
-class TestStreamReceiver3(actorSystem: ActorSystem, blockManager: BlockManager)
-extends Thread with Logging {
-
-
- class DataHandler(
- inputName: String,
- longIntervalDuration: Time,
- shortIntervalDuration: Time,
- blockManager: BlockManager
- )
- extends Logging {
-
- class Block(var id: String, var shortInterval: Interval) {
- val data = ArrayBuffer[String]()
- var pushed = false
- def longInterval = getLongInterval(shortInterval)
- def empty() = (data.size == 0)
- def += (str: String) = (data += str)
- override def toString() = "Block " + id
- }
-
- class Bucket(val longInterval: Interval) {
- val blocks = new ArrayBuffer[Block]()
- var filled = false
- def += (block: Block) = blocks += block
- def empty() = (blocks.size == 0)
- def ready() = (filled && !blocks.exists(! _.pushed))
- def blockIds() = blocks.map(_.id).toArray
- override def toString() = "Bucket [" + longInterval + ", " + blocks.size + " blocks]"
- }
-
- initLogging()
-
- val shortIntervalDurationMillis = shortIntervalDuration.toLong
- val longIntervalDurationMillis = longIntervalDuration.toLong
-
- var currentBlock: Block = null
- var currentBucket: Bucket = null
-
- val blocksForPushing = new Queue[Block]()
- val buckets = new HashMap[Interval, Bucket]() with SynchronizedMap[Interval, Bucket]
-
- val blockUpdatingThread = new Thread() { override def run() { keepUpdatingCurrentBlock() } }
- val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
-
- def start() {
- blockUpdatingThread.start()
- blockPushingThread.start()
- }
-
- def += (data: String) = addData(data)
-
- def addData(data: String) {
- if (currentBlock == null) {
- updateCurrentBlock()
- }
- currentBlock.synchronized {
- currentBlock += data
- }
- }
-
- def getShortInterval(time: Time): Interval = {
- val intervalBegin = time.floor(shortIntervalDuration)
- Interval(intervalBegin, intervalBegin + shortIntervalDuration)
- }
-
- def getLongInterval(shortInterval: Interval): Interval = {
- val intervalBegin = shortInterval.beginTime.floor(longIntervalDuration)
- Interval(intervalBegin, intervalBegin + longIntervalDuration)
- }
-
- def updateCurrentBlock() {
- /*logInfo("Updating current block")*/
- val currentTime = Time(System.currentTimeMillis)
- val shortInterval = getShortInterval(currentTime)
- val longInterval = getLongInterval(shortInterval)
-
- def createBlock(reuseCurrentBlock: Boolean = false) {
- val newBlockId = inputName + "-" + longInterval.toFormattedString + "-" + currentBucket.blocks.size
- if (!reuseCurrentBlock) {
- val newBlock = new Block(newBlockId, shortInterval)
- /*logInfo("Created " + currentBlock)*/
- currentBlock = newBlock
- } else {
- currentBlock.shortInterval = shortInterval
- currentBlock.id = newBlockId
- }
- }
-
- def createBucket() {
- val newBucket = new Bucket(longInterval)
- buckets += ((longInterval, newBucket))
- currentBucket = newBucket
- /*logInfo("Created " + currentBucket + ", " + buckets.size + " buckets")*/
- }
-
- if (currentBlock == null || currentBucket == null) {
- createBucket()
- currentBucket.synchronized {
- createBlock()
- }
- return
- }
-
- currentBlock.synchronized {
- var reuseCurrentBlock = false
-
- if (shortInterval != currentBlock.shortInterval) {
- if (!currentBlock.empty) {
- blocksForPushing.synchronized {
- blocksForPushing += currentBlock
- blocksForPushing.notifyAll()
- }
- }
-
- currentBucket.synchronized {
- if (currentBlock.empty) {
- reuseCurrentBlock = true
- } else {
- currentBucket += currentBlock
- }
-
- if (longInterval != currentBucket.longInterval) {
- currentBucket.filled = true
- if (currentBucket.ready) {
- currentBucket.notifyAll()
- }
- createBucket()
- }
- }
-
- createBlock(reuseCurrentBlock)
- }
- }
- }
-
- def pushBlock(block: Block) {
- try{
- if (blockManager != null) {
- logInfo("Pushing block")
- val startTime = System.currentTimeMillis
-
- val bytes = blockManager.dataSerialize("rdd_", block.data.toIterator) // TODO: Will this be an RDD block?
- val finishTime = System.currentTimeMillis
- logInfo(block + " serialization delay is " + (finishTime - startTime) / 1000.0 + " s")
-
- blockManager.putBytes(block.id.toString, bytes, StorageLevel.MEMORY_AND_DISK_SER_2)
- /*blockManager.putBytes(block.id.toString, bytes, StorageLevel.DISK_AND_MEMORY_DESER_2)*/
- /*blockManager.put(block.id.toString, block.data.toIterator, StorageLevel.DISK_AND_MEMORY_DESER)*/
- /*blockManager.put(block.id.toString, block.data.toIterator, StorageLevel.DISK_AND_MEMORY)*/
- val finishTime1 = System.currentTimeMillis
- logInfo(block + " put delay is " + (finishTime1 - startTime) / 1000.0 + " s")
- } else {
- logWarning(block + " not put as block manager is null")
- }
- } catch {
- case e: Exception => logError("Exception writing " + block + " to blockmanager" , e)
- }
- }
-
- def getBucket(longInterval: Interval): Option[Bucket] = {
- buckets.get(longInterval)
- }
-
- def clearBucket(longInterval: Interval) {
- buckets.remove(longInterval)
- }
-
- def keepUpdatingCurrentBlock() {
- logInfo("Thread to update current block started")
- while(true) {
- updateCurrentBlock()
- val currentTimeMillis = System.currentTimeMillis
- val sleepTimeMillis = (currentTimeMillis / shortIntervalDurationMillis + 1) *
- shortIntervalDurationMillis - currentTimeMillis + 1
- Thread.sleep(sleepTimeMillis)
- }
- }
-
- def keepPushingBlocks() {
- var loop = true
- logInfo("Thread to push blocks started")
- while(loop) {
- val block = blocksForPushing.synchronized {
- if (blocksForPushing.size == 0) {
- blocksForPushing.wait()
- }
- blocksForPushing.dequeue
- }
- pushBlock(block)
- block.pushed = true
- block.data.clear()
-
- val bucket = buckets(block.longInterval)
- bucket.synchronized {
- if (bucket.ready) {
- bucket.notifyAll()
- }
- }
- }
- }
- }
-
-
- class ConnectionListener(port: Int, dataHandler: DataHandler)
- extends Thread with Logging {
- initLogging()
- override def run {
- try {
- val listener = new ServerSocket(port)
- logInfo("Listening on port " + port)
- while (true) {
- new ConnectionHandler(listener.accept(), dataHandler).start();
- }
- listener.close()
- } catch {
- case e: Exception => logError("", e);
- }
- }
- }
-
- class ConnectionHandler(socket: Socket, dataHandler: DataHandler) extends Thread with Logging {
- initLogging()
- override def run {
- logInfo("New connection from " + socket.getInetAddress() + ":" + socket.getPort)
- val bytes = new Array[Byte](100 * 1024 * 1024)
- try {
-
- val inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream, 1024 * 1024))
- /*val inputStream = new DataInputStream(new BufferedInputStream(socket.getInputStream))*/
- var str: String = null
- str = inputStream.readUTF
- while(str != null) {
- dataHandler += str
- str = inputStream.readUTF()
- }
-
- /*
- var loop = true
- while(loop) {
- val numRead = inputStream.read(bytes)
- if (numRead < 0) {
- loop = false
- }
- inbox += ((LongTime(SystemTime.currentTimeMillis), "test"))
- }*/
-
- inputStream.close()
- } catch {
- case e => logError("Error receiving data", e)
- }
- socket.close()
- }
- }
-
- initLogging()
-
- val masterHost = System.getProperty("spark.master.host")
- val masterPort = System.getProperty("spark.master.port").toInt
-
- val akkaPath = "akka://spark@%s:%s/user/".format(masterHost, masterPort)
- val sparkstreamScheduler = actorSystem.actorFor(akkaPath + "/SparkStreamScheduler")
- val testStreamCoordinator = actorSystem.actorFor(akkaPath + "/TestStreamCoordinator")
-
- logInfo("Getting stream details from master " + masterHost + ":" + masterPort)
-
- val timeout = 50 millis
-
- var started = false
- while (!started) {
- askActor[String](testStreamCoordinator, TestStarted) match {
- case Some(str) => {
- started = true
- logInfo("TestStreamCoordinator started")
- }
- case None => {
- logInfo("TestStreamCoordinator not started yet")
- Thread.sleep(200)
- }
- }
- }
-
- val streamDetails = askActor[GotStreamDetails](testStreamCoordinator, GetStreamDetails) match {
- case Some(details) => details
- case None => throw new Exception("Could not get stream details")
- }
- logInfo("Stream details received: " + streamDetails)
-
- val inputName = streamDetails.name
- val intervalDurationMillis = streamDetails.duration
- val intervalDuration = Time(intervalDurationMillis)
-
- val dataHandler = new DataHandler(
- inputName,
- intervalDuration,
- Time(TestStreamReceiver3.SHORT_INTERVAL_MILLIS),
- blockManager)
-
- val connListener = new ConnectionListener(TestStreamReceiver3.PORT, dataHandler)
-
- // Send a message to an actor and return an option with its reply, or None if this times out
- def askActor[T](actor: ActorRef, message: Any): Option[T] = {
- try {
- val future = actor.ask(message)(timeout)
- return Some(Await.result(future, timeout).asInstanceOf[T])
- } catch {
- case e: Exception =>
- logInfo("Error communicating with " + actor, e)
- return None
- }
- }
-
- override def run() {
- connListener.start()
- dataHandler.start()
-
- var interval = Interval.currentInterval(intervalDuration)
- var dataStarted = false
-
- while(true) {
- waitFor(interval.endTime)
- logInfo("Woken up at " + System.currentTimeMillis + " for " + interval)
- dataHandler.getBucket(interval) match {
- case Some(bucket) => {
- logInfo("Found " + bucket + " for " + interval)
- bucket.synchronized {
- if (!bucket.ready) {
- logInfo("Waiting for " + bucket)
- bucket.wait()
- logInfo("Wait over for " + bucket)
- }
- if (dataStarted || !bucket.empty) {
- logInfo("Notifying " + bucket)
- notifyScheduler(interval, bucket.blockIds)
- dataStarted = true
- }
- bucket.blocks.clear()
- dataHandler.clearBucket(interval)
- }
- }
- case None => {
- logInfo("Found none for " + interval)
- if (dataStarted) {
- logInfo("Notifying none")
- notifyScheduler(interval, Array[String]())
- }
- }
- }
- interval = interval.next
- }
- }
-
- def waitFor(time: Time) {
- val currentTimeMillis = System.currentTimeMillis
- val targetTimeMillis = time.milliseconds
- if (currentTimeMillis < targetTimeMillis) {
- val sleepTime = (targetTimeMillis - currentTimeMillis)
- Thread.sleep(sleepTime + 1)
- }
- }
-
- def notifyScheduler(interval: Interval, blockIds: Array[String]) {
- try {
- sparkstreamScheduler ! InputGenerated(inputName, interval, blockIds.toArray)
- val time = interval.endTime
- val delay = (System.currentTimeMillis - time.milliseconds) / 1000.0
- logInfo("Pushing delay for " + time + " is " + delay + " s")
- } catch {
- case _ => logError("Exception notifying scheduler at interval " + interval)
- }
- }
-}
-
-object TestStreamReceiver3 {
-
- val PORT = 9999
- val SHORT_INTERVAL_MILLIS = 100
-
- def main(args: Array[String]) {
- System.setProperty("spark.master.host", Utils.localHostName)
- System.setProperty("spark.master.port", "7078")
- val details = Array(("Sentences", 2000L))
- val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localHostName, 7078)
- actorSystem.actorOf(Props(new TestStreamCoordinator(details)), name = "TestStreamCoordinator")
- new TestStreamReceiver3(actorSystem, null).start()
- }
-}
-
-
-
diff --git a/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver4.scala b/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver4.scala
deleted file mode 100644
index 31754870dd..0000000000
--- a/streaming/src/main/scala/spark/streaming/util/TestStreamReceiver4.scala
+++ /dev/null
@@ -1,374 +0,0 @@
-package spark.streaming.util
-
-import spark.streaming._
-import spark._
-import spark.storage._
-import spark.util.AkkaUtils
-
-import scala.math._
-import scala.collection.mutable.{Queue, HashMap, ArrayBuffer, SynchronizedMap}
-
-import java.io._
-import java.nio._
-import java.nio.charset._
-import java.nio.channels._
-import java.util.concurrent.Executors
-
-import akka.actor._
-import akka.actor.Actor
-import akka.dispatch._
-import akka.pattern.ask
-import akka.util.duration._
-
-class TestStreamReceiver4(actorSystem: ActorSystem, blockManager: BlockManager)
-extends Thread with Logging {
-
- class DataHandler(
- inputName: String,
- longIntervalDuration: Time,
- shortIntervalDuration: Time,
- blockManager: BlockManager
- )
- extends Logging {
-
- class Block(val id: String, val shortInterval: Interval, val buffer: ByteBuffer) {
- var pushed = false
- def longInterval = getLongInterval(shortInterval)
- override def toString() = "Block " + id
- }
-
- class Bucket(val longInterval: Interval) {
- val blocks = new ArrayBuffer[Block]()
- var filled = false
- def += (block: Block) = blocks += block
- def empty() = (blocks.size == 0)
- def ready() = (filled && !blocks.exists(! _.pushed))
- def blockIds() = blocks.map(_.id).toArray
- override def toString() = "Bucket [" + longInterval + ", " + blocks.size + " blocks]"
- }
-
- initLogging()
-
- val syncOnLastShortInterval = true
-
- val shortIntervalDurationMillis = shortIntervalDuration.milliseconds
- val longIntervalDurationMillis = longIntervalDuration.milliseconds
-
- val buffer = ByteBuffer.allocateDirect(100 * 1024 * 1024)
- var currentShortInterval = Interval.currentInterval(shortIntervalDuration)
-
- val blocksForPushing = new Queue[Block]()
- val buckets = new HashMap[Interval, Bucket]() with SynchronizedMap[Interval, Bucket]
-
- val bufferProcessingThread = new Thread() { override def run() { keepProcessingBuffers() } }
- val blockPushingExecutor = Executors.newFixedThreadPool(5)
-
-
- def start() {
- buffer.clear()
- if (buffer.remaining == 0) {
- throw new Exception("Buffer initialization error")
- }
- bufferProcessingThread.start()
- }
-
- def readDataToBuffer(func: ByteBuffer => Int): Int = {
- buffer.synchronized {
- if (buffer.remaining == 0) {
- logInfo("Received first data for interval " + currentShortInterval)
- }
- func(buffer)
- }
- }
-
- def getLongInterval(shortInterval: Interval): Interval = {
- val intervalBegin = shortInterval.beginTime.floor(longIntervalDuration)
- Interval(intervalBegin, intervalBegin + longIntervalDuration)
- }
-
- def processBuffer() {
-
- def readInt(buffer: ByteBuffer): Int = {
- var offset = 0
- var result = 0
- while (offset < 32) {
- val b = buffer.get()
- result |= ((b & 0x7F) << offset)
- if ((b & 0x80) == 0) {
- return result
- }
- offset += 7
- }
- throw new Exception("Malformed zigzag-encoded integer")
- }
-
- val currentLongInterval = getLongInterval(currentShortInterval)
- val startTime = System.currentTimeMillis
- val newBuffer: ByteBuffer = buffer.synchronized {
- buffer.flip()
- if (buffer.remaining == 0) {
- buffer.clear()
- null
- } else {
- logDebug("Processing interval " + currentShortInterval + " with delay of " + (System.currentTimeMillis - startTime) + " ms")
- val startTime1 = System.currentTimeMillis
- var loop = true
- var count = 0
- while(loop) {
- buffer.mark()
- try {
- val len = readInt(buffer)
- buffer.position(buffer.position + len)
- count += 1
- } catch {
- case e: Exception => {
- buffer.reset()
- loop = false
- }
- }
- }
- val bytesToCopy = buffer.position
- val newBuf = ByteBuffer.allocate(bytesToCopy)
- buffer.position(0)
- newBuf.put(buffer.slice().limit(bytesToCopy).asInstanceOf[ByteBuffer])
- newBuf.flip()
- buffer.position(bytesToCopy)
- buffer.compact()
- newBuf
- }
- }
-
- if (newBuffer != null) {
- val bucket = buckets.getOrElseUpdate(currentLongInterval, new Bucket(currentLongInterval))
- bucket.synchronized {
- val newBlockId = inputName + "-" + currentLongInterval.toFormattedString + "-" + currentShortInterval.toFormattedString
- val newBlock = new Block(newBlockId, currentShortInterval, newBuffer)
- if (syncOnLastShortInterval) {
- bucket += newBlock
- }
- logDebug("Created " + newBlock + " with " + newBuffer.remaining + " bytes, creation delay is " + (System.currentTimeMillis - currentShortInterval.endTime.milliseconds) / 1000.0 + " s" )
- blockPushingExecutor.execute(new Runnable() { def run() { pushAndNotifyBlock(newBlock) } })
- }
- }
-
- val newShortInterval = Interval.currentInterval(shortIntervalDuration)
- val newLongInterval = getLongInterval(newShortInterval)
-
- if (newLongInterval != currentLongInterval) {
- buckets.get(currentLongInterval) match {
- case Some(bucket) => {
- bucket.synchronized {
- bucket.filled = true
- if (bucket.ready) {
- bucket.notifyAll()
- }
- }
- }
- case None =>
- }
- buckets += ((newLongInterval, new Bucket(newLongInterval)))
- }
-
- currentShortInterval = newShortInterval
- }
-
- def pushBlock(block: Block) {
- try{
- if (blockManager != null) {
- val startTime = System.currentTimeMillis
- logInfo(block + " put start delay is " + (startTime - block.shortInterval.endTime.milliseconds) + " ms")
- /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY)*/
- /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY_2)*/
- blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.MEMORY_ONLY_2)
- /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.MEMORY_ONLY)*/
- /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY_DESER)*/
- /*blockManager.putBytes(block.id.toString, block.buffer, StorageLevel.DISK_AND_MEMORY_DESER_2)*/
- val finishTime = System.currentTimeMillis
- logInfo(block + " put delay is " + (finishTime - startTime) + " ms")
- } else {
- logWarning(block + " not put as block manager is null")
- }
- } catch {
- case e: Exception => logError("Exception writing " + block + " to blockmanager" , e)
- }
- }
-
- def getBucket(longInterval: Interval): Option[Bucket] = {
- buckets.get(longInterval)
- }
-
- def clearBucket(longInterval: Interval) {
- buckets.remove(longInterval)
- }
-
- def keepProcessingBuffers() {
- logInfo("Thread to process buffers started")
- while(true) {
- processBuffer()
- val currentTimeMillis = System.currentTimeMillis
- val sleepTimeMillis = (currentTimeMillis / shortIntervalDurationMillis + 1) *
- shortIntervalDurationMillis - currentTimeMillis + 1
- Thread.sleep(sleepTimeMillis)
- }
- }
-
- def pushAndNotifyBlock(block: Block) {
- pushBlock(block)
- block.pushed = true
- val bucket = if (syncOnLastShortInterval) {
- buckets(block.longInterval)
- } else {
- var longInterval = block.longInterval
- while(!buckets.contains(longInterval)) {
- logWarning("Skipping bucket of " + longInterval + " for " + block)
- longInterval = longInterval.next
- }
- val chosenBucket = buckets(longInterval)
- logDebug("Choosing bucket of " + longInterval + " for " + block)
- chosenBucket += block
- chosenBucket
- }
-
- bucket.synchronized {
- if (bucket.ready) {
- bucket.notifyAll()
- }
- }
-
- }
- }
-
-
- class ReceivingConnectionHandler(host: String, port: Int, dataHandler: DataHandler)
- extends ConnectionHandler(host, port, false) {
-
- override def ready(key: SelectionKey) {
- changeInterest(key, SelectionKey.OP_READ)
- }
-
- override def read(key: SelectionKey) {
- try {
- val channel = key.channel.asInstanceOf[SocketChannel]
- val bytesRead = dataHandler.readDataToBuffer(channel.read)
- if (bytesRead < 0) {
- close(key)
- }
- } catch {
- case e: IOException => {
- logError("Error reading", e)
- close(key)
- }
- }
- }
- }
-
- initLogging()
-
- val masterHost = System.getProperty("spark.master.host", "localhost")
- val masterPort = System.getProperty("spark.master.port", "7078").toInt
-
- val akkaPath = "akka://spark@%s:%s/user/".format(masterHost, masterPort)
- val sparkstreamScheduler = actorSystem.actorFor(akkaPath + "/SparkStreamScheduler")
- val testStreamCoordinator = actorSystem.actorFor(akkaPath + "/TestStreamCoordinator")
-
- logInfo("Getting stream details from master " + masterHost + ":" + masterPort)
-
- val streamDetails = askActor[GotStreamDetails](testStreamCoordinator, GetStreamDetails) match {
- case Some(details) => details
- case None => throw new Exception("Could not get stream details")
- }
- logInfo("Stream details received: " + streamDetails)
-
- val inputName = streamDetails.name
- val intervalDurationMillis = streamDetails.duration
- val intervalDuration = Milliseconds(intervalDurationMillis)
- val shortIntervalDuration = Milliseconds(System.getProperty("spark.stream.shortinterval", "500").toInt)
-
- val dataHandler = new DataHandler(inputName, intervalDuration, shortIntervalDuration, blockManager)
- val connectionHandler = new ReceivingConnectionHandler("localhost", 9999, dataHandler)
-
- val timeout = 100 millis
-
- // Send a message to an actor and return an option with its reply, or None if this times out
- def askActor[T](actor: ActorRef, message: Any): Option[T] = {
- try {
- val future = actor.ask(message)(timeout)
- return Some(Await.result(future, timeout).asInstanceOf[T])
- } catch {
- case e: Exception =>
- logInfo("Error communicating with " + actor, e)
- return None
- }
- }
-
- override def run() {
- connectionHandler.start()
- dataHandler.start()
-
- var interval = Interval.currentInterval(intervalDuration)
- var dataStarted = false
-
-
- while(true) {
- waitFor(interval.endTime)
- /*logInfo("Woken up at " + System.currentTimeMillis + " for " + interval)*/
- dataHandler.getBucket(interval) match {
- case Some(bucket) => {
- logDebug("Found " + bucket + " for " + interval)
- bucket.synchronized {
- if (!bucket.ready) {
- logDebug("Waiting for " + bucket)
- bucket.wait()
- logDebug("Wait over for " + bucket)
- }
- if (dataStarted || !bucket.empty) {
- logDebug("Notifying " + bucket)
- notifyScheduler(interval, bucket.blockIds)
- dataStarted = true
- }
- bucket.blocks.clear()
- dataHandler.clearBucket(interval)
- }
- }
- case None => {
- logDebug("Found none for " + interval)
- if (dataStarted) {
- logDebug("Notifying none")
- notifyScheduler(interval, Array[String]())
- }
- }
- }
- interval = interval.next
- }
- }
-
- def waitFor(time: Time) {
- val currentTimeMillis = System.currentTimeMillis
- val targetTimeMillis = time.milliseconds
- if (currentTimeMillis < targetTimeMillis) {
- val sleepTime = (targetTimeMillis - currentTimeMillis)
- Thread.sleep(sleepTime + 1)
- }
- }
-
- def notifyScheduler(interval: Interval, blockIds: Array[String]) {
- try {
- sparkstreamScheduler ! InputGenerated(inputName, interval, blockIds.toArray)
- val time = interval.endTime
- val delay = (System.currentTimeMillis - time.milliseconds)
- logInfo("Notification delay for " + time + " is " + delay + " ms")
- } catch {
- case e: Exception => logError("Exception notifying scheduler at interval " + interval + ": " + e)
- }
- }
-}
-
-
-object TestStreamReceiver4 {
- def main(args: Array[String]) {
- val details = Array(("Sentences", 2000L))
- val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localHostName, 7078)
- actorSystem.actorOf(Props(new TestStreamCoordinator(details)), name = "TestStreamCoordinator")
- new TestStreamReceiver4(actorSystem, null).start()
- }
-}
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 0450120061..0bcf207082 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -15,12 +15,16 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
}
after {
+
+ if (ssc != null) ssc.stop()
FileUtils.deleteDirectory(new File(checkpointDir))
}
+ var ssc: StreamingContext = null
+
override def framework = "CheckpointSuite"
- override def batchDuration = Milliseconds(500)
+ override def batchDuration = Milliseconds(200)
override def checkpointDir = "checkpoint"
@@ -30,12 +34,12 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
test("basic stream+rdd recovery") {
- assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second")
+ assert(batchDuration === Milliseconds(200), "batchDuration for this test must be 200 milliseconds")
assert(checkpointInterval === batchDuration, "checkpointInterval for this test must be same as batchDuration")
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
- val stateStreamCheckpointInterval = Seconds(2)
+ val stateStreamCheckpointInterval = Seconds(1)
// this ensures checkpointing occurs at least once
val firstNumBatches = (stateStreamCheckpointInterval.millis / batchDuration.millis) * 2
@@ -110,6 +114,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
runStreamsWithRealDelay(ssc, 4)
ssc.stop()
System.clearProperty("spark.streaming.manualClock.jump")
+ ssc = null
}
test("map and reduceByKey") {
@@ -131,9 +136,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
.reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration)
.checkpoint(Seconds(2))
}
- for (i <- Seq(2, 3, 4)) {
- testCheckpointedOperation(input, operation, output, i)
- }
+ testCheckpointedOperation(input, operation, output, 3)
}
test("updateStateByKey") {
@@ -148,9 +151,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
.checkpoint(Seconds(2))
.map(t => (t._1, t._2.self))
}
- for (i <- Seq(2, 3, 4)) {
- testCheckpointedOperation(input, operation, output, i)
- }
+ testCheckpointedOperation(input, operation, output, 3)
}
@@ -171,7 +172,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Do half the computation (half the number of batches), create checkpoint file and quit
- val ssc = setupStreams[U, V](input, operation)
+ ssc = setupStreams[U, V](input, operation)
val output = runStreams[V](ssc, initialNumBatches, initialNumExpectedOutputs)
verifyOutput[V](output, expectedOutput.take(initialNumBatches), true)
Thread.sleep(1000)
@@ -182,9 +183,10 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
" Restarting stream computation " +
"\n-------------------------------------------\n"
)
- val sscNew = new StreamingContext(checkpointDir)
- val outputNew = runStreams[V](sscNew, nextNumBatches, nextNumExpectedOutputs)
+ ssc = new StreamingContext(checkpointDir)
+ val outputNew = runStreams[V](ssc, nextNumBatches, nextNumExpectedOutputs)
verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true)
+ ssc = null
}
/**
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 8f892baab1..0957748603 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -9,12 +9,19 @@ import spark.storage.StorageLevel
import spark.Logging
import scala.util.Random
import org.apache.commons.io.FileUtils
+import org.scalatest.BeforeAndAfter
-class InputStreamsSuite extends TestSuiteBase {
+class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+ override def checkpointDir = "checkpoint"
+
+ after {
+ FileUtils.deleteDirectory(new File(checkpointDir))
+ }
+
test("network input stream") {
// Start the server
val serverPort = 9999
@@ -30,7 +37,7 @@ class InputStreamsSuite extends TestSuiteBase {
ssc.registerOutputStream(outputStream)
ssc.start()
- // Feed data to the server to send to the Spark Streaming network receiver
+ // Feed data to the server to send to the network receiver
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
val expectedOutput = input.map(_.toString)
@@ -52,7 +59,7 @@ class InputStreamsSuite extends TestSuiteBase {
logInfo("Stopping context")
ssc.stop()
- // Verify whether data received by Spark Streaming was as expected
+ // Verify whether data received was as expected
logInfo("--------------------------------")
logInfo("output.size = " + outputBuffer.size)
logInfo("output")
@@ -69,6 +76,49 @@ class InputStreamsSuite extends TestSuiteBase {
}
}
+ test("network input stream with checkpoint") {
+ // Start the server
+ val serverPort = 9999
+ val server = new TestServer(serverPort)
+ server.start()
+
+ // Set up the streaming context and input streams
+ var ssc = new StreamingContext(master, framework)
+ ssc.setBatchDuration(batchDuration)
+ ssc.checkpoint(checkpointDir, checkpointInterval)
+ val networkStream = ssc.networkTextStream("localhost", serverPort, StorageLevel.MEMORY_AND_DISK)
+ var outputStream = new TestOutputStream(networkStream, new ArrayBuffer[Seq[String]])
+ ssc.registerOutputStream(outputStream)
+ ssc.start()
+
+ // Feed data to the server to send to the network receiver
+ var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ for (i <- Seq(1, 2, 3)) {
+ server.send(i.toString + "\n")
+ Thread.sleep(100)
+ clock.addToTime(batchDuration.milliseconds)
+ }
+ Thread.sleep(500)
+ assert(outputStream.output.size > 0)
+ ssc.stop()
+
+ // Restart stream computation from checkpoint and feed more data to see whether
+ // they are being received and processed
+ logInfo("*********** RESTARTING ************")
+ ssc = new StreamingContext(checkpointDir)
+ ssc.start()
+ clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ for (i <- Seq(4, 5, 6)) {
+ server.send(i.toString + "\n")
+ Thread.sleep(100)
+ clock.addToTime(batchDuration.milliseconds)
+ }
+ Thread.sleep(500)
+ outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]]
+ assert(outputStream.output.size > 0)
+ ssc.stop()
+ }
+
test("file input stream") {
// Create a temporary directory
val dir = {
@@ -76,7 +126,7 @@ class InputStreamsSuite extends TestSuiteBase {
temp.delete()
temp.mkdirs()
temp.deleteOnExit()
- println("Created temp dir " + temp)
+ logInfo("Created temp dir " + temp)
temp
}
@@ -84,7 +134,9 @@ class InputStreamsSuite extends TestSuiteBase {
val ssc = new StreamingContext(master, framework)
ssc.setBatchDuration(batchDuration)
val filestream = ssc.textFileStream(dir.toString)
- val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]]
+ val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+ def output = outputBuffer.flatMap(x => x)
+
val outputStream = new TestOutputStream(filestream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc.start()
@@ -96,36 +148,88 @@ class InputStreamsSuite extends TestSuiteBase {
Thread.sleep(1000)
for (i <- 0 until input.size) {
FileUtils.writeStringToFile(new File(dir, i.toString), input(i).toString + "\n")
- Thread.sleep(500)
+ Thread.sleep(100)
clock.addToTime(batchDuration.milliseconds)
- Thread.sleep(500)
+ Thread.sleep(100)
}
val startTime = System.currentTimeMillis()
- while (outputBuffer.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
- println("output.size = " + outputBuffer.size + ", expectedOutput.size = " + expectedOutput.size)
+ while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+ //println("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size)
Thread.sleep(100)
}
Thread.sleep(1000)
val timeTaken = System.currentTimeMillis() - startTime
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
- println("Stopping context")
+ logInfo("Stopping context")
ssc.stop()
// Verify whether data received by Spark Streaming was as expected
logInfo("--------------------------------")
- logInfo("output.size = " + outputBuffer.size)
+ logInfo("output.size = " + output.size)
logInfo("output")
- outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("expected output.size = " + expectedOutput.size)
logInfo("expected output")
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("--------------------------------")
- assert(outputBuffer.size === expectedOutput.size)
- for (i <- 0 until outputBuffer.size) {
- assert(outputBuffer(i).size === 1)
- assert(outputBuffer(i).head === expectedOutput(i))
+ assert(output.size === expectedOutput.size)
+ for (i <- 0 until output.size) {
+ assert(output(i).size === 1)
+ assert(output(i).head.toString === expectedOutput(i))
+ }
+ }
+
+ test("file input stream with checkpoint") {
+ // Create a temporary directory
+ val dir = {
+ var temp = File.createTempFile(".temp.", Random.nextInt().toString)
+ temp.delete()
+ temp.mkdirs()
+ temp.deleteOnExit()
+ println("Created temp dir " + temp)
+ temp
}
+
+ // Set up the streaming context and input streams
+ var ssc = new StreamingContext(master, framework)
+ ssc.setBatchDuration(batchDuration)
+ ssc.checkpoint(checkpointDir, checkpointInterval)
+ val filestream = ssc.textFileStream(dir.toString)
+ var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]])
+ ssc.registerOutputStream(outputStream)
+ ssc.start()
+
+ // Create files and advance manual clock to process them
+ var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ Thread.sleep(1000)
+ for (i <- Seq(1, 2, 3)) {
+ FileUtils.writeStringToFile(new File(dir, i.toString), i.toString + "\n")
+ Thread.sleep(100)
+ clock.addToTime(batchDuration.milliseconds)
+ }
+ Thread.sleep(500)
+ logInfo("Output = " + outputStream.output.mkString(","))
+ assert(outputStream.output.size > 0)
+ ssc.stop()
+
+ // Restart stream computation from checkpoint and create more files to see whether
+ // they are being processed
+ logInfo("*********** RESTARTING ************")
+ ssc = new StreamingContext(checkpointDir)
+ ssc.start()
+ clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ Thread.sleep(500)
+ for (i <- Seq(4, 5, 6)) {
+ FileUtils.writeStringToFile(new File(dir, i.toString), i.toString + "\n")
+ Thread.sleep(100)
+ clock.addToTime(batchDuration.milliseconds)
+ }
+ Thread.sleep(500)
+ outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]]
+ logInfo("Output = " + outputStream.output.mkString(","))
+ assert(outputStream.output.size > 0)
+ ssc.stop()
}
}