author     Feynman Liang <fliang@databricks.com>  2015-08-01 23:11:25 -0700
committer  Xiangrui Meng <meng@databricks.com>    2015-08-01 23:11:25 -0700
commit     28d944e86d066eb4c651dd803f0b022605ed644e
tree       8d256c51c7c21abd47949b0f223ffef1e06babbd
parent     57084e0c7c318912208ee31c52d61c14eeddd8f4
[SPARK-9000] [MLLIB] Support generic item types in PrefixSpan
mengxr Please review after #7818 merges and master is rebased.
Continues work by rikima
Closes #7400
Author: Feynman Liang <fliang@databricks.com>
Author: masaki rikitoku <rikima3132@gmail.com>
Closes #7837 from feynmanliang/SPARK-7400-genericItems and squashes the following commits:
8b2c756 [Feynman Liang] Remove orig
92443c8 [Feynman Liang] Style fixes
42c6349 [Feynman Liang] Style fix
14e67fc [Feynman Liang] Generic prefixSpan itemtypes
b3b21e0 [Feynman Liang] Initial support for generic itemtype in public api
b86e0d5 [masaki rikitoku] modify to support generic item type
 mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala      |  40
 mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala | 104
 2 files changed, 132 insertions(+), 12 deletions(-)
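For context, the user-facing effect of this patch is that PrefixSpan.run now accepts sequences of itemsets over an arbitrary item type instead of the -1-delimited Array[Int] encoding. A minimal usage sketch, assuming Spark at this commit and an existing SparkContext `sc`; the data mirrors the new String-type test added below:

```scala
import org.apache.spark.mllib.fpm.PrefixSpan

// Sequences of itemsets over Strings; before this patch, run() only
// accepted the internal Array[Int]-with-(-1)-delimiters encoding.
val sequences = sc.parallelize(Seq(
  Array(Array("a", "b"), Array("c")),
  Array(Array("a"), Array("c", "b"), Array("a", "b")),
  Array(Array("a", "b"), Array("e")),
  Array(Array("f"))), 2).cache()

val prefixSpan = new PrefixSpan()
  .setMinSupport(0.5)
  .setMaxPatternLength(5)

// Result type: RDD[(Array[Array[String]], Long)]
prefixSpan.run(sequences).collect().foreach { case (pattern, count) =>
  println(pattern.map(_.mkString("{", ",", "}")).mkString("<", ",", ">") + s": $count")
}
```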
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 22b4ddb8b3..c1761c3642 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -18,6 +18,7 @@ package org.apache.spark.mllib.fpm

 import scala.collection.mutable.ArrayBuilder
+import scala.reflect.ClassTag

 import org.apache.spark.Logging
 import org.apache.spark.annotation.Experimental
@@ -90,15 +91,44 @@ class PrefixSpan private (
   }

   /**
-   * Find the complete set of sequential patterns in the input sequences.
+   * Find the complete set of sequential patterns in the input sequences of itemsets.
+   * @param data ordered sequences of itemsets.
+   * @return (sequential itemset pattern, count) tuples
+   */
+  def run[Item: ClassTag](data: RDD[Array[Array[Item]]]): RDD[(Array[Array[Item]], Long)] = {
+    val itemToInt = data.aggregate(Set[Item]())(
+      seqOp = { (uniqItems, item) => uniqItems ++ item.flatten.toSet },
+      combOp = { _ ++ _ }
+    ).zipWithIndex.toMap
+    val intToItem = Map() ++ (itemToInt.map { case (k, v) => (v, k) })
+
+    val dataInternalRepr = data.map { seq =>
+      seq.map(itemset => itemset.map(itemToInt)).reduce((a, b) => a ++ (DELIMITER +: b))
+    }
+    val results = run(dataInternalRepr)
+
+    def toPublicRepr(pattern: Iterable[Int]): List[Array[Item]] = {
+      pattern.span(_ != DELIMITER) match {
+        case (x, xs) if xs.size > 1 => x.map(intToItem).toArray :: toPublicRepr(xs.tail)
+        case (x, xs) => List(x.map(intToItem).toArray)
+      }
+    }
+    results.map { case (seq: Array[Int], count: Long) =>
+      (toPublicRepr(seq).toArray, count)
+    }
+  }
+
+  /**
+   * Find the complete set of sequential patterns in the input sequences. This method utilizes
+   * the internal representation of itemsets as Array[Int] where each itemset is represented by
+   * a contiguous sequence of non-negative integers and delimiters represented by [[DELIMITER]].
    * @param data ordered sequences of itemsets. Items are represented by non-negative integers.
-   *              Each itemset has one or more items and is delimited by [[DELIMITER]].
+   *             Each itemset has one or more items and is delimited by [[DELIMITER]].
    * @return a set of sequential pattern pairs,
    *         the key of pair is pattern (a list of elements),
    *         the value of pair is the pattern's count.
    */
-  // TODO: generalize to arbitrary item-types and use mapping to Ints for internal algorithm
-  def run(data: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
+  private[fpm] def run(data: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
     val sc = data.sparkContext

     if (data.getStorageLevel == StorageLevel.NONE) {
@@ -260,7 +290,7 @@ class PrefixSpan private (
 private[fpm] object PrefixSpan {
   private[fpm] val DELIMITER = -1

-  /** Splits a sequence of itemsets delimited by [[DELIMITER]]. */
+  /** Splits an array of itemsets delimited by [[DELIMITER]]. */
   private[fpm] def splitSequence(sequence: List[Int]): List[Set[Int]] = {
     sequence.span(_ != DELIMITER) match {
       case (x, xs) if xs.length > 1 => x.toSet :: splitSequence(xs.tail)
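To make the new generic run concrete: it produces the same internal encoding the Int-based run consumes by assigning each distinct item an integer and joining itemsets with the -1 DELIMITER, then maps results back through the inverse map. A plain-Scala sketch of that round trip, with illustrative names not taken from the patch (the real code builds the item set with a distributed aggregate, so the actual Int assignment is arbitrary):

```scala
val DELIMITER = -1

val seq = Array(Array("a", "b"), Array("c"))             // one input sequence
val itemToInt = seq.flatten.distinct.zipWithIndex.toMap  // e.g. Map(a -> 0, b -> 1, c -> 2)
val intToItem = itemToInt.map(_.swap)

// Encode: Array(Array("a","b"), Array("c")) becomes Array(0, 1, -1, 2)
val encoded = seq.map(_.map(itemToInt)).reduce((a, b) => a ++ (DELIMITER +: b))

// Decode, following the same recursion as toPublicRepr above
def decode(pattern: List[Int]): List[Array[String]] =
  pattern.span(_ != DELIMITER) match {
    case (x, xs) if xs.size > 1 => x.map(intToItem).toArray :: decode(xs.tail)
    case (x, _)                 => List(x.map(intToItem).toArray)
  }

assert(decode(encoded.toList).map(_.toSeq) == seq.toList.map(_.toSeq))
```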
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
index 457f32670f..d87f61e385 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext

 class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {

-  test("PrefixSpan using Integer type, singleton itemsets") {
+  test("PrefixSpan internal (integer seq, -1 delim) run, singleton itemsets") {

    /*
      library("arulesSequences")
@@ -69,7 +69,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
       (Array(4, -1, 5), 2L),
       (Array(5), 3L)
     )
-    compareResults(expectedValue1, result1.collect())
+    compareInternalResults(expectedValue1, result1.collect())

     prefixspan.setMinSupport(0.5).setMaxPatternLength(50)
     val result2 = prefixspan.run(rdd)
@@ -80,7 +80,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
       (Array(4), 4L),
       (Array(5), 3L)
     )
-    compareResults(expectedValue2, result2.collect())
+    compareInternalResults(expectedValue2, result2.collect())

     prefixspan.setMinSupport(0.33).setMaxPatternLength(2)
     val result3 = prefixspan.run(rdd)
@@ -100,10 +100,10 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
       (Array(4, -1, 5), 2L),
       (Array(5), 3L)
     )
-    compareResults(expectedValue3, result3.collect())
+    compareInternalResults(expectedValue3, result3.collect())
   }

-  test("PrefixSpan using Integer type, variable-size itemsets") {
+  test("PrefixSpan internal (integer seq, -1 delim) run, variable-size itemsets") {
     val sequences = Array(
       Array(1, -1, 1, 2, 3, -1, 1, 3, -1, 4, -1, 3, 6),
       Array(1, 4, -1, 3, -1, 2, 3, -1, 1, 5),
@@ -254,10 +254,100 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
       (Array(1, -1, 2, 3, -1, 1), 2L),
       (Array(1, -1, 2, -1, 1), 2L))

-    compareResults(expectedValue, result.collect())
+    compareInternalResults(expectedValue, result.collect())
   }

-  private def compareResults(
+  test("PrefixSpan Integer type, variable-size itemsets") {
+    val sequences = Seq(
+      Array(Array(1, 2), Array(3)),
+      Array(Array(1), Array(3, 2), Array(1, 2)),
+      Array(Array(1, 2), Array(5)),
+      Array(Array(6)))
+    val rdd = sc.parallelize(sequences, 2).cache()
+
+    val prefixspan = new PrefixSpan()
+      .setMinSupport(0.5)
+      .setMaxPatternLength(5)
+
+    /*
+      To verify results, create file "prefixSpanSeqs2" with content
+      (format = (transactionID, idxInTransaction, numItemsinItemset, itemset)):
+        1 1 2 1 2
+        1 2 1 3
+        2 1 1 1
+        2 2 2 3 2
+        2 3 2 1 2
+        3 1 2 1 2
+        3 2 1 5
+        4 1 1 6
+      In R, run:
+        library("arulesSequences")
+        prefixSpanSeqs = read_baskets("prefixSpanSeqs2", info = c("sequenceID","eventID","SIZE"))
+        freqItemSeq = cspade(prefixSpanSeqs,
+                             parameter = list(support = 0.5, maxlen = 5))
+        resSeq = as(freqItemSeq, "data.frame")
+        resSeq
+
+           sequence support
+        1     <{1}>    0.75
+        2     <{2}>    0.75
+        3     <{3}>    0.50
+        4 <{1},{3}>    0.50
+        5   <{1,2}>    0.75
+     */
+
+    val result = prefixspan.run(rdd)
+    val expected = Array(
+      (Array(Array(1)), 3L),
+      (Array(Array(2)), 3L),
+      (Array(Array(3)), 2L),
+      (Array(Array(1), Array(3)), 2L),
+      (Array(Array(1, 2)), 3L)
+    )
+    compareResults(expected, result.collect())
+  }
+
+  test("PrefixSpan String type, variable-size itemsets") {
itemsets") { + // This is the same test as "PrefixSpan Int type, variable-size itemsets" except + // mapped to Strings + val intToString = (1 to 6).zip(Seq("a", "b", "c", "d", "e", "f")).toMap + val sequences = Seq( + Array(Array(1, 2), Array(3)), + Array(Array(1), Array(3, 2), Array(1, 2)), + Array(Array(1, 2), Array(5)), + Array(Array(6))).map(seq => seq.map(itemSet => itemSet.map(intToString))) + val rdd = sc.parallelize(sequences, 2).cache() + + val prefixspan = new PrefixSpan() + .setMinSupport(0.5) + .setMaxPatternLength(5) + + val result = prefixspan.run(rdd) + val expected = Array( + (Array(Array(1)), 3L), + (Array(Array(2)), 3L), + (Array(Array(3)), 2L), + (Array(Array(1), Array(3)), 2L), + (Array(Array(1, 2)), 3L) + ).map { case (pattern, count) => + (pattern.map(itemSet => itemSet.map(intToString)), count) + } + compareResults(expected, result.collect()) + } + + private def compareResults[Item]( + expectedValue: Array[(Array[Array[Item]], Long)], + actualValue: Array[(Array[Array[Item]], Long)]): Unit = { + val expectedSet = expectedValue.map { case (pattern: Array[Array[Item]], count: Long) => + (pattern.map(itemSet => itemSet.toSet).toSeq, count) + }.toSet + val actualSet = actualValue.map { case (pattern: Array[Array[Item]], count: Long) => + (pattern.map(itemSet => itemSet.toSet).toSeq, count) + }.toSet + assert(expectedSet === actualSet) + } + + private def compareInternalResults( expectedValue: Array[(Array[Int], Long)], actualValue: Array[(Array[Int], Long)]): Unit = { val expectedSet = expectedValue.map(x => (x._1.toSeq, x._2)).toSet |