author    Matei Zaharia <matei.zaharia@gmail.com>  2013-08-12 19:00:57 -0700
committer Matei Zaharia <matei.zaharia@gmail.com>  2013-08-12 19:00:57 -0700
commit    65d0d91fbaf429106d6d0653c0b432f92979b5ab (patch)
tree      5e0e187c587ccce7fb10914a7117fd24ce61a5c2 /core/src
parent    4346f0a1e9494216f318ba833b05607bdd61384f (diff)
parent    cf08bb7a3e0596fe502fa618fdd6958772ebbeb5 (diff)
Merge pull request #807 from JoshRosen/guava-optional
Change scala.Option to Guava Optional in Java APIs
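
For Java callers, the visible effect is that outer joins and accessors such as getCheckpointFile() now return com.google.common.base.Optional instead of scala.Option, which has no convenient Java-side API. A minimal sketch of consuming the new leftOuterJoin signature from Java, assuming rdd1 and rdd2 are the JavaPairRDDs built in the test added below (the loop itself is illustrative, not part of the patch):

import com.google.common.base.Optional;
import scala.Tuple2;

// Illustrative only: iterate a leftOuterJoin result and unwrap the Optional
// right side with isPresent()/get(), instead of pattern-matching on scala.Option.
for (Tuple2<Integer, Tuple2<Integer, Optional<Character>>> pair :
    rdd1.leftOuterJoin(rdd2).collect()) {
  Optional<Character> right = pair._2()._2();
  if (right.isPresent()) {
    System.out.println(pair._1() + " -> " + right.get());
  } else {
    System.out.println(pair._1() + " has no match in rdd2");
  }
}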
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/spark/api/java/JavaPairRDD.scala       | 37
-rw-r--r--  core/src/main/scala/spark/api/java/JavaRDDLike.scala       |  5
-rw-r--r--  core/src/main/scala/spark/api/java/JavaSparkContext.scala  |  4
-rw-r--r--  core/src/main/scala/spark/api/java/JavaUtils.scala         | 28
-rw-r--r--  core/src/test/scala/spark/JavaAPISuite.java                | 30
5 files changed, 87 insertions, 17 deletions
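
The conversion itself is centralized in the new JavaUtils.optionToOptional helper (full file in the diff below). For readers coming from Java, here is a hypothetical Java rendering of the same conversion, using only public methods of scala.Option and Guava's Optional; the class and method names are ours, not part of the patch:

import com.google.common.base.Optional;

// Hypothetical Java equivalent of the new Scala helper JavaUtils.optionToOptional:
// Some(value) becomes Optional.of(value), None becomes Optional.absent().
public final class OptionConversions {
  private OptionConversions() {}

  public static <T> Optional<T> toOptional(scala.Option<T> option) {
    return option.isDefined() ? Optional.of(option.get()) : Optional.<T>absent();
  }
}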
diff --git a/core/src/main/scala/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
index ccc511dc5f..ff12e8b76c 100644
--- a/core/src/main/scala/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/spark/api/java/JavaPairRDD.scala
@@ -23,6 +23,7 @@ import java.util.Comparator
import scala.Tuple2
import scala.collection.JavaConversions._
+import com.google.common.base.Optional
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.OutputFormat
@@ -276,8 +277,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* partition the output RDD.
*/
def leftOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
- : JavaPairRDD[K, (V, Option[W])] =
- fromRDD(rdd.leftOuterJoin(other, partitioner))
+ : JavaPairRDD[K, (V, Optional[W])] = {
+ val joinResult = rdd.leftOuterJoin(other, partitioner)
+ fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
+ }
/**
* Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
@@ -286,8 +289,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* partition the output RDD.
*/
def rightOuterJoin[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
- : JavaPairRDD[K, (Option[V], W)] =
- fromRDD(rdd.rightOuterJoin(other, partitioner))
+ : JavaPairRDD[K, (Optional[V], W)] = {
+ val joinResult = rdd.rightOuterJoin(other, partitioner)
+ fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
+ }
/**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
@@ -340,8 +345,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
* using the existing partitioner/parallelism level.
*/
- def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Option[W])] =
- fromRDD(rdd.leftOuterJoin(other))
+ def leftOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (V, Optional[W])] = {
+ val joinResult = rdd.leftOuterJoin(other)
+ fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
+ }
/**
* Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
@@ -349,8 +356,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* pair (k, (v, None)) if no elements in `other` have key k. Hash-partitions the output
* into `numPartitions` partitions.
*/
- def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Option[W])] =
- fromRDD(rdd.leftOuterJoin(other, numPartitions))
+ def leftOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (V, Optional[W])] = {
+ val joinResult = rdd.leftOuterJoin(other, numPartitions)
+ fromRDD(joinResult.mapValues{case (v, w) => (v, JavaUtils.optionToOptional(w))})
+ }
/**
* Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
@@ -358,8 +367,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
* RDD using the existing partitioner/parallelism level.
*/
- def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Option[V], W)] =
- fromRDD(rdd.rightOuterJoin(other))
+ def rightOuterJoin[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (Optional[V], W)] = {
+ val joinResult = rdd.rightOuterJoin(other)
+ fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
+ }
/**
* Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
@@ -367,8 +378,10 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* pair (k, (None, w)) if no elements in `this` have key k. Hash-partitions the resulting
* RDD into the given number of partitions.
*/
- def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Option[V], W)] =
- fromRDD(rdd.rightOuterJoin(other, numPartitions))
+ def rightOuterJoin[W](other: JavaPairRDD[K, W], numPartitions: Int): JavaPairRDD[K, (Optional[V], W)] = {
+ val joinResult = rdd.rightOuterJoin(other, numPartitions)
+ fromRDD(joinResult.mapValues{case (v, w) => (JavaUtils.optionToOptional(v), w)})
+ }
/**
* Return the key-value pairs in this RDD to the master as a Map.
diff --git a/core/src/main/scala/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
index 21b5abf053..e0255ed23e 100644
--- a/core/src/main/scala/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/spark/api/java/JavaRDDLike.scala
@@ -366,10 +366,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Gets the name of the file to which this RDD was checkpointed
*/
def getCheckpointFile(): Optional[String] = {
- rdd.getCheckpointFile match {
- case Some(file) => Optional.of(file)
- case _ => Optional.absent()
- }
+ JavaUtils.optionToOptional(rdd.getCheckpointFile)
}
/** A description of this RDD and its recursive dependencies for debugging. */
diff --git a/core/src/main/scala/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
index fe182e7ab6..29d57004b5 100644
--- a/core/src/main/scala/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/spark/api/java/JavaSparkContext.scala
@@ -32,6 +32,8 @@ import spark.SparkContext.IntAccumulatorParam
import spark.SparkContext.DoubleAccumulatorParam
import spark.broadcast.Broadcast
+import com.google.common.base.Optional
+
/**
* A Java-friendly version of [[spark.SparkContext]] that returns [[spark.api.java.JavaRDD]]s and
* works with Java collections instead of Scala ones.
@@ -337,7 +339,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* or the spark.home Java property, or the SPARK_HOME environment variable
* (in that order of preference). If neither of these is set, return None.
*/
- def getSparkHome(): Option[String] = sc.getSparkHome()
+ def getSparkHome(): Optional[String] = JavaUtils.optionToOptional(sc.getSparkHome())
/**
* Add a file to be downloaded with this Spark job on every node.
diff --git a/core/src/main/scala/spark/api/java/JavaUtils.scala b/core/src/main/scala/spark/api/java/JavaUtils.scala
new file mode 100644
index 0000000000..ffc131ac83
--- /dev/null
+++ b/core/src/main/scala/spark/api/java/JavaUtils.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package spark.api.java
+
+import com.google.common.base.Optional
+
+object JavaUtils {
+ def optionToOptional[T](option: Option[T]): Optional[T] =
+ option match {
+ case Some(value) => Optional.of(value)
+ case None => Optional.absent()
+ }
+}
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 5e2bf2d231..4ab271de1a 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.*;
+import com.google.common.base.Optional;
import scala.Tuple2;
import com.google.common.base.Charsets;
@@ -198,6 +199,35 @@ public class JavaAPISuite implements Serializable {
}
@Test
+ public void leftOuterJoin() {
+ JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<Integer, Integer>(1, 1),
+ new Tuple2<Integer, Integer>(1, 2),
+ new Tuple2<Integer, Integer>(2, 1),
+ new Tuple2<Integer, Integer>(3, 1)
+ ));
+ JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<Integer, Character>(1, 'x'),
+ new Tuple2<Integer, Character>(2, 'y'),
+ new Tuple2<Integer, Character>(2, 'z'),
+ new Tuple2<Integer, Character>(4, 'w')
+ ));
+ List<Tuple2<Integer,Tuple2<Integer,Optional<Character>>>> joined =
+ rdd1.leftOuterJoin(rdd2).collect();
+ Assert.assertEquals(5, joined.size());
+ Tuple2<Integer,Tuple2<Integer,Optional<Character>>> firstUnmatched =
+ rdd1.leftOuterJoin(rdd2).filter(
+ new Function<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>, Boolean>() {
+ @Override
+ public Boolean call(Tuple2<Integer, Tuple2<Integer, Optional<Character>>> tup)
+ throws Exception {
+ return !tup._2()._2().isPresent();
+ }
+ }).first();
+ Assert.assertEquals(3, firstUnmatched._1().intValue());
+ }
+
+ @Test
public void foldReduce() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {