From 29fe6bdaa29193c9dbf3a8fbd05094f3d812d4e5 Mon Sep 17 00:00:00 2001
From: Jianping J Wang
Date: Mon, 30 Dec 2013 23:41:15 +0800
Subject: refactor and bug fix

---
 .../org/apache/spark/graph/algorithms/Svdpp.scala  | 155 +++++++++------------
 1 file changed, 64 insertions(+), 91 deletions(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
index 26b999f4cf..cbbe240c90 100644
--- a/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
@@ -5,18 +5,27 @@ import org.apache.spark.graph._
 import scala.util.Random
 import org.apache.commons.math.linear._
 
-class VT ( // vertex type
+class VT( // vertex type
   var v1: RealVector, // v1: p for user node, q for item node
   var v2: RealVector, // v2: pu + |N(u)|^(-0.5)*sum(y) for user node, y for item node
   var bias: Double,
   var norm: Double // |N(u)|^(-0.5) for user node
-) extends Serializable
+  ) extends Serializable
 
-class Msg ( // message
+class Msg( // message
   var v1: RealVector,
   var v2: RealVector,
-  var bias: Double
-) extends Serializable
+  var bias: Double) extends Serializable
+
+class SvdppConf( // Svdpp parameters
+  var rank: Int,
+  var maxIters: Int,
+  var minVal: Double,
+  var maxVal: Double,
+  var gamma1: Double,
+  var gamma2: Double,
+  var gamma6: Double,
+  var gamma7: Double) extends Serializable
 
 object Svdpp {
   /**
@@ -24,21 +33,14 @@ object Svdpp {
    * paper is available at [[http://public.research.att.com/~volinsky/netflix/kdd08koren.pdf]].
    * The prediction rule is rui = u + bu + bi + qi*(pu + |N(u)|^(-0.5)*sum(y)), see the details on page 6.
    *
-   * @param edges edges for constructing the graph
+   * @param edges edges for constructing the graph
+   *
+   * @param conf Svdpp parameters
    *
    * @return a graph with vertex attributes containing the trained model
    */
-  def run(edges: RDD[Edge[Double]]): Graph[VT, Double] = {
-    // defalut parameters
-    val rank = 10
-    val maxIters = 20
-    val minVal = 0.0
-    val maxVal = 5.0
-    val gamma1 = 0.007
-    val gamma2 = 0.007
-    val gamma6 = 0.005
-    val gamma7 = 0.015
+  def run(edges: RDD[Edge[Double]], conf: SvdppConf): Graph[VT, Double] = {
 
     // generate default vertex attribute
     def defaultF(rank: Int) = {
@@ -52,108 +54,79 @@ object Svdpp {
       vd
     }
 
-    // calculate initial bias and norm
-    def mapF0(et: EdgeTriplet[VT, Double]): Iterator[(Vid, (Long, Double))] = {
-      assert(et.srcAttr != null && et.dstAttr != null)
-      Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr)))
-    }
-    def reduceF0(g1: (Long, Double), g2: (Long, Double)) = {
-      (g1._1 + g2._1, g1._2 + g2._2)
-    }
-    def updateF0(vid: Vid, vd: VT, msg: Option[(Long, Double)]) = {
-      if (msg.isDefined) {
-        vd.bias = msg.get._2 / msg.get._1
-        vd.norm = 1.0 / scala.math.sqrt(msg.get._1)
-      }
-      vd
-    }
-
     // calculate global rating mean
     val (rs, rc) = edges.map(e => (e.attr, 1L)).reduce((a, b) => (a._1 + b._1, a._2 + b._2))
     val u = rs / rc // global rating mean
 
-    // make graph
-    var g = Graph.fromEdges(edges, defaultF(rank)).cache()
+    // construct graph
+    var g = Graph.fromEdges(edges, defaultF(conf.rank)).cache()
 
     // calculate initial bias and norm
-    val t0 = g.mapReduceTriplets(mapF0, reduceF0)
-    g.outerJoinVertices(t0) {updateF0}
-
-    // phase 1
-    def mapF1(et: EdgeTriplet[VT, Double]): Iterator[(Vid, RealVector)] = {
-      assert(et.srcAttr != null && et.dstAttr != null)
-      Iterator((et.srcId, et.dstAttr.v2)) // sum up y of connected item nodes
-    }
-    def reduceF1(g1: RealVector, g2: RealVector) = {
-      g1.add(g2)
-    }
-    def updateF1(vid: Vid, vd: VT, msg: Option[RealVector]) = {
-      if (msg.isDefined) {
-        vd.v2 = vd.v1.add(msg.get.mapMultiply(vd.norm)) // pu + |N(u)|^(-0.5)*sum(y)
-      }
-      vd
+    var t0: VertexRDD[(Long, Double)] = g.mapReduceTriplets(et =>
+      Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))),
+      (g1: (Long, Double), g2: (Long, Double)) =>
+        (g1._1 + g2._1, g1._2 + g2._2))
+    g = g.outerJoinVertices(t0) {
+      (vid: Vid, vd: VT, msg: Option[(Long, Double)]) =>
+        vd.bias = msg.get._2 / msg.get._1; vd.norm = 1.0 / scala.math.sqrt(msg.get._1)
+        vd
     }
 
-    // phase 2
-    def mapF2(et: EdgeTriplet[VT, Double]): Iterator[(Vid, Msg)] = {
+    def mapTrainF(conf: SvdppConf, u: Double)(et: EdgeTriplet[VT, Double]): Iterator[(Vid, Msg)] = {
       assert(et.srcAttr != null && et.dstAttr != null)
       val (usr, itm) = (et.srcAttr, et.dstAttr)
       val (p, q) = (usr.v1, itm.v1)
       var pred = u + usr.bias + itm.bias + q.dotProduct(usr.v2)
-      pred = math.max(pred, minVal)
-      pred = math.min(pred, maxVal)
+      pred = math.max(pred, conf.minVal)
+      pred = math.min(pred, conf.maxVal)
       val err = et.attr - pred
-      val updateP = (q.mapMultiply(err)).subtract(p.mapMultiply(gamma7))
-      val updateQ = (usr.v2.mapMultiply(err)).subtract(q.mapMultiply(gamma7))
-      val updateY = (q.mapMultiply(err*usr.norm)).subtract((itm.v2).mapMultiply(gamma7))
-      Iterator((et.srcId, new Msg(updateP, updateY, err - gamma6*usr.bias)),
-        (et.dstId, new Msg(updateQ, updateY, err - gamma6*itm.bias)))
-    }
-    def reduceF2(g1: Msg, g2: Msg):Msg = {
-      g1.v1 = g1.v1.add(g2.v1)
-      g1.v2 = g1.v2.add(g2.v2)
-      g1.bias += g2.bias
-      g1
-    }
-    def updateF2(vid: Vid, vd: VT, msg: Option[Msg]) = {
-      if (msg.isDefined) {
-        vd.v1 = vd.v1.add(msg.get.v1.mapMultiply(gamma2))
-        if (vid % 2 == 1) { // item nodes update y
-          vd.v2 = vd.v2.add(msg.get.v2.mapMultiply(gamma2))
-        }
-        vd.bias += msg.get.bias*gamma1
-      }
-      vd
+      val updateP = ((q.mapMultiply(err)).subtract(p.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
+      val updateQ = ((usr.v2.mapMultiply(err)).subtract(q.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
+      val updateY = ((q.mapMultiply(err * usr.norm)).subtract((itm.v2).mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
+      Iterator((et.srcId, new Msg(updateP, updateY, (err - conf.gamma6 * usr.bias) * conf.gamma1)),
+        (et.dstId, new Msg(updateQ, updateY, (err - conf.gamma6 * itm.bias) * conf.gamma1)))
     }
 
-    for (i <- 0 until maxIters) {
+    for (i <- 0 until conf.maxIters) {
       // phase 1, calculate v2 for user nodes
-      val t1: VertexRDD[RealVector] = g.mapReduceTriplets(mapF1, reduceF1)
-      g.outerJoinVertices(t1) {updateF1}
+      var t1 = g.mapReduceTriplets(et =>
+        Iterator((et.srcId, et.dstAttr.v2)),
+        (g1: RealVector, g2: RealVector) => g1.add(g2))
+      g = g.outerJoinVertices(t1) { (vid: Vid, vd: VT, msg: Option[RealVector]) =>
+        if (msg.isDefined) vd.v2 = vd.v1.add(msg.get.mapMultiply(vd.norm))
+        vd
+      }
       // phase 2, update p for user nodes and q, y for item nodes
-      val t2: VertexRDD[Msg] = g.mapReduceTriplets(mapF2, reduceF2)
-      g.outerJoinVertices(t2) {updateF2}
+      val t2: VertexRDD[Msg] = g.mapReduceTriplets(mapTrainF(conf, u), (g1: Msg, g2: Msg) => {
+        g1.v1 = g1.v1.add(g2.v1)
+        g1.v2 = g1.v2.add(g2.v2)
+        g1.bias += g2.bias
+        g1
+      })
+      g = g.outerJoinVertices(t2) { (vid: Vid, vd: VT, msg: Option[Msg]) =>
+        vd.v1 = vd.v1.add(msg.get.v1)
+        if (vid % 2 == 1) vd.v2 = vd.v2.add(msg.get.v2)
+        vd.bias += msg.get.bias
+        vd
+      }
     }
 
     // calculate error on training set
-    def mapF3(et: EdgeTriplet[VT, Double]): Iterator[(Vid, Double)] = {
+    def mapTestF(conf: SvdppConf, u: Double)(et: EdgeTriplet[VT, Double]): Iterator[(Vid, Double)] = {
       assert(et.srcAttr != null && et.dstAttr != null)
       val (usr, itm) = (et.srcAttr, et.dstAttr)
       val (p, q) = (usr.v1, itm.v1)
       var pred = u + usr.bias + itm.bias + q.dotProduct(usr.v2)
-      pred = math.max(pred, minVal)
-      pred = math.min(pred, maxVal)
-      val err = (et.attr - pred)*(et.attr - pred)
+      pred = math.max(pred, conf.minVal)
+      pred = math.min(pred, conf.maxVal)
+      val err = (et.attr - pred) * (et.attr - pred)
       Iterator((et.dstId, err))
     }
-    def updateF3(vid: Vid, vd: VT, msg: Option[Double]) = {
-      if (msg.isDefined && vid % 2 == 1) { // item nodes sum up the errors
-        vd.norm = msg.get
-      }
+    val t3: VertexRDD[Double] = g.mapReduceTriplets(mapTestF(conf, u), _ + _)
+    g.outerJoinVertices(t3) { (vid: Vid, vd: VT, msg: Option[Double]) =>
+      if (msg.isDefined && vid % 2 == 1) vd.norm = msg.get // item nodes sum up the errors
       vd
     }
-    val t3: VertexRDD[Double] = g.mapReduceTriplets(mapF3, _ + _)
-    g.outerJoinVertices(t3) {updateF3}
-    g
+    g
   }
 }
-- 
cgit v1.2.3

From 600421d8bc258508d0e298131384d0abed4b87b7 Mon Sep 17 00:00:00 2001
From: Jianping J Wang
Date: Mon, 30 Dec 2013 23:42:55 +0800
Subject: update

---
 .../test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala b/graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala
index 4ea675c2dc..46dee4e9a0 100644
--- a/graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala
@@ -13,12 +13,13 @@ class SvdppSuite extends FunSuite with LocalSparkContext {
 
   test("Test SVD++ with mean square error on training set") {
     withSpark { sc =>
-      val SvdppErr = 0.01
+      val SvdppErr = 0.2
       val edges = sc.textFile("mllib/data/als/test.data").map { line =>
         val fields = line.split(",")
         Edge(fields(0).toLong * 2, fields(1).toLong * 2 + 1, fields(2).toDouble)
       }
-      val graph = Svdpp.run(edges)
+      val conf = new SvdppConf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015)
+      val graph = Svdpp.run(edges, conf)
       val err = graph.vertices.collect.map{ case (vid, vd) =>
         if (vid % 2 == 1) { vd.norm } else { 0.0 }
       }.reduce(_ + _) / graph.triplets.collect.size
-- 
cgit v1.2.3

From 779c66ae4ee681f9cf8ab85cd48f4761ee49e031 Mon Sep 17 00:00:00 2001
From: Jianping J Wang
Date: Tue, 31 Dec 2013 16:59:05 +0800
Subject: refactor and fix bugs

---
 .../org/apache/spark/graph/algorithms/Svdpp.scala  | 87 ++++++++--------------
 1 file changed, 29 insertions(+), 58 deletions(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
index cbbe240c90..7c3e0c83c9 100644
--- a/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
@@ -5,18 +5,6 @@ import org.apache.spark.graph._
 import scala.util.Random
 import org.apache.commons.math.linear._
 
-class VT( // vertex type
-  var v1: RealVector, // v1: p for user node, q for item node
-  var v2: RealVector, // v2: pu + |N(u)|^(-0.5)*sum(y) for user node, y for item node
-  var bias: Double,
-  var norm: Double // |N(u)|^(-0.5) for user node
-  ) extends Serializable
-
-class Msg( // message
-  var v1: RealVector,
-  var v2: RealVector,
-  var bias: Double) extends Serializable
-
 class SvdppConf( // Svdpp parameters
   var rank: Int,
   var maxIters: Int,
   var minVal: Double,
   var maxVal: Double,
   var gamma1: Double,
   var gamma2: Double,
   var gamma6: Double,
   var gamma7: Double) extends Serializable
 
 object Svdpp {
   /**
    * paper is available at [[http://public.research.att.com/~volinsky/netflix/kdd08koren.pdf]].
    * The prediction rule is rui = u + bu + bi + qi*(pu + |N(u)|^(-0.5)*sum(y)), see the details on page 6.
    *
    * @param edges edges for constructing the graph
    *
    * @param conf Svdpp parameters
    *
    * @return a graph with vertex attributes containing the trained model
    */
 
-  def run(edges: RDD[Edge[Double]], conf: SvdppConf): Graph[VT, Double] = {
+  def run(edges: RDD[Edge[Double]], conf: SvdppConf): Graph[(RealVector, RealVector, Double, Double), Double] = {
 
     // generate default vertex attribute
-    def defaultF(rank: Int) = {
+    def defaultF(rank: Int): (RealVector, RealVector, Double, Double) = {
       val v1 = new ArrayRealVector(rank)
       val v2 = new ArrayRealVector(rank)
       for (i <- 0 until rank) {
         v1.setEntry(i, Random.nextDouble)
         v2.setEntry(i, Random.nextDouble)
       }
-      var vd = new VT(v1, v2, 0.0, 0.0)
-      vd
+      (v1, v2, 0.0, 0.0)
     }
 
     // calculate global rating mean
     val (rs, rc) = edges.map(e => (e.attr, 1L)).reduce((a, b) => (a._1 + b._1, a._2 + b._2))
-    val u = rs / rc // global rating mean
+    val u = rs / rc
 
     // construct graph
     var g = Graph.fromEdges(edges, defaultF(conf.rank)).cache()
 
     // calculate initial bias and norm
-    var t0: VertexRDD[(Long, Double)] = g.mapReduceTriplets(et =>
-      Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))),
-      (g1: (Long, Double), g2: (Long, Double)) =>
-        (g1._1 + g2._1, g1._2 + g2._2))
-    g = g.outerJoinVertices(t0) {
-      (vid: Vid, vd: VT, msg: Option[(Long, Double)]) =>
-        vd.bias = msg.get._2 / msg.get._1; vd.norm = 1.0 / scala.math.sqrt(msg.get._1)
-        vd
+    var t0 = g.mapReduceTriplets(et =>
+      Iterator((et.srcId, (1L, et.attr)), (et.dstId, (1L, et.attr))), (g1: (Long, Double), g2: (Long, Double)) => (g1._1 + g2._1, g1._2 + g2._2))
+    g = g.outerJoinVertices(t0) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[(Long, Double)]) =>
+      (vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
     }
 
-    def mapTrainF(conf: SvdppConf, u: Double)(et: EdgeTriplet[VT, Double]): Iterator[(Vid, Msg)] = {
-      assert(et.srcAttr != null && et.dstAttr != null)
+    def mapTrainF(conf: SvdppConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double])
+      : Iterator[(Vid, (RealVector, RealVector, Double))] = {
       val (usr, itm) = (et.srcAttr, et.dstAttr)
-      val (p, q) = (usr.v1, itm.v1)
-      var pred = u + usr.bias + itm.bias + q.dotProduct(usr.v2)
+      val (p, q) = (usr._1, itm._1)
+      var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2)
       pred = math.max(pred, conf.minVal)
       pred = math.min(pred, conf.maxVal)
       val err = et.attr - pred
-      val updateP = ((q.mapMultiply(err)).subtract(p.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
-      val updateQ = ((usr.v2.mapMultiply(err)).subtract(q.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
-      val updateY = ((q.mapMultiply(err * usr.norm)).subtract((itm.v2).mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
-      Iterator((et.srcId, new Msg(updateP, updateY, (err - conf.gamma6 * usr.bias) * conf.gamma1)),
-        (et.dstId, new Msg(updateQ, updateY, (err - conf.gamma6 * itm.bias) * conf.gamma1)))
+      val updateP = ((q.mapMultiply(err)).subtract(p.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
+      val updateQ = ((usr._2.mapMultiply(err)).subtract(q.mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
+      val updateY = ((q.mapMultiply(err * usr._4)).subtract((itm._2).mapMultiply(conf.gamma7))).mapMultiply(conf.gamma2)
+      Iterator((et.srcId, (updateP, updateY, (err - conf.gamma6 * usr._3) * conf.gamma1)),
+        (et.dstId, (updateQ, updateY, (err - conf.gamma6 * itm._3) * conf.gamma1)))
     }
 
     for (i <- 0 until conf.maxIters) {
       // phase 1, calculate v2 for user nodes
-      var t1 = g.mapReduceTriplets(et =>
-        Iterator((et.srcId, et.dstAttr.v2)),
-        (g1: RealVector, g2: RealVector) => g1.add(g2))
-      g = g.outerJoinVertices(t1) { (vid: Vid, vd: VT, msg: Option[RealVector]) =>
-        if (msg.isDefined) vd.v2 = vd.v1.add(msg.get.mapMultiply(vd.norm))
-        vd
+      var t1 = g.mapReduceTriplets(et => Iterator((et.srcId, et.dstAttr._2)), (g1: RealVector, g2: RealVector) => g1.add(g2))
+      g = g.outerJoinVertices(t1) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) =>
+        if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd
       }
       // phase 2, update p for user nodes and q, y for item nodes
-      val t2: VertexRDD[Msg] = g.mapReduceTriplets(mapTrainF(conf, u), (g1: Msg, g2: Msg) => {
-        g1.v1 = g1.v1.add(g2.v1)
-        g1.v2 = g1.v2.add(g2.v2)
-        g1.bias += g2.bias
-        g1
-      })
-      g = g.outerJoinVertices(t2) { (vid: Vid, vd: VT, msg: Option[Msg]) =>
-        vd.v1 = vd.v1.add(msg.get.v1)
-        if (vid % 2 == 1) vd.v2 = vd.v2.add(msg.get.v2)
-        vd.bias += msg.get.bias
-        vd
+      val t2 = g.mapReduceTriplets(mapTrainF(conf, u), (g1: (RealVector, RealVector, Double), g2: (RealVector, RealVector, Double)) =>
+        (g1._1.add(g2._1), g1._2.add(g2._2), g1._3 + g2._3))
+      g = g.outerJoinVertices(t2) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[(RealVector, RealVector, Double)]) =>
+        (vd._1.add(msg.get._1), vd._2.add(msg.get._2), vd._3 + msg.get._3, vd._4)
       }
     }
 
     // calculate error on training set
-    def mapTestF(conf: SvdppConf, u: Double)(et: EdgeTriplet[VT, Double]): Iterator[(Vid, Double)] = {
-      assert(et.srcAttr != null && et.dstAttr != null)
+    def mapTestF(conf: SvdppConf, u: Double)(et: EdgeTriplet[(RealVector, RealVector, Double, Double), Double]): Iterator[(Vid, Double)] = {
       val (usr, itm) = (et.srcAttr, et.dstAttr)
-      val (p, q) = (usr.v1, itm.v1)
-      var pred = u + usr.bias + itm.bias + q.dotProduct(usr.v2)
+      val (p, q) = (usr._1, itm._1)
+      var pred = u + usr._3 + itm._3 + q.dotProduct(usr._2)
       pred = math.max(pred, conf.minVal)
       pred = math.min(pred, conf.maxVal)
       val err = (et.attr - pred) * (et.attr - pred)
       Iterator((et.dstId, err))
     }
-    val t3: VertexRDD[Double] = g.mapReduceTriplets(mapTestF(conf, u), _ + _)
-    g.outerJoinVertices(t3) { (vid: Vid, vd: VT, msg: Option[Double]) =>
-      if (msg.isDefined && vid % 2 == 1) vd.norm = msg.get // item nodes sum up the errors
-      vd
+    val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2)
+    g.outerJoinVertices(t3) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
+      if (msg.isDefined && vid % 2 == 1) (vd._1, vd._2, vd._3, msg.get) else vd
     }
     g
   }
 }
-- 
cgit v1.2.3

From 4a30f69b25239d0e7a4a27478be261e17c1dba1a Mon Sep 17 00:00:00 2001
From: Jianping J Wang
Date: Tue, 31 Dec 2013 17:00:59 +0800
Subject: update svdpp test

---
 graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala b/graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala
index 46dee4e9a0..c02a2d8398 100644
--- a/graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala
@@ -21,7 +21,7 @@ class SvdppSuite extends FunSuite with LocalSparkContext {
       val conf = new SvdppConf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015)
       val graph = Svdpp.run(edges, conf)
       val err = graph.vertices.collect.map{ case (vid, vd) =>
-        if (vid % 2 == 1) { vd.norm } else { 0.0 }
+        if (vid % 2 == 1) { vd._4 } else { 0.0 }
       }.reduce(_ + _) / graph.triplets.collect.size
-- 
cgit v1.2.3
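At this point in the series the public entry point is driven entirely by SvdppConf: rank and maxIters size the model and bound the gradient passes, minVal/maxVal clamp predictions to the rating scale, and gamma1/gamma2 are the learning rates while gamma6/gamma7 are the regularization weights from the Koren paper. The sketch below is editorial, not part of the patches: it assumes a live SparkContext named sc and the comma-separated rating file that SvdppSuite reads.

    import org.apache.spark.graph._
    import org.apache.spark.graph.algorithms._

    // Build the bipartite rating graph the way the test suite does:
    // user ids map to even vertex ids and item ids to odd vertex ids,
    // so the algorithm can tell the two roles apart via `vid % 2`.
    val edges = sc.textFile("mllib/data/als/test.data").map { line =>
      val fields = line.split(",")
      Edge(fields(0).toLong * 2, fields(1).toLong * 2 + 1, fields(2).toDouble)
    }

    // rank = 10, maxIters = 2, predictions clamped to [0.0, 5.0];
    // gamma1/gamma2 are step sizes, gamma6/gamma7 regularization weights.
    val conf = new SvdppConf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015)

    // As of this commit, run still returns just the trained graph; a later
    // commit in the series changes it to also return the global rating mean.
    val graph = Svdpp.run(edges, conf)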
From ab7b8ce13ec3a0a85f57ceede02df1e5eb9f6c23 Mon Sep 17 00:00:00 2001
From: Jianping J Wang
Date: Tue, 31 Dec 2013 17:11:20 +0800
Subject: Update Svdpp.scala

---
 graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
index 7c3e0c83c9..cb1a69e318 100644
--- a/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
@@ -71,7 +71,7 @@ object Svdpp {
     }
 
     for (i <- 0 until conf.maxIters) {
-      // phase 1, calculate v2 for user nodes
+      // phase 1, calculate pu + |N(u)|^(-0.5)*sum(y) for user nodes
       var t1 = g.mapReduceTriplets(et => Iterator((et.srcId, et.dstAttr._2)), (g1: RealVector, g2: RealVector) => g1.add(g2))
       g = g.outerJoinVertices(t1) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[RealVector]) =>
         if (msg.isDefined) (vd._1, vd._1.add(msg.get.mapMultiply(vd._4)), vd._3, vd._4) else vd
-- 
cgit v1.2.3

From 12c26d7fb912f07a9e4a9a3de532e0c53bc667a9 Mon Sep 17 00:00:00 2001
From: Jianping J Wang
Date: Tue, 31 Dec 2013 17:18:15 +0800
Subject: Update Svdpp.scala

---
 graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
index cb1a69e318..e7e8d00d9a 100644
--- a/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
@@ -96,7 +96,7 @@ object Svdpp {
     }
     val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2)
     g.outerJoinVertices(t3) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
-      if (msg.isDefined && vid % 2 == 1) (vd._1, vd._2, vd._3, msg.get) else vd
+      if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
     }
     g
   }
-- 
cgit v1.2.3

From 61e6671f5abbbd0a96cc7359ea6302b84e6e9248 Mon Sep 17 00:00:00 2001
From: Jianping J Wang
Date: Tue, 31 Dec 2013 22:01:02 +0800
Subject: fix test bug

---
 graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala b/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
index e7e8d00d9a..18395bdc5f 100644
--- a/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
+++ b/graph/src/main/scala/org/apache/spark/graph/algorithms/Svdpp.scala
@@ -28,7 +28,7 @@ object Svdpp {
    * @return a graph with vertex attributes containing the trained model
    */
 
-  def run(edges: RDD[Edge[Double]], conf: SvdppConf): Graph[(RealVector, RealVector, Double, Double), Double] = {
+  def run(edges: RDD[Edge[Double]], conf: SvdppConf): (Graph[(RealVector, RealVector, Double, Double), Double], Double) = {
 
     // generate default vertex attribute
     def defaultF(rank: Int): (RealVector, RealVector, Double, Double) = {
@@ -95,9 +95,9 @@ object Svdpp {
       Iterator((et.dstId, err))
     }
     val t3 = g.mapReduceTriplets(mapTestF(conf, u), (g1: Double, g2: Double) => g1 + g2)
-    g.outerJoinVertices(t3) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
+    g = g.outerJoinVertices(t3) { (vid: Vid, vd: (RealVector, RealVector, Double, Double), msg: Option[Double]) =>
       if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
     }
-    g
+    (g, u)
   }
 }
-- 
cgit v1.2.3

From 6e50df6255dbe25c880dced3d15a77241eba803d Mon Sep 17 00:00:00 2001
From: Jianping J Wang
Date: Tue, 31 Dec 2013 22:02:16 +0800
Subject: Update SvdppSuite.scala

---
 .../scala/org/apache/spark/graph/algorithms/SvdppSuite.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala b/graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala
index c02a2d8398..411dd3d336 100644
--- a/graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala
+++ b/graph/src/test/scala/org/apache/spark/graph/algorithms/SvdppSuite.scala
@@ -13,17 +13,17 @@ class SvdppSuite extends FunSuite with LocalSparkContext {
 
   test("Test SVD++ with mean square error on training set") {
     withSpark { sc =>
-      val SvdppErr = 0.2
+      val SvdppErr = 8.0
       val edges = sc.textFile("mllib/data/als/test.data").map { line =>
         val fields = line.split(",")
         Edge(fields(0).toLong * 2, fields(1).toLong * 2 + 1, fields(2).toDouble)
       }
-      val conf = new SvdppConf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015)
-      val graph = Svdpp.run(edges, conf)
+      val conf = new SvdppConf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015) // 2 iterations
+      var (graph, u) = Svdpp.run(edges, conf)
       val err = graph.vertices.collect.map{ case (vid, vd) =>
-        if (vid % 2 == 1) { vd._4 } else { 0.0 }
+        if (vid % 2 == 1) vd._4 else 0.0
       }.reduce(_ + _) / graph.triplets.collect.size
-      assert(err < SvdppErr)
+      assert(err <= SvdppErr)
     }
   }
-- 
cgit v1.2.3
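After the final commit, Svdpp.run returns both the trained graph and the global rating mean u, and the last training step leaves each item vertex's summed squared error in the fourth tuple slot. A sketch of how a caller can recover the training mean square error from that pair; it reuses the hypothetical edges and conf values from the sketch above, which are assumptions rather than part of the patches:

    // Train and unpack the final API's result pair.
    val conf = new SvdppConf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015)
    val (graph, u) = Svdpp.run(edges, conf)

    // Item vertices (odd ids) hold the per-item sum of squared errors after
    // training; summing them and dividing by the edge count gives the MSE,
    // exactly as the final SvdppSuite does.
    val mse = graph.vertices.collect.map { case (vid, vd) =>
      if (vid % 2 == 1) vd._4 else 0.0
    }.reduce(_ + _) / graph.triplets.collect.size

    // The returned mean u is needed to score unseen user/item pairs with the
    // paper's rule: pred = u + bu + bi + qi . (pu + |N(u)|^(-0.5) * sum(y)),
    // clamped to [conf.minVal, conf.maxVal].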