author     Sean Owen <sowen@cloudera.com>  2016-01-08 13:02:30 -0800
committer  Reynold Xin <rxin@databricks.com>  2016-01-08 13:02:30 -0800
commit     659fd9d04b988d48960eac4f352ca37066f43f5c (patch)
tree       1893735497a7cfae284d7a9eb4dd07bed62b4ac4 /streaming
parent     553fd7b912a32476b481fd3f80c1d0664b6c6484 (diff)
[SPARK-4819] Remove Guava's "Optional" from public API
Replace Guava `Optional` with (an API clone of) Java 8 `java.util.Optional` (edit: and a clone of Guava `Optional`).

See also https://github.com/apache/spark/pull/10512

Author: Sean Owen <sowen@cloudera.com>

Closes #10513 from srowen/SPARK-4819.
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala                12
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala   3
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java                 2
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java        4
4 files changed, 11 insertions, 10 deletions
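
For callers, the change is a package and method-name swap: import org.apache.spark.api.java.Optional in place of com.google.common.base.Optional, and use the java.util.Optional-style calls in place of the Guava ones. A minimal sketch in Scala, assuming only the methods that actually appear in the hunks below (ofNullable, isPresent, get, orElse); the values are illustrative.

import org.apache.spark.api.java.Optional             // was: com.google.common.base.Optional

val v = Optional.ofNullable(Integer.valueOf(42))       // was: Optional.fromNullable(...)
val withDefault = v.orElse(0)                          // was: v.or(0)
val asScala = if (v.isPresent) Some(v.get) else None   // was: Option(v.orNull)
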
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala b/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
index 0b094558df..f1114c1e5a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
@@ -17,11 +17,9 @@
package org.apache.spark.streaming
-import com.google.common.base.Optional
-
import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.annotation.Experimental
-import org.apache.spark.api.java.{JavaPairRDD, JavaUtils}
+import org.apache.spark.api.java.{JavaPairRDD, JavaUtils, Optional}
import org.apache.spark.api.java.function.{Function3 => JFunction3, Function4 => JFunction4}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.ClosureCleaner
@@ -200,7 +198,11 @@ object StateSpec {
StateSpec[KeyType, ValueType, StateType, MappedType] = {
val wrappedFunc = (time: Time, k: KeyType, v: Option[ValueType], s: State[StateType]) => {
val t = mappingFunction.call(time, k, JavaUtils.optionToOptional(v), s)
- Option(t.orNull)
+ if (t.isPresent) {
+ Some(t.get)
+ } else {
+ None
+ }
}
StateSpec.function(wrappedFunc)
}
@@ -220,7 +222,7 @@ object StateSpec {
mappingFunction: JFunction3[KeyType, Optional[ValueType], State[StateType], MappedType]):
StateSpec[KeyType, ValueType, StateType, MappedType] = {
val wrappedFunc = (k: KeyType, v: Option[ValueType], s: State[StateType]) => {
- mappingFunction.call(k, Optional.fromNullable(v.get), s)
+ mappingFunction.call(k, Optional.ofNullable(v.get), s)
}
StateSpec.function(wrappedFunc)
}
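
The substantive change in StateSpec.scala is how the result of the Java mapping function is converted back to a Scala Option: an explicit isPresent/get branch replaces the old Option(t.orNull), which relied on Guava's orNull. A minimal sketch of that unwrapping as a standalone helper; the name toScalaOption is illustrative and not part of the patch.

import org.apache.spark.api.java.Optional

// Unwrap the Java-facing Optional into a Scala Option, mirroring the branch
// added in the hunk above.
def toScalaOption[T](t: Optional[T]): Option[T] =
  if (t.isPresent) Some(t.get) else None

The opposite direction, Scala Option to the Java-facing Optional, is still handled by JavaUtils.optionToOptional, unchanged by this patch.
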
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index af0d84b332..d718f1d6fc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -25,14 +25,13 @@ import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.ClassTag
-import com.google.common.base.Optional
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.spark.Partitioner
import org.apache.spark.annotation.Experimental
-import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaUtils}
+import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaUtils, Optional}
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index ddc56fc869..4dbcef2934 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.junit.Assert;
import org.junit.Test;
-import com.google.common.base.Optional;
import com.google.common.io.Files;
import com.google.common.collect.Sets;
@@ -43,6 +42,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.*;
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
index 20e2a1c3d5..9b7701003d 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
@@ -26,7 +26,6 @@ import java.util.Set;
import scala.Tuple2;
-import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
@@ -38,6 +37,7 @@ import org.junit.Test;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.Function4;
import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -139,7 +139,7 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
new Function3<String, Optional<Integer>, State<Integer>, Integer>() {
@Override
public Integer call(String key, Optional<Integer> value, State<Integer> state) {
- int sum = value.or(0) + (state.exists() ? state.get() : 0);
+ int sum = value.orElse(0) + (state.exists() ? state.get() : 0);
state.update(sum);
return sum;
}
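
For user code written against the Java mapWithState API, the migration mirrors what the test above exercises: the mapping function now receives org.apache.spark.api.java.Optional and reads its default via orElse rather than Guava's or. A rough Scala sketch of the same state-update logic, with illustrative names and only the methods shown in the test (orElse, exists, get, update).

import org.apache.spark.api.java.Optional
import org.apache.spark.streaming.State

// Running-count update: add the incoming value (0 if absent) to the stored state.
def mappingFunc(key: String, value: Optional[Integer], state: State[Integer]): Integer = {
  val previous: Int = if (state.exists()) state.get() else 0
  val sum: Int = value.orElse(0) + previous   // was: value.or(0) + previous
  state.update(sum)
  sum
}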