author    zsxwing <zsxwing@gmail.com>    2014-12-08 23:54:15 -0800
committer Aaron Davidson <aaron@databricks.com>    2014-12-08 23:54:15 -0800
commit    bcb5cdad614d4fce43725dfec3ce88172d2f8c11 (patch)
tree      532261ff55313cd5cada3284b60ae5380378f4c2
parent    51b1fe1426ffecac6c4644523633ea1562ff9a4e (diff)
[SPARK-3154][STREAMING] Replace ConcurrentHashMap with mutable.HashMap and remove @volatile from 'stopped'
Since `sequenceNumberToProcessor` and `stopped` are both protected by the lock `sequenceNumberToProcessor`, `ConcurrentHashMap` and `@volatile` are unnecessary. So this PR updates them accordingly.

Author: zsxwing <zsxwing@gmail.com>

Closes #3634 from zsxwing/SPARK-3154 and squashes the following commits:

0d087ac [zsxwing] Replace ConcurrentHashMap with mutable.HashMap and remove @volatile from 'stopped'
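A minimal, self-contained sketch of the locking pattern the commit message describes (hypothetical class and member names, not the actual Flume sink code): the mutable map itself serves as the lock, and because every access to the map and to the plain `stopped` flag happens inside `synchronized` on that map, neither `ConcurrentHashMap` nor `@volatile` is needed for visibility or atomicity.

import scala.collection.mutable

// Sketch only: hypothetical names, mirroring the pattern in the patch below.
class HandlerSketch {
  // Protected by `registry` -- the map object doubles as the lock.
  private val registry = mutable.HashMap[String, Runnable]()
  // Protected by `registry`, so no @volatile is required.
  private var stopped = false

  def register(key: String, task: Runnable): Unit = registry.synchronized {
    if (!stopped) registry(key) = task
  }

  // Returns an Option, like the patched removeAndGetProcessor below.
  def removeAndGet(key: String): Option[Runnable] = registry.synchronized {
    registry.remove(key)
  }

  def shutdown(): Unit = registry.synchronized {
    stopped = true
    registry.values.foreach(_.run())
    registry.clear()
  }
}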
-rw-r--r--    external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala    23
1 file changed, 12 insertions(+), 11 deletions(-)
diff --git a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
index 3c656a381b..4373be443e 100644
--- a/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
+++ b/external/flume-sink/src/main/scala/org/apache/spark/streaming/flume/sink/SparkAvroCallbackHandler.scala
@@ -16,10 +16,10 @@
*/
package org.apache.spark.streaming.flume.sink
-import java.util.concurrent.{CountDownLatch, ConcurrentHashMap, Executors}
+import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.atomic.AtomicLong
-import scala.collection.JavaConversions._
+import scala.collection.mutable
import org.apache.flume.Channel
import org.apache.commons.lang.RandomStringUtils
@@ -47,8 +47,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("Spark Sink Processor Thread - %d").build()))
- private val sequenceNumberToProcessor =
- new ConcurrentHashMap[CharSequence, TransactionProcessor]()
+ // Protected by `sequenceNumberToProcessor`
+ private val sequenceNumberToProcessor = mutable.HashMap[CharSequence, TransactionProcessor]()
// This sink will not persist sequence numbers and reuses them if it gets restarted.
// So it is possible to commit a transaction which may have been meant for the sink before the
// restart.
@@ -58,8 +58,8 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
private val seqBase = RandomStringUtils.randomAlphanumeric(8)
private val seqCounter = new AtomicLong(0)
-
- @volatile private var stopped = false
+ // Protected by `sequenceNumberToProcessor`
+ private var stopped = false
@volatile private var isTest = false
private var testLatch: CountDownLatch = null
@@ -131,7 +131,7 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
* @param success Whether the batch was successful or not.
*/
private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
- Option(removeAndGetProcessor(sequenceNumber)).foreach(processor => {
+ removeAndGetProcessor(sequenceNumber).foreach(processor => {
processor.batchProcessed(success)
})
}
@@ -139,10 +139,11 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
/**
* Helper method to remove the TxnProcessor for a Sequence Number. Can be used to avoid a leak.
* @param sequenceNumber
- * @return The transaction processor for the corresponding batch. Note that this instance is no
- * longer tracked and the caller is responsible for that txn processor.
+ * @return An `Option` of the transaction processor for the corresponding batch. Note that this
+ * instance is no longer tracked and the caller is responsible for that txn processor.
*/
- private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = {
+ private[sink] def removeAndGetProcessor(sequenceNumber: CharSequence):
+ Option[TransactionProcessor] = {
sequenceNumberToProcessor.synchronized {
sequenceNumberToProcessor.remove(sequenceNumber.toString)
}
@@ -160,7 +161,7 @@ private[flume] class SparkAvroCallbackHandler(val threads: Int, val channel: Cha
logInfo("Shutting down Spark Avro Callback Handler")
sequenceNumberToProcessor.synchronized {
stopped = true
- sequenceNumberToProcessor.values().foreach(_.shutdown())
+ sequenceNumberToProcessor.values.foreach(_.shutdown())
}
transactionExecutorOpt.foreach(_.shutdownNow())
}
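For completeness, a hedged caller-side sketch against the HandlerSketch class above (not the actual sink code): with the patched signature the callee already returns an Option, so the caller no longer wraps a possibly-null result in Option(...) and can foreach over it directly, as completeTransaction does in the diff.

object CallerSketch {
  def main(args: Array[String]): Unit = {
    val handler = new HandlerSketch
    handler.register("seq-0", new Runnable {
      override def run(): Unit = println("batch handled")
    })
    // Old style: Option(handler.removeAndGet("seq-0")).foreach(...)
    // New style: the Option comes back from the callee itself.
    handler.removeAndGet("seq-0").foreach(_.run())
    handler.shutdown()
  }
}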