-rw-r--r--  data/mllib/sample_fpgrowth.txt                                   |  6
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/mllib/JavaFPGrowthExample.java | 43
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/mllib/FPGrowthExample.scala  | 59
3 files changed, 81 insertions(+), 27 deletions(-)
diff --git a/data/mllib/sample_fpgrowth.txt b/data/mllib/sample_fpgrowth.txt
new file mode 100644
index 0000000000..c451583e51
--- /dev/null
+++ b/data/mllib/sample_fpgrowth.txt
@@ -0,0 +1,6 @@
+r z h k p
+z y x w v u t s
+s x o n r
+x z y m t s q e
+z
+x z y r q t p
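
Each line of sample_fpgrowth.txt is one transaction, with items separated by single spaces. At the examples' default minSupport of 0.3, an itemset over these 6 transactions must occur in at least 0.3 * 6 = 1.8, i.e. 2, of them to be reported as frequent; {x, y, z}, for example, occurs in lines 2, 4, and 6, so it is frequent with freq = 3. A plain-Scala sanity check of that count (no Spark needed; the object name is made up for illustration):

    // Count how many of the sample transactions contain the itemset {x, y, z}.
    object SampleDataCheck {
      def main(args: Array[String]): Unit = {
        val transactions = Seq(
          "r z h k p",
          "z y x w v u t s",
          "s x o n r",
          "x z y m t s q e",
          "z",
          "x z y r q t p").map(_.split(" ").toSet)
        val itemset = Set("x", "y", "z")
        val freq = transactions.count(itemset.subsetOf(_))
        // minSupport = 0.3 over 6 transactions means "frequent" starts at freq >= 2.
        println(s"{x,y,z} appears in $freq of ${transactions.size} transactions") // 3 of 6
      }
    }
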
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaFPGrowthExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaFPGrowthExample.java
index f50e802cf6..36baf58687 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaFPGrowthExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaFPGrowthExample.java
@@ -25,32 +25,49 @@ import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
import org.apache.spark.mllib.fpm.FPGrowth;
import org.apache.spark.mllib.fpm.FPGrowthModel;

/**
 * Java example for mining frequent itemsets using FP-growth.
+ * Example usage: ./bin/run-example mllib.JavaFPGrowthExample ./data/mllib/sample_fpgrowth.txt
 */
public class JavaFPGrowthExample {
  public static void main(String[] args) {
+    String inputFile;
+    double minSupport = 0.3;
+    int numPartition = -1;
+    if (args.length < 1) {
+      System.err.println(
+        "Usage: JavaFPGrowth <input_file> [minSupport] [numPartition]");
+      System.exit(1);
+    }
+    inputFile = args[0];
+    if (args.length >= 2) {
+      minSupport = Double.parseDouble(args[1]);
+    }
+    if (args.length >= 3) {
+      numPartition = Integer.parseInt(args[2]);
+    }
+
    SparkConf sparkConf = new SparkConf().setAppName("JavaFPGrowthExample");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
+    JavaRDD<ArrayList<String>> transactions = sc.textFile(inputFile).map(
+      new Function<String, ArrayList<String>>() {
+        @Override
+        public ArrayList<String> call(String s) {
+          return Lists.newArrayList(s.split(" "));
+        }
+      }
+    );

-    // TODO: Read a user-specified input file.
-    @SuppressWarnings("unchecked")
-    JavaRDD<ArrayList<String>> transactions = sc.parallelize(Lists.newArrayList(
-      Lists.newArrayList("r z h k p".split(" ")),
-      Lists.newArrayList("z y x w v u t s".split(" ")),
-      Lists.newArrayList("s x o n r".split(" ")),
-      Lists.newArrayList("x z y m t s q e".split(" ")),
-      Lists.newArrayList("z".split(" ")),
-      Lists.newArrayList("x z y r q t p".split(" "))), 2);
-
-    FPGrowth fpg = new FPGrowth()
-      .setMinSupport(0.3);
-    FPGrowthModel<String> model = fpg.run(transactions);
+    FPGrowthModel<String> model = new FPGrowth()
+      .setMinSupport(minSupport)
+      .setNumPartitions(numPartition)
+      .run(transactions);

    for (FPGrowth.FreqItemset<String> s: model.freqItemsets().toJavaRDD().collect()) {
      System.out.println("[" + Joiner.on(",").join(s.javaItems()) + "], " + s.freq());
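
Both examples leave numPartition at -1 unless a third argument is given, and pass it to setNumPartitions unchanged. As far as I can tell from the MLlib FPGrowth of this era, a non-positive value falls back to the input RDD's own partition count, which is why -1 is a safe default. A sketch of that fallback, stated as an assumption about the internals rather than the verbatim MLlib source:

    import org.apache.spark.HashPartitioner
    import org.apache.spark.rdd.RDD

    // Assumed FPGrowth behavior (not the verbatim MLlib source): a non-positive
    // numPartitions setting defaults to the input RDD's partition count.
    def choosePartitioner(numPartitions: Int, data: RDD[_]): HashPartitioner = {
      val numParts = if (numPartitions > 0) numPartitions else data.partitions.length
      new HashPartitioner(numParts)
    }
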
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/FPGrowthExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/FPGrowthExample.scala
index aaae275ec5..13f24a1e59 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/FPGrowthExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/FPGrowthExample.scala
@@ -17,30 +17,61 @@
package org.apache.spark.examples.mllib

+import scopt.OptionParser
+
import org.apache.spark.mllib.fpm.FPGrowth
-import org.apache.spark.{SparkContext, SparkConf}
+import org.apache.spark.{SparkConf, SparkContext}

/**
 * Example for mining frequent itemsets using FP-growth.
+ * Example usage: ./bin/run-example mllib.FPGrowthExample \
+ *   --minSupport 0.8 --numPartition 2 ./data/mllib/sample_fpgrowth.txt
 */
object FPGrowthExample {
+  case class Params(
+    input: String = null,
+    minSupport: Double = 0.3,
+    numPartition: Int = -1) extends AbstractParams[Params]
+
  def main(args: Array[String]) {
-    val conf = new SparkConf().setAppName("FPGrowthExample")
+    val defaultParams = Params()
+
+    val parser = new OptionParser[Params]("FPGrowthExample") {
+      head("FPGrowth: an example FP-growth app.")
+      opt[Double]("minSupport")
+        .text(s"minimal support level, default: ${defaultParams.minSupport}")
+        .action((x, c) => c.copy(minSupport = x))
+      opt[Int]("numPartition")
+        .text(s"number of partition, default: ${defaultParams.numPartition}")
+        .action((x, c) => c.copy(numPartition = x))
+      arg[String]("<input>")
+        .text("input paths to input data set, whose file format is that each line " +
+          "contains a transaction with each item in String and separated by a space")
+        .required()
+        .action((x, c) => c.copy(input = x))
+    }
+
+    parser.parse(args, defaultParams).map { params =>
+      run(params)
+    }.getOrElse {
+      sys.exit(1)
+    }
+  }
+
+  def run(params: Params) {
+    val conf = new SparkConf().setAppName(s"FPGrowthExample with $params")
    val sc = new SparkContext(conf)
+    val transactions = sc.textFile(params.input).map(_.split(" ")).cache()
+
+    println(s"Number of transactions: ${transactions.count()}")
+
+    val model = new FPGrowth()
+      .setMinSupport(params.minSupport)
+      .setNumPartitions(params.numPartition)
+      .run(transactions)

-    // TODO: Read a user-specified input file.
-    val transactions = sc.parallelize(Seq(
-      "r z h k p",
-      "z y x w v u t s",
-      "s x o n r",
-      "x z y m t s q e",
-      "z",
-      "x z y r q t p").map(_.split(" ")), numSlices = 2)
-
-    val fpg = new FPGrowth()
-      .setMinSupport(0.3)
-    val model = fpg.run(transactions)
+    println(s"Number of frequent itemsets: ${model.freqItemsets.count()}")

    model.freqItemsets.collect().foreach { itemset =>
      println(itemset.items.mkString("[", ",", "]") + ", " + itemset.freq)
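
Worth noting how the scopt plumbing behaves: parser.parse returns Option[Params], with Some(params) on success (which drives run) and None after scopt has already printed its own error and usage text, so the getOrElse branch only needs the sys.exit(1). A condensed, self-contained sketch of the same flow (assumes scopt 3.x on the classpath; ArgsDemo and its fields are illustrative names, not part of this commit):

    import scopt.OptionParser

    object ArgsDemo {
      case class Params(input: String = null, minSupport: Double = 0.3, numPartition: Int = -1)

      def main(args: Array[String]): Unit = {
        val parser = new OptionParser[Params]("ArgsDemo") {
          opt[Double]("minSupport").action((x, c) => c.copy(minSupport = x))
          opt[Int]("numPartition").action((x, c) => c.copy(numPartition = x))
          arg[String]("<input>").required().action((x, c) => c.copy(input = x))
        }
        // e.g. args = Array("--minSupport", "0.8", "--numPartition", "2", "in.txt")
        parser.parse(args, Params()).map { params =>
          println(params) // Params(in.txt,0.8,2)
        }.getOrElse {
          sys.exit(1) // scopt has already printed the usage/error message
        }
      }
    }
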