author     Xiangrui Meng <meng@databricks.com>    2015-08-04 22:28:49 -0700
committer  Xiangrui Meng <meng@databricks.com>    2015-08-04 22:28:49 -0700
commit     a02bcf20c4fc9e2e182630d197221729e996afc2 (patch)
tree       addf5a311acafad2849dd32c7a8c47f88f1f702f /mllib/src/test
parent     f7abd6bec9d51ed4ab6359e50eac853e64ecae86 (diff)
[SPARK-9540] [MLLIB] optimize PrefixSpan implementation
This is a major refactoring of the PrefixSpan implementation. It contains the following changes:

1. Expand the prefix one item at a time. The existing implementation generates all subsets for each itemset, which might have scalability issues when the itemset is large.
2. Use a new internal format. `<(12)(31)>` is represented by `[0, 1, 2, 0, 1, 3, 0]` internally. We use `0` as the delimiter because negative numbers are used to indicate partial prefix items, e.g., `_2` is represented by `-2`. (See the encoding sketch below.)
3. Remember the start indices of all partial projections in the projected postfix to help the next projection.
4. Reuse the original sequence array for projected postfixes.
5. Use `Prefix` IDs in aggregation rather than their content.
6. Use `ArrayBuilder` for building primitive arrays.
7. Expose `maxLocalProjDBSize`. (A usage sketch follows the diffstat below.)
8. Tests are not changed except for using `0` instead of `-1` as the delimiter.

`Postfix`'s API doc should be a good place to start.

Closes #7594

feynmanliang zhangjiajin

Author: Xiangrui Meng <meng@databricks.com>

Closes #7937 from mengxr/SPARK-9540 and squashes the following commits:

2d0ec31 [Xiangrui Meng] address more comments
48f450c [Xiangrui Meng] address comments from Feynman; fixed a bug in project and added a test
65f90e8 [Xiangrui Meng] naming and documentation
8afc86a [Xiangrui Meng] refactor impl
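To make the internal format in item 2 concrete, here is a minimal sketch of how a sequence of itemsets could be flattened into that representation. `toInternalFormat` is a hypothetical helper written for illustration only; it is not part of this patch, whose actual encoding lives in the `PrefixSpan` implementation.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper (illustration only): flatten a sequence of itemsets
// into the flat internal format, using 0 as the itemset delimiter. In
// projected prefixes, a partial item such as `_2` would be stored as -2.
def toInternalFormat(sequence: Array[Array[Int]]): Array[Int] = {
  val buf = ArrayBuffer(0)      // leading delimiter
  sequence.foreach { itemset =>
    buf ++= itemset.sorted      // items within one itemset, in sorted order
    buf += 0                    // delimiter closing the itemset
  }
  buf.toArray
}

// <(12)(31)> is represented by [0, 1, 2, 0, 1, 3, 0]
assert(toInternalFormat(Array(Array(1, 2), Array(3, 1)))
  .sameElements(Array(0, 1, 2, 0, 1, 3, 0)))
```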
Diffstat (limited to 'mllib/src/test')
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala | 271
1 file changed, 143 insertions, 128 deletions
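As a usage note for item 7 of the commit message, the sketch below shows how a caller might set the newly exposed `maxLocalProjDBSize` through the public API; the tests in the diff below exercise it directly via the internal `PrefixSpan.genFreqPatterns`. It assumes an existing `SparkContext` named `sc`, and the input sequences are made up for illustration.

```scala
import org.apache.spark.mllib.fpm.PrefixSpan

// Sketch only: toy input sequences of itemsets, assuming an existing `sc`.
val sequences = sc.parallelize(Seq(
  Array(Array(1, 2), Array(3)),
  Array(Array(1), Array(3, 2), Array(1, 2)),
  Array(Array(1, 2), Array(5))), 2).cache()

val prefixSpan = new PrefixSpan()
  .setMinSupport(0.5)
  .setMaxPatternLength(5)
  .setMaxLocalProjDBSize(32L)   // knob exposed by this patch

val model = prefixSpan.run(sequences)
model.freqSequences.collect().foreach { fs =>
  // Print each frequent sequence as <(a,b)(c)> together with its count.
  println(fs.sequence.map(_.mkString("(", ",", ")")).mkString("<", "", ">") + ": " + fs.freq)
}
```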
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
index 0ae48d62cc..a83e543859 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
- test("PrefixSpan internal (integer seq, -1 delim) run, singleton itemsets") {
+ test("PrefixSpan internal (integer seq, 0 delim) run, singleton itemsets") {
/*
library("arulesSequences")
@@ -35,83 +35,81 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
*/
val sequences = Array(
- Array(1, -1, 3, -1, 4, -1, 5),
- Array(2, -1, 3, -1, 1),
- Array(2, -1, 4, -1, 1),
- Array(3, -1, 1, -1, 3, -1, 4, -1, 5),
- Array(3, -1, 4, -1, 4, -1, 3),
- Array(6, -1, 5, -1, 3))
+ Array(0, 1, 0, 3, 0, 4, 0, 5, 0),
+ Array(0, 2, 0, 3, 0, 1, 0),
+ Array(0, 2, 0, 4, 0, 1, 0),
+ Array(0, 3, 0, 1, 0, 3, 0, 4, 0, 5, 0),
+ Array(0, 3, 0, 4, 0, 4, 0, 3, 0),
+ Array(0, 6, 0, 5, 0, 3, 0))
val rdd = sc.parallelize(sequences, 2).cache()
- val prefixspan = new PrefixSpan()
- .setMinSupport(0.33)
- .setMaxPatternLength(50)
- val result1 = prefixspan.run(rdd)
+ val result1 = PrefixSpan.genFreqPatterns(
+ rdd, minCount = 2L, maxPatternLength = 50, maxLocalProjDBSize = 16L)
val expectedValue1 = Array(
- (Array(1), 4L),
- (Array(1, -1, 3), 2L),
- (Array(1, -1, 3, -1, 4), 2L),
- (Array(1, -1, 3, -1, 4, -1, 5), 2L),
- (Array(1, -1, 3, -1, 5), 2L),
- (Array(1, -1, 4), 2L),
- (Array(1, -1, 4, -1, 5), 2L),
- (Array(1, -1, 5), 2L),
- (Array(2), 2L),
- (Array(2, -1, 1), 2L),
- (Array(3), 5L),
- (Array(3, -1, 1), 2L),
- (Array(3, -1, 3), 2L),
- (Array(3, -1, 4), 3L),
- (Array(3, -1, 4, -1, 5), 2L),
- (Array(3, -1, 5), 2L),
- (Array(4), 4L),
- (Array(4, -1, 5), 2L),
- (Array(5), 3L)
+ (Array(0, 1, 0), 4L),
+ (Array(0, 1, 0, 3, 0), 2L),
+ (Array(0, 1, 0, 3, 0, 4, 0), 2L),
+ (Array(0, 1, 0, 3, 0, 4, 0, 5, 0), 2L),
+ (Array(0, 1, 0, 3, 0, 5, 0), 2L),
+ (Array(0, 1, 0, 4, 0), 2L),
+ (Array(0, 1, 0, 4, 0, 5, 0), 2L),
+ (Array(0, 1, 0, 5, 0), 2L),
+ (Array(0, 2, 0), 2L),
+ (Array(0, 2, 0, 1, 0), 2L),
+ (Array(0, 3, 0), 5L),
+ (Array(0, 3, 0, 1, 0), 2L),
+ (Array(0, 3, 0, 3, 0), 2L),
+ (Array(0, 3, 0, 4, 0), 3L),
+ (Array(0, 3, 0, 4, 0, 5, 0), 2L),
+ (Array(0, 3, 0, 5, 0), 2L),
+ (Array(0, 4, 0), 4L),
+ (Array(0, 4, 0, 5, 0), 2L),
+ (Array(0, 5, 0), 3L)
)
compareInternalResults(expectedValue1, result1.collect())
- prefixspan.setMinSupport(0.5).setMaxPatternLength(50)
- val result2 = prefixspan.run(rdd)
+ val result2 = PrefixSpan.genFreqPatterns(
+ rdd, minCount = 3, maxPatternLength = 50, maxLocalProjDBSize = 32L)
val expectedValue2 = Array(
- (Array(1), 4L),
- (Array(3), 5L),
- (Array(3, -1, 4), 3L),
- (Array(4), 4L),
- (Array(5), 3L)
+ (Array(0, 1, 0), 4L),
+ (Array(0, 3, 0), 5L),
+ (Array(0, 3, 0, 4, 0), 3L),
+ (Array(0, 4, 0), 4L),
+ (Array(0, 5, 0), 3L)
)
compareInternalResults(expectedValue2, result2.collect())
- prefixspan.setMinSupport(0.33).setMaxPatternLength(2)
- val result3 = prefixspan.run(rdd)
+ val result3 = PrefixSpan.genFreqPatterns(
+ rdd, minCount = 2, maxPatternLength = 2, maxLocalProjDBSize = 32L)
val expectedValue3 = Array(
- (Array(1), 4L),
- (Array(1, -1, 3), 2L),
- (Array(1, -1, 4), 2L),
- (Array(1, -1, 5), 2L),
- (Array(2, -1, 1), 2L),
- (Array(2), 2L),
- (Array(3), 5L),
- (Array(3, -1, 1), 2L),
- (Array(3, -1, 3), 2L),
- (Array(3, -1, 4), 3L),
- (Array(3, -1, 5), 2L),
- (Array(4), 4L),
- (Array(4, -1, 5), 2L),
- (Array(5), 3L)
+ (Array(0, 1, 0), 4L),
+ (Array(0, 1, 0, 3, 0), 2L),
+ (Array(0, 1, 0, 4, 0), 2L),
+ (Array(0, 1, 0, 5, 0), 2L),
+ (Array(0, 2, 0, 1, 0), 2L),
+ (Array(0, 2, 0), 2L),
+ (Array(0, 3, 0), 5L),
+ (Array(0, 3, 0, 1, 0), 2L),
+ (Array(0, 3, 0, 3, 0), 2L),
+ (Array(0, 3, 0, 4, 0), 3L),
+ (Array(0, 3, 0, 5, 0), 2L),
+ (Array(0, 4, 0), 4L),
+ (Array(0, 4, 0, 5, 0), 2L),
+ (Array(0, 5, 0), 3L)
)
compareInternalResults(expectedValue3, result3.collect())
}
test("PrefixSpan internal (integer seq, -1 delim) run, variable-size itemsets") {
val sequences = Array(
- Array(1, -1, 1, 2, 3, -1, 1, 3, -1, 4, -1, 3, 6),
- Array(1, 4, -1, 3, -1, 2, 3, -1, 1, 5),
- Array(5, 6, -1, 1, 2, -1, 4, 6, -1, 3, -1, 2),
- Array(5, -1, 7, -1, 1, 6, -1, 3, -1, 2, -1, 3))
+ Array(0, 1, 0, 1, 2, 3, 0, 1, 3, 0, 4, 0, 3, 6, 0),
+ Array(0, 1, 4, 0, 3, 0, 2, 3, 0, 1, 5, 0),
+ Array(0, 5, 6, 0, 1, 2, 0, 4, 6, 0, 3, 0, 2, 0),
+ Array(0, 5, 0, 7, 0, 1, 6, 0, 3, 0, 2, 0, 3, 0))
val rdd = sc.parallelize(sequences, 2).cache()
- val prefixspan = new PrefixSpan().setMinSupport(0.5).setMaxPatternLength(5)
- val result = prefixspan.run(rdd)
+ val result = PrefixSpan.genFreqPatterns(
+ rdd, minCount = 2, maxPatternLength = 5, maxLocalProjDBSize = 128L)
/*
To verify results, create file "prefixSpanSeqs" with content
@@ -200,63 +198,87 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
53 <{1},{2},{1}> 0.50
*/
val expectedValue = Array(
- (Array(1), 4L),
- (Array(2), 4L),
- (Array(3), 4L),
- (Array(4), 3L),
- (Array(5), 3L),
- (Array(6), 3L),
- (Array(1, -1, 6), 2L),
- (Array(2, -1, 6), 2L),
- (Array(5, -1, 6), 2L),
- (Array(1, 2, -1, 6), 2L),
- (Array(1, -1, 4), 2L),
- (Array(2, -1, 4), 2L),
- (Array(1, 2, -1, 4), 2L),
- (Array(1, -1, 3), 4L),
- (Array(2, -1, 3), 3L),
- (Array(2, 3), 2L),
- (Array(3, -1, 3), 3L),
- (Array(4, -1, 3), 3L),
- (Array(5, -1, 3), 2L),
- (Array(6, -1, 3), 2L),
- (Array(5, -1, 6, -1, 3), 2L),
- (Array(6, -1, 2, -1, 3), 2L),
- (Array(5, -1, 2, -1, 3), 2L),
- (Array(5, -1, 1, -1, 3), 2L),
- (Array(2, -1, 4, -1, 3), 2L),
- (Array(1, -1, 4, -1, 3), 2L),
- (Array(1, 2, -1, 4, -1, 3), 2L),
- (Array(1, -1, 3, -1, 3), 3L),
- (Array(1, 2, -1, 3), 2L),
- (Array(1, -1, 2, -1, 3), 2L),
- (Array(1, -1, 2, 3), 2L),
- (Array(1, -1, 2), 4L),
- (Array(1, 2), 2L),
- (Array(3, -1, 2), 3L),
- (Array(4, -1, 2), 2L),
- (Array(5, -1, 2), 2L),
- (Array(6, -1, 2), 2L),
- (Array(5, -1, 6, -1, 2), 2L),
- (Array(6, -1, 3, -1, 2), 2L),
- (Array(5, -1, 3, -1, 2), 2L),
- (Array(5, -1, 1, -1, 2), 2L),
- (Array(4, -1, 3, -1, 2), 2L),
- (Array(1, -1, 3, -1, 2), 3L),
- (Array(5, -1, 6, -1, 3, -1, 2), 2L),
- (Array(5, -1, 1, -1, 3, -1, 2), 2L),
- (Array(1, -1, 1), 2L),
- (Array(2, -1, 1), 2L),
- (Array(3, -1, 1), 2L),
- (Array(5, -1, 1), 2L),
- (Array(2, 3, -1, 1), 2L),
- (Array(1, -1, 3, -1, 1), 2L),
- (Array(1, -1, 2, 3, -1, 1), 2L),
- (Array(1, -1, 2, -1, 1), 2L))
+ (Array(0, 1, 0), 4L),
+ (Array(0, 2, 0), 4L),
+ (Array(0, 3, 0), 4L),
+ (Array(0, 4, 0), 3L),
+ (Array(0, 5, 0), 3L),
+ (Array(0, 6, 0), 3L),
+ (Array(0, 1, 0, 6, 0), 2L),
+ (Array(0, 2, 0, 6, 0), 2L),
+ (Array(0, 5, 0, 6, 0), 2L),
+ (Array(0, 1, 2, 0, 6, 0), 2L),
+ (Array(0, 1, 0, 4, 0), 2L),
+ (Array(0, 2, 0, 4, 0), 2L),
+ (Array(0, 1, 2, 0, 4, 0), 2L),
+ (Array(0, 1, 0, 3, 0), 4L),
+ (Array(0, 2, 0, 3, 0), 3L),
+ (Array(0, 2, 3, 0), 2L),
+ (Array(0, 3, 0, 3, 0), 3L),
+ (Array(0, 4, 0, 3, 0), 3L),
+ (Array(0, 5, 0, 3, 0), 2L),
+ (Array(0, 6, 0, 3, 0), 2L),
+ (Array(0, 5, 0, 6, 0, 3, 0), 2L),
+ (Array(0, 6, 0, 2, 0, 3, 0), 2L),
+ (Array(0, 5, 0, 2, 0, 3, 0), 2L),
+ (Array(0, 5, 0, 1, 0, 3, 0), 2L),
+ (Array(0, 2, 0, 4, 0, 3, 0), 2L),
+ (Array(0, 1, 0, 4, 0, 3, 0), 2L),
+ (Array(0, 1, 2, 0, 4, 0, 3, 0), 2L),
+ (Array(0, 1, 0, 3, 0, 3, 0), 3L),
+ (Array(0, 1, 2, 0, 3, 0), 2L),
+ (Array(0, 1, 0, 2, 0, 3, 0), 2L),
+ (Array(0, 1, 0, 2, 3, 0), 2L),
+ (Array(0, 1, 0, 2, 0), 4L),
+ (Array(0, 1, 2, 0), 2L),
+ (Array(0, 3, 0, 2, 0), 3L),
+ (Array(0, 4, 0, 2, 0), 2L),
+ (Array(0, 5, 0, 2, 0), 2L),
+ (Array(0, 6, 0, 2, 0), 2L),
+ (Array(0, 5, 0, 6, 0, 2, 0), 2L),
+ (Array(0, 6, 0, 3, 0, 2, 0), 2L),
+ (Array(0, 5, 0, 3, 0, 2, 0), 2L),
+ (Array(0, 5, 0, 1, 0, 2, 0), 2L),
+ (Array(0, 4, 0, 3, 0, 2, 0), 2L),
+ (Array(0, 1, 0, 3, 0, 2, 0), 3L),
+ (Array(0, 5, 0, 6, 0, 3, 0, 2, 0), 2L),
+ (Array(0, 5, 0, 1, 0, 3, 0, 2, 0), 2L),
+ (Array(0, 1, 0, 1, 0), 2L),
+ (Array(0, 2, 0, 1, 0), 2L),
+ (Array(0, 3, 0, 1, 0), 2L),
+ (Array(0, 5, 0, 1, 0), 2L),
+ (Array(0, 2, 3, 0, 1, 0), 2L),
+ (Array(0, 1, 0, 3, 0, 1, 0), 2L),
+ (Array(0, 1, 0, 2, 3, 0, 1, 0), 2L),
+ (Array(0, 1, 0, 2, 0, 1, 0), 2L))
compareInternalResults(expectedValue, result.collect())
}
+ test("PrefixSpan projections with multiple partial starts") {
+ val sequences = Seq(
+ Array(Array(1, 2), Array(1, 2, 3)))
+ val rdd = sc.parallelize(sequences, 2)
+ val prefixSpan = new PrefixSpan()
+ .setMinSupport(1.0)
+ .setMaxPatternLength(2)
+ val model = prefixSpan.run(rdd)
+ val expected = Array(
+ (Array(Array(1)), 1L),
+ (Array(Array(1, 2)), 1L),
+ (Array(Array(1), Array(1)), 1L),
+ (Array(Array(1), Array(2)), 1L),
+ (Array(Array(1), Array(3)), 1L),
+ (Array(Array(1, 3)), 1L),
+ (Array(Array(2)), 1L),
+ (Array(Array(2, 3)), 1L),
+ (Array(Array(2), Array(1)), 1L),
+ (Array(Array(2), Array(2)), 1L),
+ (Array(Array(2), Array(3)), 1L),
+ (Array(Array(3)), 1L))
+ compareResults(expected, model.freqSequences.collect())
+ }
+
test("PrefixSpan Integer type, variable-size itemsets") {
val sequences = Seq(
Array(Array(1, 2), Array(3)),
@@ -265,7 +287,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
Array(Array(6)))
val rdd = sc.parallelize(sequences, 2).cache()
- val prefixspan = new PrefixSpan()
+ val prefixSpan = new PrefixSpan()
.setMinSupport(0.5)
.setMaxPatternLength(5)
@@ -296,7 +318,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
5 <{1,2}> 0.75
*/
- val model = prefixspan.run(rdd)
+ val model = prefixSpan.run(rdd)
val expected = Array(
(Array(Array(1)), 3L),
(Array(Array(2)), 3L),
@@ -304,7 +326,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
(Array(Array(1), Array(3)), 2L),
(Array(Array(1, 2)), 3L)
)
- compareResults(expected, model.freqSequences.collect().map(x => (x.sequence, x.freq)))
+ compareResults(expected, model.freqSequences.collect())
}
test("PrefixSpan String type, variable-size itemsets") {
@@ -318,11 +340,11 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
Array(Array(6))).map(seq => seq.map(itemSet => itemSet.map(intToString)))
val rdd = sc.parallelize(sequences, 2).cache()
- val prefixspan = new PrefixSpan()
+ val prefixSpan = new PrefixSpan()
.setMinSupport(0.5)
.setMaxPatternLength(5)
- val model = prefixspan.run(rdd)
+ val model = prefixSpan.run(rdd)
val expected = Array(
(Array(Array(1)), 3L),
(Array(Array(2)), 3L),
@@ -332,17 +354,17 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
).map { case (pattern, count) =>
(pattern.map(itemSet => itemSet.map(intToString)), count)
}
- compareResults(expected, model.freqSequences.collect().map(x => (x.sequence, x.freq)))
+ compareResults(expected, model.freqSequences.collect())
}
private def compareResults[Item](
expectedValue: Array[(Array[Array[Item]], Long)],
- actualValue: Array[(Array[Array[Item]], Long)]): Unit = {
+ actualValue: Array[PrefixSpan.FreqSequence[Item]]): Unit = {
val expectedSet = expectedValue.map { case (pattern: Array[Array[Item]], count: Long) =>
(pattern.map(itemSet => itemSet.toSet).toSeq, count)
}.toSet
- val actualSet = actualValue.map { case (pattern: Array[Array[Item]], count: Long) =>
- (pattern.map(itemSet => itemSet.toSet).toSeq, count)
+ val actualSet = actualValue.map { x =>
+ (x.sequence.map(_.toSet).toSeq, x.freq)
}.toSet
assert(expectedSet === actualSet)
}
@@ -354,11 +376,4 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
val actualSet = actualValue.map(x => (x._1.toSeq, x._2)).toSet
assert(expectedSet === actualSet)
}
-
- private def insertDelimiter(sequence: Array[Int]): Array[Int] = {
- sequence.zip(Seq.fill(sequence.length)(PrefixSpan.DELIMITER)).map { case (a, b) =>
- List(a, b)
- }.flatten
- }
-
}