path: root/examples/src/main/scala/SparkHdfsLR.scala
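
/**
 * Logistic regression implemented with Spark, reading its input from HDFS.
 * Each input line is a label followed by D feature values: "y x1 x2 ... xD".
 * The weight vector w is refined by batch gradient descent over the cached
 * set of points.
 */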
import java.util.Random
import scala.math.exp
import Vector._
import spark._

object SparkHdfsLR {
  val D = 10   // Number of dimensions
  val rand = new Random(42)

  case class DataPoint(x: Vector, y: Double)

  def parsePoint(line: String): DataPoint = {
    // Tokenize manually instead of line.split(' ').map(_.toDouble), which
    // allocates an intermediate array and is slower on large inputs
    val tok = new java.util.StringTokenizer(line, " ")
    val y = tok.nextToken.toDouble
    val x = new Array[Double](D)
    var i = 0
    while (i < D) {
      x(i) = tok.nextToken.toDouble; i += 1
    }
    DataPoint(new Vector(x), y)
  }

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: SparkHdfsLR <master> <file> <iters>")
      System.exit(1)
    }
    val sc = new SparkContext(args(0), "SparkHdfsLR")
    val lines = sc.textFile(args(1))
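    // Cache the parsed points in memory so each gradient-descent iteration
    // reuses them instead of re-reading the file from HDFS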
    val points = lines.map(parsePoint _).cache()
    val ITERATIONS = args(2).toInt

    // Initialize w to a random vector with components in [-1, 1)
    var w = Vector(D, _ => 2 * rand.nextDouble - 1)
    println("Initial w: " + w)

    for (i <- 1 to ITERATIONS) {
      println("On iteration " + i)
      // Accumulate the gradient contributions of all points on the workers
      val gradient = sc.accumulator(Vector.zeros(D))
      for (p <- points) {
        // Gradient of the logistic loss at p: (1 / (1 + exp(-y * (w . x))) - 1) * y * x
        val scale = (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y
        gradient += scale * p.x
      }
      // Take a full gradient step (fixed step size of 1)
      w -= gradient.value
    }

    println("Final w: " + w)
  }
}
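
// Example invocation (the master URL and HDFS path are placeholders; the
// exact launch mechanism depends on the Spark build being used):
//
//   SparkHdfsLR local[2] hdfs://namenode:9000/lr_data.txt 5
//
// This runs five iterations over a file whose lines look like
// "1.0 0.31 -0.18 ... 0.05", i.e. a label followed by D = 10 features.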