aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorYanbo Liang <ybliang8@gmail.com>2016-07-19 12:31:04 +0100
committerSean Owen <sowen@cloudera.com>2016-07-19 12:31:04 +0100
commit670891496a82538a5e2bf981a4044fb6f4cbb062 (patch)
treea91eb7ad9962b2eb5f2de12e3522ec45aab951d1 /mllib
parent5d92326be76cb15edc6e18e94a373e197f696803 (diff)
downloadspark-670891496a82538a5e2bf981a4044fb6f4cbb062.tar.gz
spark-670891496a82538a5e2bf981a4044fb6f4cbb062.tar.bz2
spark-670891496a82538a5e2bf981a4044fb6f4cbb062.zip
[SPARK-16494][ML] Upgrade breeze version to 0.12
## What changes were proposed in this pull request? breeze 0.12 has been released for more than half a year, and it brings lots of new features, performance improvements and bug fixes. One of the biggest features is ```LBFGS-B```, which is an implementation of ```LBFGS``` with box constraints and is much faster for some special cases. We would like to implement the Huber loss function for ```LinearRegression``` ([SPARK-3181](https://issues.apache.org/jira/browse/SPARK-3181)), and it requires ```LBFGS-B``` as the optimization solver. So we should bump up the dependent breeze version to 0.12. For more features, improvements and bug fixes of breeze 0.12, you can refer to the following link: https://groups.google.com/forum/#!topic/scala-breeze/nEeRi_DcY5c ## How was this patch tested? No new tests, should pass the existing ones. Author: Yanbo Liang <ybliang8@gmail.com> Closes #14150 from yanboliang/spark-16494.
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala5
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala6
-rw-r--r--mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala5
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala8
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala5
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala5
-rw-r--r--mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java6
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala4
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala9
9 files changed, 23 insertions, 30 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 91eee0e69d..7694773c81 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -424,11 +424,6 @@ class LogisticRegression @Since("1.2.0") (
throw new SparkException(msg)
}
- if (!state.actuallyConverged) {
- logWarning("LogisticRegression training finished but the result " +
- s"is not converged because: ${state.convergedReason.get.reason}")
- }
-
/*
The coefficients are trained in the scaled space; we're converting them back to
the original space.
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
index 700a92cc26..2b9912657f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/AFTSurvivalRegression.scala
@@ -244,12 +244,6 @@ class AFTSurvivalRegression @Since("1.6.0") (@Since("1.6.0") override val uid: S
val msg = s"${optimizer.getClass.getName} failed."
throw new SparkException(msg)
}
-
- if (!state.actuallyConverged) {
- logWarning("AFTSurvivalRegression training finished but the result " +
- s"is not converged because: ${state.convergedReason.get.reason}")
- }
-
state.x.toArray.clone()
}
diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
index 0a155e1844..a0ff7f07aa 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala
@@ -325,11 +325,6 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String
throw new SparkException(msg)
}
- if (!state.actuallyConverged) {
- logWarning("LinearRegression training finished but the result " +
- s"is not converged because: ${state.convergedReason.get.reason}")
- }
-
/*
The coefficients are trained in the scaled space; we're converting them back to
the original space.
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index 9ebba1de0d..90d8a558f1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -784,7 +784,13 @@ class DistributedLDAModel private[clustering] (
@Since("1.5.0")
def topTopicsPerDocument(k: Int): RDD[(Long, Array[Int], Array[Double])] = {
graph.vertices.filter(LDA.isDocumentVertex).map { case (docID, topicCounts) =>
- val topIndices = argtopk(topicCounts, k)
+ // TODO: Remove work-around for the breeze bug.
+ // https://github.com/scalanlp/breeze/issues/561
+ val topIndices = if (k == topicCounts.length) {
+ Seq.range(0, k)
+ } else {
+ argtopk(topicCounts, k)
+ }
val sumCounts = sum(topicCounts)
val weights = if (sumCounts != 0) {
topicCounts(topIndices) / sumCounts
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index 2436efba32..e2c6aca553 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -508,8 +508,9 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
val weight = rho()
val N = gammat.rows.toDouble
val alpha = this.alpha.asBreeze.toDenseVector
- val logphat: BDM[Double] = sum(LDAUtils.dirichletExpectation(gammat)(::, breeze.linalg.*)) / N
- val gradf = N * (-LDAUtils.dirichletExpectation(alpha) + logphat.toDenseVector)
+ val logphat: BDV[Double] =
+ sum(LDAUtils.dirichletExpectation(gammat)(::, breeze.linalg.*)).t / N
+ val gradf = N * (-LDAUtils.dirichletExpectation(alpha) + logphat)
val c = N * trigamma(sum(alpha))
val q = -N * trigamma(alpha)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
index fd09f35277..e49363c2c6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
@@ -213,11 +213,6 @@ object LBFGS extends Logging {
}
lossHistory += state.value
- if (!state.actuallyConverged) {
- logWarning("LBFGS training finished but the result " +
- s"is not converged because: ${state.convergedReason.get.reason}")
- }
-
val weights = Vectors.fromBreeze(state.x)
val lossHistoryArray = lossHistory.result()
diff --git a/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java b/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java
index ac479c0841..8c0338e284 100644
--- a/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java
+++ b/mllib/src/test/java/org/apache/spark/ml/feature/JavaPCASuite.java
@@ -107,7 +107,11 @@ public class JavaPCASuite extends SharedSparkSession {
.fit(df);
List<Row> result = pca.transform(df).select("pca_features", "expected").toJavaRDD().collect();
for (Row r : result) {
- Assert.assertEquals(r.get(1), r.get(0));
+ Vector calculatedVector = (Vector) r.get(0);
+ Vector expectedVector = (Vector) r.get(1);
+ for (int i = 0; i < calculatedVector.size(); i++) {
+ Assert.assertEquals(calculatedVector.apply(i), expectedVector.apply(i), 1.0e-8);
+ }
}
}
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
index eb050158d4..211e2bc026 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
@@ -118,8 +118,8 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
assert(weights.length == 2)
val bdvTopicDist = topicDistribution.asBreeze
val top2Indices = argtopk(bdvTopicDist, 2)
- assert(top2Indices.toArray === indices)
- assert(bdvTopicDist(top2Indices).toArray === weights)
+ assert(top2Indices.toSet === indices.toSet)
+ assert(bdvTopicDist(top2Indices).toArray.toSet === weights.toSet)
}
// Check: log probabilities
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
index a8d82932d3..2f90afdcee 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
@@ -18,9 +18,10 @@
package org.apache.spark.mllib.feature
import org.apache.spark.SparkFunSuite
-import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.mllib.util.TestingUtils._
class PCASuite extends SparkFunSuite with MLlibTestSparkContext {
@@ -42,7 +43,9 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext {
val pca_transform = pca.transform(dataRDD).collect()
val mat_multiply = mat.multiply(pc).rows.collect()
- assert(pca_transform.toSet === mat_multiply.toSet)
- assert(pca.explainedVariance === explainedVariance)
+ pca_transform.zip(mat_multiply).foreach { case (calculated, expected) =>
+ assert(calculated ~== expected relTol 1e-8)
+ }
+ assert(pca.explainedVariance ~== explainedVariance relTol 1e-8)
}
}