aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorXiangrui Meng <meng@databricks.com>2015-08-12 20:44:40 -0700
committerXiangrui Meng <meng@databricks.com>2015-08-12 20:44:40 -0700
commitd7053bea985679c514b3add029631ea23e1730ce (patch)
tree287efbc8452d90c06a8f86720636ad42b21c3fc3
parentd2d5e7fe2df582e1c866334b3014d7cb351f5b70 (diff)
downloadspark-d7053bea985679c514b3add029631ea23e1730ce.tar.gz
spark-d7053bea985679c514b3add029631ea23e1730ce.tar.bz2
spark-d7053bea985679c514b3add029631ea23e1730ce.zip
[SPARK-9903] [MLLIB] skip local processing in PrefixSpan if there are no small prefixes
There exists a chance that the prefixes keep growing to the maximum pattern length. Then the final local processing step becomes unnecessary. feynmanliang Author: Xiangrui Meng <meng@databricks.com> Closes #8136 from mengxr/SPARK-9903.
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala37
1 files changed, 21 insertions, 16 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index ad6715b52f..dc4ae1d0b6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -282,25 +282,30 @@ object PrefixSpan extends Logging {
largePrefixes = newLargePrefixes
}
- // Switch to local processing.
- val bcSmallPrefixes = sc.broadcast(smallPrefixes)
- val distributedFreqPattern = postfixes.flatMap { postfix =>
- bcSmallPrefixes.value.values.map { prefix =>
- (prefix.id, postfix.project(prefix).compressed)
- }.filter(_._2.nonEmpty)
- }.groupByKey().flatMap { case (id, projPostfixes) =>
- val prefix = bcSmallPrefixes.value(id)
- val localPrefixSpan = new LocalPrefixSpan(minCount, maxPatternLength - prefix.length)
- // TODO: We collect projected postfixes into memory. We should also compare the performance
- // TODO: of keeping them on shuffle files.
- localPrefixSpan.run(projPostfixes.toArray).map { case (pattern, count) =>
- (prefix.items ++ pattern, count)
+ var freqPatterns = sc.parallelize(localFreqPatterns, 1)
+
+ val numSmallPrefixes = smallPrefixes.size
+ logInfo(s"number of small prefixes for local processing: $numSmallPrefixes")
+ if (numSmallPrefixes > 0) {
+ // Switch to local processing.
+ val bcSmallPrefixes = sc.broadcast(smallPrefixes)
+ val distributedFreqPattern = postfixes.flatMap { postfix =>
+ bcSmallPrefixes.value.values.map { prefix =>
+ (prefix.id, postfix.project(prefix).compressed)
+ }.filter(_._2.nonEmpty)
+ }.groupByKey().flatMap { case (id, projPostfixes) =>
+ val prefix = bcSmallPrefixes.value(id)
+ val localPrefixSpan = new LocalPrefixSpan(minCount, maxPatternLength - prefix.length)
+ // TODO: We collect projected postfixes into memory. We should also compare the performance
+ // TODO: of keeping them on shuffle files.
+ localPrefixSpan.run(projPostfixes.toArray).map { case (pattern, count) =>
+ (prefix.items ++ pattern, count)
+ }
}
+ // Union local frequent patterns and distributed ones.
+ freqPatterns = freqPatterns ++ distributedFreqPattern
}
- // Union local frequent patterns and distributed ones.
- val freqPatterns = (sc.parallelize(localFreqPatterns, 1) ++ distributedFreqPattern)
- .persist(StorageLevel.MEMORY_AND_DISK)
freqPatterns
}