author     Xusen Yin <yinxusen@gmail.com>  2015-04-24 00:39:29 -0700
committer  Xiangrui Meng <meng@databricks.com>  2015-04-24 00:39:29 -0700
commit     8509519d8bcf99e2d1b5e21da514d51357f9116d (patch)
tree       88793e5ffe728bbe0ef4ff2027d623610aef69cb /mllib/src
parent     4c722d77ae7e77eeaa7531687fa9bd6050344d18 (diff)
[SPARK-5894] [ML] Add polynomial mapper
See [SPARK-5894](https://issues.apache.org/jira/browse/SPARK-5894).

Author: Xusen Yin <yinxusen@gmail.com>
Author: Xiangrui Meng <meng@databricks.com>

Closes #5245 from yinxusen/SPARK-5894 and squashes the following commits:

dc461a6 [Xusen Yin] merge polynomial expansion v2
6d0c3cc [Xusen Yin] Merge branch 'SPARK-5894' of https://github.com/mengxr/spark into mengxr-SPARK-5894
57bfdd5 [Xusen Yin] Merge branch 'master' into SPARK-5894
3d02a7d [Xusen Yin] Merge branch 'master' into SPARK-5894
a067da2 [Xiangrui Meng] a new approach for poly expansion
0789d81 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-5894
4e9aed0 [Xusen Yin] fix test suite
95d8fb9 [Xusen Yin] fix sparse vector indices
8d39674 [Xusen Yin] fix sparse vector expansion error
5998dd6 [Xusen Yin] fix dense vector fillin
fa3ade3 [Xusen Yin] change the functional code into imperative one to speedup
b70e7e1 [Xusen Yin] remove useless case class
6fa236f [Xusen Yin] fix vector slice error
daff601 [Xusen Yin] fix index error of sparse vector
6bd0a10 [Xusen Yin] merge repeated features
419f8a2 [Xusen Yin] need to merge same columns
4ebf34e [Xusen Yin] add test suite of polynomial expansion
372227c [Xusen Yin] add polynomial expansion
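For context, a minimal usage sketch of the new transformer, mirroring the test suite in this
patch; it assumes an existing SparkContext named `sc`, and names like `polyExpansion` are
illustrative only:

    import org.apache.spark.ml.feature.PolynomialExpansion
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.sql.SQLContext

    val sqlContext = new SQLContext(sc)
    val df = sqlContext.createDataFrame(Seq(Tuple1(Vectors.dense(-2.0, 2.3)))).toDF("features")

    val polyExpansion = new PolynomialExpansion()
      .setInputCol("features")
      .setOutputCol("polyFeatures")
      .setDegree(2)

    // (x, y) with degree 2 expands to (1, x, x^2, y, x*y, y^2):
    // [-2.0, 2.3] -> [1.0, -2.0, 4.0, 2.3, -4.6, 5.29]
    polyExpansion.transform(df).select("polyFeatures").show()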
Diffstat (limited to 'mllib/src')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/ml/feature/PolynomialExpansion.scala       167
-rw-r--r--  mllib/src/test/scala/org/apache/spark/ml/feature/PolynomialExpansionSuite.scala  104
2 files changed, 271 insertions(+), 0 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/PolynomialExpansion.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/PolynomialExpansion.scala
new file mode 100644
index 0000000000..c3a59a361d
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/PolynomialExpansion.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import scala.collection.mutable
+
+import org.apache.spark.annotation.AlphaComponent
+import org.apache.spark.ml.UnaryTransformer
+import org.apache.spark.ml.param.{IntParam, ParamMap}
+import org.apache.spark.mllib.linalg._
+import org.apache.spark.sql.types.DataType
+
+/**
+ * :: AlphaComponent ::
+ * Perform feature expansion in a polynomial space. As the Wikipedia article on
+ * [[http://en.wikipedia.org/wiki/Polynomial_expansion polynomial expansion]] puts it, "In
+ * mathematics, an expansion of a product of sums expresses it as a sum of products by using the
+ * fact that multiplication distributes over addition". For example, expanding a 2-variable
+ * feature vector `(x, y)` with degree 2 yields `(1, x, x * x, y, x * y, y * y)`.
+ */
+@AlphaComponent
+class PolynomialExpansion extends UnaryTransformer[Vector, Vector, PolynomialExpansion] {
+
+ /**
+ * The polynomial degree to expand, which should be larger than 1.
+ * @group param
+ */
+ val degree = new IntParam(this, "degree", "the polynomial degree to expand")
+ setDefault(degree -> 2)
+
+ /** @group getParam */
+ def getDegree: Int = getOrDefault(degree)
+
+ /** @group setParam */
+ def setDegree(value: Int): this.type = set(degree, value)
+
+ override protected def createTransformFunc(paramMap: ParamMap): Vector => Vector = { v =>
+ val d = paramMap(degree)
+ PolynomialExpansion.expand(v, d)
+ }
+
+ override protected def outputDataType: DataType = new VectorUDT()
+}
+
+/**
+ * The expansion is done via recursion. Given n features and degree d, the size after expansion is
+ * (n + d choose d), including the constant 1 and the first-order terms. For example, let
+ * f([a, b, c], 3) be the function that expands [a, b, c] into all of its monomials of degree at
+ * most 3. We have the following recursion:
+ *
+ * {{{
+ * f([a, b, c], 3) = f([a, b], 3) ++ f([a, b], 2) * c ++ f([a, b], 1) * c^2 ++ [c^3]
+ * }}}
+ *
+ * To handle sparsity, if c is zero, we can skip all monomials that contain it. We remember the
+ * current index and increment it properly for sparse input.
+ */
+object PolynomialExpansion {
+
+ private def choose(n: Int, k: Int): Int = {
+ Range(n, n - k, -1).product / Range(k, 1, -1).product
+ }
+
+ private def getPolySize(numFeatures: Int, degree: Int): Int = choose(numFeatures + degree, degree)
+
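+ /**
+  * Expands a dense vector recursively over its last feature: for each power of the last value,
+  * the remaining prefix is expanded with the residual degree, and monomials are written into
+  * `polyValues` starting at `curPolyIdx`. Returns the index one past the block this call writes.
+  */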
+ private def expandDense(
+ values: Array[Double],
+ lastIdx: Int,
+ degree: Int,
+ multiplier: Double,
+ polyValues: Array[Double],
+ curPolyIdx: Int): Int = {
+ if (multiplier == 0.0) {
+ // do nothing
+ } else if (degree == 0 || lastIdx < 0) {
+ polyValues(curPolyIdx) = multiplier
+ } else {
+ val v = values(lastIdx)
+ val lastIdx1 = lastIdx - 1
+ var alpha = multiplier
+ var i = 0
+ var curStart = curPolyIdx
+ while (i <= degree && alpha != 0.0) {
+ curStart = expandDense(values, lastIdx1, degree - i, alpha, polyValues, curStart)
+ i += 1
+ alpha *= v
+ }
+ }
+ curPolyIdx + getPolySize(lastIdx + 1, degree)
+ }
+
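+ /**
+  * Sparse counterpart of `expandDense`: `indices` and `values` hold only the non-zero entries,
+  * so zero features are skipped entirely, while `curPolyIdx` still advances through the full
+  * expanded index space (hence the separate `lastFeatureIdx`). Emits (index, value) pairs into
+  * the two builders.
+  */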
+ private def expandSparse(
+ indices: Array[Int],
+ values: Array[Double],
+ lastIdx: Int,
+ lastFeatureIdx: Int,
+ degree: Int,
+ multiplier: Double,
+ polyIndices: mutable.ArrayBuilder[Int],
+ polyValues: mutable.ArrayBuilder[Double],
+ curPolyIdx: Int): Int = {
+ if (multiplier == 0.0) {
+ // do nothing
+ } else if (degree == 0 || lastIdx < 0) {
+ polyIndices += curPolyIdx
+ polyValues += multiplier
+ } else {
+ // Skip all zeros at the tail.
+ val v = values(lastIdx)
+ val lastIdx1 = lastIdx - 1
+ val lastFeatureIdx1 = indices(lastIdx) - 1
+ var alpha = multiplier
+ var curStart = curPolyIdx
+ var i = 0
+ while (i <= degree && alpha != 0.0) {
+ curStart = expandSparse(indices, values, lastIdx1, lastFeatureIdx1, degree - i, alpha,
+ polyIndices, polyValues, curStart)
+ i += 1
+ alpha *= v
+ }
+ }
+ curPolyIdx + getPolySize(lastFeatureIdx + 1, degree)
+ }
+
+ private def expand(dv: DenseVector, degree: Int): DenseVector = {
+ val n = dv.size
+ val polySize = getPolySize(n, degree)
+ val polyValues = new Array[Double](polySize)
+ expandDense(dv.values, n - 1, degree, 1.0, polyValues, 0)
+ new DenseVector(polyValues)
+ }
+
+ private def expand(sv: SparseVector, degree: Int): SparseVector = {
+ val polySize = getPolySize(sv.size, degree)
+ val nnz = sv.values.length
+ val nnzPolySize = getPolySize(nnz, degree)
+ val polyIndices = mutable.ArrayBuilder.make[Int]
+ polyIndices.sizeHint(nnzPolySize)
+ val polyValues = mutable.ArrayBuilder.make[Double]
+ polyValues.sizeHint(nnzPolySize)
+ expandSparse(
+ sv.indices, sv.values, nnz - 1, sv.size - 1, degree, 1.0, polyIndices, polyValues, 0)
+ new SparseVector(polySize, polyIndices.result(), polyValues.result())
+ }
+
+ def expand(v: Vector, degree: Int): Vector = {
+ v match {
+ case dv: DenseVector => expand(dv, degree)
+ case sv: SparseVector => expand(sv, degree)
+ case _ => throw new IllegalArgumentException
+ }
+ }
+}
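As a standalone sanity check of the size formula in the scaladoc above (illustrative only, not
part of the patch): with n features and degree d, the expansion has (n + d choose d) entries.
The hypothetical `monomialCount` below counts monomials by brute force, following the same
recursion as `expandDense`:

    // Count monomials of total degree <= d in n variables by choosing the
    // exponent of the last variable and recursing on the remaining prefix.
    def monomialCount(n: Int, d: Int): Int =
      if (n == 0) 1 else (0 to d).map(i => monomialCount(n - 1, d - i)).sum

    // The same binomial coefficient computed by PolynomialExpansion.choose.
    def choose(n: Int, k: Int): Int =
      Range(n, n - k, -1).product / Range(k, 1, -1).product

    for (n <- 1 to 5; d <- 1 to 5) {
      assert(monomialCount(n, d) == choose(n + d, d))
    }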
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/PolynomialExpansionSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/PolynomialExpansionSuite.scala
new file mode 100644
index 0000000000..b0a537be42
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/PolynomialExpansionSuite.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import org.scalatest.FunSuite
+import org.scalatest.exceptions.TestFailedException
+
+import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, Vectors}
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.mllib.util.TestingUtils._
+import org.apache.spark.sql.{Row, SQLContext}
+
+class PolynomialExpansionSuite extends FunSuite with MLlibTestSparkContext {
+
+ @transient var sqlContext: SQLContext = _
+
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ sqlContext = new SQLContext(sc)
+ }
+
+ test("Polynomial expansion with default parameter") {
+ val data = Array(
+ Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))),
+ Vectors.dense(-2.0, 2.3),
+ Vectors.dense(0.0, 0.0, 0.0),
+ Vectors.dense(0.6, -1.1, -3.0),
+ Vectors.sparse(3, Seq())
+ )
+
+ val twoDegreeExpansion: Array[Vector] = Array(
+ Vectors.sparse(10, Array(0, 1, 2, 3, 4, 5), Array(1.0, -2.0, 4.0, 2.3, -4.6, 5.29)),
+ Vectors.dense(1.0, -2.0, 4.0, 2.3, -4.6, 5.29),
+ Vectors.dense(Array(1.0) ++ Array.fill[Double](9)(0.0)),
+ Vectors.dense(1.0, 0.6, 0.36, -1.1, -0.66, 1.21, -3.0, -1.8, 3.3, 9.0),
+ Vectors.sparse(10, Array(0), Array(1.0)))
+
+ val df = sqlContext.createDataFrame(data.zip(twoDegreeExpansion)).toDF("features", "expected")
+
+ val polynomialExpansion = new PolynomialExpansion()
+ .setInputCol("features")
+ .setOutputCol("polyFeatures")
+
+ polynomialExpansion.transform(df).select("polyFeatures", "expected").collect().foreach {
+ case Row(expanded: DenseVector, expected: DenseVector) =>
+ assert(expanded ~== expected absTol 1e-1)
+ case Row(expanded: SparseVector, expected: SparseVector) =>
+ assert(expanded ~== expected absTol 1e-1)
+ case _ =>
+ throw new TestFailedException("Unmatched data types after polynomial expansion", 0)
+ }
+ }
+
+ test("Polynomial expansion with setter") {
+ val data = Array(
+ Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))),
+ Vectors.dense(-2.0, 2.3),
+ Vectors.dense(0.0, 0.0, 0.0),
+ Vectors.dense(0.6, -1.1, -3.0),
+ Vectors.sparse(3, Seq())
+ )
+
+ val threeDegreeExpansion: Array[Vector] = Array(
+ Vectors.sparse(20, Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9),
+ Array(1.0, -2.0, 4.0, -8.0, 2.3, -4.6, 9.2, 5.29, -10.58, 12.17)),
+ Vectors.dense(1.0, -2.0, 4.0, -8.0, 2.3, -4.6, 9.2, 5.29, -10.58, 12.17),
+ Vectors.dense(Array(1.0) ++ Array.fill[Double](19)(0.0)),
+ Vectors.dense(1.0, 0.6, 0.36, 0.216, -1.1, -0.66, -0.396, 1.21, 0.726, -1.331, -3.0, -1.8,
+ -1.08, 3.3, 1.98, -3.63, 9.0, 5.4, -9.9, -27.0),
+ Vectors.sparse(20, Array(0), Array(1.0)))
+
+ val df = sqlContext.createDataFrame(data.zip(threeDegreeExpansion)).toDF("features", "expected")
+
+ val polynomialExpansion = new PolynomialExpansion()
+ .setInputCol("features")
+ .setOutputCol("polyFeatures")
+ .setDegree(3)
+
+ polynomialExpansion.transform(df).select("polyFeatures", "expected").collect().foreach {
+ case Row(expanded: DenseVector, expected: DenseVector) =>
+ assert(expanded ~== expected absTol 1e-1)
+ case Row(expanded: SparseVector, expected: SparseVector) =>
+ assert(expanded ~== expected absTol 1e-1)
+ case _ =>
+ throw new TestFailedException("Unmatched data types after polynomial expansion", 0)
+ }
+ }
+}
+
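Finally, a sketch of the sparse behavior the scaladoc describes, using the public `expand`
helper added by this patch; the expected values come from the test expectations above:

    import org.apache.spark.ml.feature.PolynomialExpansion
    import org.apache.spark.mllib.linalg.Vectors

    // 3 features with only the first two non-zero; degree 2 gives a size of
    // (3 + 2 choose 2) = 10, but only 6 entries are stored, since every
    // monomial containing the zero feature is skipped.
    val expanded = PolynomialExpansion.expand(
      Vectors.sparse(3, Seq((0, -2.0), (1, 2.3))), 2)
    // -> (10, [0, 1, 2, 3, 4, 5], [1.0, -2.0, 4.0, 2.3, -4.6, 5.29])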