aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test
diff options
context:
space:
mode:
authorPrashant Sharma <prashant.s@imaginea.com>2013-04-29 16:39:13 +0530
committerPrashant Sharma <prashant.s@imaginea.com>2013-04-29 16:39:13 +0530
commit8f3ac240cbdd678c0c76155b080dcc461355452e (patch)
treea35684b591baa426bd022e0a070d3d900ed2a6d0 /streaming/src/test
parent4b4a36ea7d7f9e1d9c9ee1d6738deea579dc1b4e (diff)
downloadspark-8f3ac240cbdd678c0c76155b080dcc461355452e.tar.gz
spark-8f3ac240cbdd678c0c76155b080dcc461355452e.tar.bz2
spark-8f3ac240cbdd678c0c76155b080dcc461355452e.zip
Fixed Warning: ClassManifest -> ClassTag
Diffstat (limited to 'streaming/src/test')
-rw-r--r--streaming/src/test/java/spark/streaming/JavaTestUtils.scala22
-rw-r--r--streaming/src/test/scala/spark/streaming/CheckpointSuite.scala5
-rw-r--r--streaming/src/test/scala/spark/streaming/TestSuiteBase.scala25
3 files changed, 28 insertions, 24 deletions
diff --git a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
index 64a7e7cbf9..8a7c48bde6 100644
--- a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala
@@ -1,6 +1,8 @@
package spark.streaming
-import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.reflect.ClassTag
+
import java.util.{List => JList}
import spark.streaming.api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext}
import spark.streaming._
@@ -13,15 +15,15 @@ trait JavaTestBase extends TestSuiteBase {
/**
* Create a [[spark.streaming.TestInputStream]] and attach it to the supplied context.
* The stream will be derived from the supplied lists of Java objects.
- **/
+ */
def attachTestInputStream[T](
ssc: JavaStreamingContext,
data: JList[JList[T]],
numPartitions: Int) = {
val seqData = data.map(Seq(_:_*))
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions)
ssc.ssc.registerInputStream(dstream)
new JavaDStream[T](dstream)
@@ -30,12 +32,12 @@ trait JavaTestBase extends TestSuiteBase {
/**
* Attach a provided stream to it's associated StreamingContext as a
* [[spark.streaming.TestOutputStream]].
- **/
+ */
def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T, This, R],
R <: spark.api.java.JavaRDDLike[T, R]](
dstream: JavaDStreamLike[T, This, R]) = {
- implicit val cm: ClassManifest[T] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
+ implicit val cm: ClassTag[T] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val ostream = new TestOutputStream(dstream.dstream,
new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]])
dstream.dstream.ssc.registerOutputStream(ostream)
@@ -48,8 +50,8 @@ trait JavaTestBase extends TestSuiteBase {
*/
def runStreams[V](
ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
- implicit val cm: ClassManifest[V] =
- implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ implicit val cm: ClassTag[V] =
+ implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
val out = new ArrayList[JList[V]]()
res.map(entry => out.append(new ArrayList[V](entry)))
@@ -64,4 +66,4 @@ object JavaTestUtils extends JavaTestBase {
object JavaCheckpointTestUtils extends JavaTestBase {
override def actuallyWait = true
-} \ No newline at end of file
+}
diff --git a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
index f9285b19e2..143a26d911 100644
--- a/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/spark/streaming/CheckpointSuite.scala
@@ -3,6 +3,7 @@ package spark.streaming
import java.io.File
import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
@@ -297,7 +298,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
* NOTE: This takes into consideration that the last batch processed before
* master failure will be re-processed after restart/recovery.
*/
- def testCheckpointedOperation[U: ClassManifest, V: ClassManifest](
+ def testCheckpointedOperation[U: ClassTag, V: ClassTag](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
expectedOutput: Seq[Seq[V]],
@@ -340,7 +341,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
* Advances the manual clock on the streaming scheduler by given number of batches.
* It also waits for the expected amount of time for each batch.
*/
- def advanceTimeWithRealDelay[V: ClassManifest](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
+ def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logInfo("Manual clock before advancing = " + clock.time)
for (i <- 1 to numBatches.toInt) {
diff --git a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
index ad6aa79d10..dc280b09c9 100644
--- a/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/spark/streaming/TestSuiteBase.scala
@@ -5,8 +5,9 @@ import spark.streaming.util.ManualClock
import spark.{RDD, Logging}
-import collection.mutable.ArrayBuffer
-import collection.mutable.SynchronizedBuffer
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.SynchronizedBuffer
+import scala.reflect.ClassTag
import java.io.{ObjectInputStream, IOException}
@@ -17,7 +18,7 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
* replayable, reliable message queue like Kafka. It requires a sequence as input, and
* returns the i_th element at the i_th batch unde manual clock.
*/
-class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
+class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
extends InputDStream[T](ssc_) {
def start() {}
@@ -43,7 +44,7 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
* This is a output stream just for the testsuites. All the output is collected into a
* ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
*/
-class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[T]])
+class TestOutputStream[T: ClassTag](parent: DStream[T], val output: ArrayBuffer[Seq[T]])
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
@@ -88,7 +89,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Set up required DStreams to test the DStream operation using the two sequences
* of input collections.
*/
- def setupStreams[U: ClassManifest, V: ClassManifest](
+ def setupStreams[U: ClassTag, V: ClassTag](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V]
): StreamingContext = {
@@ -112,7 +113,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Set up required DStreams to test the binary operation using the sequence
* of input collections.
*/
- def setupStreams[U: ClassManifest, V: ClassManifest, W: ClassManifest](
+ def setupStreams[U: ClassTag, V: ClassTag, W: ClassTag](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
operation: (DStream[U], DStream[V]) => DStream[W]
@@ -140,7 +141,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* returns the collected output. It will wait until `numExpectedOutput` number of
* output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached.
*/
- def runStreams[V: ClassManifest](
+ def runStreams[V: ClassTag](
ssc: StreamingContext,
numBatches: Int,
numExpectedOutput: Int
@@ -196,7 +197,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* is same as the expected output values, by comparing the output
* collections either as lists (order matters) or sets (order does not matter)
*/
- def verifyOutput[V: ClassManifest](
+ def verifyOutput[V: ClassTag](
output: Seq[Seq[V]],
expectedOutput: Seq[Seq[V]],
useSet: Boolean
@@ -226,7 +227,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Test unary DStream operation with a list of inputs, with number of
* batches to run same as the number of expected output values
*/
- def testOperation[U: ClassManifest, V: ClassManifest](
+ def testOperation[U: ClassTag, V: ClassTag](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
expectedOutput: Seq[Seq[V]],
@@ -244,7 +245,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* @param useSet Compare the output values with the expected output values
* as sets (order matters) or as lists (order does not matter)
*/
- def testOperation[U: ClassManifest, V: ClassManifest](
+ def testOperation[U: ClassTag, V: ClassTag](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
expectedOutput: Seq[Seq[V]],
@@ -261,7 +262,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Test binary DStream operation with two lists of inputs, with number of
* batches to run same as the number of expected output values
*/
- def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest](
+ def testOperation[U: ClassTag, V: ClassTag, W: ClassTag](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
operation: (DStream[U], DStream[V]) => DStream[W],
@@ -281,7 +282,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* @param useSet Compare the output values with the expected output values
* as sets (order matters) or as lists (order does not matter)
*/
- def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest](
+ def testOperation[U: ClassTag, V: ClassTag, W: ClassTag](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
operation: (DStream[U], DStream[V]) => DStream[W],