diff options
author | Josh Rosen <rosenville@gmail.com> | 2012-07-18 17:32:31 -0700 |
---|---|---|
committer | Josh Rosen <rosenville@gmail.com> | 2012-07-18 17:34:29 -0700 |
commit | 01dce3f569e0085dae2d0e4bc5c9b2bef5bd3120 (patch) | |
tree | 89882e8acda5c8d3cbfecc9bd2aa892ebc496b10 /examples | |
parent | 628bb5ca7f563ab7f11c373572145df403de6fef (diff) | |
download | spark-01dce3f569e0085dae2d0e4bc5c9b2bef5bd3120.tar.gz spark-01dce3f569e0085dae2d0e4bc5c9b2bef5bd3120.tar.bz2 spark-01dce3f569e0085dae2d0e4bc5c9b2bef5bd3120.zip |
Add Java API
Add distinct() method to RDD.
Fix bug in DoubleRDDFunctions.
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/java/spark/examples/JavaLR.java | 149 | ||||
-rw-r--r-- | examples/src/main/java/spark/examples/JavaTC.java | 89 | ||||
-rw-r--r-- | examples/src/main/java/spark/examples/JavaTest.java | 43 | ||||
-rw-r--r-- | examples/src/main/java/spark/examples/JavaWordCount.java | 68 | ||||
-rw-r--r-- | examples/src/main/scala/spark/examples/SparkTC.scala | 61 |
5 files changed, 410 insertions, 0 deletions
diff --git a/examples/src/main/java/spark/examples/JavaLR.java b/examples/src/main/java/spark/examples/JavaLR.java
new file mode 100644
index 0000000000..cb6abfad5b
--- /dev/null
+++ b/examples/src/main/java/spark/examples/JavaLR.java
@@ -0,0 +1,149 @@
+package spark.examples;
+
+import scala.util.Random;
+import spark.api.java.JavaSparkContext;
+import spark.api.java.function.Function;
+import spark.api.java.function.Function2;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Logistic regression on synthetic data, trained with batch gradient descent.
+ * Each iteration maps every point to its gradient contribution and sums the
+ * contributions with a reduce.
+ */
+public class JavaLR {
+
+  static int N = 10000;  // Number of data points
+  static int D = 10;   // Number of dimensions
+  static double R = 0.7;  // Scaling factor
+  static int ITERATIONS = 5;
+  static Random rand = new Random(42);
+
+  /** A training example: a D-dimensional feature vector x and its label y. */
+  static class DataPoint implements Serializable {
+    public DataPoint(double[] x, int y) {
+      this.x = x;
+      this.y = y;
+    }
+    double[] x;
+    int y;
+  }
+
+  /**
+   * Builds the i-th synthetic point. Even indices get label -1 and odd indices
+   * label +1; every coordinate is Gaussian noise shifted by y * R.
+   */
+  static DataPoint generatePoint(int i) {
+    int y = (i % 2 == 0) ? -1 : 1;
+    double[] x = new double[D];
+    for (int j = 0; j < D; j++) {
+      x[j] = rand.nextGaussian() + y * R;
+    }
+    return new DataPoint(x, y);
+  }
+
+  /** Generates the N points of the training set. */
+  static List<DataPoint> generateData() {
+    List<DataPoint> points = new ArrayList<DataPoint>(N);
+    for (int i = 0; i < N; i++) {
+      points.add(generatePoint(i));
+    }
+    return points;
+  }
+
+  /** Reduce function: element-wise sum of two D-dimensional vectors. */
+  static class VectorSum extends Function2<double[], double[], double[]> {
+
+    @Override
+    public double[] apply(double[] a, double[] b) {
+      double[] result = new double[D];
+      for (int j = 0; j < D; j++) {
+        result[j] = a[j] + b[j];
+      }
+      return result;
+    }
+  }
+
+  /** Map function: the gradient contribution of one point for fixed weights. */
+  static class ComputeGradient extends Function<DataPoint, double[]> {
+
+    double[] weights;
+
+    public ComputeGradient(double[] weights) {
+      this.weights = weights;
+    }
+
+    @Override
+    public double[] apply(DataPoint p) {
+      // The dot product and the resulting scale are the same for every
+      // coordinate, so compute them once per point rather than D times.
+      double dot = dot(weights, p.x);
+      double scale = (1 / (1 + Math.exp(-p.y * dot)) - 1) * p.y;
+      double[] gradient = new double[D];
+      for (int i = 0; i < D; i++) {
+        gradient[i] = scale * p.x[i];
+      }
+      return gradient;
+    }
+  }
+
+  /** Returns the dot product of the first D entries of a and b. */
+  public static double dot(double[] a, double[] b) {
+    double x = 0;
+    for (int i = 0; i < D; i++) {
+      x += a[i] * b[i];
+    }
+    return x;
+  }
+
+  /** Prints a weight vector on one line. */
+  public static void printWeights(double[] a) {
+    System.out.println(Arrays.toString(a));
+  }
+
+  /**
+   * Runs ITERATIONS gradient steps. args[0] is passed to JavaSparkContext as
+   * the host; the optional args[1] is the number of slices (default 2).
+   */
+  public static void main(String[] args) {
+
+    if (args.length == 0) {
+      System.err.println("Usage: JavaLR <host> [<slices>]");
+      System.exit(1);
+    }
+
+    JavaSparkContext sc = new JavaSparkContext(args[0], "JavaLR");
+    int numSlices = (args.length > 1) ? Integer.parseInt(args[1]) : 2;
+    List<DataPoint> data = generateData();
+
+    // Initialize w to a random value
+    double[] w = new double[D];
+    for (int i = 0; i < D; i++) {
+      w[i] = 2 * rand.nextDouble() - 1;
+    }
+
+    System.out.print("Initial w: ");
+    printWeights(w);
+
+    for (int i = 1; i <= ITERATIONS; i++) {
+      System.out.println("On iteration " + i);
+
+      double[] gradient = sc.parallelize(data, numSlices).map(
+        new ComputeGradient(w)
+      ).reduce(new VectorSum());
+
+      for (int j = 0; j < D; j++) {
+        w[j] -= gradient[j];
+      }
+
+    }
+
+    System.out.print("Final w: ");
+    printWeights(w);
+    System.exit(0);
+  }
+}
diff --git a/examples/src/main/java/spark/examples/JavaTC.java b/examples/src/main/java/spark/examples/JavaTC.java
new file mode 100644
index 0000000000..7ee1c3e49c
--- /dev/null
+++ b/examples/src/main/java/spark/examples/JavaTC.java
@@ -0,0 +1,89 @@
+package spark.examples;
+
+import scala.Tuple2;
+import scala.util.Random;
+import spark.api.java.JavaPairRDD;
+import spark.api.java.JavaSparkContext;
+import spark.api.java.function.PairFunction;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Transitive closure on a graph, implemented in Java.
+ */
+public class JavaTC {
+
+  static int numEdges = 200;
+  static int numVertices = 100;
+  static Random rand = new Random(42);
+
+  /**
+   * Generates numEdges distinct random edges between numVertices vertices,
+   * skipping self-loops.
+   */
+  static List<Tuple2<Integer, Integer>> generateGraph() {
+    Set<Tuple2<Integer, Integer>> edges = new HashSet<Tuple2<Integer, Integer>>(numEdges);
+    while (edges.size() < numEdges) {
+      int from = rand.nextInt(numVertices);
+      int to = rand.nextInt(numVertices);
+      Tuple2<Integer, Integer> e = new Tuple2<Integer, Integer>(from, to);
+      if (from != to) edges.add(e);
+    }
+    return new ArrayList<Tuple2<Integer, Integer>>(edges);
+  }
+
+  /** Projects a joined (y, (z, x)) record to the path (x, z). */
+  static class ProjectFn extends PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>,
+      Integer, Integer> {
+    static ProjectFn INSTANCE = new ProjectFn();
+    @Override
+    public Tuple2<Integer, Integer> apply(Tuple2<Integer, Tuple2<Integer, Integer>> triple) {
+      return new Tuple2<Integer, Integer>(triple._2()._2(), triple._2()._1());
+    }
+  }
+
+  /** Computes the transitive closure of a random graph and prints its edge count. */
+  public static void main(String[] args) {
+    if (args.length == 0) {
+      System.err.println("Usage: JavaTC <host> [<slices>]");
+      System.exit(1);
+    }
+
+    JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC");
+    int slices = (args.length > 1) ? Integer.parseInt(args[1]) : 2;
+    JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices);
+
+    // Linear transitive closure: each round grows paths by one edge,
+    // by joining the graph's edges with the already-discovered paths.
+    // e.g. join the path (y, z) from the TC with the edge (x, y) from
+    // the graph to obtain the path (x, z).
+
+    // Because join() joins on keys, the edges are stored in reversed order.
+    JavaPairRDD<Integer, Integer> edges = tc.map(new PairFunction<Tuple2<Integer, Integer>,
+        Integer, Integer>() {
+      @Override
+      public Tuple2<Integer, Integer> apply(Tuple2<Integer, Integer> e) {
+        return new Tuple2<Integer, Integer>(e._2(), e._1());
+      }
+    });
+
+    // This join is iterated until a fixed point is reached. The count taken
+    // at the end of a round is reused as the next round's starting count
+    // rather than calling count() on the same RDD again.
+    long oldCount;
+    long nextCount = tc.count();
+    do {
+      oldCount = nextCount;
+      // Perform the join, obtaining an RDD of (y, (z, x)) pairs,
+      // then project the result to obtain the new (x, z) paths.
+      tc = tc.union(tc.join(edges).map(ProjectFn.INSTANCE)).distinct();
+      nextCount = tc.count();
+    } while (nextCount != oldCount);
+
+    System.out.println("TC has " + nextCount + " edges.");
+    System.exit(0);
+  }
+}
diff --git a/examples/src/main/java/spark/examples/JavaTest.java b/examples/src/main/java/spark/examples/JavaTest.java
new file mode 100644
index 0000000000..d45795a8e3
--- /dev/null
+++ b/examples/src/main/java/spark/examples/JavaTest.java
@@ -0,0 +1,43 @@
+package spark.examples;
+
+import spark.api.java.JavaDoubleRDD;
+import spark.api.java.JavaRDD;
+import spark.api.java.JavaSparkContext;
+import spark.api.java.function.DoubleFunction;
+
+import java.util.List;
+
+/**
+ * Reads numbers.txt with a local context, echoes its lines, then parses each
+ * line as a double and prints the parsed values.
+ */
+public class JavaTest {
+
+  /** Parses a line of text as a double. */
+  public static class MapFunction extends DoubleFunction<String> {
+    @Override
+    public Double apply(String s) {
+      return java.lang.Double.parseDouble(s);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+
+    JavaSparkContext ctx = new JavaSparkContext("local", "JavaTest");
+    JavaRDD<String> lines = ctx.textFile("numbers.txt", 1).cache();
+    List<String> lineArr = lines.collect();
+
+    for (String line : lineArr) {
+      System.out.println(line);
+    }
+
+    JavaDoubleRDD data = lines.map(new MapFunction()).cache();
+
+    System.out.println("output");
+    List<Double> output = data.collect();
+    for (Double num : output) {
+      System.out.println(num);
+    }
+    System.exit(0);
+  }
+}
diff --git a/examples/src/main/java/spark/examples/JavaWordCount.java b/examples/src/main/java/spark/examples/JavaWordCount.java
new file mode 100644
index 0000000000..b7901d2921
--- /dev/null
+++ b/examples/src/main/java/spark/examples/JavaWordCount.java
@@ -0,0 +1,68 @@
+package spark.examples;
+
+import scala.Tuple2;
+import scala.collection.immutable.StringOps;
+import spark.api.java.JavaPairRDD;
+import spark.api.java.JavaRDD;
+import spark.api.java.JavaSparkContext;
+import spark.api.java.function.FlatMapFunction;
+import spark.api.java.function.Function2;
+import spark.api.java.function.PairFunction;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Counts the occurrences of each space-separated word in numbers.txt.
+ */
+public class JavaWordCount {
+
+  /** Splits a line into words on the space character. */
+  public static class SplitFunction extends FlatMapFunction<String, String> {
+    @Override
+    public Iterable<String> apply(String s) {
+      StringOps op = new StringOps(s);
+      return Arrays.asList(op.split(' '));
+    }
+  }
+
+  /** Pairs each word with an initial count of 1. */
+  public static class MapFunction extends PairFunction<String, String, Integer> {
+    @Override
+    public Tuple2<String, Integer> apply(String s) {
+      return new Tuple2<String, Integer>(s, 1);
+    }
+  }
+
+  /** Adds two partial counts. */
+  public static class ReduceFunction extends Function2<Integer, Integer, Integer> {
+    @Override
+    public Integer apply(Integer i1, Integer i2) {
+      return i1 + i2;
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+    JavaSparkContext ctx = new JavaSparkContext("local", "JavaWordCount");
+    JavaRDD<String> lines = ctx.textFile("numbers.txt", 1).cache();
+    List<String> lineArr = lines.collect();
+
+    for (String line : lineArr) {
+      System.out.println(line);
+    }
+
+    JavaRDD<String> words = lines.flatMap(new SplitFunction());
+
+    JavaPairRDD<String, Integer> splits = words.map(new MapFunction());
+
+    JavaPairRDD<String, Integer> counts = splits.reduceByKey(new ReduceFunction());
+
+    System.out.println("output");
+    List<Tuple2<String, Integer>> output = counts.collect();
+    for (Tuple2<String, Integer> tuple : output) {
+      System.out.print(tuple._1() + ": ");
+      System.out.println(tuple._2());
+    }
+    System.exit(0);
+  }
+}
diff --git a/examples/src/main/scala/spark/examples/SparkTC.scala b/examples/src/main/scala/spark/examples/SparkTC.scala
new file mode 100644
index 0000000000..fa945b5082
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SparkTC.scala
@@ -0,0 +1,61 @@
+package spark.examples
+
+import spark._
+import SparkContext._
+import scala.util.Random
+import scala.collection.mutable
+
+/**
+ * Transitive closure on a graph.
+ */
+object SparkTC {
+
+  val numEdges = 200
+  val numVertices = 100
+  val rand = new Random(42)
+
+  /** Generates numEdges distinct random edges, skipping self-loops. */
+  def generateGraph = {
+    val edges: mutable.Set[(Int, Int)] = mutable.Set.empty
+    while (edges.size < numEdges) {
+      val from = rand.nextInt(numVertices)
+      val to = rand.nextInt(numVertices)
+      if (from != to) edges.+=((from, to))
+    }
+    edges.toSeq
+  }
+
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: SparkTC <host> [<slices>]")
+      System.exit(1)
+    }
+    val spark = new SparkContext(args(0), "SparkTC")
+    val slices = if (args.length > 1) args(1).toInt else 2
+    var tc = spark.parallelize(generateGraph, slices)
+
+    // Linear transitive closure: each round grows paths by one edge,
+    // by joining the graph's edges with the already-discovered paths.
+    // e.g. join the path (y, z) from the TC with the edge (x, y) from
+    // the graph to obtain the path (x, z).
+
+    // Because join() joins on keys, the edges are stored in reversed order.
+    val edges = tc.map(x => (x._2, x._1))
+
+    // This join is iterated until a fixed point is reached. The count taken
+    // at the end of a round is reused as the next round's starting count
+    // rather than calling count() on the same RDD again.
+    var oldCount = 0L
+    var nextCount = tc.count()
+    do {
+      oldCount = nextCount
+      // Perform the join, obtaining an RDD of (y, (z, x)) pairs,
+      // then project the result to obtain the new (x, z) paths.
+      tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct()
+      nextCount = tc.count()
+    } while (nextCount != oldCount)
+
+    println("TC has " + nextCount + " edges.")
+    System.exit(0)
+  }
+}