diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2013-04-29 16:39:13 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.s@imaginea.com> | 2013-04-29 16:39:13 +0530 |
commit | 8f3ac240cbdd678c0c76155b080dcc461355452e (patch) | |
tree | a35684b591baa426bd022e0a070d3d900ed2a6d0 /streaming/src/test/java | |
parent | 4b4a36ea7d7f9e1d9c9ee1d6738deea579dc1b4e (diff) | |
download | spark-8f3ac240cbdd678c0c76155b080dcc461355452e.tar.gz spark-8f3ac240cbdd678c0c76155b080dcc461355452e.tar.bz2 spark-8f3ac240cbdd678c0c76155b080dcc461355452e.zip |
Fixed Warning: ClassManifest -> ClassTag
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r-- | streaming/src/test/java/spark/streaming/JavaTestUtils.scala | 22 |
1 files changed, 12 insertions, 10 deletions
diff --git a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala index 64a7e7cbf9..8a7c48bde6 100644 --- a/streaming/src/test/java/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/spark/streaming/JavaTestUtils.scala @@ -1,6 +1,8 @@ package spark.streaming -import collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} +import scala.reflect.ClassTag + import java.util.{List => JList} import spark.streaming.api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext} import spark.streaming._ @@ -13,15 +15,15 @@ trait JavaTestBase extends TestSuiteBase { /** * Create a [[spark.streaming.TestInputStream]] and attach it to the supplied context. * The stream will be derived from the supplied lists of Java objects. - **/ + */ def attachTestInputStream[T]( ssc: JavaStreamingContext, data: JList[JList[T]], numPartitions: Int) = { val seqData = data.map(Seq(_:_*)) - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions) ssc.ssc.registerInputStream(dstream) new JavaDStream[T](dstream) @@ -30,12 +32,12 @@ trait JavaTestBase extends TestSuiteBase { /** * Attach a provided stream to it's associated StreamingContext as a * [[spark.streaming.TestOutputStream]]. - **/ + */ def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T, This, R], R <: spark.api.java.JavaRDDLike[T, R]]( dstream: JavaDStreamLike[T, This, R]) = { - implicit val cm: ClassManifest[T] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]] + implicit val cm: ClassTag[T] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] val ostream = new TestOutputStream(dstream.dstream, new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]) dstream.dstream.ssc.registerOutputStream(ostream) @@ -48,8 +50,8 @@ trait JavaTestBase extends TestSuiteBase { */ def runStreams[V]( ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = { - implicit val cm: ClassManifest[V] = - implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + implicit val cm: ClassTag[V] = + implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput) val out = new ArrayList[JList[V]]() res.map(entry => out.append(new ArrayList[V](entry))) @@ -64,4 +66,4 @@ object JavaTestUtils extends JavaTestBase { object JavaCheckpointTestUtils extends JavaTestBase { override def actuallyWait = true -}
\ No newline at end of file +} |