author     Feynman Liang <fliang@databricks.com>  2015-08-01 23:11:25 -0700
committer  Xiangrui Meng <meng@databricks.com>    2015-08-01 23:11:25 -0700
commit     28d944e86d066eb4c651dd803f0b022605ed644e (patch)
tree       8d256c51c7c21abd47949b0f223ffef1e06babbd /mllib
parent     57084e0c7c318912208ee31c52d61c14eeddd8f4 (diff)
[SPARK-9000] [MLLIB] Support generic item types in PrefixSpan
mengxr Please review after #7818 merges and master is rebased.

Continues work by rikima
Closes #7400

Author: Feynman Liang <fliang@databricks.com>
Author: masaki rikitoku <rikima3132@gmail.com>

Closes #7837 from feynmanliang/SPARK-7400-genericItems and squashes the following commits:

8b2c756 [Feynman Liang] Remove orig
92443c8 [Feynman Liang] Style fixes
42c6349 [Feynman Liang] Style fix
14e67fc [Feynman Liang] Generic prefixSpan itemtypes
b3b21e0 [Feynman Liang] Initial support for generic itemtype in public api
b86e0d5 [masaki rikitoku] modify to support generic item type
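As a quick illustration of the new public API, here is a minimal sketch (not part of the patch; the SparkContext `sc`, the sequence data, and the parameter values are assumed for the example). After this change, items may be any type with a ClassTag, e.g. String, instead of Int only:

    import org.apache.spark.mllib.fpm.PrefixSpan
    import org.apache.spark.rdd.RDD

    // Each sequence is an ordered Array of itemsets; item type is generic.
    val sequences: RDD[Array[Array[String]]] = sc.parallelize(Seq(
      Array(Array("a", "b"), Array("c")),
      Array(Array("a"), Array("c", "b"), Array("a", "b"))))

    val prefixSpan = new PrefixSpan()
      .setMinSupport(0.5)
      .setMaxPatternLength(5)

    // run returns (pattern, count) pairs in the original item type.
    val patterns: RDD[(Array[Array[String]], Long)] = prefixSpan.run(sequences)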
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala        40
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala  104
2 files changed, 132 insertions(+), 12 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 22b4ddb8b3..c1761c3642 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -18,6 +18,7 @@
package org.apache.spark.mllib.fpm
import scala.collection.mutable.ArrayBuilder
+import scala.reflect.ClassTag
import org.apache.spark.Logging
import org.apache.spark.annotation.Experimental
@@ -90,15 +91,44 @@ class PrefixSpan private (
}
/**
- * Find the complete set of sequential patterns in the input sequences.
+ * Find the complete set of sequential patterns in the input sequences of itemsets.
+ * @param data ordered sequences of itemsets.
+ * @return (sequential itemset pattern, count) tuples
+ */
+ def run[Item: ClassTag](data: RDD[Array[Array[Item]]]): RDD[(Array[Array[Item]], Long)] = {
+ val itemToInt = data.aggregate(Set[Item]())(
+ seqOp = { (uniqItems, item) => uniqItems ++ item.flatten.toSet },
+ combOp = { _ ++ _ }
+ ).zipWithIndex.toMap
+ val intToItem = Map() ++ (itemToInt.map { case (k, v) => (v, k) })
+
+ val dataInternalRepr = data.map { seq =>
+ seq.map(itemset => itemset.map(itemToInt)).reduce((a, b) => a ++ (DELIMITER +: b))
+ }
+ val results = run(dataInternalRepr)
+
+ def toPublicRepr(pattern: Iterable[Int]): List[Array[Item]] = {
+ pattern.span(_ != DELIMITER) match {
+ case (x, xs) if xs.size > 1 => x.map(intToItem).toArray :: toPublicRepr(xs.tail)
+ case (x, xs) => List(x.map(intToItem).toArray)
+ }
+ }
+ results.map { case (seq: Array[Int], count: Long) =>
+ (toPublicRepr(seq).toArray, count)
+ }
+ }
+
+ /**
+ * Find the complete set of sequential patterns in the input sequences. This method uses the
+ * internal representation of a sequence as Array[Int], in which each itemset is a contiguous
+ * run of non-negative integers and itemsets are separated by [[DELIMITER]].
* @param data ordered sequences of itemsets. Items are represented by non-negative integers.
- * Each itemset has one or more items and is delimited by [[DELIMITER]].
+ * Each itemset has one or more items and is delimited by [[DELIMITER]].
* @return a set of sequential pattern pairs,
* the key of pair is pattern (a list of elements),
* the value of pair is the pattern's count.
*/
- // TODO: generalize to arbitrary item-types and use mapping to Ints for internal algorithm
- def run(data: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
+ private[fpm] def run(data: RDD[Array[Int]]): RDD[(Array[Int], Long)] = {
val sc = data.sparkContext
if (data.getStorageLevel == StorageLevel.NONE) {
@@ -260,7 +290,7 @@ class PrefixSpan private (
private[fpm] object PrefixSpan {
private[fpm] val DELIMITER = -1
- /** Splits a sequence of itemsets delimited by [[DELIMITER]]. */
+ /** Splits an array of itemsets delimited by [[DELIMITER]]. */
private[fpm] def splitSequence(sequence: List[Int]): List[Set[Int]] = {
sequence.span(_ != DELIMITER) match {
case (x, xs) if xs.length > 1 => x.toSet :: splitSequence(xs.tail)
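To make the internal encoding concrete, here is a minimal standalone sketch (not from the patch; the names `seq`, `itemToInt`, and `split` are illustrative) of how a sequence of itemsets round-trips through the Array[Int] representation, mirroring the encoding in run and the decoding in splitSequence above:

    // Items are mapped to Ints; itemsets are joined with DELIMITER (-1).
    val DELIMITER = -1
    val itemToInt = Map("a" -> 0, "b" -> 1, "c" -> 2)
    val seq: Array[Array[String]] = Array(Array("a", "b"), Array("c"))

    // Encode: [[a, b], [c]] becomes Array(0, 1, -1, 2).
    val encoded: Array[Int] =
      seq.map(_.map(itemToInt)).reduce((a, b) => a ++ (DELIMITER +: b))

    // Decode with the same span-based recursion as splitSequence above.
    def split(s: List[Int]): List[Set[Int]] = s.span(_ != DELIMITER) match {
      case (x, xs) if xs.length > 1 => x.toSet :: split(xs.tail)
      case (x, _) => List(x.toSet)
    }
    assert(split(encoded.toList) == List(Set(0, 1), Set(2)))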
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
index 457f32670f..d87f61e385 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
@@ -21,7 +21,7 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
- test("PrefixSpan using Integer type, singleton itemsets") {
+ test("PrefixSpan internal (integer seq, -1 delim) run, singleton itemsets") {
/*
library("arulesSequences")
@@ -69,7 +69,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
(Array(4, -1, 5), 2L),
(Array(5), 3L)
)
- compareResults(expectedValue1, result1.collect())
+ compareInternalResults(expectedValue1, result1.collect())
prefixspan.setMinSupport(0.5).setMaxPatternLength(50)
val result2 = prefixspan.run(rdd)
@@ -80,7 +80,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
(Array(4), 4L),
(Array(5), 3L)
)
- compareResults(expectedValue2, result2.collect())
+ compareInternalResults(expectedValue2, result2.collect())
prefixspan.setMinSupport(0.33).setMaxPatternLength(2)
val result3 = prefixspan.run(rdd)
@@ -100,10 +100,10 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
(Array(4, -1, 5), 2L),
(Array(5), 3L)
)
- compareResults(expectedValue3, result3.collect())
+ compareInternalResults(expectedValue3, result3.collect())
}
- test("PrefixSpan using Integer type, variable-size itemsets") {
+ test("PrefixSpan internal (integer seq, -1 delim) run, variable-size itemsets") {
val sequences = Array(
Array(1, -1, 1, 2, 3, -1, 1, 3, -1, 4, -1, 3, 6),
Array(1, 4, -1, 3, -1, 2, 3, -1, 1, 5),
@@ -254,10 +254,100 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
(Array(1, -1, 2, 3, -1, 1), 2L),
(Array(1, -1, 2, -1, 1), 2L))
- compareResults(expectedValue, result.collect())
+ compareInternalResults(expectedValue, result.collect())
}
- private def compareResults(
+ test("PrefixSpan Integer type, variable-size itemsets") {
+ val sequences = Seq(
+ Array(Array(1, 2), Array(3)),
+ Array(Array(1), Array(3, 2), Array(1, 2)),
+ Array(Array(1, 2), Array(5)),
+ Array(Array(6)))
+ val rdd = sc.parallelize(sequences, 2).cache()
+
+ val prefixspan = new PrefixSpan()
+ .setMinSupport(0.5)
+ .setMaxPatternLength(5)
+
+ /*
+ To verify results, create file "prefixSpanSeqs2" with content
+ (format = (transactionID, idxInTransaction, numItemsInItemset, itemset)):
+ 1 1 2 1 2
+ 1 2 1 3
+ 2 1 1 1
+ 2 2 2 3 2
+ 2 3 2 1 2
+ 3 1 2 1 2
+ 3 2 1 5
+ 4 1 1 6
+ In R, run:
+ library("arulesSequences")
+ prefixSpanSeqs = read_baskets("prefixSpanSeqs2", info = c("sequenceID","eventID","SIZE"))
+ freqItemSeq = cspade(prefixSpanSeqs,
+ parameter = list(support = 0.5, maxlen = 5))
+ resSeq = as(freqItemSeq, "data.frame")
+ resSeq
+
+ sequence support
+ 1 <{1}> 0.75
+ 2 <{2}> 0.75
+ 3 <{3}> 0.50
+ 4 <{1},{3}> 0.50
+ 5 <{1,2}> 0.75
+ */
+
+ val result = prefixspan.run(rdd)
+ val expected = Array(
+ (Array(Array(1)), 3L),
+ (Array(Array(2)), 3L),
+ (Array(Array(3)), 2L),
+ (Array(Array(1), Array(3)), 2L),
+ (Array(Array(1, 2)), 3L)
+ )
+ compareResults(expected, result.collect())
+ }
+
+ test("PrefixSpan String type, variable-size itemsets") {
+ // This is the same test as "PrefixSpan Integer type, variable-size itemsets" except
+ // with the items mapped to Strings
+ val intToString = (1 to 6).zip(Seq("a", "b", "c", "d", "e", "f")).toMap
+ val sequences = Seq(
+ Array(Array(1, 2), Array(3)),
+ Array(Array(1), Array(3, 2), Array(1, 2)),
+ Array(Array(1, 2), Array(5)),
+ Array(Array(6))).map(seq => seq.map(itemSet => itemSet.map(intToString)))
+ val rdd = sc.parallelize(sequences, 2).cache()
+
+ val prefixspan = new PrefixSpan()
+ .setMinSupport(0.5)
+ .setMaxPatternLength(5)
+
+ val result = prefixspan.run(rdd)
+ val expected = Array(
+ (Array(Array(1)), 3L),
+ (Array(Array(2)), 3L),
+ (Array(Array(3)), 2L),
+ (Array(Array(1), Array(3)), 2L),
+ (Array(Array(1, 2)), 3L)
+ ).map { case (pattern, count) =>
+ (pattern.map(itemSet => itemSet.map(intToString)), count)
+ }
+ compareResults(expected, result.collect())
+ }
+
+ private def compareResults[Item](
+ expectedValue: Array[(Array[Array[Item]], Long)],
+ actualValue: Array[(Array[Array[Item]], Long)]): Unit = {
+ val expectedSet = expectedValue.map { case (pattern: Array[Array[Item]], count: Long) =>
+ (pattern.map(itemSet => itemSet.toSet).toSeq, count)
+ }.toSet
+ val actualSet = actualValue.map { case (pattern: Array[Array[Item]], count: Long) =>
+ (pattern.map(itemSet => itemSet.toSet).toSeq, count)
+ }.toSet
+ assert(expectedSet === actualSet)
+ }
+
+ private def compareInternalResults(
expectedValue: Array[(Array[Int], Long)],
actualValue: Array[(Array[Int], Long)]): Unit = {
val expectedSet = expectedValue.map(x => (x._1.toSeq, x._2)).toSet