aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorZheng RuiFeng <ruifengz@foxmail.com>2016-03-17 11:09:02 +0200
committerNick Pentreath <nick.pentreath@gmail.com>2016-03-17 11:09:02 +0200
commit204c9dec2c3876d20558ef5bda4dbd6edaf59643 (patch)
treed8dc5f0a21f84359c039eaf9a37864cba0d8e26a
parent30c18841e40abe768c015104f156dacf02e520eb (diff)
downloadspark-204c9dec2c3876d20558ef5bda4dbd6edaf59643.tar.gz
spark-204c9dec2c3876d20558ef5bda4dbd6edaf59643.tar.bz2
spark-204c9dec2c3876d20558ef5bda4dbd6edaf59643.zip
[MINOR][DOC] Add JavaStreamingTestExample
## What changes were proposed in this pull request? Add the java example of StreamingTest ## How was this patch tested? manual tests in CLI: bin/run-example mllib.JavaStreamingTestExample dataDir 5 100 Author: Zheng RuiFeng <ruifengz@foxmail.com> Closes #11776 from zhengruifeng/streaming_je.
-rw-r--r--docs/mllib-statistics.md7
-rw-r--r--examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java121
2 files changed, 128 insertions, 0 deletions
diff --git a/docs/mllib-statistics.md b/docs/mllib-statistics.md
index 652d215fa8..b773031bc7 100644
--- a/docs/mllib-statistics.md
+++ b/docs/mllib-statistics.md
@@ -544,6 +544,13 @@ provides streaming hypothesis testing.
{% include_example scala/org/apache/spark/examples/mllib/StreamingTestExample.scala %}
</div>
+
+<div data-lang="java" markdown="1">
+[`StreamingTest`](api/java/index.html#org.apache.spark.mllib.stat.test.StreamingTest)
+provides streaming hypothesis testing.
+
+{% include_example java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java %}
+</div>
</div>
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
new file mode 100644
index 0000000000..2197ef9481
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib;
+
+
+import org.apache.spark.Accumulator;
+import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.Function;
+// $example on$
+import org.apache.spark.mllib.stat.test.BinarySample;
+import org.apache.spark.mllib.stat.test.StreamingTest;
+import org.apache.spark.mllib.stat.test.StreamingTestResult;
+// $example off$
+import org.apache.spark.SparkConf;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.Seconds;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.util.Utils;
+
+
+/**
+ * Perform streaming testing using Welch's 2-sample t-test on a stream of data, where the data
+ * stream arrives as text files in a directory. Stops when the two groups are statistically
+ * significant (p-value < 0.05) or after a user-specified timeout in number of batches is exceeded.
+ *
+ * The rows of the text files must be in the form `Boolean, Double`. For example:
+ * false, -3.92
+ * true, 99.32
+ *
+ * Usage:
+ * JavaStreamingTestExample <dataDir> <batchDuration> <numBatchesTimeout>
+ *
+ * To run on your local machine using the directory `dataDir` with 5 seconds between each batch and
+ * a timeout after 100 insignificant batches, call:
+ * $ bin/run-example mllib.JavaStreamingTestExample dataDir 5 100
+ *
+ * As you add text files to `dataDir` the significance test will continually update every
+ * `batchDuration` seconds until the test becomes significant (p-value < 0.05) or the number of
+ * batches processed exceeds `numBatchesTimeout`.
+ */
+public class JavaStreamingTestExample {
+  /**
+   * Entry point of the example.
+   *
+   * @param args exactly three values: the directory watched for new text files, the batch
+   *             duration in seconds, and the number of batches after which the example stops
+   *             even if no significant result was seen
+   * @throws NumberFormatException if the batch duration or the batch timeout is not an integer
+   */
+  public static void main(String[] args) {
+    if (args.length != 3) {
+      System.err.println("Usage: JavaStreamingTestExample " +
+        "<dataDir> <batchDuration> <numBatchesTimeout>");
+      System.exit(1);
+    }
+
+    String dataDir = args[0];
+    // parseLong/parseInt yield primitives directly instead of boxing and then unboxing.
+    Duration batchDuration = Seconds.apply(Long.parseLong(args[1]));
+    int numBatchesTimeout = Integer.parseInt(args[2]);
+
+    SparkConf conf = new SparkConf().setMaster("local").setAppName("StreamingTestExample");
+    JavaStreamingContext ssc = new JavaStreamingContext(conf, batchDuration);
+
+    // Checkpoint into a fresh temporary directory below the JVM's temp dir.
+    ssc.checkpoint(Utils.createTempDir(System.getProperty("java.io.tmpdir"), "spark").toString());
+
+    // $example on$
+    JavaDStream<BinarySample> data = ssc.textFileStream(dataDir).map(
+      new Function<String, BinarySample>() {
+        @Override
+        public BinarySample call(String line) throws Exception {
+          String[] ts = line.split(",");
+          // Trim the label: parseBoolean does not ignore whitespace, so " true" would
+          // otherwise be read as false. parseDouble already ignores surrounding whitespace.
+          boolean label = Boolean.parseBoolean(ts[0].trim());
+          double value = Double.parseDouble(ts[1]);
+          return new BinarySample(label, value);
+        }
+      });
+
+    StreamingTest streamingTest = new StreamingTest()
+      .setPeacePeriod(0)
+      .setWindowSize(0)
+      .setTestMethod("welch");
+
+    JavaDStream<StreamingTestResult> out = streamingTest.registerStream(data);
+    out.print();
+    // $example off$
+
+    // Stop processing if test becomes significant or we time out
+    final Accumulator<Integer> timeoutCounter =
+      ssc.sparkContext().accumulator(numBatchesTimeout);
+
+    out.foreachRDD(new VoidFunction<JavaRDD<StreamingTestResult>>() {
+      @Override
+      public void call(JavaRDD<StreamingTestResult> rdd) throws Exception {
+        // One invocation per batch: count down the remaining batch budget.
+        timeoutCounter.add(-1);
+
+        long cntSignificant = rdd.filter(new Function<StreamingTestResult, Boolean>() {
+          @Override
+          public Boolean call(StreamingTestResult v) throws Exception {
+            return v.pValue() < 0.05;
+          }
+        }).count();
+
+        if (timeoutCounter.value() <= 0 || cntSignificant > 0) {
+          rdd.context().stop();
+        }
+      }
+    });
+
+    ssc.start();
+    ssc.awaitTermination();
+  }
+}