Diffstat (limited to 'mllib')
 mllib/pom.xml                                                                             |  3
 mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala                         | 14
 mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala                       | 86
 mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala | 15
 mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala                |  9
 mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala                    | 26
 mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala                   |  7
 mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala                  |  6
 mllib/src/test/scala/org/apache/spark/mllib/stat/KernelDensitySuite.scala                 |  4
 9 files changed, 84 insertions(+), 86 deletions(-)
diff --git a/mllib/pom.xml b/mllib/pom.xml
index b5c949e155..a76704a8c2 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -59,6 +59,7 @@
<groupId>org.jblas</groupId>
<artifactId>jblas</artifactId>
<version>${jblas.version}</version>
+ <scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalanlp</groupId>
@@ -116,7 +117,7 @@
<dependency>
<groupId>com.github.fommil.netlib</groupId>
<artifactId>all</artifactId>
- <version>1.1.2</version>
+ <version>${netlib.java.version}</version>
<type>pom</type>
</dependency>
</dependencies>
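
With this change jblas is needed only at test time (the suites below still build DoubleMatrix fixtures), while production code goes through netlib-java, which binds a native BLAS at class-load time and falls back to the pure-Java F2J implementation otherwise. A hypothetical diagnostic sketch, not part of this change, for checking which binding is active on a given classpath:

    import com.github.fommil.netlib.BLAS

    object BlasCheck {
      // Prints e.g. com.github.fommil.netlib.F2jBLAS when no native
      // library is found, or the native wrapper class when one is.
      def main(args: Array[String]): Unit = {
        println(BLAS.getInstance().getClass.getName)
      }
    }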
diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 7bb69df653..e3515ee81a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -26,7 +26,6 @@ import scala.util.hashing.byteswap64
import com.github.fommil.netlib.BLAS.{getInstance => blas}
import com.github.fommil.netlib.LAPACK.{getInstance => lapack}
-import org.jblas.DoubleMatrix
import org.netlib.util.intW
import org.apache.spark.{Logging, Partitioner}
@@ -361,14 +360,14 @@ object ALS extends Logging {
private[recommendation] class NNLSSolver extends LeastSquaresNESolver {
private var rank: Int = -1
private var workspace: NNLS.Workspace = _
- private var ata: DoubleMatrix = _
+ private var ata: Array[Double] = _
private var initialized: Boolean = false
private def initialize(rank: Int): Unit = {
if (!initialized) {
this.rank = rank
workspace = NNLS.createWorkspace(rank)
- ata = new DoubleMatrix(rank, rank)
+ ata = new Array[Double](rank * rank)
initialized = true
} else {
require(this.rank == rank)
@@ -385,7 +384,7 @@ object ALS extends Logging {
val rank = ne.k
initialize(rank)
fillAtA(ne.ata, lambda * ne.n)
- val x = NNLS.solve(ata, new DoubleMatrix(rank, 1, ne.atb: _*), workspace)
+ val x = NNLS.solve(ata, ne.atb, workspace)
ne.reset()
x.map(x => x.toFloat)
}
@@ -398,17 +397,16 @@ object ALS extends Logging {
var i = 0
var pos = 0
var a = 0.0
- val data = ata.data
while (i < rank) {
var j = 0
while (j <= i) {
a = triAtA(pos)
- data(i * rank + j) = a
- data(j * rank + i) = a
+ ata(i * rank + j) = a
+ ata(j * rank + i) = a
pos += 1
j += 1
}
- data(i * rank + i) += lambda
+ ata(i * rank + i) += lambda
i += 1
}
}
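
fillAtA above expands the packed upper triangle of AtA (as accumulated by ALS's NormalEquation) into a full column-major rank x rank array, adding the regularization term lambda on the diagonal. A standalone sketch of the same packing logic, with a made-up rank-2 example:

    object FillAtAExample {
      // Mirror of the fillAtA loop above: triAtA holds the upper triangle of
      // A^T A packed column by column; each entry is written to both its
      // (i, j) and (j, i) positions in a full column-major array, and lambda
      // is added to the diagonal.
      def fillAtA(triAtA: Array[Double], rank: Int, lambda: Double): Array[Double] = {
        val ata = new Array[Double](rank * rank)
        var i = 0
        var pos = 0
        while (i < rank) {
          var j = 0
          while (j <= i) {
            val a = triAtA(pos)
            ata(i * rank + j) = a
            ata(j * rank + i) = a
            pos += 1
            j += 1
          }
          ata(i * rank + i) += lambda
          i += 1
        }
        ata
      }

      def main(args: Array[String]): Unit = {
        // packed (a00, a01, a11) = (1, 2, 3) with lambda = 0.1
        // expands to the symmetric matrix (1.1, 2.0, 2.0, 3.1)
        println(fillAtA(Array(1.0, 2.0, 3.0), 2, 0.1).mkString(", "))
      }
    }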
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala
index ccd93b318b..4766f77082 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala
@@ -17,7 +17,9 @@
package org.apache.spark.mllib.optimization
-import org.jblas.{DoubleMatrix, SimpleBlas}
+import java.{util => ju}
+
+import com.github.fommil.netlib.BLAS.{getInstance => blas}
/**
* Object used to solve nonnegative least squares problems using a modified
@@ -25,20 +27,20 @@ import org.jblas.{DoubleMatrix, SimpleBlas}
*/
private[spark] object NNLS {
class Workspace(val n: Int) {
- val scratch = new DoubleMatrix(n, 1)
- val grad = new DoubleMatrix(n, 1)
- val x = new DoubleMatrix(n, 1)
- val dir = new DoubleMatrix(n, 1)
- val lastDir = new DoubleMatrix(n, 1)
- val res = new DoubleMatrix(n, 1)
-
- def wipe() {
- scratch.fill(0.0)
- grad.fill(0.0)
- x.fill(0.0)
- dir.fill(0.0)
- lastDir.fill(0.0)
- res.fill(0.0)
+ val scratch = new Array[Double](n)
+ val grad = new Array[Double](n)
+ val x = new Array[Double](n)
+ val dir = new Array[Double](n)
+ val lastDir = new Array[Double](n)
+ val res = new Array[Double](n)
+
+ def wipe(): Unit = {
+ ju.Arrays.fill(scratch, 0.0)
+ ju.Arrays.fill(grad, 0.0)
+ ju.Arrays.fill(x, 0.0)
+ ju.Arrays.fill(dir, 0.0)
+ ju.Arrays.fill(lastDir, 0.0)
+ ju.Arrays.fill(res, 0.0)
}
}
@@ -60,18 +62,18 @@ private[spark] object NNLS {
* direction, however, while this method only uses a conjugate gradient direction if the last
* iteration did not cause a previously-inactive constraint to become active.
*/
- def solve(ata: DoubleMatrix, atb: DoubleMatrix, ws: Workspace): Array[Double] = {
+ def solve(ata: Array[Double], atb: Array[Double], ws: Workspace): Array[Double] = {
ws.wipe()
- val n = atb.rows
+ val n = atb.length
val scratch = ws.scratch
// find the optimal unconstrained step
- def steplen(dir: DoubleMatrix, res: DoubleMatrix): Double = {
- val top = SimpleBlas.dot(dir, res)
- SimpleBlas.gemv(1.0, ata, dir, 0.0, scratch)
+ def steplen(dir: Array[Double], res: Array[Double]): Double = {
+ val top = blas.ddot(n, dir, 1, res, 1)
+ blas.dgemv("N", n, n, 1.0, ata, n, dir, 1, 0.0, scratch, 1)
// Push the denominator upward very slightly to avoid infinities and silliness
- top / (SimpleBlas.dot(scratch, dir) + 1e-20)
+ top / (blas.ddot(n, scratch, 1, dir, 1) + 1e-20)
}
// stopping condition
@@ -96,52 +98,52 @@ private[spark] object NNLS {
var i = 0
while (iterno < iterMax) {
// find the residual
- SimpleBlas.gemv(1.0, ata, x, 0.0, res)
- SimpleBlas.axpy(-1.0, atb, res)
- SimpleBlas.copy(res, grad)
+ blas.dgemv("N", n, n, 1.0, ata, n, x, 1, 0.0, res, 1)
+ blas.daxpy(n, -1.0, atb, 1, res, 1)
+ blas.dcopy(n, res, 1, grad, 1)
// project the gradient
i = 0
while (i < n) {
- if (grad.data(i) > 0.0 && x.data(i) == 0.0) {
- grad.data(i) = 0.0
+ if (grad(i) > 0.0 && x(i) == 0.0) {
+ grad(i) = 0.0
}
i = i + 1
}
- val ngrad = SimpleBlas.dot(grad, grad)
+ val ngrad = blas.ddot(n, grad, 1, grad, 1)
- SimpleBlas.copy(grad, dir)
+ blas.dcopy(n, grad, 1, dir, 1)
// use a CG direction under certain conditions
var step = steplen(grad, res)
var ndir = 0.0
- val nx = SimpleBlas.dot(x, x)
+ val nx = blas.ddot(n, x, 1, x, 1)
if (iterno > lastWall + 1) {
val alpha = ngrad / lastNorm
- SimpleBlas.axpy(alpha, lastDir, dir)
+ blas.daxpy(n, alpha, lastDir, 1, dir, 1)
val dstep = steplen(dir, res)
- ndir = SimpleBlas.dot(dir, dir)
+ ndir = blas.ddot(n, dir, 1, dir, 1)
if (stop(dstep, ndir, nx)) {
// reject the CG step if it could lead to premature termination
- SimpleBlas.copy(grad, dir)
- ndir = SimpleBlas.dot(dir, dir)
+ blas.dcopy(n, grad, 1, dir, 1)
+ ndir = blas.ddot(n, dir, 1, dir, 1)
} else {
step = dstep
}
} else {
- ndir = SimpleBlas.dot(dir, dir)
+ ndir = blas.ddot(n, dir, 1, dir, 1)
}
// terminate?
if (stop(step, ndir, nx)) {
- return x.data.clone
+ return x.clone
}
// don't run through the walls
i = 0
while (i < n) {
- if (step * dir.data(i) > x.data(i)) {
- step = x.data(i) / dir.data(i)
+ if (step * dir(i) > x(i)) {
+ step = x(i) / dir(i)
}
i = i + 1
}
@@ -149,19 +151,19 @@ private[spark] object NNLS {
// take the step
i = 0
while (i < n) {
- if (step * dir.data(i) > x.data(i) * (1 - 1e-14)) {
- x.data(i) = 0
+ if (step * dir(i) > x(i) * (1 - 1e-14)) {
+ x(i) = 0
lastWall = iterno
} else {
- x.data(i) -= step * dir.data(i)
+ x(i) -= step * dir(i)
}
i = i + 1
}
iterno = iterno + 1
- SimpleBlas.copy(dir, lastDir)
+ blas.dcopy(n, dir, 1, lastDir, 1)
lastNorm = ngrad
}
- x.data.clone
+ x.clone
}
}
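
Every jblas call in the solver has a direct netlib-java counterpart: SimpleBlas.dot becomes ddot, gemv becomes dgemv, axpy becomes daxpy, and copy becomes dcopy, with explicit lengths and strides in place of matrix shapes. A runnable sketch of the mapping on made-up data:

    import com.github.fommil.netlib.BLAS.{getInstance => blas}

    object BlasMapping {
      def main(args: Array[String]): Unit = {
        val n = 2
        val ata = Array(4.0, 1.0, 1.0, 3.0) // symmetric 2x2, column-major
        val x = Array(1.0, 2.0)
        val y = new Array[Double](n)

        // The trailing 1s are strides; "N" means no transpose; n doubles as
        // both matrix dimensions since ata is a square n x n array.
        blas.dgemv("N", n, n, 1.0, ata, n, x, 1, 0.0, y, 1) // y := ata * x = (6, 7)
        val top = blas.ddot(n, x, 1, y, 1)                  // x . y = 20
        blas.daxpy(n, -1.0, x, 1, y, 1)                     // y := y - x = (5, 5)
        blas.dcopy(n, y, 1, x, 1)                           // x := y
        println(s"top = $top, x = ${x.mkString(", ")}")
      }
    }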
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 5f5a996a87..36cbf060d9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -21,10 +21,10 @@ import java.io.IOException
import java.lang.{Integer => JavaInteger}
import org.apache.hadoop.fs.Path
-import org.jblas.DoubleMatrix
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
+import com.github.fommil.netlib.BLAS.{getInstance => blas}
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
@@ -70,9 +70,9 @@ class MatrixFactorizationModel(
/** Predict the rating of one user for one product. */
def predict(user: Int, product: Int): Double = {
- val userVector = new DoubleMatrix(userFeatures.lookup(user).head)
- val productVector = new DoubleMatrix(productFeatures.lookup(product).head)
- userVector.dot(productVector)
+ val userVector = userFeatures.lookup(user).head
+ val productVector = productFeatures.lookup(product).head
+ blas.ddot(userVector.length, userVector, 1, productVector, 1)
}
/**
@@ -89,9 +89,7 @@ class MatrixFactorizationModel(
}
users.join(productFeatures).map {
case (product, ((user, uFeatures), pFeatures)) =>
- val userVector = new DoubleMatrix(uFeatures)
- val productVector = new DoubleMatrix(pFeatures)
- Rating(user, product, userVector.dot(productVector))
+ Rating(user, product, blas.ddot(uFeatures.length, uFeatures, 1, pFeatures, 1))
}
}
@@ -143,9 +141,8 @@ class MatrixFactorizationModel(
recommendToFeatures: Array[Double],
recommendableFeatures: RDD[(Int, Array[Double])],
num: Int): Array[(Int, Double)] = {
- val recommendToVector = new DoubleMatrix(recommendToFeatures)
val scored = recommendableFeatures.map { case (id,features) =>
- (id, recommendToVector.dot(new DoubleMatrix(features)))
+ (id, blas.ddot(features.length, recommendToFeatures, 1, features, 1))
}
scored.top(num)(Ordering.by(_._2))
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
index 97f54aa62d..c9d33787b0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.util
import scala.collection.JavaConversions._
import scala.util.Random
-import org.jblas.DoubleMatrix
+import com.github.fommil.netlib.BLAS.{getInstance => blas}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.SparkContext
@@ -72,11 +72,10 @@ object LinearDataGenerator {
eps: Double = 0.1): Seq[LabeledPoint] = {
val rnd = new Random(seed)
- val weightsMat = new DoubleMatrix(1, weights.length, weights:_*)
val x = Array.fill[Array[Double]](nPoints)(
Array.fill[Double](weights.length)(2 * rnd.nextDouble - 1.0))
val y = x.map { xi =>
- new DoubleMatrix(1, xi.length, xi: _*).dot(weightsMat) + intercept + eps * rnd.nextGaussian()
+ blas.ddot(weights.length, xi, 1, weights, 1) + intercept + eps * rnd.nextGaussian()
}
y.zip(x).map(p => LabeledPoint(p._1, Vectors.dense(p._2)))
}
@@ -100,9 +99,9 @@ object LinearDataGenerator {
eps: Double,
nparts: Int = 2,
intercept: Double = 0.0) : RDD[LabeledPoint] = {
- org.jblas.util.Random.seed(42)
+ val random = new Random(42)
// Random values distributed uniformly in [-0.5, 0.5]
- val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5)
+ val w = Array.fill(nfeatures)(random.nextDouble() - 0.5)
val data: RDD[LabeledPoint] = sc.parallelize(0 until nparts, nparts).flatMap { p =>
val seed = 42 + p
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala
index b76fbe89c3..0c5b4f9d04 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MFDataGenerator.scala
@@ -17,13 +17,14 @@
package org.apache.spark.mllib.util
+import java.{util => ju}
+
import scala.language.postfixOps
import scala.util.Random
-import org.jblas.DoubleMatrix
-
-import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.SparkContext
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.mllib.linalg.{BLAS, DenseMatrix}
import org.apache.spark.rdd.RDD
/**
@@ -72,24 +73,25 @@ object MFDataGenerator {
val sc = new SparkContext(sparkMaster, "MFDataGenerator")
- val A = DoubleMatrix.randn(m, rank)
- val B = DoubleMatrix.randn(rank, n)
- val z = 1 / scala.math.sqrt(scala.math.sqrt(rank))
- A.mmuli(z)
- B.mmuli(z)
- val fullData = A.mmul(B)
+ val random = new ju.Random(42L)
+
+ val A = DenseMatrix.randn(m, rank, random)
+ val B = DenseMatrix.randn(rank, n, random)
+ val z = 1 / math.sqrt(rank)
+ val fullData = DenseMatrix.zeros(m, n)
+ BLAS.gemm(z, A, B, 1.0, fullData)
val df = rank * (m + n - rank)
val sampSize = scala.math.min(scala.math.round(trainSampFact * df),
scala.math.round(.99 * m * n)).toInt
val rand = new Random()
val mn = m * n
- val shuffled = rand.shuffle(1 to mn toList)
+ val shuffled = rand.shuffle((0 until mn).toList)
val omega = shuffled.slice(0, sampSize)
val ordered = omega.sortWith(_ < _).toArray
val trainData: RDD[(Int, Int, Double)] = sc.parallelize(ordered)
- .map(x => (fullData.indexRows(x - 1), fullData.indexColumns(x - 1), fullData.get(x - 1)))
+ .map(x => (x % m, x / m, fullData.values(x)))
// optionally add gaussian noise
if (noise) {
@@ -105,7 +107,7 @@ object MFDataGenerator {
val testOmega = shuffled.slice(sampSize, sampSize + testSampSize)
val testOrdered = testOmega.sortWith(_ < _).toArray
val testData: RDD[(Int, Int, Double)] = sc.parallelize(testOrdered)
- .map(x => (fullData.indexRows(x - 1), fullData.indexColumns(x - 1), fullData.get(x - 1)))
+ .map(x => (x % m, x / m, fullData.values(x)))
testData.map(x => x._1 + "," + x._2 + "," + x._3).saveAsTextFile(outputPath)
}
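
The jblas 1-based indexRows/indexColumns/get lookups become direct reads from the DenseMatrix value buffer, which mllib stores in column-major order (hence also the shuffle over 0 until mn instead of 1 to mn): linear index k maps to row k % m and column k / m. A small self-contained sketch of that decoding, assuming a made-up 3 x 2 matrix:

    object ColumnMajorIndex {
      def main(args: Array[String]): Unit = {
        val m = 3 // rows
        val values = Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0) // 3 x 2, column-major
        for (k <- values.indices) {
          // entry k lives at (k % m, k / m); e.g. k = 4 -> row 1, column 1
          println(s"k=$k -> (${k % m}, ${k / m}) = ${values(k)}")
        }
      }
    }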
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
index 7db97e6bac..a8e30cc9d7 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
@@ -19,7 +19,7 @@ package org.apache.spark.mllib.util
import scala.util.Random
-import org.jblas.DoubleMatrix
+import com.github.fommil.netlib.BLAS.{getInstance => blas}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.SparkContext
@@ -51,8 +51,7 @@ object SVMDataGenerator {
val sc = new SparkContext(sparkMaster, "SVMGenerator")
val globalRnd = new Random(94720)
- val trueWeights = new DoubleMatrix(1, nfeatures + 1,
- Array.fill[Double](nfeatures + 1)(globalRnd.nextGaussian()):_*)
+ val trueWeights = Array.fill[Double](nfeatures + 1)(globalRnd.nextGaussian())
val data: RDD[LabeledPoint] = sc.parallelize(0 until nexamples, parts).map { idx =>
val rnd = new Random(42 + idx)
@@ -60,7 +59,7 @@ object SVMDataGenerator {
val x = Array.fill[Double](nfeatures) {
rnd.nextDouble() * 2.0 - 1.0
}
- val yD = new DoubleMatrix(1, x.length, x: _*).dot(trueWeights) + rnd.nextGaussian() * 0.1
+ val yD = blas.ddot(trueWeights.length, x, 1, trueWeights, 1) + rnd.nextGaussian() * 0.1
val y = if (yD < 0) 0.0 else 1.0
LabeledPoint(y, Vectors.dense(x))
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala
index 82c327bd49..22855e4e8f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala
@@ -55,7 +55,7 @@ class NNLSSuite extends FunSuite {
for (k <- 0 until 100) {
val (ata, atb) = genOnesData(n, rand)
- val x = new DoubleMatrix(NNLS.solve(ata, atb, ws))
+ val x = new DoubleMatrix(NNLS.solve(ata.data, atb.data, ws))
assert(x.length === n)
val answer = DoubleMatrix.ones(n, 1)
SimpleBlas.axpy(-1.0, answer, x)
@@ -79,7 +79,7 @@ class NNLSSuite extends FunSuite {
val goodx = Array(0.13025, 0.54506, 0.2874, 0.0, 0.028628)
val ws = NNLS.createWorkspace(n)
- val x = NNLS.solve(ata, atb, ws)
+ val x = NNLS.solve(ata.data, atb.data, ws)
for (i <- 0 until n) {
assert(x(i) ~== goodx(i) absTol 1E-3)
assert(x(i) >= 0)
@@ -104,7 +104,7 @@ class NNLSSuite extends FunSuite {
val ws = NNLS.createWorkspace(n)
- val x = new DoubleMatrix(NNLS.solve(ata, atb, ws))
+ val x = new DoubleMatrix(NNLS.solve(ata.data, atb.data, ws))
val obj = computeObjectiveValue(ata, atb, x)
assert(obj < refObj + 1E-5)
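
These tests now pass raw arrays into the rewritten solver; the jblas DoubleMatrix wrappers survive only as test fixtures. A minimal usage sketch of the new signature with made-up data (NNLS is private[spark], so the snippet assumes it is compiled inside an org.apache.spark package):

    package org.apache.spark.mllib.optimization

    object NNLSUsage {
      def main(args: Array[String]): Unit = {
        val n = 2
        val ata = Array(2.0, 0.0, 0.0, 2.0) // column-major n x n Gram matrix
        val atb = Array(2.0, 4.0)           // length-n right-hand side
        val ws = NNLS.createWorkspace(n)
        // The unconstrained optimum (1.0, 2.0) is nonnegative, so NNLS
        // should converge to approximately that point.
        val x = NNLS.solve(ata, atb, ws)
        println(x.mkString(", "))
      }
    }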
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/KernelDensitySuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/KernelDensitySuite.scala
index f6a1e19f50..16ecae23dd 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/stat/KernelDensitySuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/KernelDensitySuite.scala
@@ -21,9 +21,9 @@ import org.scalatest.FunSuite
import org.apache.commons.math3.distribution.NormalDistribution
-import org.apache.spark.mllib.util.LocalClusterSparkContext
+import org.apache.spark.mllib.util.MLlibTestSparkContext
-class KernelDensitySuite extends FunSuite with LocalClusterSparkContext {
+class KernelDensitySuite extends FunSuite with MLlibTestSparkContext {
test("kernel density single sample") {
val rdd = sc.parallelize(Array(5.0))
val evaluationPoints = Array(5.0, 6.0)