aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-02-19 02:48:50 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-02-19 02:48:50 -0800
commit9e82be1503551ff8a8e46dc06e70e1f0aee00051 (patch)
tree8bad9e0c1318d0c5d0120f8b87a7c868e22176ae /streaming/src/test
parent291dd47c7f702f1229f82b111126f5f64b29d0c6 (diff)
parent12ea14c211da908a278ab19fd1e9f6acd45daae8 (diff)
downloadspark-9e82be1503551ff8a8e46dc06e70e1f0aee00051.tar.gz
spark-9e82be1503551ff8a8e46dc06e70e1f0aee00051.tar.bz2
spark-9e82be1503551ff8a8e46dc06e70e1f0aee00051.zip
Merge branch 'streaming' into ScrapCodes-streaming-actor
Conflicts: docs/plugin-custom-receiver.md streaming/src/main/scala/spark/streaming/StreamingContext.scala streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala streaming/src/main/scala/spark/streaming/dstream/PluggableInputDStream.scala streaming/src/main/scala/spark/streaming/receivers/ActorReceiver.scala streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/java/spark/streaming/JavaAPISuite.java (renamed from streaming/src/test/java/JavaAPISuite.java)158
-rw-r--r--streaming/src/test/java/spark/streaming/JavaTestUtils.scala (renamed from streaming/src/test/java/JavaTestUtils.scala)1
-rw-r--r--streaming/src/test/resources/log4j.properties1
-rw-r--r--streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala88
-rw-r--r--streaming/src/test/scala/spark/streaming/CheckpointSuite.scala195
-rw-r--r--streaming/src/test/scala/spark/streaming/FailureSuite.scala189
-rw-r--r--streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala165
-rw-r--r--streaming/src/test/scala/spark/streaming/TestSuiteBase.scala22
-rw-r--r--streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala64
9 files changed, 443 insertions, 440 deletions
diff --git a/streaming/src/test/java/JavaAPISuite.java b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
index c84e7331c7..5d510fd89f 100644
--- a/streaming/src/test/java/JavaAPISuite.java
+++ b/streaming/src/test/java/spark/streaming/JavaAPISuite.java
@@ -33,8 +33,9 @@ public class JavaAPISuite implements Serializable {
@Before
public void setUp() {
- ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
- ssc.checkpoint("checkpoint", new Duration(1000));
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock");
+ ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
+ ssc.checkpoint("checkpoint");
}
@After
@@ -43,7 +44,7 @@ public class JavaAPISuite implements Serializable {
ssc = null;
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port");
+ System.clearProperty("spark.driver.port");
}
@Test
@@ -134,29 +135,6 @@ public class JavaAPISuite implements Serializable {
}
@Test
- public void testTumble() {
- List<List<Integer>> inputData = Arrays.asList(
- Arrays.asList(1,2,3),
- Arrays.asList(4,5,6),
- Arrays.asList(7,8,9),
- Arrays.asList(10,11,12),
- Arrays.asList(13,14,15),
- Arrays.asList(16,17,18));
-
- List<List<Integer>> expected = Arrays.asList(
- Arrays.asList(1,2,3,4,5,6),
- Arrays.asList(7,8,9,10,11,12),
- Arrays.asList(13,14,15,16,17,18));
-
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream windowed = stream.tumble(new Duration(2000));
- JavaTestUtils.attachTestOutputStream(windowed);
- List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 6, 3);
-
- assertOrderInvariantEquals(expected, result);
- }
-
- @Test
public void testFilter() {
List<List<String>> inputData = Arrays.asList(
Arrays.asList("giants", "dodgers"),
@@ -583,50 +561,73 @@ public class JavaAPISuite implements Serializable {
}
@Test
- public void testCountByKey() {
- List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+ public void testCountByValue() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("hello", "world"),
+ Arrays.asList("hello", "moon"),
+ Arrays.asList("hello"));
List<List<Tuple2<String, Long>>> expected = Arrays.asList(
- Arrays.asList(
- new Tuple2<String, Long>("california", 2L),
- new Tuple2<String, Long>("new york", 2L)),
- Arrays.asList(
- new Tuple2<String, Long>("california", 2L),
- new Tuple2<String, Long>("new york", 2L)));
-
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
- ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L),
+ new Tuple2<String, Long>("world", 1L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L),
+ new Tuple2<String, Long>("moon", 1L)),
+ Arrays.asList(
+ new Tuple2<String, Long>("hello", 1L)));
- JavaPairDStream<String, Long> counted = pairStream.countByKey();
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Long> counted = stream.countByValue();
JavaTestUtils.attachTestOutputStream(counted);
- List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+ List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
@Test
public void testGroupByKeyAndWindow() {
- List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+ List<List<Tuple2<String, Integer>>> inputData = stringIntKVStream;
- List<List<Tuple2<String, List<String>>>> expected = Arrays.asList(
- Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("dodgers", "giants")),
- new Tuple2<String, List<String>>("new york", Arrays.asList("yankees", "mets"))),
- Arrays.asList(new Tuple2<String, List<String>>("california",
- Arrays.asList("sharks", "ducks", "dodgers", "giants")),
- new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders", "yankees", "mets"))),
- Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
- new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));
+ List<List<Tuple2<String, List<Integer>>>> expected = Arrays.asList(
+ Arrays.asList(
+ new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3)),
+ new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 4))
+ ),
+ Arrays.asList(
+ new Tuple2<String, List<Integer>>("california", Arrays.asList(1, 3, 5, 5)),
+ new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 1, 3, 4))
+ ),
+ Arrays.asList(
+ new Tuple2<String, List<Integer>>("california", Arrays.asList(5, 5)),
+ new Tuple2<String, List<Integer>>("new york", Arrays.asList(1, 3))
+ )
+ );
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
+ JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, List<String>> groupWindowed =
+ JavaPairDStream<String, List<Integer>> groupWindowed =
pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(groupWindowed);
- List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
+ List<List<Tuple2<String, List<Integer>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
- Assert.assertEquals(expected, result);
+ assert(result.size() == expected.size());
+ for (int i = 0; i < result.size(); i++) {
+ assert(convert(result.get(i)).equals(convert(expected.get(i))));
+ }
+ }
+
+ private HashSet<Tuple2<String, HashSet<Integer>>> convert(List<Tuple2<String, List<Integer>>> listOfTuples) {
+ List<Tuple2<String, HashSet<Integer>>> newListOfTuples = new ArrayList<Tuple2<String, HashSet<Integer>>>();
+ for (Tuple2<String, List<Integer>> tuple: listOfTuples) {
+ newListOfTuples.add(convert(tuple));
+ }
+ return new HashSet<Tuple2<String, HashSet<Integer>>>(newListOfTuples);
+ }
+
+ private Tuple2<String, HashSet<Integer>> convert(Tuple2<String, List<Integer>> tuple) {
+ return new Tuple2<String, HashSet<Integer>>(tuple._1(), new HashSet<Integer>(tuple._2()));
}
@Test
@@ -711,26 +712,28 @@ public class JavaAPISuite implements Serializable {
}
@Test
- public void testCountByKeyAndWindow() {
- List<List<Tuple2<String, String>>> inputData = stringStringKVStream;
+ public void testCountByValueAndWindow() {
+ List<List<String>> inputData = Arrays.asList(
+ Arrays.asList("hello", "world"),
+ Arrays.asList("hello", "moon"),
+ Arrays.asList("hello"));
List<List<Tuple2<String, Long>>> expected = Arrays.asList(
Arrays.asList(
- new Tuple2<String, Long>("california", 2L),
- new Tuple2<String, Long>("new york", 2L)),
+ new Tuple2<String, Long>("hello", 1L),
+ new Tuple2<String, Long>("world", 1L)),
Arrays.asList(
- new Tuple2<String, Long>("california", 4L),
- new Tuple2<String, Long>("new york", 4L)),
+ new Tuple2<String, Long>("hello", 2L),
+ new Tuple2<String, Long>("world", 1L),
+ new Tuple2<String, Long>("moon", 1L)),
Arrays.asList(
- new Tuple2<String, Long>("california", 2L),
- new Tuple2<String, Long>("new york", 2L)));
+ new Tuple2<String, Long>("hello", 2L),
+ new Tuple2<String, Long>("moon", 1L)));
- JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(
ssc, inputData, 1);
- JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
-
JavaPairDStream<String, Long> counted =
- pairStream.countByKeyAndWindow(new Duration(2000), new Duration(1000));
+ stream.countByValueAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(counted);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -911,9 +914,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(1,4),
Arrays.asList(8,7));
-
File tempDir = Files.createTempDir();
- ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000));
+ ssc.checkpoint(tempDir.getAbsolutePath());
JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream letterCount = stream.map(new Function<String, Integer>() {
@@ -927,14 +929,16 @@ public class JavaAPISuite implements Serializable {
assertOrderInvariantEquals(expectedInitial, initialResult);
Thread.sleep(1000);
-
ssc.stop();
+
ssc = new JavaStreamingContext(tempDir.getAbsolutePath());
- ssc.start();
- List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 2);
- assertOrderInvariantEquals(expectedFinal, finalResult);
+ // Tweak to take into consideration that the last batch before failure
+ // will be re-processed after recovery
+ List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 3);
+ assertOrderInvariantEquals(expectedFinal, finalResult.subList(1, 3));
}
+
/** TEST DISABLED: Pending a discussion about checkpoint() semantics with TD
@Test
public void testCheckpointofIndividualStream() throws InterruptedException {
@@ -971,15 +975,15 @@ public class JavaAPISuite implements Serializable {
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap();
- JavaDStream test1 = ssc.kafkaStream("localhost", 12345, "group", topics);
- JavaDStream test2 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets);
- JavaDStream test3 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets,
+ JavaDStream test1 = ssc.kafkaStream("localhost:12345", "group", topics);
+ JavaDStream test2 = ssc.kafkaStream("localhost:12345", "group", topics, offsets);
+ JavaDStream test3 = ssc.kafkaStream("localhost:12345", "group", topics, offsets,
StorageLevel.MEMORY_AND_DISK());
}
@Test
public void testNetworkTextStream() {
- JavaDStream test = ssc.networkTextStream("localhost", 12345);
+ JavaDStream test = ssc.socketTextStream("localhost", 12345);
}
@Test
@@ -999,7 +1003,7 @@ public class JavaAPISuite implements Serializable {
}
}
- JavaDStream test = ssc.networkStream(
+ JavaDStream test = ssc.socketStream(
"localhost",
12345,
new Converter(),
diff --git a/streaming/src/test/java/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
index 56349837e5..52ea28732a 100644
--- a/streaming/src/test/java/JavaTestUtils.scala
+++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
@@ -57,6 +57,7 @@ trait JavaTestBase extends TestSuiteBase {
}
object JavaTestUtils extends JavaTestBase {
+ override def maxWaitTimeMillis = 20000
}
diff --git a/streaming/src/test/resources/log4j.properties b/streaming/src/test/resources/log4j.properties
index edfa1243fa..59c445e63f 100644
--- a/streaming/src/test/resources/log4j.properties
+++ b/streaming/src/test/resources/log4j.properties
@@ -1,5 +1,6 @@
# Set everything to be logged to the file streaming/target/unit-tests.log
log4j.rootCategory=INFO, file
+# log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.append=false
log4j.appender.file.file=streaming/target/unit-tests.log
diff --git a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
index bfdf32c73e..8fce91853c 100644
--- a/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/BasicOperationsSuite.scala
@@ -6,11 +6,13 @@ import util.ManualClock
class BasicOperationsSuite extends TestSuiteBase {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+
override def framework() = "BasicOperationsSuite"
after {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
+ System.clearProperty("spark.driver.port")
}
test("map") {
@@ -22,7 +24,7 @@ class BasicOperationsSuite extends TestSuiteBase {
)
}
- test("flatmap") {
+ test("flatMap") {
val input = Seq(1 to 4, 5 to 8, 9 to 12)
testOperation(
input,
@@ -86,6 +88,23 @@ class BasicOperationsSuite extends TestSuiteBase {
)
}
+ test("count") {
+ testOperation(
+ Seq(1 to 1, 1 to 2, 1 to 3, 1 to 4),
+ (s: DStream[Int]) => s.count(),
+ Seq(Seq(1L), Seq(2L), Seq(3L), Seq(4L))
+ )
+ }
+
+ test("countByValue") {
+ testOperation(
+ Seq(1 to 1, Seq(1, 1, 1), 1 to 2, Seq(1, 1, 2, 2)),
+ (s: DStream[Int]) => s.countByValue(),
+ Seq(Seq((1, 1L)), Seq((1, 3L)), Seq((1, 1L), (2, 1L)), Seq((2, 2L), (1, 2L))),
+ true
+ )
+ }
+
test("mapValues") {
testOperation(
Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ),
@@ -165,6 +184,71 @@ class BasicOperationsSuite extends TestSuiteBase {
testOperation(inputData, updateStateOperation, outputData, true)
}
+ test("updateStateByKey - object lifecycle") {
+ val inputData =
+ Seq(
+ Seq("a","b"),
+ null,
+ Seq("a","c","a"),
+ Seq("c"),
+ null,
+ null
+ )
+
+ val outputData =
+ Seq(
+ Seq(("a", 1), ("b", 1)),
+ Seq(("a", 1), ("b", 1)),
+ Seq(("a", 3), ("c", 1)),
+ Seq(("a", 3), ("c", 2)),
+ Seq(("c", 2)),
+ Seq()
+ )
+
+ val updateStateOperation = (s: DStream[String]) => {
+ class StateObject(var counter: Int = 0, var expireCounter: Int = 0) extends Serializable
+
+ // updateFunc clears a state when a StateObject is seen without new values twice in a row
+ val updateFunc = (values: Seq[Int], state: Option[StateObject]) => {
+ val stateObj = state.getOrElse(new StateObject)
+ values.foldLeft(0)(_ + _) match {
+ case 0 => stateObj.expireCounter += 1 // no new values
+ case n => { // has new values, increment and reset expireCounter
+ stateObj.counter += n
+ stateObj.expireCounter = 0
+ }
+ }
+ stateObj.expireCounter match {
+ case 2 => None // seen twice with no new values, give it the boot
+ case _ => Option(stateObj)
+ }
+ }
+ s.map(x => (x, 1)).updateStateByKey[StateObject](updateFunc).mapValues(_.counter)
+ }
+
+ testOperation(inputData, updateStateOperation, outputData, true)
+ }
+
+ test("slice") {
+ val ssc = new StreamingContext("local[2]", "BasicOperationSuite", Seconds(1))
+ val input = Seq(Seq(1), Seq(2), Seq(3), Seq(4))
+ val stream = new TestInputStream[Int](ssc, input, 2)
+ ssc.registerInputStream(stream)
+ stream.foreach(_ => {}) // Dummy output stream
+ ssc.start()
+ Thread.sleep(2000)
+ def getInputFromSlice(fromMillis: Long, toMillis: Long) = {
+ stream.slice(new Time(fromMillis), new Time(toMillis)).flatMap(_.collect()).toSet
+ }
+
+ assert(getInputFromSlice(0, 1000) == Set(1))
+ assert(getInputFromSlice(0, 2000) == Set(1, 2))
+ assert(getInputFromSlice(1000, 2000) == Set(1, 2))
+ assert(getInputFromSlice(2000, 4000) == Set(2, 3, 4))
+ ssc.stop()
+ Thread.sleep(1000)
+ }
+
test("forgetting of RDDs - map and window operations") {
assert(batchDuration === Seconds(1), "Batch duration has changed from 1 second")
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index d2f32c189b..cac86deeaf 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -1,5 +1,6 @@
package spark.streaming
+import dstream.FileInputDStream
import spark.streaming.StreamingContext._
import java.io.File
import runtime.RichInt
@@ -7,9 +8,19 @@ import org.scalatest.BeforeAndAfter
import org.apache.commons.io.FileUtils
import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import util.{Clock, ManualClock}
+import scala.util.Random
+import com.google.common.io.Files
+
+/**
+ * This test suites tests the checkpointing functionality of DStreams -
+ * the checkpointing of a DStream's RDDs as well as the checkpointing of
+ * the whole DStream graph.
+ */
class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+
before {
FileUtils.deleteDirectory(new File(checkpointDir))
}
@@ -19,7 +30,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
FileUtils.deleteDirectory(new File(checkpointDir))
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
+ System.clearProperty("spark.driver.port")
}
var ssc: StreamingContext = null
@@ -28,21 +39,18 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
override def batchDuration = Milliseconds(500)
- override def checkpointInterval = batchDuration
-
override def actuallyWait = true
- test("basic stream+rdd recovery") {
+ test("basic rdd checkpoints + dstream graph checkpoint recovery") {
assert(batchDuration === Milliseconds(500), "batchDuration for this test must be 1 second")
- assert(checkpointInterval === batchDuration, "checkpointInterval for this test much be same as batchDuration")
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
val stateStreamCheckpointInterval = Seconds(1)
// this ensure checkpointing occurs at least once
- val firstNumBatches = (stateStreamCheckpointInterval / batchDuration) * 2
+ val firstNumBatches = (stateStreamCheckpointInterval / batchDuration).toLong * 2
val secondNumBatches = firstNumBatches
// Setup the streams
@@ -62,10 +70,10 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run till a time such that at least one RDD in the stream should have been checkpointed,
// then check whether some RDD has been checkpointed or not
ssc.start()
- runStreamsWithRealDelay(ssc, firstNumBatches)
- logInfo("Checkpoint data of state stream = \n[" + stateStream.checkpointData.rdds.mkString(",\n") + "]")
- assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before first failure")
- stateStream.checkpointData.rdds.foreach {
+ advanceTimeWithRealDelay(ssc, firstNumBatches)
+ logInfo("Checkpoint data of state stream = \n" + stateStream.checkpointData)
+ assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before first failure")
+ stateStream.checkpointData.checkpointFiles.foreach {
case (time, data) => {
val file = new File(data.toString)
assert(file.exists(), "Checkpoint file '" + file +"' for time " + time + " for state stream before first failure does not exist")
@@ -74,8 +82,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run till a further time such that previous checkpoint files in the stream would be deleted
// and check whether the earlier checkpoint files are deleted
- val checkpointFiles = stateStream.checkpointData.rdds.map(x => new File(x._2.toString))
- runStreamsWithRealDelay(ssc, secondNumBatches)
+ val checkpointFiles = stateStream.checkpointData.checkpointFiles.map(x => new File(x._2))
+ advanceTimeWithRealDelay(ssc, secondNumBatches)
checkpointFiles.foreach(file => assert(!file.exists, "Checkpoint file '" + file + "' was not deleted"))
ssc.stop()
@@ -90,9 +98,9 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Run one batch to generate a new checkpoint file and check whether some RDD
// is present in the checkpoint data or not
ssc.start()
- runStreamsWithRealDelay(ssc, 1)
- assert(!stateStream.checkpointData.rdds.isEmpty, "No checkpointed RDDs in state stream before second failure")
- stateStream.checkpointData.rdds.foreach {
+ advanceTimeWithRealDelay(ssc, 1)
+ assert(!stateStream.checkpointData.checkpointFiles.isEmpty, "No checkpointed RDDs in state stream before second failure")
+ stateStream.checkpointData.checkpointFiles.foreach {
case (time, data) => {
val file = new File(data.toString)
assert(file.exists(),
@@ -111,13 +119,16 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
// Adjust manual clock time as if it is being restarted after a delay
System.setProperty("spark.streaming.manualClock.jump", (batchDuration.milliseconds * 7).toString)
ssc.start()
- runStreamsWithRealDelay(ssc, 4)
+ advanceTimeWithRealDelay(ssc, 4)
ssc.stop()
System.clearProperty("spark.streaming.manualClock.jump")
ssc = null
}
- test("map and reduceByKey") {
+ // This tests whether the systm can recover from a master failure with simple
+ // non-stateful operations. This assumes as reliable, replayable input
+ // source - TestInputDStream.
+ test("recovery with map and reduceByKey operations") {
testCheckpointedOperation(
Seq( Seq("a", "a", "b"), Seq("", ""), Seq(), Seq("a", "a", "b"), Seq("", ""), Seq() ),
(s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _),
@@ -126,7 +137,11 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
)
}
- test("reduceByKeyAndWindowInv") {
+
+ // This tests whether the ReduceWindowedDStream's RDD checkpoints works correctly such
+ // that the system can recover from a master failure. This assumes as reliable,
+ // replayable input source - TestInputDStream.
+ test("recovery with invertible reduceByKeyAndWindow operation") {
val n = 10
val w = 4
val input = (1 to n).map(_ => Seq("a")).toSeq
@@ -139,7 +154,11 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
testCheckpointedOperation(input, operation, output, 7)
}
- test("updateStateByKey") {
+
+ // This tests whether the StateDStream's RDD checkpoints works correctly such
+ // that the system can recover from a master failure. This assumes as reliable,
+ // replayable input source - TestInputDStream.
+ test("recovery with updateStateByKey operation") {
val input = (1 to 10).map(_ => Seq("a")).toSeq
val output = (1 to 10).map(x => Seq(("a", x))).toSeq
val operation = (st: DStream[String]) => {
@@ -154,11 +173,126 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
testCheckpointedOperation(input, operation, output, 7)
}
+ // This tests whether file input stream remembers what files were seen before
+ // the master failure and uses them again to process a large window operation.
+ // It also tests whether batches, whose processing was incomplete due to the
+ // failure, are re-processed or not.
+ test("recovery with file input stream") {
+ // Disable manual clock as FileInputDStream does not work with manual clock
+ val clockProperty = System.getProperty("spark.streaming.clock")
+ System.clearProperty("spark.streaming.clock")
+
+ // Set up the streaming context and input streams
+ val testDir = Files.createTempDir()
+ var ssc = new StreamingContext(master, framework, Seconds(1))
+ ssc.checkpoint(checkpointDir)
+ val fileStream = ssc.textFileStream(testDir.toString)
+ // Making value 3 take large time to process, to ensure that the master
+ // shuts down in the middle of processing the 3rd batch
+ val mappedStream = fileStream.map(s => {
+ val i = s.toInt
+ if (i == 3) Thread.sleep(2000)
+ i
+ })
+
+ // Reducing over a large window to ensure that recovery from master failure
+ // requires reprocessing of all the files seen before the failure
+ val reducedStream = mappedStream.reduceByWindow(_ + _, Seconds(30), Seconds(1))
+ val outputBuffer = new ArrayBuffer[Seq[Int]]
+ var outputStream = new TestOutputStream(reducedStream, outputBuffer)
+ ssc.registerOutputStream(outputStream)
+ ssc.start()
+
+ // Create files and advance manual clock to process them
+ //var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ Thread.sleep(1000)
+ for (i <- Seq(1, 2, 3)) {
+ FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+ // wait to make sure that the file is written such that it gets shown in the file listings
+ Thread.sleep(1000)
+ }
+ logInfo("Output = " + outputStream.output.mkString(","))
+ assert(outputStream.output.size > 0, "No files processed before restart")
+ ssc.stop()
+
+ // Verify whether files created have been recorded correctly or not
+ var fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
+ def recordedFiles = fileInputDStream.files.values.flatMap(x => x)
+ assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+
+ // Create files while the master is down
+ for (i <- Seq(4, 5, 6)) {
+ FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+ Thread.sleep(1000)
+ }
+
+ // Recover context from checkpoint file and verify whether the files that were
+ // recorded before failure were saved and successfully recovered
+ logInfo("*********** RESTARTING ************")
+ ssc = new StreamingContext(checkpointDir)
+ fileInputDStream = ssc.graph.getInputStreams().head.asInstanceOf[FileInputDStream[_, _, _]]
+ assert(!recordedFiles.filter(_.endsWith("1")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("2")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("3")).isEmpty)
+
+ // Restart stream computation
+ ssc.start()
+ for (i <- Seq(7, 8, 9)) {
+ FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
+ Thread.sleep(1000)
+ }
+ Thread.sleep(1000)
+ logInfo("Output = " + outputStream.output.mkString("[", ", ", "]"))
+ assert(outputStream.output.size > 0, "No files processed after restart")
+ ssc.stop()
+
+ // Verify whether files created while the driver was down have been recorded or not
+ assert(!recordedFiles.filter(_.endsWith("4")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("5")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("6")).isEmpty)
+
+ // Verify whether new files created after recover have been recorded or not
+ assert(!recordedFiles.filter(_.endsWith("7")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("8")).isEmpty)
+ assert(!recordedFiles.filter(_.endsWith("9")).isEmpty)
+
+ // Append the new output to the old buffer
+ outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[Int]]
+ outputBuffer ++= outputStream.output
+
+ val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
+ logInfo("--------------------------------")
+ logInfo("output, size = " + outputBuffer.size)
+ outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ logInfo("expected output, size = " + expectedOutput.size)
+ expectedOutput.foreach(x => logInfo("[" + x + "]"))
+ logInfo("--------------------------------")
+
+ // Verify whether all the elements received are as expected
+ val output = outputBuffer.flatMap(x => x)
+ assert(output.contains(6)) // To ensure that the 3rd input (i.e., 3) was processed
+ output.foreach(o => // To ensure all the inputs are correctly added cumulatively
+ assert(expectedOutput.contains(o), "Expected value " + o + " not found")
+ )
+ // To ensure that all the inputs were received correctly
+ assert(expectedOutput.last === output.last)
+
+ // Enable manual clock back again for other tests
+ if (clockProperty != null)
+ System.setProperty("spark.streaming.clock", clockProperty)
+ }
+
+
/**
- * Tests a streaming operation under checkpointing, by restart the operation
+ * Tests a streaming operation under checkpointing, by restarting the operation
* from checkpoint file and verifying whether the final output is correct.
* The output is assumed to have come from a reliable queue which an replay
* data as required.
+ *
+ * NOTE: This takes into consideration that the last batch processed before
+ * master failure will be re-processed after restart/recovery.
*/
def testCheckpointedOperation[U: ClassManifest, V: ClassManifest](
input: Seq[Seq[U]],
@@ -172,11 +306,14 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
val totalNumBatches = input.size
val nextNumBatches = totalNumBatches - initialNumBatches
val initialNumExpectedOutputs = initialNumBatches
- val nextNumExpectedOutputs = expectedOutput.size - initialNumExpectedOutputs
+ val nextNumExpectedOutputs = expectedOutput.size - initialNumExpectedOutputs + 1
+ // because the last batch will be processed again
// Do the computation for initial number of batches, create checkpoint file and quit
ssc = setupStreams[U, V](input, operation)
- val output = runStreams[V](ssc, initialNumBatches, initialNumExpectedOutputs)
+ ssc.start()
+ val output = advanceTimeWithRealDelay[V](ssc, initialNumBatches)
+ ssc.stop()
verifyOutput[V](output, expectedOutput.take(initialNumBatches), true)
Thread.sleep(1000)
@@ -187,16 +324,20 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
"\n-------------------------------------------\n"
)
ssc = new StreamingContext(checkpointDir)
- val outputNew = runStreams[V](ssc, nextNumBatches, nextNumExpectedOutputs)
+ System.clearProperty("spark.driver.port")
+ ssc.start()
+ val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches)
+ // the first element will be re-processed data of the last batch before restart
verifyOutput[V](outputNew, expectedOutput.takeRight(nextNumExpectedOutputs), true)
+ ssc.stop()
ssc = null
}
/**
* Advances the manual clock on the streaming scheduler by given number of batches.
- * It also wait for the expected amount of time for each batch.
+ * It also waits for the expected amount of time for each batch.
*/
- def runStreamsWithRealDelay(ssc: StreamingContext, numBatches: Long) {
+ def advanceTimeWithRealDelay[V: ClassManifest](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logInfo("Manual clock before advancing = " + clock.time)
for (i <- 1 to numBatches.toInt) {
@@ -205,6 +346,8 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
}
logInfo("Manual clock after advancing = " + clock.time)
Thread.sleep(batchDuration.milliseconds)
- }
+ val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
+ outputStream.output
+ }
} \ No newline at end of file
diff --git a/streaming/src/test/scala/spark/streaming/FailureSuite.scala b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
index 7493ac1207..a5fa7ab92d 100644
--- a/streaming/src/test/scala/spark/streaming/FailureSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/FailureSuite.scala
@@ -1,191 +1,40 @@
package spark.streaming
-import org.scalatest.BeforeAndAfter
-import org.apache.commons.io.FileUtils
+import spark.Logging
+import spark.streaming.util.MasterFailureTest
+import StreamingContext._
+
+import org.scalatest.{FunSuite, BeforeAndAfter}
+import com.google.common.io.Files
import java.io.File
-import scala.runtime.RichInt
-import scala.util.Random
-import spark.streaming.StreamingContext._
+import org.apache.commons.io.FileUtils
import collection.mutable.ArrayBuffer
-import spark.Logging
+
/**
* This testsuite tests master failures at random times while the stream is running using
* the real clock.
*/
-class FailureSuite extends TestSuiteBase with BeforeAndAfter {
+class FailureSuite extends FunSuite with BeforeAndAfter with Logging {
+
+ var directory = "FailureSuite"
+ val numBatches = 30
+ val batchDuration = Milliseconds(1000)
before {
- FileUtils.deleteDirectory(new File(checkpointDir))
+ FileUtils.deleteDirectory(new File(directory))
}
after {
- FailureSuite.reset()
- FileUtils.deleteDirectory(new File(checkpointDir))
-
- // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
- }
-
- override def framework = "CheckpointSuite"
-
- override def batchDuration = Milliseconds(500)
-
- override def checkpointDir = "checkpoint"
-
- override def checkpointInterval = batchDuration
-
- test("multiple failures with updateStateByKey") {
- val n = 30
- // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
- val input = (1 to n).map(i => (1 to i).map(_ =>"a").toSeq).toSeq
- // Last output: [ (a, 465) ] for n=30
- val lastOutput = Seq( ("a", (1 to n).reduce(_ + _)) )
-
- val operation = (st: DStream[String]) => {
- val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
- Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
- }
- st.map(x => (x, 1))
- .updateStateByKey[RichInt](updateFunc)
- .checkpoint(Seconds(2))
- .map(t => (t._1, t._2.self))
- }
-
- testOperationWithMultipleFailures(input, operation, lastOutput, n, n)
- }
-
- test("multiple failures with reduceByKeyAndWindow") {
- val n = 30
- val w = 100
- assert(w > n, "Window should be much larger than the number of input sets in this test")
- // Input: time=1 ==> [ a ] , time=2 ==> [ a, a ] , time=3 ==> [ a, a, a ] , ...
- val input = (1 to n).map(i => (1 to i).map(_ =>"a").toSeq).toSeq
- // Last output: [ (a, 465) ]
- val lastOutput = Seq( ("a", (1 to n).reduce(_ + _)) )
-
- val operation = (st: DStream[String]) => {
- st.map(x => (x, 1))
- .reduceByKeyAndWindow(_ + _, _ - _, batchDuration * w, batchDuration)
- .checkpoint(Seconds(2))
- }
-
- testOperationWithMultipleFailures(input, operation, lastOutput, n, n)
- }
-
-
- /**
- * Tests stream operation with multiple master failures, and verifies whether the
- * final set of output values is as expected or not. Checking the final value is
- * proof that no intermediate data was lost due to master failures.
- */
- def testOperationWithMultipleFailures[U: ClassManifest, V: ClassManifest](
- input: Seq[Seq[U]],
- operation: DStream[U] => DStream[V],
- lastExpectedOutput: Seq[V],
- numBatches: Int,
- numExpectedOutput: Int
- ) {
- var ssc = setupStreams[U, V](input, operation)
- val mergedOutput = new ArrayBuffer[Seq[V]]()
-
- var totalTimeRan = 0L
- while(totalTimeRan <= numBatches * batchDuration.milliseconds * 2) {
- new KillingThread(ssc, numBatches * batchDuration.milliseconds.toInt / 4).start()
- val (output, timeRan) = runStreamsWithRealClock[V](ssc, numBatches, numExpectedOutput)
-
- mergedOutput ++= output
- totalTimeRan += timeRan
- logInfo("New output = " + output)
- logInfo("Merged output = " + mergedOutput)
- logInfo("Total time spent = " + totalTimeRan)
- val sleepTime = Random.nextInt(numBatches * batchDuration.milliseconds.toInt / 8)
- logInfo(
- "\n-------------------------------------------\n" +
- " Restarting stream computation in " + sleepTime + " ms " +
- "\n-------------------------------------------\n"
- )
- Thread.sleep(sleepTime)
- FailureSuite.failed = false
- ssc = new StreamingContext(checkpointDir)
- }
- ssc.stop()
- ssc = null
-
- // Verify whether the last output is the expected one
- val lastOutput = mergedOutput(mergedOutput.lastIndexWhere(!_.isEmpty))
- assert(lastOutput.toSet === lastExpectedOutput.toSet)
- logInfo("Finished computation after " + FailureSuite.failureCount + " failures")
+ FileUtils.deleteDirectory(new File(directory))
}
- /**
- * Runs the streams set up in `ssc` on real clock until the expected max number of
- */
- def runStreamsWithRealClock[V: ClassManifest](
- ssc: StreamingContext,
- numBatches: Int,
- maxExpectedOutput: Int
- ): (Seq[Seq[V]], Long) = {
-
- System.clearProperty("spark.streaming.clock")
-
- assert(numBatches > 0, "Number of batches to run stream computation is zero")
- assert(maxExpectedOutput > 0, "Max expected outputs after " + numBatches + " is zero")
- logInfo("numBatches = " + numBatches + ", maxExpectedOutput = " + maxExpectedOutput)
-
- // Get the output buffer
- val outputStream = ssc.graph.getOutputStreams.head.asInstanceOf[TestOutputStream[V]]
- val output = outputStream.output
- val waitTime = (batchDuration.milliseconds * (numBatches.toDouble + 0.5)).toLong
- val startTime = System.currentTimeMillis()
-
- try {
- // Start computation
- ssc.start()
-
- // Wait until expected number of output items have been generated
- while (output.size < maxExpectedOutput && System.currentTimeMillis() - startTime < waitTime && !FailureSuite.failed) {
- logInfo("output.size = " + output.size + ", maxExpectedOutput = " + maxExpectedOutput)
- Thread.sleep(100)
- }
- } catch {
- case e: Exception => logInfo("Exception while running streams: " + e)
- } finally {
- ssc.stop()
- }
- val timeTaken = System.currentTimeMillis() - startTime
- logInfo("" + output.size + " sets of output generated in " + timeTaken + " ms")
- (output, timeTaken)
+ test("multiple failures with map") {
+ MasterFailureTest.testMap(directory, numBatches, batchDuration)
}
-
-}
-
-object FailureSuite {
- var failed = false
- var failureCount = 0
-
- def reset() {
- failed = false
- failureCount = 0
+ test("multiple failures with updateStateByKey") {
+ MasterFailureTest.testUpdateStateByKey(directory, numBatches, batchDuration)
}
}
-class KillingThread(ssc: StreamingContext, maxKillWaitTime: Int) extends Thread with Logging {
- initLogging()
-
- override def run() {
- var minKillWaitTime = if (FailureSuite.failureCount == 0) 3000 else 1000 // to allow the first checkpoint
- val killWaitTime = minKillWaitTime + Random.nextInt(maxKillWaitTime)
- logInfo("Kill wait time = " + killWaitTime)
- Thread.sleep(killWaitTime.toLong)
- logInfo(
- "\n---------------------------------------\n" +
- "Killing streaming context after " + killWaitTime + " ms" +
- "\n---------------------------------------\n"
- )
- if (ssc != null) ssc.stop()
- FailureSuite.failed = true
- FailureSuite.failureCount += 1
- }
-}
diff --git a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
index d597501781..c9f941c5b8 100644
--- a/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/InputStreamsSuite.scala
@@ -26,40 +26,30 @@ import org.apache.avro.ipc.specific.SpecificRequestor
import java.nio.ByteBuffer
import collection.JavaConversions._
import java.nio.charset.Charset
+import com.google.common.io.Files
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
val testPort = 9999
- var testServer: TestServer = null
- var testDir: File = null
override def checkpointDir = "checkpoint"
after {
- FileUtils.deleteDirectory(new File(checkpointDir))
- if (testServer != null) {
- testServer.stop()
- testServer = null
- }
- if (testDir != null && testDir.exists()) {
- FileUtils.deleteDirectory(testDir)
- testDir = null
- }
-
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
+ System.clearProperty("spark.driver.port")
}
+
test("network input stream") {
// Start the server
- testServer = new TestServer(testPort)
+ val testServer = new TestServer(testPort)
testServer.start()
// Set up the streaming context and input streams
val ssc = new StreamingContext(master, framework, batchDuration)
- val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+ val networkStream = ssc.socketTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String ]]
val outputStream = new TestOutputStream(networkStream, outputBuffer)
def output = outputBuffer.flatMap(x => x)
@@ -100,46 +90,6 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
}
- test("network input stream with checkpoint") {
- // Start the server
- testServer = new TestServer(testPort)
- testServer.start()
-
- // Set up the streaming context and input streams
- var ssc = new StreamingContext(master, framework, batchDuration)
- ssc.checkpoint(checkpointDir, checkpointInterval)
- val networkStream = ssc.networkTextStream("localhost", testPort, StorageLevel.MEMORY_AND_DISK)
- var outputStream = new TestOutputStream(networkStream, new ArrayBuffer[Seq[String]])
- ssc.registerOutputStream(outputStream)
- ssc.start()
-
- // Feed data to the server to send to the network receiver
- var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- for (i <- Seq(1, 2, 3)) {
- testServer.send(i.toString + "\n")
- Thread.sleep(100)
- clock.addToTime(batchDuration.milliseconds)
- }
- Thread.sleep(500)
- assert(outputStream.output.size > 0)
- ssc.stop()
-
- // Restart stream computation from checkpoint and feed more data to see whether
- // they are being received and processed
- logInfo("*********** RESTARTING ************")
- ssc = new StreamingContext(checkpointDir)
- ssc.start()
- clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- for (i <- Seq(4, 5, 6)) {
- testServer.send(i.toString + "\n")
- Thread.sleep(100)
- clock.addToTime(batchDuration.milliseconds)
- }
- Thread.sleep(500)
- outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]]
- assert(outputStream.output.size > 0)
- ssc.stop()
- }
test("flume input stream") {
// Set up the streaming context and input streams
@@ -153,7 +103,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
-
+ Thread.sleep(1000)
val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", 33333));
val client = SpecificRequestor.getClient(
classOf[AvroSourceProtocol], transceiver);
@@ -189,42 +139,33 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
}
- test("file input stream") {
- // Create a temporary directory
- testDir = {
- var temp = File.createTempFile(".temp.", Random.nextInt().toString)
- temp.delete()
- temp.mkdirs()
- logInfo("Created temp dir " + temp)
- temp
- }
+ test("file input stream") {
+ // Disable manual clock as FileInputDStream does not work with manual clock
+ System.clearProperty("spark.streaming.clock")
// Set up the streaming context and input streams
+ val testDir = Files.createTempDir()
val ssc = new StreamingContext(master, framework, batchDuration)
- val filestream = ssc.textFileStream(testDir.toString)
+ val fileStream = ssc.textFileStream(testDir.toString)
val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
def output = outputBuffer.flatMap(x => x)
- val outputStream = new TestOutputStream(filestream, outputBuffer)
+ val outputStream = new TestOutputStream(fileStream, outputBuffer)
ssc.registerOutputStream(outputStream)
ssc.start()
// Create files in the temporary directory so that Spark Streaming can read data from it
- val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
val expectedOutput = input.map(_.toString)
Thread.sleep(1000)
for (i <- 0 until input.size) {
- FileUtils.writeStringToFile(new File(testDir, i.toString), input(i).toString + "\n")
- Thread.sleep(500)
- clock.addToTime(batchDuration.milliseconds)
- //Thread.sleep(100)
+ val file = new File(testDir, i.toString)
+ FileUtils.writeStringToFile(file, input(i).toString + "\n")
+ logInfo("Created file " + file)
+ Thread.sleep(batchDuration.milliseconds)
+ Thread.sleep(1000)
}
val startTime = System.currentTimeMillis()
- /*while (output.size < expectedOutput.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
- logInfo("output.size = " + output.size + ", expectedOutput.size = " + expectedOutput.size)
- Thread.sleep(100)
- }*/
Thread.sleep(1000)
val timeTaken = System.currentTimeMillis() - startTime
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
@@ -233,26 +174,27 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Verify whether data received by Spark Streaming was as expected
logInfo("--------------------------------")
- logInfo("output.size = " + outputBuffer.size)
- logInfo("output")
+ logInfo("output, size = " + outputBuffer.size)
outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
- logInfo("expected output.size = " + expectedOutput.size)
- logInfo("expected output")
+ logInfo("expected output, size = " + expectedOutput.size)
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("--------------------------------")
// Verify whether all the elements received are as expected
// (whether the elements were received one in each interval is not verified)
- assert(output.size === expectedOutput.size)
- for (i <- 0 until output.size) {
- assert(output(i).size === 1)
- assert(output(i).head.toString === expectedOutput(i))
- }
+ assert(output.toList === expectedOutput.toList)
+
+ FileUtils.deleteDirectory(testDir)
+
+ // Enable manual clock back again for other tests
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
}
+
+
test("actor input stream") {
// Start the server
val port = testPort
- testServer = new TestServer(port)
+ val testServer = new TestServer(port)
testServer.start()
// Set up the streaming context and input streams
@@ -298,59 +240,10 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
assert(output(i) === expectedOutput(i))
}
}
-
- test("file input stream with checkpoint") {
- // Create a temporary directory
- testDir = {
- var temp = File.createTempFile(".temp.", Random.nextInt().toString)
- temp.delete()
- temp.mkdirs()
- logInfo("Created temp dir " + temp)
- temp
- }
-
- // Set up the streaming context and input streams
- var ssc = new StreamingContext(master, framework, batchDuration)
- ssc.checkpoint(checkpointDir, checkpointInterval)
- val filestream = ssc.textFileStream(testDir.toString)
- var outputStream = new TestOutputStream(filestream, new ArrayBuffer[Seq[String]])
- ssc.registerOutputStream(outputStream)
- ssc.start()
-
- // Create files and advance manual clock to process them
- var clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- Thread.sleep(1000)
- for (i <- Seq(1, 2, 3)) {
- FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
- Thread.sleep(100)
- clock.addToTime(batchDuration.milliseconds)
- }
- Thread.sleep(500)
- logInfo("Output = " + outputStream.output.mkString(","))
- assert(outputStream.output.size > 0)
- ssc.stop()
-
- // Restart stream computation from checkpoint and create more files to see whether
- // they are being processed
- logInfo("*********** RESTARTING ************")
- ssc = new StreamingContext(checkpointDir)
- ssc.start()
- clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
- Thread.sleep(500)
- for (i <- Seq(4, 5, 6)) {
- FileUtils.writeStringToFile(new File(testDir, i.toString), i.toString + "\n")
- Thread.sleep(100)
- clock.addToTime(batchDuration.milliseconds)
- }
- Thread.sleep(500)
- outputStream = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[String]]
- logInfo("Output = " + outputStream.output.mkString(","))
- assert(outputStream.output.size > 0)
- ssc.stop()
- }
}
+/** This is server to test the network input stream */
class TestServer(port: Int) extends Logging {
val queue = new ArrayBlockingQueue[String](100)
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index 49129f3964..ad6aa79d10 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -28,6 +28,11 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
logInfo("Computing RDD for time " + validTime)
val index = ((validTime - zeroTime) / slideDuration - 1).toInt
val selectedInput = if (index < input.size) input(index) else Seq[T]()
+
+ // lets us test cases where RDDs are not created
+ if (selectedInput == null)
+ return None
+
val rdd = ssc.sc.makeRDD(selectedInput, numPartitions)
logInfo("Created RDD " + rdd.id + " with " + selectedInput)
Some(rdd)
@@ -58,20 +63,25 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBu
*/
trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
+ // Name of the framework for Spark context
def framework = "TestSuiteBase"
+ // Master for Spark context
def master = "local[2]"
+ // Batch duration
def batchDuration = Seconds(1)
+ // Directory where the checkpoint data will be saved
def checkpointDir = "checkpoint"
- def checkpointInterval = batchDuration
-
+ // Number of partitions of the input parallel collections created for testing
def numInputPartitions = 2
+ // Maximum time to wait before the test times out
def maxWaitTimeMillis = 10000
+ // Whether to actually wait in real time before changing manual clock
def actuallyWait = false
/**
@@ -86,7 +96,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// Create StreamingContext
val ssc = new StreamingContext(master, framework, batchDuration)
if (checkpointDir != null) {
- ssc.checkpoint(checkpointDir, checkpointInterval)
+ ssc.checkpoint(checkpointDir)
}
// Setup the stream computation
@@ -111,7 +121,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
// Create StreamingContext
val ssc = new StreamingContext(master, framework, batchDuration)
if (checkpointDir != null) {
- ssc.checkpoint(checkpointDir, checkpointInterval)
+ ssc.checkpoint(checkpointDir)
}
// Setup the stream computation
@@ -135,9 +145,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
numBatches: Int,
numExpectedOutput: Int
): Seq[Seq[V]] = {
-
- System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
-
assert(numBatches > 0, "Number of batches to run stream computation is zero")
assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero")
logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput)
@@ -181,7 +188,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
} finally {
ssc.stop()
}
-
output
}
diff --git a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
index 0c6e928835..1b66f3bda2 100644
--- a/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/WindowOperationsSuite.scala
@@ -5,6 +5,8 @@ import collection.mutable.ArrayBuffer
class WindowOperationsSuite extends TestSuiteBase {
+ System.setProperty("spark.streaming.clock", "spark.streaming.util.ManualClock")
+
override def framework = "WindowOperationsSuite"
override def maxWaitTimeMillis = 20000
@@ -13,7 +15,7 @@ class WindowOperationsSuite extends TestSuiteBase {
after {
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
- System.clearProperty("spark.master.port")
+ System.clearProperty("spark.driver.port")
}
val largerSlideInput = Seq(
@@ -82,12 +84,9 @@ class WindowOperationsSuite extends TestSuiteBase {
)
/*
- The output of the reduceByKeyAndWindow with inverse reduce function is
- different from the naive reduceByKeyAndWindow. Even if the count of a
- particular key is 0, the key does not get eliminated from the RDDs of
- ReducedWindowedDStream. This causes the number of keys in these RDDs to
- increase forever. A more generalized version that allows elimination of
- keys should be considered.
+ The output of the reduceByKeyAndWindow with inverse function but without a filter
+ function will be different from the naive reduceByKeyAndWindow, as no keys get
+ eliminated from the ReducedWindowedDStream even if the value of a key becomes 0.
*/
val bigReduceInvOutput = Seq(
@@ -175,31 +174,31 @@ class WindowOperationsSuite extends TestSuiteBase {
// Testing reduceByKeyAndWindow (with invertible reduce function)
- testReduceByKeyAndWindowInv(
+ testReduceByKeyAndWindowWithInverse(
"basic reduction",
Seq(Seq(("a", 1), ("a", 3)) ),
Seq(Seq(("a", 4)) )
)
- testReduceByKeyAndWindowInv(
+ testReduceByKeyAndWindowWithInverse(
"key already in window and new value added into window",
Seq( Seq(("a", 1)), Seq(("a", 1)) ),
Seq( Seq(("a", 1)), Seq(("a", 2)) )
)
- testReduceByKeyAndWindowInv(
+ testReduceByKeyAndWindowWithInverse(
"new key added into window",
Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 1)) ),
Seq( Seq(("a", 1)), Seq(("a", 2), ("b", 1)) )
)
- testReduceByKeyAndWindowInv(
+ testReduceByKeyAndWindowWithInverse(
"key removed from window",
Seq( Seq(("a", 1)), Seq(("a", 1)), Seq(), Seq() ),
Seq( Seq(("a", 1)), Seq(("a", 2)), Seq(("a", 1)), Seq(("a", 0)) )
)
- testReduceByKeyAndWindowInv(
+ testReduceByKeyAndWindowWithInverse(
"larger slide time",
largerSlideInput,
largerSlideReduceOutput,
@@ -207,7 +206,9 @@ class WindowOperationsSuite extends TestSuiteBase {
Seconds(2)
)
- testReduceByKeyAndWindowInv("big test", bigInput, bigReduceInvOutput)
+ testReduceByKeyAndWindowWithInverse("big test", bigInput, bigReduceInvOutput)
+
+ testReduceByKeyAndWindowWithFilteredInverse("big test", bigInput, bigReduceOutput)
test("groupByKeyAndWindow") {
val input = bigInput
@@ -235,14 +236,14 @@ class WindowOperationsSuite extends TestSuiteBase {
testOperation(input, operation, expectedOutput, numBatches, true)
}
- test("countByKeyAndWindow") {
- val input = Seq(Seq(("a", 1)), Seq(("b", 1), ("b", 2)), Seq(("a", 10), ("b", 20)))
+ test("countByValueAndWindow") {
+ val input = Seq(Seq("a"), Seq("b", "b"), Seq("a", "b"))
val expectedOutput = Seq( Seq(("a", 1)), Seq(("a", 1), ("b", 2)), Seq(("a", 1), ("b", 3)))
val windowDuration = Seconds(2)
val slideDuration = Seconds(1)
val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
- val operation = (s: DStream[(String, Int)]) => {
- s.countByKeyAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toInt))
+ val operation = (s: DStream[String]) => {
+ s.countByValueAndWindow(windowDuration, slideDuration).map(x => (x._1, x._2.toInt))
}
testOperation(input, operation, expectedOutput, numBatches, true)
}
@@ -272,29 +273,50 @@ class WindowOperationsSuite extends TestSuiteBase {
slideDuration: Duration = Seconds(1)
) {
test("reduceByKeyAndWindow - " + name) {
+ logInfo("reduceByKeyAndWindow - " + name)
val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
- s.reduceByKeyAndWindow(_ + _, windowDuration, slideDuration).persist()
+ s.reduceByKeyAndWindow((x: Int, y: Int) => x + y, windowDuration, slideDuration)
}
testOperation(input, operation, expectedOutput, numBatches, true)
}
}
- def testReduceByKeyAndWindowInv(
+ def testReduceByKeyAndWindowWithInverse(
name: String,
input: Seq[Seq[(String, Int)]],
expectedOutput: Seq[Seq[(String, Int)]],
windowDuration: Duration = Seconds(2),
slideDuration: Duration = Seconds(1)
) {
- test("reduceByKeyAndWindowInv - " + name) {
+ test("reduceByKeyAndWindow with inverse function - " + name) {
+ logInfo("reduceByKeyAndWindow with inverse function - " + name)
val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
val operation = (s: DStream[(String, Int)]) => {
s.reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration)
- .persist()
.checkpoint(Seconds(100)) // Large value to avoid effect of RDD checkpointing
}
testOperation(input, operation, expectedOutput, numBatches, true)
}
}
+
+ def testReduceByKeyAndWindowWithFilteredInverse(
+ name: String,
+ input: Seq[Seq[(String, Int)]],
+ expectedOutput: Seq[Seq[(String, Int)]],
+ windowDuration: Duration = Seconds(2),
+ slideDuration: Duration = Seconds(1)
+ ) {
+ test("reduceByKeyAndWindow with inverse and filter functions - " + name) {
+ logInfo("reduceByKeyAndWindow with inverse and filter functions - " + name)
+ val numBatches = expectedOutput.size * (slideDuration / batchDuration).toInt
+ val filterFunc = (p: (String, Int)) => p._2 != 0
+ val operation = (s: DStream[(String, Int)]) => {
+ s.reduceByKeyAndWindow(_ + _, _ - _, windowDuration, slideDuration, filterFunc = filterFunc)
+ .persist()
+ .checkpoint(Seconds(100)) // Large value to avoid effect of RDD checkpointing
+ }
+ testOperation(input, operation, expectedOutput, numBatches, true)
+ }
+ }
}