From 53f32a22daa40b713fef477054290d8adbeb6f71 Mon Sep 17 00:00:00 2001 From: Zheng RuiFeng Date: Fri, 18 Mar 2016 12:34:14 +0000 Subject: [MINOR][DOC] Fix nits in JavaStreamingTestExample ## What changes were proposed in this pull request? Fix some nits discussed in https://github.com/apache/spark/pull/11776#issuecomment-198207419 use !rdd.isEmpty instead of rdd.count > 0 use static instead of AtomicInteger remove unneeded "throws Exception" ## How was this patch tested? manual tests Author: Zheng RuiFeng Closes #11821 from zhengruifeng/je_fix. --- .../examples/mllib/JavaStreamingTestExample.java | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java index 2197ef9481..4c8755916c 100644 --- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java @@ -56,6 +56,9 @@ import org.apache.spark.util.Utils; * batches processed exceeds `numBatchesTimeout`. */ public class JavaStreamingTestExample { + + private static int timeoutCounter = 0; + public static void main(String[] args) { if (args.length != 3) { System.err.println("Usage: JavaStreamingTestExample " + @@ -76,7 +79,7 @@ public class JavaStreamingTestExample { JavaDStream data = ssc.textFileStream(dataDir).map( new Function() { @Override - public BinarySample call(String line) throws Exception { + public BinarySample call(String line) { String[] ts = line.split(","); boolean label = Boolean.valueOf(ts[0]); double value = Double.valueOf(ts[1]); @@ -94,22 +97,21 @@ public class JavaStreamingTestExample { // $example off$ // Stop processing if test becomes significant or we time out - final Accumulator timeoutCounter = - ssc.sparkContext().accumulator(numBatchesTimeout); + timeoutCounter = numBatchesTimeout; out.foreachRDD(new VoidFunction>() { @Override - public void call(JavaRDD rdd) throws Exception { - timeoutCounter.add(-1); + public void call(JavaRDD rdd) { + timeoutCounter -= 1; - long cntSignificant = rdd.filter(new Function() { + boolean anySignificant = !rdd.filter(new Function() { @Override - public Boolean call(StreamingTestResult v) throws Exception { + public Boolean call(StreamingTestResult v) { return v.pValue() < 0.05; } - }).count(); + }).isEmpty(); - if (timeoutCounter.value() <= 0 || cntSignificant > 0) { + if (timeoutCounter <= 0 || anySignificant) { rdd.context().stop(); } } -- cgit v1.2.3