aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/scala/JavaTestUtils.scala
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/src/test/scala/JavaTestUtils.scala')
-rw-r--r--streaming/src/test/scala/JavaTestUtils.scala6
1 files changed, 3 insertions, 3 deletions
diff --git a/streaming/src/test/scala/JavaTestUtils.scala b/streaming/src/test/scala/JavaTestUtils.scala
index 776b0e6bb6..9f3a80df8b 100644
--- a/streaming/src/test/scala/JavaTestUtils.scala
+++ b/streaming/src/test/scala/JavaTestUtils.scala
@@ -2,7 +2,7 @@ package spark.streaming
import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import java.util.{List => JList}
-import spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext}
import spark.streaming._
import java.util.ArrayList
import collection.JavaConversions._
@@ -20,7 +20,8 @@ object JavaTestUtils extends TestSuiteBase {
new JavaDStream[T](dstream)
}
- def attachTestOutputStream[T](dstream: JavaDStream[T]) = {
+ def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]]
+ (dstream: JavaDStreamLike[T, This]) = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
val ostream = new TestOutputStream(dstream.dstream,
@@ -37,6 +38,5 @@ object JavaTestUtils extends TestSuiteBase {
res.map(entry => out.append(new ArrayList[V](entry)))
out
}
-
}