aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-10-25 17:26:06 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-10-25 17:26:06 -0700
commitd307db6e554885d33e22ac09b567c66a0e702fd1 (patch)
treeb9aac03db9986a09fb578a40a1996dfb83a2732b /core
parent85e2cab6f68cbcb66bb348298463e2e86591d92d (diff)
parentdc9570782a90d731152246b347996ee12cf68aa3 (diff)
downloadspark-d307db6e554885d33e22ac09b567c66a0e702fd1.tar.gz
spark-d307db6e554885d33e22ac09b567c66a0e702fd1.tar.bz2
spark-d307db6e554885d33e22ac09b567c66a0e702fd1.zip
Merge pull request #102 from tdas/transform
Added new Spark Streaming operations New operations - transformWith which allows arbitrary 2-to-1 DStream transform, added to Scala and Java API - StreamingContext.transform to allow arbitrary n-to-1 DStream - leftOuterJoin and rightOuterJoin between 2 DStreams, added to Scala and Java API - missing variations of join and cogroup added to Scala Java API - missing JavaStreamingContext.union Updated a number of Java and Scala API docs
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/function/Function3.java36
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala34
3 files changed, 81 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 39f408b8c8..2142fd7327 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -622,4 +622,15 @@ object JavaPairRDD {
new JavaPairRDD[K, V](rdd)
implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd
+
+
+ /** Convert a JavaRDD of key-value pairs to JavaPairRDD. */
+ def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = {
+ implicit val cmk: ClassManifest[K] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
+ implicit val cmv: ClassManifest[V] =
+ implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
+ new JavaPairRDD[K, V](rdd.rdd)
+ }
+
}
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function3.java b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
new file mode 100644
index 0000000000..ac6178924a
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function;
+
+import scala.reflect.ClassManifest;
+import scala.reflect.ClassManifest$;
+import scala.runtime.AbstractFunction2;
+
+import java.io.Serializable;
+
+/**
+ * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R.
+ */
+public abstract class Function3<T1, T2, T3, R> extends WrappedFunction3<T1, T2, T3, R>
+ implements Serializable {
+
+ public ClassManifest<R> returnType() {
+ return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
+ }
+}
+
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala
new file mode 100644
index 0000000000..d314dbdf1d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java.function
+
+import scala.runtime.AbstractFunction3
+
+/**
+ * Subclass of Function3 for ease of calling from Java. The main thing it does is re-expose the
+ * apply() method as call() and declare that it can throw Exception (since AbstractFunction3.apply
+ * isn't marked to allow that).
+ */
+private[spark] abstract class WrappedFunction3[T1, T2, T3, R]
+ extends AbstractFunction3[T1, T2, T3, R] {
+ @throws(classOf[Exception])
+ def call(t1: T1, t2: T2, t3: T3): R
+
+ final def apply(t1: T1, t2: T2, t3: T3): R = call(t1, t2, t3)
+}
+