diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-10-25 17:26:06 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-10-25 17:26:06 -0700 |
commit | d307db6e554885d33e22ac09b567c66a0e702fd1 (patch) | |
tree | b9aac03db9986a09fb578a40a1996dfb83a2732b /core | |
parent | 85e2cab6f68cbcb66bb348298463e2e86591d92d (diff) | |
parent | dc9570782a90d731152246b347996ee12cf68aa3 (diff) | |
download | spark-d307db6e554885d33e22ac09b567c66a0e702fd1.tar.gz spark-d307db6e554885d33e22ac09b567c66a0e702fd1.tar.bz2 spark-d307db6e554885d33e22ac09b567c66a0e702fd1.zip |
Merge pull request #102 from tdas/transform
Added new Spark Streaming operations
New operations
- transformWith which allows arbitrary 2-to-1 DStream transform, added to Scala and Java API
- StreamingContext.transform to allow arbitrary n-to-1 DStream
- leftOuterJoin and rightOuterJoin between 2 DStreams, added to Scala and Java API
- missing variations of join and cogroup added to Scala Java API
- missing JavaStreamingContext.union
Updated a number of Java and Scala API docs
Diffstat (limited to 'core')
3 files changed, 81 insertions, 0 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 39f408b8c8..2142fd7327 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -622,4 +622,15 @@ object JavaPairRDD { new JavaPairRDD[K, V](rdd) implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd + + + /** Convert a JavaRDD of key-value pairs to JavaPairRDD. */ + def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = { + implicit val cmk: ClassManifest[K] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]] + implicit val cmv: ClassManifest[V] = + implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]] + new JavaPairRDD[K, V](rdd.rdd) + } + } diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function3.java b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java new file mode 100644 index 0000000000..ac6178924a --- /dev/null +++ b/core/src/main/scala/org/apache/spark/api/java/function/Function3.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.java.function; + +import scala.reflect.ClassManifest; +import scala.reflect.ClassManifest$; +import scala.runtime.AbstractFunction2; + +import java.io.Serializable; + +/** + * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R. + */ +public abstract class Function3<T1, T2, T3, R> extends WrappedFunction3<T1, T2, T3, R> + implements Serializable { + + public ClassManifest<R> returnType() { + return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class); + } +} + diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala new file mode 100644 index 0000000000..d314dbdf1d --- /dev/null +++ b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.java.function + +import scala.runtime.AbstractFunction3 + +/** + * Subclass of Function3 for ease of calling from Java. The main thing it does is re-expose the + * apply() method as call() and declare that it can throw Exception (since AbstractFunction3.apply + * isn't marked to allow that). + */ +private[spark] abstract class WrappedFunction3[T1, T2, T3, R] + extends AbstractFunction3[T1, T2, T3, R] { + @throws(classOf[Exception]) + def call(t1: T1, t2: T2, t3: T3): R + + final def apply(t1: T1, t2: T2, t3: T3): R = call(t1, t2, t3) +} + |