aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala15
1 files changed, 8 insertions, 7 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 7210439509..9020be166a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
@@ -127,9 +127,9 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
/** Listener that collects information on processed batches */
class BatchInfoCollector extends StreamingListener {
- val batchInfosCompleted = new ArrayBuffer[BatchInfo]
- val batchInfosStarted = new ArrayBuffer[BatchInfo]
- val batchInfosSubmitted = new ArrayBuffer[BatchInfo]
+ val batchInfosCompleted = new ArrayBuffer[BatchInfo] with SynchronizedBuffer[BatchInfo]
+ val batchInfosStarted = new ArrayBuffer[BatchInfo] with SynchronizedBuffer[BatchInfo]
+ val batchInfosSubmitted = new ArrayBuffer[BatchInfo] with SynchronizedBuffer[BatchInfo]
override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) {
batchInfosSubmitted += batchSubmitted.batchInfo
@@ -146,9 +146,10 @@ class BatchInfoCollector extends StreamingListener {
/** Listener that collects information on processed batches */
class ReceiverInfoCollector extends StreamingListener {
- val startedReceiverStreamIds = new ArrayBuffer[Int]
- val stoppedReceiverStreamIds = new ArrayBuffer[Int]()
- val receiverErrors = new ArrayBuffer[(Int, String, String)]()
+ val startedReceiverStreamIds = new ArrayBuffer[Int] with SynchronizedBuffer[Int]
+ val stoppedReceiverStreamIds = new ArrayBuffer[Int] with SynchronizedBuffer[Int]
+ val receiverErrors =
+ new ArrayBuffer[(Int, String, String)] with SynchronizedBuffer[(Int, String, String)]
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
startedReceiverStreamIds += receiverStarted.receiverInfo.streamId