aboutsummaryrefslogtreecommitdiff
path: root/extras
diff options
context:
space:
mode:
authorJean-Baptiste Onofré <jbonofre@apache.org>2015-12-12 08:51:52 +0000
committerSean Owen <sowen@cloudera.com>2015-12-12 08:51:52 +0000
commit03138b67d3ef7f5278ea9f8b9c75f0e357ef79d8 (patch)
tree793ae3bc74e0bd91b483cd6f9f548354548502d2 /extras
parent1e3526c2d3de723225024fedd45753b556e18fc6 (diff)
downloadspark-03138b67d3ef7f5278ea9f8b9c75f0e357ef79d8.tar.gz
spark-03138b67d3ef7f5278ea9f8b9c75f0e357ef79d8.tar.bz2
spark-03138b67d3ef7f5278ea9f8b9c75f0e357ef79d8.zip
[SPARK-11193] Use Java ConcurrentHashMap instead of SynchronizedMap trait in order to avoid ClassCastException due to KryoSerializer in KinesisReceiver
Author: Jean-Baptiste Onofré <jbonofre@apache.org> Closes #10203 from jbonofre/SPARK-11193.
Diffstat (limited to 'extras')
-rw-r--r--extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala16
1 files changed, 8 insertions, 8 deletions
diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
index 05080835fc..80edda59e1 100644
--- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
+++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala
@@ -17,6 +17,7 @@
package org.apache.spark.streaming.kinesis
import java.util.UUID
+import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -124,8 +125,7 @@ private[kinesis] class KinesisReceiver[T](
private val seqNumRangesInCurrentBlock = new mutable.ArrayBuffer[SequenceNumberRange]
/** Sequence number ranges of data added to each generated block */
- private val blockIdToSeqNumRanges = new mutable.HashMap[StreamBlockId, SequenceNumberRanges]
- with mutable.SynchronizedMap[StreamBlockId, SequenceNumberRanges]
+ private val blockIdToSeqNumRanges = new ConcurrentHashMap[StreamBlockId, SequenceNumberRanges]
/**
* The centralized kinesisCheckpointer that checkpoints based on the given checkpointInterval.
@@ -135,8 +135,8 @@ private[kinesis] class KinesisReceiver[T](
/**
* Latest sequence number ranges that have been stored successfully.
* This is used for checkpointing through KCL */
- private val shardIdToLatestStoredSeqNum = new mutable.HashMap[String, String]
- with mutable.SynchronizedMap[String, String]
+ private val shardIdToLatestStoredSeqNum = new ConcurrentHashMap[String, String]
+
/**
* This is called when the KinesisReceiver starts and must be non-blocking.
* The KCL creates and manages the receiving/processing thread pool through Worker.run().
@@ -222,7 +222,7 @@ private[kinesis] class KinesisReceiver[T](
/** Get the latest sequence number for the given shard that can be checkpointed through KCL */
private[kinesis] def getLatestSeqNumToCheckpoint(shardId: String): Option[String] = {
- shardIdToLatestStoredSeqNum.get(shardId)
+ Option(shardIdToLatestStoredSeqNum.get(shardId))
}
/**
@@ -257,7 +257,7 @@ private[kinesis] class KinesisReceiver[T](
* for next block. Internally, this is synchronized with `rememberAddedRange()`.
*/
private def finalizeRangesForCurrentBlock(blockId: StreamBlockId): Unit = {
- blockIdToSeqNumRanges(blockId) = SequenceNumberRanges(seqNumRangesInCurrentBlock.toArray)
+ blockIdToSeqNumRanges.put(blockId, SequenceNumberRanges(seqNumRangesInCurrentBlock.toArray))
seqNumRangesInCurrentBlock.clear()
logDebug(s"Generated block $blockId has $blockIdToSeqNumRanges")
}
@@ -265,7 +265,7 @@ private[kinesis] class KinesisReceiver[T](
/** Store the block along with its associated ranges */
private def storeBlockWithRanges(
blockId: StreamBlockId, arrayBuffer: mutable.ArrayBuffer[T]): Unit = {
- val rangesToReportOption = blockIdToSeqNumRanges.remove(blockId)
+ val rangesToReportOption = Option(blockIdToSeqNumRanges.remove(blockId))
if (rangesToReportOption.isEmpty) {
stop("Error while storing block into Spark, could not find sequence number ranges " +
s"for block $blockId")
@@ -294,7 +294,7 @@ private[kinesis] class KinesisReceiver[T](
// Note that we are doing this sequentially because the array of sequence number ranges
// is assumed to be
rangesToReport.ranges.foreach { range =>
- shardIdToLatestStoredSeqNum(range.shardId) = range.toSeqNumber
+ shardIdToLatestStoredSeqNum.put(range.shardId, range.toSeqNumber)
}
}