path: root/streaming/src/test/scala
author    Denny <dennybritz@gmail.com>  2012-11-12 19:39:29 -0800
committer Denny <dennybritz@gmail.com>  2012-11-12 19:39:29 -0800
commit    255b3e44c18e64a55afb184f39746780b391a496 (patch)
tree      479fedcfdbf7e68bc6dc73188421a49fbc356a82 /streaming/src/test/scala
parent    0fd4c93f1c349f052f633fea64f975d53976bd9c (diff)
parent    564dd8c3f415746a68f05bde6ea2a0e7a7760b4c (diff)
Merge branch 'dev' into kafka
Diffstat (limited to 'streaming/src/test/scala')
-rw-r--r--  streaming/src/test/scala/spark/streaming/CheckpointSuite.scala   |  26
-rw-r--r--  streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala | 136
2 files changed, 134 insertions(+), 28 deletions(-)
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index 0450120061..0bcf207082 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -15,12 +15,16 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
}
after {
+
+ if (ssc != null) ssc.stop()
FileUtils.deleteDirectory(new File(checkpointDir))
}
+ var ssc: StreamingContext = null
+
override def framework = "CheckpointSuite"
- override def batchDuration = Milliseconds(500)
+ override def batchDuration = Milliseconds(200)
override def checkpointDir = "checkpoint"
@@ -30,12 +34,12 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
test("basic stream+rdd recovery") {
- assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second")
+ assert(batchDuration === Milliseconds(200), "batchDuration for this test must be 200 milliseconds")
assert(checkpointInterval === batchDuration, "checkpointInterval for this test must be the same as batchDuration")
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
- val stateStreamCheckpointInterval = Seconds(2)
+ val stateStreamCheckpointInterval = Seconds(1)
// this ensures checkpointing occurs at least once
val firstNumBatches = (stateStreamCheckpointInterval.millis / batchDuration.millis) * 2
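For reference, with the values set above (batchDuration = 200 ms, stateStreamCheckpointInterval = 1 s), firstNumBatches works out to (1000 / 200) * 2 = 10, so the test runs ten batches, spanning two full checkpoint intervals, before its first verification.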
@@ -110,6 +114,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
runStreamsWithRealDelay(ssc, 4)
ssc.stop()
System.clearProperty("spark.streaming.manualClock.jump")
+ ssc = null
}
test("map and reduceByKey") {
@@ -131,9 +136,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
.reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration)
.checkpoint(Seconds(2))
}
- for (i <- Seq(2, 3, 4)) {
- testCheckpointedOperation(input, operation, output, i)
- }
+ testCheckpointedOperation(input, operation, output, 3)
}
test("updateStateByKey") {
@@ -148,9 +151,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
.checkpoint(Seconds(2))
.map(t => (t._1, t._2.self))
}
- for (i <- Seq(2, 3, 4)) {
- testCheckpointedOperation(input, operation, output, i)
- }
+ testCheckpointedOperation(input, operation, output, 3)
}
@@ -171,7 +172,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Do half the computation (half the number of batches), create checkpoint file and quit
- val ssc = setupStreams[U, V](input, operation)
+ ssc = setupStreams[U, V](input, operation)
val output = runStreams[V](ssc, initialNumBatches, initialNumExpectedOutputs)
verifyOutput[V](output, expectedOutput.take(initialNumBatches), true)
Thread.sleep(1000)
@@ -182,9 +183,10 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
" Restarting stream computation " +
"\n-------------------------------------------\n"
)
- val sscNew = new StreamingContext(checkpointDir)
- val outputNew = runStreams[V](sscNew, nextNumBatches, nextNumExpectedOutputs)
+ ssc = new StreamingContext(checkpointDir)
+ val outputNew = runStreams[V](ssc, nextNumBatches, nextNumExpectedOutputs)
verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true)
+ ssc = null
}
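The point of promoting ssc to a suite-level var, as this hunk does, is that the after hook can stop a context that a failing test leaves behind, instead of each test being responsible for its own cleanup. A minimal sketch of the pattern, assuming the same two-argument StreamingContext constructor the tests themselves use (suite and test names here are illustrative):

import org.scalatest.{BeforeAndAfter, FunSuite}
import spark.streaming._

class CleanupExampleSuite extends FunSuite with BeforeAndAfter {
  // Held at suite level so the after hook can reach it.
  var ssc: StreamingContext = null

  after {
    if (ssc != null) ssc.stop()  // stop a context leaked by a failed test
    ssc = null
  }

  test("example") {
    ssc = new StreamingContext("local[2]", "CleanupExampleSuite")
    ssc.setBatchDuration(Milliseconds(200))
    // ... build streams, run batches, verify output ...
    ssc.stop()
    ssc = null  // nothing left for the after hook to stop
  }
}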
/**
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index 8f892baab1..0957748603 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -9,12 +9,19 @@ import spark.storage.StorageLevel
import spark.Logging
import scala.util.Random
import org.apache.commons.io.FileUtils
+import org.scalatest.BeforeAndAfter
-class InputStreamsSuite extends TestSuiteBase {
+class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+ override def checkpointDir = "checkpoint"
+
+ after {
+ FileUtils.deleteDirectory(new File(checkpointDir))
+ }
+
test("network input stream") {
// Start the server
val serverPort = 9999
@@ -30,7 +37,7 @@ class InputStreamsSuite extends TestSuiteBase {
ssc.registerOutputStream(outputStream)
ssc.start()
- // Feed data to the server to send to the Spark Streaming network receiver
+ // Feed data to the server to send to the network receiver
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
val expectedOutput = input.map(_.toString)
@@ -52,7 +59,7 @@ class InputStreamsSuite extends TestSuiteBase {
logInfo("Stopping context")
ssc.stop()
- // Verify whether data received by Spark Streaming was as expected
+ // Verify whether data received was as expected
logInfo("--------------------------------")
logInfo("output.size = " + outputBuffer.size)
logInfo("output")
@@ -69,6 +76,49 @@ class InputStreamsSuite extends TestSuiteBase {
}
}
+ test("network input stream with checkpoint") {
+ // Start the server
+ val serverPort = 9999
+ val server = new TestServer(serverPort)
+ server.start()
+
+ // Set up the streaming context and input streams
+ var ssc = new StreamingContext(master, framework)
+ ssc.setBatchDuration(batchDuration)
+ ssc.checkpoint(checkpointDir, checkpointInterval)
+ val networkStream = ssc.networkTextStream("localhost", serverPort, StorageLevel.MEMORY_AND_DISK)
+ var outputStream = new TestOutputStream(networkStream, new ArrayBuffer[Seq[String]])
+ ssc.registerOutputStream(outputStream)
+ ssc.start()
+
+ // Feed data to the server to send to the network receiver
+ var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ for (i <- Seq(1, 2, 3)) {
+ server.send(i.toString + "\n")
+ Thread.sleep(100)
+ clock.addToTime(batchDuration.milliseconds)
+ }
+ Thread.sleep(500)
+ assert(outputStream.output.size > 0)
+ ssc.stop()
+
+ // Restart stream computation from checkpoint and feed more data to see
+ // whether it is received and processed
+ logInfo("*********** RESTARTING ************")
+ ssc = new StreamingContext(checkpointDir)
+ ssc.start()
+ clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ for (i <- Seq(4, 5, 6)) {
+ server.send(i.toString + "\n")
+ Thread.sleep(100)
+ clock.addToTime(batchDuration.milliseconds)
+ }
+ Thread.sleep(500)
+ outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]]
+ assert(outputStream.output.size > 0)
+ ssc.stop()
+ }
+
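Note the restart path in the test above: new StreamingContext(checkpointDir) rebuilds the DStream graph from the checkpoint file, so the test re-fetches its output stream via ssc.graph.getOutputStreams().head rather than reusing the pre-restart TestOutputStream reference, which belongs to the old graph.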
test("file input stream") {
// Create a temporary directory
val dir = {
@@ -76,7 +126,7 @@ class InputStreamsSuite extends TestSuiteBase {
temp.delete()
temp.mkdirs()
temp.deleteOnExit()
- println("Created temp dir " + temp)
+ logInfo("Created temp dir " + temp)
temp
}
@@ -84,7 +134,9 @@ class InputStreamsSuite extends TestSuiteBase {
val ssc = new StreamingContext(master, framework)
ssc.setBatchDuration(batchDuration)
val filestream = ssc.textFileStream(dir.toString)
- val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]]
+ val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+ def output = outputBuffer.flatMap(x => x)
+
val outputStream = new TestOutputStream(filestream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc.start()
@@ -96,36 +148,88 @@ class InputStreamsSuite extends TestSuiteBase {
Thread.sleep(1000)
for (i <- 0 until input.size) {
FileUtils.writeStringToFile(new File(dir, i.toString), input(i).toString + "\n")
- Thread.sleep(500)
+ Thread.sleep(100)
clock.addToTime(batchDuration.milliseconds)
- Thread.sleep(500)
+ Thread.sleep(100)
}
val startTime = System.currentTimeMillis()
- while (outputBuffer.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
- println("output.size = " + outputBuffer.size + ", expectedOutput.size = " + expectedOutput.size)
+ while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+ //println("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size)
Thread.sleep(100)
}
Thread.sleep(1000)
val timeTaken = System.currentTimeMillis() - startTime
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
- println("Stopping context")
+ logInfo("Stopping context")
ssc.stop()
// Verify whether data received by Spark Streaming was as expected
logInfo("--------------------------------")
- logInfo("output.size = " + outputBuffer.size)
+ logInfo("output.size = " + output.size)
logInfo("output")
- outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("expected output.size = " + expectedOutput.size)
logInfo("expected output")
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("--------------------------------")
- assert(outputBuffer.size === expectedOutput.size)
- for (i <- 0 until outputBuffer.size) {
- assert(outputBuffer(i).size === 1)
- assert(outputBuffer(i).head === expectedOutput(i))
+ assert(output.size === expectedOutput.size)
+ for (i <- 0 until output.size) {
+ assert(output(i).size === 1)
+ assert(output(i).head.toString === expectedOutput(i))
+ }
+ }
+
+ test("file input stream with checkpoint") {
+ // Create a temporary directory
+ val dir = {
+ var temp = File.createTempFile(".temp.", Random.nextInt().toString)
+ temp.delete()
+ temp.mkdirs()
+ temp.deleteOnExit()
+ println("Created temp dir " + temp)
+ temp
}
+
+ // Set up the streaming context and input streams
+ var ssc = new StreamingContext(master, framework)
+ ssc.setBatchDuration(batchDuration)
+ ssc.checkpoint(checkpointDir, checkpointInterval)
+ val filestream = ssc.textFileStream(dir.toString)
+ var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]])
+ ssc.registerOutputStream(outputStream)
+ ssc.start()
+
+ // Create files and advance manual clock to process them
+ var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ Thread.sleep(1000)
+ for (i <- Seq(1, 2, 3)) {
+ FileUtils.writeStringToFile(new File(dir, i.toString), i.toString + "\n")
+ Thread.sleep(100)
+ clock.addToTime(batchDuration.milliseconds)
+ }
+ Thread.sleep(500)
+ logInfo("Output = " + outputStream.output.mkString(","))
+ assert(outputStream.output.size > 0)
+ ssc.stop()
+
+ // Restart stream computation from checkpoint and create more files to see
+ // whether they are being processed
+ logInfo("*********** RESTARTING ************")
+ ssc = new StreamingContext(checkpointDir)
+ ssc.start()
+ clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ Thread.sleep(500)
+ for (i <- Seq(4, 5, 6)) {
+ FileUtils.writeStringToFile(new File(dir, i.toString), i.toString + "\n")
+ Thread.sleep(100)
+ clock.addToTime(batchDuration.milliseconds)
+ }
+ Thread.sleep(500)
+ outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]]
+ logInfo("Output = " + outputStream.output.mkString(","))
+ assert(outputStream.output.size > 0)
+ ssc.stop()
}
}
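For context, the ManualClock these tests cast to is what makes the batch boundaries deterministic: the scheduler reads time from a pluggable clock, selected here via the spark.streaming.clock system property, and the tests advance it explicitly with addToTime. A rough sketch of the idea, not the actual spark.streaming.util implementation:

trait Clock {
  def currentTime(): Long
  def waitTillTime(targetTime: Long): Long
}

// Illustrative only: a clock that never advances unless the test advances it.
class ManualClock extends Clock {
  private var time = 0L

  def currentTime(): Long = synchronized { time }

  // Advance the clock; the tests call this once per simulated batch.
  def addToTime(delta: Long): Unit = synchronized {
    time += delta
    notifyAll()  // wake the scheduler thread blocked in waitTillTime
  }

  // Block the caller (the batch scheduler) until the test has advanced
  // the clock to or past targetTime.
  def waitTillTime(targetTime: Long): Long = synchronized {
    while (time < targetTime) wait()
    time
  }
}

With a clock like this in place, server.send(...) followed by clock.addToTime(batchDuration.milliseconds) produces exactly one batch per loop iteration.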