diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-18 17:51:14 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-12-18 17:51:14 -0800 |
commit | e93b391d75a1c2e17ad93caff39e8fc34d640935 (patch) | |
tree | 925eb68b33ddbabc4482aae4b6ead51668a22653 /streaming/src/test | |
parent | b80ec05635132f96772545803a10a1bbfa1250e7 (diff) | |
parent | 5ea187277c2b11e5db813f7ff9f214d7b85190f6 (diff) | |
download | spark-e93b391d75a1c2e17ad93caff39e8fc34d640935.tar.gz spark-e93b391d75a1c2e17ad93caff39e8fc34d640935.tar.bz2 spark-e93b391d75a1c2e17ad93caff39e8fc34d640935.zip |
Merge branch 'apache-master' into scheduler-update
Conflicts:
streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
Diffstat (limited to 'streaming/src/test')
4 files changed, 93 insertions, 82 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index ad4a8b9535..daeb99f5b7 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -21,28 +21,31 @@ import com.google.common.base.Optional; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.io.Files; + import kafka.serializer.StringDecoder; + import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; import org.apache.spark.streaming.api.java.JavaDStreamLike; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; + import scala.Tuple2; +import twitter4j.Status; + import org.apache.spark.HashPartitioner; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.api.java.JavaRDDLike; -import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.*; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.dstream.SparkFlumeEvent; import org.apache.spark.streaming.JavaTestUtils; import org.apache.spark.streaming.JavaCheckpointTestUtils; -import org.apache.spark.streaming.InputStreamsSuite; import java.io.*; import java.util.*; @@ -51,7 +54,6 @@ import akka.actor.Props; import akka.zeromq.Subscribe; - // The test suite itself is Serializable so that anonymous Function implementations can be // serialized, as an alternative to converting these anonymous classes to static inner classes; // see http://stackoverflow.com/questions/758570/. @@ -86,8 +88,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(3L), Arrays.asList(1L)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream count = stream.count(); + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Long> count = stream.count(); JavaTestUtils.attachTestOutputStream(count); List<List<Long>> result = JavaTestUtils.runStreams(ssc, 3, 3); assertOrderInvariantEquals(expected, result); @@ -103,8 +105,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(5,5), Arrays.asList(9,4)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream letterCount = stream.map(new Function<String, Integer>() { + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() { @Override public Integer call(String s) throws Exception { return s.length(); @@ -129,8 +131,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(7,8,9,4,5,6), Arrays.asList(7,8,9)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream windowed = stream.window(new Duration(2000)); + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> windowed = stream.window(new Duration(2000)); JavaTestUtils.attachTestOutputStream(windowed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); @@ -153,8 +155,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18), Arrays.asList(13,14,15,16,17,18)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream windowed = stream.window(new Duration(4000), new Duration(2000)); + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> windowed = stream.window(new Duration(4000), new Duration(2000)); JavaTestUtils.attachTestOutputStream(windowed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 8, 4); @@ -171,8 +173,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList("giants"), Arrays.asList("yankees")); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream filtered = stream.filter(new Function<String, Boolean>() { + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<String> filtered = stream.filter(new Function<String, Boolean>() { @Override public Boolean call(String s) throws Exception { return s.contains("a"); @@ -227,8 +229,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(Arrays.asList("giants", "dodgers")), Arrays.asList(Arrays.asList("yankees", "red socks"))); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream glommed = stream.glom(); + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<List<String>> glommed = stream.glom(); JavaTestUtils.attachTestOutputStream(glommed); List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2); @@ -245,8 +247,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList("GIANTSDODGERS"), Arrays.asList("YANKEESRED SOCKS")); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() { + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<String> mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() { @Override public Iterable<String> call(Iterator<String> in) { String out = ""; @@ -288,8 +290,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(15), Arrays.asList(24)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream reduced = stream.reduce(new IntegerSum()); + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> reduced = stream.reduce(new IntegerSum()); JavaTestUtils.attachTestOutputStream(reduced); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -309,8 +311,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList(39), Arrays.asList(24)); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(), + JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> reducedWindowed = stream.reduceByWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000)); JavaTestUtils.attachTestOutputStream(reducedWindowed); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4); @@ -695,8 +697,8 @@ public class JavaAPISuite implements Serializable { Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"), Arrays.asList("a","t","h","l","e","t","i","c","s")); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream flatMapped = stream.flatMap(new FlatMapFunction<String, String>() { + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { return Lists.newArrayList(x.split("(?!^)")); @@ -742,8 +744,8 @@ public class JavaAPISuite implements Serializable { new Tuple2<Integer, String>(9, "c"), new Tuple2<Integer, String>(9, "s"))); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() { + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaPairDStream<Integer,String> flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() { @Override public Iterable<Tuple2<Integer, String>> call(String in) throws Exception { List<Tuple2<Integer, String>> out = Lists.newArrayList(); @@ -776,10 +778,10 @@ public class JavaAPISuite implements Serializable { Arrays.asList(2,2,5,5), Arrays.asList(3,3,6,6)); - JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2); - JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2); + JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2); + JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2); - JavaDStream unioned = stream1.union(stream2); + JavaDStream<Integer> unioned = stream1.union(stream2); JavaTestUtils.attachTestOutputStream(unioned); List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3); @@ -790,7 +792,7 @@ public class JavaAPISuite implements Serializable { * Performs an order-invariant comparison of lists representing two RDD streams. This allows * us to account for ordering variation within individual RDD's which occurs during windowing. */ - public static <T extends Comparable> void assertOrderInvariantEquals( + public static <T extends Comparable<T>> void assertOrderInvariantEquals( List<List<T>> expected, List<List<T>> actual) { for (List<T> list: expected) { Collections.sort(list); @@ -813,11 +815,11 @@ public class JavaAPISuite implements Serializable { Arrays.asList(new Tuple2<String, Integer>("giants", 6)), Arrays.asList(new Tuple2<String, Integer>("yankees", 7))); - JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1); JavaPairDStream<String, Integer> pairStream = stream.map( new PairFunction<String, String, Integer>() { @Override - public Tuple2 call(String in) throws Exception { + public Tuple2<String, Integer> call(String in) throws Exception { return new Tuple2<String, Integer>(in, in.length()); } }); @@ -1540,8 +1542,8 @@ public class JavaAPISuite implements Serializable { File tempDir = Files.createTempDir(); ssc.checkpoint(tempDir.getAbsolutePath()); - JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); - JavaDStream letterCount = stream.map(new Function<String, Integer>() { + JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1); + JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() { @Override public Integer call(String s) throws Exception { return s.length(); @@ -1616,7 +1618,7 @@ public class JavaAPISuite implements Serializable { @Test public void testSocketTextStream() { - JavaDStream test = ssc.socketTextStream("localhost", 12345); + JavaDStream<String> test = ssc.socketTextStream("localhost", 12345); } @Test @@ -1636,7 +1638,7 @@ public class JavaAPISuite implements Serializable { } } - JavaDStream test = ssc.socketStream( + JavaDStream<String> test = ssc.socketStream( "localhost", 12345, new Converter(), @@ -1645,39 +1647,39 @@ public class JavaAPISuite implements Serializable { @Test public void testTextFileStream() { - JavaDStream test = ssc.textFileStream("/tmp/foo"); + JavaDStream<String> test = ssc.textFileStream("/tmp/foo"); } @Test public void testRawSocketStream() { - JavaDStream test = ssc.rawSocketStream("localhost", 12345); + JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345); } @Test public void testFlumeStream() { - JavaDStream test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY()); + JavaDStream<SparkFlumeEvent> test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY()); } @Test public void testFileStream() { JavaPairDStream<String, String> foo = - ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo"); + ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo"); } @Test public void testTwitterStream() { String[] filters = new String[] { "good", "bad", "ugly" }; - JavaDStream test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY()); + JavaDStream<Status> test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY()); } @Test public void testActorStream() { - JavaDStream test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY()); + JavaDStream<String> test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY()); } @Test public void testZeroMQStream() { - JavaDStream test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() { + JavaDStream<String> test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() { @Override public Iterable<String> call(byte[][] b) throws Exception { return null; diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index 5e384eeee4..42ab9590d6 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -17,7 +17,9 @@ package org.apache.spark.streaming -import collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import scala.reflect.ClassTag + import java.util.{List => JList} import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext} import org.apache.spark.streaming._ @@ -31,15 +33,15 @@ trait JavaTestBase extends TestSuiteBase { /** * Create a [[org.apache.spark.streaming.TestInputStream]] and attach it to the supplied context. * The stream will be derived from the supplied lists of Java objects. - **/ + */ def attachTestInputStream[T]( ssc: JavaStreamingContext, data: JList[JList[T]], numPartitions: Int) = { val seqData = data.map(Seq(_:_*)) - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions) ssc.ssc.registerInputStream(dstream) new JavaDStream[T](dstream) @@ -52,8 +54,8 @@ trait JavaTestBase extends TestSuiteBase { def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]( dstream: JavaDStreamLike[T, This, R]) = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val ostream = new TestOutputStreamWithPartitions(dstream.dstream) dstream.dstream.ssc.registerOutputStream(ostream) } @@ -67,8 +69,8 @@ trait JavaTestBase extends TestSuiteBase { */ def runStreams[V]( ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { - implicit val cm: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val cm: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput) val out = new ArrayList[JList[V]]() res.map(entry => out.append(new ArrayList[V](entry))) @@ -85,8 +87,8 @@ trait JavaTestBase extends TestSuiteBase { */ def runStreamsWithPartitions[V](ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[JList[V]]] = { - implicit val cm: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val cm: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput) val out = new ArrayList[JList[JList[V]]]() res.map{entry => diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala index c93075e3b3..67a0841535 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala @@ -20,14 +20,20 @@ package org.apache.spark.streaming import dstream.FileInputDStream import org.apache.spark.streaming.StreamingContext._ import java.io.File -import runtime.RichInt -import org.scalatest.BeforeAndAfter + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + import org.apache.commons.io.FileUtils -import collection.mutable.{SynchronizedBuffer, ArrayBuffer} -import util.{Clock, ManualClock} -import scala.util.Random +import org.scalatest.BeforeAndAfter + import com.google.common.io.Files +import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions +import org.apache.spark.streaming.dstream.FileInputDStream +import org.apache.spark.streaming.util.ManualClock + + /** * This test suites tests the checkpointing functionality of DStreams - @@ -68,13 +74,13 @@ class CheckpointSuite extends TestSuiteBase { // Setup the streams val input = (1 to 10).map(_ => Seq("a")).toSeq val operation = (st: DStream[String]) => { - val updateFunc = (values: Seq[Int], state: Option[RichInt]) => { - Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0))) + val updateFunc = (values: Seq[Int], state: Option[Int]) => { + Some((values.foldLeft(0)(_ + _) + state.getOrElse(0))) } st.map(x => (x, 1)) - .updateStateByKey[RichInt](updateFunc) + .updateStateByKey(updateFunc) .checkpoint(stateStreamCheckpointInterval) - .map(t => (t._1, t._2.self)) + .map(t => (t._1, t._2)) } var ssc = setupStreams(input, operation) var stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head @@ -174,13 +180,13 @@ class CheckpointSuite extends TestSuiteBase { val input = (1 to 10).map(_ => Seq("a")).toSeq val output = (1 to 10).map(x => Seq(("a", x))).toSeq val operation = (st: DStream[String]) => { - val updateFunc = (values: Seq[Int], state: Option[RichInt]) => { - Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0))) + val updateFunc = (values: Seq[Int], state: Option[Int]) => { + Some((values.foldLeft(0)(_ + _) + state.getOrElse(0))) } st.map(x => (x, 1)) - .updateStateByKey[RichInt](updateFunc) + .updateStateByKey(updateFunc) .checkpoint(batchDuration * 2) - .map(t => (t._1, t._2.self)) + .map(t => (t._1, t._2)) } testCheckpointedOperation(input, operation, output, 7) } @@ -306,7 +312,7 @@ class CheckpointSuite extends TestSuiteBase { * NOTE: This takes into consideration that the last batch processed before * master failure will be re-processed after restart/recovery. */ - def testCheckpointedOperation[U: ClassManifest, V: ClassManifest]( + def testCheckpointedOperation[U: ClassTag, V: ClassTag]( input: Seq[Seq[U]], operation: DStream[U] => DStream[V], expectedOutput: Seq[Seq[V]], @@ -350,7 +356,7 @@ class CheckpointSuite extends TestSuiteBase { * Advances the manual clock on the streaming scheduler by given number of batches. * It also waits for the expected amount of time for each batch. */ - def advanceTimeWithRealDelay[V: ClassManifest](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = { + def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = { val clock = ssc.scheduler.clock.asInstanceOf[ManualClock] logInfo("Manual clock before advancing = " + clock.time) for (i <- 1 to numBatches.toInt) { diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala index fbbeb8f0ee..e969e91d13 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala @@ -20,8 +20,9 @@ package org.apache.spark.streaming import org.apache.spark.streaming.dstream.{InputDStream, ForEachDStream} import org.apache.spark.streaming.util.ManualClock -import collection.mutable.ArrayBuffer -import collection.mutable.SynchronizedBuffer +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.SynchronizedBuffer +import scala.reflect.ClassTag import java.io.{ObjectInputStream, IOException} @@ -35,7 +36,7 @@ import org.apache.spark.rdd.RDD * replayable, reliable message queue like Kafka. It requires a sequence as input, and * returns the i_th element at the i_th batch unde manual clock. */ -class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int) +class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int) extends InputDStream[T](ssc_) { def start() {} @@ -63,7 +64,7 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[ * * The buffer contains a sequence of RDD's, each containing a sequence of items */ -class TestOutputStream[T: ClassManifest](parent: DStream[T], +class TestOutputStream[T: ClassTag](parent: DStream[T], val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]()) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { val collected = rdd.collect() @@ -85,7 +86,7 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T], * The buffer contains a sequence of RDD's, each containing a sequence of partitions, each * containing a sequence of items. */ -class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T], +class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T], val output: ArrayBuffer[Seq[Seq[T]]] = ArrayBuffer[Seq[Seq[T]]]()) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => { val collected = rdd.glom().collect().map(_.toSeq) @@ -163,7 +164,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { * Set up required DStreams to test the DStream operation using the two sequences * of input collections. */ - def setupStreams[U: ClassManifest, V: ClassManifest]( + def setupStreams[U: ClassTag, V: ClassTag]( input: Seq[Seq[U]], operation: DStream[U] => DStream[V], numPartitions: Int = numInputPartitions @@ -189,7 +190,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { * Set up required DStreams to test the binary operation using the sequence * of input collections. */ - def setupStreams[U: ClassManifest, V: ClassManifest, W: ClassManifest]( + def setupStreams[U: ClassTag, V: ClassTag, W: ClassTag]( input1: Seq[Seq[U]], input2: Seq[Seq[V]], operation: (DStream[U], DStream[V]) => DStream[W] @@ -220,7 +221,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { * * Returns a sequence of items for each RDD. */ - def runStreams[V: ClassManifest]( + def runStreams[V: ClassTag]( ssc: StreamingContext, numBatches: Int, numExpectedOutput: Int @@ -237,7 +238,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each * representing one partition. */ - def runStreamsWithPartitions[V: ClassManifest]( + def runStreamsWithPartitions[V: ClassTag]( ssc: StreamingContext, numBatches: Int, numExpectedOutput: Int @@ -293,7 +294,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { * is same as the expected output values, by comparing the output * collections either as lists (order matters) or sets (order does not matter) */ - def verifyOutput[V: ClassManifest]( + def verifyOutput[V: ClassTag]( output: Seq[Seq[V]], expectedOutput: Seq[Seq[V]], useSet: Boolean @@ -323,7 +324,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { * Test unary DStream operation with a list of inputs, with number of * batches to run same as the number of expected output values */ - def testOperation[U: ClassManifest, V: ClassManifest]( + def testOperation[U: ClassTag, V: ClassTag]( input: Seq[Seq[U]], operation: DStream[U] => DStream[V], expectedOutput: Seq[Seq[V]], @@ -341,7 +342,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { * @param useSet Compare the output values with the expected output values * as sets (order matters) or as lists (order does not matter) */ - def testOperation[U: ClassManifest, V: ClassManifest]( + def testOperation[U: ClassTag, V: ClassTag]( input: Seq[Seq[U]], operation: DStream[U] => DStream[V], expectedOutput: Seq[Seq[V]], @@ -358,7 +359,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { * Test binary DStream operation with two lists of inputs, with number of * batches to run same as the number of expected output values */ - def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest]( + def testOperation[U: ClassTag, V: ClassTag, W: ClassTag]( input1: Seq[Seq[U]], input2: Seq[Seq[V]], operation: (DStream[U], DStream[V]) => DStream[W], @@ -378,7 +379,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging { * @param useSet Compare the output values with the expected output values * as sets (order matters) or as lists (order does not matter) */ - def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest]( + def testOperation[U: ClassTag, V: ClassTag, W: ClassTag]( input1: Seq[Seq[U]], input2: Seq[Seq[V]], operation: (DStream[U], DStream[V]) => DStream[W], |