aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/java
diff options
context:
space:
mode:
authorHolden Karau <holden@us.ibm.com>2016-02-09 08:44:56 +0000
committerSean Owen <sowen@cloudera.com>2016-02-09 08:44:56 +0000
commit159198eff67ee9ead08fba60a585494ea1575147 (patch)
treeb45f653a253ce0aac7af561caaeef9ea52099e1a /streaming/src/test/java
parentf9307d8fc5223b4c5be07e3dc691a327f3bbfa7f (diff)
downloadspark-159198eff67ee9ead08fba60a585494ea1575147.tar.gz
spark-159198eff67ee9ead08fba60a585494ea1575147.tar.bz2
spark-159198eff67ee9ead08fba60a585494ea1575147.zip
[SPARK-13165][STREAMING] Replace deprecated synchronizedBuffer in streaming
Building with Scala 2.11 results in the warning trait SynchronizedBuffer in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Consider java.util.concurrent.ConcurrentLinkedQueue as an alternative - we already use ConcurrentLinkedQueue elsewhere so lets replace it. Some notes about how behaviour is different for reviewers: The Seq from a SynchronizedBuffer that was implicitly converted would continue to receive updates - however when we do the same conversion explicitly on the ConcurrentLinkedQueue this isn't the case. Hence changing some of the (internal & test) APIs to pass an Iterable. toSeq is safe to use if there are no more updates. Author: Holden Karau <holden@us.ibm.com> Author: tedyu <yuzhihong@gmail.com> Closes #11067 from holdenk/SPARK-13165-replace-deprecated-synchronizedBuffer-in-streaming.
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala4
1 files changed, 2 insertions, 2 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index 57b50bdfd6..ae44fd07ac 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -69,7 +69,7 @@ trait JavaTestBase extends TestSuiteBase {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
ssc.getState()
val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
- res.map(_.asJava).asJava
+ res.map(_.asJava).toSeq.asJava
}
/**
@@ -85,7 +85,7 @@ trait JavaTestBase extends TestSuiteBase {
implicit val cm: ClassTag[V] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput)
- res.map(entry => entry.map(_.asJava).asJava).asJava
+ res.map(entry => entry.map(_.asJava).asJava).toSeq.asJava
}
}