 common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java     |   1
 common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java                           |   3
 core/src/main/java/org/apache/spark/SparkFirehoseListener.java                                      | 225
 core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java           |   4
 core/src/test/java/test/org/apache/spark/JavaAPISuite.java                                          | 109
 examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java                       |   4
 examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java               |   4
 external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java |   1
 external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java   |   4
 mllib/src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java                          |   3
 sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java |   2
 sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java                    |   2
 sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java                              |  26
 sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawList.java                          |   1
 sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawMap.java                           |   1
 streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java                       |   4
 streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java                          |  65
 streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java                           |   7
 18 files changed, 245 insertions(+), 221 deletions(-)
diff --git a/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java b/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java
index 4477c9a935..09fc80d12d 100644
--- a/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java
+++ b/common/network-common/src/test/java/org/apache/spark/network/TransportResponseHandlerSuite.java
@@ -26,7 +26,6 @@ import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;
-import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.client.ChunkReceivedCallback;
import org.apache.spark.network.client.RpcResponseCallback;
diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
index 87b9e8eb44..10a7cb1d06 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
@@ -153,7 +153,8 @@ public final class UTF8String implements Comparable<UTF8String>, Externalizable,
*
* Unlike getBytes this will not create a copy of the array if this is a slice.
*/
- public @Nonnull ByteBuffer getByteBuffer() {
+ @Nonnull
+ public ByteBuffer getByteBuffer() {
if (base instanceof byte[] && offset >= BYTE_ARRAY_OFFSET) {
final byte[] bytes = (byte[]) base;
diff --git a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
index 9fe97b4d9c..140c52fd12 100644
--- a/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
+++ b/core/src/main/java/org/apache/spark/SparkFirehoseListener.java
@@ -30,116 +30,117 @@ import org.apache.spark.scheduler.*;
*/
public class SparkFirehoseListener implements SparkListenerInterface {
- public void onEvent(SparkListenerEvent event) { }
-
- @Override
- public final void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
- onEvent(stageCompleted);
- }
-
- @Override
- public final void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
- onEvent(stageSubmitted);
- }
-
- @Override
- public final void onTaskStart(SparkListenerTaskStart taskStart) {
- onEvent(taskStart);
- }
-
- @Override
- public final void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
- onEvent(taskGettingResult);
- }
-
- @Override
- public final void onTaskEnd(SparkListenerTaskEnd taskEnd) {
- onEvent(taskEnd);
- }
-
- @Override
- public final void onJobStart(SparkListenerJobStart jobStart) {
- onEvent(jobStart);
- }
-
- @Override
- public final void onJobEnd(SparkListenerJobEnd jobEnd) {
- onEvent(jobEnd);
- }
-
- @Override
- public final void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
- onEvent(environmentUpdate);
- }
-
- @Override
- public final void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
- onEvent(blockManagerAdded);
- }
-
- @Override
- public final void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
- onEvent(blockManagerRemoved);
- }
-
- @Override
- public final void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
- onEvent(unpersistRDD);
- }
-
- @Override
- public final void onApplicationStart(SparkListenerApplicationStart applicationStart) {
- onEvent(applicationStart);
- }
-
- @Override
- public final void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
- onEvent(applicationEnd);
- }
-
- @Override
- public final void onExecutorMetricsUpdate(
- SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
- onEvent(executorMetricsUpdate);
- }
-
- @Override
- public final void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
- onEvent(executorAdded);
- }
-
- @Override
- public final void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
- onEvent(executorRemoved);
- }
-
- @Override
- public final void onExecutorBlacklisted(SparkListenerExecutorBlacklisted executorBlacklisted) {
- onEvent(executorBlacklisted);
- }
-
- @Override
- public final void onExecutorUnblacklisted(SparkListenerExecutorUnblacklisted executorUnblacklisted) {
- onEvent(executorUnblacklisted);
- }
-
- @Override
- public final void onNodeBlacklisted(SparkListenerNodeBlacklisted nodeBlacklisted) {
- onEvent(nodeBlacklisted);
- }
-
- @Override
- public final void onNodeUnblacklisted(SparkListenerNodeUnblacklisted nodeUnblacklisted) {
- onEvent(nodeUnblacklisted);
- }
-
- @Override
- public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
- onEvent(blockUpdated);
- }
-
- @Override
- public void onOtherEvent(SparkListenerEvent event) {
- onEvent(event);
- }
+ public void onEvent(SparkListenerEvent event) { }
+
+ @Override
+ public final void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
+ onEvent(stageCompleted);
+ }
+
+ @Override
+ public final void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) {
+ onEvent(stageSubmitted);
+ }
+
+ @Override
+ public final void onTaskStart(SparkListenerTaskStart taskStart) {
+ onEvent(taskStart);
+ }
+
+ @Override
+ public final void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) {
+ onEvent(taskGettingResult);
+ }
+
+ @Override
+ public final void onTaskEnd(SparkListenerTaskEnd taskEnd) {
+ onEvent(taskEnd);
+ }
+
+ @Override
+ public final void onJobStart(SparkListenerJobStart jobStart) {
+ onEvent(jobStart);
+ }
+
+ @Override
+ public final void onJobEnd(SparkListenerJobEnd jobEnd) {
+ onEvent(jobEnd);
+ }
+
+ @Override
+ public final void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) {
+ onEvent(environmentUpdate);
+ }
+
+ @Override
+ public final void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) {
+ onEvent(blockManagerAdded);
+ }
+
+ @Override
+ public final void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) {
+ onEvent(blockManagerRemoved);
+ }
+
+ @Override
+ public final void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) {
+ onEvent(unpersistRDD);
+ }
+
+ @Override
+ public final void onApplicationStart(SparkListenerApplicationStart applicationStart) {
+ onEvent(applicationStart);
+ }
+
+ @Override
+ public final void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) {
+ onEvent(applicationEnd);
+ }
+
+ @Override
+ public final void onExecutorMetricsUpdate(
+ SparkListenerExecutorMetricsUpdate executorMetricsUpdate) {
+ onEvent(executorMetricsUpdate);
+ }
+
+ @Override
+ public final void onExecutorAdded(SparkListenerExecutorAdded executorAdded) {
+ onEvent(executorAdded);
+ }
+
+ @Override
+ public final void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) {
+ onEvent(executorRemoved);
+ }
+
+ @Override
+ public final void onExecutorBlacklisted(SparkListenerExecutorBlacklisted executorBlacklisted) {
+ onEvent(executorBlacklisted);
+ }
+
+ @Override
+ public final void onExecutorUnblacklisted(
+ SparkListenerExecutorUnblacklisted executorUnblacklisted) {
+ onEvent(executorUnblacklisted);
+ }
+
+ @Override
+ public final void onNodeBlacklisted(SparkListenerNodeBlacklisted nodeBlacklisted) {
+ onEvent(nodeBlacklisted);
+ }
+
+ @Override
+ public final void onNodeUnblacklisted(SparkListenerNodeUnblacklisted nodeUnblacklisted) {
+ onEvent(nodeUnblacklisted);
+ }
+
+ @Override
+ public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) {
+ onEvent(blockUpdated);
+ }
+
+ @Override
+ public void onOtherEvent(SparkListenerEvent event) {
+ onEvent(event);
+ }
}
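
For context on the class re-indented above: SparkFirehoseListener forwards every scheduler callback to a single onEvent method, so a subclass only needs to override onEvent to observe all events. A minimal usage sketch, assuming a local Spark application (the listener class and names below are illustrative only and are not part of this change):

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkFirehoseListener;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.scheduler.SparkListenerEvent;

// Illustrative subclass: counts every scheduler event the driver emits.
class EventCountingListener extends SparkFirehoseListener {
  private final AtomicLong count = new AtomicLong();

  @Override
  public void onEvent(SparkListenerEvent event) {
    count.incrementAndGet();  // events arrive on the listener bus thread
  }

  long eventCount() {
    return count.get();
  }
}

class FirehoseDemo {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("firehose-demo").setMaster("local[2]");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    EventCountingListener listener = new EventCountingListener();
    jsc.sc().addSparkListener(listener);              // register on the underlying SparkContext
    jsc.parallelize(Arrays.asList(1, 2, 3)).count();  // run a small job to generate events
    jsc.stop();
    System.out.println("events seen: " + listener.eventCount());
  }
}

A listener like this can also be registered declaratively through the spark.extraListeners configuration, in which case it needs a zero-argument constructor.
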
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 29aca04a3d..f312fa2b2d 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -161,7 +161,9 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
// the end of the task. This is necessary to avoid memory leaks when the downstream operator
// does not fully consume the sorter's output (e.g. sort followed by limit).
- taskContext.addTaskCompletionListener(context -> { cleanupResources(); });
+ taskContext.addTaskCompletionListener(context -> {
+ cleanupResources();
+ });
}
/**
diff --git a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
index 512149127d..01b5fb7b46 100644
--- a/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/test/org/apache/spark/JavaAPISuite.java
@@ -358,7 +358,7 @@ public class JavaAPISuite implements Serializable {
// Regression test for SPARK-4459
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
Function<Tuple2<Integer, Integer>, Boolean> areOdd =
- x -> (x._1() % 2 == 0) && (x._2() % 2 == 0);
+ x -> (x._1() % 2 == 0) && (x._2() % 2 == 0);
JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd);
JavaPairRDD<Boolean, Iterable<Tuple2<Integer, Integer>>> oddsAndEvens = pairRDD.groupBy(areOdd);
assertEquals(2, oddsAndEvens.count());
@@ -528,14 +528,14 @@ public class JavaAPISuite implements Serializable {
new Tuple2<>(5, 3)), 2);
Map<Integer, HashSet<Integer>> sets = pairs.aggregateByKey(new HashSet<Integer>(),
- (a, b) -> {
- a.add(b);
- return a;
- },
- (a, b) -> {
- a.addAll(b);
- return a;
- }).collectAsMap();
+ (a, b) -> {
+ a.add(b);
+ return a;
+ },
+ (a, b) -> {
+ a.addAll(b);
+ return a;
+ }).collectAsMap();
assertEquals(3, sets.size());
assertEquals(new HashSet<>(Arrays.asList(1)), sets.get(1));
assertEquals(new HashSet<>(Arrays.asList(2)), sets.get(3));
@@ -666,8 +666,8 @@ public class JavaAPISuite implements Serializable {
assertArrayEquals(expected_counts, histogram);
// SPARK-5744
assertArrayEquals(
- new long[] {0},
- sc.parallelizeDoubles(new ArrayList<>(0), 1).histogram(new double[]{0.0, 1.0}));
+ new long[] {0},
+ sc.parallelizeDoubles(new ArrayList<>(0), 1).histogram(new double[]{0.0, 1.0}));
}
private static class DoubleComparator implements Comparator<Double>, Serializable {
@@ -807,7 +807,7 @@ public class JavaAPISuite implements Serializable {
// Regression test for SPARK-668:
JavaPairRDD<String, Integer> swapped = pairRDD.flatMapToPair(
- item -> Collections.singletonList(item.swap()).iterator());
+ item -> Collections.singletonList(item.swap()).iterator());
swapped.collect();
// There was never a bug here, but it's worth testing:
@@ -845,11 +845,13 @@ public class JavaAPISuite implements Serializable {
public void getNumPartitions(){
JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
JavaDoubleRDD rdd2 = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0), 2);
- JavaPairRDD<String, Integer> rdd3 = sc.parallelizePairs(Arrays.asList(
- new Tuple2<>("a", 1),
- new Tuple2<>("aa", 2),
- new Tuple2<>("aaa", 3)
- ), 2);
+ JavaPairRDD<String, Integer> rdd3 = sc.parallelizePairs(
+ Arrays.asList(
+ new Tuple2<>("a", 1),
+ new Tuple2<>("aa", 2),
+ new Tuple2<>("aaa", 3)
+ ),
+ 2);
assertEquals(3, rdd1.getNumPartitions());
assertEquals(2, rdd2.getNumPartitions());
assertEquals(2, rdd3.getNumPartitions());
@@ -977,7 +979,7 @@ public class JavaAPISuite implements Serializable {
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
- .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+ .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
// Try reading the output back as an object file
JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class,
@@ -1068,11 +1070,11 @@ public class JavaAPISuite implements Serializable {
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
- .saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class,
+ .saveAsNewAPIHadoopFile(outputDir, IntWritable.class, Text.class,
org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat.class);
JavaPairRDD<IntWritable, Text> output =
- sc.sequenceFile(outputDir, IntWritable.class, Text.class);
+ sc.sequenceFile(outputDir, IntWritable.class, Text.class);
assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
}
@@ -1088,11 +1090,11 @@ public class JavaAPISuite implements Serializable {
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
- .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+ .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
JavaPairRDD<IntWritable, Text> output = sc.newAPIHadoopFile(outputDir,
- org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
- IntWritable.class, Text.class, Job.getInstance().getConfiguration());
+ org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat.class,
+ IntWritable.class, Text.class, Job.getInstance().getConfiguration());
assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
}
@@ -1135,10 +1137,10 @@ public class JavaAPISuite implements Serializable {
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
- .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
+ .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);
JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
- SequenceFileInputFormat.class, IntWritable.class, Text.class);
+ SequenceFileInputFormat.class, IntWritable.class, Text.class);
assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
}
@@ -1154,10 +1156,11 @@ public class JavaAPISuite implements Serializable {
JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);
rdd.mapToPair(pair -> new Tuple2<>(new IntWritable(pair._1()), new Text(pair._2())))
- .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class, DefaultCodec.class);
+ .saveAsHadoopFile(outputDir, IntWritable.class, Text.class,
+ SequenceFileOutputFormat.class, DefaultCodec.class);
JavaPairRDD<IntWritable, Text> output = sc.hadoopFile(outputDir,
- SequenceFileInputFormat.class, IntWritable.class, Text.class);
+ SequenceFileInputFormat.class, IntWritable.class, Text.class);
assertEquals(pairs.toString(), output.map(Tuple2::toString).collect().toString());
}
@@ -1263,23 +1266,23 @@ public class JavaAPISuite implements Serializable {
Function2<Integer, Integer, Integer> mergeValueFunction = (v1, v2) -> v1 + v2;
JavaPairRDD<Integer, Integer> combinedRDD = originalRDD.keyBy(keyFunction)
- .combineByKey(createCombinerFunction, mergeValueFunction, mergeValueFunction);
+ .combineByKey(createCombinerFunction, mergeValueFunction, mergeValueFunction);
Map<Integer, Integer> results = combinedRDD.collectAsMap();
ImmutableMap<Integer, Integer> expected = ImmutableMap.of(0, 9, 1, 5, 2, 7);
assertEquals(expected, results);
Partitioner defaultPartitioner = Partitioner.defaultPartitioner(
- combinedRDD.rdd(),
- JavaConverters.collectionAsScalaIterableConverter(
- Collections.<RDD<?>>emptyList()).asScala().toSeq());
+ combinedRDD.rdd(),
+ JavaConverters.collectionAsScalaIterableConverter(
+ Collections.<RDD<?>>emptyList()).asScala().toSeq());
combinedRDD = originalRDD.keyBy(keyFunction)
- .combineByKey(
- createCombinerFunction,
- mergeValueFunction,
- mergeValueFunction,
- defaultPartitioner,
- false,
- new KryoSerializer(new SparkConf()));
+ .combineByKey(
+ createCombinerFunction,
+ mergeValueFunction,
+ mergeValueFunction,
+ defaultPartitioner,
+ false,
+ new KryoSerializer(new SparkConf()));
results = combinedRDD.collectAsMap();
assertEquals(expected, results);
}
@@ -1291,11 +1294,10 @@ public class JavaAPISuite implements Serializable {
JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
JavaPairRDD<Integer, Integer> rdd3 = rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1()));
assertEquals(Arrays.asList(
- new Tuple2<>(1, 1),
- new Tuple2<>(0, 2),
- new Tuple2<>(1, 3),
- new Tuple2<>(0, 4)), rdd3.collect());
-
+ new Tuple2<>(1, 1),
+ new Tuple2<>(0, 2),
+ new Tuple2<>(1, 3),
+ new Tuple2<>(0, 4)), rdd3.collect());
}
@SuppressWarnings("unchecked")
@@ -1312,16 +1314,18 @@ public class JavaAPISuite implements Serializable {
assertEquals(Arrays.asList(3, 4), parts[0]);
assertEquals(Arrays.asList(5, 6, 7), parts[1]);
- assertEquals(Arrays.asList(new Tuple2<>(1, 1),
- new Tuple2<>(2, 0)),
- rdd2.collectPartitions(new int[] {0})[0]);
+ assertEquals(
+ Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)),
+ rdd2.collectPartitions(new int[] {0})[0]);
List<Tuple2<Integer,Integer>>[] parts2 = rdd2.collectPartitions(new int[] {1, 2});
assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]);
- assertEquals(Arrays.asList(new Tuple2<>(5, 1),
- new Tuple2<>(6, 0),
- new Tuple2<>(7, 1)),
- parts2[1]);
+ assertEquals(
+ Arrays.asList(
+ new Tuple2<>(5, 1),
+ new Tuple2<>(6, 0),
+ new Tuple2<>(7, 1)),
+ parts2[1]);
}
@Test
@@ -1352,7 +1356,6 @@ public class JavaAPISuite implements Serializable {
double error = Math.abs((resCount - count) / count);
assertTrue(error < 0.1);
}
-
}
@Test
@@ -1531,8 +1534,8 @@ public class JavaAPISuite implements Serializable {
SparkConf conf = new SparkConf();
conf.registerKryoClasses(new Class<?>[]{ Class1.class, Class2.class });
assertEquals(
- Class1.class.getName() + "," + Class2.class.getName(),
- conf.get("spark.kryo.classesToRegister"));
+ Class1.class.getName() + "," + Class2.class.getName(),
+ conf.get("spark.kryo.classesToRegister"));
}
@Test
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
index 3f809eba7f..a0979aa2d2 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
@@ -27,7 +27,6 @@ import scala.collection.mutable.WrappedArray;
import org.apache.spark.ml.feature.RegexTokenizer;
import org.apache.spark.ml.feature.Tokenizer;
-import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
@@ -69,7 +68,8 @@ public class JavaTokenizerExample {
.setOutputCol("words")
.setPattern("\\W"); // alternatively .setPattern("\\w+").setGaps(false);
- spark.udf().register("countTokens", (WrappedArray<?> words) -> words.size(), DataTypes.IntegerType);
+ spark.udf().register(
+ "countTokens", (WrappedArray<?> words) -> words.size(), DataTypes.IntegerType);
Dataset<Row> tokenized = tokenizer.transform(sentenceDataFrame);
tokenized.select("sentence", "words")
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java
index bd49f059b2..dc9970d885 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaRankingMetricsExample.java
@@ -118,7 +118,9 @@ public class JavaRankingMetricsExample {
new Tuple2<>(new Tuple2<>(r.user(), r.product()), r.rating())));
JavaRDD<Tuple2<Object, Object>> ratesAndPreds =
JavaPairRDD.fromJavaRDD(ratings.map(r ->
- new Tuple2<Tuple2<Integer, Integer>, Object>(new Tuple2<>(r.user(), r.product()), r.rating())
+ new Tuple2<Tuple2<Integer, Integer>, Object>(
+ new Tuple2<>(r.user(), r.product()),
+ r.rating())
)).join(predictions).values();
// Create regression metrics object
diff --git a/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
index d1274a687f..626bde48e1 100644
--- a/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
+++ b/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
@@ -23,7 +23,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.regex.Pattern;
-import com.amazonaws.regions.RegionUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
diff --git a/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java b/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
index 26b1fda2ff..b37b087467 100644
--- a/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
+++ b/external/kinesis-asl/src/test/java/org/apache/spark/streaming/kinesis/JavaKinesisStreamSuite.java
@@ -17,7 +17,6 @@
package org.apache.spark.streaming.kinesis;
-import com.amazonaws.regions.RegionUtils;
import com.amazonaws.services.kinesis.model.Record;
import org.junit.Test;
@@ -91,7 +90,8 @@ public class JavaKinesisStreamSuite extends LocalJavaStreamingContext {
JavaDStream<String> kinesisStream = KinesisUtils.createStream(ssc, "testApp", "mySparkStream",
"https://kinesis.us-west-2.amazonaws.com", "us-west-2", InitialPositionInStream.LATEST,
new Duration(2000), StorageLevel.MEMORY_AND_DISK_2(), handler, String.class,
- "fakeAccessKey", "fakeSecretKey", "fakeSTSRoleArn", "fakeSTSSessionName", "fakeSTSExternalId");
+ "fakeAccessKey", "fakeSecretKey", "fakeSTSRoleArn", "fakeSTSSessionName",
+ "fakeSTSExternalId");
ssc.stop();
}
diff --git a/mllib/src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java b/mllib/src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java
index 0f71deb9ea..d2fe6bb2ca 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/tree/JavaDecisionTreeSuite.java
@@ -33,7 +33,8 @@ import org.apache.spark.mllib.tree.model.DecisionTreeModel;
public class JavaDecisionTreeSuite extends SharedSparkSession {
- private static int validatePrediction(List<LabeledPoint> validationData, DecisionTreeModel model) {
+ private static int validatePrediction(
+ List<LabeledPoint> validationData, DecisionTreeModel model) {
int numCorrect = 0;
for (LabeledPoint point : validationData) {
Double prediction = model.predict(point.features());
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
index afea467689..791e8d80e6 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/UnsafeArrayWriter.java
@@ -117,7 +117,7 @@ public class UnsafeArrayWriter {
public void setNullInt(int ordinal) {
setNullBit(ordinal);
// put zero into the corresponding field when set null
- Platform.putInt(holder.buffer, getElementOffset(ordinal, 4), (int)0);
+ Platform.putInt(holder.buffer, getElementOffset(ordinal, 4), 0);
}
public void setNullLong(int ordinal) {
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java
index d3769a74b9..539976d5af 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetAggregatorSuite.java
@@ -88,7 +88,7 @@ public class JavaDatasetAggregatorSuite extends JavaDatasetAggregatorSuiteBase {
@Test
public void testTypedAggregationAverage() {
KeyValueGroupedDataset<String, Tuple2<String, Integer>> grouped = generateGroupedDataset();
- Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.avg(value -> (double)(value._2() * 2)));
+ Dataset<Tuple2<String, Double>> agged = grouped.agg(typed.avg(value -> value._2() * 2.0));
Assert.assertEquals(
Arrays.asList(new Tuple2<>("a", 3.0), new Tuple2<>("b", 6.0)),
agged.collectAsList());
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
index 4581c6ebe9..e3b0e37cca 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java
@@ -110,7 +110,8 @@ public class JavaDatasetSuite implements Serializable {
Assert.assertEquals(Arrays.asList("hello"), filtered.collectAsList());
- Dataset<Integer> mapped = ds.map((MapFunction<String, Integer>) v -> v.length(), Encoders.INT());
+ Dataset<Integer> mapped =
+ ds.map((MapFunction<String, Integer>) String::length, Encoders.INT());
Assert.assertEquals(Arrays.asList(5, 5), mapped.collectAsList());
Dataset<String> parMapped = ds.mapPartitions((MapPartitionsFunction<String, String>) it -> {
@@ -157,17 +158,17 @@ public class JavaDatasetSuite implements Serializable {
public void testGroupBy() {
List<String> data = Arrays.asList("a", "foo", "bar");
Dataset<String> ds = spark.createDataset(data, Encoders.STRING());
- KeyValueGroupedDataset<Integer, String> grouped = ds.groupByKey(
- (MapFunction<String, Integer>) v -> v.length(),
- Encoders.INT());
+ KeyValueGroupedDataset<Integer, String> grouped =
+ ds.groupByKey((MapFunction<String, Integer>) String::length, Encoders.INT());
- Dataset<String> mapped = grouped.mapGroups((MapGroupsFunction<Integer, String, String>) (key, values) -> {
- StringBuilder sb = new StringBuilder(key.toString());
- while (values.hasNext()) {
- sb.append(values.next());
- }
- return sb.toString();
- }, Encoders.STRING());
+ Dataset<String> mapped = grouped.mapGroups(
+ (MapGroupsFunction<Integer, String, String>) (key, values) -> {
+ StringBuilder sb = new StringBuilder(key.toString());
+ while (values.hasNext()) {
+ sb.append(values.next());
+ }
+ return sb.toString();
+ }, Encoders.STRING());
Assert.assertEquals(asSet("1a", "3foobar"), toSet(mapped.collectAsList()));
@@ -209,7 +210,8 @@ public class JavaDatasetSuite implements Serializable {
Assert.assertEquals(asSet("1a", "3foobar"), toSet(flatMapped2.collectAsList()));
- Dataset<Tuple2<Integer, String>> reduced = grouped.reduceGroups((ReduceFunction<String>) (v1, v2) -> v1 + v2);
+ Dataset<Tuple2<Integer, String>> reduced =
+ grouped.reduceGroups((ReduceFunction<String>) (v1, v2) -> v1 + v2);
Assert.assertEquals(
asSet(tuple2(1, "a"), tuple2(3, "foobar")),
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawList.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawList.java
index 6adb1657bf..8211cbf16f 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawList.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawList.java
@@ -25,6 +25,7 @@ import java.util.List;
* UDF that returns a raw (non-parameterized) java List.
*/
public class UDFRawList extends UDF {
+ @SuppressWarnings("rawtypes")
public List evaluate(Object o) {
return Collections.singletonList("data1");
}
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawMap.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawMap.java
index 4731b6eee8..58c81f9945 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawMap.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/execution/UDFRawMap.java
@@ -25,6 +25,7 @@ import java.util.Map;
* UDF that returns a raw (non-parameterized) java Map.
*/
public class UDFRawMap extends UDF {
+ @SuppressWarnings("rawtypes")
public Map evaluate(Object o) {
return Collections.singletonMap("a", "1");
}
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
index cb8ed83e5a..b1367b8f2a 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaMapWithStateSuite.java
@@ -145,8 +145,8 @@ public class JavaMapWithStateSuite extends LocalJavaStreamingContext implements Serializable {
List<Set<Tuple2<K, S>>> expectedStateSnapshots) {
int numBatches = expectedOutputs.size();
JavaDStream<K> inputStream = JavaTestUtils.attachTestInputStream(ssc, input, 2);
- JavaMapWithStateDStream<K, Integer, S, T> mapWithStateDStream =
- JavaPairDStream.fromJavaDStream(inputStream.map(x -> new Tuple2<>(x, 1))).mapWithState(mapWithStateSpec);
+ JavaMapWithStateDStream<K, Integer, S, T> mapWithStateDStream = JavaPairDStream.fromJavaDStream(
+ inputStream.map(x -> new Tuple2<>(x, 1))).mapWithState(mapWithStateSpec);
List<Set<T>> collectedOutputs =
Collections.synchronizedList(new ArrayList<Set<T>>());
diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
index 9948a4074c..80513de4ee 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/Java8APISuite.java
@@ -20,10 +20,13 @@ package test.org.apache.spark.streaming;
import java.io.Serializable;
import java.util.*;
+import org.apache.spark.api.java.function.Function3;
+import org.apache.spark.api.java.function.Function4;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.JavaTestUtils;
import org.apache.spark.streaming.LocalJavaStreamingContext;
+import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.Time;
import scala.Tuple2;
@@ -142,8 +145,8 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializable {
Arrays.asList(24));
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<Integer> reducedWindowed = stream.reduceByWindow((x, y) -> x + y,
- (x, y) -> x - y, new Duration(2000), new Duration(1000));
+ JavaDStream<Integer> reducedWindowed = stream.reduceByWindow(
+ (x, y) -> x + y, (x, y) -> x - y, new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reducedWindowed);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
@@ -850,36 +853,44 @@ public class Java8APISuite extends LocalJavaStreamingContext implements Serializable {
JavaPairRDD<String, Boolean> initialRDD = null;
JavaPairDStream<String, Integer> wordsDstream = null;
+ Function4<Time, String, Optional<Integer>, State<Boolean>, Optional<Double>> mapFn =
+ (time, key, value, state) -> {
+ // Use all State's methods here
+ state.exists();
+ state.get();
+ state.isTimingOut();
+ state.remove();
+ state.update(true);
+ return Optional.of(2.0);
+ };
+
JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream =
- wordsDstream.mapWithState(
- StateSpec.<String, Integer, Boolean, Double>function((time, key, value, state) -> {
- // Use all State's methods here
- state.exists();
- state.get();
- state.isTimingOut();
- state.remove();
- state.update(true);
- return Optional.of(2.0);
- }).initialState(initialRDD)
- .numPartitions(10)
- .partitioner(new HashPartitioner(10))
- .timeout(Durations.seconds(10)));
+ wordsDstream.mapWithState(
+ StateSpec.function(mapFn)
+ .initialState(initialRDD)
+ .numPartitions(10)
+ .partitioner(new HashPartitioner(10))
+ .timeout(Durations.seconds(10)));
JavaPairDStream<String, Boolean> emittedRecords = stateDstream.stateSnapshots();
+ Function3<String, Optional<Integer>, State<Boolean>, Double> mapFn2 =
+ (key, value, state) -> {
+ state.exists();
+ state.get();
+ state.isTimingOut();
+ state.remove();
+ state.update(true);
+ return 2.0;
+ };
+
JavaMapWithStateDStream<String, Integer, Boolean, Double> stateDstream2 =
- wordsDstream.mapWithState(
- StateSpec.<String, Integer, Boolean, Double>function((key, value, state) -> {
- state.exists();
- state.get();
- state.isTimingOut();
- state.remove();
- state.update(true);
- return 2.0;
- }).initialState(initialRDD)
- .numPartitions(10)
- .partitioner(new HashPartitioner(10))
- .timeout(Durations.seconds(10)));
+ wordsDstream.mapWithState(
+ StateSpec.function(mapFn2)
+ .initialState(initialRDD)
+ .numPartitions(10)
+ .partitioner(new HashPartitioner(10))
+ .timeout(Durations.seconds(10)));
JavaPairDStream<String, Boolean> mappedDStream = stateDstream2.stateSnapshots();
}
diff --git a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
index b966cbdca0..96f8d9593d 100644
--- a/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/test/org/apache/spark/streaming/JavaAPISuite.java
@@ -29,7 +29,6 @@ import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContextState;
import org.apache.spark.streaming.StreamingContextSuite;
-import org.apache.spark.streaming.Time;
import scala.Tuple2;
import org.apache.hadoop.conf.Configuration;
@@ -608,7 +607,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
Arrays.asList("a","t","h","l","e","t","i","c","s"));
JavaDStream<String> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
- JavaDStream<String> flatMapped = stream.flatMap(x -> Arrays.asList(x.split("(?!^)")).iterator());
+ JavaDStream<String> flatMapped =
+ stream.flatMap(x -> Arrays.asList(x.split("(?!^)")).iterator());
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1314,7 +1314,8 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, String> mapped = pairStream.mapValues(s -> s.toUpperCase(Locale.ENGLISH));
+ JavaPairDStream<String, String> mapped =
+ pairStream.mapValues(s -> s.toUpperCase(Locale.ENGLISH));
JavaTestUtils.attachTestOutputStream(mapped);
List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);