diff options
author | Jacky Li <jacky.likun@huawei.com> | 2015-02-23 08:47:28 -0800 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2015-02-23 08:47:28 -0800 |
commit | 651a1c019eb911005e234a46cc559d63da352377 (patch) | |
tree | d2417c8ab58e204c8df20b43ede967f838e0847d /examples/src/main/scala | |
parent | 242d49584c6aa21d928db2552033661950f760a5 (diff) | |
download | spark-651a1c019eb911005e234a46cc559d63da352377.tar.gz spark-651a1c019eb911005e234a46cc559d63da352377.tar.bz2 spark-651a1c019eb911005e234a46cc559d63da352377.zip |
[SPARK-5939][MLLib] make FPGrowth example app take parameters
Add parameter parsing in FPGrowth example app in Scala and Java
And a sample data file is added in data/mllib folder
Author: Jacky Li <jacky.likun@huawei.com>
Closes #4714 from jackylk/parameter and squashes the following commits:
8c478b3 [Jacky Li] fix according to comments
3bb74f6 [Jacky Li] make FPGrowth exampl app take parameters
f0e4d10 [Jacky Li] make FPGrowth exampl app take parameters
Diffstat (limited to 'examples/src/main/scala')
-rw-r--r-- | examples/src/main/scala/org/apache/spark/examples/mllib/FPGrowthExample.scala | 59 |
1 file changed, 45 insertions, 14 deletions
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/FPGrowthExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/FPGrowthExample.scala index aaae275ec5..13f24a1e59 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/FPGrowthExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/FPGrowthExample.scala @@ -17,30 +17,61 @@ package org.apache.spark.examples.mllib +import scopt.OptionParser + import org.apache.spark.mllib.fpm.FPGrowth -import org.apache.spark.{SparkContext, SparkConf} +import org.apache.spark.{SparkConf, SparkContext} /** * Example for mining frequent itemsets using FP-growth. + * Example usage: ./bin/run-example mllib.FPGrowthExample \ + * --minSupport 0.8 --numPartition 2 ./data/mllib/sample_fpgrowth.txt */ object FPGrowthExample { + case class Params( + input: String = null, + minSupport: Double = 0.3, + numPartition: Int = -1) extends AbstractParams[Params] + def main(args: Array[String]) { - val conf = new SparkConf().setAppName("FPGrowthExample") + val defaultParams = Params() + + val parser = new OptionParser[Params]("FPGrowthExample") { + head("FPGrowth: an example FP-growth app.") + opt[Double]("minSupport") + .text(s"minimal support level, default: ${defaultParams.minSupport}") + .action((x, c) => c.copy(minSupport = x)) + opt[Int]("numPartition") + .text(s"number of partition, default: ${defaultParams.numPartition}") + .action((x, c) => c.copy(numPartition = x)) + arg[String]("<input>") + .text("input paths to input data set, whose file format is that each line " + + "contains a transaction with each item in String and separated by a space") + .required() + .action((x, c) => c.copy(input = x)) + } + + parser.parse(args, defaultParams).map { params => + run(params) + }.getOrElse { + sys.exit(1) + } + } + + def run(params: Params) { + val conf = new SparkConf().setAppName(s"FPGrowthExample with $params") val sc = new SparkContext(conf) + val transactions = 
sc.textFile(params.input).map(_.split(" ")).cache() + + println(s"Number of transactions: ${transactions.count()}") + + val model = new FPGrowth() + .setMinSupport(params.minSupport) + .setNumPartitions(params.numPartition) + .run(transactions) - // TODO: Read a user-specified input file. - val transactions = sc.parallelize(Seq( - "r z h k p", - "z y x w v u t s", - "s x o n r", - "x z y m t s q e", - "z", - "x z y r q t p").map(_.split(" ")), numSlices = 2) - - val fpg = new FPGrowth() - .setMinSupport(0.3) - val model = fpg.run(transactions) + println(s"Number of frequent itemsets: ${model.freqItemsets.count()}") model.freqItemsets.collect().foreach { itemset => println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq) |