aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@eecs.berkeley.edu>2013-08-11 12:05:09 -0700
committerJosh Rosen <joshrosen@eecs.berkeley.edu>2013-08-11 12:05:09 -0700
commitd7f78b443b7c31b4db4eabb106801dc4a1866db7 (patch)
treea6bb0bee346fe6b2df7c5202ec3afc97bb980759 /streaming/src
parent95c62ca3060c89a44aa19aaab1fc9a9fff5a1196 (diff)
downloadspark-d7f78b443b7c31b4db4eabb106801dc4a1866db7.tar.gz
spark-d7f78b443b7c31b4db4eabb106801dc4a1866db7.tar.bz2
spark-d7f78b443b7c31b4db4eabb106801dc4a1866db7.zip
Change scala.Option to Guava Optional in Java APIs.
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala7
1 files changed, 2 insertions, 5 deletions
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
index ccd15563b0..ea08fb3826 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaPairDStream.scala
@@ -29,7 +29,7 @@ import spark.{RDD, Partitioner}
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.conf.Configuration
-import spark.api.java.{JavaRDD, JavaPairRDD}
+import spark.api.java.{JavaUtils, JavaRDD, JavaPairRDD}
import spark.storage.StorageLevel
import com.google.common.base.Optional
import spark.RDD
@@ -401,10 +401,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
(Seq[V], Option[S]) => Option[S] = {
val scalaFunc: (Seq[V], Option[S]) => Option[S] = (values, state) => {
val list: JList[V] = values
- val scalaState: Optional[S] = state match {
- case Some(s) => Optional.of(s)
- case _ => Optional.absent()
- }
+ val scalaState: Optional[S] = JavaUtils.optionToOptional(state)
val result: Optional[S] = in.apply(list, scalaState)
result.isPresent match {
case true => Some(result.get())