Diffstat (limited to 'external/flume/src')
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala              |  5
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala | 13
-rw-r--r--  external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala       | 16
3 files changed, 17 insertions(+), 17 deletions(-)
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
index 57374ef515..fc02c9fcb5 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/TestOutputStream.scala
@@ -18,6 +18,7 @@
package org.apache.spark.streaming
import java.io.{IOException, ObjectInputStream}
+import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
@@ -33,10 +34,10 @@ import org.apache.spark.util.Utils
* The buffer contains a sequence of RDD's, each containing a sequence of items
*/
class TestOutputStream[T: ClassTag](parent: DStream[T],
- val output: ArrayBuffer[Seq[T]] = ArrayBuffer[Seq[T]]())
+ val output: ConcurrentLinkedQueue[Seq[T]] = new ConcurrentLinkedQueue[Seq[T]]())
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
- output += collected
+ output.add(collected)
}, false) {
// This is to clear the output buffer every time it is read from a checkpoint
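
For context: scala.collection.mutable.SynchronizedBuffer was deprecated in Scala 2.11 because mixed-in synchronization does not compose safely across compound operations, which is why this patch moves the test output collection to java.util.concurrent.ConcurrentLinkedQueue, a non-blocking thread-safe queue. A minimal standalone sketch of the new pattern, outside Spark (the object name and sample data are illustrative, not from this patch):

    import java.util.concurrent.ConcurrentLinkedQueue
    import scala.collection.JavaConverters._

    object OutputQueueSketch {
      def main(args: Array[String]): Unit = {
        val output = new ConcurrentLinkedQueue[Seq[Int]]()
        // Several threads can append batches concurrently without external locking.
        val writers = (1 to 4).map { i =>
          new Thread(new Runnable {
            override def run(): Unit = output.add(Seq(i, i * 10))
          })
        }
        writers.foreach(_.start())
        writers.foreach(_.join())
        // Readers convert the queue to a Scala sequence before flattening,
        // mirroring what TestOutputStream's callers now do.
        println(output.asScala.toSeq.flatten.sorted)
      }
    }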
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
index 60db846ffb..10dcbf98bc 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala
@@ -18,9 +18,9 @@
package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
+import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -102,9 +102,8 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log
val flumeStream: ReceiverInputDStream[SparkFlumeEvent] =
FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK,
utils.eventsPerBatch, 5)
- val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
- with SynchronizedBuffer[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+ val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
+ val outputStream = new TestOutputStream(flumeStream, outputQueue)
outputStream.register()
ssc.start()
@@ -115,11 +114,11 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log
// The eventually is required to ensure that all data in the batch has been processed.
eventually(timeout(10 seconds), interval(100 milliseconds)) {
- val flattenOutputBuffer = outputBuffer.flatten
- val headers = flattenOutputBuffer.map(_.event.getHeaders.asScala.map {
+ val flattenOutput = outputQueue.asScala.toSeq.flatten
+ val headers = flattenOutput.map(_.event.getHeaders.asScala.map {
case (key, value) => (key.toString, value.toString)
}).map(_.asJava)
- val bodies = flattenOutputBuffer.map(e => JavaUtils.bytesToString(e.event.getBody))
+ val bodies = flattenOutput.map(e => JavaUtils.bytesToString(e.event.getBody))
utils.assertOutput(headers.asJava, bodies.asJava)
}
} finally {
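
The read side of the change is the one-liner outputQueue.asScala.toSeq.flatten, which replaces outputBuffer.flatten. A small self-contained sketch of why both conversion steps are there (names and data are illustrative):

    import java.util.concurrent.ConcurrentLinkedQueue
    import scala.collection.JavaConverters._

    object FlattenSketch {
      def main(args: Array[String]): Unit = {
        val queue = new ConcurrentLinkedQueue[Seq[String]]()
        queue.add(Seq("a", "b"))
        queue.add(Seq("c"))
        // asScala wraps the live Java queue as a Scala Iterable; toSeq then
        // copies its contents into a Scala sequence so flatten and map run
        // over ordinary Scala collections rather than the mutable queue.
        val flattened = queue.asScala.toSeq.flatten
        assert(flattened == Seq("a", "b", "c"))
      }
    }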
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index b29e591c07..38208c6518 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -17,8 +17,9 @@
package org.apache.spark.streaming.flume
+import java.util.concurrent.ConcurrentLinkedQueue
+
import scala.collection.JavaConverters._
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.concurrent.duration._
import scala.language.postfixOps
@@ -51,14 +52,14 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w
val input = (1 to 100).map { _.toString }
val utils = new FlumeTestUtils
try {
- val outputBuffer = startContext(utils.getTestPort(), testCompression)
+ val outputQueue = startContext(utils.getTestPort(), testCompression)
eventually(timeout(10 seconds), interval(100 milliseconds)) {
utils.writeInput(input.asJava, testCompression)
}
eventually(timeout(10 seconds), interval(100 milliseconds)) {
- val outputEvents = outputBuffer.flatten.map { _.event }
+ val outputEvents = outputQueue.asScala.toSeq.flatten.map { _.event }
outputEvents.foreach {
event =>
event.getHeaders.get("test") should be("header")
@@ -76,16 +77,15 @@ class FlumeStreamSuite extends SparkFunSuite with BeforeAndAfter with Matchers w
/** Setup and start the streaming context */
private def startContext(
- testPort: Int, testCompression: Boolean): (ArrayBuffer[Seq[SparkFlumeEvent]]) = {
+ testPort: Int, testCompression: Boolean): (ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]) = {
ssc = new StreamingContext(conf, Milliseconds(200))
val flumeStream = FlumeUtils.createStream(
ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, testCompression)
- val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
- with SynchronizedBuffer[Seq[SparkFlumeEvent]]
- val outputStream = new TestOutputStream(flumeStream, outputBuffer)
+ val outputQueue = new ConcurrentLinkedQueue[Seq[SparkFlumeEvent]]
+ val outputStream = new TestOutputStream(flumeStream, outputQueue)
outputStream.register()
ssc.start()
- outputBuffer
+ outputQueue
}
/** Class to create socket channel with compression */
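
The overall shape of startContext after this patch is: build the queue, register a stream that feeds it, and return the queue so the test can assert on its contents once data arrives. A Spark-free sketch of that shape, using a plain polling loop as a crude stand-in for ScalaTest's eventually (all names here are illustrative):

    import java.util.concurrent.ConcurrentLinkedQueue
    import scala.collection.JavaConverters._

    object StartContextSketch {
      // Mirrors startContext: create the queue, hand it to the producer,
      // and return it so the caller can inspect what was collected.
      def startProducer(): ConcurrentLinkedQueue[Seq[String]] = {
        val outputQueue = new ConcurrentLinkedQueue[Seq[String]]()
        new Thread(new Runnable {
          override def run(): Unit = outputQueue.add(Seq("event"))
        }).start()
        outputQueue
      }

      def main(args: Array[String]): Unit = {
        val queue = startProducer()
        // Poll until the asynchronous producer has delivered something.
        var tries = 0
        while (queue.isEmpty && tries < 100) { Thread.sleep(10); tries += 1 }
        println(queue.asScala.toSeq.flatten)
      }
    }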