diff options
author | Zheng RuiFeng <ruifengz@foxmail.com> | 2016-03-18 12:34:14 +0000 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-03-18 12:34:14 +0000 |
commit | 53f32a22daa40b713fef477054290d8adbeb6f71 (patch) | |
tree | ecb703f194a139d798b937d6e62b16d8530e0207 | |
parent | 0f1015ffdd40cd8647f6acdd5cdd717b883e4875 (diff) | |
download | spark-53f32a22daa40b713fef477054290d8adbeb6f71.tar.gz spark-53f32a22daa40b713fef477054290d8adbeb6f71.tar.bz2 spark-53f32a22daa40b713fef477054290d8adbeb6f71.zip |
[MINOR][DOC] Fix nits in JavaStreamingTestExample
## What changes were proposed in this pull request?
Fix some nits discussed in https://github.com/apache/spark/pull/11776#issuecomment-198207419
use !rdd.isEmpty instead of rdd.count > 0
use static instead of AtomicInteger
remove unneeded "throws Exception"
## How was this patch tested?
manual tests
Author: Zheng RuiFeng <ruifengz@foxmail.com>
Closes #11821 from zhengruifeng/je_fix.
-rw-r--r-- | examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java | 20 |
1 files changed, 11 insertions, 9 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java index 2197ef9481..4c8755916c 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java @@ -56,6 +56,9 @@ import org.apache.spark.util.Utils; * batches processed exceeds `numBatchesTimeout`. */ public class JavaStreamingTestExample { + + private static int timeoutCounter = 0; + public static void main(String[] args) { if (args.length != 3) { System.err.println("Usage: JavaStreamingTestExample " + @@ -76,7 +79,7 @@ public class JavaStreamingTestExample { JavaDStream<BinarySample> data = ssc.textFileStream(dataDir).map( new Function<String, BinarySample>() { @Override - public BinarySample call(String line) throws Exception { + public BinarySample call(String line) { String[] ts = line.split(","); boolean label = Boolean.valueOf(ts[0]); double value = Double.valueOf(ts[1]); @@ -94,22 +97,21 @@ public class JavaStreamingTestExample { // $example off$ // Stop processing if test becomes significant or we time out - final Accumulator<Integer> timeoutCounter = - ssc.sparkContext().accumulator(numBatchesTimeout); + timeoutCounter = numBatchesTimeout; out.foreachRDD(new VoidFunction<JavaRDD<StreamingTestResult>>() { @Override - public void call(JavaRDD<StreamingTestResult> rdd) throws Exception { - timeoutCounter.add(-1); + public void call(JavaRDD<StreamingTestResult> rdd) { + timeoutCounter -= 1; - long cntSignificant = rdd.filter(new Function<StreamingTestResult, Boolean>() { + boolean anySignificant = !rdd.filter(new Function<StreamingTestResult, Boolean>() { @Override - public Boolean call(StreamingTestResult v) throws Exception { + public Boolean call(StreamingTestResult v) { return v.pValue() < 0.05; } - }).count(); + }).isEmpty(); - if (timeoutCounter.value() <= 0 || cntSignificant > 0) { + if (timeoutCounter <= 0 || anySignificant) { rdd.context().stop(); } } |