 examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java | 6
 examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java | 6
 external/flume/src/main/scala/org/apache/spark/streaming/{flume/JavaStreamingContextWithFlume.scala => api/java/flume/FlumeFunctions.scala} | 10
 external/flume/src/test/java/{ => org/apache/spark/streaming/flume}/JavaFlumeStreamSuite.java | 13
 external/kafka/src/main/scala/org/apache/spark/streaming/{kafka/JavaStreamingContextWithKafka.scala => api/java/kafka/KafkaFunctions.scala} | 12
 external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java | 15
 external/mqtt/src/main/scala/org/apache/spark/{spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala => streaming/api/java/mqtt/MQTTFunctions.scala} | 10
 external/mqtt/src/main/scala/org/apache/spark/{spark/streaming => streaming}/mqtt/MQTTFunctions.scala | 0
 external/mqtt/src/main/scala/org/apache/spark/{spark/streaming => streaming}/mqtt/MQTTInputDStream.scala | 0
 external/mqtt/src/main/scala/org/apache/spark/{spark/streaming => streaming}/mqtt/package.scala | 0
 external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java | 10
 external/twitter/src/main/scala/org/apache/spark/streaming/{twitter/JavaStreamingContextWithTwitter.scala => api/java/twitter/TwitterFunctions.scala} | 18
 external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java | 19
 external/zeromq/src/main/scala/org/apache/spark/streaming/{zeromq/JavaStreamingContextWithZeroMQ.scala => api/java/zeromq/ZeroMQFunctions.scala} | 12
 external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java | 12
 15 files changed, 67 insertions(+), 76 deletions(-)
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
index e53c4f9e83..64832a9721 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -20,7 +20,7 @@ package org.apache.spark.streaming.examples;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.streaming.flume.JavaStreamingContextWithFlume;
+import org.apache.spark.streaming.api.java.flume.FlumeFunctions;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
/**
@@ -52,8 +52,8 @@ public class JavaFlumeEventCount {
JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
- JavaStreamingContextWithFlume sscWithFlume = new JavaStreamingContextWithFlume(ssc);
- JavaDStream<SparkFlumeEvent> flumeStream = sscWithFlume.flumeStream("localhost", port);
+ FlumeFunctions flumeFunc = new FlumeFunctions(ssc);
+ JavaDStream<SparkFlumeEvent> flumeStream = flumeFunc.flumeStream("localhost", port);
flumeStream.count();
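
For orientation, a minimal sketch of the new Java-side Flume usage introduced by this diff. The master string, batch interval, and port are illustrative assumptions, not values from the commit:

    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.api.java.flume.FlumeFunctions;
    import org.apache.spark.streaming.flume.SparkFlumeEvent;

    // Create the context, then wrap it to obtain the Flume-specific stream factory.
    JavaStreamingContext ssc =
        new JavaStreamingContext("local[2]", "FlumeSketch", new Duration(2000));
    FlumeFunctions flumeFunc = new FlumeFunctions(ssc);
    JavaDStream<SparkFlumeEvent> events = flumeFunc.flumeStream("localhost", 41414);
    events.count().print();
    ssc.start();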
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index de0420ca83..207ce8cd4f 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -29,7 +29,7 @@ import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import org.apache.spark.streaming.kafka.JavaStreamingContextWithKafka;
+import org.apache.spark.streaming.api.java.kafka.KafkaFunctions;
import scala.Tuple2;
/**
@@ -64,8 +64,8 @@ public class JavaKafkaWordCount {
topicMap.put(topic, numThreads);
}
- JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(ssc);
- JavaPairDStream<String, String> messages = sscWithKafka.kafkaStream(args[1], args[2], topicMap);
+ KafkaFunctions kafkaFunc = new KafkaFunctions(ssc);
+ JavaPairDStream<String, String> messages = kafkaFunc.kafkaStream(args[1], args[2], topicMap);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
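
A similarly hedged sketch of the Kafka side; ssc is an existing JavaStreamingContext, and the ZooKeeper address, group id, and topic map are made-up placeholders:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.kafka.KafkaFunctions;

    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put("my-topic", 1);  // topic -> number of consumer threads

    KafkaFunctions kafkaFunc = new KafkaFunctions(ssc);
    JavaPairDStream<String, String> messages =
        kafkaFunc.kafkaStream("zkhost:2181", "my-consumer-group", topicMap);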
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala b/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala
index 4e66ae3535..3347d19796 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/JavaStreamingContextWithFlume.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/api/java/flume/FlumeFunctions.scala
@@ -15,24 +15,24 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.flume
+package org.apache.spark.streaming.api.java.flume
import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.flume._
import org.apache.spark.storage.StorageLevel
/**
* Wrapper around [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that provides extra
* functions for creating Flume input streams.
*/
-class JavaStreamingContextWithFlume(javaStreamingContext: JavaStreamingContext)
- extends JavaStreamingContext(javaStreamingContext.ssc) {
+class FlumeFunctions(javaStreamingContext: JavaStreamingContext) {
/**
* Creates an input stream from a Flume source.
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
*/
def flumeStream(hostname: String, port: Int): JavaDStream[SparkFlumeEvent] = {
- ssc.flumeStream(hostname, port)
+ javaStreamingContext.ssc.flumeStream(hostname, port)
}
/**
@@ -43,6 +43,6 @@ class JavaStreamingContextWithFlume(javaStreamingContext: JavaStreamingContext)
*/
def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
JavaDStream[SparkFlumeEvent] = {
- ssc.flumeStream(hostname, port, storageLevel)
+ javaStreamingContext.ssc.flumeStream(hostname, port, storageLevel)
}
}
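
Design-wise, the rename also swaps inheritance for delegation: FlumeFunctions merely holds the JavaStreamingContext and forwards to its underlying StreamingContext instead of extending it. The practical consequence, sketched below (ssc is an existing JavaStreamingContext, imports as in the sketch above), is that general-purpose methods now must be called on the context itself, which is why the "is also StreamingContext" checks disappear from the test suites:

    FlumeFunctions flumeFunc = new FlumeFunctions(ssc);
    // Flume-specific streams come from the wrapper...
    JavaDStream<SparkFlumeEvent> events = flumeFunc.flumeStream("localhost", 41414);
    // ...while general-purpose streams stay on the JavaStreamingContext;
    // flumeFunc no longer inherits socketTextStream and the like.
    JavaDStream<String> lines = ssc.socketTextStream("localhost", 9999);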
diff --git a/external/flume/src/test/java/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
index deffc78c4c..5930fee925 100644
--- a/external/flume/src/test/java/JavaFlumeStreamSuite.java
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -1,4 +1,6 @@
+package org.apache.spark.streaming.flume;
+
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
@@ -18,21 +18,18 @@
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
import org.apache.spark.streaming.api.java.JavaDStream;
-import org.apache.spark.streaming.flume.JavaStreamingContextWithFlume;
+import org.apache.spark.streaming.api.java.flume.FlumeFunctions;
import org.apache.spark.streaming.flume.SparkFlumeEvent;
import org.junit.Test;
public class JavaFlumeStreamSuite extends LocalJavaStreamingContext {
@Test
public void testFlumeStream() {
- JavaStreamingContextWithFlume sscWithFlume = new JavaStreamingContextWithFlume(ssc);
+ FlumeFunctions flumeFunc = new FlumeFunctions(ssc);
// tests the API, does not actually test data receiving
- JavaDStream<SparkFlumeEvent> test1 = sscWithFlume.flumeStream("localhost", 12345);
- JavaDStream<SparkFlumeEvent> test2 = sscWithFlume.flumeStream("localhost", 12345,
+ JavaDStream<SparkFlumeEvent> test1 = flumeFunc.flumeStream("localhost", 12345);
+ JavaDStream<SparkFlumeEvent> test2 = flumeFunc.flumeStream("localhost", 12345,
StorageLevel.MEMORY_AND_DISK_SER_2());
-
- // To verify that JavaStreamingContextWithKafka is also StreamingContext
- JavaDStream<String> socketStream = sscWithFlume.socketTextStream("localhost", 9999);
}
}
diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala
index ab0e8a6c8d..491331bb37 100644
--- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/JavaStreamingContextWithKafka.scala
+++ b/external/kafka/src/main/scala/org/apache/spark/streaming/api/java/kafka/KafkaFunctions.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.kafka
+package org.apache.spark.streaming.api.java.kafka
import scala.reflect.ClassTag
import scala.collection.JavaConversions._
@@ -27,13 +27,13 @@ import kafka.serializer.Decoder
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaPairDStream}
+import org.apache.spark.streaming.kafka._
/**
* Wrapper around [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that provides extra
* functions for creating Kafka input streams.
*/
-class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext)
- extends JavaStreamingContext(javaStreamingContext.ssc) {
+class KafkaFunctions(javaStreamingContext: JavaStreamingContext) {
/**
* Create an input stream that pulls messages from a Kafka broker.
@@ -49,7 +49,7 @@ class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext)
): JavaPairDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
+ javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
}
/**
@@ -69,7 +69,7 @@ class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext)
): JavaPairDStream[String, String] = {
implicit val cmt: ClassTag[String] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
+ javaStreamingContext.ssc.kafkaStream(zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
}
/**
@@ -101,7 +101,7 @@ class JavaStreamingContextWithKafka(javaStreamingContext: JavaStreamingContext)
implicit val keyCmd: Manifest[U] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[U]]
implicit val valueCmd: Manifest[T] = implicitly[Manifest[AnyRef]].asInstanceOf[Manifest[T]]
- ssc.kafkaStream[K, V, U, T](
+ javaStreamingContext.ssc.kafkaStream[K, V, U, T](
kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel)
}
}
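
The typed Kafka overload survives the move as well; a hedged sketch with placeholder connection values (ssc is an existing JavaStreamingContext):

    import java.util.HashMap;
    import kafka.serializer.StringDecoder;
    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.kafka.KafkaFunctions;

    HashMap<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("zookeeper.connect", "zkhost:2181");
    kafkaParams.put("group.id", "consumer-group");
    HashMap<String, Integer> topics = new HashMap<String, Integer>();
    topics.put("my-topic", 1);

    KafkaFunctions kafkaFunc = new KafkaFunctions(ssc);
    JavaPairDStream<String, String> typed = kafkaFunc.kafkaStream(
        String.class, String.class, StringDecoder.class, StringDecoder.class,
        kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());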
diff --git a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
index 66236df662..fdea96e506 100644
--- a/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
+++ b/external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java
@@ -18,6 +18,8 @@
package org.apache.spark.streaming.kafka;
import java.util.HashMap;
+
+import org.apache.spark.streaming.api.java.kafka.KafkaFunctions;
import org.junit.Test;
import com.google.common.collect.Maps;
import kafka.serializer.StringDecoder;
@@ -31,21 +33,18 @@ public class JavaKafkaStreamSuite extends LocalJavaStreamingContext {
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
- JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(ssc);
+ KafkaFunctions kafkaFunc = new KafkaFunctions(ssc);
// tests the API, does not actually test data receiving
- JavaPairDStream<String, String> test1 = sscWithKafka.kafkaStream("localhost:12345", "group", topics);
- JavaPairDStream<String, String> test2 = sscWithKafka.kafkaStream("localhost:12345", "group", topics,
+ JavaPairDStream<String, String> test1 = kafkaFunc.kafkaStream("localhost:12345", "group", topics);
+ JavaPairDStream<String, String> test2 = kafkaFunc.kafkaStream("localhost:12345", "group", topics,
StorageLevel.MEMORY_AND_DISK_SER_2());
HashMap<String, String> kafkaParams = Maps.newHashMap();
- kafkaParams.put("zookeeper.connect","localhost:12345");
+ kafkaParams.put("zookeeper.connect", "localhost:12345");
kafkaParams.put("group.id","consumer-group");
- JavaPairDStream<String, String> test3 = sscWithKafka.kafkaStream(
+ JavaPairDStream<String, String> test3 = kafkaFunc.kafkaStream(
String.class, String.class, StringDecoder.class, StringDecoder.class,
kafkaParams, topics, StorageLevel.MEMORY_AND_DISK_SER_2());
-
- // To verify that JavaStreamingContextWithKafka is also StreamingContext
- JavaDStream<String> socketStream = sscWithKafka.socketTextStream("localhost", 9999);
}
}
diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala
index d814da0f0d..72124956fc 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/JavaStreamingContextWithMQTT.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/api/java/mqtt/MQTTFunctions.scala
@@ -15,19 +15,19 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.mqtt
+package org.apache.spark.streaming.api.java.mqtt
import scala.reflect.ClassTag
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.mqtt._
/**
* Wrapper around [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that provides extra
* functions for creating MQTT input streams.
*/
-class JavaStreamingContextWithMQTT(javaStreamingContext: JavaStreamingContext)
- extends JavaStreamingContext(javaStreamingContext.ssc) {
+class MQTTFunctions(javaStreamingContext: JavaStreamingContext) {
/**
* Create an input stream that receives messages pushed by an MQTT publisher.
@@ -39,7 +39,7 @@ class JavaStreamingContextWithMQTT(javaStreamingContext: JavaStreamingContext)
topic: String
): JavaDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- ssc.mqttStream(brokerUrl, topic)
+ javaStreamingContext.ssc.mqttStream(brokerUrl, topic)
}
/**
@@ -54,6 +54,6 @@ class JavaStreamingContextWithMQTT(javaStreamingContext: JavaStreamingContext)
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): JavaDStream[String] = {
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[String]]
- ssc.mqttStream(brokerUrl, topic, storageLevel)
+ javaStreamingContext.ssc.mqttStream(brokerUrl, topic, storageLevel)
}
}
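
For MQTT the same pattern applies; a minimal sketch in which the broker URL and topic are hypothetical (ssc is an existing JavaStreamingContext):

    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.mqtt.MQTTFunctions;

    MQTTFunctions mqttFunc = new MQTTFunctions(ssc);
    JavaDStream<String> payloads = mqttFunc.mqttStream("tcp://localhost:1883", "sensors/topic");
    JavaDStream<String> replicated = mqttFunc.mqttStream("tcp://localhost:1883", "sensors/topic",
        StorageLevel.MEMORY_AND_DISK_SER_2());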
diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala
index 86f4e9c724..86f4e9c724 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTFunctions.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTFunctions.scala
diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
index c8987a3ee0..c8987a3ee0 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/MQTTInputDStream.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/MQTTInputDStream.scala
diff --git a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
index 28a944f57e..28a944f57e 100644
--- a/external/mqtt/src/main/scala/org/apache/spark/spark/streaming/mqtt/package.scala
+++ b/external/mqtt/src/main/scala/org/apache/spark/streaming/mqtt/package.scala
diff --git a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
index c1f41640dc..3ddb4d084f 100644
--- a/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
+++ b/external/mqtt/src/test/java/org/apache/spark/streaming/mqtt/JavaMQTTStreamSuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.mqtt;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.mqtt.MQTTFunctions;
import org.junit.Test;
import org.apache.spark.streaming.LocalJavaStreamingContext;
@@ -28,14 +29,11 @@ public class JavaMQTTStreamSuite extends LocalJavaStreamingContext {
public void testMQTTStream() {
String brokerUrl = "abc";
String topic = "def";
- JavaStreamingContextWithMQTT sscWithMQTT = new JavaStreamingContextWithMQTT(ssc);
+ MQTTFunctions mqttFunc = new MQTTFunctions(ssc);
// tests the API, does not actually test data receiving
- JavaDStream<String> test1 = sscWithMQTT.mqttStream(brokerUrl, topic);
- JavaDStream<String> test2 = sscWithMQTT.mqttStream(brokerUrl, topic,
+ JavaDStream<String> test1 = mqttFunc.mqttStream(brokerUrl, topic);
+ JavaDStream<String> test2 = mqttFunc.mqttStream(brokerUrl, topic,
StorageLevel.MEMORY_AND_DISK_SER_2());
-
- // To verify that JavaStreamingContextWithKafka is also StreamingContext
- JavaDStream<String> socketStream = sscWithMQTT.socketTextStream("localhost", 9999);
}
}
diff --git a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala b/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala
index 0250364331..22e297a03a 100644
--- a/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/JavaStreamingContextWithTwitter.scala
+++ b/external/twitter/src/main/scala/org/apache/spark/streaming/api/java/twitter/TwitterFunctions.scala
@@ -15,20 +15,20 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.twitter
+package org.apache.spark.streaming.api.java.twitter
import twitter4j.Status
import twitter4j.auth.Authorization
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.twitter._
/**
* Wrapper around [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that provides extra
* functions for creating Twitter input streams.
*/
-class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext)
- extends JavaStreamingContext(javaStreamingContext.ssc) {
+class TwitterFunctions(javaStreamingContext: JavaStreamingContext) {
/**
* Create an input stream that returns tweets received from Twitter using Twitter4J's default
@@ -37,7 +37,7 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext
* twitter4j.oauth.accessTokenSecret.
*/
def twitterStream(): JavaDStream[Status] = {
- ssc.twitterStream(None)
+ javaStreamingContext.ssc.twitterStream(None)
}
/**
@@ -48,7 +48,7 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext
* @param filters Set of filter strings to get only those tweets that match them
*/
def twitterStream(filters: Array[String]): JavaDStream[Status] = {
- ssc.twitterStream(None, filters)
+ javaStreamingContext.ssc.twitterStream(None, filters)
}
/**
@@ -60,7 +60,7 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext
* @param storageLevel Storage level to use for storing the received objects
*/
def twitterStream(filters: Array[String], storageLevel: StorageLevel): JavaDStream[Status] = {
- ssc.twitterStream(None, filters, storageLevel)
+ javaStreamingContext.ssc.twitterStream(None, filters, storageLevel)
}
/**
@@ -68,7 +68,7 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext
* @param twitterAuth Twitter4J Authorization
*/
def twitterStream(twitterAuth: Authorization): JavaDStream[Status] = {
- ssc.twitterStream(Some(twitterAuth))
+ javaStreamingContext.ssc.twitterStream(Some(twitterAuth))
}
/**
@@ -80,7 +80,7 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext
twitterAuth: Authorization,
filters: Array[String]
): JavaDStream[Status] = {
- ssc.twitterStream(Some(twitterAuth), filters)
+ javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters)
}
/**
@@ -94,6 +94,6 @@ class JavaStreamingContextWithTwitter(javaStreamingContext: JavaStreamingContext
filters: Array[String],
storageLevel: StorageLevel
): JavaDStream[Status] = {
- ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
+ javaStreamingContext.ssc.twitterStream(Some(twitterAuth), filters, storageLevel)
}
}
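
A sketch of the Twitter overloads after the rename; NullAuthorization and the filter array mirror the test suite below, everything else is an assumption (ssc is an existing JavaStreamingContext):

    import twitter4j.Status;
    import twitter4j.auth.Authorization;
    import twitter4j.auth.NullAuthorization;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.twitter.TwitterFunctions;

    TwitterFunctions twitterFunc = new TwitterFunctions(ssc);
    // The no-arg overload reads twitter4j.oauth.* credentials from system properties.
    JavaDStream<Status> all = twitterFunc.twitterStream();
    // Explicit Authorization plus keyword filters.
    Authorization auth = NullAuthorization.getInstance();
    JavaDStream<Status> filtered = twitterFunc.twitterStream(auth, new String[]{"spark"});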
diff --git a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
index 34e4fbdd85..4564d6cd33 100644
--- a/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
+++ b/external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java
@@ -18,6 +18,8 @@
package org.apache.spark.streaming.twitter;
import java.util.Arrays;
+
+import org.apache.spark.streaming.api.java.twitter.TwitterFunctions;
import org.junit.Test;
import twitter4j.Status;
@@ -31,21 +33,18 @@ import org.apache.spark.streaming.api.java.JavaDStream;
public class JavaTwitterStreamSuite extends LocalJavaStreamingContext {
@Test
public void testTwitterStream() {
- JavaStreamingContextWithTwitter sscWithTwitter = new JavaStreamingContextWithTwitter(ssc);
+ TwitterFunctions twitterFunc = new TwitterFunctions(ssc);
String[] filters = (String[])Arrays.<String>asList("filter1", "filter2").toArray();
Authorization auth = NullAuthorization.getInstance();
// tests the API, does not actually test data receiving
- JavaDStream<Status> test1 = sscWithTwitter.twitterStream();
- JavaDStream<Status> test2 = sscWithTwitter.twitterStream(filters);
+ JavaDStream<Status> test1 = twitterFunc.twitterStream();
+ JavaDStream<Status> test2 = twitterFunc.twitterStream(filters);
JavaDStream<Status> test3 =
- sscWithTwitter.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaDStream<Status> test4 = sscWithTwitter.twitterStream(auth);
- JavaDStream<Status> test5 = sscWithTwitter.twitterStream(auth, filters);
+ twitterFunc.twitterStream(filters, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaDStream<Status> test4 = twitterFunc.twitterStream(auth);
+ JavaDStream<Status> test5 = twitterFunc.twitterStream(auth, filters);
JavaDStream<Status> test6 =
- sscWithTwitter.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
-
- // To verify that JavaStreamingContextWithKafka is also StreamingContext
- JavaDStream<String> socketStream = sscWithTwitter.socketTextStream("localhost", 9999);
+ twitterFunc.twitterStream(auth, filters, StorageLevel.MEMORY_AND_DISK_SER_2());
}
}
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala
index dc5d1f05be..a9bbce71f5 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.zeromq
+package org.apache.spark.streaming.api.java.zeromq
import scala.reflect.ClassTag
import scala.collection.JavaConversions._
@@ -27,13 +27,13 @@ import akka.zeromq.Subscribe
import org.apache.spark.storage.StorageLevel
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.zeromq._
/**
* Wrapper around [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that provides extra
* functions for creating ZeroMQ input streams.
*/
-class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext)
- extends JavaStreamingContext(javaStreamingContext.ssc) {
+class ZeroMQFunctions(javaStreamingContext: JavaStreamingContext) {
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
@@ -55,7 +55,7 @@ class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext)
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
- ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
+ javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
}
/**
@@ -77,7 +77,7 @@ class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext)
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
- ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
+ javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
}
/**
@@ -97,6 +97,6 @@ class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext)
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
- ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
+ javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
}
}
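
Finally, a hedged ZeroMQ sketch; the publisher URL, subscription channel, and single-frame string decoding are illustrative assumptions (ssc is an existing JavaStreamingContext):

    import java.util.Arrays;
    import akka.util.ByteString;
    import akka.zeromq.Subscribe;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.zeromq.ZeroMQFunctions;

    ZeroMQFunctions zeromqFunc = new ZeroMQFunctions(ssc);
    Subscribe subscribe = new Subscribe(ByteString.fromString("topic"));
    Function<byte[][], Iterable<String>> bytesToObjects =
        new Function<byte[][], Iterable<String>>() {
          @Override
          public Iterable<String> call(byte[][] frames) {
            return Arrays.asList(new String(frames[0]));  // decode the first frame as text
          }
        };
    JavaDStream<String> lines = zeromqFunc.<String>zeroMQStream(
        "tcp://127.0.0.1:5555", subscribe, bytesToObjects);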
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
index 96af7d737d..b020ae4cef 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -17,6 +17,7 @@
package org.apache.spark.streaming.zeromq;
+import org.apache.spark.streaming.api.java.zeromq.ZeroMQFunctions;
import org.junit.Test;
import akka.actor.SupervisorStrategy;
@@ -32,7 +33,7 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
@Test // tests the API, does not actually test data receiving
public void testZeroMQStream() {
- JavaStreamingContextWithZeroMQ sscWithZeroMQ = new JavaStreamingContextWithZeroMQ(ssc);
+ ZeroMQFunctions zeromqFunc = new ZeroMQFunctions(ssc);
String publishUrl = "abc";
Subscribe subscribe = new Subscribe((ByteString)null);
Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
@@ -42,14 +43,11 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
}
};
- JavaDStream<String> test1 = sscWithZeroMQ.<String>zeroMQStream(
+ JavaDStream<String> test1 = zeromqFunc.<String>zeroMQStream(
publishUrl, subscribe, bytesToObjects);
- JavaDStream<String> test2 = sscWithZeroMQ.<String>zeroMQStream(
+ JavaDStream<String> test2 = zeromqFunc.<String>zeroMQStream(
publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaDStream<String> test3 = sscWithZeroMQ.<String>zeroMQStream(
+ JavaDStream<String> test3 = zeromqFunc.<String>zeroMQStream(
publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy());
-
- // To verify that JavaStreamingContextWithKafka is also StreamingContext
- JavaDStream<String> socketStream = sscWithZeroMQ.socketTextStream("localhost", 9999);
}
}