From 8098fab06cb2be22cca4e531e8e65ab29dbb909a Mon Sep 17 00:00:00 2001
From: Yuu ISHIKAWA
Date: Mon, 15 Dec 2014 13:44:15 -0800
Subject: [SPARK-4494][mllib] IDFModel.transform() add support for single vector

I improved `IDFModel.transform` so that it also accepts a single local
vector, in addition to an `RDD[Vector]`.

[[SPARK-4494] IDFModel.transform() add support for single vector - ASF JIRA](https://issues.apache.org/jira/browse/SPARK-4494)
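For illustration, a minimal usage sketch of the two code paths (assuming a
live SparkContext `sc`; the sample vectors mirror the ones in the test
suite):

    import org.apache.spark.mllib.feature.IDF
    import org.apache.spark.mllib.linalg.Vectors

    // Fit an IDF model on an RDD of term frequency (TF) vectors.
    val tf = sc.parallelize(Seq(
      Vectors.sparse(4, Array(1, 3), Array(1.0, 2.0)),
      Vectors.dense(0.0, 1.0, 2.0, 3.0)))
    val model = new IDF().fit(tf)

    // Existing API: transform an RDD of TF vectors to TF-IDF vectors.
    val tfidf = model.transform(tf)

    // New API: transform a single local TF vector, no RDD required.
    val localTfidf = model.transform(Vectors.dense(0.0, 1.0, 2.0, 3.0))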
Author: Yuu ISHIKAWA

Closes #3603 from yu-iskw/idf and squashes the following commits:

256ff3d [Yuu ISHIKAWA] Fix typo
a3bf566 [Yuu ISHIKAWA] - Fix typo
- Optimize import order
- Aggregate the assertion tests
- Modify `IDFModel.transform` API for pyspark
d25e49b [Yuu ISHIKAWA] Add the implementation of `IDFModel.transform` for a term frequency vector
---
 .../scala/org/apache/spark/mllib/feature/IDF.scala | 73 +++++++++++++---------
 .../org/apache/spark/mllib/feature/IDFSuite.scala  | 67 ++++++++++++--------
 2 files changed, 86 insertions(+), 54 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala
index 720bb70b08..19120e1e8a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala
@@ -174,36 +174,17 @@ class IDFModel private[mllib] (val idf: Vector) extends Serializable {
    */
   def transform(dataset: RDD[Vector]): RDD[Vector] = {
     val bcIdf = dataset.context.broadcast(idf)
-    dataset.mapPartitions { iter =>
-      val thisIdf = bcIdf.value
-      iter.map { v =>
-        val n = v.size
-        v match {
-          case sv: SparseVector =>
-            val nnz = sv.indices.size
-            val newValues = new Array[Double](nnz)
-            var k = 0
-            while (k < nnz) {
-              newValues(k) = sv.values(k) * thisIdf(sv.indices(k))
-              k += 1
-            }
-            Vectors.sparse(n, sv.indices, newValues)
-          case dv: DenseVector =>
-            val newValues = new Array[Double](n)
-            var j = 0
-            while (j < n) {
-              newValues(j) = dv.values(j) * thisIdf(j)
-              j += 1
-            }
-            Vectors.dense(newValues)
-          case other =>
-            throw new UnsupportedOperationException(
-              s"Only sparse and dense vectors are supported but got ${other.getClass}.")
-        }
-      }
-    }
+    dataset.mapPartitions(iter => iter.map(v => IDFModel.transform(bcIdf.value, v)))
   }
 
+  /**
+   * Transforms a term frequency (TF) vector to a TF-IDF vector
+   *
+   * @param v a term frequency vector
+   * @return a TF-IDF vector
+   */
+  def transform(v: Vector): Vector = IDFModel.transform(idf, v)
+
   /**
    * Transforms term frequency (TF) vectors to TF-IDF vectors (Java version).
    * @param dataset a JavaRDD of term frequency vectors
@@ -213,3 +194,39 @@ class IDFModel private[mllib] (val idf: Vector) extends Serializable {
     transform(dataset.rdd).toJavaRDD()
   }
 }
+
+private object IDFModel {
+
+  /**
+   * Transforms a term frequency (TF) vector to a TF-IDF vector with an IDF vector
+   *
+   * @param idf an IDF vector
+   * @param v a term frequency vector
+   * @return a TF-IDF vector
+   */
+  def transform(idf: Vector, v: Vector): Vector = {
+    val n = v.size
+    v match {
+      case sv: SparseVector =>
+        val nnz = sv.indices.size
+        val newValues = new Array[Double](nnz)
+        var k = 0
+        while (k < nnz) {
+          newValues(k) = sv.values(k) * idf(sv.indices(k))
+          k += 1
+        }
+        Vectors.sparse(n, sv.indices, newValues)
+      case dv: DenseVector =>
+        val newValues = new Array[Double](n)
+        var j = 0
+        while (j < n) {
+          newValues(j) = dv.values(j) * idf(j)
+          j += 1
+        }
+        Vectors.dense(newValues)
+      case other =>
+        throw new UnsupportedOperationException(
+          s"Only sparse and dense vectors are supported but got ${other.getClass}.")
+    }
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala
index 30147e7fd9..0a5cad7caf 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala
@@ -19,8 +19,7 @@ package org.apache.spark.mllib.feature
 
 import org.scalatest.FunSuite
 
-import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vectors}
+import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vectors, Vector}
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.util.TestingUtils._
 
@@ -41,18 +40,26 @@ class IDFSuite extends FunSuite with MLlibTestSparkContext {
       math.log((m + 1.0) / (x + 1.0))
     })
     assert(model.idf ~== expected absTol 1e-12)
-    val tfidf = model.transform(termFrequencies).cache().zipWithIndex().map(_.swap).collectAsMap()
-    assert(tfidf.size === 3)
-    val tfidf0 = tfidf(0L).asInstanceOf[SparseVector]
-    assert(tfidf0.indices === Array(1, 3))
-    assert(Vectors.dense(tfidf0.values) ~==
-        Vectors.dense(1.0 * expected(1), 2.0 * expected(3)) absTol 1e-12)
-    val tfidf1 = tfidf(1L).asInstanceOf[DenseVector]
-    assert(Vectors.dense(tfidf1.values) ~==
-        Vectors.dense(0.0, 1.0 * expected(1), 2.0 * expected(2), 3.0 * expected(3)) absTol 1e-12)
-    val tfidf2 = tfidf(2L).asInstanceOf[SparseVector]
-    assert(tfidf2.indices === Array(1))
-    assert(tfidf2.values(0) ~== (1.0 * expected(1)) absTol 1e-12)
+
+    val assertHelper = (tfidf: Array[Vector]) => {
+      assert(tfidf.size === 3)
+      val tfidf0 = tfidf(0).asInstanceOf[SparseVector]
+      assert(tfidf0.indices === Array(1, 3))
+      assert(Vectors.dense(tfidf0.values) ~==
+        Vectors.dense(1.0 * expected(1), 2.0 * expected(3)) absTol 1e-12)
+      val tfidf1 = tfidf(1).asInstanceOf[DenseVector]
+      assert(Vectors.dense(tfidf1.values) ~==
+        Vectors.dense(0.0, 1.0 * expected(1), 2.0 * expected(2), 3.0 * expected(3)) absTol 1e-12)
+      val tfidf2 = tfidf(2).asInstanceOf[SparseVector]
+      assert(tfidf2.indices === Array(1))
+      assert(tfidf2.values(0) ~== (1.0 * expected(1)) absTol 1e-12)
+    }
+    // Transforms an RDD
+    val tfidf = model.transform(termFrequencies).collect()
+    assertHelper(tfidf)
+    // Transforms local vectors
+    val localTfidf = localTermFrequencies.map(model.transform(_)).toArray
+    assertHelper(localTfidf)
   }
 
   test("idf minimum document frequency filtering") {
@@ -74,18 +81,26 @@ class IDFSuite extends FunSuite with MLlibTestSparkContext {
       }
     })
     assert(model.idf ~== expected absTol 1e-12)
-    val tfidf = model.transform(termFrequencies).cache().zipWithIndex().map(_.swap).collectAsMap()
-    assert(tfidf.size === 3)
-    val tfidf0 = tfidf(0L).asInstanceOf[SparseVector]
-    assert(tfidf0.indices === Array(1, 3))
-    assert(Vectors.dense(tfidf0.values) ~==
-        Vectors.dense(1.0 * expected(1), 2.0 * expected(3)) absTol 1e-12)
-    val tfidf1 = tfidf(1L).asInstanceOf[DenseVector]
-    assert(Vectors.dense(tfidf1.values) ~==
-        Vectors.dense(0.0, 1.0 * expected(1), 2.0 * expected(2), 3.0 * expected(3)) absTol 1e-12)
-    val tfidf2 = tfidf(2L).asInstanceOf[SparseVector]
-    assert(tfidf2.indices === Array(1))
-    assert(tfidf2.values(0) ~== (1.0 * expected(1)) absTol 1e-12)
+
+    val assertHelper = (tfidf: Array[Vector]) => {
+      assert(tfidf.size === 3)
+      val tfidf0 = tfidf(0).asInstanceOf[SparseVector]
+      assert(tfidf0.indices === Array(1, 3))
+      assert(Vectors.dense(tfidf0.values) ~==
+        Vectors.dense(1.0 * expected(1), 2.0 * expected(3)) absTol 1e-12)
+      val tfidf1 = tfidf(1).asInstanceOf[DenseVector]
+      assert(Vectors.dense(tfidf1.values) ~==
+        Vectors.dense(0.0, 1.0 * expected(1), 2.0 * expected(2), 3.0 * expected(3)) absTol 1e-12)
+      val tfidf2 = tfidf(2).asInstanceOf[SparseVector]
+      assert(tfidf2.indices === Array(1))
+      assert(tfidf2.values(0) ~== (1.0 * expected(1)) absTol 1e-12)
+    }
+    // Transforms an RDD
+    val tfidf = model.transform(termFrequencies).collect()
+    assertHelper(tfidf)
+    // Transforms local vectors
+    val localTfidf = localTermFrequencies.map(model.transform(_)).toArray
+    assertHelper(localTfidf)
   }
 }