diff options
author | Feynman Liang <fliang@databricks.com> | 2015-08-01 23:11:25 -0700 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2015-08-01 23:11:25 -0700 |
commit | 28d944e86d066eb4c651dd803f0b022605ed644e (patch) | |
tree | 8d256c51c7c21abd47949b0f223ffef1e06babbd /mllib/src/main | |
parent | 57084e0c7c318912208ee31c52d61c14eeddd8f4 (diff) | |
download | spark-28d944e86d066eb4c651dd803f0b022605ed644e.tar.gz spark-28d944e86d066eb4c651dd803f0b022605ed644e.tar.bz2 spark-28d944e86d066eb4c651dd803f0b022605ed644e.zip |
[SPARK-9000] [MLLIB] Support generic item types in PrefixSpan
mengxr Please review after #7818 merges and master is rebased.
Continues work by rikima
Closes #7400
Author: Feynman Liang <fliang@databricks.com>
Author: masaki rikitoku <rikima3132@gmail.com>
Closes #7837 from feynmanliang/SPARK-7400-genericItems and squashes the following commits:
8b2c756 [Feynman Liang] Remove orig
92443c8 [Feynman Liang] Style fixes
42c6349 [Feynman Liang] Style fix
14e67fc [Feynman Liang] Generic prefixSpan itemtypes
b3b21e0 [Feynman Liang] Initial support for generic itemtype in public api
b86e0d5 [masaki rikitoku] modify to support generic item type
Diffstat (limited to 'mllib/src/main')
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala | 40 |
1 file changed, 35 insertions, 5 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index 22b4ddb8b3..c1761c3642 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -18,6 +18,7 @@ package org.apache.spark.mllib.fpm import scala.collection.mutable.ArrayBuilder +import scala.reflect.ClassTag import org.apache.spark.Logging import org.apache.spark.annotation.Experimental @@ -90,15 +91,44 @@ class PrefixSpan private ( } /** - * Find the complete set of sequential patterns in the input sequences. + * Find the complete set of sequential patterns in the input sequences of itemsets. + * @param data ordered sequences of itemsets. + * @return (sequential itemset pattern, count) tuples + */ + def run[Item: ClassTag](data: RDD[Array[Array[Item]]]): RDD[(Array[Array[Item]], Long)] = { + val itemToInt = data.aggregate(Set[Item]())( + seqOp = { (uniqItems, item) => uniqItems ++ item.flatten.toSet }, + combOp = { _ ++ _ } + ).zipWithIndex.toMap + val intToItem = Map() ++ (itemToInt.map { case (k, v) => (v, k) }) + + val dataInternalRepr = data.map { seq => + seq.map(itemset => itemset.map(itemToInt)).reduce((a, b) => a ++ (DELIMITER +: b)) + } + val results = run(dataInternalRepr) + + def toPublicRepr(pattern: Iterable[Int]): List[Array[Item]] = { + pattern.span(_ != DELIMITER) match { + case (x, xs) if xs.size > 1 => x.map(intToItem).toArray :: toPublicRepr(xs.tail) + case (x, xs) => List(x.map(intToItem).toArray) + } + } + results.map { case (seq: Array[Int], count: Long) => + (toPublicRepr(seq).toArray, count) + } + } + + /** + * Find the complete set of sequential patterns in the input sequences. This method utilizes + * the internal representation of itemsets as Array[Int] where each itemset is represented by + * a contiguous sequence of non-negative integers and delimiters represented by [[DELIMITER]]. 
* @param data ordered sequences of itemsets. Items are represented by non-negative integers. - * Each itemset has one or more items and is delimited by [[DELIMITER]]. + * Each itemset has one or more items and is delimited by [[DELIMITER]]. * @return a set of sequential pattern pairs, * the key of pair is pattern (a list of elements), * the value of pair is the pattern's count. */ - // TODO: generalize to arbitrary item-types and use mapping to Ints for internal algorithm - def run(data: RDD[Array[Int]]): RDD[(Array[Int], Long)] = { + private[fpm] def run(data: RDD[Array[Int]]): RDD[(Array[Int], Long)] = { val sc = data.sparkContext if (data.getStorageLevel == StorageLevel.NONE) { @@ -260,7 +290,7 @@ class PrefixSpan private ( private[fpm] object PrefixSpan { private[fpm] val DELIMITER = -1 - /** Splits a sequence of itemsets delimited by [[DELIMITER]]. */ + /** Splits an array of itemsets delimited by [[DELIMITER]]. */ private[fpm] def splitSequence(sequence: List[Int]): List[Set[Int]] = { sequence.span(_ != DELIMITER) match { case (x, xs) if xs.length > 1 => x.toSet :: splitSequence(xs.tail) |