author     MechCoder <manojkumarsivaraj334@gmail.com>  2015-08-13 13:42:35 -0700
committer  Joseph K. Bradley <joseph@databricks.com>   2015-08-13 13:42:35 -0700
commit     864de8eaf4b6ad5c9099f6f29e251c56b029f631 (patch)
tree       02817e0b6860c2899ff366af83757d9dd4d014df
parent     8815ba2f674dbb18eb499216df9942b411e10daa (diff)
[SPARK-9661] [MLLIB] [ML] Java compatibility
I skimmed through the docs for various instances of Object and replaced them with Java-compatible versions of the same:

1. Some methods in LDAModel.
2. runMiniBatchSGD
3. kolmogorovSmirnovTest

Author: MechCoder <manojkumarsivaraj334@gmail.com>

Closes #8126 from MechCoder/java_incop.
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala     27
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala         16
-rw-r--r--  mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java   24
-rw-r--r--  mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java  22
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala     13
5 files changed, 99 insertions(+), 3 deletions(-)
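For context, a minimal Java sketch of the LocalLDAModel entry points this patch adds. The method signatures come from the diff below; the class name JavaLDAExample, the local master setting, and the tiny corpus are illustrative only:

    import java.util.Arrays;
    import scala.Tuple2;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.mllib.clustering.LDA;
    import org.apache.spark.mllib.clustering.LocalLDAModel;
    import org.apache.spark.mllib.linalg.Vector;
    import org.apache.spark.mllib.linalg.Vectors;

    public class JavaLDAExample {
      public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
            new SparkConf().setMaster("local[2]").setAppName("JavaLDAExample"));
        // Documents are (id, term-count vector) pairs, as in the Scala API.
        JavaPairRDD<Long, Vector> docs = JavaPairRDD.fromJavaRDD(sc.parallelize(Arrays.asList(
            new Tuple2<Long, Vector>(0L, Vectors.dense(1.0, 2.0, 0.0)),
            new Tuple2<Long, Vector>(1L, Vectors.dense(0.0, 1.0, 3.0)))));
        // The online optimizer yields a LocalLDAModel directly.
        LocalLDAModel model = (LocalLDAModel) new LDA()
            .setK(2).setOptimizer("online").run(docs);

        double logLikelihood = model.logLikelihood(docs);  // variational bound on log p(docs)
        double logPerplexity = model.logPerplexity(docs);  // lower is better
        JavaPairRDD<Long, Vector> topicDist = model.topicDistributions(docs);
        sc.stop();
      }
    }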
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
index 5dc637ebdc..f31949f13a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAModel.scala
@@ -26,7 +26,7 @@ import org.json4s.jackson.JsonMethods._
import org.apache.spark.SparkContext
import org.apache.spark.annotation.Experimental
-import org.apache.spark.api.java.JavaPairRDD
+import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
import org.apache.spark.graphx.{Edge, EdgeContext, Graph, VertexId}
import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector, Vectors}
import org.apache.spark.mllib.util.{Loader, Saveable}
@@ -228,6 +228,11 @@ class LocalLDAModel private[clustering] (
docConcentration, topicConcentration, topicsMatrix.toBreeze.toDenseMatrix, gammaShape, k,
vocabSize)
+ /** Java-friendly version of [[logLikelihood]] */
+ def logLikelihood(documents: JavaPairRDD[java.lang.Long, Vector]): Double = {
+ logLikelihood(documents.rdd.asInstanceOf[RDD[(Long, Vector)]])
+ }
+
/**
* Calculate an upper bound on perplexity. (Lower is better.)
* See Equation (16) in original Online LDA paper.
@@ -242,6 +247,11 @@ class LocalLDAModel private[clustering] (
-logLikelihood(documents) / corpusTokenCount
}
+ /** Java-friendly version of [[logPerplexity]] */
+ def logPerplexity(documents: JavaPairRDD[java.lang.Long, Vector]): Double = {
+ logPerplexity(documents.rdd.asInstanceOf[RDD[(Long, Vector)]])
+ }
+
/**
* Estimate the variational likelihood bound of `documents`:
* log p(documents) >= E_q[log p(documents)] - E_q[log q(documents)]
@@ -341,8 +351,14 @@ class LocalLDAModel private[clustering] (
}
}
-}
+ /** Java-friendly version of [[topicDistributions]] */
+ def topicDistributions(
+ documents: JavaPairRDD[java.lang.Long, Vector]): JavaPairRDD[java.lang.Long, Vector] = {
+ val distributions = topicDistributions(documents.rdd.asInstanceOf[RDD[(Long, Vector)]])
+ JavaPairRDD.fromRDD(distributions.asInstanceOf[RDD[(java.lang.Long, Vector)]])
+ }
+}
@Experimental
object LocalLDAModel extends Loader[LocalLDAModel] {
@@ -657,6 +673,13 @@ class DistributedLDAModel private[clustering] (
}
}
+ /** Java-friendly version of [[topTopicsPerDocument]] */
+ def javaTopTopicsPerDocument(
+ k: Int): JavaRDD[(java.lang.Long, Array[Int], Array[java.lang.Double])] = {
+ val topics = topTopicsPerDocument(k)
+ topics.asInstanceOf[RDD[(java.lang.Long, Array[Int], Array[java.lang.Double])]].toJavaRDD()
+ }
+
// TODO:
// override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ???
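A short sketch of consuming the new javaTopTopicsPerDocument from Java; distModel stands in for a trained DistributedLDAModel (e.g. from the default EM optimizer), and the boxed element types match the signature above:

    // Each Tuple3 holds (document id, indices of the top-k topics, their weights).
    JavaRDD<scala.Tuple3<Long, int[], Double[]>> topTopics =
        distModel.javaTopTopicsPerDocument(3);
    for (scala.Tuple3<Long, int[], Double[]> doc : topTopics.take(2)) {
      System.out.println(doc._1() + " -> " + java.util.Arrays.toString(doc._2()));
    }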
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
index f84502919e..24fe48cb8f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.stat
import scala.annotation.varargs
import org.apache.spark.annotation.Experimental
-import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.{JavaRDD, JavaDoubleRDD}
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.{Matrix, Vector}
import org.apache.spark.mllib.regression.LabeledPoint
@@ -178,6 +178,9 @@ object Statistics {
ChiSqTest.chiSquaredFeatures(data)
}
+ /** Java-friendly version of [[chiSqTest()]] */
+ def chiSqTest(data: JavaRDD[LabeledPoint]): Array[ChiSqTestResult] = chiSqTest(data.rdd)
+
/**
* Conduct the two-sided Kolmogorov-Smirnov (KS) test for data sampled from a
* continuous distribution. By comparing the largest difference between the empirical cumulative
@@ -212,4 +215,15 @@ object Statistics {
: KolmogorovSmirnovTestResult = {
KolmogorovSmirnovTest.testOneSample(data, distName, params: _*)
}
+
+ /** Java-friendly version of [[kolmogorovSmirnovTest()]] */
+ @varargs
+ def kolmogorovSmirnovTest(
+ data: JavaDoubleRDD,
+ distName: String,
+ params: java.lang.Double*): KolmogorovSmirnovTestResult = {
+ val javaParams = params.asInstanceOf[Seq[Double]]
+ KolmogorovSmirnovTest.testOneSample(data.rdd.asInstanceOf[RDD[Double]],
+ distName, javaParams: _*)
+ }
}
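The two Statistics additions can be exercised from Java as below, mirroring the signatures in the diff above (sc is assumed to be an existing JavaSparkContext):

    JavaDoubleRDD samples = sc.parallelizeDoubles(java.util.Arrays.asList(0.2, 1.0, -1.0, 2.0));
    // Default standard normal, then explicit mean and standard deviation via @varargs.
    KolmogorovSmirnovTestResult ks = Statistics.kolmogorovSmirnovTest(samples, "norm");
    KolmogorovSmirnovTestResult ksExplicit =
        Statistics.kolmogorovSmirnovTest(samples, "norm", 0.0, 1.0);

    JavaRDD<LabeledPoint> labeled = sc.parallelize(java.util.Arrays.asList(
        new LabeledPoint(0.0, Vectors.dense(0.1, 2.3)),
        new LabeledPoint(1.0, Vectors.dense(1.5, 5.1))));
    ChiSqTestResult[] results = Statistics.chiSqTest(labeled);  // one test per feature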
diff --git a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java
index d272a42c85..427be9430d 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/clustering/JavaLDASuite.java
@@ -124,6 +124,10 @@ public class JavaLDASuite implements Serializable {
}
});
assertEquals(topicDistributions.count(), nonEmptyCorpus.count());
+
+ // Check: javaTopTopicsPerDocument
+ JavaRDD<scala.Tuple3<java.lang.Long, int[], java.lang.Double[]>> topTopics =
+ model.javaTopTopicsPerDocument(3);
}
@Test
@@ -160,11 +164,31 @@ public class JavaLDASuite implements Serializable {
assertEquals(roundedLocalTopicSummary.length, k);
}
+ @Test
+ public void localLdaMethods() {
+ JavaRDD<Tuple2<Long, Vector>> docs = sc.parallelize(toyData, 2);
+ JavaPairRDD<Long, Vector> pairedDocs = JavaPairRDD.fromJavaRDD(docs);
+
+ // check: topicDistributions
+ assertEquals(toyModel.topicDistributions(pairedDocs).count(), pairedDocs.count());
+
+ // check: logPerplexity
+ double logPerplexity = toyModel.logPerplexity(pairedDocs);
+
+ // check: logLikelihood.
+ ArrayList<Tuple2<Long, Vector>> docsSingleWord = new ArrayList<Tuple2<Long, Vector>>();
+ docsSingleWord.add(new Tuple2<Long, Vector>(Long.valueOf(0), Vectors.dense(1.0, 0.0, 0.0)));
+ JavaPairRDD<Long, Vector> single = JavaPairRDD.fromJavaRDD(sc.parallelize(docsSingleWord));
+ double logLikelihood = toyModel.logLikelihood(single);
+ }
+
private static int tinyK = LDASuite$.MODULE$.tinyK();
private static int tinyVocabSize = LDASuite$.MODULE$.tinyVocabSize();
private static Matrix tinyTopics = LDASuite$.MODULE$.tinyTopics();
private static Tuple2<int[], double[]>[] tinyTopicDescription =
LDASuite$.MODULE$.tinyTopicDescription();
private JavaPairRDD<Long, Vector> corpus;
+ private LocalLDAModel toyModel = LDASuite$.MODULE$.toyModel();
+ private ArrayList<Tuple2<Long, Vector>> toyData = LDASuite$.MODULE$.javaToyData();
}
diff --git a/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java b/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java
index 62f7f26b7c..eb4e369862 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/stat/JavaStatisticsSuite.java
@@ -27,7 +27,12 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.mllib.linalg.Vectors;
+import org.apache.spark.mllib.regression.LabeledPoint;
+import org.apache.spark.mllib.stat.test.ChiSqTestResult;
+import org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult;
public class JavaStatisticsSuite implements Serializable {
private transient JavaSparkContext sc;
@@ -53,4 +58,21 @@ public class JavaStatisticsSuite implements Serializable {
// Check default method
assertEquals(corr1, corr2);
}
+
+ @Test
+ public void kolmogorovSmirnovTest() {
+ JavaDoubleRDD data = sc.parallelizeDoubles(Lists.newArrayList(0.2, 1.0, -1.0, 2.0));
+ KolmogorovSmirnovTestResult testResult1 = Statistics.kolmogorovSmirnovTest(data, "norm");
+ KolmogorovSmirnovTestResult testResult2 = Statistics.kolmogorovSmirnovTest(
+ data, "norm", 0.0, 1.0);
+ }
+
+ @Test
+ public void chiSqTest() {
+ JavaRDD<LabeledPoint> data = sc.parallelize(Lists.newArrayList(
+ new LabeledPoint(0.0, Vectors.dense(0.1, 2.3)),
+ new LabeledPoint(1.0, Vectors.dense(1.5, 5.1)),
+ new LabeledPoint(0.0, Vectors.dense(2.4, 8.1))));
+ ChiSqTestResult[] testResults = Statistics.chiSqTest(data);
+ }
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
index ce6a8eb8e8..926185e90b 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/LDASuite.scala
@@ -17,6 +17,8 @@
package org.apache.spark.mllib.clustering
+import java.util.{ArrayList => JArrayList}
+
import breeze.linalg.{DenseMatrix => BDM, argtopk, max, argmax}
import org.apache.spark.SparkFunSuite
@@ -575,6 +577,17 @@ private[clustering] object LDASuite {
Vectors.sparse(6, Array(4, 5), Array(1, 1))
).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
+ /** Used in the Java Test Suite */
+ def javaToyData: JArrayList[(java.lang.Long, Vector)] = {
+ val javaData = new JArrayList[(java.lang.Long, Vector)]
+ var i = 0
+ while (i < toyData.size) {
+ javaData.add((toyData(i)._1, toyData(i)._2))
+ i += 1
+ }
+ javaData
+ }
+
def toyModel: LocalLDAModel = {
val k = 2
val vocabSize = 6