aboutsummaryrefslogtreecommitdiff
path: root/external/zeromq
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2014-01-07 01:56:15 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2014-01-07 01:56:15 -0800
commitaa99f226a691ddcb4442d60f4cd4908f434cc4ce (patch)
tree33a1614e3d5ee7a050776e3601ba8c7430b573f8 /external/zeromq
parent3b4c4c7f4d0d6e45a1acb0baf0d9416a8997b686 (diff)
downloadspark-aa99f226a691ddcb4442d60f4cd4908f434cc4ce.tar.gz
spark-aa99f226a691ddcb4442d60f4cd4908f434cc4ce.tar.bz2
spark-aa99f226a691ddcb4442d60f4cd4908f434cc4ce.zip
Removed XYZFunctions and added XYZUtils as a common Scala and Java interface for creating XYZ streams.
Diffstat (limited to 'external/zeromq')
-rw-r--r--external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala57
-rw-r--r--external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala (renamed from external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala)80
-rw-r--r--external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala24
-rw-r--r--external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java17
-rw-r--r--external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala8
5 files changed, 63 insertions, 123 deletions
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala
deleted file mode 100644
index f4c75ab7c9..0000000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming.zeromq
-
-import scala.reflect.ClassTag
-
-import akka.actor.{Props, SupervisorStrategy}
-import akka.util.ByteString
-import akka.zeromq.Subscribe
-
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming._
-import org.apache.spark.streaming.receivers._
-
-/**
- * Extra ZeroMQ input stream functions available on [[org.apache.spark.streaming.StreamingContext]]
- * through implicit conversions. Import org.apache.spark.streaming.zeromq._ to use these functions.
- */
-class ZeroMQFunctions(ssc: StreamingContext) {
- /**
- * Create an input stream that receives messages pushed by a zeromq publisher.
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe topic to subscribe to
- * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
- * and each frame has sequence of byte thus it needs the converter
- * (which might be deserializer of bytes) to translate from sequence
- * of sequence of bytes, where sequence refer to a frame
- * and sub sequence refer to its payload.
- * @param storageLevel RDD storage level. Defaults to memory-only.
- */
- def zeroMQStream[T: ClassTag](
- publisherUrl: String,
- subscribe: Subscribe,
- bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
- storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
- supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
- ): DStream[T] = {
- ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
- "ZeroMQReceiver", storageLevel, supervisorStrategy)
- }
-}
-
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
index a9bbce71f5..546d9df3b5 100644
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala
+++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala
@@ -15,37 +15,57 @@
* limitations under the License.
*/
-package org.apache.spark.streaming.api.java.zeromq
+package org.apache.spark.streaming.zeromq
import scala.reflect.ClassTag
import scala.collection.JavaConversions._
-
-import akka.actor.SupervisorStrategy
+import akka.actor.{Props, SupervisorStrategy}
import akka.util.ByteString
import akka.zeromq.Subscribe
-
-import org.apache.spark.storage.StorageLevel
import org.apache.spark.api.java.function.{Function => JFunction}
-import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext}
-import org.apache.spark.streaming.zeromq._
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy
+import org.apache.spark.streaming.{StreamingContext, DStream}
+import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream}
-/**
- * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra
- * functions for creating ZeroMQ input streams.
- */
-class ZeroMQFunctions(javaStreamingContext: JavaStreamingContext) {
+object ZeroMQUtils {
+ /**
+ * Create an input stream that receives messages pushed by a zeromq publisher.
+ * @param ssc StreamingContext object
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe Topic to subscribe to
+ * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic
+ * and each frame has sequence of byte thus it needs the converter
+ * (which might be deserializer of bytes) to translate from sequence
+ * of sequence of bytes, where sequence refer to a frame
+ * and sub sequence refer to its payload.
+ * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2.
+ */
+ def createStream[T: ClassTag](
+ ssc: StreamingContext,
+ publisherUrl: String,
+ subscribe: Subscribe,
+ bytesToObjects: Seq[ByteString] ⇒ Iterator[T],
+ storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2,
+ supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy
+ ): DStream[T] = {
+ ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)),
+ "ZeroMQReceiver", storageLevel, supervisorStrategy)
+ }
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
- * @param publisherUrl Url of remote ZeroMQ publisher
- * @param subscribe topic to subscribe to
+ * @param jssc JavaStreamingContext object
+ * @param publisherUrl Url of remote ZeroMQ publisher
+ * @param subscribe Topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
* of byte thus it needs the converter(which might be deserializer of bytes)
* to translate from sequence of sequence of bytes, where sequence refer to a frame
* and sub sequence refer to its payload.
* @param storageLevel Storage level to use for storing the received objects
*/
- def zeroMQStream[T](
+ def createStream[T](
+ jssc: JavaStreamingContext,
publisherUrl: String,
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
@@ -54,21 +74,23 @@ class ZeroMQFunctions(javaStreamingContext: JavaStreamingContext) {
): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
- javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
+ val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel, supervisorStrategy)
}
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe topic to subscribe to
+ * @param jssc JavaStreamingContext object
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe Topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
* of byte thus it needs the converter(which might be deserializer of bytes)
* to translate from sequence of sequence of bytes, where sequence refer to a frame
* and sub sequence refer to its payload.
- * @param storageLevel RDD storage level.
+ * @param storageLevel RDD storage level.
*/
- def zeroMQStream[T](
+ def createStream[T](
+ jssc: JavaStreamingContext,
publisherUrl: String,
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
@@ -76,27 +98,29 @@ class ZeroMQFunctions(javaStreamingContext: JavaStreamingContext) {
): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
- javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
+ val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel)
}
/**
* Create an input stream that receives messages pushed by a zeromq publisher.
- * @param publisherUrl Url of remote zeromq publisher
- * @param subscribe topic to subscribe to
+ * @param jssc JavaStreamingContext object
+ * @param publisherUrl Url of remote zeromq publisher
+ * @param subscribe Topic to subscribe to
* @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence
* of byte thus it needs the converter(which might be deserializer of bytes)
* to translate from sequence of sequence of bytes, where sequence refer to a frame
* and sub sequence refer to its payload.
*/
- def zeroMQStream[T](
+ def createStream[T](
+ jssc: JavaStreamingContext,
publisherUrl: String,
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
): JavaDStream[T] = {
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
- def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
- javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
+ val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
+ createStream[T](jssc.ssc, publisherUrl, subscribe, fn)
}
}
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
deleted file mode 100644
index dc27178149..0000000000
--- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.streaming
-
-package object zeromq {
- implicit def sscToZeroMQFunctions(ssc: StreamingContext) = new ZeroMQFunctions(ssc)
-}
-
-
diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
index b020ae4cef..d2361e14b8 100644
--- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
+++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java
@@ -17,13 +17,10 @@
package org.apache.spark.streaming.zeromq;
-import org.apache.spark.streaming.api.java.zeromq.ZeroMQFunctions;
import org.junit.Test;
-
import akka.actor.SupervisorStrategy;
import akka.util.ByteString;
import akka.zeromq.Subscribe;
-
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.LocalJavaStreamingContext;
@@ -33,7 +30,6 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
@Test // tests the API, does not actually test data receiving
public void testZeroMQStream() {
- ZeroMQFunctions zeromqFunc = new ZeroMQFunctions(ssc);
String publishUrl = "abc";
Subscribe subscribe = new Subscribe((ByteString)null);
Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() {
@@ -43,11 +39,12 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext {
}
};
- JavaDStream<String> test1 = zeromqFunc.<String>zeroMQStream(
- publishUrl, subscribe, bytesToObjects);
- JavaDStream<String> test2 = zeromqFunc.<String>zeroMQStream(
- publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
- JavaDStream<String> test3 = zeromqFunc.<String>zeroMQStream(
- publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy());
+ JavaDStream<String> test1 = ZeroMQUtils.<String>createStream(
+ ssc, publishUrl, subscribe, bytesToObjects);
+ JavaDStream<String> test2 = ZeroMQUtils.<String>createStream(
+ ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaDStream<String> test3 = ZeroMQUtils.<String>createStream(
+ ssc,publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(),
+ SupervisorStrategy.defaultStrategy());
}
}
diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
index 5adcdb821f..4193b8a02f 100644
--- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
+++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala
@@ -33,10 +33,10 @@ class ZeroMQStreamSuite extends TestSuiteBase {
val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]]
// tests the API, does not actually test data receiving
- val test1 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects)
- val test2 = ssc.zeroMQStream(
- publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
- val test3 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects,
+ val test1 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects)
+ val test2 = ZeroMQUtils.createStream(
+ ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2)
+ val test3 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects,
StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy)
// TODO: Actually test data receiving