From 095d1cb3aa0021c9078a6e910967b9189ddfa177 Mon Sep 17 00:00:00 2001 From: Syrux Date: Thu, 13 Apr 2017 09:44:33 +0100 Subject: [SPARK-20265][MLLIB] Improve Prefix'span pre-processing efficiency ## What changes were proposed in this pull request? Improve PrefixSpan pre-processing efficency by preventing sequences of zero in the cleaned database. The efficiency gain is reflected in the following graph : https://postimg.org/image/9x6ireuvn/ ## How was this patch tested? Using MLlib's PrefixSpan existing tests and tests of my own on the 8 datasets shown in the graph. All result obtained were stricly the same as the original implementation (without this change). dev/run-tests was also runned, no error were found. Author : Cyril de Vogelaere Author: Syrux Closes #17575 from Syrux/SPARK-20265. --- .../org/apache/spark/mllib/fpm/PrefixSpan.scala | 99 ++++++++++++++-------- .../apache/spark/mllib/fpm/PrefixSpanSuite.scala | 51 +++++++++++ 2 files changed, 115 insertions(+), 35 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index 327cb974ef..3f8d65a378 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -144,45 +144,13 @@ class PrefixSpan private ( logInfo(s"minimum count for a frequent pattern: $minCount") // Find frequent items. - val freqItemAndCounts = data.flatMap { itemsets => - val uniqItems = mutable.Set.empty[Item] - itemsets.foreach { _.foreach { item => - uniqItems += item - }} - uniqItems.toIterator.map((_, 1L)) - }.reduceByKey(_ + _) - .filter { case (_, count) => - count >= minCount - }.collect() - val freqItems = freqItemAndCounts.sortBy(-_._2).map(_._1) + val freqItems = findFrequentItems(data, minCount) logInfo(s"number of frequent items: ${freqItems.length}") // Keep only frequent items from input sequences and convert them to internal storage. val itemToInt = freqItems.zipWithIndex.toMap - val dataInternalRepr = data.flatMap { itemsets => - val allItems = mutable.ArrayBuilder.make[Int] - var containsFreqItems = false - allItems += 0 - itemsets.foreach { itemsets => - val items = mutable.ArrayBuilder.make[Int] - itemsets.foreach { item => - if (itemToInt.contains(item)) { - items += itemToInt(item) + 1 // using 1-indexing in internal format - } - } - val result = items.result() - if (result.nonEmpty) { - containsFreqItems = true - allItems ++= result.sorted - } - allItems += 0 - } - if (containsFreqItems) { - Iterator.single(allItems.result()) - } else { - Iterator.empty - } - }.persist(StorageLevel.MEMORY_AND_DISK) + val dataInternalRepr = toDatabaseInternalRepr(data, itemToInt) + .persist(StorageLevel.MEMORY_AND_DISK) val results = genFreqPatterns(dataInternalRepr, minCount, maxPatternLength, maxLocalProjDBSize) @@ -231,6 +199,67 @@ class PrefixSpan private ( @Since("1.5.0") object PrefixSpan extends Logging { + /** + * This methods finds all frequent items in a input dataset. + * + * @param data Sequences of itemsets. + * @param minCount The minimal number of sequence an item should be present in to be frequent + * + * @return An array of Item containing only frequent items. + */ + private[fpm] def findFrequentItems[Item: ClassTag]( + data: RDD[Array[Array[Item]]], + minCount: Long): Array[Item] = { + + data.flatMap { itemsets => + val uniqItems = mutable.Set.empty[Item] + itemsets.foreach(set => uniqItems ++= set) + uniqItems.toIterator.map((_, 1L)) + }.reduceByKey(_ + _).filter { case (_, count) => + count >= minCount + }.sortBy(-_._2).map(_._1).collect() + } + + /** + * This methods cleans the input dataset from un-frequent items, and translate it's item + * to their corresponding Int identifier. + * + * @param data Sequences of itemsets. + * @param itemToInt A map allowing translation of frequent Items to their Int Identifier. + * The map should only contain frequent item. + * + * @return The internal repr of the inputted dataset. With properly placed zero delimiter. + */ + private[fpm] def toDatabaseInternalRepr[Item: ClassTag]( + data: RDD[Array[Array[Item]]], + itemToInt: Map[Item, Int]): RDD[Array[Int]] = { + + data.flatMap { itemsets => + val allItems = mutable.ArrayBuilder.make[Int] + var containsFreqItems = false + allItems += 0 + itemsets.foreach { itemsets => + val items = mutable.ArrayBuilder.make[Int] + itemsets.foreach { item => + if (itemToInt.contains(item)) { + items += itemToInt(item) + 1 // using 1-indexing in internal format + } + } + val result = items.result() + if (result.nonEmpty) { + containsFreqItems = true + allItems ++= result.sorted + allItems += 0 + } + } + if (containsFreqItems) { + Iterator.single(allItems.result()) + } else { + Iterator.empty + } + } + } + /** * Find the complete set of frequent sequential patterns in the input sequences. * @param data ordered sequences of itemsets. We represent a sequence internally as Array[Int], diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala index 4c2376376d..c2e08d078f 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala @@ -360,6 +360,49 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext { compareResults(expected, model.freqSequences.collect()) } + test("PrefixSpan pre-processing's cleaning test") { + + // One item per itemSet + val itemToInt1 = (4 to 5).zipWithIndex.toMap + val sequences1 = Seq( + Array(Array(4), Array(1), Array(2), Array(5), Array(2), Array(4), Array(5)), + Array(Array(6), Array(7), Array(8))) + val rdd1 = sc.parallelize(sequences1, 2).cache() + + val cleanedSequence1 = PrefixSpan.toDatabaseInternalRepr(rdd1, itemToInt1).collect() + + val expected1 = Array(Array(0, 4, 0, 5, 0, 4, 0, 5, 0)) + .map(_.map(x => if (x == 0) 0 else itemToInt1(x) + 1)) + + compareInternalSequences(expected1, cleanedSequence1) + + // Multi-item sequence + val itemToInt2 = (4 to 6).zipWithIndex.toMap + val sequences2 = Seq( + Array(Array(4, 5), Array(1, 6, 2), Array(2), Array(5), Array(2), Array(4), Array(5, 6, 7)), + Array(Array(8, 9), Array(1, 2))) + val rdd2 = sc.parallelize(sequences2, 2).cache() + + val cleanedSequence2 = PrefixSpan.toDatabaseInternalRepr(rdd2, itemToInt2).collect() + + val expected2 = Array(Array(0, 4, 5, 0, 6, 0, 5, 0, 4, 0, 5, 6, 0)) + .map(_.map(x => if (x == 0) 0 else itemToInt2(x) + 1)) + + compareInternalSequences(expected2, cleanedSequence2) + + // Emptied sequence + val itemToInt3 = (10 to 10).zipWithIndex.toMap + val sequences3 = Seq( + Array(Array(4, 5), Array(1, 6, 2), Array(2), Array(5), Array(2), Array(4), Array(5, 6, 7)), + Array(Array(8, 9), Array(1, 2))) + val rdd3 = sc.parallelize(sequences3, 2).cache() + + val cleanedSequence3 = PrefixSpan.toDatabaseInternalRepr(rdd3, itemToInt3).collect() + val expected3 = Array[Array[Int]]() + + compareInternalSequences(expected3, cleanedSequence3) + } + test("model save/load") { val sequences = Seq( Array(Array(1, 2), Array(3)), @@ -409,4 +452,12 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext { val actualSet = actualValue.map(x => (x._1.toSeq, x._2)).toSet assert(expectedSet === actualSet) } + + private def compareInternalSequences( + expectedValue: Array[Array[Int]], + actualValue: Array[Array[Int]]): Unit = { + val expectedSet = expectedValue.map(x => x.toSeq).toSet + val actualSet = actualValue.map(x => x.toSeq).toSet + assert(expectedSet === actualSet) + } } -- cgit v1.2.3