author     Holden Karau <holden@us.ibm.com>   2016-02-09 08:44:56 +0000
committer  Sean Owen <sowen@cloudera.com>     2016-02-09 08:44:56 +0000
commit     159198eff67ee9ead08fba60a585494ea1575147 (patch)
tree       b45f653a253ce0aac7af561caaeef9ea52099e1a /streaming
parent     f9307d8fc5223b4c5be07e3dc691a327f3bbfa7f (diff)
[SPARK-13165][STREAMING] Replace deprecated synchronizedBuffer in streaming
Building with Scala 2.11 emits the warning "trait SynchronizedBuffer in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Consider java.util.concurrent.ConcurrentLinkedQueue as an alternative". We already use ConcurrentLinkedQueue elsewhere, so let's replace it here as well.

Some notes for reviewers on how the behaviour differs: a Seq obtained implicitly from a SynchronizedBuffer would continue to receive updates, whereas the same conversion done explicitly on a ConcurrentLinkedQueue does not. Hence some of the (internal & test) APIs are changed to pass an Iterable instead; toSeq is safe to use once no more updates will arrive.

Author: Holden Karau <holden@us.ibm.com>
Author: tedyu <yuzhihong@gmail.com>

Closes #11067 from holdenk/SPARK-13165-replace-deprecated-synchronizedBuffer-in-streaming.
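For readers unfamiliar with the behavioural point above, the following standalone sketch (object and value names are illustrative, not taken from the patch) shows the difference between the live view that asScala gives over a ConcurrentLinkedQueue and an eager copy taken at conversion time:

import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._

object LiveViewVsSnapshot {
  def main(args: Array[String]): Unit = {
    val queue = new ConcurrentLinkedQueue[Int]()
    queue.add(1)

    // asScala merely wraps the queue, so this Iterable keeps reflecting later adds
    val live: Iterable[Int] = queue.asScala
    // toList copies the current contents; later adds are not reflected in the copy
    val copied: List[Int] = queue.asScala.toList

    queue.add(2)

    println(live.toList) // List(1, 2) -- the wrapper reads through to the queue
    println(copied)      // List(1)    -- the copy was taken before the second add
  }
}

This is why the patch passes Iterable views around where updates must remain visible, and only calls toSeq once the queue is known to be quiescent.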
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala      | 17
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala                          |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala                        |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala       | 21
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala                          |  4
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala                  | 24
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala                       | 22
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala                     | 72
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala                     | 10
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala                     |  9
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala                | 89
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala                         | 36
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala          | 32
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala  |  2
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala              | 22
15 files changed, 188 insertions, 176 deletions
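Before the per-file diffs, here is a minimal before/after sketch of the substitution applied throughout the patch (the value names and element type are made up for illustration, not taken from the Spark sources):

import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._

object MigrationSketch {
  // Before (deprecated under Scala 2.11):
  //   val blocks = new mutable.ArrayBuffer[String] with mutable.SynchronizedBuffer[String]
  //   blocks += "b1"; blocks.foreach(println); blocks --= blocks.filter(_.isEmpty)

  // After:
  val blocks = new ConcurrentLinkedQueue[String]()

  def main(args: Array[String]): Unit = {
    blocks.add("b1")                           // += becomes add
    blocks.asScala.foreach(println)            // Scala collection ops go through asScala
    val empties = blocks.asScala.filter(_.isEmpty)
    empties.foreach(blocks.remove(_))          // --= filter(...) becomes explicit remove calls
    assert(!blocks.isEmpty)                    // size > 0 checks become !isEmpty
  }
}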
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index b774b6b9a5..8d4e6827d6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -19,8 +19,9 @@ package org.apache.spark.streaming.receiver
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.ConcurrentLinkedQueue
-import scala.collection.mutable
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import com.google.common.base.Throwables
@@ -83,7 +84,7 @@ private[streaming] class ReceiverSupervisorImpl(
cleanupOldBlocks(threshTime)
case UpdateRateLimit(eps) =>
logInfo(s"Received a new rate limit: $eps.")
- registeredBlockGenerators.foreach { bg =>
+ registeredBlockGenerators.asScala.foreach { bg =>
bg.updateRate(eps)
}
}
@@ -92,8 +93,7 @@ private[streaming] class ReceiverSupervisorImpl(
/** Unique block ids if one wants to add blocks directly */
private val newBlockId = new AtomicLong(System.currentTimeMillis())
- private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator]
- with mutable.SynchronizedBuffer[BlockGenerator]
+ private val registeredBlockGenerators = new ConcurrentLinkedQueue[BlockGenerator]()
/** Divides received data records into data blocks for pushing in BlockManager. */
private val defaultBlockGeneratorListener = new BlockGeneratorListener {
@@ -170,11 +170,11 @@ private[streaming] class ReceiverSupervisorImpl(
}
override protected def onStart() {
- registeredBlockGenerators.foreach { _.start() }
+ registeredBlockGenerators.asScala.foreach { _.start() }
}
override protected def onStop(message: String, error: Option[Throwable]) {
- registeredBlockGenerators.foreach { _.stop() }
+ registeredBlockGenerators.asScala.foreach { _.stop() }
env.rpcEnv.stop(endpoint)
}
@@ -194,10 +194,11 @@ private[streaming] class ReceiverSupervisorImpl(
override def createBlockGenerator(
blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
// Cleanup BlockGenerators that have already been stopped
- registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }
+ val stoppedGenerators = registeredBlockGenerators.asScala.filter{ _.isStopped() }
+ stoppedGenerators.foreach(registeredBlockGenerators.remove(_))
val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
- registeredBlockGenerators += newBlockGenerator
+ registeredBlockGenerators.add(newBlockGenerator)
newBlockGenerator
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 81de07f933..e235afad5e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -273,7 +273,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).
map { case (outputOpId, outputOpIdAndSparkJobIds) =>
// sort SparkJobIds for each OutputOpId
- (outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
+ (outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).toSeq.sorted)
}
val outputOps: Seq[(OutputOperationUIData, Seq[SparkJobId])] =
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
index 3ef3689de1..1af60857bc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
@@ -33,7 +33,7 @@ private[ui] case class BatchUIData(
val processingStartTime: Option[Long],
val processingEndTime: Option[Long],
val outputOperations: mutable.HashMap[OutputOpId, OutputOperationUIData] = mutable.HashMap(),
- var outputOpIdSparkJobIdPairs: Seq[OutputOpIdAndSparkJobId] = Seq.empty) {
+ var outputOpIdSparkJobIdPairs: Iterable[OutputOpIdAndSparkJobId] = Seq.empty) {
/**
* Time taken for the first job of this batch to start processing from the time this batch
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index cacd430cf3..30a3a98c01 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -18,8 +18,10 @@
package org.apache.spark.streaming.ui
import java.util.{LinkedHashMap, Map => JMap, Properties}
+import java.util.concurrent.ConcurrentLinkedQueue
-import scala.collection.mutable.{ArrayBuffer, HashMap, Queue, SynchronizedBuffer}
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{HashMap, Queue}
import org.apache.spark.scheduler._
import org.apache.spark.streaming.{StreamingContext, Time}
@@ -41,9 +43,9 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
// we may not be able to get the corresponding BatchUIData when receiving onJobStart. So here we
// cannot use a map of (Time, BatchUIData).
private[ui] val batchTimeToOutputOpIdSparkJobIdPair =
- new LinkedHashMap[Time, SynchronizedBuffer[OutputOpIdAndSparkJobId]] {
+ new LinkedHashMap[Time, ConcurrentLinkedQueue[OutputOpIdAndSparkJobId]] {
override def removeEldestEntry(
- p1: JMap.Entry[Time, SynchronizedBuffer[OutputOpIdAndSparkJobId]]): Boolean = {
+ p1: JMap.Entry[Time, ConcurrentLinkedQueue[OutputOpIdAndSparkJobId]]): Boolean = {
// If a lot of "onBatchCompleted"s happen before "onJobStart" (image if
// SparkContext.listenerBus is very slow), "batchTimeToOutputOpIdToSparkJobIds"
// may add some information for a removed batch when processing "onJobStart". It will be a
@@ -131,12 +133,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
getBatchTimeAndOutputOpId(jobStart.properties).foreach { case (batchTime, outputOpId) =>
var outputOpIdToSparkJobIds = batchTimeToOutputOpIdSparkJobIdPair.get(batchTime)
if (outputOpIdToSparkJobIds == null) {
- outputOpIdToSparkJobIds =
- new ArrayBuffer[OutputOpIdAndSparkJobId]()
- with SynchronizedBuffer[OutputOpIdAndSparkJobId]
+ outputOpIdToSparkJobIds = new ConcurrentLinkedQueue[OutputOpIdAndSparkJobId]()
batchTimeToOutputOpIdSparkJobIdPair.put(batchTime, outputOpIdToSparkJobIds)
}
- outputOpIdToSparkJobIds += OutputOpIdAndSparkJobId(outputOpId, jobStart.jobId)
+ outputOpIdToSparkJobIds.add(OutputOpIdAndSparkJobId(outputOpId, jobStart.jobId))
}
}
@@ -256,8 +256,11 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
}
batchUIData.foreach { _batchUIData =>
- val outputOpIdToSparkJobIds =
- Option(batchTimeToOutputOpIdSparkJobIdPair.get(batchTime)).getOrElse(Seq.empty)
+ // We use an Iterable rather than explicitly converting to a seq so that updates
+ // will propagate
+ val outputOpIdToSparkJobIds: Iterable[OutputOpIdAndSparkJobId] =
+ Option(batchTimeToOutputOpIdSparkJobIdPair.get(batchTime).asScala)
+ .getOrElse(Seq.empty)
_batchUIData.outputOpIdSparkJobIdPairs = outputOpIdToSparkJobIds
}
batchUIData
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
index 57b50bdfd6..ae44fd07ac 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaTestUtils.scala
@@ -69,7 +69,7 @@ trait JavaTestBase extends TestSuiteBase {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
ssc.getState()
val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
- res.map(_.asJava).asJava
+ res.map(_.asJava).toSeq.asJava
}
/**
@@ -85,7 +85,7 @@ trait JavaTestBase extends TestSuiteBase {
implicit val cm: ClassTag[V] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
val res = runStreamsWithPartitions[V](ssc.ssc, numBatches, numExpectedOutput)
- res.map(entry => entry.map(_.asJava).asJava).asJava
+ res.map(entry => entry.map(_.asJava).asJava).toSeq.asJava
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 25e7ae8262..f1c64799c6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -17,8 +17,10 @@
package org.apache.spark.streaming
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.JavaConverters._
import scala.collection.mutable
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
import scala.language.existentials
import scala.reflect.ClassTag
@@ -84,9 +86,10 @@ class BasicOperationsSuite extends TestSuiteBase {
withStreamingContext(setupStreams(input, operation, 2)) { ssc =>
val output = runStreamsWithPartitions(ssc, 3, 3)
assert(output.size === 3)
- val first = output(0)
- val second = output(1)
- val third = output(2)
+ val outputArray = output.toArray
+ val first = outputArray(0)
+ val second = outputArray(1)
+ val third = outputArray(2)
assert(first.size === 5)
assert(second.size === 5)
@@ -104,9 +107,10 @@ class BasicOperationsSuite extends TestSuiteBase {
withStreamingContext(setupStreams(input, operation, 5)) { ssc =>
val output = runStreamsWithPartitions(ssc, 3, 3)
assert(output.size === 3)
- val first = output(0)
- val second = output(1)
- val third = output(2)
+ val outputArray = output.toArray
+ val first = outputArray(0)
+ val second = outputArray(1)
+ val third = outputArray(2)
assert(first.size === 2)
assert(second.size === 2)
@@ -645,8 +649,8 @@ class BasicOperationsSuite extends TestSuiteBase {
val networkStream =
ssc.socketTextStream("localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
val mappedStream = networkStream.map(_ + ".").persist()
- val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
- val outputStream = new TestOutputStream(mappedStream, outputBuffer)
+ val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+ val outputStream = new TestOutputStream(mappedStream, outputQueue)
outputStream.register()
ssc.start()
@@ -685,7 +689,7 @@ class BasicOperationsSuite extends TestSuiteBase {
testServer.stop()
// verify data has been received
- assert(outputBuffer.size > 0)
+ assert(!outputQueue.isEmpty)
assert(blockRdds.size > 0)
assert(persistentRddIds.size > 0)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index 786703eb9a..1f0245a397 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -18,8 +18,9 @@
package org.apache.spark.streaming
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, File, ObjectOutputStream}
+import java.util.concurrent.ConcurrentLinkedQueue
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import com.google.common.base.Charsets
@@ -105,7 +106,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
val operatedStream = operation(inputStream)
operatedStream.print()
val outputStream = new TestOutputStreamWithPartitions(operatedStream,
- new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]])
+ new ConcurrentLinkedQueue[Seq[Seq[V]]])
outputStream.register()
ssc.checkpoint(checkpointDir)
@@ -166,7 +167,7 @@ trait DStreamCheckpointTester { self: SparkFunSuite =>
// are written to make sure that both of them have been written.
assert(checkpointFilesOfLatestTime.size === 2)
}
- outputStream.output.map(_.flatten)
+ outputStream.output.asScala.map(_.flatten).toSeq
} finally {
ssc.stop(stopSparkContext = stopSparkContext)
@@ -591,7 +592,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
// Set up the streaming context and input streams
val batchDuration = Seconds(2) // Due to 1-second resolution of setLastModified() on some OS's.
val testDir = Utils.createTempDir()
- val outputBuffer = new ArrayBuffer[Seq[Int]] with SynchronizedBuffer[Seq[Int]]
+ val outputBuffer = new ConcurrentLinkedQueue[Seq[Int]]
/**
* Writes a file named `i` (which contains the number `i`) to the test directory and sets its
@@ -671,7 +672,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
ssc.stop()
// Check that we shut down while the third batch was being processed
assert(batchCounter.getNumCompletedBatches === 2)
- assert(outputStream.output.flatten === Seq(1, 3))
+ assert(outputStream.output.asScala.toSeq.flatten === Seq(1, 3))
}
// The original StreamingContext has now been stopped.
@@ -721,7 +722,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
assert(batchCounter.getNumCompletedBatches === index + numBatchesAfterRestart + 1)
}
}
- logInfo("Output after restart = " + outputStream.output.mkString("[", ", ", "]"))
+ logInfo("Output after restart = " + outputStream.output.asScala.mkString("[", ", ", "]"))
assert(outputStream.output.size > 0, "No files processed after restart")
ssc.stop()
@@ -730,11 +731,11 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
assert(recordedFiles(ssc) === (1 to 9))
// Append the new output to the old buffer
- outputBuffer ++= outputStream.output
+ outputBuffer.addAll(outputStream.output)
// Verify whether all the elements received are as expected
val expectedOutput = Seq(1, 3, 6, 10, 15, 21, 28, 36, 45)
- assert(outputBuffer.flatten.toSet === expectedOutput.toSet)
+ assert(outputBuffer.asScala.flatten.toSet === expectedOutput.toSet)
}
} finally {
Utils.deleteRecursively(testDir)
@@ -894,7 +895,8 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
* Advances the manual clock on the streaming scheduler by given number of batches.
* It also waits for the expected amount of time for each batch.
*/
- def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] =
+ def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long):
+ Iterable[Seq[V]] =
{
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logInfo("Manual clock before advancing = " + clock.getTimeMillis())
@@ -908,7 +910,7 @@ class CheckpointSuite extends TestSuiteBase with DStreamCheckpointTester
val outputStream = ssc.graph.getOutputStreams().filter { dstream =>
dstream.isInstanceOf[TestOutputStreamWithPartitions[V]]
}.head.asInstanceOf[TestOutputStreamWithPartitions[V]]
- outputStream.output.map(_.flatten)
+ outputStream.output.asScala.map(_.flatten)
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 75591f04ca..93c883362c 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -20,10 +20,11 @@ package org.apache.spark.streaming
import java.io.{BufferedWriter, File, OutputStreamWriter}
import java.net.{ServerSocket, Socket, SocketException}
import java.nio.charset.Charset
-import java.util.concurrent.{ArrayBlockingQueue, CountDownLatch, Executors, TimeUnit}
+import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer, SynchronizedQueue}
+import scala.collection.JavaConverters._
+import scala.collection.mutable.SynchronizedQueue
import scala.language.postfixOps
import com.google.common.io.Files
@@ -58,8 +59,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val batchCounter = new BatchCounter(ssc)
val networkStream = ssc.socketTextStream(
"localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
- val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
- val outputStream = new TestOutputStream(networkStream, outputBuffer)
+ val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+ val outputStream = new TestOutputStream(networkStream, outputQueue)
outputStream.register()
ssc.start()
@@ -90,9 +91,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Verify whether data received was as expected
logInfo("--------------------------------")
- logInfo("output.size = " + outputBuffer.size)
+ logInfo("output.size = " + outputQueue.size)
logInfo("output")
- outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("expected output.size = " + expectedOutput.size)
logInfo("expected output")
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
@@ -100,7 +101,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Verify whether all the elements received are as expected
// (whether the elements were received one in each interval is not verified)
- val output: ArrayBuffer[String] = outputBuffer.flatMap(x => x)
+ val output: Array[String] = outputQueue.asScala.flatMap(x => x).toArray
assert(output.size === expectedOutput.size)
for (i <- 0 until output.size) {
assert(output(i) === expectedOutput(i))
@@ -119,8 +120,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val batchCounter = new BatchCounter(ssc)
val networkStream = ssc.socketTextStream(
"localhost", testServer.port, StorageLevel.MEMORY_AND_DISK)
- val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
- val outputStream = new TestOutputStream(networkStream, outputBuffer)
+ val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+ val outputStream = new TestOutputStream(networkStream, outputQueue)
outputStream.register()
ssc.start()
@@ -156,9 +157,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
clock.setTime(existingFile.lastModified + batchDuration.milliseconds)
val batchCounter = new BatchCounter(ssc)
val fileStream = ssc.binaryRecordsStream(testDir.toString, 1)
- val outputBuffer = new ArrayBuffer[Seq[Array[Byte]]]
- with SynchronizedBuffer[Seq[Array[Byte]]]
- val outputStream = new TestOutputStream(fileStream, outputBuffer)
+ val outputQueue = new ConcurrentLinkedQueue[Seq[Array[Byte]]]
+ val outputStream = new TestOutputStream(fileStream, outputQueue)
outputStream.register()
ssc.start()
@@ -183,8 +183,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
}
val expectedOutput = input.map(i => i.toByte)
- val obtainedOutput = outputBuffer.flatten.toList.map(i => i(0).toByte)
- assert(obtainedOutput === expectedOutput)
+ val obtainedOutput = outputQueue.asScala.flatten.toList.map(i => i(0).toByte)
+ assert(obtainedOutput.toSeq === expectedOutput)
}
} finally {
if (testDir != null) Utils.deleteRecursively(testDir)
@@ -206,15 +206,15 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val numTotalRecords = numThreads * numRecordsPerThread
val testReceiver = new MultiThreadTestReceiver(numThreads, numRecordsPerThread)
MultiThreadTestReceiver.haveAllThreadsFinished = false
- val outputBuffer = new ArrayBuffer[Seq[Long]] with SynchronizedBuffer[Seq[Long]]
- def output: ArrayBuffer[Long] = outputBuffer.flatMap(x => x)
+ val outputQueue = new ConcurrentLinkedQueue[Seq[Long]]
+ def output: Iterable[Long] = outputQueue.asScala.flatMap(x => x)
// set up the network stream using the test receiver
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
val networkStream = ssc.receiverStream[Int](testReceiver)
val countStream = networkStream.count
- val outputStream = new TestOutputStream(countStream, outputBuffer)
+ val outputStream = new TestOutputStream(countStream, outputQueue)
outputStream.register()
ssc.start()
@@ -231,9 +231,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Verify whether data received was as expected
logInfo("--------------------------------")
- logInfo("output.size = " + outputBuffer.size)
+ logInfo("output.size = " + outputQueue.size)
logInfo("output")
- outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("--------------------------------")
assert(output.sum === numTotalRecords)
}
@@ -241,14 +241,14 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
test("queue input stream - oneAtATime = true") {
val input = Seq("1", "2", "3", "4", "5")
val expectedOutput = input.map(Seq(_))
- val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
- def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
+ val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+ def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.size > 0)
// Set up the streaming context and input streams
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
val queue = new SynchronizedQueue[RDD[String]]()
val queueStream = ssc.queueStream(queue, oneAtATime = true)
- val outputStream = new TestOutputStream(queueStream, outputBuffer)
+ val outputStream = new TestOutputStream(queueStream, outputQueue)
outputStream.register()
ssc.start()
@@ -266,9 +266,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Verify whether data received was as expected
logInfo("--------------------------------")
- logInfo("output.size = " + outputBuffer.size)
+ logInfo("output.size = " + outputQueue.size)
logInfo("output")
- outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("expected output.size = " + expectedOutput.size)
logInfo("expected output")
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
@@ -276,14 +276,12 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Verify whether all the elements received are as expected
assert(output.size === expectedOutput.size)
- for (i <- 0 until output.size) {
- assert(output(i) === expectedOutput(i))
- }
+ output.zipWithIndex.foreach{case (e, i) => assert(e == expectedOutput(i))}
}
test("queue input stream - oneAtATime = false") {
- val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
- def output: ArrayBuffer[Seq[String]] = outputBuffer.filter(_.size > 0)
+ val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+ def output: Iterable[Seq[String]] = outputQueue.asScala.filter(_.size > 0)
val input = Seq("1", "2", "3", "4", "5")
val expectedOutput = Seq(Seq("1", "2", "3"), Seq("4", "5"))
@@ -291,7 +289,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
val queue = new SynchronizedQueue[RDD[String]]()
val queueStream = ssc.queueStream(queue, oneAtATime = false)
- val outputStream = new TestOutputStream(queueStream, outputBuffer)
+ val outputStream = new TestOutputStream(queueStream, outputQueue)
outputStream.register()
ssc.start()
@@ -312,9 +310,9 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Verify whether data received was as expected
logInfo("--------------------------------")
- logInfo("output.size = " + outputBuffer.size)
+ logInfo("output.size = " + outputQueue.size)
logInfo("output")
- outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ outputQueue.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
logInfo("expected output.size = " + expectedOutput.size)
logInfo("expected output")
expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
@@ -322,9 +320,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
// Verify whether all the elements received are as expected
assert(output.size === expectedOutput.size)
- for (i <- 0 until output.size) {
- assert(output(i) === expectedOutput(i))
- }
+ output.zipWithIndex.foreach{case (e, i) => assert(e == expectedOutput(i))}
}
test("test track the number of input stream") {
@@ -373,8 +369,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
val batchCounter = new BatchCounter(ssc)
val fileStream = ssc.fileStream[LongWritable, Text, TextInputFormat](
testDir.toString, (x: Path) => true, newFilesOnly = newFilesOnly).map(_._2.toString)
- val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
- val outputStream = new TestOutputStream(fileStream, outputBuffer)
+ val outputQueue = new ConcurrentLinkedQueue[Seq[String]]
+ val outputStream = new TestOutputStream(fileStream, outputQueue)
outputStream.register()
ssc.start()
@@ -404,7 +400,7 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
} else {
(Seq(0) ++ input).map(_.toString).toSet
}
- assert(outputBuffer.flatten.toSet === expectedOutput)
+ assert(outputQueue.asScala.flatten.toSet === expectedOutput)
}
} finally {
if (testDir != null) Utils.deleteRecursively(testDir)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
index 2984fd2b29..b6d6585bd8 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MapWithStateSuite.scala
@@ -18,8 +18,9 @@
package org.apache.spark.streaming
import java.io.File
+import java.util.concurrent.ConcurrentLinkedQueue
-import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
@@ -550,9 +551,9 @@ class MapWithStateSuite extends SparkFunSuite
val ssc = new StreamingContext(sc, Seconds(1))
val inputStream = new TestInputStream(ssc, input, numPartitions = 2)
val trackeStateStream = inputStream.map(x => (x, 1)).mapWithState(mapWithStateSpec)
- val collectedOutputs = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]
+ val collectedOutputs = new ConcurrentLinkedQueue[Seq[T]]
val outputStream = new TestOutputStream(trackeStateStream, collectedOutputs)
- val collectedStateSnapshots = new ArrayBuffer[Seq[(K, S)]] with SynchronizedBuffer[Seq[(K, S)]]
+ val collectedStateSnapshots = new ConcurrentLinkedQueue[Seq[(K, S)]]
val stateSnapshotStream = new TestOutputStream(
trackeStateStream.stateSnapshots(), collectedStateSnapshots)
outputStream.register()
@@ -567,7 +568,7 @@ class MapWithStateSuite extends SparkFunSuite
batchCounter.waitUntilBatchesCompleted(numBatches, 10000)
ssc.stop(stopSparkContext = false)
- (collectedOutputs, collectedStateSnapshots)
+ (collectedOutputs.asScala.toSeq, collectedStateSnapshots.asScala.toSeq)
}
private def assert[U](expected: Seq[Seq[U]], collected: Seq[Seq[U]], typ: String) {
@@ -583,4 +584,3 @@ class MapWithStateSuite extends SparkFunSuite
}
}
}
-
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index 7bbbdebd9b..a02d49eced 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -21,6 +21,7 @@ import java.io.{File, IOException}
import java.nio.charset.Charset
import java.util.UUID
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import scala.util.Random
@@ -215,8 +216,8 @@ object MasterFailureTest extends Logging {
while(!isLastOutputGenerated && !isTimedOut) {
// Get the output buffer
- val outputBuffer = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[T]].output
- def output = outputBuffer.flatMap(x => x)
+ val outputQueue = ssc.graph.getOutputStreams().head.asInstanceOf[TestOutputStream[T]].output
+ def output = outputQueue.asScala.flatten
// Start the thread to kill the streaming after some time
killed = false
@@ -257,9 +258,9 @@ object MasterFailureTest extends Logging {
// Verify whether the output of each batch has only one element or no element
// and then merge the new output with all the earlier output
- mergedOutput ++= output
+ mergedOutput ++= output.toSeq
totalTimeRan += timeRan
- logInfo("New output = " + output)
+ logInfo("New output = " + output.toSeq)
logInfo("Merged output = " + mergedOutput)
logInfo("Time ran = " + timeRan)
logInfo("Total time ran = " + totalTimeRan)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
index 1ed68c74db..66f47394c7 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala
@@ -17,7 +17,10 @@
package org.apache.spark.streaming
-import scala.collection.mutable.{ArrayBuffer, HashMap, SynchronizedBuffer, SynchronizedMap}
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{HashMap, SynchronizedMap}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
@@ -62,43 +65,43 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
val batchInfosSubmitted = collector.batchInfosSubmitted
batchInfosSubmitted should have size 4
- batchInfosSubmitted.foreach(info => {
+ batchInfosSubmitted.asScala.foreach(info => {
info.schedulingDelay should be (None)
info.processingDelay should be (None)
info.totalDelay should be (None)
})
- batchInfosSubmitted.foreach { info =>
+ batchInfosSubmitted.asScala.foreach { info =>
info.numRecords should be (1L)
info.streamIdToInputInfo should be (Map(0 -> StreamInputInfo(0, 1L)))
}
- isInIncreasingOrder(batchInfosSubmitted.map(_.submissionTime)) should be (true)
+ isInIncreasingOrder(batchInfosSubmitted.asScala.map(_.submissionTime)) should be (true)
// SPARK-6766: processingStartTime of batch info should not be None when starting
val batchInfosStarted = collector.batchInfosStarted
batchInfosStarted should have size 4
- batchInfosStarted.foreach(info => {
+ batchInfosStarted.asScala.foreach(info => {
info.schedulingDelay should not be None
info.schedulingDelay.get should be >= 0L
info.processingDelay should be (None)
info.totalDelay should be (None)
})
- batchInfosStarted.foreach { info =>
+ batchInfosStarted.asScala.foreach { info =>
info.numRecords should be (1L)
info.streamIdToInputInfo should be (Map(0 -> StreamInputInfo(0, 1L)))
}
- isInIncreasingOrder(batchInfosStarted.map(_.submissionTime)) should be (true)
- isInIncreasingOrder(batchInfosStarted.map(_.processingStartTime.get)) should be (true)
+ isInIncreasingOrder(batchInfosStarted.asScala.map(_.submissionTime)) should be (true)
+ isInIncreasingOrder(batchInfosStarted.asScala.map(_.processingStartTime.get)) should be (true)
// test onBatchCompleted
val batchInfosCompleted = collector.batchInfosCompleted
batchInfosCompleted should have size 4
- batchInfosCompleted.foreach(info => {
+ batchInfosCompleted.asScala.foreach(info => {
info.schedulingDelay should not be None
info.processingDelay should not be None
info.totalDelay should not be None
@@ -107,14 +110,14 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
info.totalDelay.get should be >= 0L
})
- batchInfosCompleted.foreach { info =>
+ batchInfosCompleted.asScala.foreach { info =>
info.numRecords should be (1L)
info.streamIdToInputInfo should be (Map(0 -> StreamInputInfo(0, 1L)))
}
- isInIncreasingOrder(batchInfosCompleted.map(_.submissionTime)) should be (true)
- isInIncreasingOrder(batchInfosCompleted.map(_.processingStartTime.get)) should be (true)
- isInIncreasingOrder(batchInfosCompleted.map(_.processingEndTime.get)) should be (true)
+ isInIncreasingOrder(batchInfosCompleted.asScala.map(_.submissionTime)) should be (true)
+ isInIncreasingOrder(batchInfosCompleted.asScala.map(_.processingStartTime.get)) should be (true)
+ isInIncreasingOrder(batchInfosCompleted.asScala.map(_.processingEndTime.get)) should be (true)
}
test("receiver info reporting") {
@@ -129,13 +132,13 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
try {
eventually(timeout(30 seconds), interval(20 millis)) {
collector.startedReceiverStreamIds.size should equal (1)
- collector.startedReceiverStreamIds(0) should equal (0)
- collector.stoppedReceiverStreamIds should have size 1
- collector.stoppedReceiverStreamIds(0) should equal (0)
+ collector.startedReceiverStreamIds.peek() should equal (0)
+ collector.stoppedReceiverStreamIds.size should equal (1)
+ collector.stoppedReceiverStreamIds.peek() should equal (0)
collector.receiverErrors should have size 1
- collector.receiverErrors(0)._1 should equal (0)
- collector.receiverErrors(0)._2 should include ("report error")
- collector.receiverErrors(0)._3 should include ("report exception")
+ collector.receiverErrors.peek()._1 should equal (0)
+ collector.receiverErrors.peek()._2 should include ("report error")
+ collector.receiverErrors.peek()._3 should include ("report exception")
}
} finally {
ssc.stop()
@@ -155,8 +158,8 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
ssc.start()
try {
eventually(timeout(30 seconds), interval(20 millis)) {
- collector.startedOutputOperationIds.take(3) should be (Seq(0, 1, 2))
- collector.completedOutputOperationIds.take(3) should be (Seq(0, 1, 2))
+ collector.startedOutputOperationIds.asScala.take(3) should be (Seq(0, 1, 2))
+ collector.completedOutputOperationIds.asScala.take(3) should be (Seq(0, 1, 2))
}
} finally {
ssc.stop()
@@ -271,69 +274,63 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
}
/** Check if a sequence of numbers is in increasing order */
- def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
- for (i <- 1 until seq.size) {
- if (seq(i - 1) > seq(i)) {
- return false
- }
- }
- true
+ def isInIncreasingOrder(data: Iterable[Long]): Boolean = {
+ !data.sliding(2).map{itr => itr.size == 2 && itr.head > itr.tail.head }.contains(true)
}
}
/** Listener that collects information on processed batches */
class BatchInfoCollector extends StreamingListener {
- val batchInfosCompleted = new ArrayBuffer[BatchInfo] with SynchronizedBuffer[BatchInfo]
- val batchInfosStarted = new ArrayBuffer[BatchInfo] with SynchronizedBuffer[BatchInfo]
- val batchInfosSubmitted = new ArrayBuffer[BatchInfo] with SynchronizedBuffer[BatchInfo]
+ val batchInfosCompleted = new ConcurrentLinkedQueue[BatchInfo]
+ val batchInfosStarted = new ConcurrentLinkedQueue[BatchInfo]
+ val batchInfosSubmitted = new ConcurrentLinkedQueue[BatchInfo]
override def onBatchSubmitted(batchSubmitted: StreamingListenerBatchSubmitted) {
- batchInfosSubmitted += batchSubmitted.batchInfo
+ batchInfosSubmitted.add(batchSubmitted.batchInfo)
}
override def onBatchStarted(batchStarted: StreamingListenerBatchStarted) {
- batchInfosStarted += batchStarted.batchInfo
+ batchInfosStarted.add(batchStarted.batchInfo)
}
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
- batchInfosCompleted += batchCompleted.batchInfo
+ batchInfosCompleted.add(batchCompleted.batchInfo)
}
}
/** Listener that collects information on processed batches */
class ReceiverInfoCollector extends StreamingListener {
- val startedReceiverStreamIds = new ArrayBuffer[Int] with SynchronizedBuffer[Int]
- val stoppedReceiverStreamIds = new ArrayBuffer[Int] with SynchronizedBuffer[Int]
- val receiverErrors =
- new ArrayBuffer[(Int, String, String)] with SynchronizedBuffer[(Int, String, String)]
+ val startedReceiverStreamIds = new ConcurrentLinkedQueue[Int]
+ val stoppedReceiverStreamIds = new ConcurrentLinkedQueue[Int]
+ val receiverErrors = new ConcurrentLinkedQueue[(Int, String, String)]
override def onReceiverStarted(receiverStarted: StreamingListenerReceiverStarted) {
- startedReceiverStreamIds += receiverStarted.receiverInfo.streamId
+ startedReceiverStreamIds.add(receiverStarted.receiverInfo.streamId)
}
override def onReceiverStopped(receiverStopped: StreamingListenerReceiverStopped) {
- stoppedReceiverStreamIds += receiverStopped.receiverInfo.streamId
+ stoppedReceiverStreamIds.add(receiverStopped.receiverInfo.streamId)
}
override def onReceiverError(receiverError: StreamingListenerReceiverError) {
- receiverErrors += ((receiverError.receiverInfo.streamId,
- receiverError.receiverInfo.lastErrorMessage, receiverError.receiverInfo.lastError))
+ receiverErrors.add(((receiverError.receiverInfo.streamId,
+ receiverError.receiverInfo.lastErrorMessage, receiverError.receiverInfo.lastError)))
}
}
/** Listener that collects information on processed output operations */
class OutputOperationInfoCollector extends StreamingListener {
- val startedOutputOperationIds = new ArrayBuffer[Int] with SynchronizedBuffer[Int]
- val completedOutputOperationIds = new ArrayBuffer[Int] with SynchronizedBuffer[Int]
+ val startedOutputOperationIds = new ConcurrentLinkedQueue[Int]()
+ val completedOutputOperationIds = new ConcurrentLinkedQueue[Int]()
override def onOutputOperationStarted(
outputOperationStarted: StreamingListenerOutputOperationStarted): Unit = {
- startedOutputOperationIds += outputOperationStarted.outputOperationInfo.id
+ startedOutputOperationIds.add(outputOperationStarted.outputOperationInfo.id)
}
override def onOutputOperationCompleted(
outputOperationCompleted: StreamingListenerOutputOperationCompleted): Unit = {
- completedOutputOperationIds += outputOperationCompleted.outputOperationInfo.id
+ completedOutputOperationIds.add(outputOperationCompleted.outputOperationInfo.id)
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
index 239b10894a..82cd63bcaf 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/TestSuiteBase.scala
@@ -18,9 +18,9 @@
package org.apache.spark.streaming
import java.io.{IOException, ObjectInputStream}
+import java.util.concurrent.ConcurrentLinkedQueue
-import scala.collection.mutable.ArrayBuffer
-import scala.collection.mutable.SynchronizedBuffer
+import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.ClassTag
@@ -87,17 +87,17 @@ class TestInputStream[T: ClassTag](_ssc: StreamingContext, input: Seq[Seq[T]], n
/**
* This is a output stream just for the testsuites. All the output is collected into a
- * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
+ * ConcurrentLinkedQueue. This queue is wiped clean on being restored from checkpoint.
*
- * The buffer contains a sequence of RDD's, each containing a sequence of items
+ * The buffer contains a sequence of RDD's, each containing a sequence of items.
*/
class TestOutputStream[T: ClassTag](
parent: DStream[T],
- val output: SynchronizedBuffer[Seq[T]] =
- new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]
+ val output: ConcurrentLinkedQueue[Seq[T]] =
+ new ConcurrentLinkedQueue[Seq[T]]()
) extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
- output += collected
+ output.add(collected)
}, false) {
// This is to clear the output buffer every it is read from a checkpoint
@@ -110,18 +110,18 @@ class TestOutputStream[T: ClassTag](
/**
* This is a output stream just for the testsuites. All the output is collected into a
- * ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
+ * ConcurrentLinkedQueue. This queue is wiped clean on being restored from checkpoint.
*
- * The buffer contains a sequence of RDD's, each containing a sequence of partitions, each
+ * The queue contains a sequence of RDD's, each containing a sequence of partitions, each
* containing a sequence of items.
*/
class TestOutputStreamWithPartitions[T: ClassTag](
parent: DStream[T],
- val output: SynchronizedBuffer[Seq[Seq[T]]] =
- new ArrayBuffer[Seq[Seq[T]]] with SynchronizedBuffer[Seq[Seq[T]]])
+ val output: ConcurrentLinkedQueue[Seq[Seq[T]]] =
+ new ConcurrentLinkedQueue[Seq[Seq[T]]]())
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.glom().collect().map(_.toSeq)
- output += collected
+ output.add(collected)
}, false) {
// This is to clear the output buffer every it is read from a checkpoint
@@ -322,7 +322,7 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
val inputStream = new TestInputStream(ssc, input, numPartitions)
val operatedStream = operation(inputStream)
val outputStream = new TestOutputStreamWithPartitions(operatedStream,
- new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]])
+ new ConcurrentLinkedQueue[Seq[Seq[V]]])
outputStream.register()
ssc
}
@@ -347,7 +347,7 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
val inputStream2 = new TestInputStream(ssc, input2, numInputPartitions)
val operatedStream = operation(inputStream1, inputStream2)
val outputStream = new TestOutputStreamWithPartitions(operatedStream,
- new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]])
+ new ConcurrentLinkedQueue[Seq[Seq[W]]])
outputStream.register()
ssc
}
@@ -418,7 +418,7 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
}
val timeTaken = System.currentTimeMillis() - startTime
logInfo("Output generated in " + timeTaken + " milliseconds")
- output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ output.asScala.foreach(x => logInfo("[" + x.mkString(",") + "]"))
assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
assert(output.size === numExpectedOutput, "Unexpected number of outputs generated")
@@ -426,7 +426,7 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
} finally {
ssc.stop(stopSparkContext = true)
}
- output
+ output.asScala.toSeq
}
/**
@@ -501,7 +501,7 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size
withStreamingContext(setupStreams[U, V](input, operation)) { ssc =>
val output = runStreams[V](ssc, numBatches_, expectedOutput.size)
- verifyOutput[V](output, expectedOutput, useSet)
+ verifyOutput[V](output.toSeq, expectedOutput, useSet)
}
}
@@ -540,7 +540,7 @@ trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size
withStreamingContext(setupStreams[U, V, W](input1, input2, operation)) { ssc =>
val output = runStreams[W](ssc, numBatches_, expectedOutput.size)
- verifyOutput[W](output, expectedOutput, useSet)
+ verifyOutput[W](output.toSeq, expectedOutput, useSet)
}
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
index f5ec0ff60a..a1d0561bf3 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/receiver/BlockGeneratorSuite.scala
@@ -17,6 +17,9 @@
package org.apache.spark.streaming.receiver
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.language.reflectiveCalls
@@ -84,7 +87,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter {
assert(listener.onPushBlockCalled === true)
}
}
- listener.pushedData should contain theSameElementsInOrderAs (data1)
+ listener.pushedData.asScala.toSeq should contain theSameElementsInOrderAs (data1)
assert(listener.onAddDataCalled === false) // should be called only with addDataWithCallback()
// Verify addDataWithCallback() add data+metadata and and callbacks are called correctly
@@ -92,21 +95,24 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter {
val metadata2 = data2.map { _.toString }
data2.zip(metadata2).foreach { case (d, m) => blockGenerator.addDataWithCallback(d, m) }
assert(listener.onAddDataCalled === true)
- listener.addedData should contain theSameElementsInOrderAs (data2)
- listener.addedMetadata should contain theSameElementsInOrderAs (metadata2)
+ listener.addedData.asScala.toSeq should contain theSameElementsInOrderAs (data2)
+ listener.addedMetadata.asScala.toSeq should contain theSameElementsInOrderAs (metadata2)
clock.advance(blockIntervalMs) // advance clock to generate blocks
eventually(timeout(1 second)) {
- listener.pushedData should contain theSameElementsInOrderAs (data1 ++ data2)
+ val combined = data1 ++ data2
+ listener.pushedData.asScala.toSeq should contain theSameElementsInOrderAs combined
}
// Verify addMultipleDataWithCallback() add data+metadata and and callbacks are called correctly
val data3 = 21 to 30
val metadata3 = "metadata"
blockGenerator.addMultipleDataWithCallback(data3.iterator, metadata3)
- listener.addedMetadata should contain theSameElementsInOrderAs (metadata2 :+ metadata3)
+ val combinedMetadata = metadata2 :+ metadata3
+ listener.addedMetadata.asScala.toSeq should contain theSameElementsInOrderAs (combinedMetadata)
clock.advance(blockIntervalMs) // advance clock to generate blocks
eventually(timeout(1 second)) {
- listener.pushedData should contain theSameElementsInOrderAs (data1 ++ data2 ++ data3)
+ val combinedData = data1 ++ data2 ++ data3
+ listener.pushedData.asScala.toSeq should contain theSameElementsInOrderAs (combinedData)
}
// Stop the block generator by starting the stop on a different thread and
@@ -191,7 +197,7 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter {
assert(thread.isAlive === false)
}
assert(blockGenerator.isStopped() === true) // generator has finally been completely stopped
- assert(listener.pushedData === data, "All data not pushed by stop()")
+ assert(listener.pushedData.asScala.toSeq === data, "All data not pushed by stop()")
}
test("block push errors are reported") {
@@ -231,15 +237,15 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter {
/** A listener for BlockGenerator that records the data in the callbacks */
private class TestBlockGeneratorListener extends BlockGeneratorListener {
- val pushedData = new mutable.ArrayBuffer[Any] with mutable.SynchronizedBuffer[Any]
- val addedData = new mutable.ArrayBuffer[Any] with mutable.SynchronizedBuffer[Any]
- val addedMetadata = new mutable.ArrayBuffer[Any] with mutable.SynchronizedBuffer[Any]
+ val pushedData = new ConcurrentLinkedQueue[Any]
+ val addedData = new ConcurrentLinkedQueue[Any]
+ val addedMetadata = new ConcurrentLinkedQueue[Any]
@volatile var onGenerateBlockCalled = false
@volatile var onAddDataCalled = false
@volatile var onPushBlockCalled = false
override def onPushBlock(blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[_]): Unit = {
- pushedData ++= arrayBuffer
+ pushedData.addAll(arrayBuffer.asJava)
onPushBlockCalled = true
}
override def onError(message: String, throwable: Throwable): Unit = {}
@@ -247,8 +253,8 @@ class BlockGeneratorSuite extends SparkFunSuite with BeforeAndAfter {
onGenerateBlockCalled = true
}
override def onAddData(data: Any, metadata: Any): Unit = {
- addedData += data
- addedMetadata += metadata
+ addedData.add(data)
+ addedMetadata.add(metadata)
onAddDataCalled = true
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
index 34cd743556..26b757cc2d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ui/StreamingJobProgressListenerSuite.scala
@@ -200,7 +200,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
batchUIData.get.totalDelay should be (batchInfoSubmitted.totalDelay)
batchUIData.get.streamIdToInputInfo should be (Map.empty)
batchUIData.get.numRecords should be (0)
- batchUIData.get.outputOpIdSparkJobIdPairs should be (Seq(OutputOpIdAndSparkJobId(0, 0)))
+ batchUIData.get.outputOpIdSparkJobIdPairs.toSeq should be (Seq(OutputOpIdAndSparkJobId(0, 0)))
// A lot of "onBatchCompleted"s happen before "onJobStart"
for(i <- limit + 1 to limit * 2) {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala
index 0544972d95..25b70a3d08 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/util/RecurringTimerSuite.scala
@@ -17,7 +17,9 @@
package org.apache.spark.streaming.util
-import scala.collection.mutable
+import java.util.concurrent.ConcurrentLinkedQueue
+
+import scala.collection.JavaConverters._
import scala.concurrent.duration._
import org.scalatest.PrivateMethodTester
@@ -30,34 +32,34 @@ class RecurringTimerSuite extends SparkFunSuite with PrivateMethodTester {
test("basic") {
val clock = new ManualClock()
- val results = new mutable.ArrayBuffer[Long]() with mutable.SynchronizedBuffer[Long]
+ val results = new ConcurrentLinkedQueue[Long]()
val timer = new RecurringTimer(clock, 100, time => {
- results += time
+ results.add(time)
}, "RecurringTimerSuite-basic")
timer.start(0)
eventually(timeout(10.seconds), interval(10.millis)) {
- assert(results === Seq(0L))
+ assert(results.asScala.toSeq === Seq(0L))
}
clock.advance(100)
eventually(timeout(10.seconds), interval(10.millis)) {
- assert(results === Seq(0L, 100L))
+ assert(results.asScala.toSeq === Seq(0L, 100L))
}
clock.advance(200)
eventually(timeout(10.seconds), interval(10.millis)) {
- assert(results === Seq(0L, 100L, 200L, 300L))
+ assert(results.asScala.toSeq === Seq(0L, 100L, 200L, 300L))
}
assert(timer.stop(interruptTimer = true) === 300L)
}
test("SPARK-10224: call 'callback' after stopping") {
val clock = new ManualClock()
- val results = new mutable.ArrayBuffer[Long]() with mutable.SynchronizedBuffer[Long]
+ val results = new ConcurrentLinkedQueue[Long]
val timer = new RecurringTimer(clock, 100, time => {
- results += time
+ results.add(time)
}, "RecurringTimerSuite-SPARK-10224")
timer.start(0)
eventually(timeout(10.seconds), interval(10.millis)) {
- assert(results === Seq(0L))
+ assert(results.asScala.toSeq === Seq(0L))
}
@volatile var lastTime = -1L
// Now RecurringTimer is waiting for the next interval
@@ -77,7 +79,7 @@ class RecurringTimerSuite extends SparkFunSuite with PrivateMethodTester {
// Then it will find `stopped` is true and exit the loop, but it should call `callback` again
// before exiting its internal thread.
thread.join()
- assert(results === Seq(0L, 100L, 200L))
+ assert(results.asScala.toSeq === Seq(0L, 100L, 200L))
assert(lastTime === 200L)
}
}