diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-07 01:56:15 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2014-01-07 01:56:15 -0800 |
commit | aa99f226a691ddcb4442d60f4cd4908f434cc4ce (patch) | |
tree | 33a1614e3d5ee7a050776e3601ba8c7430b573f8 /external/zeromq | |
parent | 3b4c4c7f4d0d6e45a1acb0baf0d9416a8997b686 (diff) | |
download | spark-aa99f226a691ddcb4442d60f4cd4908f434cc4ce.tar.gz spark-aa99f226a691ddcb4442d60f4cd4908f434cc4ce.tar.bz2 spark-aa99f226a691ddcb4442d60f4cd4908f434cc4ce.zip |
Removed XYZFunctions and added XYZUtils as a common Scala and Java interface for creating XYZ streams.
Diffstat (limited to 'external/zeromq')
-rw-r--r-- | external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala | 57 | ||||
-rw-r--r-- | external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala (renamed from external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala) | 80 | ||||
-rw-r--r-- | external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala | 24 | ||||
-rw-r--r-- | external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java | 17 | ||||
-rw-r--r-- | external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala | 8 |
5 files changed, 63 insertions, 123 deletions
diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala deleted file mode 100644 index f4c75ab7c9..0000000000 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQFunctions.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming.zeromq - -import scala.reflect.ClassTag - -import akka.actor.{Props, SupervisorStrategy} -import akka.util.ByteString -import akka.zeromq.Subscribe - -import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming._ -import org.apache.spark.streaming.receivers._ - -/** - * Extra ZeroMQ input stream functions available on [[org.apache.spark.streaming.StreamingContext]] - * through implicit conversions. Import org.apache.spark.streaming.zeromq._ to use these functions. - */ -class ZeroMQFunctions(ssc: StreamingContext) { - /** - * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe topic to subscribe to - * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic - * and each frame has sequence of byte thus it needs the converter - * (which might be deserializer of bytes) to translate from sequence - * of sequence of bytes, where sequence refer to a frame - * and sub sequence refer to its payload. - * @param storageLevel RDD storage level. Defaults to memory-only. - */ - def zeroMQStream[T: ClassTag]( - publisherUrl: String, - subscribe: Subscribe, - bytesToObjects: Seq[ByteString] ⇒ Iterator[T], - storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, - supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy - ): DStream[T] = { - ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), - "ZeroMQReceiver", storageLevel, supervisorStrategy) - } -} - diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala index a9bbce71f5..546d9df3b5 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/api/java/zeromq/ZeroMQFunctions.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala @@ -15,37 +15,57 @@ * limitations under the License. */ -package org.apache.spark.streaming.api.java.zeromq +package org.apache.spark.streaming.zeromq import scala.reflect.ClassTag import scala.collection.JavaConversions._ - -import akka.actor.SupervisorStrategy +import akka.actor.{Props, SupervisorStrategy} import akka.util.ByteString import akka.zeromq.Subscribe - -import org.apache.spark.storage.StorageLevel import org.apache.spark.api.java.function.{Function => JFunction} -import org.apache.spark.streaming.api.java.{JavaDStream, JavaStreamingContext} -import org.apache.spark.streaming.zeromq._ +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.receivers.ReceiverSupervisorStrategy +import org.apache.spark.streaming.{StreamingContext, DStream} +import org.apache.spark.streaming.api.java.{JavaStreamingContext, JavaDStream} -/** - * Subclass of [[org.apache.spark.streaming.api.java.JavaStreamingContext]] that has extra - * functions for creating ZeroMQ input streams. - */ -class ZeroMQFunctions(javaStreamingContext: JavaStreamingContext) { +object ZeroMQUtils { + /** + * Create an input stream that receives messages pushed by a zeromq publisher. + * @param ssc StreamingContext object + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe Topic to subscribe to + * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic + * and each frame has sequence of byte thus it needs the converter + * (which might be deserializer of bytes) to translate from sequence + * of sequence of bytes, where sequence refer to a frame + * and sub sequence refer to its payload. + * @param storageLevel RDD storage level. Defaults to StorageLevel.MEMORY_AND_DISK_SER_2. + */ + def createStream[T: ClassTag]( + ssc: StreamingContext, + publisherUrl: String, + subscribe: Subscribe, + bytesToObjects: Seq[ByteString] ⇒ Iterator[T], + storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2, + supervisorStrategy: SupervisorStrategy = ReceiverSupervisorStrategy.defaultStrategy + ): DStream[T] = { + ssc.actorStream(Props(new ZeroMQReceiver(publisherUrl, subscribe, bytesToObjects)), + "ZeroMQReceiver", storageLevel, supervisorStrategy) + } /** * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote ZeroMQ publisher - * @param subscribe topic to subscribe to + * @param jssc JavaStreamingContext object + * @param publisherUrl Url of remote ZeroMQ publisher + * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence * of byte thus it needs the converter(which might be deserializer of bytes) * to translate from sequence of sequence of bytes, where sequence refer to a frame * and sub sequence refer to its payload. * @param storageLevel Storage level to use for storing the received objects */ - def zeroMQStream[T]( + def createStream[T]( + jssc: JavaStreamingContext, publisherUrl: String, subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], @@ -54,21 +74,23 @@ class ZeroMQFunctions(javaStreamingContext: JavaStreamingContext) { ): JavaDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator - javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel, supervisorStrategy) + val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel, supervisorStrategy) } /** * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe topic to subscribe to + * @param jssc JavaStreamingContext object + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence * of byte thus it needs the converter(which might be deserializer of bytes) * to translate from sequence of sequence of bytes, where sequence refer to a frame * and sub sequence refer to its payload. - * @param storageLevel RDD storage level. + * @param storageLevel RDD storage level. */ - def zeroMQStream[T]( + def createStream[T]( + jssc: JavaStreamingContext, publisherUrl: String, subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]], @@ -76,27 +98,29 @@ class ZeroMQFunctions(javaStreamingContext: JavaStreamingContext) { ): JavaDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator - javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel) + val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel) } /** * Create an input stream that receives messages pushed by a zeromq publisher. - * @param publisherUrl Url of remote zeromq publisher - * @param subscribe topic to subscribe to + * @param jssc JavaStreamingContext object + * @param publisherUrl Url of remote zeromq publisher + * @param subscribe Topic to subscribe to * @param bytesToObjects A zeroMQ stream publishes sequence of frames for each topic and each frame has sequence * of byte thus it needs the converter(which might be deserializer of bytes) * to translate from sequence of sequence of bytes, where sequence refer to a frame * and sub sequence refer to its payload. */ - def zeroMQStream[T]( + def createStream[T]( + jssc: JavaStreamingContext, publisherUrl: String, subscribe: Subscribe, bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]] ): JavaDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - def fn(x: Seq[ByteString]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator - javaStreamingContext.ssc.zeroMQStream[T](publisherUrl, subscribe, fn) + val fn = (x: Seq[ByteString]) => bytesToObjects.apply(x.map(_.toArray).toArray).toIterator + createStream[T](jssc.ssc, publisherUrl, subscribe, fn) } } diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala deleted file mode 100644 index dc27178149..0000000000 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/package.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.streaming - -package object zeromq { - implicit def sscToZeroMQFunctions(ssc: StreamingContext) = new ZeroMQFunctions(ssc) -} - - diff --git a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java index b020ae4cef..d2361e14b8 100644 --- a/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java +++ b/external/zeromq/src/test/java/org/apache/spark/streaming/zeromq/JavaZeroMQStreamSuite.java @@ -17,13 +17,10 @@ package org.apache.spark.streaming.zeromq; -import org.apache.spark.streaming.api.java.zeromq.ZeroMQFunctions; import org.junit.Test; - import akka.actor.SupervisorStrategy; import akka.util.ByteString; import akka.zeromq.Subscribe; - import org.apache.spark.api.java.function.Function; import org.apache.spark.storage.StorageLevel; import org.apache.spark.streaming.LocalJavaStreamingContext; @@ -33,7 +30,6 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { @Test // tests the API, does not actually test data receiving public void testZeroMQStream() { - ZeroMQFunctions zeromqFunc = new ZeroMQFunctions(ssc); String publishUrl = "abc"; Subscribe subscribe = new Subscribe((ByteString)null); Function<byte[][], Iterable<String>> bytesToObjects = new Function<byte[][], Iterable<String>>() { @@ -43,11 +39,12 @@ public class JavaZeroMQStreamSuite extends LocalJavaStreamingContext { } }; - JavaDStream<String> test1 = zeromqFunc.<String>zeroMQStream( - publishUrl, subscribe, bytesToObjects); - JavaDStream<String> test2 = zeromqFunc.<String>zeroMQStream( - publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2()); - JavaDStream<String> test3 = zeromqFunc.<String>zeroMQStream( - publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), SupervisorStrategy.defaultStrategy()); + JavaDStream<String> test1 = ZeroMQUtils.<String>createStream( + ssc, publishUrl, subscribe, bytesToObjects); + JavaDStream<String> test2 = ZeroMQUtils.<String>createStream( + ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2()); + JavaDStream<String> test3 = ZeroMQUtils.<String>createStream( + ssc,publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2(), + SupervisorStrategy.defaultStrategy()); } } diff --git a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala index 5adcdb821f..4193b8a02f 100644 --- a/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala +++ b/external/zeromq/src/test/scala/org/apache/spark/streaming/zeromq/ZeroMQStreamSuite.scala @@ -33,10 +33,10 @@ class ZeroMQStreamSuite extends TestSuiteBase { val bytesToObjects = (bytes: Seq[ByteString]) => null.asInstanceOf[Iterator[String]] // tests the API, does not actually test data receiving - val test1 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects) - val test2 = ssc.zeroMQStream( - publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2) - val test3 = ssc.zeroMQStream(publishUrl, subscribe, bytesToObjects, + val test1 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects) + val test2 = ZeroMQUtils.createStream( + ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2) + val test3 = ZeroMQUtils.createStream(ssc, publishUrl, subscribe, bytesToObjects, StorageLevel.MEMORY_AND_DISK_SER_2, SupervisorStrategy.defaultStrategy) // TODO: Actually test data receiving |