diff options
author | Xiangrui Meng <meng@databricks.com> | 2016-02-22 23:54:21 -0800 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2016-02-22 23:54:21 -0800 |
commit | 764ca18037b6b1884fbc4be9a011714a81495020 (patch) | |
tree | a98d020e1ac9804ac231351526273ffe1abf016a | |
parent | 72427c3e115daf06f7ad8aa50115a8e0da2c6d62 (diff) | |
download | spark-764ca18037b6b1884fbc4be9a011714a81495020.tar.gz spark-764ca18037b6b1884fbc4be9a011714a81495020.tar.bz2 spark-764ca18037b6b1884fbc4be9a011714a81495020.zip |
[SPARK-13355][MLLIB] replace GraphImpl.fromExistingRDDs by Graph.apply
`GraphImpl.fromExistingRDDs` expects a preprocessed vertex RDD as input. We call it in LDA without validating this requirement, so it might introduce errors. Replacing it with `Graph.apply` would be safer and more proper because it is a public API. The tests still pass, so maybe it is safe to use `fromExistingRDDs` here (though it doesn't seem so based on the implementation), or the test cases are special. jkbradley ankurdave
Author: Xiangrui Meng <meng@databricks.com>
Closes #11226 from mengxr/SPARK-13355.
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala | 3 |
1 file changed, 1 insertion(+), 2 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index 7a41f74191..7491ab0d51 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -25,7 +25,6 @@ import breeze.stats.distributions.{Gamma, RandBasis}
 
 import org.apache.spark.annotation.{DeveloperApi, Since}
 import org.apache.spark.graphx._
-import org.apache.spark.graphx.impl.GraphImpl
 import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer
 import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector, Vectors}
 import org.apache.spark.rdd.RDD
@@ -188,7 +187,7 @@ final class EMLDAOptimizer extends LDAOptimizer {
       graph.aggregateMessages[(Boolean, TopicCounts)](sendMsg, mergeMsg)
         .mapValues(_._2)
     // Update the vertex descriptors with the new counts.
-    val newGraph = GraphImpl.fromExistingRDDs(docTopicDistributions, graph.edges)
+    val newGraph = Graph(docTopicDistributions, graph.edges)
     graph = newGraph
     graphCheckpointer.update(newGraph)
     globalTopicTotals = computeGlobalTopicTotals()