author    Yuu ISHIKAWA <yuu.ishikawa@gmail.com>  2014-12-15 13:44:15 -0800
committer Xiangrui Meng <meng@databricks.com>    2014-12-15 13:44:15 -0800
commit    8098fab06cb2be22cca4e531e8e65ab29dbb909a (patch)
tree      f419594f9e6671f1bb4af54d17544d0ee78ca7e3 /mllib/src
parent    4c0673879b5c504797dafb11607d14b04c1bf47d (diff)
[SPARK-4494][mllib] IDFModel.transform() add support for single vector
I improved `IDFModel.transform` to allow using a single vector.

[[SPARK-4494] IDFModel.transform() add support for single vector - ASF JIRA](https://issues.apache.org/jira/browse/SPARK-4494)

Author: Yuu ISHIKAWA <yuu.ishikawa@gmail.com>

Closes #3603 from yu-iskw/idf and squashes the following commits:

256ff3d [Yuu ISHIKAWA] Fix typo
a3bf566 [Yuu ISHIKAWA] - Fix typo
  - Optimize import order
  - Aggregate the assertion tests
  - Modify `IDFModel.transform` API for pyspark
d25e49b [Yuu ISHIKAWA] Add the implementation of `IDFModel.transform` for a term frequency vector
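With this change, a fitted `IDFModel` can rescale a single local term frequency vector directly, in addition to an `RDD[Vector]`. A minimal usage sketch follows; the `SparkContext` `sc` and the toy corpus (mirroring the suite's test vectors) are illustrative assumptions, not part of this patch:

```scala
import org.apache.spark.mllib.feature.IDF
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// Illustrative TF vectors over a 4-term vocabulary; `sc` is an assumed SparkContext.
val termFrequencies: RDD[Vector] = sc.parallelize(Seq(
  Vectors.sparse(4, Array(1, 3), Array(1.0, 2.0)),
  Vectors.dense(0.0, 1.0, 2.0, 3.0),
  Vectors.sparse(4, Array(1), Array(1.0))
))

val model = new IDF().fit(termFrequencies)

// Existing API: transform a whole RDD of TF vectors.
val tfidfRdd: RDD[Vector] = model.transform(termFrequencies)

// New in this patch: transform a single local TF vector.
val tfidfLocal: Vector = model.transform(Vectors.dense(0.0, 1.0, 2.0, 3.0))
```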
Diffstat (limited to 'mllib/src')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala      | 73
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala | 67
2 files changed, 86 insertions(+), 54 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala
index 720bb70b08..19120e1e8a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/IDF.scala
@@ -174,37 +174,18 @@ class IDFModel private[mllib] (val idf: Vector) extends Serializable {
*/
def transform(dataset: RDD[Vector]): RDD[Vector] = {
val bcIdf = dataset.context.broadcast(idf)
- dataset.mapPartitions { iter =>
- val thisIdf = bcIdf.value
- iter.map { v =>
- val n = v.size
- v match {
- case sv: SparseVector =>
- val nnz = sv.indices.size
- val newValues = new Array[Double](nnz)
- var k = 0
- while (k < nnz) {
- newValues(k) = sv.values(k) * thisIdf(sv.indices(k))
- k += 1
- }
- Vectors.sparse(n, sv.indices, newValues)
- case dv: DenseVector =>
- val newValues = new Array[Double](n)
- var j = 0
- while (j < n) {
- newValues(j) = dv.values(j) * thisIdf(j)
- j += 1
- }
- Vectors.dense(newValues)
- case other =>
- throw new UnsupportedOperationException(
- s"Only sparse and dense vectors are supported but got ${other.getClass}.")
- }
- }
- }
+ dataset.mapPartitions(iter => iter.map(v => IDFModel.transform(bcIdf.value, v)))
}
/**
+ * Transforms a term frequency (TF) vector to a TF-IDF vector
+ *
+ * @param v a term frequency vector
+ * @return a TF-IDF vector
+ */
+ def transform(v: Vector): Vector = IDFModel.transform(idf, v)
+
+ /**
* Transforms term frequency (TF) vectors to TF-IDF vectors (Java version).
* @param dataset a JavaRDD of term frequency vectors
* @return a JavaRDD of TF-IDF vectors
@@ -213,3 +194,39 @@ class IDFModel private[mllib] (val idf: Vector) extends Serializable {
transform(dataset.rdd).toJavaRDD()
}
}
+
+private object IDFModel {
+
+ /**
+ * Transforms a term frequency (TF) vector to a TF-IDF vector with an IDF vector
+ *
+ * @param idf an IDF vector
+ * @param v a term frequency vector
+ * @return a TF-IDF vector
+ */
+ def transform(idf: Vector, v: Vector): Vector = {
+ val n = v.size
+ v match {
+ case sv: SparseVector =>
+ val nnz = sv.indices.size
+ val newValues = new Array[Double](nnz)
+ var k = 0
+ while (k < nnz) {
+ newValues(k) = sv.values(k) * idf(sv.indices(k))
+ k += 1
+ }
+ Vectors.sparse(n, sv.indices, newValues)
+ case dv: DenseVector =>
+ val newValues = new Array[Double](n)
+ var j = 0
+ while (j < n) {
+ newValues(j) = dv.values(j) * idf(j)
+ j += 1
+ }
+ Vectors.dense(newValues)
+ case other =>
+ throw new UnsupportedOperationException(
+ s"Only sparse and dense vectors are supported but got ${other.getClass}.")
+ }
+ }
+}
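The refactoring above moves the per-vector scaling into the private `IDFModel.transform(idf, v)` helper, so the broadcast-based RDD path and the new local path share one implementation. A quick consistency sketch, reusing the assumed `model` and `termFrequencies` from the example earlier:

```scala
// Both paths go through IDFModel.transform(idf, v) internally, so the
// results should match element for element (exact equality is safe here
// because the same code path produces both).
val fromRdd   = model.transform(termFrequencies).collect()
val fromLocal = termFrequencies.collect().map(v => model.transform(v))
assert(fromRdd.zip(fromLocal).forall { case (a, b) =>
  a.toArray.sameElements(b.toArray)
})
```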
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala
index 30147e7fd9..0a5cad7caf 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/IDFSuite.scala
@@ -19,8 +19,7 @@ package org.apache.spark.mllib.feature
import org.scalatest.FunSuite
-import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vectors}
+import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vectors, Vector}
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.mllib.util.TestingUtils._
@@ -41,18 +40,26 @@ class IDFSuite extends FunSuite with MLlibTestSparkContext {
math.log((m + 1.0) / (x + 1.0))
})
assert(model.idf ~== expected absTol 1e-12)
- val tfidf = model.transform(termFrequencies).cache().zipWithIndex().map(_.swap).collectAsMap()
- assert(tfidf.size === 3)
- val tfidf0 = tfidf(0L).asInstanceOf[SparseVector]
- assert(tfidf0.indices === Array(1, 3))
- assert(Vectors.dense(tfidf0.values) ~==
- Vectors.dense(1.0 * expected(1), 2.0 * expected(3)) absTol 1e-12)
- val tfidf1 = tfidf(1L).asInstanceOf[DenseVector]
- assert(Vectors.dense(tfidf1.values) ~==
- Vectors.dense(0.0, 1.0 * expected(1), 2.0 * expected(2), 3.0 * expected(3)) absTol 1e-12)
- val tfidf2 = tfidf(2L).asInstanceOf[SparseVector]
- assert(tfidf2.indices === Array(1))
- assert(tfidf2.values(0) ~== (1.0 * expected(1)) absTol 1e-12)
+
+ val assertHelper = (tfidf: Array[Vector]) => {
+ assert(tfidf.size === 3)
+ val tfidf0 = tfidf(0).asInstanceOf[SparseVector]
+ assert(tfidf0.indices === Array(1, 3))
+ assert(Vectors.dense(tfidf0.values) ~==
+ Vectors.dense(1.0 * expected(1), 2.0 * expected(3)) absTol 1e-12)
+ val tfidf1 = tfidf(1).asInstanceOf[DenseVector]
+ assert(Vectors.dense(tfidf1.values) ~==
+ Vectors.dense(0.0, 1.0 * expected(1), 2.0 * expected(2), 3.0 * expected(3)) absTol 1e-12)
+ val tfidf2 = tfidf(2).asInstanceOf[SparseVector]
+ assert(tfidf2.indices === Array(1))
+ assert(tfidf2.values(0) ~== (1.0 * expected(1)) absTol 1e-12)
+ }
+ // Transforms an RDD
+ val tfidf = model.transform(termFrequencies).collect()
+ assertHelper(tfidf)
+ // Transforms local vectors
+ val localTfidf = localTermFrequencies.map(model.transform(_)).toArray
+ assertHelper(localTfidf)
}
test("idf minimum document frequency filtering") {
@@ -74,18 +81,26 @@ class IDFSuite extends FunSuite with MLlibTestSparkContext {
}
})
assert(model.idf ~== expected absTol 1e-12)
- val tfidf = model.transform(termFrequencies).cache().zipWithIndex().map(_.swap).collectAsMap()
- assert(tfidf.size === 3)
- val tfidf0 = tfidf(0L).asInstanceOf[SparseVector]
- assert(tfidf0.indices === Array(1, 3))
- assert(Vectors.dense(tfidf0.values) ~==
- Vectors.dense(1.0 * expected(1), 2.0 * expected(3)) absTol 1e-12)
- val tfidf1 = tfidf(1L).asInstanceOf[DenseVector]
- assert(Vectors.dense(tfidf1.values) ~==
- Vectors.dense(0.0, 1.0 * expected(1), 2.0 * expected(2), 3.0 * expected(3)) absTol 1e-12)
- val tfidf2 = tfidf(2L).asInstanceOf[SparseVector]
- assert(tfidf2.indices === Array(1))
- assert(tfidf2.values(0) ~== (1.0 * expected(1)) absTol 1e-12)
+
+ val assertHelper = (tfidf: Array[Vector]) => {
+ assert(tfidf.size === 3)
+ val tfidf0 = tfidf(0).asInstanceOf[SparseVector]
+ assert(tfidf0.indices === Array(1, 3))
+ assert(Vectors.dense(tfidf0.values) ~==
+ Vectors.dense(1.0 * expected(1), 2.0 * expected(3)) absTol 1e-12)
+ val tfidf1 = tfidf(1).asInstanceOf[DenseVector]
+ assert(Vectors.dense(tfidf1.values) ~==
+ Vectors.dense(0.0, 1.0 * expected(1), 2.0 * expected(2), 3.0 * expected(3)) absTol 1e-12)
+ val tfidf2 = tfidf(2).asInstanceOf[SparseVector]
+ assert(tfidf2.indices === Array(1))
+ assert(tfidf2.values(0) ~== (1.0 * expected(1)) absTol 1e-12)
+ }
+ // Transforms an RDD
+ val tfidf = model.transform(termFrequencies).collect()
+ assertHelper(tfidf)
+ // Transforms local vectors
+ val localTfidf = localTermFrequencies.map(model.transform(_)).toArray
+ assertHelper(localTfidf)
}
}
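For reference, the `expected` vectors in both suites encode the smoothed IDF formula idf(j) = log((m + 1) / (df(j) + 1)), where `m` is the number of documents and `df(j)` is the document frequency of term `j`; with `minDocFreq` set, terms whose document frequency falls below the threshold get an IDF of 0. A standalone sketch of that expectation (plain Scala, no Spark; the helper name is hypothetical):

```scala
// Hypothetical helper mirroring the expectation asserted in IDFSuite:
// idf(j) = log((m + 1) / (df(j) + 1)), or 0.0 when df(j) < minDocFreq.
def expectedIdf(df: Array[Long], m: Long, minDocFreq: Int = 0): Array[Double] =
  df.map { x =>
    if (x >= minDocFreq) math.log((m + 1.0) / (x + 1.0)) else 0.0
  }

// The three test documents above give df = (0, 3, 1, 2) with m = 3.
val idf = expectedIdf(Array(0L, 3L, 1L, 2L), m = 3L)
```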