diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-02-04 12:43:16 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-02-04 12:43:16 -0800 |
commit | 8e2f296306131e6c7c2f06d6672995d3ff8ab021 (patch) | |
tree | c338de2d4c0e2cbf37c8f70d65e8a9b8f9e7a7b6 /streaming/src/main | |
parent | bd38dd6f75c4af0f8f32bb21a82da53fffa5e825 (diff) | |
download | spark-8e2f296306131e6c7c2f06d6672995d3ff8ab021.tar.gz spark-8e2f296306131e6c7c2f06d6672995d3ff8ab021.tar.bz2 spark-8e2f296306131e6c7c2f06d6672995d3ff8ab021.zip |
[SPARK-13195][STREAMING] Fix NoSuchElementException when a state is not set but timeoutThreshold is defined
Check the state Existence before calling get.
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #11081 from zsxwing/SPARK-13195.
Diffstat (limited to 'streaming/src/main')
-rw-r--r-- | streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala | 3 |
1 files changed, 2 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala index 1d2244eaf2..6ab1956bed 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/MapWithStateRDD.scala @@ -57,7 +57,8 @@ private[streaming] object MapWithStateRDDRecord { val returned = mappingFunction(batchTime, key, Some(value), wrappedState) if (wrappedState.isRemoved) { newStateMap.remove(key) - } else if (wrappedState.isUpdated || timeoutThresholdTime.isDefined) { + } else if (wrappedState.isUpdated + || (wrappedState.exists && timeoutThresholdTime.isDefined)) { newStateMap.put(key, wrappedState.get(), batchTime.milliseconds) } mappedData ++= returned |