aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-01-13 19:20:03 -0800
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-01-13 19:20:03 -0800
commitfabcc82528e923a43fd5b9907626e598e2be4967 (patch)
treecab95eba736840d091cd315e6c63be11a4b7fd3b /examples
parentfd5581a0d3a3be45e02adf67dff1da0d26833a4f (diff)
parent1ecc221f841d898d831499042f5bd27f667d2ae1 (diff)
downloadspark-fabcc82528e923a43fd5b9907626e598e2be4967.tar.gz
spark-fabcc82528e923a43fd5b9907626e598e2be4967.tar.bz2
spark-fabcc82528e923a43fd5b9907626e598e2be4967.zip
Merge pull request #103 from edisontung/master
Made improvements to takeSample. Also changed SparkLocalKMeans to SparkKMeans
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/scala/spark/examples/LocalKMeans.scala80
-rw-r--r--examples/src/main/scala/spark/examples/SparkKMeans.scala118
2 files changed, 142 insertions, 56 deletions
diff --git a/examples/src/main/scala/spark/examples/LocalKMeans.scala b/examples/src/main/scala/spark/examples/LocalKMeans.scala
new file mode 100644
index 0000000000..7e8e7a6959
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/LocalKMeans.scala
@@ -0,0 +1,80 @@
+package spark.examples
+
+import java.util.Random
+import Vector._
+import spark.SparkContext
+import spark.SparkContext._
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+
+object LocalKMeans {
+ val N = 1000
+ val R = 1000 // Scaling factor
+ val D = 10
+ val K = 10
+ val convergeDist = 0.001
+ val rand = new Random(42)
+
+ def generateData = {
+ def generatePoint(i: Int) = {
+ Vector(D, _ => rand.nextDouble * R)
+ }
+ Array.tabulate(N)(generatePoint)
+ }
+
+ def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
+ var index = 0
+ var bestIndex = 0
+ var closest = Double.PositiveInfinity
+
+ for (i <- 1 to centers.size) {
+ val vCurr = centers.get(i).get
+ val tempDist = p.squaredDist(vCurr)
+ if (tempDist < closest) {
+ closest = tempDist
+ bestIndex = i
+ }
+ }
+
+ return bestIndex
+ }
+
+ def main(args: Array[String]) {
+ val data = generateData
+ var points = new HashSet[Vector]
+ var kPoints = new HashMap[Int, Vector]
+ var tempDist = 1.0
+
+ while (points.size < K) {
+ points.add(data(rand.nextInt(N)))
+ }
+
+ val iter = points.iterator
+ for (i <- 1 to points.size) {
+ kPoints.put(i, iter.next())
+ }
+
+ println("Initial centers: " + kPoints)
+
+ while(tempDist > convergeDist) {
+ var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
+
+ var mappings = closest.groupBy[Int] (x => x._1)
+
+ var pointStats = mappings.map(pair => pair._2.reduceLeft [(Int, (Vector, Int))] {case ((id1, (x1, y1)), (id2, (x2, y2))) => (id1, (x1 + x2, y1+y2))})
+
+ var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)}
+
+ tempDist = 0.0
+ for (mapping <- newPoints) {
+ tempDist += kPoints.get(mapping._1).get.squaredDist(mapping._2)
+ }
+
+ for (newP <- newPoints) {
+ kPoints.put(newP._1, newP._2)
+ }
+ }
+
+ println("Final centers: " + kPoints)
+ }
+}
diff --git a/examples/src/main/scala/spark/examples/SparkKMeans.scala b/examples/src/main/scala/spark/examples/SparkKMeans.scala
index 048001dc4f..b0d3407801 100644
--- a/examples/src/main/scala/spark/examples/SparkKMeans.scala
+++ b/examples/src/main/scala/spark/examples/SparkKMeans.scala
@@ -1,67 +1,73 @@
package spark.examples
import java.util.Random
+import Vector._
import spark.SparkContext
import spark.SparkContext._
-import spark.examples.Vector._
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
object SparkKMeans {
- def parseVector(line: String): Vector = {
- return new Vector(line.split(' ').map(_.toDouble))
- }
+ val R = 1000 // Scaling factor
+ val rand = new Random(42)
+
+ def parseVector(line: String): Vector = {
+ return new Vector(line.split(' ').map(_.toDouble))
+ }
+
+ def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
+ var index = 0
+ var bestIndex = 0
+ var closest = Double.PositiveInfinity
+
+ for (i <- 1 to centers.size) {
+ val vCurr = centers.get(i).get
+ val tempDist = p.squaredDist(vCurr)
+ if (tempDist < closest) {
+ closest = tempDist
+ bestIndex = i
+ }
+ }
+
+ return bestIndex
+ }
- def closestCenter(p: Vector, centers: Array[Vector]): Int = {
- var bestIndex = 0
- var bestDist = p.squaredDist(centers(0))
- for (i <- 1 until centers.length) {
- val dist = p.squaredDist(centers(i))
- if (dist < bestDist) {
- bestDist = dist
- bestIndex = i
- }
- }
- return bestIndex
- }
+ def main(args: Array[String]) {
+ if (args.length < 4) {
+ System.err.println("Usage: SparkLocalKMeans <master> <file> <k> <convergeDist>")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "SparkLocalKMeans")
+ val lines = sc.textFile(args(1))
+ val data = lines.map(parseVector _).cache()
+ val K = args(2).toInt
+ val convergeDist = args(3).toDouble
+
+ var points = data.takeSample(false, K, 42)
+ var kPoints = new HashMap[Int, Vector]
+ var tempDist = 1.0
+
+ for (i <- 1 to points.size) {
+ kPoints.put(i, points(i-1))
+ }
- def main(args: Array[String]) {
- if (args.length < 3) {
- System.err.println("Usage: SparkKMeans <master> <file> <dimensions> <k> <iters>")
- System.exit(1)
- }
- val sc = new SparkContext(args(0), "SparkKMeans")
- val lines = sc.textFile(args(1))
- val points = lines.map(parseVector _).cache()
- val dimensions = args(2).toInt
- val k = args(3).toInt
- val iterations = args(4).toInt
+ while(tempDist > convergeDist) {
+ var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
+
+ var pointStats = closest.reduceByKey {case ((x1, y1), (x2, y2)) => (x1 + x2, y1+y2)}
+
+ var newPoints = pointStats.map {mapping => (mapping._1, mapping._2._1/mapping._2._2)}.collect()
+
+ tempDist = 0.0
+ for (mapping <- newPoints) {
+ tempDist += kPoints.get(mapping._1).get.squaredDist(mapping._2)
+ }
+
+ for (newP <- newPoints) {
+ kPoints.put(newP._1, newP._2)
+ }
+ }
- // Initialize cluster centers randomly
- val rand = new Random(42)
- var centers = new Array[Vector](k)
- for (i <- 0 until k)
- centers(i) = Vector(dimensions, _ => 2 * rand.nextDouble - 1)
- println("Initial centers: " + centers.mkString(", "))
-
- for (i <- 1 to iterations) {
- println("On iteration " + i)
-
- // Map each point to the index of its closest center and a (point, 1) pair
- // that we will use to compute an average later
- val mappedPoints = points.map { p => (closestCenter(p, centers), (p, 1)) }
-
- // Compute the new centers by summing the (point, 1) pairs and taking an average
- val newCenters = mappedPoints.reduceByKey {
- case ((sum1, count1), (sum2, count2)) => (sum1 + sum2, count1 + count2)
- }.map {
- case (id, (sum, count)) => (id, sum / count)
- }.collect
-
- // Update the centers array with the new centers we collected
- for ((id, value) <- newCenters) {
- centers(id) = value
- }
- }
-
- println("Final centers: " + centers.mkString(", "))
- }
+ println("Final centers: " + kPoints)
+ }
}