From 01dce3f569e0085dae2d0e4bc5c9b2bef5bd3120 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Wed, 18 Jul 2012 17:32:31 -0700
Subject: Add Java API

Add distinct() method to RDD.

Fix bug in DoubleRDDFunctions.
---
 examples/src/main/java/spark/examples/JavaLR.java | 127 +++++++++++++++++++++
 examples/src/main/java/spark/examples/JavaTC.java |  76 ++++++++++++
 .../src/main/java/spark/examples/JavaTest.java    |  38 ++++++
 .../main/java/spark/examples/JavaWordCount.java   |  61 ++++++++++
 .../src/main/scala/spark/examples/SparkTC.scala   |  53 +++++++++
 5 files changed, 355 insertions(+)
 create mode 100644 examples/src/main/java/spark/examples/JavaLR.java
 create mode 100644 examples/src/main/java/spark/examples/JavaTC.java
 create mode 100644 examples/src/main/java/spark/examples/JavaTest.java
 create mode 100644 examples/src/main/java/spark/examples/JavaWordCount.java
 create mode 100644 examples/src/main/scala/spark/examples/SparkTC.scala

(limited to 'examples')

diff --git a/examples/src/main/java/spark/examples/JavaLR.java b/examples/src/main/java/spark/examples/JavaLR.java
new file mode 100644
index 0000000000..cb6abfad5b
--- /dev/null
+++ b/examples/src/main/java/spark/examples/JavaLR.java
@@ -0,0 +1,127 @@
+package spark.examples;
+
+import scala.util.Random;
+import spark.api.java.JavaSparkContext;
+import spark.api.java.function.Function;
+import spark.api.java.function.Function2;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class JavaLR {
+
+  static int N = 10000;   // Number of data points
+  static int D = 10;      // Number of dimensions
+  static double R = 0.7;  // Scaling factor
+  static int ITERATIONS = 5;
+  static Random rand = new Random(42);
+
+  static class DataPoint implements Serializable {
+    public DataPoint(double[] x, int y) {
+      this.x = x;
+      this.y = y;
+    }
+    double[] x;
+    int y;
+  }
+
+  static DataPoint generatePoint(int i) {
+    int y = (i % 2 == 0) ? -1 : 1;
+    double[] x = new double[D];
+    for (int j = 0; j < D; j++) {
+      x[j] = rand.nextGaussian() + y * R;
+    }
+    return new DataPoint(x, y);
+  }
+
+  static List<DataPoint> generateData() {
+    List<DataPoint> points = new ArrayList<DataPoint>(N);
+    for (int i = 0; i < N; i++) {
+      points.add(generatePoint(i));
+    }
+    return points;
+  }
+
+  static class VectorSum extends Function2<double[], double[], double[]> {
+
+    @Override
+    public double[] apply(double[] a, double[] b) {
+      double[] result = new double[D];
+      for (int j = 0; j < D; j++) {
+        result[j] = a[j] + b[j];
+      }
+      return result;
+    }
+  }
+
+  static class ComputeGradient extends Function<DataPoint, double[]> {
+
+    double[] weights;
+
+    public ComputeGradient(double[] weights) {
+      this.weights = weights;
+    }
+
+    @Override
+    public double[] apply(DataPoint p) {
+      double[] gradient = new double[D];
+      for (int i = 0; i < D; i++) {
+        double dot = dot(weights, p.x);
+        gradient[i] = (1 / (1 + Math.exp(-p.y * dot)) - 1) * p.y * p.x[i];
+      }
+      return gradient;
+    }
+  }
+
+  public static double dot(double[] a, double[] b) {
+    double x = 0;
+    for (int i = 0; i < D; i++) {
+      x += a[i] * b[i];
+    }
+    return x;
+  }
+
+  public static void printWeights(double[] a) {
+    System.out.println(Arrays.toString(a));
+  }
+
+  public static void main(String[] args) {
+
+    if (args.length == 0) {
+      System.err.println("Usage: JavaLR <host> [<slices>]");
+      System.exit(1);
+    }
+
+    JavaSparkContext sc = new JavaSparkContext(args[0], "JavaLR");
+    Integer numSlices = (args.length > 1) ? Integer.parseInt(args[1]) : 2;
+    List<DataPoint> data = generateData();
+
+    // Initialize w to a random value
+    double[] w = new double[D];
+    for (int i = 0; i < D; i++) {
+      w[i] = 2 * rand.nextDouble() - 1;
+    }
+
+    System.out.print("Initial w: ");
+    printWeights(w);
+
+    for (int i = 1; i <= ITERATIONS; i++) {
+      System.out.println("On iteration " + i);
+
+      double[] gradient = sc.parallelize(data, numSlices).map(
+        new ComputeGradient(w)
+      ).reduce(new VectorSum());
+
+      for (int j = 0; j < D; j++) {
+        w[j] -= gradient[j];
+      }
+
+    }
+
+    System.out.print("Final w: ");
+    printWeights(w);
+    System.exit(0);
+  }
+}
diff --git a/examples/src/main/java/spark/examples/JavaTC.java b/examples/src/main/java/spark/examples/JavaTC.java
new file mode 100644
index 0000000000..7ee1c3e49c
--- /dev/null
+++ b/examples/src/main/java/spark/examples/JavaTC.java
@@ -0,0 +1,76 @@
+package spark.examples;
+
+import scala.Tuple2;
+import scala.util.Random;
+import spark.api.java.JavaPairRDD;
+import spark.api.java.JavaSparkContext;
+import spark.api.java.function.PairFunction;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class JavaTC {
+
+  static int numEdges = 200;
+  static int numVertices = 100;
+  static Random rand = new Random(42);
+
+  static List<Tuple2<Integer, Integer>> generateGraph() {
+    Set<Tuple2<Integer, Integer>> edges = new HashSet<Tuple2<Integer, Integer>>(numEdges);
+    while (edges.size() < numEdges) {
+      int from = rand.nextInt(numVertices);
+      int to = rand.nextInt(numVertices);
+      Tuple2<Integer, Integer> e = new Tuple2<Integer, Integer>(from, to);
+      if (from != to) edges.add(e);
+    }
+    return new ArrayList<Tuple2<Integer, Integer>>(edges);
+  }
+
+  static class ProjectFn extends PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>,
+      Integer, Integer> {
+    static ProjectFn INSTANCE = new ProjectFn();
+    @Override
+    public Tuple2<Integer, Integer> apply(Tuple2<Integer, Tuple2<Integer, Integer>> triple) {
+      return new Tuple2<Integer, Integer>(triple._2()._2(), triple._2()._1());
+    }
+  }
+
+  public static void main(String[] args) {
+    if (args.length == 0) {
+      System.err.println("Usage: JavaTC <host> [<slices>]");
+      System.exit(1);
+    }
+
+    JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC");
+    Integer slices = (args.length > 1) ? Integer.parseInt(args[1]) : 2;
+    JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices);
+
+    // Linear transitive closure: each round grows paths by one edge,
+    // by joining the graph's edges with the already-discovered paths.
+    // e.g. join the path (y, z) from the TC with the edge (x, y) from
+    // the graph to obtain the path (x, z).
+
+    // Because join() joins on keys, the edges are stored in reversed order.
+    JavaPairRDD<Integer, Integer> edges = tc.map(new PairFunction<Tuple2<Integer, Integer>,
+        Integer, Integer>() {
+      @Override
+      public Tuple2<Integer, Integer> apply(Tuple2<Integer, Integer> e) {
+        return new Tuple2<Integer, Integer>(e._2(), e._1());
+      }
+    });
+
+    long oldCount = 0;
+    do {
+      oldCount = tc.count();
+      // Perform the join, obtaining an RDD of (y, (z, x)) pairs,
+      // then project the result to obtain the new (x, z) paths.
+
+      tc = tc.union(tc.join(edges).map(ProjectFn.INSTANCE)).distinct();
+    } while (tc.count() != oldCount);
+
+    System.out.println("TC has " + tc.count() + " edges.");
+    System.exit(0);
+  }
+}
diff --git a/examples/src/main/java/spark/examples/JavaTest.java b/examples/src/main/java/spark/examples/JavaTest.java
new file mode 100644
index 0000000000..d45795a8e3
--- /dev/null
+++ b/examples/src/main/java/spark/examples/JavaTest.java
@@ -0,0 +1,38 @@
+package spark.examples;
+
+import spark.api.java.JavaDoubleRDD;
+import spark.api.java.JavaRDD;
+import spark.api.java.JavaSparkContext;
+import spark.api.java.function.DoubleFunction;
+
+import java.util.List;
+
+public class JavaTest {
+
+  public static class MapFunction extends DoubleFunction<String> {
+    @Override
+    public Double apply(String s) {
+      return java.lang.Double.parseDouble(s);
+    }
+  }
+
+  public static void main(String[] args) throws Exception {
+
+    JavaSparkContext ctx = new JavaSparkContext("local", "JavaTest");
+    JavaRDD<String> lines = ctx.textFile("numbers.txt", 1).cache();
+    List<String> lineArr = lines.collect();
+
+    for (String line : lineArr) {
+      System.out.println(line);
+    }
+
+    JavaDoubleRDD data = lines.map(new MapFunction()).cache();
+
+    System.out.println("output");
+    List<Double> output = data.collect();
+    for (Double num : output) {
+      System.out.println(num);
+    }
+    System.exit(0);
+  }
+}
diff --git a/examples/src/main/java/spark/examples/JavaWordCount.java b/examples/src/main/java/spark/examples/JavaWordCount.java
new file mode 100644
index 0000000000..b7901d2921
--- /dev/null
+++ b/examples/src/main/java/spark/examples/JavaWordCount.java
@@ -0,0 +1,61 @@
+package spark.examples;
+
+import scala.Tuple2;
+import scala.collection.immutable.StringOps;
+import spark.api.java.JavaPairRDD;
+import spark.api.java.JavaRDD;
+import spark.api.java.JavaSparkContext;
+import spark.api.java.function.FlatMapFunction;
+import spark.api.java.function.Function2;
+import spark.api.java.function.PairFunction;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class JavaWordCount {
+
+  public static class SplitFunction extends FlatMapFunction<String, String> {
+    @Override
+    public Iterable<String> apply(String s) {
+      StringOps op = new StringOps(s);
+      return Arrays.asList(op.split(' '));
+    }
+  }
+
+  public static class MapFunction extends PairFunction<String, String, Integer> {
+    @Override
+    public Tuple2<String, Integer> apply(String s) {
+      return new Tuple2<String, Integer>(s, 1);
+    }
+  }
+
+  public static class ReduceFunction extends Function2<Integer, Integer, Integer> {
+    @Override
+    public Integer apply(Integer i1, Integer i2) {
+      return i1 + i2;
+    }
+  }
+  public static void main(String[] args) throws Exception {
+    JavaSparkContext ctx = new JavaSparkContext("local", "JavaWordCount");
+    JavaRDD<String> lines = ctx.textFile("numbers.txt", 1).cache();
+    List<String> lineArr = lines.collect();
+
+    for (String line : lineArr) {
+      System.out.println(line);
+    }
+
+    JavaRDD<String> words = lines.flatMap(new SplitFunction());
+
+    JavaPairRDD<String, Integer> splits = words.map(new MapFunction());
+
+    JavaPairRDD<String, Integer> counts = splits.reduceByKey(new ReduceFunction());
+
+    System.out.println("output");
+    List<Tuple2<String, Integer>> output = counts.collect();
+    for (Tuple2<String, Integer> tuple : output) {
+      System.out.print(tuple._1() + ": ");
+      System.out.println(tuple._2());
+    }
+    System.exit(0);
+  }
+}
diff --git a/examples/src/main/scala/spark/examples/SparkTC.scala b/examples/src/main/scala/spark/examples/SparkTC.scala
new file mode 100644
index 0000000000..fa945b5082
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SparkTC.scala
@@ -0,0 +1,53 @@
+package spark.examples
+
+import spark._
+import SparkContext._
+import scala.util.Random
+import scala.collection.mutable
+
+object SparkTC {
+
+  val numEdges = 200
+  val numVertices = 100
+  val rand = new Random(42)
+
+  def generateGraph = {
+    val edges: mutable.Set[(Int, Int)] = mutable.Set.empty
+    while (edges.size < numEdges) {
+      val from = rand.nextInt(numVertices)
+      val to = rand.nextInt(numVertices)
+      if (from != to) edges.+=((from, to))
+    }
+    edges.toSeq
+  }
+
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: SparkTC <host> [<slices>]")
+      System.exit(1)
+    }
+    val spark = new SparkContext(args(0), "SparkTC")
+    val slices = if (args.length > 1) args(1).toInt else 2
+    var tc = spark.parallelize(generateGraph, slices)
+
+    // Linear transitive closure: each round grows paths by one edge,
+    // by joining the graph's edges with the already-discovered paths.
+    // e.g. join the path (y, z) from the TC with the edge (x, y) from
+    // the graph to obtain the path (x, z).
+
+    // Because join() joins on keys, the edges are stored in reversed order.
+    val edges = tc.map(x => (x._2, x._1))
+
+    // This join is iterated until a fixed point is reached.
+    var oldCount = 0L
+    do {
+      oldCount = tc.count()
+      // Perform the join, obtaining an RDD of (y, (z, x)) pairs,
+      // then project the result to obtain the new (x, z) paths.
+      tc = tc.union(tc.join(edges).map(x => (x._2._2, x._2._1))).distinct()
+    } while (tc.count() != oldCount)
+
+    println("TC has " + tc.count() + " edges.")
+    System.exit(0)
+  }
+}
--
cgit v1.2.3
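Usage note: the following is not part of the patch. It is a minimal, hypothetical driver sketching how the Java API introduced by this commit might be exercised, using only calls that appear in the examples above (JavaSparkContext, parallelizePairs, distinct, count). The class name JavaDistinctSketch, the "local" master URL, and the in-line data are illustrative assumptions.

package spark.examples;

import scala.Tuple2;
import spark.api.java.JavaPairRDD;
import spark.api.java.JavaSparkContext;

import java.util.Arrays;
import java.util.List;

// Hypothetical driver, not part of the commit: exercises the newly added distinct().
public class JavaDistinctSketch {
  public static void main(String[] args) {
    // Assumed local master and hard-coded input, for illustration only.
    JavaSparkContext sc = new JavaSparkContext("local", "JavaDistinctSketch");

    // Two copies of (1, 2) plus one (3, 4); distinct() should keep one of each.
    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
        new Tuple2<Integer, Integer>(1, 2),
        new Tuple2<Integer, Integer>(1, 2),
        new Tuple2<Integer, Integer>(3, 4));

    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs, 2);
    System.out.println("Distinct pairs: " + rdd.distinct().count());  // expected: 2
    System.exit(0);
  }
}

Run like the other Java examples above; with the duplicate (1, 2) collapsed by distinct(), the printed count should be 2, the same pattern JavaTC relies on to reach a fixed point.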