From 659fd9d04b988d48960eac4f352ca37066f43f5c Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Fri, 8 Jan 2016 13:02:30 -0800 Subject: [SPARK-4819] Remove Guava's "Optional" from public API Replace Guava `Optional` with (an API clone of) Java 8 `java.util.Optional` (edit: and a clone of Guava `Optional`) See also https://github.com/apache/spark/pull/10512 Author: Sean Owen Closes #10513 from srowen/SPARK-4819. --- .../main/scala/org/apache/spark/streaming/StateSpec.scala | 12 +++++++----- .../apache/spark/streaming/api/java/JavaPairDStream.scala | 3 +-- .../test/java/org/apache/spark/streaming/JavaAPISuite.java | 2 +- .../org/apache/spark/streaming/JavaMapWithStateSuite.java | 4 ++-- 4 files changed, 11 insertions(+), 10 deletions(-) (limited to 'streaming/src') diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala b/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala index 0b094558df..f1114c1e5a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala @@ -17,11 +17,9 @@ package org.apache.spark.streaming -import com.google.common.base.Optional - import org.apache.spark.{HashPartitioner, Partitioner} import org.apache.spark.annotation.Experimental -import org.apache.spark.api.java.{JavaPairRDD, JavaUtils} +import org.apache.spark.api.java.{JavaPairRDD, JavaUtils, Optional} import org.apache.spark.api.java.function.{Function3 => JFunction3, Function4 => JFunction4} import org.apache.spark.rdd.RDD import org.apache.spark.util.ClosureCleaner @@ -200,7 +198,11 @@ object StateSpec { StateSpec[KeyType, ValueType, StateType, MappedType] = { val wrappedFunc = (time: Time, k: KeyType, v: Option[ValueType], s: State[StateType]) => { val t = mappingFunction.call(time, k, JavaUtils.optionToOptional(v), s) - Option(t.orNull) + if (t.isPresent) { + Some(t.get) + } else { + None + } } StateSpec.function(wrappedFunc) } @@ -220,7 +222,7 @@ object StateSpec { mappingFunction: JFunction3[KeyType, Optional[ValueType], State[StateType], MappedType]): StateSpec[KeyType, ValueType, StateType, MappedType] = { val wrappedFunc = (k: KeyType, v: Option[ValueType], s: State[StateType]) => { - mappingFunction.call(k, Optional.fromNullable(v.get), s) + mappingFunction.call(k, Optional.ofNullable(v.get), s) } StateSpec.function(wrappedFunc) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index af0d84b332..d718f1d6fc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -25,14 +25,13 @@ import scala.collection.JavaConverters._ import scala.language.implicitConversions import scala.reflect.ClassTag -import com.google.common.base.Optional import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapred.{JobConf, OutputFormat} import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat} import org.apache.spark.Partitioner import org.apache.spark.annotation.Experimental -import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaUtils} +import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaUtils, Optional} import org.apache.spark.api.java.JavaPairRDD._ import org.apache.spark.api.java.JavaSparkContext.fakeClassTag import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java index ddc56fc869..4dbcef2934 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java @@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; import org.junit.Assert; import org.junit.Test; -import com.google.common.base.Optional; import com.google.common.io.Files; import com.google.common.collect.Sets; @@ -43,6 +42,7 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; +import org.apache.spark.api.java.Optional; import org.apache.spark.api.java.function.*; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.api.java.*; diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java index 20e2a1c3d5..9b7701003d 100644 --- a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java +++ b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java @@ -26,7 +26,6 @@ import java.util.Set; import scala.Tuple2; -import com.google.common.base.Optional; import com.google.common.collect.Sets; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.function.Function; @@ -38,6 +37,7 @@ import org.junit.Test; import org.apache.spark.HashPartitioner; import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.Optional; import org.apache.spark.api.java.function.Function3; import org.apache.spark.api.java.function.Function4; import org.apache.spark.streaming.api.java.JavaPairDStream; @@ -139,7 +139,7 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements new Function3, State, Integer>() { @Override public Integer call(String key, Optional value, State state) { - int sum = value.or(0) + (state.exists() ? state.get() : 0); + int sum = value.orElse(0) + (state.exists() ? state.get() : 0); state.update(sum); return sum; } -- cgit v1.2.3