author      Josh Rosen <rosenville@gmail.com>  2012-07-18 17:32:31 -0700
committer   Josh Rosen <rosenville@gmail.com>  2012-07-18 17:34:29 -0700
commit      01dce3f569e0085dae2d0e4bc5c9b2bef5bd3120 (patch)
tree        89882e8acda5c8d3cbfecc9bd2aa892ebc496b10 /examples
parent      628bb5ca7f563ab7f11c373572145df403de6fef (diff)
download    spark-01dce3f569e0085dae2d0e4bc5c9b2bef5bd3120.tar.gz
            spark-01dce3f569e0085dae2d0e4bc5c9b2bef5bd3120.tar.bz2
            spark-01dce3f569e0085dae2d0e4bc5c9b2bef5bd3120.zip
Add Java API
Add distinct() method to RDD. Fix bug in DoubleRDDFunctions.
Diffstat (limited to 'examples')
-rw-r--r--  examples/src/main/java/spark/examples/JavaLR.java         127
-rw-r--r--  examples/src/main/java/spark/examples/JavaTC.java          76
-rw-r--r--  examples/src/main/java/spark/examples/JavaTest.java        38
-rw-r--r--  examples/src/main/java/spark/examples/JavaWordCount.java   61
-rw-r--r--  examples/src/main/scala/spark/examples/SparkTC.scala       53
5 files changed, 355 insertions, 0 deletions
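The new Java API mirrors the Scala RDD interface through JavaSparkContext, JavaRDD, JavaPairRDD and JavaDoubleRDD, with function classes (Function, Function2, PairFunction, FlatMapFunction, DoubleFunction) standing in for Scala closures. As a rough, hypothetical sketch of the distinct() method mentioned in the commit message, assuming JavaRDD exposes distinct() and count() the same way JavaPairRDD does in JavaTC below (this class is not part of the commit):

package spark.examples;

import spark.api.java.JavaRDD;
import spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

// Hypothetical sketch only, not part of this commit. It assumes that
// JavaSparkContext.parallelize(List, numSlices) and JavaRDD.distinct()/count()
// behave like their Scala counterparts, as the examples in this diff suggest.
public class JavaDistinctSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "JavaDistinctSketch");
    List<Integer> data = Arrays.asList(1, 2, 2, 3, 3, 3);
    JavaRDD<Integer> nums = sc.parallelize(data, 2);
    // distinct() keeps one copy of each element, so this prints 3.
    System.out.println("Distinct count: " + nums.distinct().count());
    System.exit(0);
  }
}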
diff --git a/examples/src/main/java/spark/examples/JavaLR.java b/examples/src/main/java/spark/examples/JavaLR.java
new file mode 100644
index 0000000000..cb6abfad5b
--- /dev/null
+++ b/examples/src/main/java/spark/examples/JavaLR.java
@@ -0,0 +1,127 @@
+package spark.examples;
+
+import scala.util.Random;
+import spark.api.java.JavaSparkContext;
+import spark.api.java.function.Function;
+import spark.api.java.function.Function2;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class JavaLR {
+
+ static int N = 10000; // Number of data points
+ static int D = 10; // Number of dimensions
+ static double R = 0.7; // Scaling factor
+ static int ITERATIONS = 5;
+ static Random rand = new Random(42);
+
+ static class DataPoint implements Serializable {
+ public DataPoint(double[] x, int y) {
+ this.x = x;
+ this.y = y;
+ }
+ double[] x;
+ int y;
+ }
+
+ static DataPoint generatePoint(int i) {
+ int y = (i % 2 == 0) ? -1 : 1;
+ double[] x = new double[D];
+ for (int j = 0; j < D; j++) {
+ x[j] = rand.nextGaussian() + y * R;
+ }
+ return new DataPoint(x, y);
+ }
+
+ static List<DataPoint> generateData() {
+ List<DataPoint> points = new ArrayList<DataPoint>(N);
+ for (int i = 0; i < N; i++) {
+ points.add(generatePoint(i));
+ }
+ return points;
+ }
+
+ static class VectorSum extends Function2<double[], double[], double[]> {
+
+ @Override
+ public double[] apply(double[] a, double[] b) {
+ double[] result = new double[D];
+ for (int j = 0; j < D; j++) {
+ result[j] = a[j] + b[j];
+ }
+ return result;
+ }
+ }
+
+ static class ComputeGradient extends Function<DataPoint, double[]> {
+
+ double[] weights;
+
+ public ComputeGradient(double[] weights) {
+ this.weights = weights;
+ }
+
+ @Override
+ public double[] apply(DataPoint p) {
+ double[] gradient = new double[D];
+ for (int i = 0; i < D; i++) {
+ double dot = dot(weights, p.x);
+ gradient[i] = (1 / (1 + Math.exp(-p.y * dot)) - 1) * p.y * p.x[i];
+ }
+ return gradient;
+ }
+ }
+
+ public static double dot(double[] a, double[] b) {
+ double x = 0;
+ for (int i = 0; i < D; i++) {
+ x += a[i] * b[i];
+ }
+ return x;
+ }
+
+ public static void printWeights(double[] a) {
+ System.out.println(Arrays.toString(a));
+ }
+
+ public static void main(String[] args) {
+
+ if (args.length == 0) {
+ System.err.println("Usage: JavaLR <host> [<slices>]");
+ System.exit(1);
+ }
+
+ JavaSparkContext sc = new JavaSparkContext(args[0], "JavaLR");
+ Integer numSlices = (args.length > 1) ? Integer.parseInt(args[1]): 2;
+ List<DataPoint> data = generateData();
+
+ // Initialize w to a random value
+ double[] w = new double[D];
+ for (int i = 0; i < D; i++) {
+ w[i] = 2 * rand.nextDouble() - 1;
+ }
+
+ System.out.print("Initial w: ");
+ printWeights(w);
+
+ for (int i = 1; i <= ITERATIONS; i++) {
+ System.out.println("On iteration " + i);
+
+ double[] gradient = sc.parallelize(data, numSlices).map(
+ new ComputeGradient(w)
+ ).reduce(new VectorSum());
+
+ for (int j = 0; j < D; j++) {
+ w[j] -= gradient[j];
+ }
+
+ }
+
+ System.out.print("Final w: ");
+ printWeights(w);
+ System.exit(0);
+ }
+}
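A note on ComputeGradient above: each element's contribution is the logistic-loss gradient (1 / (1 + exp(-y * w·x)) - 1) * y * x. Since w·x does not depend on the loop index, the dot product can be hoisted out of the loop; a behaviorally equivalent sketch of the same method, assuming the DataPoint, D, weights, and dot() definitions from JavaLR:

  // Sketch of an equivalent ComputeGradient.apply(): same math as above, with
  // the dot product w·x computed once per point rather than once per dimension.
  @Override
  public double[] apply(DataPoint p) {
    double[] gradient = new double[D];
    double dotProduct = dot(weights, p.x);                          // w·x
    double scale = (1 / (1 + Math.exp(-p.y * dotProduct)) - 1) * p.y;
    for (int j = 0; j < D; j++) {
      gradient[j] = scale * p.x[j];
    }
    return gradient;
  }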
diff --git a/examples/src/main/java/spark/examples/JavaTC.java b/examples/src/main/java/spark/examples/JavaTC.java
new file mode 100644
index 0000000000..7ee1c3e49c
--- /dev/null
+++ b/examples/src/main/java/spark/examples/JavaTC.java
@@ -0,0 +1,76 @@
+package spark.examples;
+
+import scala.Tuple2;
+import scala.util.Random;
+import spark.api.java.JavaPairRDD;
+import spark.api.java.JavaSparkContext;
+import spark.api.java.function.PairFunction;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class JavaTC {
+
+ static int numEdges = 200;
+ static int numVertices = 100;
+ static Random rand = new Random(42);
+
+ static List<Tuple2<Integer, Integer>> generateGraph() {
+ Set<Tuple2<Integer, Integer>> edges = new HashSet<Tuple2<Integer, Integer>>(numEdges);
+ while (edges.size() < numEdges) {
+ int from = rand.nextInt(numVertices);
+ int to = rand.nextInt(numVertices);
+ Tuple2<Integer, Integer> e = new Tuple2<Integer, Integer>(from, to);
+ if (from != to) edges.add(e);
+ }
+ return new ArrayList<Tuple2<Integer, Integer>>(edges);
+ }
+
+ static class ProjectFn extends PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>,
+ Integer, Integer> {
+ static ProjectFn INSTANCE = new ProjectFn();
+ @Override
+ public Tuple2<Integer, Integer> apply(Tuple2<Integer, Tuple2<Integer, Integer>> triple) {
+ return new Tuple2<Integer, Integer>(triple._2()._2(), triple._2()._1());
+ }
+ }
+
+ public static void main(String[] args) {
+ if (args.length == 0) {
+ System.err.println("Usage: JavaTC <host> [<slices>]");
+ System.exit(1);
+ }
+
+ JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC");
+ Integer slices = (args.length > 1) ? Integer.parseInt(args[1]): 2;
+ JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices);
+
+ // Linear transitive closure: each round grows paths by one edge,
+ // by joining the graph's edges with the already-discovered paths.
+ // e.g. join the path (y, z) from the TC with the edge (x, y) from
+ // the graph to obtain the path (x, z).
+
+ // Because join() joins on keys, the edges are stored in reversed order.
+ JavaPairRDD<Integer, Integer> edges = tc.map(new PairFunction<Tuple2<Integer, Integer>,
+ Integer, Integer>() {
+ @Override
+ public Tuple2<Integer, Integer> apply(Tuple2<Integer, Integer> e) {
+ return new Tuple2<Integer, Integer>(e._2(), e._1());
+ }
+ });
+
+ long oldCount = 0;
+ do {
+ oldCount = tc.count();
+ // Perform the join, obtaining an RDD of (y, (z, x)) pairs,
+ // then project the result to obtain the new (x, z) paths.
+
+ tc = tc.union(tc.join(edges).map(ProjectFn.INSTANCE)).distinct();
+ } while (tc.count() != oldCount);
+
+ System.out.println("TC has " + tc.count() + " edges.");
+ System.exit(0);
+ }
+}
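To make the (y, (z, x)) shape in the comments concrete: if tc holds the path (2, 5) and the graph has the edge (1, 2), then edges holds the reversed pair (2, 1); joining on the key 2 produces (2, (5, 1)), and ProjectFn turns that into the new path (1, 5). A standalone sketch of that single projection step (hypothetical values, calling ProjectFn directly outside of Spark):

  // Sketch: one application of ProjectFn to a joined (y, (z, x)) triple.
  Tuple2<Integer, Tuple2<Integer, Integer>> joined =
      new Tuple2<Integer, Tuple2<Integer, Integer>>(2, new Tuple2<Integer, Integer>(5, 1));
  Tuple2<Integer, Integer> newPath = ProjectFn.INSTANCE.apply(joined);  // (1, 5)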
diff --git a/examples/src/main/java/spark/examples/JavaTest.java b/examples/src/main/java/spark/examples/JavaTest.java
new file mode 100644
index 0000000000..d45795a8e3
--- /dev/null
+++ b/examples/src/main/java/spark/examples/JavaTest.java
@@ -0,0 +1,38 @@
+package spark.examples;
+
+import spark.api.java.JavaDoubleRDD;
+import spark.api.java.JavaRDD;
+import spark.api.java.JavaSparkContext;
+import spark.api.java.function.DoubleFunction;
+
+import java.util.List;
+
+public class JavaTest {
+
+ public static class MapFunction extends DoubleFunction<String> {
+ @Override
+ public Double apply(String s) {
+ return java.lang.Double.parseDouble(s);
+ }
+ }
+
+ public static void main(String[] args) throws Exception {
+
+ JavaSparkContext ctx = new JavaSparkContext("local", "JavaTest");
+ JavaRDD<String> lines = ctx.textFile("numbers.txt", 1).cache();
+ List<String> lineArr = lines.collect();
+
+ for (String line : lineArr) {
+ System.out.println(line);
+ }
+
+ JavaDoubleRDD data = lines.map(new MapFunction()).cache();
+
+ System.out.println("output");
+ List<Double> output = data.collect();
+ for (Double num : output) {
+ System.out.println(num);
+ }
+ System.exit(0);
+ }
+}
diff --git a/examples/src/main/java/spark/examples/JavaWordCount.java b/examples/src/main/java/spark/examples/JavaWordCount.java
new file mode 100644
index 0000000000..b7901d2921
--- /dev/null
+++ b/examples/src/main/java/spark/examples/JavaWordCount.java
@@ -0,0 +1,61 @@
+package spark.examples;
+
+import scala.Tuple2;
+import scala.collection.immutable.StringOps;
+import spark.api.java.JavaPairRDD;
+import spark.api.java.JavaRDD;
+import spark.api.java.JavaSparkContext;
+import spark.api.java.function.FlatMapFunction;
+import spark.api.java.function.Function2;
+import spark.api.java.function.PairFunction;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class JavaWordCount {
+
+ public static class SplitFunction extends FlatMapFunction<String, String> {
+ @Override
+ public Iterable<String> apply(String s) {
+ StringOps op = new StringOps(s);
+ return Arrays.asList(op.split(' '));
+ }
+ }
+
+ public static class MapFunction extends PairFunction<String, String, Integer> {
+ @Override
+ public Tuple2<String, Integer> apply(String s) {
+ return new Tuple2<String, Integer>(s, 1);
+ }
+ }
+
+ public static class ReduceFunction extends Function2<Integer, Integer, Integer> {
+ @Override
+ public Integer apply(Integer i1, Integer i2) {
+ return i1 + i2;
+ }
+ }
+ public static void main(String[] args) throws Exception {
+ JavaSparkContext ctx = new JavaSparkContext("local", "JavaWordCount");
+ JavaRDD<String> lines = ctx.textFile("numbers.txt", 1).cache();
+ List<String> lineArr = lines.collect();
+
+ for (String line : lineArr) {
+ System.out.println(line);
+ }
+
+ JavaRDD<String> words = lines.flatMap(new SplitFunction());
+
+ JavaPairRDD<String, Integer> splits = words.map(new MapFunction());
+
+ JavaPairRDD<String, Integer> counts = splits.reduceByKey(new ReduceFunction());
+
+ System.out.println("output");
+ List<Tuple2<String, Integer>> output = counts.collect();
+ for (Tuple2<String, Integer> tuple : output) {
+ System.out.print(tuple._1 + ": ");
+ System.out.println(tuple._2);
+ }
+ System.exit(0);
+ }
+}
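reduceByKey above applies ReduceFunction pairwise to all counts that share the same word, so the ("word", 1) pairs emitted by MapFunction collapse into per-word totals. A trivial sketch of that fold for a single key, calling ReduceFunction directly:

  // Sketch: three occurrences of one word, ("w", 1) + ("w", 1) + ("w", 1),
  // reduce to a count of 3.
  ReduceFunction sum = new ReduceFunction();
  Integer total = sum.apply(sum.apply(1, 1), 1);  // 3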
diff --git a/examples/src/main/scala/spark/examples/SparkTC.scala b/examples/src/main/scala/spark/examples/SparkTC.scala
new file mode 100644
index 0000000000..fa945b5082
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SparkTC.scala
@@ -0,0 +1,53 @@
+package spark.examples
+
+import spark._
+import SparkContext._
+import scala.util.Random
+import scala.collection.mutable
+
+object SparkTC {
+
+ val numEdges = 200
+ val numVertices = 100
+ val rand = new Random(42)
+
+ def generateGraph = {
+ val edges: mutable.Set[(Int, Int)] = mutable.Set.empty
+ while (edges.size < numEdges) {
+ val from = rand.nextInt(numVertices)
+ val to = rand.nextInt(numVertices)
+ if (from != to) edges += ((from, to))
+ }
+ edges.toSeq
+ }
+
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: SparkTC <host> [<slices>]")
+ System.exit(1)
+ }
+ val spark = new SparkContext(args(0), "SparkTC")
+ val slices = if (args.length > 1) args(1).toInt else 2
+ var tc = spark.parallelize(generateGraph, slices)
+
+ // Linear transitive closure: each round grows paths by one edge,
+ // by joining the graph's edges with the already-discovered paths.
+ // e.g. join the path (y, z) from the TC with the edge (x, y) from
+ // the graph to obtain the path (x, z).
+
+ // Because join() joins on keys, the edges are stored in reversed order.
+ val edges = tc.map(x => (x._2, x._1))
+
+ // This join is iterated until a fixed point is reached.
+ var oldCount = 0L
+ do {
+ oldCount = tc.count()
+ // Perform the join, obtaining an RDD of (y, (z, x)) pairs,
+ // then project the result to obtain the new (x, z) paths.
+ tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct()
+ } while (tc.count() != oldCount)
+
+ println("TC has " + tc.count() + " edges.")
+ System.exit(0)
+ }
+}