author    Matei Zaharia <matei@eecs.berkeley.edu>  2012-08-12 23:10:19 +0200
committer Matei Zaharia <matei@eecs.berkeley.edu>  2012-08-12 23:10:19 +0200
commit    6ae3c375a9e8a97ff96649b3637e4a011d849990 (patch)
tree      d455d4435336f64d1cf0a2f5e540f4424e2d7628
parent    0141879c400732242ca90c41ae44e85dfe546db8 (diff)
Renamed apply() to call() in Java API and allowed it to throw Exceptions
-rw-r--r--  core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java |  6
-rw-r--r--  core/src/main/scala/spark/api/java/function/DoubleFunction.java        |  5
-rw-r--r--  core/src/main/scala/spark/api/java/function/FlatMapFunction.scala      |  3
-rw-r--r--  core/src/main/scala/spark/api/java/function/Function.java              |  4
-rw-r--r--  core/src/main/scala/spark/api/java/function/Function2.java             |  7
-rw-r--r--  core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java   | 10
-rw-r--r--  core/src/main/scala/spark/api/java/function/PairFunction.java          | 10
-rw-r--r--  core/src/main/scala/spark/api/java/function/VoidFunction.scala         |  5
-rw-r--r--  core/src/main/scala/spark/api/java/function/WrappedFunction1.scala     | 15
-rw-r--r--  core/src/main/scala/spark/api/java/function/WrappedFunction2.scala     | 15
-rw-r--r--  core/src/test/scala/spark/JavaAPISuite.java                            | 42
-rw-r--r--  examples/src/main/java/spark/examples/JavaHdfsLR.java                  |  9
-rw-r--r--  examples/src/main/java/spark/examples/JavaTC.java                      | 16
-rw-r--r--  examples/src/main/java/spark/examples/JavaWordCount.java               | 18
-rw-r--r--  examples/src/main/scala/spark/examples/SparkTC.scala                   |  3
15 files changed, 105 insertions, 63 deletions
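
With this change, Java API users override call() rather than apply(), and call() is declared to throw Exception, so checked exceptions can propagate out of user functions without wrapping. A minimal sketch of the new style, assuming an existing JavaSparkContext named sc (the data and the I/O check are made up for illustration):

    // Assumes imports of java.util.Arrays, spark.api.java.*, and
    // spark.api.java.function.Function.
    JavaRDD<String> lines = sc.parallelize(Arrays.asList("1", "2", "3"));
    JavaRDD<Integer> nums = lines.map(new Function<String, Integer>() {
      @Override
      public Integer call(String s) throws Exception {
        // call() may now throw a checked exception directly:
        if (s.isEmpty()) throw new java.io.IOException("empty record");
        return Integer.parseInt(s);
      }
    });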
diff --git a/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java b/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java
index 240747390c..7b6478c2cd 100644
--- a/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java
+++ b/core/src/main/scala/spark/api/java/function/DoubleFlatMapFunction.java
@@ -9,5 +9,9 @@ import java.io.Serializable;
// overloaded for both FlatMapFunction and DoubleFlatMapFunction.
public abstract class DoubleFlatMapFunction<T> extends AbstractFunction1<T, Iterable<Double>>
implements Serializable {
- public abstract Iterable<Double> apply(T t);
+
+ public abstract Iterable<Double> call(T t);
+
+ @Override
+ public final Iterable<Double> apply(T t) { return call(t); }
}
diff --git a/core/src/main/scala/spark/api/java/function/DoubleFunction.java b/core/src/main/scala/spark/api/java/function/DoubleFunction.java
index 378ffd427d..a03a72c835 100644
--- a/core/src/main/scala/spark/api/java/function/DoubleFunction.java
+++ b/core/src/main/scala/spark/api/java/function/DoubleFunction.java
@@ -7,7 +7,8 @@ import java.io.Serializable;
// DoubleFunction does not extend Function because some UDF functions, like map,
// are overloaded for both Function and DoubleFunction.
-public abstract class DoubleFunction<T> extends AbstractFunction1<T, Double>
+public abstract class DoubleFunction<T> extends WrappedFunction1<T, Double>
implements Serializable {
- public abstract Double apply(T t);
+
+ public abstract Double call(T t) throws Exception;
}
diff --git a/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala b/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala
index 1045e006a0..bcba38c569 100644
--- a/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala
+++ b/core/src/main/scala/spark/api/java/function/FlatMapFunction.scala
@@ -1,7 +1,8 @@
package spark.api.java.function
abstract class FlatMapFunction[T, R] extends Function[T, java.lang.Iterable[R]] {
- def apply(x: T) : java.lang.Iterable[R]
+ @throws(classOf[Exception])
+ def call(x: T) : java.lang.Iterable[R]
def elementType() : ClassManifest[R] = ClassManifest.Any.asInstanceOf[ClassManifest[R]]
}
diff --git a/core/src/main/scala/spark/api/java/function/Function.java b/core/src/main/scala/spark/api/java/function/Function.java
index ad38b89f0f..f6f2e5fd76 100644
--- a/core/src/main/scala/spark/api/java/function/Function.java
+++ b/core/src/main/scala/spark/api/java/function/Function.java
@@ -11,8 +11,8 @@ import java.io.Serializable;
* Base class for functions whose return types do not have special RDDs; DoubleFunction is
* handled separately, to allow DoubleRDDs to be constructed when mapping RDDs to doubles.
*/
-public abstract class Function<T, R> extends AbstractFunction1<T, R> implements Serializable {
- public abstract R apply(T t);
+public abstract class Function<T, R> extends WrappedFunction1<T, R> implements Serializable {
+ public abstract R call(T t) throws Exception;
public ClassManifest<R> returnType() {
return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
diff --git a/core/src/main/scala/spark/api/java/function/Function2.java b/core/src/main/scala/spark/api/java/function/Function2.java
index 883804dfe4..be48b173b8 100644
--- a/core/src/main/scala/spark/api/java/function/Function2.java
+++ b/core/src/main/scala/spark/api/java/function/Function2.java
@@ -6,12 +6,13 @@ import scala.runtime.AbstractFunction2;
import java.io.Serializable;
-public abstract class Function2<T1, T2, R> extends AbstractFunction2<T1, T2, R>
+public abstract class Function2<T1, T2, R> extends WrappedFunction2<T1, T2, R>
implements Serializable {
+
+ public abstract R call(T1 t1, T2 t2) throws Exception;
+
public ClassManifest<R> returnType() {
return (ClassManifest<R>) ClassManifest$.MODULE$.fromClass(Object.class);
}
-
- public abstract R apply(T1 t1, T2 t2);
}
diff --git a/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java b/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java
index aead6c4e03..c074b9c717 100644
--- a/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java
+++ b/core/src/main/scala/spark/api/java/function/PairFlatMapFunction.java
@@ -9,8 +9,11 @@ import java.io.Serializable;
// PairFlatMapFunction does not extend FlatMapFunction because flatMap is
// overloaded for both FlatMapFunction and PairFlatMapFunction.
-public abstract class PairFlatMapFunction<T, K, V> extends AbstractFunction1<T, Iterable<Tuple2<K,
- V>>> implements Serializable {
+public abstract class PairFlatMapFunction<T, K, V>
+ extends WrappedFunction1<T, Iterable<Tuple2<K, V>>>
+ implements Serializable {
+
+ public abstract Iterable<Tuple2<K, V>> call(T t) throws Exception;
public ClassManifest<K> keyType() {
return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class);
@@ -19,7 +22,4 @@ public abstract class PairFlatMapFunction<T, K, V> extends AbstractFunction1<T,
public ClassManifest<V> valueType() {
return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class);
}
-
- public abstract Iterable<Tuple2<K, V>> apply(T t);
-
}
diff --git a/core/src/main/scala/spark/api/java/function/PairFunction.java b/core/src/main/scala/spark/api/java/function/PairFunction.java
index 3284bfb11e..7f5bb7de13 100644
--- a/core/src/main/scala/spark/api/java/function/PairFunction.java
+++ b/core/src/main/scala/spark/api/java/function/PairFunction.java
@@ -9,8 +9,11 @@ import java.io.Serializable;
// PairFunction does not extend Function because some UDF functions, like map,
// are overloaded for both Function and PairFunction.
-public abstract class PairFunction<T, K, V> extends AbstractFunction1<T, Tuple2<K,
- V>> implements Serializable {
+public abstract class PairFunction<T, K, V>
+ extends WrappedFunction1<T, Tuple2<K, V>>
+ implements Serializable {
+
+ public abstract Tuple2<K, V> call(T t) throws Exception;
public ClassManifest<K> keyType() {
return (ClassManifest<K>) ClassManifest$.MODULE$.fromClass(Object.class);
@@ -19,7 +22,4 @@ public abstract class PairFunction<T, K, V> extends AbstractFunction1<T, Tuple2<
public ClassManifest<V> valueType() {
return (ClassManifest<V>) ClassManifest$.MODULE$.fromClass(Object.class);
}
-
- public abstract Tuple2<K, V> apply(T t);
-
}
diff --git a/core/src/main/scala/spark/api/java/function/VoidFunction.scala b/core/src/main/scala/spark/api/java/function/VoidFunction.scala
index be4cbaff39..0eefe337e8 100644
--- a/core/src/main/scala/spark/api/java/function/VoidFunction.scala
+++ b/core/src/main/scala/spark/api/java/function/VoidFunction.scala
@@ -2,11 +2,12 @@ package spark.api.java.function
// This allows Java users to write void methods without having to return Unit.
abstract class VoidFunction[T] extends Serializable {
- def apply(t: T) : Unit
+ @throws(classOf[Exception])
+ def call(t: T) : Unit
}
// VoidFunction cannot extend AbstractFunction1 (because that would force users to explicitly
// return Unit), so it is implicitly converted to a Function1[T, Unit]:
object VoidFunction {
- implicit def toFunction[T](f: VoidFunction[T]) : Function1[T, Unit] = ((x : T) => f(x))
+ implicit def toFunction[T](f: VoidFunction[T]) : Function1[T, Unit] = ((x : T) => f.call(x))
}
\ No newline at end of file
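
The implicit conversion above is what lets Scala code accept a VoidFunction wherever a Function1[T, Unit] is expected; Java callers only ever see call(). A minimal usage sketch, assuming an existing JavaRDD<String> named rdd (the null check and I/O exception are hypothetical):

    rdd.foreach(new VoidFunction<String>() {
      @Override
      public void call(String s) throws Exception {
        // Checked exceptions are now allowed here too:
        if (s == null) throw new java.io.IOException("null line");
        System.out.println(s);
      }
    });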
diff --git a/core/src/main/scala/spark/api/java/function/WrappedFunction1.scala b/core/src/main/scala/spark/api/java/function/WrappedFunction1.scala
new file mode 100644
index 0000000000..d08e1e9fbf
--- /dev/null
+++ b/core/src/main/scala/spark/api/java/function/WrappedFunction1.scala
@@ -0,0 +1,15 @@
+package spark.api.java.function
+
+import scala.runtime.AbstractFunction1
+
+/**
+ * Subclass of Function1 for ease of calling from Java. The main thing it does is re-expose the
+ * apply() method as call() and declare that it can throw Exception (since AbstractFunction1.apply
+ * isn't marked to allow that).
+ */
+abstract class WrappedFunction1[T, R] extends AbstractFunction1[T, R] {
+ @throws(classOf[Exception])
+ def call(t: T): R
+
+ final def apply(t: T): R = call(t)
+}
diff --git a/core/src/main/scala/spark/api/java/function/WrappedFunction2.scala b/core/src/main/scala/spark/api/java/function/WrappedFunction2.scala
new file mode 100644
index 0000000000..c9d67d9771
--- /dev/null
+++ b/core/src/main/scala/spark/api/java/function/WrappedFunction2.scala
@@ -0,0 +1,15 @@
+package spark.api.java.function
+
+import scala.runtime.AbstractFunction2
+
+/**
+ * Subclass of Function2 for ease of calling from Java. The main thing it does is re-expose the
+ * apply() method as call() and declare that it can throw Exception (since AbstractFunction2.apply
+ * isn't marked to allow that).
+ */
+abstract class WrappedFunction2[T1, T2, R] extends AbstractFunction2[T1, T2, R] {
+ @throws(classOf[Exception])
+ def call(t1: T1, t2: T2): R
+
+ final def apply(t1: T1, t2: T2): R = call(t1, t2)
+}
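
Both wrappers follow the same pattern: Java subclasses implement only call(), while the final apply() forwards to it, so Spark's Scala internals can keep invoking these objects as ordinary FunctionN values. Because Scala does not enforce checked exceptions, apply() needs no throws clause of its own even though call() declares Exception. A minimal sketch of the delegation as seen from Java (the values are hypothetical):

    Function2<Integer, Integer, Integer> add =
        new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer a, Integer b) throws Exception {
        return a + b;
      }
    };
    // apply() is final in WrappedFunction2 and simply forwards to call():
    Integer five = add.apply(2, 3);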
diff --git a/core/src/test/scala/spark/JavaAPISuite.java b/core/src/test/scala/spark/JavaAPISuite.java
index 5f0293e55b..24bf021710 100644
--- a/core/src/test/scala/spark/JavaAPISuite.java
+++ b/core/src/test/scala/spark/JavaAPISuite.java
@@ -117,7 +117,7 @@ public class JavaAPISuite implements Serializable {
JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
rdd.foreach(new VoidFunction<String>() {
@Override
- public void apply(String s) {
+ public void call(String s) {
System.out.println(s);
}
});
@@ -128,7 +128,7 @@ public class JavaAPISuite implements Serializable {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
Function<Integer, Boolean> isOdd = new Function<Integer, Boolean>() {
@Override
- public Boolean apply(Integer x) {
+ public Boolean call(Integer x) {
return x % 2 == 0;
}
};
@@ -166,7 +166,7 @@ public class JavaAPISuite implements Serializable {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
Function2<Integer, Integer, Integer> add = new Function2<Integer, Integer, Integer>() {
@Override
- public Integer apply(Integer a, Integer b) {
+ public Integer call(Integer a, Integer b) {
return a + b;
}
};
@@ -191,7 +191,7 @@ public class JavaAPISuite implements Serializable {
JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override
- public Integer apply(Integer a, Integer b) {
+ public Integer call(Integer a, Integer b) {
return a + b;
}
});
@@ -207,7 +207,7 @@ public class JavaAPISuite implements Serializable {
localCounts = rdd.reduceByKeyLocally(new Function2<Integer, Integer,
Integer>() {
@Override
- public Integer apply(Integer a, Integer b) {
+ public Integer call(Integer a, Integer b) {
return a + b;
}
});
@@ -252,7 +252,7 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(5, distinct.count());
JavaDoubleRDD filter = rdd.filter(new Function<Double, Boolean>() {
@Override
- public Boolean apply(Double x) {
+ public Boolean call(Double x) {
return x > 2.0;
}
});
@@ -279,19 +279,19 @@ public class JavaAPISuite implements Serializable {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
JavaDoubleRDD doubles = rdd.map(new DoubleFunction<Integer>() {
@Override
- public Double apply(Integer x) {
+ public Double call(Integer x) {
return 1.0 * x;
}
}).cache();
JavaPairRDD<Integer, Integer> pairs = rdd.map(new PairFunction<Integer, Integer, Integer>() {
@Override
- public Tuple2<Integer, Integer> apply(Integer x) {
+ public Tuple2<Integer, Integer> call(Integer x) {
return new Tuple2<Integer, Integer>(x, x);
}
}).cache();
JavaRDD<String> strings = rdd.map(new Function<Integer, String>() {
@Override
- public String apply(Integer x) {
+ public String call(Integer x) {
return x.toString();
}
}).cache();
@@ -303,7 +303,7 @@ public class JavaAPISuite implements Serializable {
"The quick brown fox jumps over the lazy dog."));
JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
@Override
- public Iterable<String> apply(String x) {
+ public Iterable<String> call(String x) {
return Arrays.asList(x.split(" "));
}
});
@@ -314,7 +314,7 @@ public class JavaAPISuite implements Serializable {
new PairFlatMapFunction<String, String, String>() {
@Override
- public Iterable<Tuple2<String, String>> apply(String s) {
+ public Iterable<Tuple2<String, String>> call(String s) {
List<Tuple2<String, String>> pairs = new LinkedList<Tuple2<String, String>>();
for (String word : s.split(" ")) pairs.add(new Tuple2<String, String>(word, word));
return pairs;
@@ -326,7 +326,7 @@ public class JavaAPISuite implements Serializable {
JavaDoubleRDD doubles = rdd.flatMap(new DoubleFlatMapFunction<String>() {
@Override
- public Iterable<Double> apply(String s) {
+ public Iterable<Double> call(String s) {
List<Double> lengths = new LinkedList<Double>();
for (String word : s.split(" ")) lengths.add(word.length() * 1.0);
return lengths;
@@ -343,7 +343,7 @@ public class JavaAPISuite implements Serializable {
JavaRDD<Integer> partitionSums = rdd.mapPartitions(
new FlatMapFunction<Iterator<Integer>, Integer>() {
@Override
- public Iterable<Integer> apply(Iterator<Integer> iter) {
+ public Iterable<Integer> call(Iterator<Integer> iter) {
int sum = 0;
while (iter.hasNext()) {
sum += iter.next();
@@ -417,7 +417,7 @@ public class JavaAPISuite implements Serializable {
rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
@Override
- public Tuple2<IntWritable, Text> apply(Tuple2<Integer, String> pair) {
+ public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
}
}).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
@@ -426,7 +426,7 @@ public class JavaAPISuite implements Serializable {
JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class,
Text.class).map(new PairFunction<Tuple2<IntWritable, Text>, Integer, String>() {
@Override
- public Tuple2<Integer, String> apply(Tuple2<IntWritable, Text> pair) {
+ public Tuple2<Integer, String> call(Tuple2<IntWritable, Text> pair) {
return new Tuple2<Integer, String>(pair._1().get(), pair._2().toString());
}
});
@@ -446,7 +446,7 @@ public class JavaAPISuite implements Serializable {
rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
@Override
- public Tuple2<IntWritable, Text> apply(Tuple2<Integer, String> pair) {
+ public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
}
}).saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class,
@@ -457,7 +457,7 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
String>() {
@Override
- public String apply(Tuple2<IntWritable, Text> x) {
+ public String call(Tuple2<IntWritable, Text> x) {
return x.toString();
}
}).collect().toString());
@@ -476,7 +476,7 @@ public class JavaAPISuite implements Serializable {
rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
@Override
- public Tuple2<IntWritable, Text> apply(Tuple2<Integer, String> pair) {
+ public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
}
}).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
@@ -487,7 +487,7 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
String>() {
@Override
- public String apply(Tuple2<IntWritable, Text> x) {
+ public String call(Tuple2<IntWritable, Text> x) {
return x.toString();
}
}).collect().toString());
@@ -534,7 +534,7 @@ public class JavaAPISuite implements Serializable {
rdd.map(new PairFunction<Tuple2<Integer, String>, IntWritable, Text>() {
@Override
- public Tuple2<IntWritable, Text> apply(Tuple2<Integer, String> pair) {
+ public Tuple2<IntWritable, Text> call(Tuple2<Integer, String> pair) {
return new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2()));
}
}).saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
@@ -544,7 +544,7 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(pairs.toString(), output.map(new Function<Tuple2<IntWritable, Text>,
String>() {
@Override
- public String apply(Tuple2<IntWritable, Text> x) {
+ public String call(Tuple2<IntWritable, Text> x) {
return x.toString();
}
}).collect().toString());
diff --git a/examples/src/main/java/spark/examples/JavaHdfsLR.java b/examples/src/main/java/spark/examples/JavaHdfsLR.java
index c7a6b4405a..71fc13fbce 100644
--- a/examples/src/main/java/spark/examples/JavaHdfsLR.java
+++ b/examples/src/main/java/spark/examples/JavaHdfsLR.java
@@ -26,8 +26,7 @@ public class JavaHdfsLR {
}
static class ParsePoint extends Function<String, DataPoint> {
-
- public DataPoint apply(String line) {
+ public DataPoint call(String line) {
StringTokenizer tok = new StringTokenizer(line, " ");
double y = Double.parseDouble(tok.nextToken());
double[] x = new double[D];
@@ -41,8 +40,7 @@ public class JavaHdfsLR {
}
static class VectorSum extends Function2<double[], double[], double[]> {
-
- public double[] apply(double[] a, double[] b) {
+ public double[] call(double[] a, double[] b) {
double[] result = new double[D];
for (int j = 0; j < D; j++) {
result[j] = a[j] + b[j];
@@ -52,14 +50,13 @@ public class JavaHdfsLR {
}
static class ComputeGradient extends Function<DataPoint, double[]> {
-
double[] weights;
public ComputeGradient(double[] weights) {
this.weights = weights;
}
- public double[] apply(DataPoint p) {
+ public double[] call(DataPoint p) {
double[] gradient = new double[D];
for (int i = 0; i < D; i++) {
double dot = dot(weights, p.x);
diff --git a/examples/src/main/java/spark/examples/JavaTC.java b/examples/src/main/java/spark/examples/JavaTC.java
index e6ca69ff97..25a465ec8e 100644
--- a/examples/src/main/java/spark/examples/JavaTC.java
+++ b/examples/src/main/java/spark/examples/JavaTC.java
@@ -11,6 +11,9 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+/**
+ * Transitive closure on a graph, implemented in Java.
+ */
public class JavaTC {
static int numEdges = 200;
@@ -32,7 +35,7 @@ public class JavaTC {
Integer, Integer> {
static ProjectFn INSTANCE = new ProjectFn();
- public Tuple2<Integer, Integer> apply(Tuple2<Integer, Tuple2<Integer, Integer>> triple) {
+ public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<Integer, Integer>> triple) {
return new Tuple2<Integer, Integer>(triple._2()._2(), triple._2()._1());
}
}
@@ -53,12 +56,11 @@ public class JavaTC {
// the graph to obtain the path (x, z).
// Because join() joins on keys, the edges are stored in reversed order.
- JavaPairRDD<Integer, Integer> edges = tc.map(new PairFunction<Tuple2<Integer, Integer>,
- Integer, Integer>() {
- @Override
- public Tuple2<Integer, Integer> apply(Tuple2<Integer, Integer> e) {
- return new Tuple2<Integer, Integer>(e._2(), e._1());
- }
+ JavaPairRDD<Integer, Integer> edges = tc.map(
+ new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
+ public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> e) {
+ return new Tuple2<Integer, Integer>(e._2(), e._1());
+ }
});
long oldCount = 0;
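
To make the reversed-edge representation concrete, a worked step of the join (values hypothetical):

    // tc contains    (y, z)  -- a known path y -> z, keyed by y
    // edges contains (y, x)  -- the edge x -> y, stored reversed and keyed by y
    // tc.join(edges) yields  (y, (z, x))
    // ProjectFn turns this into (x, z), the newly discovered path x -> z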
diff --git a/examples/src/main/java/spark/examples/JavaWordCount.java b/examples/src/main/java/spark/examples/JavaWordCount.java
index fb2feec09d..a44cf8a120 100644
--- a/examples/src/main/java/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/spark/examples/JavaWordCount.java
@@ -12,9 +12,7 @@ import java.util.Arrays;
import java.util.List;
public class JavaWordCount {
-
public static void main(String[] args) throws Exception {
-
if (args.length < 2) {
System.err.println("Usage: JavaWordCount <master> <file>");
System.exit(1);
@@ -23,16 +21,20 @@ public class JavaWordCount {
JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount");
JavaRDD<String> lines = ctx.textFile(args[1], 1);
- JavaPairRDD<String, Integer> counts = lines.flatMap(new FlatMapFunction<String, String>() {
- public Iterable<String> apply(String s) {
+ JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+ public Iterable<String> call(String s) {
return Arrays.asList(s.split(" "));
}
- }).map(new PairFunction<String, String, Integer>() {
- public Tuple2<String, Integer> apply(String s) {
+ });
+
+ JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
+ public Tuple2<String, Integer> call(String s) {
return new Tuple2(s, 1);
}
- }).reduceByKey(new Function2<Integer, Integer, Integer>() {
- public Integer apply(Integer i1, Integer i2) {
+ });
+
+ JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
+ public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
diff --git a/examples/src/main/scala/spark/examples/SparkTC.scala b/examples/src/main/scala/spark/examples/SparkTC.scala
index a095476a23..a6e4de4671 100644
--- a/examples/src/main/scala/spark/examples/SparkTC.scala
+++ b/examples/src/main/scala/spark/examples/SparkTC.scala
@@ -5,6 +5,9 @@ import SparkContext._
import scala.util.Random
import scala.collection.mutable
+/**
+ * Transitive closure on a graph.
+ */
object SparkTC {
val numEdges = 200