path: root/streaming/src/main
author    Holden Karau <holden@us.ibm.com>    2016-02-09 08:44:56 +0000
committer Sean Owen <sowen@cloudera.com>      2016-02-09 08:44:56 +0000
commit    159198eff67ee9ead08fba60a585494ea1575147 (patch)
tree      b45f653a253ce0aac7af561caaeef9ea52099e1a /streaming/src/main
parent    f9307d8fc5223b4c5be07e3dc691a327f3bbfa7f (diff)
[SPARK-13165][STREAMING] Replace deprecated synchronizedBuffer in streaming
Building with Scala 2.11 produces the warning: "trait SynchronizedBuffer in package mutable is deprecated: Synchronization via traits is deprecated as it is inherently unreliable. Consider java.util.concurrent.ConcurrentLinkedQueue as an alternative." We already use ConcurrentLinkedQueue elsewhere, so let's replace it here as well.

Some notes for reviewers on how the behaviour differs: the Seq implicitly converted from a SynchronizedBuffer would continue to receive updates, but the same conversion done explicitly on a ConcurrentLinkedQueue does not. Hence some of the (internal & test) APIs are changed to pass an Iterable instead. toSeq remains safe to use once there are no more updates.

Author: Holden Karau <holden@us.ibm.com>
Author: tedyu <yuzhihong@gmail.com>

Closes #11067 from holdenk/SPARK-13165-replace-deprecated-synchronizedBuffer-in-streaming.
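For readers skimming the behavioural note above, a minimal standalone sketch of the live-view versus snapshot distinction (not part of the patch; the object name is invented, and toList is used as an unambiguous snapshot):

    import java.util.concurrent.ConcurrentLinkedQueue
    import scala.collection.JavaConverters._

    object LiveViewVsSnapshot extends App {
      val queue = new ConcurrentLinkedQueue[Int]()
      queue.add(1)

      // asScala wraps the queue rather than copying it, so this view stays live.
      val view: Iterable[Int] = queue.asScala
      // toList materializes a point-in-time snapshot (toSeq is likewise only
      // safe once updates have stopped, as the commit message notes).
      val snapshot: List[Int] = queue.asScala.toList

      queue.add(2)

      println(view.mkString(","))     // 1,2 -- the live view sees the new element
      println(snapshot.mkString(",")) // 1   -- the snapshot does not
    }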
Diffstat (limited to 'streaming/src/main')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala  | 17
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala                     |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala                   |  2
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala  | 21
4 files changed, 23 insertions(+), 19 deletions(-)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
index b774b6b9a5..8d4e6827d6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -19,8 +19,9 @@ package org.apache.spark.streaming.receiver
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicLong
+import java.util.concurrent.ConcurrentLinkedQueue
-import scala.collection.mutable
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import com.google.common.base.Throwables
@@ -83,7 +84,7 @@ private[streaming] class ReceiverSupervisorImpl(
cleanupOldBlocks(threshTime)
case UpdateRateLimit(eps) =>
logInfo(s"Received a new rate limit: $eps.")
- registeredBlockGenerators.foreach { bg =>
+ registeredBlockGenerators.asScala.foreach { bg =>
bg.updateRate(eps)
}
}
@@ -92,8 +93,7 @@ private[streaming] class ReceiverSupervisorImpl(
/** Unique block ids if one wants to add blocks directly */
private val newBlockId = new AtomicLong(System.currentTimeMillis())
- private val registeredBlockGenerators = new mutable.ArrayBuffer[BlockGenerator]
- with mutable.SynchronizedBuffer[BlockGenerator]
+ private val registeredBlockGenerators = new ConcurrentLinkedQueue[BlockGenerator]()
/** Divides received data records into data blocks for pushing in BlockManager. */
private val defaultBlockGeneratorListener = new BlockGeneratorListener {
@@ -170,11 +170,11 @@ private[streaming] class ReceiverSupervisorImpl(
}
override protected def onStart() {
- registeredBlockGenerators.foreach { _.start() }
+ registeredBlockGenerators.asScala.foreach { _.start() }
}
override protected def onStop(message: String, error: Option[Throwable]) {
- registeredBlockGenerators.foreach { _.stop() }
+ registeredBlockGenerators.asScala.foreach { _.stop() }
env.rpcEnv.stop(endpoint)
}
@@ -194,10 +194,11 @@ private[streaming] class ReceiverSupervisorImpl(
override def createBlockGenerator(
blockGeneratorListener: BlockGeneratorListener): BlockGenerator = {
// Cleanup BlockGenerators that have already been stopped
- registeredBlockGenerators --= registeredBlockGenerators.filter{ _.isStopped() }
+ val stoppedGenerators = registeredBlockGenerators.asScala.filter{ _.isStopped() }
+ stoppedGenerators.foreach(registeredBlockGenerators.remove(_))
val newBlockGenerator = new BlockGenerator(blockGeneratorListener, streamId, env.conf)
- registeredBlockGenerators += newBlockGenerator
+ registeredBlockGenerators.add(newBlockGenerator)
newBlockGenerator
}
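The file above follows one mechanical recipe: += / --= on the synchronized buffer become add / remove on the queue, and Scala-style iteration goes through asScala. A hedged standalone sketch of that recipe (Generator and Registry are invented stand-ins, not Spark classes):

    import java.util.concurrent.ConcurrentLinkedQueue
    import scala.collection.JavaConverters._

    // Hypothetical stand-in for BlockGenerator, just enough to show the pattern.
    class Generator {
      @volatile var stopped = false
      def isStopped(): Boolean = stopped
    }

    class Registry {
      // Before: new mutable.ArrayBuffer[Generator] with mutable.SynchronizedBuffer[Generator]
      private val generators = new ConcurrentLinkedQueue[Generator]()

      // Was: generators += g
      def register(g: Generator): Unit = generators.add(g)

      // Was: generators --= generators.filter { _.isStopped() }
      def pruneStopped(): Unit = {
        // asScala.filter builds a strict snapshot, so removing matching
        // elements from the underlying queue afterwards is safe.
        val stopped = generators.asScala.filter(_.isStopped())
        stopped.foreach(generators.remove(_))
      }
    }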
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
index 81de07f933..e235afad5e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchPage.scala
@@ -273,7 +273,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).
map { case (outputOpId, outputOpIdAndSparkJobIds) =>
// sort SparkJobIds for each OutputOpId
- (outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
+ (outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).toSeq.sorted)
}
val outputOps: Seq[(OutputOperationUIData, Seq[SparkJobId])] =
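The added toSeq is needed because outputOpIdSparkJobIdPairs becomes an Iterable (see the BatchUIData change below), and sorted is defined on Seq, not on Iterable. A minimal illustration with invented names:

    object SortedNeedsSeq extends App {
      val ids: Iterable[Int] = Iterable(3, 1, 2)
      // ids.sorted             // would not compile: `sorted` is defined on Seq
      println(ids.toSeq.sorted) // List(1, 2, 3)
    }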
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
index 3ef3689de1..1af60857bc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
@@ -33,7 +33,7 @@ private[ui] case class BatchUIData(
val processingStartTime: Option[Long],
val processingEndTime: Option[Long],
val outputOperations: mutable.HashMap[OutputOpId, OutputOperationUIData] = mutable.HashMap(),
- var outputOpIdSparkJobIdPairs: Seq[OutputOpIdAndSparkJobId] = Seq.empty) {
+ var outputOpIdSparkJobIdPairs: Iterable[OutputOpIdAndSparkJobId] = Seq.empty) {
/**
* Time taken for the first job of this batch to start processing from the time this batch
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index cacd430cf3..30a3a98c01 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -18,8 +18,10 @@
package org.apache.spark.streaming.ui
import java.util.{LinkedHashMap, Map => JMap, Properties}
+import java.util.concurrent.ConcurrentLinkedQueue
-import scala.collection.mutable.{ArrayBuffer, HashMap, Queue, SynchronizedBuffer}
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{HashMap, Queue}
import org.apache.spark.scheduler._
import org.apache.spark.streaming.{StreamingContext, Time}
@@ -41,9 +43,9 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
// we may not be able to get the corresponding BatchUIData when receiving onJobStart. So here we
// cannot use a map of (Time, BatchUIData).
private[ui] val batchTimeToOutputOpIdSparkJobIdPair =
- new LinkedHashMap[Time, SynchronizedBuffer[OutputOpIdAndSparkJobId]] {
+ new LinkedHashMap[Time, ConcurrentLinkedQueue[OutputOpIdAndSparkJobId]] {
override def removeEldestEntry(
- p1: JMap.Entry[Time, SynchronizedBuffer[OutputOpIdAndSparkJobId]]): Boolean = {
+ p1: JMap.Entry[Time, ConcurrentLinkedQueue[OutputOpIdAndSparkJobId]]): Boolean = {
// If a lot of "onBatchCompleted"s happen before "onJobStart" (imagine if
// SparkContext.listenerBus is very slow), "batchTimeToOutputOpIdToSparkJobIds"
// may add some information for a removed batch when processing "onJobStart". It will be a
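For context, java.util.LinkedHashMap consults removeEldestEntry after each put and evicts the oldest entry whenever it returns true, which is how the map above bounds its size. A minimal sketch of the pattern, with an invented capacity in place of Spark's configured limit:

    import java.util.{LinkedHashMap, Map => JMap}

    // Keeps at most `capacity` entries, evicting in insertion order.
    class BoundedMap[K, V](capacity: Int) extends LinkedHashMap[K, V] {
      override def removeEldestEntry(eldest: JMap.Entry[K, V]): Boolean =
        size() > capacity
    }

    object BoundedMapDemo extends App {
      val m = new BoundedMap[String, Int](2)
      m.put("a", 1); m.put("b", 2); m.put("c", 3)
      println(m.keySet()) // [b, c] -- "a" was evicted on the third put
    }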
@@ -131,12 +133,10 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
getBatchTimeAndOutputOpId(jobStart.properties).foreach { case (batchTime, outputOpId) =>
var outputOpIdToSparkJobIds = batchTimeToOutputOpIdSparkJobIdPair.get(batchTime)
if (outputOpIdToSparkJobIds == null) {
- outputOpIdToSparkJobIds =
- new ArrayBuffer[OutputOpIdAndSparkJobId]()
- with SynchronizedBuffer[OutputOpIdAndSparkJobId]
+ outputOpIdToSparkJobIds = new ConcurrentLinkedQueue[OutputOpIdAndSparkJobId]()
batchTimeToOutputOpIdSparkJobIdPair.put(batchTime, outputOpIdToSparkJobIds)
}
- outputOpIdToSparkJobIds += OutputOpIdAndSparkJobId(outputOpId, jobStart.jobId)
+ outputOpIdToSparkJobIds.add(OutputOpIdAndSparkJobId(outputOpId, jobStart.jobId))
}
}
@@ -256,8 +256,11 @@ private[streaming] class StreamingJobProgressListener(ssc: StreamingContext)
}
}
batchUIData.foreach { _batchUIData =>
- val outputOpIdToSparkJobIds =
- Option(batchTimeToOutputOpIdSparkJobIdPair.get(batchTime)).getOrElse(Seq.empty)
+ // We use an Iterable rather than explicitly converting to a seq so that updates
+ // will propagate
+ val outputOpIdToSparkJobIds: Iterable[OutputOpIdAndSparkJobId] =
+ Option(batchTimeToOutputOpIdSparkJobIdPair.get(batchTime))
+ .map(_.asScala).getOrElse(Seq.empty)
_batchUIData.outputOpIdSparkJobIdPairs = outputOpIdToSparkJobIds
}
batchUIData
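A standalone sketch of the null handling in the last hunk (names invented): java.util.Map.get returns null for a missing key, so Option must wrap the raw Java reference before any asScala decoration is applied, since decorating null first would yield a Some around a wrapper of null and an NPE on iteration:

    import java.util.LinkedHashMap
    import java.util.concurrent.ConcurrentLinkedQueue
    import scala.collection.JavaConverters._

    object NullSafeLookup extends App {
      val map = new LinkedHashMap[String, ConcurrentLinkedQueue[Int]]()

      // Missing key: get returns null, Option(...) turns it into None.
      val pairs: Iterable[Int] =
        Option(map.get("missing")).map(_.asScala).getOrElse(Seq.empty)

      println(pairs.isEmpty) // true, with no NullPointerException
    }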