author: DB Tsai <dbtsai@alpinenow.com> (2014-11-25 11:07:11 -0800)
committer: Xiangrui Meng <meng@databricks.com> (2014-11-25 11:07:11 -0800)
commit: bf1a6aaac577757a293a573fe8eae9669697310a
tree: a8b55ae43f7332cf3ccdd418716baae99b451d13
parent: 69cd53eae205eb10d52eaf38466db58a23b6ae81
[SPARK-4581][MLlib] Refactorize StandardScaler to improve the transformation performance
The following optimizations are done to improve the StandardScaler model
transformation performance:
1) Convert the Breeze dense vector to a primitive array to reduce overhead.
2) Since `mean` can potentially be a sparse vector, explicitly convert it to a dense primitive array.
3) Keep a local reference to the `shift` and `factor` arrays so the JVM can locate the values with a single operation.
4) In the pattern-matching part, use the MLlib SparseVector/DenseVector instead of Breeze's vectors to
make the codebase cleaner.
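Taken together, these optimizations amount to the pattern below. This is a simplified, self-contained sketch of the dense `withMean`-plus-`withStd` path only; `Model`, `shift`, and `factor` are illustrative stand-ins for the corresponding lazy members in `StandardScalerModel`, not the actual Spark class.

```scala
object ScalerSketch {
  // Hypothetical stand-in for StandardScalerModel's lazy `shift`/`factor` members.
  class Model(mean: Array[Double], variance: Array[Double]) {
    // Dense primitive arrays, computed once on first use (optimizations 1 and 2).
    private lazy val shift: Array[Double] = mean.clone()
    private lazy val factor: Array[Double] =
      variance.map(v => if (v != 0.0) 1.0 / math.sqrt(v) else 0.0)

    def transform(values: Array[Double]): Array[Double] = {
      val out = values.clone()
      // Local references so the loop reads plain arrays instead of going
      // through the generated accessor methods on every iteration (optimization 3).
      val localShift = shift
      val localFactor = factor
      var i = 0
      while (i < out.length) {
        out(i) = (out(i) - localShift(i)) * localFactor(i)
        i += 1
      }
      out
    }
  }

  def main(args: Array[String]): Unit = {
    val m = new Model(Array(1.0, 2.0), Array(4.0, 0.0))
    // (3-1)*1/sqrt(4) = 1.0; zero variance maps to factor 0.0.
    println(m.transform(Array(3.0, 2.0)).mkString(","))  // prints 1.0,0.0
  }
}
```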
Benchmark with the mnist8m dataset:
Before:
DenseVector withMean and withStd: 50.97secs
DenseVector withMean and withoutStd: 42.11secs
DenseVector withoutMean and withStd: 8.75secs
SparseVector withoutMean and withStd: 5.437secs
With this PR:
DenseVector withMean and withStd: 5.76secs
DenseVector withMean and withoutStd: 5.28secs
DenseVector withoutMean and withStd: 5.30secs
SparseVector withoutMean and withStd: 1.27secs
Note that without the local reference copies of the `factor` and `shift` arrays,
the runtime is roughly three times slower:
DenseVector withMean and withStd: 18.15secs
DenseVector withMean and withoutStd: 18.05secs
DenseVector withoutMean and withStd: 18.54secs
SparseVector withoutMean and withStd: 2.01secs
The following code,
```scala
while (i < size) {
values(i) = (values(i) - shift(i)) * factor(i)
i += 1
}
```
will generate the bytecode
```
L13
LINENUMBER 106 L13
FRAME FULL [org/apache/spark/mllib/feature/StandardScalerModel org/apache/spark/mllib/linalg/Vector org/apache/spark/mllib/linalg/Vector org/apache/spark/mllib/linalg/DenseVector T [D I I] []
ILOAD 7
ILOAD 6
IF_ICMPGE L14
L15
LINENUMBER 107 L15
ALOAD 5
ILOAD 7
ALOAD 5
ILOAD 7
DALOAD
ALOAD 0
INVOKESPECIAL org/apache/spark/mllib/feature/StandardScalerModel.shift ()[D
ILOAD 7
DALOAD
DSUB
ALOAD 0
INVOKESPECIAL org/apache/spark/mllib/feature/StandardScalerModel.factor ()[D
ILOAD 7
DALOAD
DMUL
DASTORE
L16
LINENUMBER 108 L16
ILOAD 7
ICONST_1
IADD
ISTORE 7
GOTO L13
```
while with local references to the `shift` and `factor` arrays, the bytecode becomes
```
L14
LINENUMBER 107 L14
ALOAD 0
INVOKESPECIAL org/apache/spark/mllib/feature/StandardScalerModel.factor ()[D
ASTORE 9
L15
LINENUMBER 108 L15
FRAME FULL [org/apache/spark/mllib/feature/StandardScalerModel org/apache/spark/mllib/linalg/Vector [D org/apache/spark/mllib/linalg/Vector org/apache/spark/mllib/linalg/DenseVector T [D I I [D] []
ILOAD 8
ILOAD 7
IF_ICMPGE L16
L17
LINENUMBER 109 L17
ALOAD 6
ILOAD 8
ALOAD 6
ILOAD 8
DALOAD
ALOAD 2
ILOAD 8
DALOAD
DSUB
ALOAD 9
ILOAD 8
DALOAD
DMUL
DASTORE
L18
LINENUMBER 110 L18
ILOAD 8
ICONST_1
IADD
ISTORE 8
GOTO L15
```
You can see that with local references, both arrays are loaded onto the stack directly, so the JVM can access the values without calling `INVOKESPECIAL`.
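The effect is easy to reproduce outside Spark. Below is a rough micro-benchmark sketch; `Loops`, `fieldLoop`, and `localLoop` are hypothetical names (not from this PR), and absolute timings will vary by JVM and hardware. Note that in Scala 2 a `lazy val` accessor also performs an initialization check on every call, which makes hoisting it even more worthwhile.

```scala
object LocalRefBench {
  class Loops(n: Int) {
    private lazy val factor: Array[Double] = Array.fill(n)(2.0)

    // Goes through the generated `factor` accessor on every iteration.
    def fieldLoop(values: Array[Double]): Unit = {
      var i = 0
      while (i < values.length) { values(i) *= factor(i); i += 1 }
    }

    // Hoists a single accessor call out of the loop.
    def localLoop(values: Array[Double]): Unit = {
      val localFactor = factor
      var i = 0
      while (i < values.length) { values(i) *= localFactor(i); i += 1 }
    }
  }

  def time(body: => Unit): Long = {
    val t0 = System.nanoTime(); body; System.nanoTime() - t0
  }

  def main(args: Array[String]): Unit = {
    val loops = new Loops(1 << 20)
    val a = Array.fill(1 << 20)(1.0)
    // Warm up both paths before timing a single pass of each.
    (1 to 5).foreach { _ => loops.fieldLoop(a); loops.localLoop(a) }
    println(s"field: ${time(loops.fieldLoop(a))} ns, local: ${time(loops.localLoop(a))} ns")
  }
}
```

Both methods compute the same result; only the number of accessor calls inside the hot loop differs.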
Author: DB Tsai <dbtsai@alpinenow.com>
Closes #3435 from dbtsai/standardscaler and squashes the following commits:
85885a9 [DB Tsai] revert to have lazy in shift array.
daf2b06 [DB Tsai] Address the feedback
cdb5cef [DB Tsai] small change
9c51eef [DB Tsai] style
fc795e4 [DB Tsai] update
5bffd3d [DB Tsai] first commit
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala | 70 |
1 files changed, 50 insertions, 20 deletions
```diff
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala
index 4dfd1f0ab8..8c4c5db525 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/StandardScaler.scala
@@ -17,11 +17,9 @@
 package org.apache.spark.mllib.feature
 
-import breeze.linalg.{DenseVector => BDV, SparseVector => BSV}
-
 import org.apache.spark.Logging
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.mllib.linalg.{Vector, Vectors}
+import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
 import org.apache.spark.mllib.rdd.RDDFunctions._
 import org.apache.spark.mllib.stat.MultivariateOnlineSummarizer
 import org.apache.spark.rdd.RDD
@@ -77,8 +75,8 @@ class StandardScalerModel private[mllib] (
 
   require(mean.size == variance.size)
 
-  private lazy val factor: BDV[Double] = {
-    val f = BDV.zeros[Double](variance.size)
+  private lazy val factor: Array[Double] = {
+    val f = Array.ofDim[Double](variance.size)
     var i = 0
     while (i < f.size) {
       f(i) = if (variance(i) != 0.0) 1.0 / math.sqrt(variance(i)) else 0.0
@@ -87,6 +85,11 @@ class StandardScalerModel private[mllib] (
     f
   }
 
+  // Since `shift` will be only used in `withMean` branch, we have it as
+  // `lazy val` so it will be evaluated in that branch. Note that we don't
+  // want to create this array multiple times in `transform` function.
+  private lazy val shift: Array[Double] = mean.toArray
+
   /**
    * Applies standardization transformation on a vector.
    *
@@ -97,30 +100,57 @@ class StandardScalerModel private[mllib] (
   override def transform(vector: Vector): Vector = {
     require(mean.size == vector.size)
     if (withMean) {
-      vector.toBreeze match {
-        case dv: BDV[Double] =>
-          val output = vector.toBreeze.copy
-          var i = 0
-          while (i < output.length) {
-            output(i) = (output(i) - mean(i)) * (if (withStd) factor(i) else 1.0)
-            i += 1
+      // By default, Scala generates Java methods for member variables. So every time when
+      // the member variables are accessed, `invokespecial` will be called which is expensive.
+      // This can be avoid by having a local reference of `shift`.
+      val localShift = shift
+      vector match {
+        case dv: DenseVector =>
+          val values = dv.values.clone()
+          val size = values.size
+          if (withStd) {
+            // Having a local reference of `factor` to avoid overhead as the comment before.
+            val localFactor = factor
+            var i = 0
+            while (i < size) {
+              values(i) = (values(i) - localShift(i)) * localFactor(i)
+              i += 1
+            }
+          } else {
+            var i = 0
+            while (i < size) {
+              values(i) -= localShift(i)
+              i += 1
+            }
           }
-          Vectors.fromBreeze(output)
+          Vectors.dense(values)
         case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
       }
     } else if (withStd) {
-      vector.toBreeze match {
-        case dv: BDV[Double] => Vectors.fromBreeze(dv :* factor)
-        case sv: BSV[Double] =>
+      // Having a local reference of `factor` to avoid overhead as the comment before.
+      val localFactor = factor
+      vector match {
+        case dv: DenseVector =>
+          val values = dv.values.clone()
+          val size = values.size
+          var i = 0
+          while(i < size) {
+            values(i) *= localFactor(i)
+            i += 1
+          }
+          Vectors.dense(values)
+        case sv: SparseVector =>
           // For sparse vector, the `index` array inside sparse vector object will not be changed,
           // so we can re-use it to save memory.
-          val output = new BSV[Double](sv.index, sv.data.clone(), sv.length)
+          val indices = sv.indices
+          val values = sv.values.clone()
+          val nnz = values.size
           var i = 0
-          while (i < output.data.length) {
-            output.data(i) *= factor(output.index(i))
+          while (i < nnz) {
+            values(i) *= localFactor(indices(i))
             i += 1
           }
-          Vectors.fromBreeze(output)
+          Vectors.sparse(sv.size, indices, values)
         case v => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
       }
     } else {
```