aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorZheng RuiFeng <ruifengz@foxmail.com>2016-03-18 12:34:14 +0000
committerSean Owen <sowen@cloudera.com>2016-03-18 12:34:14 +0000
commit53f32a22daa40b713fef477054290d8adbeb6f71 (patch)
treeecb703f194a139d798b937d6e62b16d8530e0207 /examples
parent0f1015ffdd40cd8647f6acdd5cdd717b883e4875 (diff)
downloadspark-53f32a22daa40b713fef477054290d8adbeb6f71.tar.gz
spark-53f32a22daa40b713fef477054290d8adbeb6f71.tar.bz2
spark-53f32a22daa40b713fef477054290d8adbeb6f71.zip
[MINOR][DOC] Fix nits in JavaStreamingTestExample
## What changes were proposed in this pull request? Fix some nits discussed in https://github.com/apache/spark/pull/11776#issuecomment-198207419 use !rdd.isEmpty instead of rdd.count > 0 use static instead of AtomicInteger remove unneeded "throws Exception" ## How was this patch tested? manual tests Author: Zheng RuiFeng <ruifengz@foxmail.com> Closes #11821 from zhengruifeng/je_fix.
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java20
1 files changed, 11 insertions, 9 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
index 2197ef9481..4c8755916c 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
@@ -56,6 +56,9 @@ import org.apache.spark.util.Utils;
* batches processed exceeds `numBatchesTimeout`.
*/
public class JavaStreamingTestExample {
+
+ private static int timeoutCounter = 0;
+
public static void main(String[] args) {
if (args.length != 3) {
System.err.println("Usage: JavaStreamingTestExample " +
@@ -76,7 +79,7 @@ public class JavaStreamingTestExample {
JavaDStream<BinarySample> data = ssc.textFileStream(dataDir).map(
new Function<String, BinarySample>() {
@Override
- public BinarySample call(String line) throws Exception {
+ public BinarySample call(String line) {
String[] ts = line.split(",");
boolean label = Boolean.valueOf(ts[0]);
double value = Double.valueOf(ts[1]);
@@ -94,22 +97,21 @@ public class JavaStreamingTestExample {
// $example off$
// Stop processing if test becomes significant or we time out
- final Accumulator<Integer> timeoutCounter =
- ssc.sparkContext().accumulator(numBatchesTimeout);
+ timeoutCounter = numBatchesTimeout;
out.foreachRDD(new VoidFunction<JavaRDD<StreamingTestResult>>() {
@Override
- public void call(JavaRDD<StreamingTestResult> rdd) throws Exception {
- timeoutCounter.add(-1);
+ public void call(JavaRDD<StreamingTestResult> rdd) {
+ timeoutCounter -= 1;
- long cntSignificant = rdd.filter(new Function<StreamingTestResult, Boolean>() {
+ boolean anySignificant = !rdd.filter(new Function<StreamingTestResult, Boolean>() {
@Override
- public Boolean call(StreamingTestResult v) throws Exception {
+ public Boolean call(StreamingTestResult v) {
return v.pValue() < 0.05;
}
- }).count();
+ }).isEmpty();
- if (timeoutCounter.value() <= 0 || cntSignificant > 0) {
+ if (timeoutCounter <= 0 || anySignificant) {
rdd.context().stop();
}
}