aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2016-01-08 13:02:30 -0800
committerReynold Xin <rxin@databricks.com>2016-01-08 13:02:30 -0800
commit659fd9d04b988d48960eac4f352ca37066f43f5c (patch)
tree1893735497a7cfae284d7a9eb4dd07bed62b4ac4
parent553fd7b912a32476b481fd3f80c1d0664b6c6484 (diff)
downloadspark-659fd9d04b988d48960eac4f352ca37066f43f5c.tar.gz
spark-659fd9d04b988d48960eac4f352ca37066f43f5c.tar.bz2
spark-659fd9d04b988d48960eac4f352ca37066f43f5c.zip
[SPARK-4819] Remove Guava's "Optional" from public API
Replace Guava `Optional` with (an API clone of) Java 8 `java.util.Optional` (edit: and a clone of Guava `Optional`) See also https://github.com/apache/spark/pull/10512 Author: Sean Owen <sowen@cloudera.com> Closes #10513 from srowen/SPARK-4819.
-rw-r--r--core/src/main/java/org/apache/spark/api/java/Optional.java187
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala1
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala9
-rw-r--r--core/src/test/java/org/apache/spark/JavaAPISuite.java46
-rw-r--r--core/src/test/java/org/apache/spark/api/java/OptionalSuite.java94
-rw-r--r--docs/streaming-programming-guide.md1
-rw-r--r--examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java20
-rw-r--r--extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java2
-rw-r--r--extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java1
-rw-r--r--network/common/pom.xml6
-rw-r--r--pom.xml11
-rw-r--r--project/MimaExcludes.scala11
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala12
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala3
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java2
-rw-r--r--streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java4
-rw-r--r--tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala2
19 files changed, 333 insertions, 85 deletions
diff --git a/core/src/main/java/org/apache/spark/api/java/Optional.java b/core/src/main/java/org/apache/spark/api/java/Optional.java
new file mode 100644
index 0000000000..ca7babc3f0
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/java/Optional.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java;
+
+import java.io.Serializable;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * <p>Like {@code java.util.Optional} in Java 8, {@code scala.Option} in Scala, and
+ * {@code com.google.common.base.Optional} in Google Guava, this class represents a
+ * value of a given type that may or may not exist. It is used in methods that wish
+ * to optionally return a value, in preference to returning {@code null}.</p>
+ *
+ * <p>In fact, the class here is a reimplementation of the essential API of both
+ * {@code java.util.Optional} and {@code com.google.common.base.Optional}. From
+ * {@code java.util.Optional}, it implements:</p>
+ *
+ * <ul>
+ * <li>{@link #empty()}</li>
+ * <li>{@link #of(Object)}</li>
+ * <li>{@link #ofNullable(Object)}</li>
+ * <li>{@link #get()}</li>
+ * <li>{@link #orElse(Object)}</li>
+ * <li>{@link #isPresent()}</li>
+ * </ul>
+ *
+ * <p>From {@code com.google.common.base.Optional} it implements:</p>
+ *
+ * <ul>
+ * <li>{@link #absent()}</li>
+ * <li>{@link #of(Object)}</li>
+ * <li>{@link #fromNullable(Object)}</li>
+ * <li>{@link #get()}</li>
+ * <li>{@link #or(Object)}</li>
+ * <li>{@link #orNull()}</li>
+ * <li>{@link #isPresent()}</li>
+ * </ul>
+ *
+ * <p>{@code java.util.Optional} itself is not used at this time because the
+ * project does not require Java 8. Using {@code com.google.common.base.Optional}
+ * has in the past caused serious library version conflicts with Guava that can't
+ * be resolved by shading. Hence this work-alike clone.</p>
+ *
+ * @param <T> type of value held inside
+ */
+public final class Optional<T> implements Serializable {
+
+ private static final Optional<?> EMPTY = new Optional<>();
+
+ private final T value;
+
+ private Optional() {
+ this.value = null;
+ }
+
+ private Optional(T value) {
+ Preconditions.checkNotNull(value);
+ this.value = value;
+ }
+
+ // java.util.Optional API (subset)
+
+ /**
+ * @return an empty {@code Optional}
+ */
+ public static <T> Optional<T> empty() {
+ @SuppressWarnings("unchecked")
+ Optional<T> t = (Optional<T>) EMPTY;
+ return t;
+ }
+
+ /**
+ * @param value non-null value to wrap
+ * @return {@code Optional} wrapping this value
+ * @throws NullPointerException if value is null
+ */
+ public static <T> Optional<T> of(T value) {
+ return new Optional<>(value);
+ }
+
+ /**
+ * @param value value to wrap, which may be null
+ * @return {@code Optional} wrapping this value, which may be empty
+ */
+ public static <T> Optional<T> ofNullable(T value) {
+ if (value == null) {
+ return empty();
+ } else {
+ return of(value);
+ }
+ }
+
+ /**
+ * @return the value wrapped by this {@code Optional}
+ * @throws NullPointerException if this is empty (contains no value)
+ */
+ public T get() {
+ Preconditions.checkNotNull(value);
+ return value;
+ }
+
+ /**
+ * @param other value to return if this is empty
+ * @return this {@code Optional}'s value if present, or else the given value
+ */
+ public T orElse(T other) {
+ return value != null ? value : other;
+ }
+
+ /**
+ * @return true iff this {@code Optional} contains a value (non-empty)
+ */
+ public boolean isPresent() {
+ return value != null;
+ }
+
+ // Guava API (subset)
+ // of(), get() and isPresent() are identically present in the Guava API
+
+ /**
+ * @return an empty {@code Optional}
+ */
+ public static <T> Optional<T> absent() {
+ return empty();
+ }
+
+ /**
+ * @param value value to wrap, which may be null
+ * @return {@code Optional} wrapping this value, which may be empty
+ */
+ public static <T> Optional<T> fromNullable(T value) {
+ return ofNullable(value);
+ }
+
+ /**
+ * @param other value to return if this is empty
+ * @return this {@code Optional}'s value if present, or else the given value
+ */
+ public T or(T other) {
+ return value != null ? value : other;
+ }
+
+ /**
+ * @return this {@code Optional}'s value if present, or else null
+ */
+ public T orNull() {
+ return value;
+ }
+
+ // Common methods
+
+ @Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof Optional)) {
+ return false;
+ }
+ Optional<?> other = (Optional<?>) obj;
+ return value == null ? other.value == null : value.equals(other.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return value == null ? 0 : value.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return value == null ? "Optional.empty" : String.format("Optional[%s]", value);
+ }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 59af1052eb..fb04472ee7 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -25,7 +25,6 @@ import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.ClassTag
-import com.google.common.base.Optional
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
@@ -655,7 +654,6 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* keys; this also retains the original RDD's partitioning.
*/
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
- import scala.collection.JavaConverters._
def fn: (V) => Iterable[U] = (x: V) => f.call(x).asScala
implicit val ctag: ClassTag[U] = fakeClassTag
fromRDD(rdd.flatMapValues(fn))
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 242438237f..0f8d13cf5c 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -24,7 +24,6 @@ import java.util.{Comparator, Iterator => JIterator, List => JList}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
-import com.google.common.base.Optional
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.spark._
@@ -122,7 +121,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* RDD, and then flattening the results.
*/
def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = {
- import scala.collection.JavaConverters._
def fn: (T) => Iterable[U] = (x: T) => f.call(x).asScala
JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U])
}
@@ -132,7 +130,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* RDD, and then flattening the results.
*/
def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = {
- import scala.collection.JavaConverters._
def fn: (T) => Iterable[jl.Double] = (x: T) => f.call(x).asScala
new JavaDoubleRDD(rdd.flatMap(fn).map((x: jl.Double) => x.doubleValue()))
}
@@ -142,7 +139,6 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* RDD, and then flattening the results.
*/
def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
- import scala.collection.JavaConverters._
def fn: (T) => Iterable[(K2, V2)] = (x: T) => f.call(x).asScala
def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2])
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 9990b22e14..01433ca2ef 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -25,7 +25,6 @@ import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.ClassTag
-import com.google.common.base.Optional
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.{InputFormat, JobConf}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
index b2a4d053fa..f820401da2 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
@@ -22,13 +22,12 @@ import java.util.Map.Entry
import scala.collection.mutable
-import com.google.common.base.Optional
-
private[spark] object JavaUtils {
def optionToOptional[T](option: Option[T]): Optional[T] =
- option match {
- case Some(value) => Optional.of(value)
- case None => Optional.absent()
+ if (option.isDefined) {
+ Optional.of(option.get)
+ } else {
+ Optional.empty[T]
}
// Workaround for SPARK-3926 / SI-8911
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 47382e4231..44d5cac7c2 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -21,7 +21,17 @@ import java.io.*;
import java.nio.channels.FileChannel;
import java.nio.ByteBuffer;
import java.net.URI;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.*;
import scala.Tuple2;
@@ -35,7 +45,6 @@ import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.base.Throwables;
-import com.google.common.base.Optional;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
import org.apache.hadoop.io.IntWritable;
@@ -49,7 +58,12 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.apache.spark.api.java.*;
+import org.apache.spark.api.java.JavaDoubleRDD;
+import org.apache.spark.api.java.JavaFutureAction;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.*;
import org.apache.spark.input.PortableDataStream;
import org.apache.spark.partial.BoundedDouble;
@@ -1785,32 +1799,6 @@ public class JavaAPISuite implements Serializable {
Assert.assertTrue(future.isDone());
}
-
- /**
- * Test for SPARK-3647. This test needs to use the maven-built assembly to trigger the issue,
- * since that's the only artifact where Guava classes have been relocated.
- */
- @Test
- public void testGuavaOptional() {
- // Stop the context created in setUp() and start a local-cluster one, to force usage of the
- // assembly.
- sc.stop();
- JavaSparkContext localCluster = new JavaSparkContext("local-cluster[1,1,1024]", "JavaAPISuite");
- try {
- JavaRDD<Integer> rdd1 = localCluster.parallelize(Arrays.asList(1, 2, null), 3);
- JavaRDD<Optional<Integer>> rdd2 = rdd1.map(
- new Function<Integer, Optional<Integer>>() {
- @Override
- public Optional<Integer> call(Integer i) {
- return Optional.fromNullable(i);
- }
- });
- rdd2.collect();
- } finally {
- localCluster.stop();
- }
- }
-
static class Class1 {}
static class Class2 {}
diff --git a/core/src/test/java/org/apache/spark/api/java/OptionalSuite.java b/core/src/test/java/org/apache/spark/api/java/OptionalSuite.java
new file mode 100644
index 0000000000..4b97c18198
--- /dev/null
+++ b/core/src/test/java/org/apache/spark/api/java/OptionalSuite.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests {@link Optional}.
+ */
+public class OptionalSuite {
+
+ @Test
+ public void testEmpty() {
+ Assert.assertFalse(Optional.empty().isPresent());
+ Assert.assertNull(Optional.empty().orNull());
+ Assert.assertEquals("foo", Optional.empty().or("foo"));
+ Assert.assertEquals("foo", Optional.empty().orElse("foo"));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testEmptyGet() {
+ Optional.empty().get();
+ }
+
+ @Test
+ public void testAbsent() {
+ Assert.assertFalse(Optional.absent().isPresent());
+ Assert.assertNull(Optional.absent().orNull());
+ Assert.assertEquals("foo", Optional.absent().or("foo"));
+ Assert.assertEquals("foo", Optional.absent().orElse("foo"));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testAbsentGet() {
+ Optional.absent().get();
+ }
+
+ @Test
+ public void testOf() {
+ Assert.assertTrue(Optional.of(1).isPresent());
+ Assert.assertNotNull(Optional.of(1).orNull());
+ Assert.assertEquals(Integer.valueOf(1), Optional.of(1).get());
+ Assert.assertEquals(Integer.valueOf(1), Optional.of(1).or(2));
+ Assert.assertEquals(Integer.valueOf(1), Optional.of(1).orElse(2));
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testOfWithNull() {
+ Optional.of(null);
+ }
+
+ @Test
+ public void testOfNullable() {
+ Assert.assertTrue(Optional.ofNullable(1).isPresent());
+ Assert.assertNotNull(Optional.ofNullable(1).orNull());
+ Assert.assertEquals(Integer.valueOf(1), Optional.ofNullable(1).get());
+ Assert.assertEquals(Integer.valueOf(1), Optional.ofNullable(1).or(2));
+ Assert.assertEquals(Integer.valueOf(1), Optional.ofNullable(1).orElse(2));
+ Assert.assertFalse(Optional.ofNullable(null).isPresent());
+ Assert.assertNull(Optional.ofNullable(null).orNull());
+ Assert.assertEquals(Integer.valueOf(2), Optional.<Integer>ofNullable(null).or(2));
+ Assert.assertEquals(Integer.valueOf(2), Optional.<Integer>ofNullable(null).orElse(2));
+ }
+
+ @Test
+ public void testFromNullable() {
+ Assert.assertTrue(Optional.fromNullable(1).isPresent());
+ Assert.assertNotNull(Optional.fromNullable(1).orNull());
+ Assert.assertEquals(Integer.valueOf(1), Optional.fromNullable(1).get());
+ Assert.assertEquals(Integer.valueOf(1), Optional.fromNullable(1).or(2));
+ Assert.assertEquals(Integer.valueOf(1), Optional.fromNullable(1).orElse(2));
+ Assert.assertFalse(Optional.fromNullable(null).isPresent());
+ Assert.assertNull(Optional.fromNullable(null).orNull());
+ Assert.assertEquals(Integer.valueOf(2), Optional.<Integer>fromNullable(null).or(2));
+ Assert.assertEquals(Integer.valueOf(2), Optional.<Integer>fromNullable(null).orElse(2));
+ }
+
+}
diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 1edc0fe347..8fd075d02b 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -881,7 +881,6 @@ Scala code, take a look at the example
<div data-lang="java" markdown="1">
{% highlight java %}
-import com.google.common.base.Optional;
Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
@Override public Optional<Integer> call(List<Integer> values, Optional<Integer> state) {
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
index 14997c64d5..f52cc7c205 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
@@ -23,17 +23,14 @@ import java.util.regex.Pattern;
import scala.Tuple2;
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.*;
import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.StorageLevels;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
-import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.*;
/**
@@ -67,8 +64,8 @@ public class JavaStatefulNetworkWordCount {
// Initial state RDD input to mapWithState
@SuppressWarnings("unchecked")
- List<Tuple2<String, Integer>> tuples = Arrays.asList(new Tuple2<String, Integer>("hello", 1),
- new Tuple2<String, Integer>("world", 1));
+ List<Tuple2<String, Integer>> tuples =
+ Arrays.asList(new Tuple2<>("hello", 1), new Tuple2<>("world", 1));
JavaPairRDD<String, Integer> initialRDD = ssc.sparkContext().parallelizePairs(tuples);
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
@@ -77,7 +74,7 @@ public class JavaStatefulNetworkWordCount {
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
- return Lists.newArrayList(SPACE.split(x));
+ return Arrays.asList(SPACE.split(x));
}
});
@@ -85,18 +82,17 @@ public class JavaStatefulNetworkWordCount {
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
- return new Tuple2<String, Integer>(s, 1);
+ return new Tuple2<>(s, 1);
}
});
// Update the cumulative count function
- final Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
+ Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>> mappingFunc =
new Function3<String, Optional<Integer>, State<Integer>, Tuple2<String, Integer>>() {
-
@Override
public Tuple2<String, Integer> call(String word, Optional<Integer> one, State<Integer> state) {
- int sum = one.or(0) + (state.exists() ? state.get() : 0);
- Tuple2<String, Integer> output = new Tuple2<String, Integer>(word, sum);
+ int sum = one.orElse(0) + (state.exists() ? state.get() : 0);
+ Tuple2<String, Integer> output = new Tuple2<>(word, sum);
state.update(sum);
return output;
}
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
index 14975265ab..27d494ce35 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
@@ -24,7 +24,6 @@ import java.util.*;
import scala.Tuple2;
import com.google.common.collect.Iterables;
-import com.google.common.base.Optional;
import com.google.common.io.Files;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
@@ -38,6 +37,7 @@ import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.*;
import org.apache.spark.util.Utils;
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
index e8a0dfc0f0..604d818ef1 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/streaming/Java8APISuite.java
@@ -22,7 +22,6 @@ import java.util.*;
import scala.Tuple2;
-import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.junit.Assert;
diff --git a/network/common/pom.xml b/network/common/pom.xml
index 32c34c63a4..92ca0046d4 100644
--- a/network/common/pom.xml
+++ b/network/common/pom.xml
@@ -52,15 +52,9 @@
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
- <!--
- Promote Guava to "compile" so that maven-shade-plugin picks it up (for packaging the Optional
- class exposed in the Java API). The plugin will then remove this dependency from the published
- pom, so that Guava does not pollute the client's compilation classpath.
- -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
- <scope>compile</scope>
</dependency>
<!-- Test dependencies -->
diff --git a/pom.xml b/pom.xml
index e414a8bfe6..9c975a45f8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2251,17 +2251,6 @@
<relocation>
<pattern>com.google.common</pattern>
<shadedPattern>org.spark-project.guava</shadedPattern>
- <excludes>
- <!--
- These classes cannot be relocated, because the Java API exposes the
- "Optional" type; the others are referenced by the Optional class.
- -->
- <exclude>com/google/common/base/Absent*</exclude>
- <exclude>com/google/common/base/Function</exclude>
- <exclude>com/google/common/base/Optional*</exclude>
- <exclude>com/google/common/base/Present*</exclude>
- <exclude>com/google/common/base/Supplier</exclude>
- </excludes>
</relocation>
</relocations>
</configuration>
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 40559a0910..0d5f938d9e 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -57,7 +57,16 @@ object MimaExcludes {
) ++ Seq(
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.SparkContext.emptyRDD"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.broadcast.HttpBroadcastFactory")
- ) ++
+ ) ++
+ Seq(
+ // SPARK-4819 replace Guava Optional
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.api.java.JavaSparkContext.getCheckpointDir"),
+ ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.api.java.JavaSparkContext.getSparkHome"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.getCheckpointFile"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitioner"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.getCheckpointFile"),
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.api.java.JavaRDDLike.partitioner")
+ ) ++
Seq(
// SPARK-12481 Remove Hadoop 1.x
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.mapred.SparkHadoopMapRedUtil"),
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala b/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
index 0b094558df..f1114c1e5a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StateSpec.scala
@@ -17,11 +17,9 @@
package org.apache.spark.streaming
-import com.google.common.base.Optional
-
import org.apache.spark.{HashPartitioner, Partitioner}
import org.apache.spark.annotation.Experimental
-import org.apache.spark.api.java.{JavaPairRDD, JavaUtils}
+import org.apache.spark.api.java.{JavaPairRDD, JavaUtils, Optional}
import org.apache.spark.api.java.function.{Function3 => JFunction3, Function4 => JFunction4}
import org.apache.spark.rdd.RDD
import org.apache.spark.util.ClosureCleaner
@@ -200,7 +198,11 @@ object StateSpec {
StateSpec[KeyType, ValueType, StateType, MappedType] = {
val wrappedFunc = (time: Time, k: KeyType, v: Option[ValueType], s: State[StateType]) => {
val t = mappingFunction.call(time, k, JavaUtils.optionToOptional(v), s)
- Option(t.orNull)
+ if (t.isPresent) {
+ Some(t.get)
+ } else {
+ None
+ }
}
StateSpec.function(wrappedFunc)
}
@@ -220,7 +222,7 @@ object StateSpec {
mappingFunction: JFunction3[KeyType, Optional[ValueType], State[StateType], MappedType]):
StateSpec[KeyType, ValueType, StateType, MappedType] = {
val wrappedFunc = (k: KeyType, v: Option[ValueType], s: State[StateType]) => {
- mappingFunction.call(k, Optional.fromNullable(v.get), s)
+ mappingFunction.call(k, Optional.ofNullable(v.get), s)
}
StateSpec.function(wrappedFunc)
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index af0d84b332..d718f1d6fc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -25,14 +25,13 @@ import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.ClassTag
-import com.google.common.base.Optional
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.spark.Partitioner
import org.apache.spark.annotation.Experimental
-import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaUtils}
+import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext, JavaUtils, Optional}
import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index ddc56fc869..4dbcef2934 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.junit.Assert;
import org.junit.Test;
-import com.google.common.base.Optional;
import com.google.common.io.Files;
import com.google.common.collect.Sets;
@@ -43,6 +42,7 @@ import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.*;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.*;
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
index 20e2a1c3d5..9b7701003d 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
@@ -26,7 +26,6 @@ import java.util.Set;
import scala.Tuple2;
-import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
@@ -38,6 +37,7 @@ import org.junit.Test;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.api.java.function.Function4;
import org.apache.spark.streaming.api.java.JavaPairDStream;
@@ -139,7 +139,7 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
new Function3<String, Optional<Integer>, State<Integer>, Integer>() {
@Override
public Integer call(String key, Optional<Integer> value, State<Integer> state) {
- int sum = value.or(0) + (state.exists() ? state.get() : 0);
+ int sum = value.orElse(0) + (state.exists() ? state.get() : 0);
state.update(sum);
return sum;
}
diff --git a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
index 6fb7184e87..ccd8fd3969 100644
--- a/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
+++ b/tools/src/main/scala/org/apache/spark/tools/JavaAPICompletenessChecker.scala
@@ -161,7 +161,7 @@ object JavaAPICompletenessChecker {
}
case "scala.Option" => {
if (isReturnType) {
- ParameterizedType("com.google.common.base.Optional", parameters.map(applySubs))
+ ParameterizedType("org.apache.spark.api.java.Optional", parameters.map(applySubs))
} else {
applySubs(parameters(0))
}