From 1a340da8d7590d831b040c74f5a6eb560e14d585 Mon Sep 17 00:00:00 2001 From: Luciano Resende Date: Sun, 21 Feb 2016 16:27:56 +0000 Subject: [SPARK-13248][STREAMING] Remove deprecated Streaming APIs. Remove deprecated Streaming APIs and adjust sample applications. Author: Luciano Resende Closes #11139 from lresende/streaming-deprecated-apis. --- .../streaming/JavaRecoverableNetworkWordCount.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) (limited to 'examples') diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java index bc8cbcdef7..f9929fc86d 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java @@ -26,17 +26,14 @@ import java.util.List; import java.util.regex.Pattern; import scala.Tuple2; + import com.google.common.io.Files; import org.apache.spark.Accumulator; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaSparkContext; -import org.apache.spark.api.java.function.FlatMapFunction; -import org.apache.spark.api.java.function.Function; -import org.apache.spark.api.java.function.Function2; -import org.apache.spark.api.java.function.PairFunction; -import org.apache.spark.api.java.function.VoidFunction2; +import org.apache.spark.api.java.function.*; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.Time; @@ -44,7 +41,6 @@ import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; -import org.apache.spark.streaming.api.java.JavaStreamingContextFactory; /** * Use this singleton to get or register a Broadcast variable. @@ -204,13 +200,17 @@ public final class JavaRecoverableNetworkWordCount { final int port = Integer.parseInt(args[1]); final String checkpointDirectory = args[2]; final String outputPath = args[3]; - JavaStreamingContextFactory factory = new JavaStreamingContextFactory() { + + // Function to create JavaStreamingContext without any output operations + // (used to detect the new context) + Function0 createContextFunc = new Function0() { @Override - public JavaStreamingContext create() { + public JavaStreamingContext call() { return createContext(ip, port, checkpointDirectory, outputPath); } }; - JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory); + + JavaStreamingContext ssc = JavaStreamingContext.getOrCreate(checkpointDirectory, createContextFunc); ssc.start(); ssc.awaitTermination(); } -- cgit v1.2.3