aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
author: Shixiong Zhu <shixiong@databricks.com> 2016-02-04 12:43:16 -0800
committer: Shixiong Zhu <shixiong@databricks.com> 2016-02-04 12:43:16 -0800
commit: 8e2f296306131e6c7c2f06d6672995d3ff8ab021 (patch)
tree: c338de2d4c0e2cbf37c8f70d65e8a9b8f9e7a7b6 /streaming
parent: bd38dd6f75c4af0f8f32bb21a82da53fffa5e825 (diff)
download: spark-8e2f296306131e6c7c2f06d6672995d3ff8ab021.tar.gz
spark-8e2f296306131e6c7c2f06d6672995d3ff8ab021.tar.bz2
spark-8e2f296306131e6c7c2f06d6672995d3ff8ab021.zip
[SPARK-13195][STREAMING] Fix NoSuchElementException when a state is not set but timeoutThreshold is defined
Check that the state exists before calling `get`. Author: Shixiong Zhu <shixiong@databricks.com> Closes #11081 from zsxwing/SPARK-13195.
Diffstat (limited to 'streaming')
-rw-r--r-- streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala | 3
-rw-r--r-- streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala | 5
2 files changed, 7 insertions, 1 deletion
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala
index 1d2244eaf2..6ab1956bed 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala
@@ -57,7 +57,8 @@ private[streaming] object MapWithStateRDDRecord {
val returned = mappingFunction(batchTime, key, Some(value), wrappedState)
if (wrappedState.isRemoved) {
newStateMap.remove(key)
- } else if (wrappedState.isUpdated || timeoutThresholdTime.isDefined) {
+ } else if (wrappedState.isUpdated
+ || (wrappedState.exists && timeoutThresholdTime.isDefined)) {
newStateMap.put(key, wrappedState.get(), batchTime.milliseconds)
}
mappedData ++= returned
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala
index 5b13fd6ad6..e8c814ba71 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/MapWithStateRDDSuite.scala
@@ -190,6 +190,11 @@ class MapWithStateRDDSuite extends SparkFunSuite with RDDCheckpointTester with B
timeoutThreshold = Some(initialTime + 1), removeTimedoutData = true,
expectedStates = Nil, expectedTimingOutStates = Nil, expectedRemovedStates = Seq(123))
+ // If a state is not set but timeoutThreshold is defined, we should ignore this state.
+ // Previously it threw NoSuchElementException (SPARK-13195).
+ assertRecordUpdate(initStates = Seq(), data = Seq("noop"),
+ timeoutThreshold = Some(initialTime + 1), removeTimedoutData = true,
+ expectedStates = Nil, expectedTimingOutStates = Nil)
}
test("states generated by MapWithStateRDD") {