aboutsummaryrefslogtreecommitdiff
path: root/mllib/src/main
diff options
context:
space:
mode:
authorXiangrui Meng <meng@databricks.com>2015-08-02 11:50:17 -0700
committerXiangrui Meng <meng@databricks.com>2015-08-02 11:50:17 -0700
commit66924ffa6bdb8e0df1b90b789cb7ad443377e729 (patch)
tree1ba157fd78b99c18d7773c52d389cd02bed858b3 /mllib/src/main
parent8eafa2aeb6c1b465cfdb99f04c2137fc3eac0c01 (diff)
downloadspark-66924ffa6bdb8e0df1b90b789cb7ad443377e729.tar.gz
spark-66924ffa6bdb8e0df1b90b789cb7ad443377e729.tar.bz2
spark-66924ffa6bdb8e0df1b90b789cb7ad443377e729.zip
[SPARK-9527] [MLLIB] add PrefixSpanModel and make PrefixSpan Java friendly
1. Use `PrefixSpanModel` to wrap the frequent sequences. 2. Define `FreqSequence` to wrap each frequent sequence, which contains a Java-friendly method `javaSequence` 3. Overload `run` for Java users. 4. Added a unit test in Java to check Java compatibility. zhangjiajin feynmanliang Author: Xiangrui Meng <meng@databricks.com> Closes #7869 from mengxr/SPARK-9527 and squashes the following commits: 4345594 [Xiangrui Meng] add PrefixSpanModel and make PrefixSpan Java friendly
Diffstat (limited to 'mllib/src/main')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala52
1 files changed, 47 insertions, 5 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index c1761c3642..9eaf733fad 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -17,11 +17,16 @@
package org.apache.spark.mllib.fpm
+import java.{lang => jl, util => ju}
+
+import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuilder
import scala.reflect.ClassTag
import org.apache.spark.Logging
import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
@@ -93,9 +98,9 @@ class PrefixSpan private (
/**
* Find the complete set of sequential patterns in the input sequences of itemsets.
* @param data ordered sequences of itemsets.
- * @return (sequential itemset pattern, count) tuples
+ * @return a [[PrefixSpanModel]] that contains the frequent sequences
*/
- def run[Item: ClassTag](data: RDD[Array[Array[Item]]]): RDD[(Array[Array[Item]], Long)] = {
+ def run[Item: ClassTag](data: RDD[Array[Array[Item]]]): PrefixSpanModel[Item] = {
val itemToInt = data.aggregate(Set[Item]())(
seqOp = { (uniqItems, item) => uniqItems ++ item.flatten.toSet },
combOp = { _ ++ _ }
@@ -113,9 +118,25 @@ class PrefixSpan private (
case (x, xs) => List(x.map(intToItem).toArray)
}
}
- results.map { case (seq: Array[Int], count: Long) =>
- (toPublicRepr(seq).toArray, count)
+ val freqSequences = results.map { case (seq: Array[Int], count: Long) =>
+ new FreqSequence[Item](toPublicRepr(seq).toArray, count)
}
+ new PrefixSpanModel[Item](freqSequences)
+ }
+
+ /**
+ * A Java-friendly version of [[run()]] that reads sequences from a [[JavaRDD]] and returns
+ * frequent sequences in a [[PrefixSpanModel]].
+ * @param data ordered sequences of itemsets stored as Java Iterable of Iterables
+ * @tparam Item item type
+ * @tparam Itemset itemset type, which is an Iterable of Items
+ * @tparam Sequence sequence type, which is an Iterable of Itemsets
+ * @return a [[PrefixSpanModel]] that contains the frequent sequences
+ */
+ def run[Item, Itemset <: jl.Iterable[Item], Sequence <: jl.Iterable[Itemset]](
+ data: JavaRDD[Sequence]): PrefixSpanModel[Item] = {
+ implicit val tag = fakeClassTag[Item]
+ run(data.rdd.map(_.asScala.map(_.asScala.toArray).toArray))
}
/**
@@ -287,7 +308,7 @@ class PrefixSpan private (
}
-private[fpm] object PrefixSpan {
+object PrefixSpan {
private[fpm] val DELIMITER = -1
/** Splits an array of itemsets delimited by [[DELIMITER]]. */
@@ -313,4 +334,25 @@ private[fpm] object PrefixSpan {
// TODO: improve complexity by using partial prefixes, considering one item at a time
itemSet.subsets.filter(_ != Set.empty[Int])
}
+
+ /**
+ * Represents a frequence sequence.
+ * @param sequence a sequence of itemsets stored as an Array of Arrays
+ * @param freq frequency
+ * @tparam Item item type
+ */
+ class FreqSequence[Item](val sequence: Array[Array[Item]], val freq: Long) extends Serializable {
+ /**
+ * Returns sequence as a Java List of lists for Java users.
+ */
+ def javaSequence: ju.List[ju.List[Item]] = sequence.map(_.toList.asJava).toList.asJava
+ }
}
+
+/**
+ * Model fitted by [[PrefixSpan]]
+ * @param freqSequences frequent sequences
+ * @tparam Item item type
+ */
+class PrefixSpanModel[Item](val freqSequences: RDD[PrefixSpan.FreqSequence[Item]])
+ extends Serializable