author     Xiangrui Meng <meng@databricks.com>   2015-03-12 01:39:04 -0700
committer  Xiangrui Meng <meng@databricks.com>   2015-03-12 01:39:04 -0700
commit     0cba802adf15f5ab8da24dd1e8a5e7214cc4e148 (patch)
tree       a5dc95dc4f0fbf156cacffc6af130666711b3f98
parent     712679a7b447346a365b38574d7a86d56a93f767 (diff)
download   spark-0cba802adf15f5ab8da24dd1e8a5e7214cc4e148.tar.gz
           spark-0cba802adf15f5ab8da24dd1e8a5e7214cc4e148.tar.bz2
           spark-0cba802adf15f5ab8da24dd1e8a5e7214cc4e148.zip
[SPARK-5814][MLLIB][GRAPHX] Remove JBLAS from runtime
The issue is discussed in https://issues.apache.org/jira/browse/SPARK-5669. Replacing all JBLAS usage with netlib-java gives us a simpler dependency tree and fewer license issues to worry about. I didn't touch the test scope in this PR. The user guide is not modified to avoid merge conflicts with branch-1.3.

srowen ankurdave pwendell

Author: Xiangrui Meng <meng@databricks.com>

Closes #4699 from mengxr/SPARK-5814 and squashes the following commits:

48635c6 [Xiangrui Meng] move netlib-java version to parent pom
ca21c74 [Xiangrui Meng] remove jblas from ml-guide
5f7767a [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-5814
c5c4183 [Xiangrui Meng] merge master
0f20cad [Xiangrui Meng] add mima excludes
e53e9f4 [Xiangrui Meng] remove jblas from mllib runtime
ceaa14d [Xiangrui Meng] replace jblas by netlib-java in graphx
fa7c2ca [Xiangrui Meng] move jblas to test scope
-rw-r--r--  assembly/pom.xml                                                                      10
-rw-r--r--  docs/mllib-guide.md                                                                    5
-rw-r--r--  graphx/pom.xml                                                                        11
-rw-r--r--  graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala                   96
-rw-r--r--  graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala               6
-rw-r--r--  mllib/pom.xml                                                                          3
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala                     14
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala                   86
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala  15
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala             9
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala                26
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala                7
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala               6
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/stat/KernelDensitySuite.scala              4
-rw-r--r--  pom.xml                                                                                1
-rw-r--r--  project/MimaExcludes.scala                                                            28
16 files changed, 183 insertions, 144 deletions
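
For orientation, a minimal standalone sketch (not part of this patch; names and values are illustrative) of the recurring translation the commit applies: jblas DoubleMatrix vector operations become netlib-java BLAS calls on plain Array[Double].

    import com.github.fommil.netlib.BLAS.{getInstance => blas}

    object JblasToNetlibSketch {
      def main(args: Array[String]): Unit = {
        val x = Array(1.0, 2.0, 3.0)
        val y = Array(4.0, 5.0, 6.0)
        val alpha = 0.5

        // jblas: new DoubleMatrix(x).dot(new DoubleMatrix(y))
        val dot = blas.ddot(x.length, x, 1, y, 1)

        // jblas: yMat.addColumnVector(xMat.mul(alpha)); here acc := alpha * x + y
        val acc = y.clone()
        blas.daxpy(x.length, alpha, x, 1, acc, 1)

        println(s"dot = $dot, acc = ${acc.mkString(", ")}")
      }
    }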
diff --git a/assembly/pom.xml b/assembly/pom.xml
index cbf5b6c4aa..d3bb4bde0c 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -114,16 +114,6 @@
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
- <filter>
- <!-- Exclude libgfortran, libgcc for license issues -->
- <artifact>org.jblas:jblas</artifact>
- <excludes>
- <!-- Linux amd64 is OK; not statically linked -->
- <exclude>lib/static/Linux/i386/**</exclude>
- <exclude>lib/static/Mac OS X/**</exclude>
- <exclude>lib/static/Windows/**</exclude>
- </excludes>
- </filter>
</filters>
</configuration>
<executions>
diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md
index 4c7a7d9115..598374f66d 100644
--- a/docs/mllib-guide.md
+++ b/docs/mllib-guide.md
@@ -80,11 +80,6 @@ include `netlib-java`'s native proxies by default. To configure
[netlib-java](https://github.com/fommil/netlib-java) documentation for
your platform's additional installation instructions.
-MLlib also uses [jblas](https://github.com/mikiobraun/jblas) which
-will require you to install the
-[gfortran runtime library](https://github.com/mikiobraun/jblas/wiki/Missing-Libraries)
-if it is not already present on your nodes.
-
To use MLlib in Python, you will need [NumPy](http://www.numpy.org)
version 1.4 or newer.
diff --git a/graphx/pom.xml b/graphx/pom.xml
index 57e338c03e..c0d534e185 100644
--- a/graphx/pom.xml
+++ b/graphx/pom.xml
@@ -45,9 +45,14 @@
<artifactId>guava</artifactId>
</dependency>
<dependency>
- <groupId>org.jblas</groupId>
- <artifactId>jblas</artifactId>
- <version>${jblas.version}</version>
+ <groupId>com.github.fommil.netlib</groupId>
+ <artifactId>core</artifactId>
+ <version>${netlib.java.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>net.sourceforge.f2j</groupId>
+ <artifactId>arpack_combined_all</artifactId>
+ <version>0.1</version>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
index 3e4157a63f..1a7178b82e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/SVDPlusPlus.scala
@@ -18,7 +18,9 @@
package org.apache.spark.graphx.lib
import scala.util.Random
-import org.jblas.DoubleMatrix
+
+import com.github.fommil.netlib.BLAS.{getInstance => blas}
+
import org.apache.spark.rdd._
import org.apache.spark.graphx._
@@ -53,7 +55,7 @@ object SVDPlusPlus {
* a Multifaceted Collaborative Filtering Model",
* available at [[http://public.research.att.com/~volinsky/netflix/kdd08koren.pdf]].
*
- * The prediction rule is rui = u + bu + bi + qi*(pu + |N(u)|^(-0.5)*sum(y)),
+ * The prediction rule is rui = u + bu + bi + qi*(pu + |N(u)|^^-0.5^^*sum(y)),
* see the details on page 6.
*
* @param edges edges for constructing the graph
@@ -66,13 +68,10 @@ object SVDPlusPlus {
: (Graph[(Array[Double], Array[Double], Double, Double), Double], Double) =
{
// Generate default vertex attribute
- def defaultF(rank: Int): (DoubleMatrix, DoubleMatrix, Double, Double) = {
- val v1 = new DoubleMatrix(rank)
- val v2 = new DoubleMatrix(rank)
- for (i <- 0 until rank) {
- v1.put(i, Random.nextDouble())
- v2.put(i, Random.nextDouble())
- }
+ def defaultF(rank: Int): (Array[Double], Array[Double], Double, Double) = {
+ // TODO: use a fixed random seed
+ val v1 = Array.fill(rank)(Random.nextDouble())
+ val v2 = Array.fill(rank)(Random.nextDouble())
(v1, v2, 0.0, 0.0)
}
@@ -92,7 +91,7 @@ object SVDPlusPlus {
(g1, g2) => (g1._1 + g2._1, g1._2 + g2._2))
val gJoinT0 = g.outerJoinVertices(t0) {
- (vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double),
+ (vid: VertexId, vd: (Array[Double], Array[Double], Double, Double),
msg: Option[(Long, Double)]) =>
(vd._1, vd._2, msg.get._2 / msg.get._1, 1.0 / scala.math.sqrt(msg.get._1))
}.cache()
@@ -102,24 +101,28 @@ object SVDPlusPlus {
def sendMsgTrainF(conf: Conf, u: Double)
(ctx: EdgeContext[
- (DoubleMatrix, DoubleMatrix, Double, Double),
+ (Array[Double], Array[Double], Double, Double),
Double,
- (DoubleMatrix, DoubleMatrix, Double)]) {
+ (Array[Double], Array[Double], Double)]) {
val (usr, itm) = (ctx.srcAttr, ctx.dstAttr)
val (p, q) = (usr._1, itm._1)
- var pred = u + usr._3 + itm._3 + q.dot(usr._2)
+ val rank = p.length
+ var pred = u + usr._3 + itm._3 + blas.ddot(rank, q, 1, usr._2, 1)
pred = math.max(pred, conf.minVal)
pred = math.min(pred, conf.maxVal)
val err = ctx.attr - pred
- val updateP = q.mul(err)
- .subColumnVector(p.mul(conf.gamma7))
- .mul(conf.gamma2)
- val updateQ = usr._2.mul(err)
- .subColumnVector(q.mul(conf.gamma7))
- .mul(conf.gamma2)
- val updateY = q.mul(err * usr._4)
- .subColumnVector(itm._2.mul(conf.gamma7))
- .mul(conf.gamma2)
+ // updateP = (err * q - conf.gamma7 * p) * conf.gamma2
+ val updateP = q.clone()
+ blas.dscal(rank, err * conf.gamma2, updateP, 1)
+ blas.daxpy(rank, -conf.gamma7 * conf.gamma2, p, 1, updateP, 1)
+ // updateQ = (err * usr._2 - conf.gamma7 * q) * conf.gamma2
+ val updateQ = usr._2.clone()
+ blas.dscal(rank, err * conf.gamma2, updateQ, 1)
+ blas.daxpy(rank, -conf.gamma7 * conf.gamma2, q, 1, updateQ, 1)
+ // updateY = (err * usr._4 * q - conf.gamma7 * itm._2) * conf.gamma2
+ val updateY = q.clone()
+ blas.dscal(rank, err * usr._4 * conf.gamma2, updateY, 1)
+ blas.daxpy(rank, -conf.gamma7 * conf.gamma2, itm._2, 1, updateY, 1)
ctx.sendToSrc((updateP, updateY, (err - conf.gamma6 * usr._3) * conf.gamma1))
ctx.sendToDst((updateQ, updateY, (err - conf.gamma6 * itm._3) * conf.gamma1))
}
@@ -127,14 +130,23 @@ object SVDPlusPlus {
for (i <- 0 until conf.maxIters) {
// Phase 1, calculate pu + |N(u)|^(-0.5)*sum(y) for user nodes
g.cache()
- val t1 = g.aggregateMessages[DoubleMatrix](
+ val t1 = g.aggregateMessages[Array[Double]](
ctx => ctx.sendToSrc(ctx.dstAttr._2),
- (g1, g2) => g1.addColumnVector(g2))
+ (g1, g2) => {
+ val out = g1.clone()
+ blas.daxpy(out.length, 1.0, g2, 1, out, 1)
+ out
+ })
val gJoinT1 = g.outerJoinVertices(t1) {
- (vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double),
- msg: Option[DoubleMatrix]) =>
- if (msg.isDefined) (vd._1, vd._1
- .addColumnVector(msg.get.mul(vd._4)), vd._3, vd._4) else vd
+ (vid: VertexId, vd: (Array[Double], Array[Double], Double, Double),
+ msg: Option[Array[Double]]) =>
+ if (msg.isDefined) {
+ val out = vd._1.clone()
+ blas.daxpy(out.length, vd._4, msg.get, 1, out, 1)
+ (vd._1, out, vd._3, vd._4)
+ } else {
+ vd
+ }
}.cache()
materialize(gJoinT1)
g.unpersist()
@@ -144,14 +156,24 @@ object SVDPlusPlus {
g.cache()
val t2 = g.aggregateMessages(
sendMsgTrainF(conf, u),
- (g1: (DoubleMatrix, DoubleMatrix, Double), g2: (DoubleMatrix, DoubleMatrix, Double)) =>
- (g1._1.addColumnVector(g2._1), g1._2.addColumnVector(g2._2), g1._3 + g2._3))
+ (g1: (Array[Double], Array[Double], Double), g2: (Array[Double], Array[Double], Double)) =>
+ {
+ val out1 = g1._1.clone()
+ blas.daxpy(out1.length, 1.0, g2._1, 1, out1, 1)
+ val out2 = g1._2.clone()
+ blas.daxpy(out2.length, 1.0, g2._2, 1, out2, 1)
+ (out1, out2, g1._3 + g2._3)
+ })
val gJoinT2 = g.outerJoinVertices(t2) {
(vid: VertexId,
- vd: (DoubleMatrix, DoubleMatrix, Double, Double),
- msg: Option[(DoubleMatrix, DoubleMatrix, Double)]) =>
- (vd._1.addColumnVector(msg.get._1), vd._2.addColumnVector(msg.get._2),
- vd._3 + msg.get._3, vd._4)
+ vd: (Array[Double], Array[Double], Double, Double),
+ msg: Option[(Array[Double], Array[Double], Double)]) => {
+ val out1 = vd._1.clone()
+ blas.daxpy(out1.length, 1.0, msg.get._1, 1, out1, 1)
+ val out2 = vd._2.clone()
+ blas.daxpy(out2.length, 1.0, msg.get._2, 1, out2, 1)
+ (out1, out2, vd._3 + msg.get._3, vd._4)
+ }
}.cache()
materialize(gJoinT2)
g.unpersist()
@@ -160,10 +182,10 @@ object SVDPlusPlus {
// calculate error on training set
def sendMsgTestF(conf: Conf, u: Double)
- (ctx: EdgeContext[(DoubleMatrix, DoubleMatrix, Double, Double), Double, Double]) {
+ (ctx: EdgeContext[(Array[Double], Array[Double], Double, Double), Double, Double]) {
val (usr, itm) = (ctx.srcAttr, ctx.dstAttr)
val (p, q) = (usr._1, itm._1)
- var pred = u + usr._3 + itm._3 + q.dot(usr._2)
+ var pred = u + usr._3 + itm._3 + blas.ddot(q.length, q, 1, usr._2, 1)
pred = math.max(pred, conf.minVal)
pred = math.min(pred, conf.maxVal)
val err = (ctx.attr - pred) * (ctx.attr - pred)
@@ -173,7 +195,7 @@ object SVDPlusPlus {
g.cache()
val t3 = g.aggregateMessages[Double](sendMsgTestF(conf, u), _ + _)
val gJoinT3 = g.outerJoinVertices(t3) {
- (vid: VertexId, vd: (DoubleMatrix, DoubleMatrix, Double, Double), msg: Option[Double]) =>
+ (vid: VertexId, vd: (Array[Double], Array[Double], Double, Double), msg: Option[Double]) =>
if (msg.isDefined) (vd._1, vd._2, vd._3, msg.get) else vd
}.cache()
materialize(gJoinT3)
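
As context for the hunks above, a self-contained sketch (not from the patch; the helper name is hypothetical) of the update rule spelled out in the new comments, e.g. updateP = (err * q - conf.gamma7 * p) * conf.gamma2, expressed with dscal/daxpy in the same style as the new SVDPlusPlus code.

    import com.github.fommil.netlib.BLAS.{getInstance => blas}

    object UpdateRuleSketch {
      // updateP = (err * q - gamma7 * p) * gamma2, built with dscal + daxpy
      def updateP(q: Array[Double], p: Array[Double], err: Double,
                  gamma2: Double, gamma7: Double): Array[Double] = {
        val rank = p.length
        val out = q.clone()                              // start from a copy of q
        blas.dscal(rank, err * gamma2, out, 1)           // out := err * gamma2 * q
        blas.daxpy(rank, -gamma7 * gamma2, p, 1, out, 1) // out -= gamma7 * gamma2 * p
        out
      }

      def main(args: Array[String]): Unit = {
        val out = updateP(Array(1.0, 2.0), Array(0.5, 0.5), err = 0.1, gamma2 = 0.007, gamma7 = 0.015)
        println(out.mkString(", "))
      }
    }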
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala
index 9987a4b1a3..7bd6b7f3c4 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/SVDPlusPlusSuite.scala
@@ -32,11 +32,11 @@ class SVDPlusPlusSuite extends FunSuite with LocalSparkContext {
Edge(fields(0).toLong * 2, fields(1).toLong * 2 + 1, fields(2).toDouble)
}
val conf = new SVDPlusPlus.Conf(10, 2, 0.0, 5.0, 0.007, 0.007, 0.005, 0.015) // 2 iterations
- var (graph, u) = SVDPlusPlus.runSVDPlusPlus(edges, conf)
+ val (graph, _) = SVDPlusPlus.run(edges, conf)
graph.cache()
- val err = graph.vertices.collect().map{ case (vid, vd) =>
+ val err = graph.vertices.map { case (vid, vd) =>
if (vid % 2 == 1) vd._4 else 0.0
- }.reduce(_ + _) / graph.triplets.collect().size
+ }.reduce(_ + _) / graph.numEdges
assert(err <= svdppErr)
}
}
diff --git a/mllib/pom.xml b/mllib/pom.xml
index b5c949e155..a76704a8c2 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -59,6 +59,7 @@
<groupId>org.jblas</groupId>
<artifactId>jblas</artifactId>
<version>${jblas.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalanlp</groupId>
@@ -116,7 +117,7 @@
<dependency>
<groupId>com.github.fommil.netlib</groupId>
<artifactId>all</artifactId>
- <version>1.1.2</version>
+ <version>${netlib.java.version}</version>
<type>pom</type>
</dependency>
</dependencies>
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 7bb69df653..e3515ee81a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -26,7 +26,6 @@ import scala.util.hashing.byteswap64
import com.github.fommil.netlib.BLAS.{getInstance => blas}
import com.github.fommil.netlib.LAPACK.{getInstance => lapack}
-import org.jblas.DoubleMatrix
import org.netlib.util.intW
import org.apache.spark.{Logging, Partitioner}
@@ -361,14 +360,14 @@ object ALS extends Logging {
private[recommendation] class NNLSSolver extends LeastSquaresNESolver {
private var rank: Int = -1
private var workspace: NNLS.Workspace = _
- private var ata: DoubleMatrix = _
+ private var ata: Array[Double] = _
private var initialized: Boolean = false
private def initialize(rank: Int): Unit = {
if (!initialized) {
this.rank = rank
workspace = NNLS.createWorkspace(rank)
- ata = new DoubleMatrix(rank, rank)
+ ata = new Array[Double](rank * rank)
initialized = true
} else {
require(this.rank == rank)
@@ -385,7 +384,7 @@ object ALS extends Logging {
val rank = ne.k
initialize(rank)
fillAtA(ne.ata, lambda * ne.n)
- val x = NNLS.solve(ata, new DoubleMatrix(rank, 1, ne.atb: _*), workspace)
+ val x = NNLS.solve(ata, ne.atb, workspace)
ne.reset()
x.map(x => x.toFloat)
}
@@ -398,17 +397,16 @@ object ALS extends Logging {
var i = 0
var pos = 0
var a = 0.0
- val data = ata.data
while (i < rank) {
var j = 0
while (j <= i) {
a = triAtA(pos)
- data(i * rank + j) = a
- data(j * rank + i) = a
+ ata(i * rank + j) = a
+ ata(j * rank + i) = a
pos += 1
j += 1
}
- data(i * rank + i) += lambda
+ ata(i * rank + i) += lambda
i += 1
}
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala
index ccd93b318b..4766f77082 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala
@@ -17,7 +17,9 @@
package org.apache.spark.mllib.optimization
-import org.jblas.{DoubleMatrix, SimpleBlas}
+import java.{util => ju}
+
+import com.github.fommil.netlib.BLAS.{getInstance => blas}
/**
* Object used to solve nonnegative least squares problems using a modified
@@ -25,20 +27,20 @@ import org.jblas.{DoubleMatrix, SimpleBlas}
*/
private[spark] object NNLS {
class Workspace(val n: Int) {
- val scratch = new DoubleMatrix(n, 1)
- val grad = new DoubleMatrix(n, 1)
- val x = new DoubleMatrix(n, 1)
- val dir = new DoubleMatrix(n, 1)
- val lastDir = new DoubleMatrix(n, 1)
- val res = new DoubleMatrix(n, 1)
-
- def wipe() {
- scratch.fill(0.0)
- grad.fill(0.0)
- x.fill(0.0)
- dir.fill(0.0)
- lastDir.fill(0.0)
- res.fill(0.0)
+ val scratch = new Array[Double](n)
+ val grad = new Array[Double](n)
+ val x = new Array[Double](n)
+ val dir = new Array[Double](n)
+ val lastDir = new Array[Double](n)
+ val res = new Array[Double](n)
+
+ def wipe(): Unit = {
+ ju.Arrays.fill(scratch, 0.0)
+ ju.Arrays.fill(grad, 0.0)
+ ju.Arrays.fill(x, 0.0)
+ ju.Arrays.fill(dir, 0.0)
+ ju.Arrays.fill(lastDir, 0.0)
+ ju.Arrays.fill(res, 0.0)
}
}
@@ -60,18 +62,18 @@ private[spark] object NNLS {
* direction, however, while this method only uses a conjugate gradient direction if the last
* iteration did not cause a previously-inactive constraint to become active.
*/
- def solve(ata: DoubleMatrix, atb: DoubleMatrix, ws: Workspace): Array[Double] = {
+ def solve(ata: Array[Double], atb: Array[Double], ws: Workspace): Array[Double] = {
ws.wipe()
- val n = atb.rows
+ val n = atb.length
val scratch = ws.scratch
// find the optimal unconstrained step
- def steplen(dir: DoubleMatrix, res: DoubleMatrix): Double = {
- val top = SimpleBlas.dot(dir, res)
- SimpleBlas.gemv(1.0, ata, dir, 0.0, scratch)
+ def steplen(dir: Array[Double], res: Array[Double]): Double = {
+ val top = blas.ddot(n, dir, 1, res, 1)
+ blas.dgemv("N", n, n, 1.0, ata, n, dir, 1, 0.0, scratch, 1)
// Push the denominator upward very slightly to avoid infinities and silliness
- top / (SimpleBlas.dot(scratch, dir) + 1e-20)
+ top / (blas.ddot(n, scratch, 1, dir, 1) + 1e-20)
}
// stopping condition
@@ -96,52 +98,52 @@ private[spark] object NNLS {
var i = 0
while (iterno < iterMax) {
// find the residual
- SimpleBlas.gemv(1.0, ata, x, 0.0, res)
- SimpleBlas.axpy(-1.0, atb, res)
- SimpleBlas.copy(res, grad)
+ blas.dgemv("N", n, n, 1.0, ata, n, x, 1, 0.0, res, 1)
+ blas.daxpy(n, -1.0, atb, 1, res, 1)
+ blas.dcopy(n, res, 1, grad, 1)
// project the gradient
i = 0
while (i < n) {
- if (grad.data(i) > 0.0 && x.data(i) == 0.0) {
- grad.data(i) = 0.0
+ if (grad(i) > 0.0 && x(i) == 0.0) {
+ grad(i) = 0.0
}
i = i + 1
}
- val ngrad = SimpleBlas.dot(grad, grad)
+ val ngrad = blas.ddot(n, grad, 1, grad, 1)
- SimpleBlas.copy(grad, dir)
+ blas.dcopy(n, grad, 1, dir, 1)
// use a CG direction under certain conditions
var step = steplen(grad, res)
var ndir = 0.0
- val nx = SimpleBlas.dot(x, x)
+ val nx = blas.ddot(n, x, 1, x, 1)
if (iterno > lastWall + 1) {
val alpha = ngrad / lastNorm
- SimpleBlas.axpy(alpha, lastDir, dir)
+ blas.daxpy(n, alpha, lastDir, 1, dir, 1)
val dstep = steplen(dir, res)
- ndir = SimpleBlas.dot(dir, dir)
+ ndir = blas.ddot(n, dir, 1, dir, 1)
if (stop(dstep, ndir, nx)) {
// reject the CG step if it could lead to premature termination
- SimpleBlas.copy(grad, dir)
- ndir = SimpleBlas.dot(dir, dir)
+ blas.dcopy(n, grad, 1, dir, 1)
+ ndir = blas.ddot(n, dir, 1, dir, 1)
} else {
step = dstep
}
} else {
- ndir = SimpleBlas.dot(dir, dir)
+ ndir = blas.ddot(n, dir, 1, dir, 1)
}
// terminate?
if (stop(step, ndir, nx)) {
- return x.data.clone
+ return x.clone
}
// don't run through the walls
i = 0
while (i < n) {
- if (step * dir.data(i) > x.data(i)) {
- step = x.data(i) / dir.data(i)
+ if (step * dir(i) > x(i)) {
+ step = x(i) / dir(i)
}
i = i + 1
}
@@ -149,19 +151,19 @@ private[spark] object NNLS {
// take the step
i = 0
while (i < n) {
- if (step * dir.data(i) > x.data(i) * (1 - 1e-14)) {
- x.data(i) = 0
+ if (step * dir(i) > x(i) * (1 - 1e-14)) {
+ x(i) = 0
lastWall = iterno
} else {
- x.data(i) -= step * dir.data(i)
+ x(i) -= step * dir(i)
}
i = i + 1
}
iterno = iterno + 1
- SimpleBlas.copy(dir, lastDir)
+ blas.dcopy(n, dir, 1, lastDir, 1)
lastNorm = ngrad
}
- x.data.clone
+ x.clone
}
}
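
For reference, a small self-contained sketch (not from the patch; the matrix values are illustrative) of the dgemv call that replaces SimpleBlas.gemv in NNLS: y := alpha * A * x + beta * y, with A stored as a column-major n x n Array[Double], matching how ata is now laid out.

    import com.github.fommil.netlib.BLAS.{getInstance => blas}

    object GemvSketch {
      def main(args: Array[String]): Unit = {
        val n = 2
        // Column-major 2x2 matrix [[2, 1], [1, 3]] (symmetric, as ata is)
        val ata = Array(2.0, 1.0, 1.0, 3.0)
        val x = Array(1.0, 1.0)
        val y = new Array[Double](n)
        // y := 1.0 * ata * x + 0.0 * y
        blas.dgemv("N", n, n, 1.0, ata, n, x, 1, 0.0, y, 1)
        println(y.mkString(", ")) // expected: 3.0, 4.0
      }
    }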
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 5f5a996a87..36cbf060d9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -21,10 +21,10 @@ import java.io.IOException
import java.lang.{Integer => JavaInteger}
import org.apache.hadoop.fs.Path
-import org.jblas.DoubleMatrix
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
+import com.github.fommil.netlib.BLAS.{getInstance => blas}
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
@@ -70,9 +70,9 @@ class MatrixFactorizationModel(
/** Predict the rating of one user for one product. */
def predict(user: Int, product: Int): Double = {
- val userVector = new DoubleMatrix(userFeatures.lookup(user).head)
- val productVector = new DoubleMatrix(productFeatures.lookup(product).head)
- userVector.dot(productVector)
+ val userVector = userFeatures.lookup(user).head
+ val productVector = productFeatures.lookup(product).head
+ blas.ddot(userVector.length, userVector, 1, productVector, 1)
}
/**
@@ -89,9 +89,7 @@ class MatrixFactorizationModel(
}
users.join(productFeatures).map {
case (product, ((user, uFeatures), pFeatures)) =>
- val userVector = new DoubleMatrix(uFeatures)
- val productVector = new DoubleMatrix(pFeatures)
- Rating(user, product, userVector.dot(productVector))
+ Rating(user, product, blas.ddot(uFeatures.length, uFeatures, 1, pFeatures, 1))
}
}
@@ -143,9 +141,8 @@ class MatrixFactorizationModel(
recommendToFeatures: Array[Double],
recommendableFeatures: RDD[(Int, Array[Double])],
num: Int): Array[(Int, Double)] = {
- val recommendToVector = new DoubleMatrix(recommendToFeatures)
val scored = recommendableFeatures.map { case (id,features) =>
- (id, recommendToVector.dot(new DoubleMatrix(features)))
+ (id, blas.ddot(features.length, recommendToFeatures, 1, features, 1))
}
scored.top(num)(Ordering.by(_._2))
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
index 97f54aa62d..c9d33787b0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.util
import scala.collection.JavaConversions._
import scala.util.Random
-import org.jblas.DoubleMatrix
+import com.github.fommil.netlib.BLAS.{getInstance => blas}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.SparkContext
@@ -72,11 +72,10 @@ object LinearDataGenerator {
eps: Double = 0.1): Seq[LabeledPoint] = {
val rnd = new Random(seed)
- val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
val x = Array.fill[Array[Double]](nPoints)(
Array.fill[Double](weights.length)(2 * rnd.nextDouble - 1.0))
val y = x.map { xi =>
- new DoubleMatrix(1, xi.length, xi: _*).dot(weightsMat) + intercept + eps * rnd.nextGaussian()
+ blas.ddot(weights.length, xi, 1, weights, 1) + intercept + eps * rnd.nextGaussian()
}
y.zip(x).map(p => LabeledPoint(p._1, Vectors.dense(p._2)))
}
@@ -100,9 +99,9 @@ object LinearDataGenerator {
eps: Double,
nparts: Int = 2,
intercept: Double = 0.0) : RDD[LabeledPoint] = {
- org.jblas.util.Random.seed(42)
+ val random = new Random(42)
// Random values distributed uniformly in [-0.5, 0.5]
- val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5)
+ val w = Array.fill(nfeatures)(random.nextDouble() - 0.5)
val data: RDD[LabeledPoint] = sc.parallelize(0 until nparts, nparts).flatMap { p =>
val seed = 42 + p
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala
index b76fbe89c3..0c5b4f9d04 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala
@@ -17,13 +17,14 @@
package org.apache.spark.mllib.util
+import java.{util => ju}
+
import scala.language.postfixOps
import scala.util.Random
-import org.jblas.DoubleMatrix
-
-import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.SparkContext
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.linalg.{BLAS, DenseMatrix}
import org.apache.spark.rdd.RDD
/**
@@ -72,24 +73,25 @@ object MFDataGenerator {
val sc = new SparkContext(sparkMaster, "MFDataGenerator")
- val A = DoubleMatrix.randn(m, rank)
- val B = DoubleMatrix.randn(rank, n)
- val z = 1 / scala.math.sqrt(scala.math.sqrt(rank))
- A.mmuli(z)
- B.mmuli(z)
- val fullData = A.mmul(B)
+ val random = new ju.Random(42L)
+
+ val A = DenseMatrix.randn(m, rank, random)
+ val B = DenseMatrix.randn(rank, n, random)
+ val z = 1 / math.sqrt(rank)
+ val fullData = DenseMatrix.zeros(m, n)
+ BLAS.gemm(z, A, B, 1.0, fullData)
val df = rank * (m + n - rank)
val sampSize = scala.math.min(scala.math.round(trainSampFact * df),
scala.math.round(.99 * m * n)).toInt
val rand = new Random()
val mn = m * n
- val shuffled = rand.shuffle(1 to mn toList)
+ val shuffled = rand.shuffle((0 until mn).toList)
val omega = shuffled.slice(0, sampSize)
val ordered = omega.sortWith(_ < _).toArray
val trainData: RDD[(Int, Int, Double)] = sc.parallelize(ordered)
- .map(x => (fullData.indexRows(x - 1), fullData.indexColumns(x - 1), fullData.get(x - 1)))
+ .map(x => (x % m, x / m, fullData.values(x)))
// optionally add gaussian noise
if (noise) {
@@ -105,7 +107,7 @@ object MFDataGenerator {
val testOmega = shuffled.slice(sampSize, sampSize + testSampSize)
val testOrdered = testOmega.sortWith(_ < _).toArray
val testData: RDD[(Int, Int, Double)] = sc.parallelize(testOrdered)
- .map(x => (fullData.indexRows(x - 1), fullData.indexColumns(x - 1), fullData.get(x - 1)))
+ .map(x => (x % m, x / m, fullData.values(x)))
testData.map(x => x._1 + "," + x._2 + "," + x._3).saveAsTextFile(outputPath)
}
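
A hedged sketch (not from the patch; dimensions and names are illustrative) of the dense product the generator now builds via DenseMatrix.randn and BLAS.gemm, written here against raw column-major arrays with netlib-java dgemm so it stays self-contained: fullData = z * A * B.

    import java.{util => ju}
    import com.github.fommil.netlib.BLAS.{getInstance => blas}

    object GemmSketch {
      def main(args: Array[String]): Unit = {
        val (m, rank, n) = (4, 2, 3)
        val random = new ju.Random(42L)
        val A = Array.fill(m * rank)(random.nextGaussian())   // m x rank, column-major
        val B = Array.fill(rank * n)(random.nextGaussian())   // rank x n, column-major
        val z = 1 / math.sqrt(rank)
        val fullData = new Array[Double](m * n)               // m x n, column-major
        // fullData := z * A * B + 1.0 * fullData
        blas.dgemm("N", "N", m, n, rank, z, A, m, B, rank, 1.0, fullData, m)
        // Entry x in column-major order has row = x % m, col = x / m,
        // the same indexing the new trainData/testData mapping uses.
        println(fullData(0))
      }
    }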
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
index 7db97e6bac..a8e30cc9d7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.util
import scala.util.Random
-import org.jblas.DoubleMatrix
+import com.github.fommil.netlib.BLAS.{getInstance => blas}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.SparkContext
@@ -51,8 +51,7 @@ object SVMDataGenerator {
val sc = new SparkContext(sparkMaster, "SVMGenerator")
val globalRnd = new Random(94720)
- val trueWeights = new DoubleMatrix(1, nfeatures + 1,
- Array.fill[Double](nfeatures + 1)(globalRnd.nextGaussian()):_*)
+ val trueWeights = Array.fill[Double](nfeatures + 1)(globalRnd.nextGaussian())
val data: RDD[LabeledPoint] = sc.parallelize(0 until nexamples, parts).map { idx =>
val rnd = new Random(42 + idx)
@@ -60,7 +59,7 @@ object SVMDataGenerator {
val x = Array.fill[Double](nfeatures) {
rnd.nextDouble() * 2.0 - 1.0
}
- val yD = new DoubleMatrix(1, x.length, x: _*).dot(trueWeights) + rnd.nextGaussian() * 0.1
+ val yD = blas.ddot(trueWeights.length, x, 1, trueWeights, 1) + rnd.nextGaussian() * 0.1
val y = if (yD < 0) 0.0 else 1.0
LabeledPoint(y, Vectors.dense(x))
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala
index 82c327bd49..22855e4e8f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala
@@ -55,7 +55,7 @@ class NNLSSuite extends FunSuite {
for (k <- 0 until 100) {
val (ata, atb) = genOnesData(n, rand)
- val x = new DoubleMatrix(NNLS.solve(ata, atb, ws))
+ val x = new DoubleMatrix(NNLS.solve(ata.data, atb.data, ws))
assert(x.length === n)
val answer = DoubleMatrix.ones(n, 1)
SimpleBlas.axpy(-1.0, answer, x)
@@ -79,7 +79,7 @@ class NNLSSuite extends FunSuite {
val goodx = Array(0.13025, 0.54506, 0.2874, 0.0, 0.028628)
val ws = NNLS.createWorkspace(n)
- val x = NNLS.solve(ata, atb, ws)
+ val x = NNLS.solve(ata.data, atb.data, ws)
for (i <- 0 until n) {
assert(x(i) ~== goodx(i) absTol 1E-3)
assert(x(i) >= 0)
@@ -104,7 +104,7 @@ class NNLSSuite extends FunSuite {
val ws = NNLS.createWorkspace(n)
- val x = new DoubleMatrix(NNLS.solve(ata, atb, ws))
+ val x = new DoubleMatrix(NNLS.solve(ata.data, atb.data, ws))
val obj = computeObjectiveValue(ata, atb, x)
assert(obj < refObj + 1E-5)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/KernelDensitySuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/KernelDensitySuite.scala
index f6a1e19f50..16ecae23dd 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/stat/KernelDensitySuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/KernelDensitySuite.scala
@@ -21,9 +21,9 @@ import org.scalatest.FunSuite
import org.apache.commons.math3.distribution.NormalDistribution
-import org.apache.spark.mllib.util.LocalClusterSparkContext
+import org.apache.spark.mllib.util.MLlibTestSparkContext
-class KernelDensitySuite extends FunSuite with LocalClusterSparkContext {
+class KernelDensitySuite extends FunSuite with MLlibTestSparkContext {
test("kernel density single sample") {
val rdd = sc.parallelize(Array(5.0))
val evaluationPoints = Array(5.0, 6.0)
diff --git a/pom.xml b/pom.xml
index a19da73cf4..6fc56a86d4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -157,6 +157,7 @@
<codehaus.jackson.version>1.8.8</codehaus.jackson.version>
<fasterxml.jackson.version>2.4.4</fasterxml.jackson.version>
<snappy.version>1.1.1.6</snappy.version>
+ <netlib.java.version>1.1.2</netlib.java.version>
<!--
Dependency scopes that can be overridden by enabling certain profiles. These profiles are
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index ee6229aa6b..627b2cea4d 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -149,6 +149,34 @@ object MimaExcludes {
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.linalg.VectorUDT.serialize"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.linalg.VectorUDT.sqlType")
) ++ Seq(
+ // SPARK-5814
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$wrapDoubleArray"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$fillFullMatrix"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$iterations"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$makeOutLinkBlock"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$computeYtY"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$makeLinkRDDs"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$alpha"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$randomFactor"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$makeInLinkBlock"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$dspr"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$lambda"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$implicitPrefs"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$$rank")
+ ) ++ Seq(
// SPARK-4682
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.RealClock"),
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.Clock"),