author      Holden Karau <holden@us.ibm.com>           2016-02-09 08:44:56 +0000
committer   Sean Owen <sowen@cloudera.com>              2016-02-09 08:44:56 +0000
commit      159198eff67ee9ead08fba60a585494ea1575147 (patch)
tree        b45f653a253ce0aac7af561caaeef9ea52099e1a /external/flume
parent      f9307d8fc5223b4c5be07e3dc691a327f3bbfa7f (diff)
[SPARK-13165][STREAMING] Replace deprecated synchronizedBuffer in streaming
Building with Scala 2.11 results in the warning: "trait SynchronizedBuffer in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Consider java.util.concurrent.ConcurrentLinkedQueue as an alternative." We already use ConcurrentLinkedQueue elsewhere, so let's replace it.

Some notes on how the behaviour differs, for reviewers: the Seq obtained by implicit conversion from a SynchronizedBuffer would continue to receive updates; when we do the same conversion explicitly on a ConcurrentLinkedQueue this isn't the case. Hence some of the (internal & test) APIs are changed to pass an Iterable instead. toSeq is safe to use if there are no more updates.

Author: Holden Karau <holden@us.ibm.com>
Author: tedyu <yuzhihong@gmail.com>

Closes #11067 from holdenk/SPARK-13165-replace-deprecated-synchronizedBuffer-in-streaming.
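The behavioural note above is the crux of the change and is worth a standalone illustration. The sketch below is not part of the patch; it only contrasts the live-view semantics of handing an ArrayBuffer around under the Seq type with the point-in-time copy you get from an eager conversion of a ConcurrentLinkedQueue (the object and value names are illustrative):

    import java.util.concurrent.ConcurrentLinkedQueue

    import scala.collection.JavaConverters._
    import scala.collection.mutable.ArrayBuffer

    // Illustrative only: contrasts the old live-view behaviour with an eager
    // copy of a ConcurrentLinkedQueue. Not part of the Spark patch.
    object SnapshotSemantics {
      def main(args: Array[String]): Unit = {
        // Handing a mutable ArrayBuffer around under the general Seq type
        // yields a live view: appends made after the handoff stay visible.
        val buffer = ArrayBuffer("a", "b")
        val liveView: collection.Seq[String] = buffer
        buffer += "c"
        println(liveView.size) // 3 -- the Seq reference sees the later append

        // asScala only wraps the queue; an eager copy (toList) fixes the
        // contents at the moment of the call.
        val queue = new ConcurrentLinkedQueue[String]()
        queue.add("a")
        queue.add("b")
        val copied: List[String] = queue.asScala.toList
        queue.add("c")
        println(copied.size) // 2 -- the copy does not see the later add
      }
    }

In the tests below, the queue itself is handed to TestOutputStream and only converted with asScala.toSeq inside the eventually blocks, i.e. once the batch has been processed; per the commit message, toSeq is relied on only when no further updates are expected.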
Diffstat (limited to 'external/flume')
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala              |  5
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala | 13
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala        | 16
3 files changed, 17 insertions(+), 17 deletions(-)
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
index 57374ef515..fc02c9fcb5 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming
import java.io.{IOException, ObjectInputStream}
+import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
@@ -33,10 +34,10 @@ import org.apache.spark.util.Utils
* The buffer contains a sequence of RDD's, each containing a sequence of items
*/
class TestOutputStream[T: ClassTag](parent: DStream[T],
- val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]())
+ val output: ConcurrentLinkedQueue[Seq[T]] = new ConcurrentLinkedQueue[Seq[T]]())
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
- output += collected
+ output.add(collected)
}, false) {
// This is to clear the output buffer every time it is read from a checkpoint
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 60db846ffb..10dcbf98bc 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -18,9 +18,9 @@
package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
+import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -102,9 +102,8 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log
val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
utils.eventsPerBatch, 5)
- val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
- with SynchronizedBuffer[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+ val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
+ val outputStream = new TestOutputStream(flumeStream, outputQueue)
outputStream.register()
ssc.start()
@@ -115,11 +114,11 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log
// The eventually is required to ensure that all data in the batch has been processed.
eventually(timeout(10 seconds), interval(100 milliseconds)) {
- val flattenOutputBuffer = outputBuffer.flatten
- val headers = flattenOutputBuffer.map(_.event.getHeaders.asScala.map {
+ val flattenOutput = outputQueue.asScala.toSeq.flatten
+ val headers = flattenOutput.map(_.event.getHeaders.asScala.map {
case (key, value) => (key.toString, value.toString)
}).map(_.asJava)
- val bodies = flattenOutputBuffer.map(e => JavaUtils.bytesToString(e.event.getBody))
+ val bodies = flattenOutput.map(e => JavaUtils.bytesToString(e.event.getBody))
utils.assertOutput(headers.asJava, bodies.asJava)
}
} finally {
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index b29e591c07..38208c6518 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -17,8 +17,9 @@
package org.apache.spark.streaming.flume
+import java.util.concurrent.ConcurrentLinkedQueue
+
import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -51,14 +52,14 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w
val input = (1 to 100).map { _.toString }
val utils = new FlumeTestUtils
try {
- val outputBuffer = startContext(utils.getTestPort(), testCompression)
+ val outputQueue = startContext(utils.getTestPort(), testCompression)
eventually(timeout(10 seconds), interval(100 milliseconds)) {
utils.writeInput(input.asJava, testCompression)
}
eventually(timeout(10 seconds), interval(100 milliseconds)) {
- val outputEvents = outputBuffer.flatten.map { _.event }
+ val outputEvents = outputQueue.asScala.toSeq.flatten.map { _.event }
outputEvents.foreach {
event =>
event.getHeaders.get("test") should be("header")
@@ -76,16 +77,15 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w
/** Setup and start the streaming context */
private def startContext(
- testPort: Int, testCompression: Boolean): (ArrayBuffer[Seq[SparkFlumeEvent]]) = {
+ testPort: Int, testCompression: Boolean): (ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]) = {
ssc = new StreamingContext(conf, Milliseconds(200))
val flumeStream = FlumeUtils.createStream(
ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
- val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
- with SynchronizedBuffer[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+ val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
+ val outputStream = new TestOutputStream(flumeStream, outputQueue)
outputStream.register()
ssc.start()
- outputBuffer
+ outputQueue
}
/** Class to create socket channel with compression */