author     hyukjinkwon <gurwls223@gmail.com>  2017-02-27 08:44:26 +0000
committer  Sean Owen <sowen@cloudera.com>     2017-02-27 08:44:26 +0000
commit     4ba9c6c453606f5e5a1e324d5f933d2c9307a604
tree       1c264800393b03714df7156ec515c2d445849f42
parent     9f8e392159ba65decddf62eb3cd85b6821db01b4
[MINOR][BUILD] Fix lint-java breaks in Java
## What changes were proposed in this pull request?

This PR proposes to fix the lint-breaks as below:

```
[ERROR] src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java:[29,8] (imports) UnusedImports: Unused import - org.apache.spark.network.buffer.ManagedBuffer.
[ERROR] src/main/java/org/apache/spark/unsafe/types/UTF8String.java:[156,10] (modifier) ModifierOrder: 'Nonnull' annotation modifier does not precede non-annotation modifiers.
[ERROR] src/main/java/org/apache/spark/SparkFirehoseListener.java:[122] (sizes) LineLength: Line is longer than 100 characters (found 105).
[ERROR] src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java:[164,78] (coding) OneStatementPerLine: Only one statement per line allowed.
[ERROR] src/test/java/test/org/apache/spark/JavaAPISuite.java:[1157] (sizes) LineLength: Line is longer than 100 characters (found 121).
[ERROR] src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java:[149] (sizes) LineLength: Line is longer than 100 characters (found 113).
[ERROR] src/test/java/test/org/apache/spark/streaming/Java8APISuite.java:[146] (sizes) LineLength: Line is longer than 100 characters (found 122).
[ERROR] src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java:[32,8] (imports) UnusedImports: Unused import - org.apache.spark.streaming.Time.
[ERROR] src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java:[611] (sizes) LineLength: Line is longer than 100 characters (found 101).
[ERROR] src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java:[1317] (sizes) LineLength: Line is longer than 100 characters (found 102).
[ERROR] src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java:[91] (sizes) LineLength: Line is longer than 100 characters (found 102).
[ERROR] src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java:[113] (sizes) LineLength: Line is longer than 100 characters (found 101).
[ERROR] src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java:[164] (sizes) LineLength: Line is longer than 100 characters (found 110).
[ERROR] src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java:[212] (sizes) LineLength: Line is longer than 100 characters (found 114).
[ERROR] src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java:[36] (sizes) LineLength: Line is longer than 100 characters (found 101).
[ERROR] src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java:[26,8] (imports) UnusedImports: Unused import - com.amazonaws.regions.RegionUtils.
[ERROR] src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java:[20,8] (imports) UnusedImports: Unused import - com.amazonaws.regions.RegionUtils.
[ERROR] src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java:[94] (sizes) LineLength: Line is longer than 100 characters (found 103).
[ERROR] src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java:[30,8] (imports) UnusedImports: Unused import - org.apache.spark.sql.api.java.UDF1.
[ERROR] src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java:[72] (sizes) LineLength: Line is longer than 100 characters (found 104).
[ERROR] src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java:[121] (sizes) LineLength: Line is longer than 100 characters (found 101).
[ERROR] src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java:[28,8] (imports) UnusedImports: Unused import - org.apache.spark.api.java.JavaRDD.
[ERROR] src/main/java/org/apache/spark/examples/sql/JavaSQLDataSourceExample.java:[29,8] (imports) UnusedImports: Unused import - org.apache.spark.api.java.JavaSparkContext.
```

## How was this patch tested?

Manually via

```bash
./dev/lint-java
```

Author: hyukjinkwon <gurwls223@gmail.com>

Closes #17072 from HyukjinKwon/java-lint.
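For context, the four checkstyle rule categories above reduce to mechanical edits: UnusedImports means deleting the named import line, while the other three reshape code in place. A minimal, hypothetical Java sketch of the flagged and fixed forms (illustrative only, not taken from the patched files; assumes the JSR-305 `javax.annotation.Nonnull` annotation is on the classpath):

```java
import javax.annotation.Nonnull;  // JSR-305; assumed available

// Illustrative class showing the flagged patterns, not actual Spark code.
public class LintFixExamples {

  // ModifierOrder: annotations must precede non-annotation modifiers.
  // Flagged form:   private @Nonnull String name;
  @Nonnull private String name = "spark";

  // OneStatementPerLine: "int a = 1; int b = 2;" on a single line is
  // flagged; each statement goes on its own line instead.
  public int sum() {
    int a = 1;
    int b = 2;
    return a + b;
  }

  // LineLength: a statement over 100 characters is wrapped, typically
  // breaking after an opening parenthesis or before a chained call.
  public String describe() {
    return String.format(
        "name=%s, sum=%d", name, sum());
  }
}
```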
Diffstat (limited to 'streaming')

-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java |  4
-rw-r--r--  streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java    | 65
-rw-r--r--  streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java     |  7

3 files changed, 44 insertions, 32 deletions
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
index cb8ed83e5a..b1367b8f2a 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
@@ -145,8 +145,8 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements
List<Set<Tuple2<K, S>>> expectedStateSnapshots) {
int numBatches = expectedOutputs.size();
JavaDStream<K> inputStream = JavaTestUtils.attachTestInputStream(ssc, input, 2);
- JavaMapWithStateDStream<K, Integer, S, T> mapWithStateDStream =
- JavaPairDStream.fromJavaDStream(inputStream.map(x -> new Tuple2<>(x, 1))).mapWithState(mapWithStateSpec);
+ JavaMapWithStateDStream<K, Integer, S, T> mapWithStateDStream = JavaPairDStream.fromJavaDStream(
+ inputStream.map(x -> new Tuple2<>(x, 1))).mapWithState(mapWithStateSpec);
List<Set<T>> collectedOutputs =
Collections.synchronizedList(new ArrayList<Set<T>>());
diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
index 9948a4074c..80513de4ee 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
@@ -20,10 +20,13 @@ package test.org.apache.spark.streaming;
import java.io.Serializable;
import java.util.*;
+import org.apache.spark.api.java.function.Function3;
+import org.apache.spark.api.java.function.Function4;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.JavaTestUtils;
import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.Time;
import scala.Tuple2;
@@ -142,8 +145,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
Arrays.asList(24));
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> reducedWindowed = stream.reduceByWindow((x, y) -> x + y,
- (x, y) -> x - y, new Duration(2000), new Duration(1000));
+ JavaDStream<Integer> reducedWindowed = stream.reduceByWindow(
+ (x, y) -> x + y, (x, y) -> x - y, new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reducedWindowed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
@@ -850,36 +853,44 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializ
JavaPairRDD<String, Boolean> initialRDD = null;
JavaPairDStream<String, Integer> wordsDstream = null;
+ Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>> mapFn =
+ (time, key, value, state) -> {
+ // Use all State's methods here
+ state.exists();
+ state.get();
+ state.isTimingOut();
+ state.remove();
+ state.update(true);
+ return Optional.of(2.0);
+ };
+
JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream =
- wordsDstream.mapWithState(
- StateSpec.<String, Integer, Boolean, Double>function((time, key, value, state) -> {
- // Use all State's methods here
- state.exists();
- state.get();
- state.isTimingOut();
- state.remove();
- state.update(true);
- return Optional.of(2.0);
- }).initialState(initialRDD)
- .numPartitions(10)
- .partitioner(new HashPartitioner(10))
- .timeout(Durations.seconds(10)));
+ wordsDstream.mapWithState(
+ StateSpec.function(mapFn)
+ .initialState(initialRDD)
+ .numPartitions(10)
+ .partitioner(new HashPartitioner(10))
+ .timeout(Durations.seconds(10)));
JavaPairDStream<String, Boolean> emittedRecords = stateDstream.stateSnapshots();
+ Function3<String, Optional<Integer>, State<Boolean>, Double> mapFn2 =
+ (key, value, state) -> {
+ state.exists();
+ state.get();
+ state.isTimingOut();
+ state.remove();
+ state.update(true);
+ return 2.0;
+ };
+
JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 =
- wordsDstream.mapWithState(
- StateSpec.<String, Integer, Boolean, Double>function((key, value, state) -> {
- state.exists();
- state.get();
- state.isTimingOut();
- state.remove();
- state.update(true);
- return 2.0;
- }).initialState(initialRDD)
- .numPartitions(10)
- .partitioner(new HashPartitioner(10))
- .timeout(Durations.seconds(10)));
+ wordsDstream.mapWithState(
+ StateSpec.function(mapFn2)
+ .initialState(initialRDD)
+ .numPartitions(10)
+ .partitioner(new HashPartitioner(10))
+ .timeout(Durations.seconds(10)));
JavaPairDStream<String, Boolean> mappedDStream = stateDstream2.stateSnapshots();
}
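Worth noting about the rewrite above: hoisting each lambda into a named `Function4`/`Function3` variable pins down its type arguments, so `StateSpec.function` can infer `<String, Integer, Boolean, Double>` on its own and the long explicit type witness (`StateSpec.<String, Integer, Boolean, Double>function`) can be dropped, which is what brings the call site back under 100 characters. A minimal sketch of the same pattern, assuming Spark's streaming Java API on the classpath (class and variable names here are illustrative, not from the patch):

```java
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function4;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.Time;

// Illustrative class, not part of the patch.
public class StateSpecInferenceSketch {
  public static void main(String[] args) {
    // Naming the lambda fixes Function4's type arguments up front.
    Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>> mapFn =
        (time, key, value, state) -> Optional.of(2.0);

    // StateSpec.function now infers <String, Integer, Boolean, Double>
    // from mapFn's declared type; no explicit type witness is needed.
    StateSpec<String, Integer, Boolean, Double> spec =
        StateSpec.function(mapFn).numPartitions(10);

    System.out.println(spec);
  }
}
```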
diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
index b966cbdca0..96f8d9593d 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
@@ -29,7 +29,6 @@ import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContextState;
import org.apache.spark.streaming.StreamingContextSuite;
-import org.apache.spark.streaming.Time;
import scala.Tuple2;
import org.apache.hadoop.conf.Configuration;
@@ -608,7 +607,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
Arrays.asList("a","t","h","l","e","t","i","c","s"));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<String> flatMapped = stream.flatMap(x -> Arrays.asList(x.split("(?!^)")).iterator());
+ JavaDStream<String> flatMapped =
+ stream.flatMap(x -> Arrays.asList(x.split("(?!^)")).iterator());
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1314,7 +1314,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, String> mapped = pairStream.mapValues(s -> s.toUpperCase(Locale.ENGLISH));
+ JavaPairDStream<String, String> mapped =
+ pairStream.mapValues(s -> s.toUpperCase(Locale.ENGLISH));
JavaTestUtils.attachTestOutputStream(mapped);
List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);