aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorWeichenXu <WeichenXu123@outlook.com>2016-07-26 10:41:41 +0100
committerSean Owen <sowen@cloudera.com>2016-07-26 10:41:41 +0100
commit4c9695598ee00f68aff4eb32d4629edf6facb29f (patch)
treee254af39c8bce081b34cd2c21995d3962f61d1fe /mllib
parent3b2b785ece4394ca332377647a6305ea493f411b (diff)
downloadspark-4c9695598ee00f68aff4eb32d4629edf6facb29f.tar.gz
spark-4c9695598ee00f68aff4eb32d4629edf6facb29f.tar.bz2
spark-4c9695598ee00f68aff4eb32d4629edf6facb29f.zip
[SPARK-16697][ML][MLLIB] improve LDA submitMiniBatch method to avoid redundant RDD computation
## What changes were proposed in this pull request? In `LDAOptimizer.submitMiniBatch`, do persist on `stats: RDD[(BDM[Double], List[BDV[Double]])]` and also move the place of unpersisting `expElogbetaBc` broadcast variable, to avoid the `expElogbetaBc` broadcast variable to be unpersisted too early, and update previous `expElogbetaBc.unpersist()` into `expElogbetaBc.destroy(false)` ## How was this patch tested? Existing test. Author: WeichenXu <WeichenXu123@outlook.com> Closes #14335 from WeichenXu123/improve_LDA.
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala6
1 files changed, 4 insertions, 2 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index e2c6aca553..ae324f86fe 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -28,6 +28,7 @@ import org.apache.spark.graphx._
import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer
import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector, Vectors}
import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
/**
* :: DeveloperApi ::
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
gammaPart = gammad :: gammaPart
}
Iterator((stat, gammaPart))
- }
+ }.persist(StorageLevel.MEMORY_AND_DISK)
val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
_ += _, _ += _)
- expElogbetaBc.unpersist()
val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
+ stats.unpersist()
+ expElogbetaBc.destroy(false)
val batchResult = statsSum :* expElogbeta.t
// Note that this is an optimization to avoid batch.count