aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorGabriele Nizzoli <mail@nizzoli.net>2016-02-02 13:20:01 -0800
committerShixiong Zhu <shixiong@databricks.com>2016-02-02 13:20:01 -0800
commitd0df2ca40953ba581dce199798a168af01283cdc (patch)
tree0973414cfec1971497cd3faaa0e3f33769714d52 /streaming
parentbe5dd881f1eff248224a92d57cfd1309cb3acf38 (diff)
downloadspark-d0df2ca40953ba581dce199798a168af01283cdc.tar.gz
spark-d0df2ca40953ba581dce199798a168af01283cdc.tar.bz2
spark-d0df2ca40953ba581dce199798a168af01283cdc.zip
[SPARK-13121][STREAMING] java mapWithState mishandles scala Option
Already merged into 1.6 branch, this PR is to commit to master the same change Author: Gabriele Nizzoli <mail@nizzoli.net> Closes #11028 from gabrielenizzoli/patch-1.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala2
1 files changed, 1 insertions, 1 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala b/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
index 66f646d7dc..e6724feaee 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
@@ -221,7 +221,7 @@ object StateSpec {
mappingFunction: JFunction3[KeyType, Optional[ValueType], State[StateType], MappedType]):
StateSpec[KeyType, ValueType, StateType, MappedType] = {
val wrappedFunc = (k: KeyType, v: Option[ValueType], s: State[StateType]) => {
- mappingFunction.call(k, Optional.ofNullable(v.get), s)
+ mappingFunction.call(k, JavaUtils.optionToOptional(v), s)
}
StateSpec.function(wrappedFunc)
}