aboutsummaryrefslogtreecommitdiff
path: root/graph/src
diff options
context:
space:
mode:
authorJianping J Wang <jianping.j.wang@gmail.com>2013-12-31 16:59:05 +0800
committerJianping J Wang <jianping.j.wang@gmail.com>2013-12-31 16:59:05 +0800
commit779c66ae4ee681f9cf8ab85cd48f4761ee49e031 (patch)
treebf778a314fb585b455527756942590007b02efd2 /graph/src
parent600421d8bc258508d0e298131384d0abed4b87b7 (diff)
downloadspark-779c66ae4ee681f9cf8ab85cd48f4761ee49e031.tar.gz
spark-779c66ae4ee681f9cf8ab85cd48f4761ee49e031.tar.bz2
spark-779c66ae4ee681f9cf8ab85cd48f4761ee49e031.zip
refactor and fix bugs
Diffstat (limited to 'graph/src')
-rw-r--r--graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala87
1 files changed, 29 insertions, 58 deletions
diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
index cbbe240c90..7c3e0c83c9 100644
--- a/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
@@ -5,18 +5,6 @@ import org.apache.spark.graph._
import scala.util.Random
import org.apache.commons.math.linear._
-class VT( // vertex type
- var v1: RealVector, // v1: p for user node, q for item node
- var v2: RealVector, // v2: pu + |N(u)|^(-0.5)*sum(y) for user node, y for item node
- var bias: Double,
- var norm: Double // |N(u)|^(-0.5) for user node
- ) extends Serializable
-
-class Msg( // message
- var v1: RealVector,
- var v2: RealVector,
- var bias: Double) extends Serializable
-
class SvdppConf( // Svdpp parameters
var rank: Int,
var maxIters: Int,
@@ -40,92 +28,75 @@ object Svdpp {
* @return a graph with vertex attributes containing the trained model
*/
- def run(edges: RDD[Edge[Double]], conf: SvdppConf): Graph[VT, Double] = {
+ def run(edges: RDD[Edge[Double]], conf: SvdppConf): Graph[(RealVector, RealVector, Double, Double), Double] = {
// generate default vertex attribute
- def defaultF(rank: Int) = {
+ def defaultF(rank: Int): (RealVector, RealVector, Double, Double) = {
val v1 = new ArrayRealVector(rank)
val v2 = new ArrayRealVector(rank)
for (i <- 0 until rank) {
v1.setEntry(i, Random.nextDouble)
v2.setEntry(i, Random.nextDouble)
}
- var vd = new VT(v1, v2, 0.0, 0.0)
- vd
+ (v1, v2, 0.0, 0.0)
}
// calculate global rating mean
val (rs, rc) = edges.map(e => (e.attr, 1L)).reduce((a, b) => (a._1 + b._1, a._2 + b._2))
- val u = rs / rc // global rating mean
+ val u = rs / rc
// construct graph
var g = Graph.fromEdges(edges, defaultF(conf.rank)).cache()
// calculate initial bias and norm
- var t0: VertexRDD[(Long, Double)] = g.mapReduceTriplets(et =>
- Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))),
- (g1: (Long, Double), g2: (Long, Double)) =>
- (g1._1 + g2._1, g1._2 + g2._2))
- g = g.outerJoinVertices(t0) {
- (vid: Vid, vd: VT, msg: Option[(Long, Double)]) =>
- vd.bias = msg.get._2 / msg.get._1; vd.norm = 1.0 / scala.math.sqrt(msg.get._1)
- vd
+ var t0 = g.mapReduceTriplets(et =>
+ Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))), (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2))
+ g = g.outerJoinVertices(t0) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) =>
+ (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
}
- def mapTrainF(conf: SvdppConf, u: Double)(et: EdgeTriplet[VT, Double]): Iterator[(Vid, Msg)] = {
- assert(et.srcAttr != null && et.dstAttr != null)
+ def mapTrainF(conf: SvdppConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
+ : Iterator[(Vid, (RealVector, RealVector, Double))] = {
val (usr, itm) = (et.srcAttr, et.dstAttr)
- val (p, q) = (usr.v1, itm.v1)
- var pred = u + usr.bias + itm.bias + q.dotProduct(usr.v2)
+ val (p, q) = (usr._1, itm._1)
+ var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2)
pred = math.max(pred, conf.minVal)
pred = math.min(pred, conf.maxVal)
val err = et.attr - pred
val updateP = ((q.mapMultiply(err)).subtract(p.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
- val updateQ = ((usr.v2.mapMultiply(err)).subtract(q.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
- val updateY = ((q.mapMultiply(err * usr.norm)).subtract((itm.v2).mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
- Iterator((et.srcId, new Msg(updateP, updateY, (err - conf.gamma6 * usr.bias) * conf.gamma1)),
- (et.dstId, new Msg(updateQ, updateY, (err - conf.gamma6 * itm.bias) * conf.gamma1)))
+ val updateQ = ((usr._2.mapMultiply(err)).subtract(q.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
+ val updateY = ((q.mapMultiply(err * usr._4)).subtract((itm._2).mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
+ Iterator((et.srcId, (updateP, updateY, (err - conf.gamma6 * usr._3) * conf.gamma1)),
+ (et.dstId, (updateQ, updateY, (err - conf.gamma6 * itm._3) * conf.gamma1)))
}
for (i <- 0 until conf.maxIters) {
// phase 1, calculate v2 for user nodes
- var t1 = g.mapReduceTriplets(et =>
- Iterator((et.srcId, et.dstAttr.v2)),
- (g1: RealVector, g2: RealVector) => g1.add(g2))
- g = g.outerJoinVertices(t1) { (vid: Vid, vd: VT, msg: Option[RealVector]) =>
- if (msg.isDefined) vd.v2 = vd.v1.add(msg.get.mapMultiply(vd.norm))
- vd
+ var t1 = g.mapReduceTriplets(et => Iterator((et.srcId, et.dstAttr._2)), (g1: RealVector, g2: RealVector) => g1.add(g2))
+ g = g.outerJoinVertices(t1) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) =>
+ if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd
}
// phase 2, update p for user nodes and q, y for item nodes
- val t2: VertexRDD[Msg] = g.mapReduceTriplets(mapTrainF(conf, u), (g1: Msg, g2: Msg) => {
- g1.v1 = g1.v1.add(g2.v1)
- g1.v2 = g1.v2.add(g2.v2)
- g1.bias += g2.bias
- g1
- })
- g = g.outerJoinVertices(t2) { (vid: Vid, vd: VT, msg: Option[Msg]) =>
- vd.v1 = vd.v1.add(msg.get.v1)
- if (vid % 2 == 1) vd.v2 = vd.v2.add(msg.get.v2)
- vd.bias += msg.get.bias
- vd
+ val t2 = g.mapReduceTriplets(mapTrainF(conf, u), (g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) =>
+ (g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3))
+ g = g.outerJoinVertices(t2) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[(RealVector, RealVector, Double)]) =>
+ (vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4)
}
}
// calculate error on training set
- def mapTestF(conf: SvdppConf, u: Double)(et: EdgeTriplet[VT, Double]): Iterator[(Vid, Double)] = {
- assert(et.srcAttr != null && et.dstAttr != null)
+ def mapTestF(conf: SvdppConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]): Iterator[(Vid, Double)] = {
val (usr, itm) = (et.srcAttr, et.dstAttr)
- val (p, q) = (usr.v1, itm.v1)
- var pred = u + usr.bias + itm.bias + q.dotProduct(usr.v2)
+ val (p, q) = (usr._1, itm._1)
+ var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2)
pred = math.max(pred, conf.minVal)
pred = math.min(pred, conf.maxVal)
val err = (et.attr - pred) * (et.attr - pred)
Iterator((et.dstId, err))
}
- val t3: VertexRDD[Double] = g.mapReduceTriplets(mapTestF(conf, u), _ + _)
- g.outerJoinVertices(t3) { (vid: Vid, vd: VT, msg: Option[Double]) =>
- if (msg.isDefined && vid % 2 == 1) vd.norm = msg.get // item nodes sum up the errors
- vd
+ val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2)
+ g.outerJoinVertices(t3) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
+ if (msg.isDefined && vid % 2 == 1) (vd._1, vd._2, vd._3, msg.get) else vd
}
g
}