diff options
author | Xiangrui Meng <meng@databricks.com> | 2015-08-04 22:28:49 -0700 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2015-08-04 22:28:49 -0700 |
commit | a02bcf20c4fc9e2e182630d197221729e996afc2 (patch) | |
tree | addf5a311acafad2849dd32c7a8c47f88f1f702f /mllib/src/test | |
parent | f7abd6bec9d51ed4ab6359e50eac853e64ecae86 (diff) | |
download | spark-a02bcf20c4fc9e2e182630d197221729e996afc2.tar.gz spark-a02bcf20c4fc9e2e182630d197221729e996afc2.tar.bz2 spark-a02bcf20c4fc9e2e182630d197221729e996afc2.zip |
[SPARK-9540] [MLLIB] optimize PrefixSpan implementation
This is a major refactoring of the PrefixSpan implementation. It contains the following changes:
1. Expand prefix with one item at a time. The existing implementation generates all subsets for each itemset, which might have scalability issue when the itemset is large.
2. Use a new internal format. `<(12)(31)>` is represented by `[0, 1, 2, 0, 1, 3, 0]` internally. We use `0` because negative numbers are used to indicate partial prefix items, e.g., `_2` is represented by `-2`.
3. Remember the start indices of all partial projections in the projected postfix to help next projection.
4. Reuse the original sequence array for projected postfixes.
5. Use `Prefix` IDs in aggregation rather than its content.
6. Use `ArrayBuilder` for building primitive arrays.
7. Expose `maxLocalProjDBSize`.
8. Tests are not changed except using `0` instead of `-1` as the delimiter.
`Postfix`'s API doc should be a good place to start.
Closes #7594
feynmanliang zhangjiajin
Author: Xiangrui Meng <meng@databricks.com>
Closes #7937 from mengxr/SPARK-9540 and squashes the following commits:
2d0ec31 [Xiangrui Meng] address more comments
48f450c [Xiangrui Meng] address comments from Feynman; fixed a bug in project and added a test
65f90e8 [Xiangrui Meng] naming and documentation
8afc86a [Xiangrui Meng] refactor impl
Diffstat (limited to 'mllib/src/test')
-rw-r--r-- | mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala | 271 |
1 files changed, 143 insertions, 128 deletions
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala index 0ae48d62cc..a83e543859 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext { - test("PrefixSpan internal (integer seq, -1 delim) run, singleton itemsets") { + test("PrefixSpan internal (integer seq, 0 delim) run, singleton itemsets") { /* library("arulesSequences") @@ -35,83 +35,81 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext { */ val sequences = Array( - Array(1, -1, 3, -1, 4, -1, 5), - Array(2, -1, 3, -1, 1), - Array(2, -1, 4, -1, 1), - Array(3, -1, 1, -1, 3, -1, 4, -1, 5), - Array(3, -1, 4, -1, 4, -1, 3), - Array(6, -1, 5, -1, 3)) + Array(0, 1, 0, 3, 0, 4, 0, 5, 0), + Array(0, 2, 0, 3, 0, 1, 0), + Array(0, 2, 0, 4, 0, 1, 0), + Array(0, 3, 0, 1, 0, 3, 0, 4, 0, 5, 0), + Array(0, 3, 0, 4, 0, 4, 0, 3, 0), + Array(0, 6, 0, 5, 0, 3, 0)) val rdd = sc.parallelize(sequences, 2).cache() - val prefixspan = new PrefixSpan() - .setMinSupport(0.33) - .setMaxPatternLength(50) - val result1 = prefixspan.run(rdd) + val result1 = PrefixSpan.genFreqPatterns( + rdd, minCount = 2L, maxPatternLength = 50, maxLocalProjDBSize = 16L) val expectedValue1 = Array( - (Array(1), 4L), - (Array(1, -1, 3), 2L), - (Array(1, -1, 3, -1, 4), 2L), - (Array(1, -1, 3, -1, 4, -1, 5), 2L), - (Array(1, -1, 3, -1, 5), 2L), - (Array(1, -1, 4), 2L), - (Array(1, -1, 4, -1, 5), 2L), - (Array(1, -1, 5), 2L), - (Array(2), 2L), - (Array(2, -1, 1), 2L), - (Array(3), 5L), - (Array(3, -1, 1), 2L), - (Array(3, -1, 3), 2L), - (Array(3, -1, 4), 3L), - (Array(3, -1, 4, -1, 5), 2L), - (Array(3, -1, 5), 2L), - (Array(4), 4L), - (Array(4, -1, 5), 2L), - (Array(5), 3L) + (Array(0, 1, 0), 
4L), + (Array(0, 1, 0, 3, 0), 2L), + (Array(0, 1, 0, 3, 0, 4, 0), 2L), + (Array(0, 1, 0, 3, 0, 4, 0, 5, 0), 2L), + (Array(0, 1, 0, 3, 0, 5, 0), 2L), + (Array(0, 1, 0, 4, 0), 2L), + (Array(0, 1, 0, 4, 0, 5, 0), 2L), + (Array(0, 1, 0, 5, 0), 2L), + (Array(0, 2, 0), 2L), + (Array(0, 2, 0, 1, 0), 2L), + (Array(0, 3, 0), 5L), + (Array(0, 3, 0, 1, 0), 2L), + (Array(0, 3, 0, 3, 0), 2L), + (Array(0, 3, 0, 4, 0), 3L), + (Array(0, 3, 0, 4, 0, 5, 0), 2L), + (Array(0, 3, 0, 5, 0), 2L), + (Array(0, 4, 0), 4L), + (Array(0, 4, 0, 5, 0), 2L), + (Array(0, 5, 0), 3L) ) compareInternalResults(expectedValue1, result1.collect()) - prefixspan.setMinSupport(0.5).setMaxPatternLength(50) - val result2 = prefixspan.run(rdd) + val result2 = PrefixSpan.genFreqPatterns( + rdd, minCount = 3, maxPatternLength = 50, maxLocalProjDBSize = 32L) val expectedValue2 = Array( - (Array(1), 4L), - (Array(3), 5L), - (Array(3, -1, 4), 3L), - (Array(4), 4L), - (Array(5), 3L) + (Array(0, 1, 0), 4L), + (Array(0, 3, 0), 5L), + (Array(0, 3, 0, 4, 0), 3L), + (Array(0, 4, 0), 4L), + (Array(0, 5, 0), 3L) ) compareInternalResults(expectedValue2, result2.collect()) - prefixspan.setMinSupport(0.33).setMaxPatternLength(2) - val result3 = prefixspan.run(rdd) + val result3 = PrefixSpan.genFreqPatterns( + rdd, minCount = 2, maxPatternLength = 2, maxLocalProjDBSize = 32L) val expectedValue3 = Array( - (Array(1), 4L), - (Array(1, -1, 3), 2L), - (Array(1, -1, 4), 2L), - (Array(1, -1, 5), 2L), - (Array(2, -1, 1), 2L), - (Array(2), 2L), - (Array(3), 5L), - (Array(3, -1, 1), 2L), - (Array(3, -1, 3), 2L), - (Array(3, -1, 4), 3L), - (Array(3, -1, 5), 2L), - (Array(4), 4L), - (Array(4, -1, 5), 2L), - (Array(5), 3L) + (Array(0, 1, 0), 4L), + (Array(0, 1, 0, 3, 0), 2L), + (Array(0, 1, 0, 4, 0), 2L), + (Array(0, 1, 0, 5, 0), 2L), + (Array(0, 2, 0, 1, 0), 2L), + (Array(0, 2, 0), 2L), + (Array(0, 3, 0), 5L), + (Array(0, 3, 0, 1, 0), 2L), + (Array(0, 3, 0, 3, 0), 2L), + (Array(0, 3, 0, 4, 0), 3L), + (Array(0, 3, 0, 5, 0), 2L), + 
(Array(0, 4, 0), 4L), + (Array(0, 4, 0, 5, 0), 2L), + (Array(0, 5, 0), 3L) ) compareInternalResults(expectedValue3, result3.collect()) } test("PrefixSpan internal (integer seq, -1 delim) run, variable-size itemsets") { val sequences = Array( - Array(1, -1, 1, 2, 3, -1, 1, 3, -1, 4, -1, 3, 6), - Array(1, 4, -1, 3, -1, 2, 3, -1, 1, 5), - Array(5, 6, -1, 1, 2, -1, 4, 6, -1, 3, -1, 2), - Array(5, -1, 7, -1, 1, 6, -1, 3, -1, 2, -1, 3)) + Array(0, 1, 0, 1, 2, 3, 0, 1, 3, 0, 4, 0, 3, 6, 0), + Array(0, 1, 4, 0, 3, 0, 2, 3, 0, 1, 5, 0), + Array(0, 5, 6, 0, 1, 2, 0, 4, 6, 0, 3, 0, 2, 0), + Array(0, 5, 0, 7, 0, 1, 6, 0, 3, 0, 2, 0, 3, 0)) val rdd = sc.parallelize(sequences, 2).cache() - val prefixspan = new PrefixSpan().setMinSupport(0.5).setMaxPatternLength(5) - val result = prefixspan.run(rdd) + val result = PrefixSpan.genFreqPatterns( + rdd, minCount = 2, maxPatternLength = 5, maxLocalProjDBSize = 128L) /* To verify results, create file "prefixSpanSeqs" with content @@ -200,63 +198,87 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext { 53 <{1},{2},{1}> 0.50 */ val expectedValue = Array( - (Array(1), 4L), - (Array(2), 4L), - (Array(3), 4L), - (Array(4), 3L), - (Array(5), 3L), - (Array(6), 3L), - (Array(1, -1, 6), 2L), - (Array(2, -1, 6), 2L), - (Array(5, -1, 6), 2L), - (Array(1, 2, -1, 6), 2L), - (Array(1, -1, 4), 2L), - (Array(2, -1, 4), 2L), - (Array(1, 2, -1, 4), 2L), - (Array(1, -1, 3), 4L), - (Array(2, -1, 3), 3L), - (Array(2, 3), 2L), - (Array(3, -1, 3), 3L), - (Array(4, -1, 3), 3L), - (Array(5, -1, 3), 2L), - (Array(6, -1, 3), 2L), - (Array(5, -1, 6, -1, 3), 2L), - (Array(6, -1, 2, -1, 3), 2L), - (Array(5, -1, 2, -1, 3), 2L), - (Array(5, -1, 1, -1, 3), 2L), - (Array(2, -1, 4, -1, 3), 2L), - (Array(1, -1, 4, -1, 3), 2L), - (Array(1, 2, -1, 4, -1, 3), 2L), - (Array(1, -1, 3, -1, 3), 3L), - (Array(1, 2, -1, 3), 2L), - (Array(1, -1, 2, -1, 3), 2L), - (Array(1, -1, 2, 3), 2L), - (Array(1, -1, 2), 4L), - (Array(1, 2), 2L), - (Array(3, -1, 2), 3L), - 
(Array(4, -1, 2), 2L), - (Array(5, -1, 2), 2L), - (Array(6, -1, 2), 2L), - (Array(5, -1, 6, -1, 2), 2L), - (Array(6, -1, 3, -1, 2), 2L), - (Array(5, -1, 3, -1, 2), 2L), - (Array(5, -1, 1, -1, 2), 2L), - (Array(4, -1, 3, -1, 2), 2L), - (Array(1, -1, 3, -1, 2), 3L), - (Array(5, -1, 6, -1, 3, -1, 2), 2L), - (Array(5, -1, 1, -1, 3, -1, 2), 2L), - (Array(1, -1, 1), 2L), - (Array(2, -1, 1), 2L), - (Array(3, -1, 1), 2L), - (Array(5, -1, 1), 2L), - (Array(2, 3, -1, 1), 2L), - (Array(1, -1, 3, -1, 1), 2L), - (Array(1, -1, 2, 3, -1, 1), 2L), - (Array(1, -1, 2, -1, 1), 2L)) + (Array(0, 1, 0), 4L), + (Array(0, 2, 0), 4L), + (Array(0, 3, 0), 4L), + (Array(0, 4, 0), 3L), + (Array(0, 5, 0), 3L), + (Array(0, 6, 0), 3L), + (Array(0, 1, 0, 6, 0), 2L), + (Array(0, 2, 0, 6, 0), 2L), + (Array(0, 5, 0, 6, 0), 2L), + (Array(0, 1, 2, 0, 6, 0), 2L), + (Array(0, 1, 0, 4, 0), 2L), + (Array(0, 2, 0, 4, 0), 2L), + (Array(0, 1, 2, 0, 4, 0), 2L), + (Array(0, 1, 0, 3, 0), 4L), + (Array(0, 2, 0, 3, 0), 3L), + (Array(0, 2, 3, 0), 2L), + (Array(0, 3, 0, 3, 0), 3L), + (Array(0, 4, 0, 3, 0), 3L), + (Array(0, 5, 0, 3, 0), 2L), + (Array(0, 6, 0, 3, 0), 2L), + (Array(0, 5, 0, 6, 0, 3, 0), 2L), + (Array(0, 6, 0, 2, 0, 3, 0), 2L), + (Array(0, 5, 0, 2, 0, 3, 0), 2L), + (Array(0, 5, 0, 1, 0, 3, 0), 2L), + (Array(0, 2, 0, 4, 0, 3, 0), 2L), + (Array(0, 1, 0, 4, 0, 3, 0), 2L), + (Array(0, 1, 2, 0, 4, 0, 3, 0), 2L), + (Array(0, 1, 0, 3, 0, 3, 0), 3L), + (Array(0, 1, 2, 0, 3, 0), 2L), + (Array(0, 1, 0, 2, 0, 3, 0), 2L), + (Array(0, 1, 0, 2, 3, 0), 2L), + (Array(0, 1, 0, 2, 0), 4L), + (Array(0, 1, 2, 0), 2L), + (Array(0, 3, 0, 2, 0), 3L), + (Array(0, 4, 0, 2, 0), 2L), + (Array(0, 5, 0, 2, 0), 2L), + (Array(0, 6, 0, 2, 0), 2L), + (Array(0, 5, 0, 6, 0, 2, 0), 2L), + (Array(0, 6, 0, 3, 0, 2, 0), 2L), + (Array(0, 5, 0, 3, 0, 2, 0), 2L), + (Array(0, 5, 0, 1, 0, 2, 0), 2L), + (Array(0, 4, 0, 3, 0, 2, 0), 2L), + (Array(0, 1, 0, 3, 0, 2, 0), 3L), + (Array(0, 5, 0, 6, 0, 3, 0, 2, 0), 2L), + (Array(0, 5, 0, 1, 0, 3, 0, 2, 
0), 2L), + (Array(0, 1, 0, 1, 0), 2L), + (Array(0, 2, 0, 1, 0), 2L), + (Array(0, 3, 0, 1, 0), 2L), + (Array(0, 5, 0, 1, 0), 2L), + (Array(0, 2, 3, 0, 1, 0), 2L), + (Array(0, 1, 0, 3, 0, 1, 0), 2L), + (Array(0, 1, 0, 2, 3, 0, 1, 0), 2L), + (Array(0, 1, 0, 2, 0, 1, 0), 2L)) compareInternalResults(expectedValue, result.collect()) } + test("PrefixSpan projections with multiple partial starts") { + val sequences = Seq( + Array(Array(1, 2), Array(1, 2, 3))) + val rdd = sc.parallelize(sequences, 2) + val prefixSpan = new PrefixSpan() + .setMinSupport(1.0) + .setMaxPatternLength(2) + val model = prefixSpan.run(rdd) + val expected = Array( + (Array(Array(1)), 1L), + (Array(Array(1, 2)), 1L), + (Array(Array(1), Array(1)), 1L), + (Array(Array(1), Array(2)), 1L), + (Array(Array(1), Array(3)), 1L), + (Array(Array(1, 3)), 1L), + (Array(Array(2)), 1L), + (Array(Array(2, 3)), 1L), + (Array(Array(2), Array(1)), 1L), + (Array(Array(2), Array(2)), 1L), + (Array(Array(2), Array(3)), 1L), + (Array(Array(3)), 1L)) + compareResults(expected, model.freqSequences.collect()) + } + test("PrefixSpan Integer type, variable-size itemsets") { val sequences = Seq( Array(Array(1, 2), Array(3)), @@ -265,7 +287,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext { Array(Array(6))) val rdd = sc.parallelize(sequences, 2).cache() - val prefixspan = new PrefixSpan() + val prefixSpan = new PrefixSpan() .setMinSupport(0.5) .setMaxPatternLength(5) @@ -296,7 +318,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext { 5 <{1,2}> 0.75 */ - val model = prefixspan.run(rdd) + val model = prefixSpan.run(rdd) val expected = Array( (Array(Array(1)), 3L), (Array(Array(2)), 3L), @@ -304,7 +326,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext { (Array(Array(1), Array(3)), 2L), (Array(Array(1, 2)), 3L) ) - compareResults(expected, model.freqSequences.collect().map(x => (x.sequence, x.freq))) + compareResults(expected, 
model.freqSequences.collect()) } test("PrefixSpan String type, variable-size itemsets") { @@ -318,11 +340,11 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext { Array(Array(6))).map(seq => seq.map(itemSet => itemSet.map(intToString))) val rdd = sc.parallelize(sequences, 2).cache() - val prefixspan = new PrefixSpan() + val prefixSpan = new PrefixSpan() .setMinSupport(0.5) .setMaxPatternLength(5) - val model = prefixspan.run(rdd) + val model = prefixSpan.run(rdd) val expected = Array( (Array(Array(1)), 3L), (Array(Array(2)), 3L), @@ -332,17 +354,17 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext { ).map { case (pattern, count) => (pattern.map(itemSet => itemSet.map(intToString)), count) } - compareResults(expected, model.freqSequences.collect().map(x => (x.sequence, x.freq))) + compareResults(expected, model.freqSequences.collect()) } private def compareResults[Item]( expectedValue: Array[(Array[Array[Item]], Long)], - actualValue: Array[(Array[Array[Item]], Long)]): Unit = { + actualValue: Array[PrefixSpan.FreqSequence[Item]]): Unit = { val expectedSet = expectedValue.map { case (pattern: Array[Array[Item]], count: Long) => (pattern.map(itemSet => itemSet.toSet).toSeq, count) }.toSet - val actualSet = actualValue.map { case (pattern: Array[Array[Item]], count: Long) => - (pattern.map(itemSet => itemSet.toSet).toSeq, count) + val actualSet = actualValue.map { x => + (x.sequence.map(_.toSet).toSeq, x.freq) }.toSet assert(expectedSet === actualSet) } @@ -354,11 +376,4 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext { val actualSet = actualValue.map(x => (x._1.toSeq, x._2)).toSet assert(expectedSet === actualSet) } - - private def insertDelimiter(sequence: Array[Int]): Array[Int] = { - sequence.zip(Seq.fill(sequence.length)(PrefixSpan.DELIMITER)).map { case (a, b) => - List(a, b) - }.flatten - } - } |