author     zhangjiajin <zhangjiajin@huawei.com>    2015-08-01 01:56:27 -0700
committer  Xiangrui Meng <meng@databricks.com>     2015-08-01 01:56:27 -0700
commit     d2a9b66f6c0de89d6d16370af1c77c7f51b11d3e
tree       c58b3009b8c8d2a936834022bf28aeb2f1d472df /mllib
parent     65038973a17904e0e04d453799ec108af240fbab
[SPARK-8999] [MLLIB] PrefixSpan non-temporal sequences
mengxr Extends PrefixSpan to non-temporal itemsets. Continues work by zhangjiajin

* Internal API uses List[Set[Int]] which is likely not efficient; will need to refactor during QA

Closes #7646

Author: zhangjiajin <zhangjiajin@huawei.com>
Author: Feynman Liang <fliang@databricks.com>
Author: zhang jiajin <zhangjiajin@huawei.com>

Closes #7818 from feynmanliang/SPARK-8999-nonTemporal and squashes the following commits:

4ded81d [Feynman Liang] Replace all filters to filter nonempty
350e67e [Feynman Liang] Code review feedback
03156ca [Feynman Liang] Fix tests, drop delimiters at boundaries of sequences
d1fe0ed [Feynman Liang] Remove comments
86ca4e5 [Feynman Liang] Fix style
7c7bf39 [Feynman Liang] Fixed itemSet sequences
6073b10 [Feynman Liang] Basic itemset functionality, failing test
1a7fb48 [Feynman Liang] Add delimiter to results
5db00aa [Feynman Liang] Working for items, not itemsets
6787716 [Feynman Liang] Working on temporal sequences
f1114b9 [Feynman Liang] Add -1 delimiter
00fe756 [Feynman Liang] Reset base files for rebase
f486dcd [zhangjiajin] change maxLocalProjDBSize and fix a bug (remove -3 from frequent items).
60a0b76 [zhangjiajin] fixed a scala style error.
740c203 [zhangjiajin] fixed a scala style error.
5785cb8 [zhangjiajin] support non-temporal sequence
a5d649d [zhangjiajin] restore original version
09dc409 [zhangjiajin] Merge branch 'master' of https://github.com/apache/spark into multiItems_2
ae8c02d [zhangjiajin] Fixed some Scala style errors.
216ab0c [zhangjiajin] Support non-temporal sequence in PrefixSpan
b572f54 [zhangjiajin] initialize file before rebase.
f06772f [zhangjiajin] fix a scala style error.
a7e50d4 [zhangjiajin] Add feature: Collect enough frequent prefixes before projection in PrefixSpan.
c1d13d0 [zhang jiajin] Delete PrefixspanSuite.scala
d9d8137 [zhang jiajin] Delete Prefixspan.scala
c6ceb63 [zhangjiajin] Add new algorithm PrefixSpan and test file.
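For orientation before the diff: a minimal usage sketch of the delimiter encoding this patch introduces, mirroring the new test suite below. The input rows and `sc` (a live SparkContext) are assumptions for illustration, not part of the patch.

    import org.apache.spark.mllib.fpm.PrefixSpan

    // Each -1 separates consecutive itemsets; the first row encodes <{1,2},{3},{4,6}>.
    val sequences = sc.parallelize(Seq(
      Array(1, 2, -1, 3, -1, 4, 6),
      Array(1, -1, 3, -1, 4, 6)), 2).cache()

    val prefixSpan = new PrefixSpan()
      .setMinSupport(0.5)
      .setMaxPatternLength(5)

    // Results use the same encoding, e.g. (Array(1, -1, 3), 2L) for pattern <{1},{3}>.
    val patterns = prefixSpan.run(sequences)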
Diffstat (limited to 'mllib')
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala |  46
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala      | 111
-rw-r--r-- mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala | 237
3 files changed, 302 insertions(+), 92 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala
index 0ea7920810..ccebf951c8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala
@@ -25,7 +25,7 @@ import org.apache.spark.Logging
* Calculate all patterns of a projected database locally.
*/
private[fpm] object LocalPrefixSpan extends Logging with Serializable {
-
+ import PrefixSpan._
/**
* Calculate all patterns of a projected database.
* @param minCount minimum count
@@ -39,12 +39,19 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable {
def run(
minCount: Long,
maxPatternLength: Int,
- prefixes: List[Int],
- database: Iterable[Array[Int]]): Iterator[(List[Int], Long)] = {
- if (prefixes.length == maxPatternLength || database.isEmpty) return Iterator.empty
- val frequentItemAndCounts = getFreqItemAndCounts(minCount, database)
- val filteredDatabase = database.map(x => x.filter(frequentItemAndCounts.contains))
- frequentItemAndCounts.iterator.flatMap { case (item, count) =>
+ prefixes: List[Set[Int]],
+ database: Iterable[List[Set[Int]]]): Iterator[(List[Set[Int]], Long)] = {
+ if (prefixes.length == maxPatternLength || database.isEmpty) {
+ return Iterator.empty
+ }
+ val freqItemSetsAndCounts = getFreqItemAndCounts(minCount, database)
+ val freqItems = freqItemSetsAndCounts.keys.flatten.toSet
+ val filteredDatabase = database.map { suffix =>
+ suffix
+ .map(item => freqItems.intersect(item))
+ .filter(_.nonEmpty)
+ }
+ freqItemSetsAndCounts.iterator.flatMap { case (item, count) =>
val newPrefixes = item :: prefixes
val newProjected = project(filteredDatabase, item)
Iterator.single((newPrefixes, count)) ++
@@ -54,20 +61,23 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable {
/**
* Calculate suffix sequence immediately after the first occurrence of an item.
- * @param item item to get suffix after
+ * @param item itemset to get suffix after
* @param sequence sequence to extract suffix from
* @return suffix sequence
*/
- def getSuffix(item: Int, sequence: Array[Int]): Array[Int] = {
- val index = sequence.indexOf(item)
+ def getSuffix(item: Set[Int], sequence: List[Set[Int]]): List[Set[Int]] = {
+ val itemsetSeq = sequence
+ val index = itemsetSeq.indexWhere(item.subsetOf(_))
if (index == -1) {
- Array()
+ List()
} else {
- sequence.drop(index + 1)
+ itemsetSeq.drop(index + 1)
}
}
- def project(database: Iterable[Array[Int]], prefix: Int): Iterable[Array[Int]] = {
+ def project(
+ database: Iterable[List[Set[Int]]],
+ prefix: Set[Int]): Iterable[List[Set[Int]]] = {
database
.map(getSuffix(prefix, _))
.filter(_.nonEmpty)
@@ -81,14 +91,16 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable {
*/
private def getFreqItemAndCounts(
minCount: Long,
- database: Iterable[Array[Int]]): mutable.Map[Int, Long] = {
+ database: Iterable[List[Set[Int]]]): Map[Set[Int], Long] = {
// TODO: use PrimitiveKeyOpenHashMap
- val counts = mutable.Map[Int, Long]().withDefaultValue(0L)
+ val counts = mutable.Map[Set[Int], Long]().withDefaultValue(0L)
database.foreach { sequence =>
- sequence.distinct.foreach { item =>
+ sequence.flatMap(nonemptySubsets(_)).distinct.foreach { item =>
counts(item) += 1L
}
}
- counts.filter(_._2 >= minCount)
+ counts
+ .filter { case (_, count) => count >= minCount }
+ .toMap
}
}
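To make the new itemset-aware projection concrete, here is a self-contained sketch of the getSuffix logic above (a standalone copy for illustration, not the patched file): the suffix starts immediately after the first itemset of which `item` is a subset.

    def getSuffix(item: Set[Int], sequence: List[Set[Int]]): List[Set[Int]] = {
      val index = sequence.indexWhere(item.subsetOf(_))
      if (index == -1) List() else sequence.drop(index + 1)
    }

    // {1,2} first occurs inside {1,2,4}, so the suffix is everything after that itemset.
    getSuffix(Set(1, 2), List(Set(3), Set(1, 2, 4), Set(5)))  // List(Set(5))
    // No itemset contains {6}, so the suffix is empty.
    getSuffix(Set(6), List(Set(3), Set(1, 2, 4), Set(5)))     // List()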
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index e6752332cd..22b4ddb8b3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -17,7 +17,7 @@
package org.apache.spark.mllib.fpm
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.ArrayBuilder
import org.apache.spark.Logging
import org.apache.spark.annotation.Experimental
@@ -44,13 +44,14 @@ import org.apache.spark.storage.StorageLevel
class PrefixSpan private (
private var minSupport: Double,
private var maxPatternLength: Int) extends Logging with Serializable {
+ import PrefixSpan._
/**
* The maximum number of items allowed in a projected database before local processing. If a
* projected database exceeds this size, another iteration of distributed PrefixSpan is run.
*/
- // TODO: make configurable with a better default value, 10000 may be too small
- private val maxLocalProjDBSize: Long = 10000
+ // TODO: make configurable with a better default value
+ private val maxLocalProjDBSize: Long = 32000000L
/**
* Constructs a default instance with default parameters
@@ -90,35 +91,41 @@ class PrefixSpan private (
/**
* Find the complete set of sequential patterns in the input sequences.
- * @param sequences input data set, contains a set of sequences,
- * a sequence is an ordered list of elements.
+ * @param data ordered sequences of itemsets. Items are represented by non-negative integers.
+ * Each itemset has one or more items and is delimited by [[DELIMITER]].
* @return a set of sequential pattern pairs,
* the key of each pair is the pattern (a list of elements),
* the value is the pattern's count.
*/
- def run(sequences: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
- val sc = sequences.sparkContext
+ // TODO: generalize to arbitrary item-types and use mapping to Ints for internal algorithm
+ def run(data: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
+ val sc = data.sparkContext
- if (sequences.getStorageLevel == StorageLevel.NONE) {
+ if (data.getStorageLevel == StorageLevel.NONE) {
logWarning("Input data is not cached.")
}
+ // Use List[Set[Item]] for internal computation
+ val sequences = data.map { seq => splitSequence(seq.toList) }
+
// Convert min support to a min number of transactions for this dataset
val minCount = if (minSupport == 0) 0L else math.ceil(sequences.count() * minSupport).toLong
// (Frequent items -> number of occurrences); all items here satisfy the `minSupport` threshold
val freqItemCounts = sequences
- .flatMap(seq => seq.distinct.map(item => (item, 1L)))
+ .flatMap(seq => seq.flatMap(nonemptySubsets(_)).distinct.map(item => (item, 1L)))
.reduceByKey(_ + _)
- .filter(_._2 >= minCount)
+ .filter { case (item, count) => (count >= minCount) }
.collect()
+ .toMap
// Pairs of (length 1 prefix, suffix consisting of frequent items)
val itemSuffixPairs = {
- val freqItems = freqItemCounts.map(_._1).toSet
+ val freqItemSets = freqItemCounts.keys.toSet
+ val freqItems = freqItemSets.flatten
sequences.flatMap { seq =>
- val filteredSeq = seq.filter(freqItems.contains(_))
- freqItems.flatMap { item =>
+ val filteredSeq = seq.map(item => freqItems.intersect(item)).filter(_.nonEmpty)
+ freqItemSets.flatMap { item =>
val candidateSuffix = LocalPrefixSpan.getSuffix(item, filteredSeq)
candidateSuffix match {
case suffix if !suffix.isEmpty => Some((List(item), suffix))
@@ -130,14 +137,15 @@ class PrefixSpan private (
// Accumulator for the computed results to be returned, initialized to the frequent items (i.e.
// frequent length-one prefixes)
- var resultsAccumulator = freqItemCounts.map(x => (List(x._1), x._2))
+ var resultsAccumulator = freqItemCounts.map { case (item, count) => (List(item), count) }.toList
// Remaining work to be locally and distributively processed respectively
var (pairsForLocal, pairsForDistributed) = partitionByProjDBSize(itemSuffixPairs)
// Continue processing until no pairs for distributed processing remain (i.e. all prefixes have
- // projected database sizes <= `maxLocalProjDBSize`)
- while (pairsForDistributed.count() != 0) {
+ // projected database sizes <= `maxLocalProjDBSize`) or `maxPatternLength` is reached
+ var patternLength = 1
+ while (pairsForDistributed.count() != 0 && patternLength < maxPatternLength) {
val (nextPatternAndCounts, nextPrefixSuffixPairs) =
extendPrefixes(minCount, pairsForDistributed)
pairsForDistributed.unpersist()
@@ -146,6 +154,7 @@ class PrefixSpan private (
pairsForDistributed.persist(StorageLevel.MEMORY_AND_DISK)
pairsForLocal ++= smallerPairsPart
resultsAccumulator ++= nextPatternAndCounts.collect()
+ patternLength += 1 // pattern length grows by one per iteration
}
// Process the small projected databases locally
@@ -153,7 +162,7 @@ class PrefixSpan private (
minCount, sc.parallelize(pairsForLocal, 1).groupByKey())
(sc.parallelize(resultsAccumulator, 1) ++ remainingResults)
- .map { case (pattern, count) => (pattern.toArray, count) }
+ .map { case (pattern, count) => (flattenSequence(pattern.reverse).toArray, count) }
}
@@ -163,8 +172,8 @@ class PrefixSpan private (
* @return prefix-suffix pairs partitioned by whether their projected database size is <= or
* greater than [[maxLocalProjDBSize]]
*/
- private def partitionByProjDBSize(prefixSuffixPairs: RDD[(List[Int], Array[Int])])
- : (Array[(List[Int], Array[Int])], RDD[(List[Int], Array[Int])]) = {
+ private def partitionByProjDBSize(prefixSuffixPairs: RDD[(List[Set[Int]], List[Set[Int]])])
+ : (List[(List[Set[Int]], List[Set[Int]])], RDD[(List[Set[Int]], List[Set[Int]])]) = {
val prefixToSuffixSize = prefixSuffixPairs
.aggregateByKey(0)(
seqOp = { case (count, suffix) => count + suffix.length },
@@ -176,12 +185,12 @@ class PrefixSpan private (
.toSet
val small = prefixSuffixPairs.filter { case (prefix, _) => smallPrefixes.contains(prefix) }
val large = prefixSuffixPairs.filter { case (prefix, _) => !smallPrefixes.contains(prefix) }
- (small.collect(), large)
+ (small.collect().toList, large)
}
/**
- * Extends all prefixes by one item from their suffix and computes the resulting frequent prefixes
- * and remaining work.
+ * Extends all prefixes by one itemset from their suffix and computes the resulting frequent
+ * prefixes and remaining work.
* @param minCount minimum count
* @param prefixSuffixPairs prefix (length N) and suffix pairs,
* @return (frequent length N+1 extended prefix, count) pairs and (frequent length N+1 extended
@@ -189,15 +198,16 @@ class PrefixSpan private (
*/
private def extendPrefixes(
minCount: Long,
- prefixSuffixPairs: RDD[(List[Int], Array[Int])])
- : (RDD[(List[Int], Long)], RDD[(List[Int], Array[Int])]) = {
+ prefixSuffixPairs: RDD[(List[Set[Int]], List[Set[Int]])])
+ : (RDD[(List[Set[Int]], Long)], RDD[(List[Set[Int]], List[Set[Int]])]) = {
- // (length N prefix, item from suffix) pairs and their corresponding number of occurrences
+ // (length N prefix, itemset from suffix) pairs and their corresponding number of occurrences
// Every (prefix :+ suffix) is guaranteed to have support exceeding `minSupport`
val prefixItemPairAndCounts = prefixSuffixPairs
- .flatMap { case (prefix, suffix) => suffix.distinct.map(y => ((prefix, y), 1L)) }
+ .flatMap { case (prefix, suffix) =>
+ suffix.flatMap(nonemptySubsets(_)).distinct.map(y => ((prefix, y), 1L)) }
.reduceByKey(_ + _)
- .filter(_._2 >= minCount)
+ .filter { case (item, count) => (count >= minCount) }
// Map from prefix to set of possible next items from suffix
val prefixToNextItems = prefixItemPairAndCounts
@@ -207,7 +217,6 @@ class PrefixSpan private (
.collect()
.toMap
-
// Frequent patterns with length N+1 and their corresponding counts
val extendedPrefixAndCounts = prefixItemPairAndCounts
.map { case ((prefix, item), count) => (item :: prefix, count) }
@@ -216,9 +225,12 @@ class PrefixSpan private (
val extendedPrefixAndSuffix = prefixSuffixPairs
.filter(x => prefixToNextItems.contains(x._1))
.flatMap { case (prefix, suffix) =>
- val frequentNextItems = prefixToNextItems(prefix)
- val filteredSuffix = suffix.filter(frequentNextItems.contains(_))
- frequentNextItems.flatMap { item =>
+ val frequentNextItemSets = prefixToNextItems(prefix)
+ val frequentNextItems = frequentNextItemSets.flatten
+ val filteredSuffix = suffix
+ .map(item => frequentNextItems.intersect(item))
+ .filter(_.nonEmpty)
+ frequentNextItemSets.flatMap { item =>
LocalPrefixSpan.getSuffix(item, filteredSuffix) match {
case suffix if !suffix.isEmpty => Some(item :: prefix, suffix)
case _ => None
@@ -237,13 +249,38 @@ class PrefixSpan private (
*/
private def getPatternsInLocal(
minCount: Long,
- data: RDD[(List[Int], Iterable[Array[Int]])]): RDD[(List[Int], Long)] = {
+ data: RDD[(List[Set[Int]], Iterable[List[Set[Int]]])]): RDD[(List[Set[Int]], Long)] = {
data.flatMap {
- case (prefix, projDB) =>
- LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList.reverse, projDB)
- .map { case (pattern: List[Int], count: Long) =>
- (pattern.reverse, count)
- }
+ case (prefix, projDB) => LocalPrefixSpan.run(minCount, maxPatternLength, prefix, projDB)
+ }
+ }
+
+}
+
+private[fpm] object PrefixSpan {
+ private[fpm] val DELIMITER = -1
+
+ /** Splits a sequence of itemsets delimited by [[DELIMITER]]. */
+ private[fpm] def splitSequence(sequence: List[Int]): List[Set[Int]] = {
+ sequence.span(_ != DELIMITER) match {
+ case (x, xs) if xs.length > 1 => x.toSet :: splitSequence(xs.tail)
+ case (x, xs) => List(x.toSet)
+ }
+ }
+
+ /** Flattens a sequence of itemsets into a List, inserting [[DELIMITER]] between itemsets. */
+ private[fpm] def flattenSequence(sequence: List[Set[Int]]): List[Int] = {
+ val builder = ArrayBuilder.make[Int]()
+ for (itemSet <- sequence) {
+ builder += DELIMITER
+ builder ++= itemSet.toSeq.sorted
}
+ builder.result().toList.drop(1) // drop leading delimiter
+ }
+
+ /** Returns an iterator over all non-empty subsets of `itemSet` */
+ private[fpm] def nonemptySubsets(itemSet: Set[Int]): Iterator[Set[Int]] = {
+ // TODO: improve complexity by using partial prefixes, considering one item at a time
+ itemSet.subsets.filter(_ != Set.empty[Int])
}
}
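A minimal sketch (standalone copies of the helpers above, ignoring their private[fpm] visibility) showing that splitSequence and flattenSequence round-trip a sequence, up to sorting items within each itemset:

    object DelimiterCodec {
      import scala.collection.mutable.ArrayBuilder

      val DELIMITER = -1

      def splitSequence(sequence: List[Int]): List[Set[Int]] =
        sequence.span(_ != DELIMITER) match {
          case (x, xs) if xs.length > 1 => x.toSet :: splitSequence(xs.tail)
          case (x, _)                   => List(x.toSet)
        }

      def flattenSequence(sequence: List[Set[Int]]): List[Int] = {
        val builder = ArrayBuilder.make[Int]()
        for (itemSet <- sequence) {
          builder += DELIMITER
          builder ++= itemSet.toSeq.sorted
        }
        builder.result().toList.drop(1) // drop the leading delimiter
      }
    }

    // <{2,1},{3}> splits into List(Set(2, 1), Set(3)) and flattens back with items sorted.
    assert(DelimiterCodec.flattenSequence(
      DelimiterCodec.splitSequence(List(2, 1, -1, 3))) == List(1, 2, -1, 3))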
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
index 6dd2dc926a..457f32670f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
- test("PrefixSpan using Integer type") {
+ test("PrefixSpan using Integer type, singleton itemsets") {
/*
library("arulesSequences")
@@ -35,12 +35,12 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
*/
val sequences = Array(
- Array(1, 3, 4, 5),
- Array(2, 3, 1),
- Array(2, 4, 1),
- Array(3, 1, 3, 4, 5),
- Array(3, 4, 4, 3),
- Array(6, 5, 3))
+ Array(1, -1, 3, -1, 4, -1, 5),
+ Array(2, -1, 3, -1, 1),
+ Array(2, -1, 4, -1, 1),
+ Array(3, -1, 1, -1, 3, -1, 4, -1, 5),
+ Array(3, -1, 4, -1, 4, -1, 3),
+ Array(6, -1, 5, -1, 3))
val rdd = sc.parallelize(sequences, 2).cache()
@@ -50,64 +50,225 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
val result1 = prefixspan.run(rdd)
val expectedValue1 = Array(
(Array(1), 4L),
- (Array(1, 3), 2L),
- (Array(1, 3, 4), 2L),
- (Array(1, 3, 4, 5), 2L),
- (Array(1, 3, 5), 2L),
- (Array(1, 4), 2L),
- (Array(1, 4, 5), 2L),
- (Array(1, 5), 2L),
+ (Array(1, -1, 3), 2L),
+ (Array(1, -1, 3, -1, 4), 2L),
+ (Array(1, -1, 3, -1, 4, -1, 5), 2L),
+ (Array(1, -1, 3, -1, 5), 2L),
+ (Array(1, -1, 4), 2L),
+ (Array(1, -1, 4, -1, 5), 2L),
+ (Array(1, -1, 5), 2L),
(Array(2), 2L),
- (Array(2, 1), 2L),
+ (Array(2, -1, 1), 2L),
(Array(3), 5L),
- (Array(3, 1), 2L),
- (Array(3, 3), 2L),
- (Array(3, 4), 3L),
- (Array(3, 4, 5), 2L),
- (Array(3, 5), 2L),
+ (Array(3, -1, 1), 2L),
+ (Array(3, -1, 3), 2L),
+ (Array(3, -1, 4), 3L),
+ (Array(3, -1, 4, -1, 5), 2L),
+ (Array(3, -1, 5), 2L),
(Array(4), 4L),
- (Array(4, 5), 2L),
+ (Array(4, -1, 5), 2L),
(Array(5), 3L)
)
- assert(compareResults(expectedValue1, result1.collect()))
+ compareResults(expectedValue1, result1.collect())
prefixspan.setMinSupport(0.5).setMaxPatternLength(50)
val result2 = prefixspan.run(rdd)
val expectedValue2 = Array(
(Array(1), 4L),
(Array(3), 5L),
- (Array(3, 4), 3L),
+ (Array(3, -1, 4), 3L),
(Array(4), 4L),
(Array(5), 3L)
)
- assert(compareResults(expectedValue2, result2.collect()))
+ compareResults(expectedValue2, result2.collect())
prefixspan.setMinSupport(0.33).setMaxPatternLength(2)
val result3 = prefixspan.run(rdd)
val expectedValue3 = Array(
(Array(1), 4L),
- (Array(1, 3), 2L),
- (Array(1, 4), 2L),
- (Array(1, 5), 2L),
- (Array(2, 1), 2L),
+ (Array(1, -1, 3), 2L),
+ (Array(1, -1, 4), 2L),
+ (Array(1, -1, 5), 2L),
+ (Array(2, -1, 1), 2L),
(Array(2), 2L),
(Array(3), 5L),
- (Array(3, 1), 2L),
- (Array(3, 3), 2L),
- (Array(3, 4), 3L),
- (Array(3, 5), 2L),
+ (Array(3, -1, 1), 2L),
+ (Array(3, -1, 3), 2L),
+ (Array(3, -1, 4), 3L),
+ (Array(3, -1, 5), 2L),
(Array(4), 4L),
- (Array(4, 5), 2L),
+ (Array(4, -1, 5), 2L),
(Array(5), 3L)
)
- assert(compareResults(expectedValue3, result3.collect()))
+ compareResults(expectedValue3, result3.collect())
+ }
+
+ test("PrefixSpan using Integer type, variable-size itemsets") {
+ val sequences = Array(
+ Array(1, -1, 1, 2, 3, -1, 1, 3, -1, 4, -1, 3, 6),
+ Array(1, 4, -1, 3, -1, 2, 3, -1, 1, 5),
+ Array(5, 6, -1, 1, 2, -1, 4, 6, -1, 3, -1, 2),
+ Array(5, -1, 7, -1, 1, 6, -1, 3, -1, 2, -1, 3))
+ val rdd = sc.parallelize(sequences, 2).cache()
+ val prefixspan = new PrefixSpan().setMinSupport(0.5).setMaxPatternLength(5)
+ val result = prefixspan.run(rdd)
+
+ /*
+ To verify results, create file "prefixSpanSeqs" with content
+ (format = (transactionID, idxInTransaction, numItemsInItemset, itemset)):
+ 1 1 1 1
+ 1 2 3 1 2 3
+ 1 3 2 1 3
+ 1 4 1 4
+ 1 5 2 3 6
+ 2 1 2 1 4
+ 2 2 1 3
+ 2 3 2 2 3
+ 2 4 2 1 5
+ 3 1 2 5 6
+ 3 2 2 1 2
+ 3 3 2 4 6
+ 3 4 1 3
+ 3 5 1 2
+ 4 1 1 5
+ 4 2 1 7
+ 4 3 2 1 6
+ 4 4 1 3
+ 4 5 1 2
+ 4 6 1 3
+ In R, run:
+ library("arulesSequences")
+ prefixSpanSeqs = read_baskets("prefixSpanSeqs", info = c("sequenceID","eventID","SIZE"))
+ freqItemSeq = cspade(prefixSpanSeqs,
+ parameter = list(support = 0.5, maxlen = 5 ))
+ resSeq = as(freqItemSeq, "data.frame")
+ resSeq
+
+ sequence support
+ 1 <{1}> 1.00
+ 2 <{2}> 1.00
+ 3 <{3}> 1.00
+ 4 <{4}> 0.75
+ 5 <{5}> 0.75
+ 6 <{6}> 0.75
+ 7 <{1},{6}> 0.50
+ 8 <{2},{6}> 0.50
+ 9 <{5},{6}> 0.50
+ 10 <{1,2},{6}> 0.50
+ 11 <{1},{4}> 0.50
+ 12 <{2},{4}> 0.50
+ 13 <{1,2},{4}> 0.50
+ 14 <{1},{3}> 1.00
+ 15 <{2},{3}> 0.75
+ 16 <{2,3}> 0.50
+ 17 <{3},{3}> 0.75
+ 18 <{4},{3}> 0.75
+ 19 <{5},{3}> 0.50
+ 20 <{6},{3}> 0.50
+ 21 <{5},{6},{3}> 0.50
+ 22 <{6},{2},{3}> 0.50
+ 23 <{5},{2},{3}> 0.50
+ 24 <{5},{1},{3}> 0.50
+ 25 <{2},{4},{3}> 0.50
+ 26 <{1},{4},{3}> 0.50
+ 27 <{1,2},{4},{3}> 0.50
+ 28 <{1},{3},{3}> 0.75
+ 29 <{1,2},{3}> 0.50
+ 30 <{1},{2},{3}> 0.50
+ 31 <{1},{2,3}> 0.50
+ 32 <{1},{2}> 1.00
+ 33 <{1,2}> 0.50
+ 34 <{3},{2}> 0.75
+ 35 <{4},{2}> 0.50
+ 36 <{5},{2}> 0.50
+ 37 <{6},{2}> 0.50
+ 38 <{5},{6},{2}> 0.50
+ 39 <{6},{3},{2}> 0.50
+ 40 <{5},{3},{2}> 0.50
+ 41 <{5},{1},{2}> 0.50
+ 42 <{4},{3},{2}> 0.50
+ 43 <{1},{3},{2}> 0.75
+ 44 <{5},{6},{3},{2}> 0.50
+ 45 <{5},{1},{3},{2}> 0.50
+ 46 <{1},{1}> 0.50
+ 47 <{2},{1}> 0.50
+ 48 <{3},{1}> 0.50
+ 49 <{5},{1}> 0.50
+ 50 <{2,3},{1}> 0.50
+ 51 <{1},{3},{1}> 0.50
+ 52 <{1},{2,3},{1}> 0.50
+ 53 <{1},{2},{1}> 0.50
+ */
+ val expectedValue = Array(
+ (Array(1), 4L),
+ (Array(2), 4L),
+ (Array(3), 4L),
+ (Array(4), 3L),
+ (Array(5), 3L),
+ (Array(6), 3L),
+ (Array(1, -1, 6), 2L),
+ (Array(2, -1, 6), 2L),
+ (Array(5, -1, 6), 2L),
+ (Array(1, 2, -1, 6), 2L),
+ (Array(1, -1, 4), 2L),
+ (Array(2, -1, 4), 2L),
+ (Array(1, 2, -1, 4), 2L),
+ (Array(1, -1, 3), 4L),
+ (Array(2, -1, 3), 3L),
+ (Array(2, 3), 2L),
+ (Array(3, -1, 3), 3L),
+ (Array(4, -1, 3), 3L),
+ (Array(5, -1, 3), 2L),
+ (Array(6, -1, 3), 2L),
+ (Array(5, -1, 6, -1, 3), 2L),
+ (Array(6, -1, 2, -1, 3), 2L),
+ (Array(5, -1, 2, -1, 3), 2L),
+ (Array(5, -1, 1, -1, 3), 2L),
+ (Array(2, -1, 4, -1, 3), 2L),
+ (Array(1, -1, 4, -1, 3), 2L),
+ (Array(1, 2, -1, 4, -1, 3), 2L),
+ (Array(1, -1, 3, -1, 3), 3L),
+ (Array(1, 2, -1, 3), 2L),
+ (Array(1, -1, 2, -1, 3), 2L),
+ (Array(1, -1, 2, 3), 2L),
+ (Array(1, -1, 2), 4L),
+ (Array(1, 2), 2L),
+ (Array(3, -1, 2), 3L),
+ (Array(4, -1, 2), 2L),
+ (Array(5, -1, 2), 2L),
+ (Array(6, -1, 2), 2L),
+ (Array(5, -1, 6, -1, 2), 2L),
+ (Array(6, -1, 3, -1, 2), 2L),
+ (Array(5, -1, 3, -1, 2), 2L),
+ (Array(5, -1, 1, -1, 2), 2L),
+ (Array(4, -1, 3, -1, 2), 2L),
+ (Array(1, -1, 3, -1, 2), 3L),
+ (Array(5, -1, 6, -1, 3, -1, 2), 2L),
+ (Array(5, -1, 1, -1, 3, -1, 2), 2L),
+ (Array(1, -1, 1), 2L),
+ (Array(2, -1, 1), 2L),
+ (Array(3, -1, 1), 2L),
+ (Array(5, -1, 1), 2L),
+ (Array(2, 3, -1, 1), 2L),
+ (Array(1, -1, 3, -1, 1), 2L),
+ (Array(1, -1, 2, 3, -1, 1), 2L),
+ (Array(1, -1, 2, -1, 1), 2L))
+
+ compareResults(expectedValue, result.collect())
}
private def compareResults(
- expectedValue: Array[(Array[Int], Long)],
- actualValue: Array[(Array[Int], Long)]): Boolean = {
- expectedValue.map(x => (x._1.toSeq, x._2)).toSet ==
- actualValue.map(x => (x._1.toSeq, x._2)).toSet
+ expectedValue: Array[(Array[Int], Long)],
+ actualValue: Array[(Array[Int], Long)]): Unit = {
+ val expectedSet = expectedValue.map(x => (x._1.toSeq, x._2)).toSet
+ val actualSet = actualValue.map(x => (x._1.toSeq, x._2)).toSet
+ assert(expectedSet === actualSet)
+ }
+
+ private def insertDelimiter(sequence: Array[Int]): Array[Int] = {
+ sequence.zip(Seq.fill(sequence.length)(PrefixSpan.DELIMITER)).map { case (a, b) =>
+ List(a, b)
+ }.flatten
}
}
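Finally, a hypothetical REPL check (again ignoring private visibility) of how the test helper above interacts with the codec: insertDelimiter leaves a trailing -1, which splitSequence's `xs.length > 1` guard silently absorbs.

    // insertDelimiter interleaves a delimiter after every item, trailing -1 included:
    insertDelimiter(Array(1, 3))                   // Array(1, -1, 3, -1)

    // splitSequence treats a lone trailing delimiter as end-of-sequence:
    PrefixSpan.splitSequence(List(1, -1, 3, -1))   // List(Set(1), Set(3))

    // nonemptySubsets enumerates every candidate itemset a sequence element can support:
    PrefixSpan.nonemptySubsets(Set(1, 2)).toList   // List(Set(1), Set(2), Set(1, 2))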