From 3b9d9de583bf2ee0c7b46c75944aedfcfa784a02 Mon Sep 17 00:00:00 2001
From: Edison Tung
Date: Mon, 21 Nov 2011 16:37:58 -0800
Subject: Added KMeans examples

LocalKMeans runs locally with a randomly generated dataset.
SparkLocalKMeans takes an input file and runs KMeans on it.
---
 .../main/scala/spark/examples/LocalKMeans.scala    | 90 ++++++++++++++++++++++
 .../scala/spark/examples/SparkLocalKMeans.scala    | 79 +++++++++++++++++++++
 2 files changed, 169 insertions(+)
 create mode 100644 examples/src/main/scala/spark/examples/LocalKMeans.scala
 create mode 100644 examples/src/main/scala/spark/examples/SparkLocalKMeans.scala

diff --git a/examples/src/main/scala/spark/examples/LocalKMeans.scala b/examples/src/main/scala/spark/examples/LocalKMeans.scala
new file mode 100644
index 0000000000..7e8e7a6959
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/LocalKMeans.scala
@@ -0,0 +1,90 @@
+package spark.examples
+
+import java.util.Random
+import Vector._
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+
+/**
+ * K-means clustering on a randomly generated local dataset.
+ */
+object LocalKMeans {
+  val N = 1000          // Number of points
+  val R = 1000          // Scaling factor
+  val D = 10            // Number of dimensions
+  val K = 10            // Number of clusters
+  val convergeDist = 0.001
+  val rand = new Random(42)
+
+  def generateData = {
+    def generatePoint(i: Int) = {
+      Vector(D, _ => rand.nextDouble * R)
+    }
+    Array.tabulate(N)(generatePoint)
+  }
+
+  // Returns the index of the center closest to p.
+  def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
+    var bestIndex = 0
+    var closest = Double.PositiveInfinity
+
+    for (i <- 1 to centers.size) {
+      val tempDist = p.squaredDist(centers(i))
+      if (tempDist < closest) {
+        closest = tempDist
+        bestIndex = i
+      }
+    }
+
+    bestIndex
+  }
+
+  def main(args: Array[String]) {
+    val data = generateData
+    val points = new HashSet[Vector]
+    val kPoints = new HashMap[Int, Vector]
+    var tempDist = 1.0
+
+    // Sample K distinct points as the initial centers.
+    while (points.size < K) {
+      points.add(data(rand.nextInt(N)))
+    }
+
+    val iter = points.iterator
+    for (i <- 1 to points.size) {
+      kPoints.put(i, iter.next())
+    }
+
+    println("Initial centers: " + kPoints)
+
+    while (tempDist > convergeDist) {
+      // Pair each point with its closest center's index and a count of 1.
+      val closest = data.map(p => (closestPoint(p, kPoints), (p, 1)))
+
+      // Group the points by the center they were assigned to.
+      val mappings = closest.groupBy[Int](x => x._1)
+
+      // Sum the points and counts within each cluster.
+      val pointStats = mappings.map { pair =>
+        pair._2.reduceLeft[(Int, (Vector, Int))] {
+          case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1 + y2))
+        }
+      }
+
+      // The new center of each cluster is the average of its points.
+      val newPoints = pointStats.map { mapping => (mapping._1, mapping._2._1 / mapping._2._2) }
+
+      // Measure how far the centers moved, to test for convergence.
+      tempDist = 0.0
+      for (mapping <- newPoints) {
+        tempDist += kPoints(mapping._1).squaredDist(mapping._2)
+      }
+
+      for (newP <- newPoints) {
+        kPoints.put(newP._1, newP._2)
+      }
+    }
+
+    println("Final centers: " + kPoints)
+  }
+}
diff --git a/examples/src/main/scala/spark/examples/SparkLocalKMeans.scala b/examples/src/main/scala/spark/examples/SparkLocalKMeans.scala
new file mode 100644
index 0000000000..8d9527b7c1
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SparkLocalKMeans.scala
@@ -0,0 +1,79 @@
+package spark.examples
+
+import java.util.Random
+import Vector._
+import spark.SparkContext
+import spark.SparkContext._
+import scala.collection.mutable.HashMap
+
+/**
+ * K-means clustering on an input file of space-separated points, run on Spark.
+ */
+object SparkLocalKMeans {
+  val R = 1000          // Scaling factor
+  val rand = new Random(42)
+
+  def parseVector(line: String): Vector = {
+    new Vector(line.split(' ').map(_.toDouble))
+  }
+
+  // Returns the index of the center closest to p.
+  def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
+    var bestIndex = 0
+    var closest = Double.PositiveInfinity
+
+    for (i <- 1 to centers.size) {
+      val tempDist = p.squaredDist(centers(i))
+      if (tempDist < closest) {
+        closest = tempDist
+        bestIndex = i
+      }
+    }
+
+    bestIndex
+  }
+
+  def main(args: Array[String]) {
+    if (args.length < 4) {
+      System.err.println("Usage: SparkLocalKMeans <master> <file> <k> <convergeDist>")
+      System.exit(1)
+    }
+    val sc = new SparkContext(args(0), "SparkLocalKMeans")
+    val lines = sc.textFile(args(1))
+    val data = lines.map(parseVector _).cache()
+    val K = args(2).toInt
+    val convergeDist = args(3).toDouble
+
+    // Sample roughly K points as the initial centers.
+    val points = data.sample(false, (K + 1) / data.count().toDouble, 42).collect
+    val kPoints = new HashMap[Int, Vector]
+    var tempDist = 1.0
+
+    for (i <- 1 to points.size) {
+      kPoints.put(i, points(i - 1))
+    }
+
+    while (tempDist > convergeDist) {
+      // Pair each point with its closest center's index and a count of 1.
+      val closest = data.map(p => (closestPoint(p, kPoints), (p, 1)))
+
+      // Sum the points and counts within each cluster.
+      val pointStats = closest.reduceByKey { case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2) }
+
+      // The new center of each cluster is the average of its points.
+      val newPoints = pointStats.map { mapping => (mapping._1, mapping._2._1 / mapping._2._2) }.collect()
+
+      // Measure how far the centers moved, to test for convergence.
+      tempDist = 0.0
+      for (mapping <- newPoints) {
+        tempDist += kPoints(mapping._1).squaredDist(mapping._2)
+      }
+
+      for (newP <- newPoints) {
+        kPoints.put(newP._1, newP._2)
+      }
+    }
+
+    println("Final centers: " + kPoints)
+  }
+}
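
Since parseVector splits each line on single spaces and calls toDouble on the pieces, the input file for SparkLocalKMeans must contain one point per line as space-separated numbers. A minimal sketch of a generator for such a file, assuming only the JDK and the Scala standard library; the object name, output file name, and constants are illustrative and not part of this patch:

    import java.io.PrintWriter
    import java.util.Random

    // Hypothetical helper, not part of this patch: writes N random
    // D-dimensional points, one space-separated line per point, in the
    // format that SparkLocalKMeans.parseVector expects.
    object GenerateKMeansData {
      def main(args: Array[String]) {
        val N = 1000               // number of points (illustrative)
        val D = 10                 // dimensions (illustrative)
        val R = 1000               // scaling factor, mirroring the examples
        val rand = new Random(42)
        val out = new PrintWriter("kmeans_data.txt")
        for (i <- 1 to N) {
          out.println(Array.fill(D)(rand.nextDouble * R).mkString(" "))
        }
        out.close()
      }
    }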
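
Per the usage string, SparkLocalKMeans takes a master URL, an input file, the number of clusters k, and a convergence threshold. A hypothetical invocation, assuming the repository's top-level run script and a local[2] master string (neither is shown in this patch):

    ./run spark.examples.SparkLocalKMeans local[2] kmeans_data.txt 10 0.001

LocalKMeans takes no arguments and can be launched the same way.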