diff options
Diffstat (limited to 'core/src/main')
21 files changed, 222 insertions, 335 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java index 7500a89436..57fd0a7a80 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFlatMapFunction.scala +++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFlatMapFunction.java @@ -15,16 +15,13 @@ * limitations under the License. */ -package org.apache.spark.api.java.function +package org.apache.spark.api.java.function; -import java.lang.{Double => JDouble, Iterable => JIterable} +import java.io.Serializable; /** * A function that returns zero or more records of type Double from each input record. */ -// DoubleFlatMapFunction does not extend FlatMapFunction because flatMap is -// overloaded for both FlatMapFunction and DoubleFlatMapFunction. -abstract class DoubleFlatMapFunction[T] extends WrappedFunction1[T, JIterable[JDouble]] - with Serializable { - // Intentionally left blank +public interface DoubleFlatMapFunction<T> extends Serializable { + public Iterable<Double> call(T t) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java b/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java new file mode 100644 index 0000000000..150144e0e4 --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/java/function/DoubleFunction.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.java.function; + +import java.io.Serializable; + +/** + * A function that returns Doubles, and can be used to construct DoubleRDDs. + */ +public interface DoubleFunction<T> extends Serializable { + public double call(T t) throws Exception; +} diff --git a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java index bdb01f7670..fa75842047 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction.scala +++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction.java @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.spark.api.java.function +package org.apache.spark.api.java.function; -import scala.reflect.ClassTag +import java.io.Serializable; /** * A function that returns zero or more output records from each input record. */ -abstract class FlatMapFunction[T, R] extends Function[T, java.lang.Iterable[R]] { - def elementType(): ClassTag[R] = ClassTag.Any.asInstanceOf[ClassTag[R]] -} +public interface FlatMapFunction<T, R> extends Serializable { + public Iterable<R> call(T t) throws Exception; +}
\ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java index aae1349c5e..d1fdec0724 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/FlatMapFunction2.scala +++ b/core/src/main/java/org/apache/spark/api/java/function/FlatMapFunction2.java @@ -15,13 +15,13 @@ * limitations under the License. */ -package org.apache.spark.api.java.function +package org.apache.spark.api.java.function; -import scala.reflect.ClassTag +import java.io.Serializable; /** * A function that takes two inputs and returns zero or more output records. */ -abstract class FlatMapFunction2[A, B, C] extends Function2[A, B, java.lang.Iterable[C]] { - def elementType() : ClassTag[C] = ClassTag.Any.asInstanceOf[ClassTag[C]] -} +public interface FlatMapFunction2<T1, T2, R> extends Serializable { + public Iterable<R> call(T1 t1, T2 t2) throws Exception; +}
\ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function.scala b/core/src/main/java/org/apache/spark/api/java/function/Function.java index a5e1701f77..d00551bb0a 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/Function.scala +++ b/core/src/main/java/org/apache/spark/api/java/function/Function.java @@ -15,17 +15,15 @@ * limitations under the License. */ -package org.apache.spark.api.java.function +package org.apache.spark.api.java.function; -import scala.reflect.ClassTag -import org.apache.spark.api.java.JavaSparkContext +import java.io.Serializable; /** - * Base class for functions whose return types do not create special RDDs. PairFunction and + * Base interface for functions whose return types do not create special RDDs. PairFunction and * DoubleFunction are handled separately, to allow PairRDDs and DoubleRDDs to be constructed * when mapping RDDs of other types. */ -abstract class Function[T, R] extends WrappedFunction1[T, R] with Serializable { - def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag +public interface Function<T1, R> extends Serializable { + public R call(T1 v1) throws Exception; } - diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function2.scala b/core/src/main/java/org/apache/spark/api/java/function/Function2.java index fa3616cbcb..793caaa61a 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/Function2.scala +++ b/core/src/main/java/org/apache/spark/api/java/function/Function2.java @@ -15,15 +15,13 @@ * limitations under the License. */ -package org.apache.spark.api.java.function +package org.apache.spark.api.java.function; -import scala.reflect.ClassTag -import org.apache.spark.api.java.JavaSparkContext +import java.io.Serializable; /** * A two-argument function that takes arguments of type T1 and T2 and returns an R. */ -abstract class Function2[T1, T2, R] extends WrappedFunction2[T1, T2, R] with Serializable { - def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag +public interface Function2<T1, T2, R> extends Serializable { + public R call(T1 v1, T2 v2) throws Exception; } - diff --git a/core/src/main/scala/org/apache/spark/api/java/function/Function3.scala b/core/src/main/java/org/apache/spark/api/java/function/Function3.java index 45152891e9..b4151c3417 100644 --- a/core/src/main/scala/org/apache/spark/api/java/function/Function3.scala +++ b/core/src/main/java/org/apache/spark/api/java/function/Function3.java @@ -15,14 +15,13 @@ * limitations under the License. */ -package org.apache.spark.api.java.function +package org.apache.spark.api.java.function; -import org.apache.spark.api.java.JavaSparkContext -import scala.reflect.ClassTag +import java.io.Serializable; /** * A three-argument function that takes arguments of type T1, T2 and T3 and returns an R. */ -abstract class Function3[T1, T2, T3, R] extends WrappedFunction3[T1, T2, T3, R] with Serializable { - def returnType(): ClassTag[R] = JavaSparkContext.fakeClassTag +public interface Function3<T1, T2, T3, R> extends Serializable { + public R call(T1 v1, T2 v2, T3 v3) throws Exception; } diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java new file mode 100644 index 0000000000..691ef2eceb --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/java/function/PairFlatMapFunction.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.java.function; + +import java.io.Serializable; + +import scala.Tuple2; + +/** + * A function that returns zero or more key-value pair records from each input record. The + * key-value pairs are represented as scala.Tuple2 objects. + */ +public interface PairFlatMapFunction<T, K, V> extends Serializable { + public Iterable<Tuple2<K, V>> call(T t) throws Exception; +} diff --git a/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java b/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java new file mode 100644 index 0000000000..abd9bcc07a --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/java/function/PairFunction.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.java.function; + +import java.io.Serializable; + +import scala.Tuple2; + +/** + * A function that returns key-value pairs (Tuple2<K, V>), and can be used to construct PairRDDs. + */ +public interface PairFunction<T, K, V> extends Serializable { + public Tuple2<K, V> call(T t) throws Exception; +} diff --git a/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java new file mode 100644 index 0000000000..2a10435b75 --- /dev/null +++ b/core/src/main/java/org/apache/spark/api/java/function/VoidFunction.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.java.function; + +import java.io.Serializable; + +/** + * A function with no return value. + */ +public interface VoidFunction<T> extends Serializable { + public void call(T t) throws Exception; +} diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala index 071044463d..d1787061bc 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaDoubleRDD.scala @@ -83,7 +83,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[JDouble, Ja * Return a new RDD containing only the elements that satisfy a predicate. */ def filter(f: JFunction[JDouble, java.lang.Boolean]): JavaDoubleRDD = - fromRDD(srdd.filter(x => f(x).booleanValue())) + fromRDD(srdd.filter(x => f.call(x).booleanValue())) /** * Return a new RDD that is reduced into `numPartitions` partitions. diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 3f672900cb..857626fe84 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -32,7 +32,7 @@ import org.apache.spark.{HashPartitioner, Partitioner} import org.apache.spark.Partitioner._ import org.apache.spark.SparkContext.rddToPairRDDFunctions import org.apache.spark.api.java.JavaSparkContext.fakeClassTag -import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2} +import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction} import org.apache.spark.partial.{BoundedDouble, PartialResult} import org.apache.spark.rdd.{OrderedRDDFunctions, RDD} import org.apache.spark.storage.StorageLevel @@ -89,7 +89,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * Return a new RDD containing only the elements that satisfy a predicate. */ def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairRDD[K, V] = - new JavaPairRDD[K, V](rdd.filter(x => f(x).booleanValue())) + new JavaPairRDD[K, V](rdd.filter(x => f.call(x).booleanValue())) /** * Return a new RDD that is reduced into `numPartitions` partitions. @@ -165,9 +165,9 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * Simplified version of combineByKey that hash-partitions the output RDD. */ def combineByKey[C](createCombiner: JFunction[V, C], - mergeValue: JFunction2[C, V, C], - mergeCombiners: JFunction2[C, C, C], - numPartitions: Int): JavaPairRDD[K, C] = + mergeValue: JFunction2[C, V, C], + mergeCombiners: JFunction2[C, C, C], + numPartitions: Int): JavaPairRDD[K, C] = combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions)) /** @@ -442,7 +442,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) */ def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = { import scala.collection.JavaConverters._ - def fn = (x: V) => f.apply(x).asScala + def fn = (x: V) => f.call(x).asScala implicit val ctag: ClassTag[U] = fakeClassTag fromRDD(rdd.flatMapValues(fn)) } @@ -511,49 +511,49 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) /** Output the RDD to any Hadoop-supported file system. */ def saveAsHadoopFile[F <: OutputFormat[_, _]]( - path: String, - keyClass: Class[_], - valueClass: Class[_], - outputFormatClass: Class[F], - conf: JobConf) { + path: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[F], + conf: JobConf) { rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, conf) } /** Output the RDD to any Hadoop-supported file system. */ def saveAsHadoopFile[F <: OutputFormat[_, _]]( - path: String, - keyClass: Class[_], - valueClass: Class[_], - outputFormatClass: Class[F]) { + path: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[F]) { rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass) } /** Output the RDD to any Hadoop-supported file system, compressing with the supplied codec. */ def saveAsHadoopFile[F <: OutputFormat[_, _]]( - path: String, - keyClass: Class[_], - valueClass: Class[_], - outputFormatClass: Class[F], - codec: Class[_ <: CompressionCodec]) { + path: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[F], + codec: Class[_ <: CompressionCodec]) { rdd.saveAsHadoopFile(path, keyClass, valueClass, outputFormatClass, codec) } /** Output the RDD to any Hadoop-supported file system. */ def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]]( - path: String, - keyClass: Class[_], - valueClass: Class[_], - outputFormatClass: Class[F], - conf: Configuration) { + path: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[F], + conf: Configuration) { rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf) } /** Output the RDD to any Hadoop-supported file system. */ def saveAsNewAPIHadoopFile[F <: NewOutputFormat[_, _]]( - path: String, - keyClass: Class[_], - valueClass: Class[_], - outputFormatClass: Class[F]) { + path: String, + keyClass: Class[_], + valueClass: Class[_], + outputFormatClass: Class[F]) { rdd.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass) } @@ -700,6 +700,15 @@ object JavaPairRDD { implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd + private[spark] + implicit def toScalaFunction2[T1, T2, R](fun: JFunction2[T1, T2, R]): Function2[T1, T2, R] = { + (x: T1, x1: T2) => fun.call(x, x1) + } + + private[spark] implicit def toScalaFunction[T, R](fun: JFunction[T, R]): T => R = x => fun.call(x) + + private[spark] + implicit def pairFunToScalaFun[A, B, C](x: PairFunction[A, B, C]): A => (B, C) = y => x.call(y) /** Convert a JavaRDD of key-value pairs to JavaPairRDD. */ def fromJavaRDD[K, V](rdd: JavaRDD[(K, V)]): JavaPairRDD[K, V] = { diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala index d7ce8fdfc2..e973c46edd 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala @@ -70,7 +70,7 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) * Return a new RDD containing only the elements that satisfy a predicate. */ def filter(f: JFunction[T, java.lang.Boolean]): JavaRDD[T] = - wrapRDD(rdd.filter((x => f(x).booleanValue()))) + wrapRDD(rdd.filter((x => f.call(x).booleanValue()))) /** * Return a new RDD that is reduced into `numPartitions` partitions. diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 729668fb67..af0114bee3 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -67,7 +67,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return a new RDD by applying a function to all elements of this RDD. */ def map[R](f: JFunction[T, R]): JavaRDD[R] = - new JavaRDD(rdd.map(f)(f.returnType()))(f.returnType()) + new JavaRDD(rdd.map(f)(fakeClassTag))(fakeClassTag) /** * Return a new RDD by applying a function to each partition of this RDD, while tracking the index @@ -82,15 +82,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Return a new RDD by applying a function to all elements of this RDD. */ - def map[R](f: DoubleFunction[T]): JavaDoubleRDD = - new JavaDoubleRDD(rdd.map(x => f(x).doubleValue())) + def mapToDouble[R](f: DoubleFunction[T]): JavaDoubleRDD = { + new JavaDoubleRDD(rdd.map(x => f.call(x).doubleValue())) + } /** * Return a new RDD by applying a function to all elements of this RDD. */ - def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = { - val ctag = implicitly[ClassTag[Tuple2[K2, V2]]] - new JavaPairRDD(rdd.map(f)(ctag))(f.keyType(), f.valueType()) + def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = { + def cm = implicitly[ClassTag[(K2, V2)]] + new JavaPairRDD(rdd.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2]) } /** @@ -99,17 +100,17 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { */ def flatMap[U](f: FlatMapFunction[T, U]): JavaRDD[U] = { import scala.collection.JavaConverters._ - def fn = (x: T) => f.apply(x).asScala - JavaRDD.fromRDD(rdd.flatMap(fn)(f.elementType()))(f.elementType()) + def fn = (x: T) => f.call(x).asScala + JavaRDD.fromRDD(rdd.flatMap(fn)(fakeClassTag[U]))(fakeClassTag[U]) } /** * Return a new RDD by first applying a function to all elements of this * RDD, and then flattening the results. */ - def flatMap(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = { + def flatMapToDouble(f: DoubleFlatMapFunction[T]): JavaDoubleRDD = { import scala.collection.JavaConverters._ - def fn = (x: T) => f.apply(x).asScala + def fn = (x: T) => f.call(x).asScala new JavaDoubleRDD(rdd.flatMap(fn).map((x: java.lang.Double) => x.doubleValue())) } @@ -117,19 +118,19 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return a new RDD by first applying a function to all elements of this * RDD, and then flattening the results. */ - def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = { + def flatMapToPair[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = { import scala.collection.JavaConverters._ - def fn = (x: T) => f.apply(x).asScala - val ctag = implicitly[ClassTag[Tuple2[K2, V2]]] - JavaPairRDD.fromRDD(rdd.flatMap(fn)(ctag))(f.keyType(), f.valueType()) + def fn = (x: T) => f.call(x).asScala + def cm = implicitly[ClassTag[(K2, V2)]] + JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(fakeClassTag[K2], fakeClassTag[V2]) } /** * Return a new RDD by applying a function to each partition of this RDD. */ def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = { - def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) - JavaRDD.fromRDD(rdd.mapPartitions(fn)(f.elementType()))(f.elementType()) + def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator()) + JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U]) } /** @@ -137,52 +138,53 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { */ def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U], preservesPartitioning: Boolean): JavaRDD[U] = { - def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) - JavaRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning)(f.elementType()))(f.elementType()) + def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator()) + JavaRDD.fromRDD( + rdd.mapPartitions(fn, preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U]) } /** - * Return a new RDD by applying a function to each partition of this RDD. + * Return a new RDD by applying a function to each partition of this RDD. */ - def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = { - def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) + def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = { + def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator()) new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: java.lang.Double) => x.doubleValue())) } /** - * Return a new RDD by applying a function to each partition of this RDD. + * Return a new RDD by applying a function to each partition of this RDD. */ - def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]): + def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]): JavaPairRDD[K2, V2] = { - def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) - JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(f.keyType(), f.valueType()) + def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator()) + JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2]) } - /** * Return a new RDD by applying a function to each partition of this RDD. */ - def mapPartitions(f: DoubleFlatMapFunction[java.util.Iterator[T]], - preservesPartitioning: Boolean): JavaDoubleRDD = { - def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) + def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]], + preservesPartitioning: Boolean): JavaDoubleRDD = { + def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator()) new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning) - .map((x: java.lang.Double) => x.doubleValue())) + .map(x => x.doubleValue())) } /** * Return a new RDD by applying a function to each partition of this RDD. */ - def mapPartitions[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2], + def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2], preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = { - def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator()) - JavaPairRDD.fromRDD(rdd.mapPartitions(fn, preservesPartitioning))(f.keyType(), f.valueType()) + def fn = (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator()) + JavaPairRDD.fromRDD( + rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], fakeClassTag[V2]) } /** * Applies a function f to each partition of this RDD. */ def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) { - rdd.foreachPartition((x => f(asJavaIterator(x)))) + rdd.foreachPartition((x => f.call(asJavaIterator(x)))) } /** @@ -205,7 +207,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = { implicit val ctagK: ClassTag[K] = fakeClassTag implicit val ctagV: ClassTag[JList[T]] = fakeClassTag - JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(f.returnType))) + JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag))) } /** @@ -215,7 +217,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = { implicit val ctagK: ClassTag[K] = fakeClassTag implicit val ctagV: ClassTag[JList[T]] = fakeClassTag - JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(f.returnType))) + JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K]))) } /** @@ -255,9 +257,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { other: JavaRDDLike[U, _], f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = { def fn = (x: Iterator[T], y: Iterator[U]) => asScalaIterator( - f.apply(asJavaIterator(x), asJavaIterator(y)).iterator()) + f.call(asJavaIterator(x), asJavaIterator(y)).iterator()) JavaRDD.fromRDD( - rdd.zipPartitions(other.rdd)(fn)(other.classTag, f.elementType()))(f.elementType()) + rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V]) } // Actions (launch a job to return a value to the user program) @@ -266,7 +268,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Applies a function f to all elements of this RDD. */ def foreach(f: VoidFunction[T]) { - val cleanF = rdd.context.clean(f) + val cleanF = rdd.context.clean((x: T) => f.call(x)) rdd.foreach(cleanF) } @@ -320,7 +322,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { */ def aggregate[U](zeroValue: U)(seqOp: JFunction2[U, T, U], combOp: JFunction2[U, U, U]): U = - rdd.aggregate(zeroValue)(seqOp, combOp)(seqOp.returnType) + rdd.aggregate(zeroValue)(seqOp, combOp)(fakeClassTag[U]) /** * Return the number of elements in the RDD. diff --git a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala deleted file mode 100644 index 2cdf2e92c3..0000000000 --- a/core/src/main/scala/org/apache/spark/api/java/function/DoubleFunction.scala +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.api.java.function - -import java.lang.{Double => JDouble} - -/** - * A function that returns Doubles, and can be used to construct DoubleRDDs. - */ -// DoubleFunction does not extend Function because some UDF functions, like map, -// are overloaded for both Function and DoubleFunction. -abstract class DoubleFunction[T] extends WrappedFunction1[T, JDouble] with Serializable { - // Intentionally left blank -} diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala deleted file mode 100644 index 8467bbb892..0000000000 --- a/core/src/main/scala/org/apache/spark/api/java/function/PairFlatMapFunction.scala +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.api.java.function - -import java.lang.{Iterable => JIterable} -import org.apache.spark.api.java.JavaSparkContext -import scala.reflect.ClassTag - -/** - * A function that returns zero or more key-value pair records from each input record. The - * key-value pairs are represented as scala.Tuple2 objects. - */ -// PairFlatMapFunction does not extend FlatMapFunction because flatMap is -// overloaded for both FlatMapFunction and PairFlatMapFunction. -abstract class PairFlatMapFunction[T, K, V] extends WrappedFunction1[T, JIterable[(K, V)]] - with Serializable { - - def keyType(): ClassTag[K] = JavaSparkContext.fakeClassTag - - def valueType(): ClassTag[V] = JavaSparkContext.fakeClassTag -} diff --git a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala deleted file mode 100644 index d0ba0b6307..0000000000 --- a/core/src/main/scala/org/apache/spark/api/java/function/PairFunction.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.api.java.function - -import scala.reflect.ClassTag -import org.apache.spark.api.java.JavaSparkContext - -/** - * A function that returns key-value pairs (Tuple2<K, V>), and can be used to construct PairRDDs. - */ -// PairFunction does not extend Function because some UDF functions, like map, -// are overloaded for both Function and PairFunction. -abstract class PairFunction[T, K, V] extends WrappedFunction1[T, (K, V)] with Serializable { - - def keyType(): ClassTag[K] = JavaSparkContext.fakeClassTag - - def valueType(): ClassTag[V] = JavaSparkContext.fakeClassTag -} diff --git a/core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala b/core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala deleted file mode 100644 index ea94313a4a..0000000000 --- a/core/src/main/scala/org/apache/spark/api/java/function/VoidFunction.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.api.java.function - -/** - * A function with no return value. - */ -// This allows Java users to write void methods without having to return Unit. -abstract class VoidFunction[T] extends Serializable { - @throws(classOf[Exception]) - def call(t: T) : Unit -} - -// VoidFunction cannot extend AbstractFunction1 (because that would force users to explicitly -// return Unit), so it is implicitly converted to a Function1[T, Unit]: -object VoidFunction { - implicit def toFunction[T](f: VoidFunction[T]) : Function1[T, Unit] = ((x : T) => f.call(x)) -} diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala deleted file mode 100644 index cfe694f65d..0000000000 --- a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction1.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.api.java.function - -import scala.runtime.AbstractFunction1 - -/** - * Subclass of Function1 for ease of calling from Java. The main thing it does is re-expose the - * apply() method as call() and declare that it can throw Exception (since AbstractFunction1.apply - * isn't marked to allow that). - */ -private[spark] abstract class WrappedFunction1[T, R] extends AbstractFunction1[T, R] { - @throws(classOf[Exception]) - def call(t: T): R - - final def apply(t: T): R = call(t) -} diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala deleted file mode 100644 index eb9277c6fb..0000000000 --- a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction2.scala +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.api.java.function - -import scala.runtime.AbstractFunction2 - -/** - * Subclass of Function2 for ease of calling from Java. The main thing it does is re-expose the - * apply() method as call() and declare that it can throw Exception (since AbstractFunction2.apply - * isn't marked to allow that). - */ -private[spark] abstract class WrappedFunction2[T1, T2, R] extends AbstractFunction2[T1, T2, R] { - @throws(classOf[Exception]) - def call(t1: T1, t2: T2): R - - final def apply(t1: T1, t2: T2): R = call(t1, t2) -} diff --git a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala b/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala deleted file mode 100644 index d314dbdf1d..0000000000 --- a/core/src/main/scala/org/apache/spark/api/java/function/WrappedFunction3.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.api.java.function - -import scala.runtime.AbstractFunction3 - -/** - * Subclass of Function3 for ease of calling from Java. The main thing it does is re-expose the - * apply() method as call() and declare that it can throw Exception (since AbstractFunction3.apply - * isn't marked to allow that). - */ -private[spark] abstract class WrappedFunction3[T1, T2, T3, R] - extends AbstractFunction3[T1, T2, T3, R] { - @throws(classOf[Exception]) - def call(t1: T1, t2: T2, t3: T3): R - - final def apply(t1: T1, t2: T2, t3: T3): R = call(t1, t2, t3) -} - |