author    Xiangrui Meng <meng@databricks.com>    2015-03-12 01:39:04 -0700
committer Xiangrui Meng <meng@databricks.com>    2015-03-12 01:39:04 -0700
commit 0cba802adf15f5ab8da24dd1e8a5e7214cc4e148 (patch)
tree   a5dc95dc4f0fbf156cacffc6af130666711b3f98 /graphx
parent 712679a7b447346a365b38574d7a86d56a93f767 (diff)
[SPARK-5814][MLLIB][GRAPHX] Remove JBLAS from runtime
The issue is discussed in https://issues.apache.org/jira/browse/SPARK-5669. Replacing all JBLAS usage with netlib-java gives us a simpler dependency tree and fewer license issues to worry about. I didn't touch the test scope in this PR. The user guide is not modified to avoid merge conflicts with branch-1.3.

srowen ankurdave pwendell

Author: Xiangrui Meng <meng@databricks.com>

Closes #4699 from mengxr/SPARK-5814 and squashes the following commits:

48635c6 [Xiangrui Meng] move netlib-java version to parent pom
ca21c74 [Xiangrui Meng] remove jblas from ml-guide
5f7767a [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-5814
c5c4183 [Xiangrui Meng] merge master
0f20cad [Xiangrui Meng] add mima excludes
e53e9f4 [Xiangrui Meng] remove jblas from mllib runtime
ceaa14d [Xiangrui Meng] replace jblas by netlib-java in graphx
fa7c2ca [Xiangrui Meng] move jblas to test scope
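For reference, the translation this patch applies throughout replaces jblas column-vector arithmetic with level-1 BLAS calls on plain Array[Double]. A minimal standalone sketch of the pattern (the scalar values are illustrative, not taken from the patch):

    import com.github.fommil.netlib.BLAS.{getInstance => blas}
    import scala.util.Random

    val rank = 10
    val (err, gamma2, gamma7) = (0.5, 0.007, 0.015)         // illustrative values
    val p = Array.fill(rank)(Random.nextDouble())
    val q = Array.fill(rank)(Random.nextDouble())

    // jblas:       q.mul(err).subColumnVector(p.mul(gamma7)).mul(gamma2)
    // netlib-java: updateP = (err * q - gamma7 * p) * gamma2
    val updateP = q.clone()                                 // work on a copy; q is untouched
    blas.dscal(rank, err * gamma2, updateP, 1)              // updateP := (err * gamma2) * updateP
    blas.daxpy(rank, -gamma7 * gamma2, p, 1, updateP, 1)    // updateP += (-gamma7 * gamma2) * p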
Diffstat (limited to 'graphx')
-rw-r--r--  graphx/pom.xml                                                            | 11
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala       | 96
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala  |  6
3 files changed, 70 insertions(+), 43 deletions(-)
diff --git a/graphx/pom.xml b/graphx/pom.xml
index 57e338c03e..c0d534e185 100644
--- a/graphx/pom.xml
+++ b/graphx/pom.xml
@@ -45,9 +45,14 @@
<artifactId>guava</artifactId>
</dependency>
<dependency>
- <groupId>org.jblas</groupId>
- <artifactId>jblas</artifactId>
- <version>${jblas.version}</version>
+ <groupId>com.github.fommil.netlib</groupId>
+ <artifactId>core</artifactId>
+ <version>${netlib.java.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>net.sourceforge.f2j</groupId>
+ <artifactId>arpack_combined_all</artifactId>
+ <version>0.1</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
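The new dependencies matter at runtime because netlib-java resolves a backend lazily: BLAS.getInstance() returns a native implementation when one is available and otherwise falls back to the pure-Java F2J code shipped in arpack_combined_all. A quick way to check which backend is active (a sketch, assuming no further configuration):

    import com.github.fommil.netlib.BLAS

    val blas = BLAS.getInstance()
    // prints e.g. "com.github.fommil.netlib.F2jBLAS" when no native BLAS is installed
    println(blas.getClass.getName)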
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
index 3e4157a63f..1a7178b82e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -18,7 +18,9 @@
package org.apache.spark.graphx.lib
import scala.util.Random
-import org.jblas.DoubleMatrix
+
+import com.github.fommil.netlib.BLAS.{getInstance => blas}
+
import org.apache.spark.rdd._
import org.apache.spark.graphx._
@@ -53,7 +55,7 @@ object SVDPlusPlus {
* a Multifaceted Collaborative Filtering Model",
* available at [[http://public.research.att.com/~volinsky/netflix/kdd08koren.pdf]].
*
- * The prediction rule is rui = u + bu + bi + qi*(pu + |N(u)|^(-0.5)*sum(y)),
+ * The prediction rule is rui = u + bu + bi + qi*(pu + |N(u)|^^-0.5^^*sum(y)),
* see the details on page 6.
*
* @param edges edges for constructing the graph
@@ -66,13 +68,10 @@ object SVDPlusPlus {
: (Graph[(Array[Double], Array[Double], Double, Double), Double], Double) =
{
// Generate default vertex attribute
- def defaultF(rank: Int): (DoubleMatrix, DoubleMatrix, Double, Double) = {
- val v1 = new DoubleMatrix(rank)
- val v2 = new DoubleMatrix(rank)
- for (i <- 0 until rank) {
- v1.put(i, Random.nextDouble())
- v2.put(i, Random.nextDouble())
- }
+ def defaultF(rank: Int): (Array[Double], Array[Double], Double, Double) = {
+ // TODO: use a fixed random seed
+ val v1 = Array.fill(rank)(Random.nextDouble())
+ val v2 = Array.fill(rank)(Random.nextDouble())
(v1, v2, 0.0, 0.0)
}
@@ -92,7 +91,7 @@ object SVDPlusPlus {
(g1, g2) => (g1._1 + g2._1, g1._2 + g2._2))
val gJoinT0 = g.outerJoinVertices(t0) {
- (vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double),
+ (vid: VertexId, vd: (Array[Double], Array[Double], Double, Double),
msg: Option[(Long, Double)]) =>
(vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
}.cache()
@@ -102,24 +101,28 @@ object SVDPlusPlus {
def sendMsgTrainF(conf: Conf, u: Double)
(ctx: EdgeContext[
- (DoubleMatrix, DoubleMatrix, Double, Double),
+ (Array[Double], Array[Double], Double, Double),
Double,
- (DoubleMatrix, DoubleMatrix, Double)]) {
+ (Array[Double], Array[Double], Double)]) {
val (usr, itm) = (ctx.srcAttr, ctx.dstAttr)
val (p, q) = (usr._1, itm._1)
- var pred = u + usr._3 + itm._3 + q.dot(usr._2)
+ val rank = p.length
+ var pred = u + usr._3 + itm._3 + blas.ddot(rank, q, 1, usr._2, 1)
pred = math.max(pred, conf.minVal)
pred = math.min(pred, conf.maxVal)
val err = ctx.attr - pred
- val updateP = q.mul(err)
- .subColumnVector(p.mul(conf.gamma7))
- .mul(conf.gamma2)
- val updateQ = usr._2.mul(err)
- .subColumnVector(q.mul(conf.gamma7))
- .mul(conf.gamma2)
- val updateY = q.mul(err * usr._4)
- .subColumnVector(itm._2.mul(conf.gamma7))
- .mul(conf.gamma2)
+ // updateP = (err * q - conf.gamma7 * p) * conf.gamma2
+ val updateP = q.clone()
+ blas.dscal(rank, err * conf.gamma2, updateP, 1)
+ blas.daxpy(rank, -conf.gamma7 * conf.gamma2, p, 1, updateP, 1)
+ // updateQ = (err * usr._2 - conf.gamma7 * q) * conf.gamma2
+ val updateQ = usr._2.clone()
+ blas.dscal(rank, err * conf.gamma2, updateQ, 1)
+ blas.daxpy(rank, -conf.gamma7 * conf.gamma2, q, 1, updateQ, 1)
+ // updateY = (err * usr._4 * q - conf.gamma7 * itm._2) * conf.gamma2
+ val updateY = q.clone()
+ blas.dscal(rank, err * usr._4 * conf.gamma2, updateY, 1)
+ blas.daxpy(rank, -conf.gamma7 * conf.gamma2, itm._2, 1, updateY, 1)
ctx.sendToSrc((updateP, updateY, (err - conf.gamma6 * usr._3) * conf.gamma1))
ctx.sendToDst((updateQ, updateY, (err - conf.gamma6 * itm._3) * conf.gamma1))
}
@@ -127,14 +130,23 @@ object SVDPlusPlus {
for (i <- 0 until conf.maxIters) {
// Phase 1, calculate pu + |N(u)|^(-0.5)*sum(y) for user nodes
g.cache()
- val t1 = g.aggregateMessages[DoubleMatrix](
+ val t1 = g.aggregateMessages[Array[Double]](
ctx => ctx.sendToSrc(ctx.dstAttr._2),
- (g1, g2) => g1.addColumnVector(g2))
+ (g1, g2) => {
+ val out = g1.clone()
+ blas.daxpy(out.length, 1.0, g2, 1, out, 1)
+ out
+ })
val gJoinT1 = g.outerJoinVertices(t1) {
- (vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double),
- msg: Option[DoubleMatrix]) =>
- if (msg.isDefined) (vd._1, vd._1
- .addColumnVector(msg.get.mul(vd._4)), vd._3, vd._4) else vd
+ (vid: VertexId, vd: (Array[Double], Array[Double], Double, Double),
+ msg: Option[Array[Double]]) =>
+ if (msg.isDefined) {
+ val out = vd._1.clone()
+ blas.daxpy(out.length, vd._4, msg.get, 1, out, 1)
+ (vd._1, out, vd._3, vd._4)
+ } else {
+ vd
+ }
}.cache()
materialize(gJoinT1)
g.unpersist()
@@ -144,14 +156,24 @@ object SVDPlusPlus {
g.cache()
val t2 = g.aggregateMessages(
sendMsgTrainF(conf, u),
- (g1: (DoubleMatrix, DoubleMatrix, Double), g2: (DoubleMatrix, DoubleMatrix, Double)) =>
- (g1._1.addColumnVector(g2._1), g1._2.addColumnVector(g2._2), g1._3 + g2._3))
+ (g1: (Array[Double], Array[Double], Double), g2: (Array[Double], Array[Double], Double)) =>
+ {
+ val out1 = g1._1.clone()
+ blas.daxpy(out1.length, 1.0, g2._1, 1, out1, 1)
+ val out2 = g1._2.clone()
+ blas.daxpy(out2.length, 1.0, g2._2, 1, out2, 1)
+ (out1, out2, g1._3 + g2._3)
+ })
val gJoinT2 = g.outerJoinVertices(t2) {
(vid: VertexId,
- vd: (DoubleMatrix, DoubleMatrix, Double, Double),
- msg: Option[(DoubleMatrix, DoubleMatrix, Double)]) =>
- (vd._1.addColumnVector(msg.get._1), vd._2.addColumnVector(msg.get._2),
- vd._3 + msg.get._3, vd._4)
+ vd: (Array[Double], Array[Double], Double, Double),
+ msg: Option[(Array[Double], Array[Double], Double)]) => {
+ val out1 = vd._1.clone()
+ blas.daxpy(out1.length, 1.0, msg.get._1, 1, out1, 1)
+ val out2 = vd._2.clone()
+ blas.daxpy(out2.length, 1.0, msg.get._2, 1, out2, 1)
+ (out1, out2, vd._3 + msg.get._3, vd._4)
+ }
}.cache()
materialize(gJoinT2)
g.unpersist()
@@ -160,10 +182,10 @@ object SVDPlusPlus {
// calculate error on training set
def sendMsgTestF(conf: Conf, u: Double)
- (ctx: EdgeContext[(DoubleMatrix, DoubleMatrix, Double, Double), Double, Double]) {
+ (ctx: EdgeContext[(Array[Double], Array[Double], Double, Double), Double, Double]) {
val (usr, itm) = (ctx.srcAttr, ctx.dstAttr)
val (p, q) = (usr._1, itm._1)
- var pred = u + usr._3 + itm._3 + q.dot(usr._2)
+ var pred = u + usr._3 + itm._3 + blas.ddot(q.length, q, 1, usr._2, 1)
pred = math.max(pred, conf.minVal)
pred = math.min(pred, conf.maxVal)
val err = (ctx.attr - pred) * (ctx.attr - pred)
@@ -173,7 +195,7 @@ object SVDPlusPlus {
g.cache()
val t3 = g.aggregateMessages[Double](sendMsgTestF(conf, u), _ + _)
val gJoinT3 = g.outerJoinVertices(t3) {
- (vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double), msg: Option[Double]) =>
+ (vid: VertexId, vd: (Array[Double], Array[Double], Double, Double), msg: Option[Double]) =>
if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
}.cache()
materialize(gJoinT3)
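Taken together, the rewritten phases implement the scaladoc's prediction rule using only ddot/daxpy. A self-contained sketch of that rule under the patch's array representation (the parameter names here are illustrative, not from the file):

    import com.github.fommil.netlib.BLAS.{getInstance => blas}

    // r_ui = u + b_u + b_i + q_i . (p_u + |N(u)|^(-0.5) * sum(y))
    def predict(u: Double, bu: Double, bi: Double,
                qi: Array[Double], pu: Array[Double],
                sumY: Array[Double], numNeighbors: Double): Double = {
      val rank = qi.length
      val tmp = pu.clone()                                             // tmp = p_u
      blas.daxpy(rank, 1.0 / math.sqrt(numNeighbors), sumY, 1, tmp, 1) // tmp += |N(u)|^(-0.5) * sum(y)
      u + bu + bi + blas.ddot(rank, qi, 1, tmp, 1)                     // add q_i . tmp
    }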
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala
index 9987a4b1a3..7bd6b7f3c4 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala
@@ -32,11 +32,11 @@ class SVDPlusPlusSuite extends FunSuite with LocalSparkContext {
Edge(fields(0).toLong * 2, fields(1).toLong * 2 + 1, fields(2).toDouble)
}
val conf = new SVDPlusPlus.Conf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015) // 2 iterations
- var (graph, u) = SVDPlusPlus.runSVDPlusPlus(edges, conf)
+ val (graph, _) = SVDPlusPlus.run(edges, conf)
graph.cache()
- val err = graph.vertices.collect().map{ case (vid, vd) =>
+ val err = graph.vertices.map { case (vid, vd) =>
if (vid % 2 == 1) vd._4 else 0.0
- }.reduce(_ + _) / graph.triplets.collect().size
+ }.reduce(_ + _) / graph.numEdges
assert(err <= svdppErr)
}
}
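The updated test doubles as a usage template for the renamed entry point. A hedged end-to-end sketch (sc is assumed to be an existing SparkContext, and data/ratings.txt is a hypothetical comma-separated user,item,rating file):

    import org.apache.spark.graphx.Edge
    import org.apache.spark.graphx.lib.SVDPlusPlus
    import org.apache.spark.rdd.RDD

    val edges: RDD[Edge[Double]] = sc.textFile("data/ratings.txt").map { line =>
      val fields = line.split(",")
      // users on even vertex ids, items on odd ids, as in the suite
      Edge(fields(0).toLong * 2, fields(1).toLong * 2 + 1, fields(2).toDouble)
    }
    val conf = new SVDPlusPlus.Conf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015)
    val (graph, u) = SVDPlusPlus.run(edges, conf)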