Author:    Tathagata Das <tathagata.das1565@gmail.com>  2013-12-18 17:51:14 -0800
Committer: Tathagata Das <tathagata.das1565@gmail.com>  2013-12-18 17:51:14 -0800
Commit:    e93b391d75a1c2e17ad93caff39e8fc34d640935 (patch)
Tree:      925eb68b33ddbabc4482aae4b6ead51668a22653 /streaming/src/test
Parent:    b80ec05635132f96772545803a10a1bbfa1250e7 (diff)
Parent:    5ea187277c2b11e5db813f7ff9f214d7b85190f6 (diff)
Merge branch 'apache-master' into scheduler-update
Conflicts:
    streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
    streaming/src/main/scala/org/apache/spark/streaming/dstream/ForEachDStream.scala
Diffstat (limited to 'streaming/src/test')
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java       |  88
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala     |  22
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala  |  36
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala    |  29
4 files changed, 93 insertions(+), 82 deletions(-)
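Two themes run through the test changes below: the Java API suite pins down raw JavaDStream references with explicit type parameters, and the Scala test utilities move from the deprecated ClassManifest to scala.reflect.ClassTag, its Scala 2.10 replacement. As a reminder of what the tag buys, here is a minimal standalone sketch (not taken from the patch): the [T: ClassTag] context bound threads through the runtime class information that array creation needs despite erasure.

    import scala.reflect.ClassTag

    // A toy generic buffer; the context bound [T: ClassTag] supplies an
    // implicit ClassTag[T], which Array.ofDim needs at runtime.
    class FixedBuffer[T: ClassTag](size: Int) {
      private val data = Array.ofDim[T](size)  // would not compile without the tag
      def update(i: Int, v: T): Unit = data(i) = v
      def apply(i: Int): T = data(i)
    }

    object FixedBufferDemo extends App {
      val buf = new FixedBuffer[Int](4)
      buf(0) = 42      // sugar for buf.update(0, 42)
      println(buf(0))  // prints 42
    }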
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index ad4a8b9535..daeb99f5b7 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -21,28 +21,31 @@ import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.io.Files;
+
import kafka.serializer.StringDecoder;
+
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.spark.streaming.api.java.JavaDStreamLike;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+
import scala.Tuple2;
+import twitter4j.Status;
+
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
-import org.apache.spark.api.java.JavaRDDLike;
-import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.dstream.SparkFlumeEvent;
import org.apache.spark.streaming.JavaTestUtils;
import org.apache.spark.streaming.JavaCheckpointTestUtils;
-import org.apache.spark.streaming.InputStreamsSuite;
import java.io.*;
import java.util.*;
@@ -51,7 +54,6 @@ import akka.actor.Props;
import akka.zeromq.Subscribe;
-
// The test suite itself is Serializable so that anonymous Function implementations can be
// serialized, as an alternative to converting these anonymous classes to static inner classes;
// see http://stackoverflow.com/questions/758570/.
@@ -86,8 +88,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(3L),
Arrays.asList(1L));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream count = stream.count();
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Long> count = stream.count();
JavaTestUtils.attachTestOutputStream(count);
List<List<Long>> result = JavaTestUtils.runStreams(ssc, 3, 3);
assertOrderInvariantEquals(expected, result);
@@ -103,8 +105,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(5,5),
Arrays.asList(9,4));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream letterCount = stream.map(new Function<String, Integer>() {
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
return s.length();
@@ -129,8 +131,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(7,8,9,4,5,6),
Arrays.asList(7,8,9));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream windowed = stream.window(new Duration(2000));
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> windowed = stream.window(new Duration(2000));
JavaTestUtils.attachTestOutputStream(windowed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
@@ -153,8 +155,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18),
Arrays.asList(13,14,15,16,17,18));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream windowed = stream.window(new Duration(4000), new Duration(2000));
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> windowed = stream.window(new Duration(4000), new Duration(2000));
JavaTestUtils.attachTestOutputStream(windowed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 8, 4);
@@ -171,8 +173,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList("giants"),
Arrays.asList("yankees"));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream filtered = stream.filter(new Function<String, Boolean>() {
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<String> filtered = stream.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String s) throws Exception {
return s.contains("a");
@@ -227,8 +229,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(Arrays.asList("giants", "dodgers")),
Arrays.asList(Arrays.asList("yankees", "red socks")));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream glommed = stream.glom();
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<List<String>> glommed = stream.glom();
JavaTestUtils.attachTestOutputStream(glommed);
List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
@@ -245,8 +247,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList("GIANTSDODGERS"),
Arrays.asList("YANKEESRED SOCKS"));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<String> mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
@Override
public Iterable<String> call(Iterator<String> in) {
String out = "";
@@ -288,8 +290,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(15),
Arrays.asList(24));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream reduced = stream.reduce(new IntegerSum());
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> reduced = stream.reduce(new IntegerSum());
JavaTestUtils.attachTestOutputStream(reduced);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -309,8 +311,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(39),
Arrays.asList(24));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(),
+ JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> reducedWindowed = stream.reduceByWindow(new IntegerSum(),
new IntegerDifference(), new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reducedWindowed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
@@ -695,8 +697,8 @@ public class JavaAPISuite implements Serializable {
Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"),
Arrays.asList("a","t","h","l","e","t","i","c","s"));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<String> flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
return Lists.newArrayList(x.split("(?!^)"));
@@ -742,8 +744,8 @@ public class JavaAPISuite implements Serializable {
new Tuple2<Integer, String>(9, "c"),
new Tuple2<Integer, String>(9, "s")));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() {
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaPairDStream<Integer,String> flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() {
@Override
public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
List<Tuple2<Integer, String>> out = Lists.newArrayList();
@@ -776,10 +778,10 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(2,2,5,5),
Arrays.asList(3,3,6,6));
- JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2);
- JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2);
+ JavaDStream<Integer> stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2);
+ JavaDStream<Integer> stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2);
- JavaDStream unioned = stream1.union(stream2);
+ JavaDStream<Integer> unioned = stream1.union(stream2);
JavaTestUtils.attachTestOutputStream(unioned);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -790,7 +792,7 @@ public class JavaAPISuite implements Serializable {
* Performs an order-invariant comparison of lists representing two RDD streams. This allows
* us to account for ordering variation within individual RDDs, which occurs during windowing.
*/
- public static <T extends Comparable> void assertOrderInvariantEquals(
+ public static <T extends Comparable<T>> void assertOrderInvariantEquals(
List<List<T>> expected, List<List<T>> actual) {
for (List<T> list: expected) {
Collections.sort(list);
@@ -813,11 +815,11 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(new Tuple2<String, Integer>("giants", 6)),
Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
- JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = stream.map(
new PairFunction<String, String, Integer>() {
@Override
- public Tuple2 call(String in) throws Exception {
+ public Tuple2<String, Integer> call(String in) throws Exception {
return new Tuple2<String, Integer>(in, in.length());
}
});
@@ -1540,8 +1542,8 @@ public class JavaAPISuite implements Serializable {
File tempDir = Files.createTempDir();
ssc.checkpoint(tempDir.getAbsolutePath());
- JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream letterCount = stream.map(new Function<String, Integer>() {
+ JavaDStream<String> stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
+ JavaDStream<Integer> letterCount = stream.map(new Function<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
return s.length();
@@ -1616,7 +1618,7 @@ public class JavaAPISuite implements Serializable {
@Test
public void testSocketTextStream() {
- JavaDStream test = ssc.socketTextStream("localhost", 12345);
+ JavaDStream<String> test = ssc.socketTextStream("localhost", 12345);
}
@Test
@@ -1636,7 +1638,7 @@ public class JavaAPISuite implements Serializable {
}
}
- JavaDStream test = ssc.socketStream(
+ JavaDStream<String> test = ssc.socketStream(
"localhost",
12345,
new Converter(),
@@ -1645,39 +1647,39 @@ public class JavaAPISuite implements Serializable {
@Test
public void testTextFileStream() {
- JavaDStream test = ssc.textFileStream("/tmp/foo");
+ JavaDStream<String> test = ssc.textFileStream("/tmp/foo");
}
@Test
public void testRawSocketStream() {
- JavaDStream test = ssc.rawSocketStream("localhost", 12345);
+ JavaDStream<String> test = ssc.rawSocketStream("localhost", 12345);
}
@Test
public void testFlumeStream() {
- JavaDStream test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY());
+ JavaDStream<SparkFlumeEvent> test = ssc.flumeStream("localhost", 12345, StorageLevel.MEMORY_ONLY());
}
@Test
public void testFileStream() {
JavaPairDStream<String, String> foo =
- ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo");
+ ssc.<String, String, SequenceFileInputFormat<String,String>>fileStream("/tmp/foo");
}
@Test
public void testTwitterStream() {
String[] filters = new String[] { "good", "bad", "ugly" };
- JavaDStream test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY());
+ JavaDStream<Status> test = ssc.twitterStream(filters, StorageLevel.MEMORY_ONLY());
}
@Test
public void testActorStream() {
- JavaDStream test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY());
+ JavaDStream<String> test = ssc.actorStream((Props)null, "TestActor", StorageLevel.MEMORY_ONLY());
}
@Test
public void testZeroMQStream() {
- JavaDStream test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() {
+ JavaDStream<String> test = ssc.zeroMQStream("url", (Subscribe) null, new Function<byte[][], Iterable<String>>() {
@Override
public Iterable<String> call(byte[][] b) throws Exception {
return null;
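Aside from the added type parameters, one signature fix in JavaAPISuite deserves a note: assertOrderInvariantEquals previously used a raw Comparable bound, so its Collections.sort calls compiled only with unchecked warnings; the new T extends Comparable&lt;T&gt; makes the sorts type-safe. A rough Scala analogue of the helper, hypothetical and not part of the patch, with an implicit Ordering[T] playing the role of the Comparable bound:

    // Order-invariant comparison of batches of RDD output: sort each batch
    // first, so ordering differences inside a windowed RDD do not fail the test.
    def assertOrderInvariantEquals[T: Ordering](
        expected: Seq[Seq[T]],
        actual: Seq[Seq[T]]): Unit = {
      assert(expected.map(_.sorted) == actual.map(_.sorted))
    }

    // e.g. assertOrderInvariantEquals(Seq(Seq(1, 2)), Seq(Seq(2, 1))) passes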
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index 5e384eeee4..42ab9590d6 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -17,7 +17,9 @@
package org.apache.spark.streaming
-import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.reflect.ClassTag
+
import java.util.{List => JList}
import org.apache.spark.streaming.api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext}
import org.apache.spark.streaming._
@@ -31,15 +33,15 @@ trait JavaTestBase extends TestSuiteBase {
/**
* Create a [[org.apache.spark.streaming.TestInputStream]] and attach it to the supplied context.
* The stream will be derived from the supplied lists of Java objects.
- **/
+ */
def attachTestInputStream[T](
ssc: JavaStreamingContext,
data: JList[JList[T]],
numPartitions: Int) = {
val seqData = data.map(Seq(_:_*))
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions)
ssc.ssc.registerInputStream(dstream)
new JavaDStream[T](dstream)
@@ -52,8 +54,8 @@ trait JavaTestBase extends TestSuiteBase {
def attachTestOutputStream[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]](
dstream: JavaDStreamLike[T, This, R]) =
{
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val ostream = new TestOutputStreamWithPartitions(dstream.dstream)
dstream.dstream.ssc.registerOutputStream(ostream)
}
@@ -67,8 +69,8 @@ trait JavaTestBase extends TestSuiteBase {
*/
def runStreams[V](
ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
- implicit val cm: ClassManifest[V] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ implicit val cm: ClassTag[V] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
val out = new ArrayList[JList[V]]()
res.map(entry => out.append(new ArrayList[V](entry)))
@@ -85,8 +87,8 @@ trait JavaTestBase extends TestSuiteBase {
*/
def runStreamsWithPartitions[V](ssc: JavaStreamingContext, numBatches: Int,
numExpectedOutput: Int): JList[JList[JList[V]]] = {
- implicit val cm: ClassManifest[V] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ implicit val cm: ClassTag[V] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput)
val out = new ArrayList[JList[JList[V]]]()
res.map{entry =>
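The replacement pattern in JavaTestUtils is always the same cast: implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]. A Java caller cannot supply an implicit ClassTag[T], so the utility fabricates one from AnyRef; that is sound here because the tag only ever backs Object arrays holding boxed elements. A compact illustrative sketch (the helper name is invented):

    import scala.reflect.ClassTag

    // Fabricate a tag for a type parameter arriving from Java, where no
    // implicit ClassTag[T] exists. Arrays built with it are Object[]
    // underneath, which is fine as long as elements stay boxed.
    def fakeClassTag[T]: ClassTag[T] =
      implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]

    def seqToArray[T](xs: Seq[T]): Array[T] = {
      implicit val ct: ClassTag[T] = fakeClassTag[T]
      xs.toArray  // toArray requires a ClassTag; the fabricated one suffices
    }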
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index c93075e3b3..67a0841535 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -20,14 +20,20 @@ package org.apache.spark.streaming
import dstream.FileInputDStream
import org.apache.spark.streaming.StreamingContext._
import java.io.File
-import runtime.RichInt
-import org.scalatest.BeforeAndAfter
+
+import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
+
import org.apache.commons.io.FileUtils
-import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
-import util.{Clock, ManualClock}
-import scala.util.Random
+import org.scalatest.BeforeAndAfter
+
import com.google.common.io.Files
+import org.apache.spark.streaming.StreamingContext.toPairDStreamFunctions
+import org.apache.spark.streaming.dstream.FileInputDStream
+import org.apache.spark.streaming.util.ManualClock
+
+
/**
* This test suite tests the checkpointing functionality of DStreams -
@@ -68,13 +74,13 @@ class CheckpointSuite extends TestSuiteBase {
// Setup the streams
val input = (1 to 10).map(_ => Seq("a")).toSeq
val operation = (st: DStream[String]) => {
- val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
- Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
+ val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+ Some((values.foldLeft(0)(_ + _) + state.getOrElse(0)))
}
st.map(x => (x, 1))
- .updateStateByKey[RichInt](updateFunc)
+ .updateStateByKey(updateFunc)
.checkpoint(stateStreamCheckpointInterval)
- .map(t => (t._1, t._2.self))
+ .map(t => (t._1, t._2))
}
var ssc = setupStreams(input, operation)
var stateStream = ssc.graph.getOutputStreams().head.dependencies.head.dependencies.head
@@ -174,13 +180,13 @@ class CheckpointSuite extends TestSuiteBase {
val input = (1 to 10).map(_ => Seq("a")).toSeq
val output = (1 to 10).map(x => Seq(("a", x))).toSeq
val operation = (st: DStream[String]) => {
- val updateFunc = (values: Seq[Int], state: Option[RichInt]) => {
- Some(new RichInt(values.foldLeft(0)(_ + _) + state.map(_.self).getOrElse(0)))
+ val updateFunc = (values: Seq[Int], state: Option[Int]) => {
+ Some((values.foldLeft(0)(_ + _) + state.getOrElse(0)))
}
st.map(x => (x, 1))
- .updateStateByKey[RichInt](updateFunc)
+ .updateStateByKey(updateFunc)
.checkpoint(batchDuration * 2)
- .map(t => (t._1, t._2.self))
+ .map(t => (t._1, t._2))
}
testCheckpointedOperation(input, operation, output, 7)
}
@@ -306,7 +312,7 @@ class CheckpointSuite extends TestSuiteBase {
* NOTE: This takes into consideration that the last batch processed before
* master failure will be re-processed after restart/recovery.
*/
- def testCheckpointedOperation[U: ClassManifest, V: ClassManifest](
+ def testCheckpointedOperation[U: ClassTag, V: ClassTag](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
expectedOutput: Seq[Seq[V]],
@@ -350,7 +356,7 @@ class CheckpointSuite extends TestSuiteBase {
* Advances the manual clock on the streaming scheduler by given number of batches.
* It also waits for the expected amount of time for each batch.
*/
- def advanceTimeWithRealDelay[V: ClassManifest](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
+ def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logInfo("Manual clock before advancing = " + clock.time)
for (i <- 1 to numBatches.toInt) {
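In CheckpointSuite the state type drops scala.runtime.RichInt: updateStateByKey infers the state type from the update function, so plain Int works directly and the trailing .map(t => (t._1, t._2.self)) unwrapping disappears. A minimal sketch of the same running-count pattern, assuming lines: DStream[String] comes from some input source and ssc.checkpoint(...) has been called (names are illustrative, not from the patch):

    import org.apache.spark.streaming.StreamingContext._  // pair-DStream implicits

    // Running count per key with plain Int state. For each batch, fold the
    // new values into the previous count; state is None for unseen keys.
    val updateFunc = (values: Seq[Int], state: Option[Int]) =>
      Some(values.sum + state.getOrElse(0))

    val counts = lines
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .updateStateByKey(updateFunc)  // DStream[(String, Int)]; no .self unwrapping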
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index fbbeb8f0ee..e969e91d13 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -20,8 +20,9 @@ package org.apache.spark.streaming
import org.apache.spark.streaming.dstream.{InputDStream, ForEachDStream}
import org.apache.spark.streaming.util.ManualClock
-import collection.mutable.ArrayBuffer
-import collection.mutable.SynchronizedBuffer
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.SynchronizedBuffer
+import scala.reflect.ClassTag
import java.io.{ObjectInputStream, IOException}
@@ -35,7 +36,7 @@ import org.apache.spark.rdd.RDD
* replayable, reliable message queue like Kafka. It requires a sequence as input, and
* returns the i_th element at the i_th batch under manual clock.
*/
-class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
+class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
extends InputDStream[T](ssc_) {
def start() {}
@@ -63,7 +64,7 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
*
* The buffer contains a sequence of RDD's, each containing a sequence of items
*/
-class TestOutputStream[T: ClassManifest](parent: DStream[T],
+class TestOutputStream[T: ClassTag](parent: DStream[T],
val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]())
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
@@ -85,7 +86,7 @@ class TestOutputStream[T: ClassManifest](parent: DStream[T],
* The buffer contains a sequence of RDD's, each containing a sequence of partitions, each
* containing a sequence of items.
*/
-class TestOutputStreamWithPartitions[T: ClassManifest](parent: DStream[T],
+class TestOutputStreamWithPartitions[T: ClassTag](parent: DStream[T],
val output: ArrayBuffer[Seq[Seq[T]]] = ArrayBuffer[Seq[Seq[T]]]())
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.glom().collect().map(_.toSeq)
@@ -163,7 +164,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Set up required DStreams to test the DStream operation using the two sequences
* of input collections.
*/
- def setupStreams[U: ClassManifest, V: ClassManifest](
+ def setupStreams[U: ClassTag, V: ClassTag](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
numPartitions: Int = numInputPartitions
@@ -189,7 +190,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Set up required DStreams to test the binary operation using the sequence
* of input collections.
*/
- def setupStreams[U: ClassManifest, V: ClassManifest, W: ClassManifest](
+ def setupStreams[U: ClassTag, V: ClassTag, W: ClassTag](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
operation: (DStream[U], DStream[V]) => DStream[W]
@@ -220,7 +221,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
*
* Returns a sequence of items for each RDD.
*/
- def runStreams[V: ClassManifest](
+ def runStreams[V: ClassTag](
ssc: StreamingContext,
numBatches: Int,
numExpectedOutput: Int
@@ -237,7 +238,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each
* representing one partition.
*/
- def runStreamsWithPartitions[V: ClassManifest](
+ def runStreamsWithPartitions[V: ClassTag](
ssc: StreamingContext,
numBatches: Int,
numExpectedOutput: Int
@@ -293,7 +294,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* is the same as the expected output values, by comparing the output
* collections either as lists (order matters) or sets (order does not matter)
*/
- def verifyOutput[V: ClassManifest](
+ def verifyOutput[V: ClassTag](
output: Seq[Seq[V]],
expectedOutput: Seq[Seq[V]],
useSet: Boolean
@@ -323,7 +324,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Test unary DStream operation with a list of inputs, with number of
* batches to run same as the number of expected output values
*/
- def testOperation[U: ClassManifest, V: ClassManifest](
+ def testOperation[U: ClassTag, V: ClassTag](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
expectedOutput: Seq[Seq[V]],
@@ -341,7 +342,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* @param useSet Compare the output values with the expected output values
* as sets (order does not matter) or as lists (order matters)
*/
- def testOperation[U: ClassManifest, V: ClassManifest](
+ def testOperation[U: ClassTag, V: ClassTag](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
expectedOutput: Seq[Seq[V]],
@@ -358,7 +359,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Test binary DStream operation with two lists of inputs, with number of
* batches to run same as the number of expected output values
*/
- def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest](
+ def testOperation[U: ClassTag, V: ClassTag, W: ClassTag](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
operation: (DStream[U], DStream[V]) => DStream[W],
@@ -378,7 +379,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* @param useSet Compare the output values with the expected output values
* as sets (order does not matter) or as lists (order matters)
*/
- def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest](
+ def testOperation[U: ClassTag, V: ClassTag, W: ClassTag](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
operation: (DStream[U], DStream[V]) => DStream[W],