From 159198eff67ee9ead08fba60a585494ea1575147 Mon Sep 17 00:00:00 2001 From: Holden Karau Date: Tue, 9 Feb 2016 08:44:56 +0000 Subject: [SPARK-13165][STREAMING] Replace deprecated synchronizedBuffer in streaming Building with Scala 2.11 results in the warning trait SynchronizedBuffer in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Consider java.util.concurrent.ConcurrentLinkedQueue as an alternative - we already use ConcurrentLinkedQueue elsewhere so lets replace it. Some notes about how behaviour is different for reviewers: The Seq from a SynchronizedBuffer that was implicitly converted would continue to receive updates - however when we do the same conversion explicitly on the ConcurrentLinkedQueue this isn't the case. Hence changing some of the (internal & test) APIs to pass an Iterable. toSeq is safe to use if there are no more updates. Author: Holden Karau Author: tedyu Closes #11067 from holdenk/SPARK-13165-replace-deprecated-synchronizedBuffer-in-streaming. --- .../src/test/java/org/apache/spark/streaming/JavaTestUtils.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'streaming/src/test/java') diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index 57b50bdfd6..ae44fd07ac 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -69,7 +69,7 @@ trait JavaTestBase extends TestSuiteBase { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] ssc.getState() val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput) - res.map(_.asJava).asJava + res.map(_.asJava).toSeq.asJava } /** @@ -85,7 +85,7 @@ trait JavaTestBase extends TestSuiteBase { implicit val cm: ClassTag[V] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput) - res.map(entry => entry.map(_.asJava).asJava).asJava + res.map(entry => entry.map(_.asJava).asJava).toSeq.asJava } } -- cgit v1.2.3