aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSyrux <pokcyril@hotmail.com>2017-04-13 09:44:33 +0100
committerSean Owen <sowen@cloudera.com>2017-04-13 09:44:33 +0100
commit095d1cb3aa0021c9078a6e910967b9189ddfa177 (patch)
tree50803658690156142ed3766207d7956a29d63496
parentec68d8f8cfdede8a0de1d56476205158544cc4eb (diff)
downloadspark-095d1cb3aa0021c9078a6e910967b9189ddfa177.tar.gz
spark-095d1cb3aa0021c9078a6e910967b9189ddfa177.tar.bz2
spark-095d1cb3aa0021c9078a6e910967b9189ddfa177.zip
[SPARK-20265][MLLIB] Improve Prefix'span pre-processing efficiency
## What changes were proposed in this pull request? Improve PrefixSpan pre-processing efficency by preventing sequences of zero in the cleaned database. The efficiency gain is reflected in the following graph : https://postimg.org/image/9x6ireuvn/ ## How was this patch tested? Using MLlib's PrefixSpan existing tests and tests of my own on the 8 datasets shown in the graph. All result obtained were stricly the same as the original implementation (without this change). dev/run-tests was also runned, no error were found. Author : Cyril de Vogelaere <cyril.devogelaeregmail.com> Author: Syrux <pokcyril@hotmail.com> Closes #17575 from Syrux/SPARK-20265.
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala99
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala51
2 files changed, 115 insertions, 35 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 327cb974ef..3f8d65a378 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -144,45 +144,13 @@ class PrefixSpan private (
logInfo(s"minimum count for a frequent pattern: $minCount")
// Find frequent items.
- val freqItemAndCounts = data.flatMap { itemsets =>
- val uniqItems = mutable.Set.empty[Item]
- itemsets.foreach { _.foreach { item =>
- uniqItems += item
- }}
- uniqItems.toIterator.map((_, 1L))
- }.reduceByKey(_ + _)
- .filter { case (_, count) =>
- count >= minCount
- }.collect()
- val freqItems = freqItemAndCounts.sortBy(-_._2).map(_._1)
+ val freqItems = findFrequentItems(data, minCount)
logInfo(s"number of frequent items: ${freqItems.length}")
// Keep only frequent items from input sequences and convert them to internal storage.
val itemToInt = freqItems.zipWithIndex.toMap
- val dataInternalRepr = data.flatMap { itemsets =>
- val allItems = mutable.ArrayBuilder.make[Int]
- var containsFreqItems = false
- allItems += 0
- itemsets.foreach { itemsets =>
- val items = mutable.ArrayBuilder.make[Int]
- itemsets.foreach { item =>
- if (itemToInt.contains(item)) {
- items += itemToInt(item) + 1 // using 1-indexing in internal format
- }
- }
- val result = items.result()
- if (result.nonEmpty) {
- containsFreqItems = true
- allItems ++= result.sorted
- }
- allItems += 0
- }
- if (containsFreqItems) {
- Iterator.single(allItems.result())
- } else {
- Iterator.empty
- }
- }.persist(StorageLevel.MEMORY_AND_DISK)
+ val dataInternalRepr = toDatabaseInternalRepr(data, itemToInt)
+ .persist(StorageLevel.MEMORY_AND_DISK)
val results = genFreqPatterns(dataInternalRepr, minCount, maxPatternLength, maxLocalProjDBSize)
@@ -232,6 +200,67 @@ class PrefixSpan private (
object PrefixSpan extends Logging {
/**
+ * This methods finds all frequent items in a input dataset.
+ *
+ * @param data Sequences of itemsets.
+ * @param minCount The minimal number of sequence an item should be present in to be frequent
+ *
+ * @return An array of Item containing only frequent items.
+ */
+ private[fpm] def findFrequentItems[Item: ClassTag](
+ data: RDD[Array[Array[Item]]],
+ minCount: Long): Array[Item] = {
+
+ data.flatMap { itemsets =>
+ val uniqItems = mutable.Set.empty[Item]
+ itemsets.foreach(set => uniqItems ++= set)
+ uniqItems.toIterator.map((_, 1L))
+ }.reduceByKey(_ + _).filter { case (_, count) =>
+ count >= minCount
+ }.sortBy(-_._2).map(_._1).collect()
+ }
+
+ /**
+ * This methods cleans the input dataset from un-frequent items, and translate it's item
+ * to their corresponding Int identifier.
+ *
+ * @param data Sequences of itemsets.
+ * @param itemToInt A map allowing translation of frequent Items to their Int Identifier.
+ * The map should only contain frequent item.
+ *
+ * @return The internal repr of the inputted dataset. With properly placed zero delimiter.
+ */
+ private[fpm] def toDatabaseInternalRepr[Item: ClassTag](
+ data: RDD[Array[Array[Item]]],
+ itemToInt: Map[Item, Int]): RDD[Array[Int]] = {
+
+ data.flatMap { itemsets =>
+ val allItems = mutable.ArrayBuilder.make[Int]
+ var containsFreqItems = false
+ allItems += 0
+ itemsets.foreach { itemsets =>
+ val items = mutable.ArrayBuilder.make[Int]
+ itemsets.foreach { item =>
+ if (itemToInt.contains(item)) {
+ items += itemToInt(item) + 1 // using 1-indexing in internal format
+ }
+ }
+ val result = items.result()
+ if (result.nonEmpty) {
+ containsFreqItems = true
+ allItems ++= result.sorted
+ allItems += 0
+ }
+ }
+ if (containsFreqItems) {
+ Iterator.single(allItems.result())
+ } else {
+ Iterator.empty
+ }
+ }
+ }
+
+ /**
* Find the complete set of frequent sequential patterns in the input sequences.
* @param data ordered sequences of itemsets. We represent a sequence internally as Array[Int],
* where each itemset is represented by a contiguous sequence of distinct and ordered
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
index 4c2376376d..c2e08d078f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
@@ -360,6 +360,49 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
compareResults(expected, model.freqSequences.collect())
}
+ test("PrefixSpan pre-processing's cleaning test") {
+
+ // One item per itemSet
+ val itemToInt1 = (4 to 5).zipWithIndex.toMap
+ val sequences1 = Seq(
+ Array(Array(4), Array(1), Array(2), Array(5), Array(2), Array(4), Array(5)),
+ Array(Array(6), Array(7), Array(8)))
+ val rdd1 = sc.parallelize(sequences1, 2).cache()
+
+ val cleanedSequence1 = PrefixSpan.toDatabaseInternalRepr(rdd1, itemToInt1).collect()
+
+ val expected1 = Array(Array(0, 4, 0, 5, 0, 4, 0, 5, 0))
+ .map(_.map(x => if (x == 0) 0 else itemToInt1(x) + 1))
+
+ compareInternalSequences(expected1, cleanedSequence1)
+
+ // Multi-item sequence
+ val itemToInt2 = (4 to 6).zipWithIndex.toMap
+ val sequences2 = Seq(
+ Array(Array(4, 5), Array(1, 6, 2), Array(2), Array(5), Array(2), Array(4), Array(5, 6, 7)),
+ Array(Array(8, 9), Array(1, 2)))
+ val rdd2 = sc.parallelize(sequences2, 2).cache()
+
+ val cleanedSequence2 = PrefixSpan.toDatabaseInternalRepr(rdd2, itemToInt2).collect()
+
+ val expected2 = Array(Array(0, 4, 5, 0, 6, 0, 5, 0, 4, 0, 5, 6, 0))
+ .map(_.map(x => if (x == 0) 0 else itemToInt2(x) + 1))
+
+ compareInternalSequences(expected2, cleanedSequence2)
+
+ // Emptied sequence
+ val itemToInt3 = (10 to 10).zipWithIndex.toMap
+ val sequences3 = Seq(
+ Array(Array(4, 5), Array(1, 6, 2), Array(2), Array(5), Array(2), Array(4), Array(5, 6, 7)),
+ Array(Array(8, 9), Array(1, 2)))
+ val rdd3 = sc.parallelize(sequences3, 2).cache()
+
+ val cleanedSequence3 = PrefixSpan.toDatabaseInternalRepr(rdd3, itemToInt3).collect()
+ val expected3 = Array[Array[Int]]()
+
+ compareInternalSequences(expected3, cleanedSequence3)
+ }
+
test("model save/load") {
val sequences = Seq(
Array(Array(1, 2), Array(3)),
@@ -409,4 +452,12 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
val actualSet = actualValue.map(x => (x._1.toSeq, x._2)).toSet
assert(expectedSet === actualSet)
}
+
+ private def compareInternalSequences(
+ expectedValue: Array[Array[Int]],
+ actualValue: Array[Array[Int]]): Unit = {
+ val expectedSet = expectedValue.map(x => x.toSeq).toSet
+ val actualSet = actualValue.map(x => x.toSeq).toSet
+ assert(expectedSet === actualSet)
+ }
}