aboutsummaryrefslogtreecommitdiff
path: root/mllib/src/main
diff options
context:
space:
mode:
authorFeynman Liang <fliang@databricks.com>2015-08-01 23:11:25 -0700
committerXiangrui Meng <meng@databricks.com>2015-08-01 23:11:25 -0700
commit28d944e86d066eb4c651dd803f0b022605ed644e (patch)
tree8d256c51c7c21abd47949b0f223ffef1e06babbd /mllib/src/main
parent57084e0c7c318912208ee31c52d61c14eeddd8f4 (diff)
downloadspark-28d944e86d066eb4c651dd803f0b022605ed644e.tar.gz
spark-28d944e86d066eb4c651dd803f0b022605ed644e.tar.bz2
spark-28d944e86d066eb4c651dd803f0b022605ed644e.zip
[SPARK-9000] [MLLIB] Support generic item types in PrefixSpan
mengxr Please review after #7818 merges and master is rebased. Continues work by rikima Closes #7400 Author: Feynman Liang <fliang@databricks.com> Author: masaki rikitoku <rikima3132@gmail.com> Closes #7837 from feynmanliang/SPARK-7400-genericItems and squashes the following commits: 8b2c756 [Feynman Liang] Remove orig 92443c8 [Feynman Liang] Style fixes 42c6349 [Feynman Liang] Style fix 14e67fc [Feynman Liang] Generic prefixSpan itemtypes b3b21e0 [Feynman Liang] Initial support for generic itemtype in public api b86e0d5 [masaki rikitoku] modify to support generic item type
Diffstat (limited to 'mllib/src/main')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala40
1 files changed, 35 insertions, 5 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 22b4ddb8b3..c1761c3642 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -18,6 +18,7 @@
package org.apache.spark.mllib.fpm
import scala.collection.mutable.ArrayBuilder
+import scala.reflect.ClassTag
import org.apache.spark.Logging
import org.apache.spark.annotation.Experimental
@@ -90,15 +91,44 @@ class PrefixSpan private (
}
/**
- * Find the complete set of sequential patterns in the input sequences.
+ * Find the complete set of sequential patterns in the input sequences of itemsets.
+ * @param data ordered sequences of itemsets.
+ * @return (sequential itemset pattern, count) tuples
+ */
+ def run[Item: ClassTag](data: RDD[Array[Array[Item]]]): RDD[(Array[Array[Item]], Long)] = {
+ val itemToInt = data.aggregate(Set[Item]())(
+ seqOp = { (uniqItems, item) => uniqItems ++ item.flatten.toSet },
+ combOp = { _ ++ _ }
+ ).zipWithIndex.toMap
+ val intToItem = Map() ++ (itemToInt.map { case (k, v) => (v, k) })
+
+ val dataInternalRepr = data.map { seq =>
+ seq.map(itemset => itemset.map(itemToInt)).reduce((a, b) => a ++ (DELIMITER +: b))
+ }
+ val results = run(dataInternalRepr)
+
+ def toPublicRepr(pattern: Iterable[Int]): List[Array[Item]] = {
+ pattern.span(_ != DELIMITER) match {
+ case (x, xs) if xs.size > 1 => x.map(intToItem).toArray :: toPublicRepr(xs.tail)
+ case (x, xs) => List(x.map(intToItem).toArray)
+ }
+ }
+ results.map { case (seq: Array[Int], count: Long) =>
+ (toPublicRepr(seq).toArray, count)
+ }
+ }
+
+ /**
+ * Find the complete set of sequential patterns in the input sequences. This method utilizes
+ * the internal representation of itemsets as Array[Int] where each itemset is represented by
+ * a contiguous sequence of non-negative integers and delimiters represented by [[DELIMITER]].
* @param data ordered sequences of itemsets. Items are represented by non-negative integers.
- * Each itemset has one or more items and is delimited by [[DELIMITER]].
+ * Each itemset has one or more items and is delimited by [[DELIMITER]].
* @return a set of sequential pattern pairs,
* the key of pair is pattern (a list of elements),
* the value of pair is the pattern's count.
*/
- // TODO: generalize to arbitrary item-types and use mapping to Ints for internal algorithm
- def run(data: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
+ private[fpm] def run(data: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
val sc = data.sparkContext
if (data.getStorageLevel == StorageLevel.NONE) {
@@ -260,7 +290,7 @@ class PrefixSpan private (
private[fpm] object PrefixSpan {
private[fpm] val DELIMITER = -1
- /** Splits a sequence of itemsets delimited by [[DELIMITER]]. */
+ /** Splits an array of itemsets delimited by [[DELIMITER]]. */
private[fpm] def splitSequence(sequence: List[Int]): List[Set[Int]] = {
sequence.span(_ != DELIMITER) match {
case (x, xs) if xs.length > 1 => x.toSet :: splitSequence(xs.tail)