diff options
author | Holden Karau <holden@us.ibm.com> | 2016-02-09 08:44:56 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-02-09 08:44:56 +0000 |
commit | 159198eff67ee9ead08fba60a585494ea1575147 (patch) | |
tree | b45f653a253ce0aac7af561caaeef9ea52099e1a /streaming/src/test/java | |
parent | f9307d8fc5223b4c5be07e3dc691a327f3bbfa7f (diff) | |
download | spark-159198eff67ee9ead08fba60a585494ea1575147.tar.gz spark-159198eff67ee9ead08fba60a585494ea1575147.tar.bz2 spark-159198eff67ee9ead08fba60a585494ea1575147.zip |
[SPARK-13165][STREAMING] Replace deprecated synchronizedBuffer in streaming
Building with Scala 2.11 results in the warning trait SynchronizedBuffer in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Consider java.util.concurrent.ConcurrentLinkedQueue as an alternative - we already use ConcurrentLinkedQueue elsewhere so lets replace it.
Some notes about how behaviour is different for reviewers:
The Seq from a SynchronizedBuffer that was implicitly converted would continue to receive updates - however when we do the same conversion explicitly on the ConcurrentLinkedQueue this isn't the case. Hence changing some of the (internal & test) APIs to pass an Iterable. toSeq is safe to use if there are no more updates.
Author: Holden Karau <holden@us.ibm.com>
Author: tedyu <yuzhihong@gmail.com>
Closes #11067 from holdenk/SPARK-13165-replace-deprecated-synchronizedBuffer-in-streaming.
Diffstat (limited to 'streaming/src/test/java')
-rw-r--r-- | streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala | 4 |
1 files changed, 2 insertions, 2 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala index 57b50bdfd6..ae44fd07ac 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala @@ -69,7 +69,7 @@ trait JavaTestBase extends TestSuiteBase { implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] ssc.getState() val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput) - res.map(_.asJava).asJava + res.map(_.asJava).toSeq.asJava } /** @@ -85,7 +85,7 @@ trait JavaTestBase extends TestSuiteBase { implicit val cm: ClassTag[V] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]] val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput) - res.map(entry => entry.map(_.asJava).asJava).asJava + res.map(entry => entry.map(_.asJava).asJava).toSeq.asJava } } |