aboutsummaryrefslogtreecommitdiff
path: root/external/zeromq
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-01-06 01:47:53 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-01-06 01:47:53 -0800
commitd0fd3b9ad238294346eb3465c489eabd41fb2380 (patch)
treed7f7f17fc842589f80c3ef16669741e3499d9692 /external/zeromq
parent977bcc36d4440ff562d5dbcc12449bf383d0d9e2 (diff)
downloadspark-d0fd3b9ad238294346eb3465c489eabd41fb2380.tar.gz
spark-d0fd3b9ad238294346eb3465c489eabd41fb2380.tar.bz2
spark-d0fd3b9ad238294346eb3465c489eabd41fb2380.zip
Changed JavaStreamingContextWith*** to ***Function in streaming.api.java.*** package. Also fixed packages of Flume and MQTT tests.
Diffstat (limited to 'external/zeromq')
-rw-r--r--external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala (renamed from external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala)12
-rw-r--r--external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java12
2 files changed, 11 insertions, 13 deletions
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala
index dc5d1f05be..a9bbce71f5 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/JavaStreamingContextWithZeroMQ.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.zeromq
+package org.apache.spark.streaming.api.java.zeromq
import scala.reflect.ClassTag
import scala.collection.JavaConversions._
@@ -27,13 +27,13 @@ import akka.zeromq.Subscribe
import org.apache.spark.storage.StorageLevel
import org.apache.spark.api.java.function.{Function => JFunction}
import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
+import org.apache.spark.streaming.zeromq._
/**
* Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
* functions for creating ZeroMQ input streams.
*/
-class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext)
- extends JavaStreamingContext(javaStreamingContext.ssc) {
+class ZeroMQFunctions(javaStreamingContext: JavaStreamingContext) {
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
@@ -55,7 +55,7 @@ class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext)
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
- ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
+ javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
}
/**
@@ -77,7 +77,7 @@ class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext)
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
- ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
+ javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
}
/**
@@ -97,6 +97,6 @@ class JavaStreamingContextWithZeroMQ(javaStreamingContext: JavaStreamingContext)
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
- ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
+ javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
}
}
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
index 96af7d737d..b020ae4cef 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -17,6 +17,7 @@
package org.apache.spark.streaming.zeromq;
+import org.apache.spark.streaming.api.java.zeromq.ZeroMQFunctions;
import org.junit.Test;
import akka.actor.SupervisorStrategy;
@@ -32,7 +33,7 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
@Test // tests the API, does not actually test data receiving
public void testZeroMQStream() {
- JavaStreamingContextWithZeroMQ sscWithZeroMQ = new JavaStreamingContextWithZeroMQ(ssc);
+ ZeroMQFunctions zeromqFunc = new ZeroMQFunctions(ssc);
String publishUrl = "abc";
Subscribe subscribe = new Subscribe((ByteString)null);
Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
@@ -42,14 +43,11 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
}
};
- JavaDStream<String> test1 = sscWithZeroMQ.<String>zeroMQStream(
+ JavaDStream<String> test1 = zeromqFunc.<String>zeroMQStream(
publishUrl, subscribe, bytesToObjects);
- JavaDStream<String> test2 = sscWithZeroMQ.<String>zeroMQStream(
+ JavaDStream<String> test2 = zeromqFunc.<String>zeroMQStream(
publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaDStream<String> test3 = sscWithZeroMQ.<String>zeroMQStream(
+ JavaDStream<String> test3 = zeromqFunc.<String>zeroMQStream(
publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy());
-
- // To verify that JavaStreamingContextWithKafka is also StreamingContext
- JavaDStream<String> socketStream = sscWithZeroMQ.socketTextStream("localhost", 9999);
}
}