aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala2
1 files changed, 1 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala b/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
index 66f646d7dc..e6724feaee 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
@@ -221,7 +221,7 @@ object StateSpec {
mappingFunction: JFunction3[KeyType, Optional[ValueType], State[StateType], MappedType]):
StateSpec[KeyType, ValueType, StateType, MappedType] = {
val wrappedFunc = (k: KeyType, v: Option[ValueType], s: State[StateType]) => {
- mappingFunction.call(k, Optional.ofNullable(v.get), s)
+ mappingFunction.call(k, JavaUtils.optionToOptional(v), s)
}
StateSpec.function(wrappedFunc)
}